Project
Loading...
Searching...
No Matches
test_Parallel.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/InputSpec.h"
19
20using namespace o2::framework;
21
/// Minimal POD payload used throughout this test: a fake TPC cluster with a
/// position (x, y, z) and a charge (q).
/// NOTE(review): the Doxygen line numbers that the extraction fused into the
/// original lines ("22struct", "23 float x;") are stripped here so the block
/// parses again.
struct FakeCluster {
  float x;
  float y;
  float z;
  float q;
};
29
/// Number of parallel copies of each pipelined stage in the test workflow
/// (passed as the maxIndex of parallel()/mergeInputs() — presumably; the
/// argument lines were elided by the Doxygen extraction).
size_t parallelSize = 4;
/// Number of FakeCluster elements carried in each produced message; the
/// producer/processing loops assert their index stays below this bound.
/// NOTE(review): fused Doxygen line numbers ("30size_t", "31size_t") stripped
/// so the declarations parse again.
size_t collectionChunkSize = 1000;
34
// Builds the test workflow: parallel "dataProducer" stages -> parallel
// "processingStage" stages -> a "dataSampler" that forwards payloads under
// "_S"-suffixed descriptions -> a "qcTask" consumer, plus a "sink" for the
// processed clusters.
// NOTE(review): this is a Doxygen source-listing extraction. The original
// file's line numbers are fused onto each line and several lines (the
// DataProcessorSpec/AlgorithmSpec wrappers, lambda bodies, and the
// parallelSize arguments) were elided, so the block as shown does not parse.
// Comments below only describe what the visible lines demonstrate.
35std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
36{
37 std::vector<DataProcessorSpec> specs;
// Parallel producers publishing TPC/CLUSTERS.
// NOTE(review): original lines 39, 43-45 and 47 (spec wrapper, algorithm,
// amend-callback body) are missing from this extraction — confirm against
// the original test_Parallel.cxx.
38 auto dataProducers = parallel(
40 "dataProducer",
41 Inputs{},
42 {OutputSpec{"TPC", "CLUSTERS", 0, Lifetime::Timeframe}},
46 [](DataProcessorSpec& spec, size_t index) {
48 });
49
// Parallel processors: TPC/CLUSTERS -> TPC/CLUSTERS_P.
50 auto processingStages = parallel(
52 "processingStage",
53 Inputs{
54 {"dataTPC", "TPC", "CLUSTERS", 0, Lifetime::Timeframe}},
55 Outputs{
56 {"TPC", "CLUSTERS_P", 0, Lifetime::Timeframe}},
58 // CLion says it is ambiguous without an (AlgorithmSpec::ProcessCallback) cast, but cmake compiles fine anyway.
61 [](DataProcessorSpec& spec, size_t index) {
64 });
65
// The sampler consumes both the raw and the processed clusters; the two
// mergeInputs() results are concatenated into a single input list.
// NOTE(review): the maxIndex arguments and amend-callback bodies of the
// mergeInputs() calls were elided by the extraction.
66 auto inputsDataSampler = mergeInputs(
67 {"dataTPC", "TPC", "CLUSTERS", 0, Lifetime::Timeframe},
69 [](InputSpec& input, size_t index) {
71 });
72 auto inputsTpcProc = mergeInputs(
73 {"dataTPC-proc", "TPC", "CLUSTERS_P", 0, Lifetime::Timeframe},
75 [](InputSpec& input, size_t index) {
77 });
78 inputsDataSampler.insert(std::end(inputsDataSampler), std::begin(inputsTpcProc), std::end(inputsTpcProc));
79
// "dataSampler" re-publishes every input payload byte-for-byte, under a
// description with "_S" appended (CLUSTERS -> CLUSTERS_S, etc.).
80 auto dataSampler = DataProcessorSpec{
81 "dataSampler",
82 inputsDataSampler,
83 Outputs{
84 {"TPC", "CLUSTERS_S"},
85 {"TPC", "CLUSTERS_P_S"}},
88 InputRecord& inputs = ctx.inputs();
89
90 for (auto& input : inputs) {
91
92 const InputSpec* inputSpec = input.spec;
93 auto matcher = DataSpecUtils::asConcreteDataMatcher(*inputSpec);
94 o2::header::DataDescription outputDescription = matcher.description;
95
96 // todo: better sampled data flagging
// Append "_S" in place, but only if the fixed-size description buffer
// still has room for the two extra characters.
97 size_t len = strlen(outputDescription.str);
98 if (len < outputDescription.size - 2) {
99 outputDescription.str[len] = '_';
100 outputDescription.str[len + 1] = 'S';
101 }
102
103 Output description{
104 matcher.origin,
105 outputDescription,
106 0,
107 inputSpec->lifetime};
108
109 LOG(debug) << "DataSampler sends data from subSpec: " << matcher.subSpec;
110
// Allocate an output message of the same size as the input payload and
// copy it byte by byte.
111 const auto* inputHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
112 auto& output = ctx.outputs().make<char>(description, inputHeader->size());
113
114 // todo: use some std function or adopt(), when it is available for POD data
115 const char* input_ptr = input.payload;
116 for (char& it : output) {
117 it = *input_ptr++;
118 }
119 }
120}
121}
122}
123;
124
// "qcTask" consumes the sampled clusters and only logs which subSpec the
// data came from.
125DataProcessorSpec qcTask{
126 "qcTask",
127 Inputs{
128 {"dataTPC-sampled", "TPC", "CLUSTERS_S"},
129 {"dataTPC-proc-sampled", "TPC", "CLUSTERS_P_S"}},
130 Outputs{},
133 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-sampled").payload);
134const InputSpec* inputSpec = ctx.inputs().get("dataTPC-sampled").spec;
135auto matcher = DataSpecUtils::asConcreteDataMatcher(*inputSpec);
136LOG(debug) << "qcTask received data with subSpec: " << matcher.subSpec;
137}
138}
139}
140;
141
// "sink" merges the per-pipeline processed-cluster outputs and reads them.
// NOTE(review): the `DataProcessorSpec sink{` line (original line 142) and
// the opening of its mergeInputs() call were elided by the extraction.
143 "sink",
145 {"dataTPC-proc", "TPC", "CLUSTERS_P"},
147 [](InputSpec& input, size_t index) {
149 }),
150 Outputs{},
152 [](ProcessingContext& ctx) {
153 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-proc").payload);
154 }}};
155
// Per the original comments, this spec ordering reproduces an error in
// qcTask, while the commented-out ordering below does not.
156// error in qcTask:
157specs.swap(dataProducers);
158specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
159specs.push_back(sink);
160specs.push_back(dataSampler);
161specs.push_back(qcTask);
162
163// no error:
164// specs.swap(dataProducers);
165// specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
166// specs.push_back(dataSampler);
167// specs.push_back(qcTask);
168// specs.push_back(sink);
169
170return specs;
171}
172
174{
175 size_t index = ctx.services().get<ParallelContext>().index1D();
176 // Creates a new message of size collectionChunkSize which
177 // has "TPC" as data origin and "CLUSTERS" as data description.
178 auto& tpcClusters = ctx.outputs().make<FakeCluster>(
180 int i = 0;
181
182 for (auto& cluster : tpcClusters) {
183 assert(i < collectionChunkSize);
184 cluster.x = index;
185 cluster.y = i;
186 cluster.z = i;
187 cluster.q = rand() % 1000;
188 i++;
189 }
190 ctx.services().get<ControlService>().endOfStream();
191}
192
194{
195 size_t index = ctx.services().get<ParallelContext>().index1D();
196
197 const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);
198
199 auto& processedTpcClusters = ctx.outputs().make<FakeCluster>(
200 Output{"TPC", "CLUSTERS_P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
202
203 int i = 0;
204 for (auto& cluster : processedTpcClusters) {
205 assert(i < collectionChunkSize);
206 cluster.x = -inputDataTpc[i].x;
207 cluster.y = 2 * inputDataTpc[i].y;
208 cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
209 cluster.q = inputDataTpc[i].q;
210 i++;
211 }
212};
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
std::ostringstream debug
decltype(auto) make(const Output &spec, Args... args)
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint index
Definition glcorearb.h:781
GLenum GLenum GLsizei len
Definition glcorearb.h:4232
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::function< void(ProcessingContext &)> ProcessCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
enum Lifetime lifetime
Definition InputSpec.h:73
header::DataOrigin origin
Definition Output.h:28
the main header struct
Definition DataHeader.h:618
uint32_t SubSpecificationType
Definition DataHeader.h:620
static int constexpr size
Definition DataHeader.h:211
size_t collectionChunkSize
void someProcessingStageAlgorithm(ProcessingContext &ctx)
size_t parallelSize
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
void someDataProducerAlgorithm(ProcessingContext &ctx)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"