dataSamplingParallel.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

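/// \file dataSamplingParallel.cxx
/// \brief Example workflow: parallel data producers and processing stages whose data
///        are sampled by the Data Sampling infrastructure and verified by a QC task.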
#include "DataSampling/DataSampling.h"

#include <thread>

using namespace o2::framework;
using namespace o2::utilities;
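// DPL customization hooks: these have to be defined before Framework/runDataProcessing.h
// is included, so that Data Sampling can adjust the completion and channel configuration
// policies (the dispatcher consumes any data immediately).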
void customize(std::vector<CompletionPolicy>& policies)
{
  DataSampling::CustomizeInfrastructure(policies);
}

void customize(std::vector<ChannelConfigurationPolicy>& policies)
{
  DataSampling::CustomizeInfrastructure(policies);
}

#include "Framework/InputSpec.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/DataRefUtils.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ParallelContext.h"
#include "Framework/Logger.h"
#include "Framework/runDataProcessing.h"

#include <cassert>
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <iostream>
#include <stdexcept>

#include <boost/algorithm/string.hpp>

using namespace o2::framework;
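// A toy cluster structure used as the message payload throughout this example.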
struct FakeCluster {
  float x;
  float y;
  float z;
  float q;
};

size_t parallelSize = 4;
size_t collectionChunkSize = 1000;

void someDataProducerAlgorithm(ProcessingContext& ctx);
void someProcessingStageAlgorithm(ProcessingContext& ctx);
void someSinkAlgorithm(ProcessingContext& ctx);

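// defineDataProcessing() builds the workflow: parallelSize data producers, matching
// processing stages, a sink merging all processed outputs, a QC task fed by Data
// Sampling, and a dummy producer. parallel() clones the producer spec parallelSize
// times; the amend callback gives each clone's output a sub-specification equal to
// its index.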
WorkflowSpec defineDataProcessing(ConfigContext const&)
{
  auto dataProducers = parallel(
    DataProcessorSpec{
      "dataProducer",
      Inputs{},
      {OutputSpec{"TPC", "CLUSTERS"}},
      AlgorithmSpec{(AlgorithmSpec::ProcessCallback)someDataProducerAlgorithm}},
    parallelSize,
    [](DataProcessorSpec& spec, size_t index) {
      DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
    });

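  // The processing stage is cloned in the same way; here both the input and the output
  // of clone i are pinned to sub-specification i, so each stage reads from exactly one
  // producer.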
  auto processingStages = parallel(
    DataProcessorSpec{
      "processingStage",
      Inputs{
        {"dataTPC", "TPC", "CLUSTERS"}},
      Outputs{
        {"TPC", "CLUSTERS_P"}},
      AlgorithmSpec{(AlgorithmSpec::ProcessCallback)someProcessingStageAlgorithm}},
    parallelSize,
    [](DataProcessorSpec& spec, size_t index) {
      DataSpecUtils::updateMatchingSubspec(spec.inputs[0], index);
      DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
    });

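  // mergeInputs() expands a single InputSpec into parallelSize entries, again keyed by
  // sub-specification, so the sink subscribes to every processing stage at once.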
  auto inputsSink = mergeInputs(
    {"dataTPC-proc", "TPC", "CLUSTERS_P"},
    parallelSize,
    [](InputSpec& input, size_t index) {
      DataSpecUtils::updateMatchingSubspec(input, index);
    });

  DataProcessorSpec sink{
    "sink",
    inputsSink,
    Outputs{},
    AlgorithmSpec{(AlgorithmSpec::ProcessCallback)someSinkAlgorithm}};

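  // The QC task is not wired to the workflow directly; it subscribes to the Data
  // Sampling outputs (origin "DS") declared in the configuration file and cross-checks
  // the sampled raw clusters against the sampled processed ones.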
  // clang-format off
  DataProcessorSpec simpleQcTask{
    "simpleQcTask",
    Inputs{
      { "TPC_CLUSTERS_S",   { "DS", "simpleQcTask0" } },
      { "TPC_CLUSTERS_P_S", { "DS", "simpleQcTask1" } }
    },
    Outputs{},
    AlgorithmSpec{
      (AlgorithmSpec::ProcessCallback) [](ProcessingContext& ctx) {
        auto inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("TPC_CLUSTERS_S").payload);
        auto inputDataTpcProcessed = reinterpret_cast<const FakeCluster*>(ctx.inputs().get(
          "TPC_CLUSTERS_P_S").payload);

        auto ref = ctx.inputs().get("TPC_CLUSTERS_S");
        const auto* header = DataRefUtils::getHeader<DataHeader*>(ref);
        (void)header; // not used further in this example

        bool dataGood = true;
        for (size_t j = 0; j < DataRefUtils::getPayloadSize(ref) / sizeof(FakeCluster); ++j) {
          float diff = std::abs(-inputDataTpc[j].x - inputDataTpcProcessed[j].x) +
                       std::abs(2 * inputDataTpc[j].y - inputDataTpcProcessed[j].y) +
                       std::abs(inputDataTpc[j].z * inputDataTpc[j].q - inputDataTpcProcessed[j].z) +
                       std::abs(inputDataTpc[j].q - inputDataTpcProcessed[j].q);
          if (diff > 1) {
            dataGood = false;
            break;
          }
        }

        LOG(info) << "simpleQcTask - received data is " << (dataGood ? "correct" : "wrong");
      }
    }
  };

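  // A dummy producer that only declares two additional outputs (TST/HISTOS and
  // TST/STRING); its process callback is left empty here.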
  DataProcessorSpec dummyProducer{
    "dummy",
    Inputs{},
    Outputs{
      { {"tsthistos"}, "TST", "HISTOS", 0 },
      { {"tststring"}, "TST", "STRING", 0 }
    },
    AlgorithmSpec{(AlgorithmSpec::ProcessCallback) [](ProcessingContext&) {}}
  };

  WorkflowSpec specs;
  specs.swap(dataProducers);
  specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
  specs.push_back(sink);
  specs.push_back(simpleQcTask);
  specs.push_back(dummyProducer);

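  // The Data Sampling policies are read from a JSON file installed under $O2_ROOT;
  // GenerateInfrastructure() appends the dispatcher devices that feed simpleQcTask
  // to the workflow.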
  const char* o2Root = getenv("O2_ROOT");
  if (o2Root == nullptr) {
    throw std::runtime_error("The O2_ROOT environment variable is not set, probably the O2 environment has not been loaded.");
  }
  std::string configurationSource = std::string("json:/") + o2Root + "/share/etc/exampleDataSamplingConfig.json";
  DataSampling::GenerateInfrastructure(specs, configurationSource);
  return specs;
}
// clang-format on

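// Each producer replica asks the ParallelContext service for its index and uses it as
// the sub-specification of the messages it creates.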
void someDataProducerAlgorithm(ProcessingContext& ctx)
{
  size_t index = ctx.services().get<ParallelContext>().index1D();
  std::this_thread::sleep_for(std::chrono::seconds(1));
  // Creates a new message of size collectionChunkSize which
  // has "TPC" as data origin and "CLUSTERS" as data description.
  auto& tpcClusters = ctx.outputs().make<FakeCluster>(
    Output{"TPC", "CLUSTERS", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
    collectionChunkSize);
  int i = 0;

  for (auto& cluster : tpcClusters) {
    assert(i < collectionChunkSize);
    cluster.x = index;
    cluster.y = i;
    cluster.z = i;
    cluster.q = rand() % 1000;
    i++;
  }
}

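// The processing stage applies a simple transformation to every cluster
// (x -> -x, y -> 2y, z -> z*q, q -> q); simpleQcTask checks exactly this relation
// on the sampled data.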
void someProcessingStageAlgorithm(ProcessingContext& ctx)
{
  size_t index = ctx.services().get<ParallelContext>().index1D();

  const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
  auto& processedTpcClusters = ctx.outputs().make<FakeCluster>(
    Output{"TPC", "CLUSTERS_P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
    collectionChunkSize);

  int i = 0;
  for (auto& cluster : processedTpcClusters) {
    assert(i < collectionChunkSize);
    cluster.x = -inputDataTpc[i].x;
    cluster.y = 2 * inputDataTpc[i].y;
    cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
    cluster.q = inputDataTpc[i].q;
    i++;
  }
}

void someSinkAlgorithm(ProcessingContext& ctx)
{
  const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-proc").payload);
  (void)inputDataTpc; // the sink only receives the merged data; it does not inspect it
}

Referenced declarations:

WorkflowSpec defineDataProcessing(ConfigContext const&)
  This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector<CompletionPolicy>& policies)
void someDataProducerAlgorithm(ProcessingContext& ctx)
void someProcessingStageAlgorithm(ProcessingContext& ctx)
void someSinkAlgorithm(ProcessingContext& ctx)
size_t parallelSize
size_t collectionChunkSize

DataSampling/DataSampling.h
  Definition of O2 Data Sampling, v1.0.
static void DataSampling::CustomizeInfrastructure(std::vector<framework::CompletionPolicy>&)
  Configures the dispatcher to consume any data immediately.
static void DataSampling::GenerateInfrastructure(framework::WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1, const std::string& host = "")
  Generates the data sampling infrastructure.

WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function<void(DataProcessorSpec&, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function<void(InputSpec&, size_t)> amendCallback)
std::vector<DataProcessorSpec> WorkflowSpec
std::vector<InputSpec> Inputs
std::vector<OutputSpec> Outputs
std::function<void(ProcessingContext&)> AlgorithmSpec::ProcessCallback

InputRecord& ProcessingContext::inputs()
  The inputs associated with this processing context.
DataAllocator& ProcessingContext::outputs()
  The data allocator is used to allocate memory for the output data.
ServiceRegistryRef ProcessingContext::services()
  The services registry associated with this processing context.
decltype(auto) DataAllocator::make(const Output& spec, Args... args)
decltype(auto) InputRecord::get(R binding, int part = 0) const
static o2::header::DataHeader::PayloadSizeType DataRefUtils::getPayloadSize(const DataRef& ref)
static void DataSpecUtils::updateMatchingSubspec(InputSpec& in, header::DataHeader::SubSpecificationType subSpec)
o2::header::DataHeader
  The main header struct.
o2::header::DataHeader::SubSpecificationType: uint32_t