test_Parallel.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/InputSpec.h"
// Further Framework headers required by the code below (the include block is
// partially elided in the listing):
#include "Framework/DataProcessorSpec.h"
#include "Framework/DataRefUtils.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ControlService.h"
#include "Framework/ParallelContext.h"
#include "Framework/Logger.h"
#include "Framework/runDataProcessing.h"

#include <boost/algorithm/string.hpp>
#include <cassert>
#include <cstdlib>
#include <cstring>

using namespace o2::framework;

struct FakeCluster {
  float x;
  float y;
  float z;
  float q;
};

size_t parallelSize = 4;
size_t collectionChunkSize = 1000;
// forward declarations of the algorithms defined at the end of the file
void someDataProducerAlgorithm(ProcessingContext& ctx);
void someProcessingStageAlgorithm(ProcessingContext& ctx);
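
// Workflow overview: parallelSize copies of "dataProducer" each publish
// TPC/CLUSTERS under their own subSpec; a matching copy of "processingStage"
// turns each stream into TPC/CLUSTERS_P; "dataSampler" consumes every stream
// and republishes it with an "_S"-suffixed description; "qcTask" inspects the
// sampled copies, while "sink" drains the processed clusters directly.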
std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
{
  std::vector<DataProcessorSpec> specs;
  auto dataProducers = parallel(
    DataProcessorSpec{
      "dataProducer",
      Inputs{},
      {OutputSpec{"TPC", "CLUSTERS", 0, Lifetime::Timeframe}},
      AlgorithmSpec{
        (AlgorithmSpec::ProcessCallback)someDataProducerAlgorithm}},
    parallelSize,
    [](DataProcessorSpec& spec, size_t index) {
      // give each producer copy its index as output subSpec
      DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
    });
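
  // parallel(spec, maxIndex, amendCallback) clones the given DataProcessorSpec
  // maxIndex times, passing each clone and its index to the callback so the
  // copies can be told apart (here: by their subSpecs).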

  auto processingStages = parallel(
    DataProcessorSpec{
      "processingStage",
      Inputs{
        {"dataTPC", "TPC", "CLUSTERS", 0, Lifetime::Timeframe}},
      Outputs{
        {"TPC", "CLUSTERS_P", 0, Lifetime::Timeframe}},
      AlgorithmSpec{
        // CLion says it's ambiguous without the (AlgorithmSpec::ProcessCallback)
        // cast, but cmake compiles fine anyway.
        (AlgorithmSpec::ProcessCallback)someProcessingStageAlgorithm}},
    parallelSize,
    [](DataProcessorSpec& spec, size_t index) {
      // keep input and output subSpecs in step with the producer copies
      DataSpecUtils::updateMatchingSubspec(spec.inputs[0], index);
      DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
    });

  auto inputsDataSampler = mergeInputs(
    {"dataTPC", "TPC", "CLUSTERS", 0, Lifetime::Timeframe},
    parallelSize,
    [](InputSpec& input, size_t index) {
      DataSpecUtils::updateMatchingSubspec(input, index);
    });
  auto inputsTpcProc = mergeInputs(
    {"dataTPC-proc", "TPC", "CLUSTERS_P", 0, Lifetime::Timeframe},
    parallelSize,
    [](InputSpec& input, size_t index) {
      DataSpecUtils::updateMatchingSubspec(input, index);
    });
  inputsDataSampler.insert(std::end(inputsDataSampler), std::begin(inputsTpcProc), std::end(inputsTpcProc));
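
  // mergeInputs(spec, maxIndex, amendCallback), used above, expands a single
  // template InputSpec into maxIndex amended copies, letting one device
  // subscribe to every parallel stream at once.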

  auto dataSampler = DataProcessorSpec{
    "dataSampler",
    inputsDataSampler,
    Outputs{
      {"TPC", "CLUSTERS_S"},
      {"TPC", "CLUSTERS_P_S"}},
    AlgorithmSpec{
      [](ProcessingContext& ctx) {
        InputRecord& inputs = ctx.inputs();

        for (auto& input : inputs) {
          const InputSpec* inputSpec = input.spec;
          auto matcher = DataSpecUtils::asConcreteDataMatcher(*inputSpec);
          o2::header::DataDescription outputDescription = matcher.description;

          // todo: better sampled data flagging
          size_t len = strlen(outputDescription.str);
          if (len < outputDescription.size - 2) {
            outputDescription.str[len] = '_';
            outputDescription.str[len + 1] = 'S';
          }

          Output description{
            matcher.origin,
            outputDescription,
            0,
            inputSpec->lifetime};

          LOG(debug) << "DataSampler sends data from subSpec: " << matcher.subSpec;

          const auto* inputHeader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
          auto& output = ctx.outputs().make<char>(description, inputHeader->size());

          // todo: use some std function or adopt(), when it is available for POD data
          const char* input_ptr = input.payload;
          for (char& it : output) {
            it = *input_ptr++;
          }
        }
      }}};
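
  // The byte-for-byte copy in dataSampler above could also use the standard
  // library; a minimal sketch (needs <algorithm>, and assumes the char
  // collection returned by make<char>() exposes begin(), which the range-for
  // above already relies on):
  //
  //   std::copy_n(input.payload, inputHeader->size(), output.begin());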

  DataProcessorSpec qcTask{
    "qcTask",
    Inputs{
      {"dataTPC-sampled", "TPC", "CLUSTERS_S"},
      {"dataTPC-proc-sampled", "TPC", "CLUSTERS_P_S"}},
    Outputs{},
    AlgorithmSpec{
      [](ProcessingContext& ctx) {
        const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-sampled").payload);
        const InputSpec* inputSpec = ctx.inputs().get("dataTPC-sampled").spec;
        auto matcher = DataSpecUtils::asConcreteDataMatcher(*inputSpec);
        LOG(debug) << "qcTask received data with subSpec: " << matcher.subSpec;
        (void)inputDataTpc; // fetched only to check that the payload arrives
      }}};

  DataProcessorSpec sink{
    "sink",
    mergeInputs(
      {"dataTPC-proc", "TPC", "CLUSTERS_P"},
      parallelSize,
      [](InputSpec& input, size_t index) {
        DataSpecUtils::updateMatchingSubspec(input, index);
      }),
    Outputs{},
    AlgorithmSpec{
      [](ProcessingContext& ctx) {
        const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC-proc").payload);
        (void)inputDataTpc; // the sink merely drains the processed clusters
      }}};
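
  // sink gathers all parallelSize processed-cluster streams into a single,
  // non-parallel consumer via the merged input specs.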

  // error in qcTask:
  specs.swap(dataProducers);
  specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
  specs.push_back(sink);
  specs.push_back(dataSampler);
  specs.push_back(qcTask);

  // no error:
  // specs.swap(dataProducers);
  // specs.insert(std::end(specs), std::begin(processingStages), std::end(processingStages));
  // specs.push_back(dataSampler);
  // specs.push_back(qcTask);
  // specs.push_back(sink);
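
  // The two orderings differ only in where sink is pushed relative to
  // dataSampler and qcTask; the first is kept active to reproduce the
  // qcTask error noted above.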

  return specs;
}

void someDataProducerAlgorithm(ProcessingContext& ctx)
{
  size_t index = ctx.services().get<ParallelContext>().index1D();
  // Creates a new message of size collectionChunkSize which
  // has "TPC" as data origin and "CLUSTERS" as data description.
  auto& tpcClusters = ctx.outputs().make<FakeCluster>(
    Output{"TPC", "CLUSTERS", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
    collectionChunkSize);
  int i = 0;

  for (auto& cluster : tpcClusters) {
    assert(i < collectionChunkSize);
    cluster.x = index;
    cluster.y = i;
    cluster.z = i;
    cluster.q = rand() % 1000;
    i++;
  }
  ctx.services().get<ControlService>().endOfStream();
}
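
// Each producer instance stamps its ParallelContext::index1D() into its output
// subSpec; together with the amend callbacks in defineDataProcessing, this is
// what pairs producer i with processingStage i.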

void someProcessingStageAlgorithm(ProcessingContext& ctx)
{
  size_t index = ctx.services().get<ParallelContext>().index1D();

  const FakeCluster* inputDataTpc = reinterpret_cast<const FakeCluster*>(ctx.inputs().get("dataTPC").payload);

  auto& processedTpcClusters = ctx.outputs().make<FakeCluster>(
    Output{"TPC", "CLUSTERS_P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)},
    collectionChunkSize);

  int i = 0;
  for (auto& cluster : processedTpcClusters) {
    assert(i < collectionChunkSize);
    cluster.x = -inputDataTpc[i].x;
    cluster.y = 2 * inputDataTpc[i].y;
    cluster.z = inputDataTpc[i].z * inputDataTpc[i].q;
    cluster.q = inputDataTpc[i].q;
    i++;
  }
}