Project
Loading...
Searching...
No Matches
test_ParallelPipeline.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/InputSpec.h"
24#include <fairmq/Device.h>
25#include <algorithm>
26#include <memory>
27#include <unordered_map>
28
29// customize clusterers and cluster decoders to process immediately what comes in
30void customize(std::vector<o2::framework::CompletionPolicy>& policies)
31{
32 // we customize the pipeline processors to consume data as it comes
35 policies.push_back(CompletionPolicyHelpers::defineByName("consumer", CompletionPolicy::CompletionOp::Consume));
36}
38
// Check a test condition and log a fatal error if it does not hold.
// Wrapped in do { } while (false) so the macro expands to a single statement
// and composes safely with if/else: the original bare-`if` form would silently
// bind a following `else` to the macro's hidden `if` (dangling-else hazard).
#define ASSERT_ERROR(condition)                                     \
  do {                                                              \
    if (!(condition)) {                                             \
      LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
    }                                                               \
  } while (false)
43
using namespace o2::framework;

// number of parallel pipeline instances each template processor is cloned into
size_t nPipelines = 4;
// number of trigger cycles the producer emits before requesting shutdown
// NOTE(review): nParallelChannels (used below, listed in the symbol index) is
// declared on a line missing from this extraction — confirm against the
// original source file.
size_t nRolls = 1;
50
/// Build the test workflow: a two-stage processor pipeline cloned into
/// nPipelines parallel instances via parallelPipeline(), fed by a "trigger"
/// producer serving one sub-channel per subspec, and checked by a final
/// "consumer" that verifies routing and channel bindings.
std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const&)
{
  // define a template workflow with processors to be executed in a pipeline
  std::vector<DataProcessorSpec> workflowSpecs{
    {"processor1",
     Inputs{
       {"input", "TST", "TRIGGER", 0, Lifetime::Timeframe}},
     Outputs{
       {{"output"}, "TST", "PREPROC", 0, Lifetime::Timeframe}},
     // NOTE(review): the AlgorithmSpec line opening the processing lambda
     // (introducing parameter `ctx`) is missing from this extraction —
     // confirm against the original source before compiling.
       for (auto const& input : ctx.inputs()) {
         auto const& parallelContext = ctx.services().get<ParallelContext>();
         LOG(debug) << "instance " << parallelContext.index1D() << " of " << parallelContext.index1DSize() << ": "
                    << *input.spec << ": " << *((int*)input.payload);
         auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
         // forward on the PREPROC channel with the same subspec as the input
         auto& data = ctx.outputs().make<int>(Output{"TST", "PREPROC", dataheader->subSpecification});
         // the trigger wrote the target pipeline index into the payload;
         // it must match the index of this parallel instance
         ASSERT_ERROR(ctx.inputs().get<int>(input.spec->binding.c_str()) == parallelContext.index1D());
         data = parallelContext.index1D();
       }
     }}},
    {"processor2",
     Inputs{
       {"input", "TST", "PREPROC", 0, Lifetime::Timeframe}},
     Outputs{
       {{"output"}, "TST", "DATA", 0, Lifetime::Timeframe},
       {{"metadt"}, "TST", "META", 0, Lifetime::Timeframe}},
     // NOTE(review): the AlgorithmSpec line opening the processing lambda
     // (introducing parameter `ctx`) is missing from this extraction as well.
       for (auto const& input : ctx.inputs()) {
         auto const& parallelContext = ctx.services().get<ParallelContext>();
         LOG(debug) << "instance " << parallelContext.index1D() << " of " << parallelContext.index1DSize() << ": "
                    << *input.spec << ": " << *((int*)input.payload);
         ASSERT_ERROR(ctx.inputs().get<int>(input.spec->binding.c_str()) == parallelContext.index1D());
         auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
         // TODO: there is a bug in the API for using OutputRef, returns an rvalue which can not be bound to
         // lvalue reference
         // forward the pipeline index on DATA and publish the subspec itself
         // on META so the consumer can cross-check both
         auto& data = ctx.outputs().make<int>(Output{"TST", "DATA", dataheader->subSpecification});
         data = ctx.inputs().get<int>(input.spec->binding.c_str());
         auto& meta = ctx.outputs().make<int>(Output{"TST", "META", dataheader->subSpecification});
         meta = dataheader->subSpecification;
       }
     }}},
  };

  // create parallel pipelines from the template workflow, the number of parallel channel is defined by
  // nParallelChannels and is distributed among the pipelines
  // subspecs are distinct single-bit values: 0x1, 0x2, 0x4, ...
  std::vector<o2::header::DataHeader::SubSpecificationType> subspecs(nParallelChannels);
  std::generate(subspecs.begin(), subspecs.end(), [counter = std::make_shared<int>(0)]() { return 0x1 << (*counter)++; });
  // correspondence between the subspec and the instance which serves this particular subspec
  // this is checked in the final consumer
  auto checkMap = std::make_shared<std::unordered_map<o2::header::DataHeader::SubSpecificationType, int>>();
  {
    // assign subspecs to pipeline instances round-robin
    size_t pipeline = 0;
    for (auto const& subspec : subspecs) {
      (*checkMap)[subspec] = pipeline;
      pipeline++;
      if (pipeline >= nPipelines) {
        pipeline = 0;
      }
    }
  }
  // clone the template processors into nPipelines instances, each serving the
  // subset of subspecs selected by the two accessor callbacks
  workflowSpecs = parallelPipeline(
    workflowSpecs, nPipelines,
    [&subspecs]() { return subspecs.size(); },
    [&subspecs](size_t index) { return subspecs[index]; });

  // define a producer process with outputs for all subspecs
  auto producerOutputs = [&subspecs]() {
    Outputs outputs;
    for (auto const& subspec : subspecs) {
      outputs.emplace_back("TST", "TRIGGER", subspec, Lifetime::Timeframe);
    }
    return outputs;
  };

  workflowSpecs.emplace_back(DataProcessorSpec{
    "trigger",
    Inputs{},
    producerOutputs(),
    AlgorithmSpec{[subspecs, counter = std::make_shared<int>(0)](ProcessingContext& ctx) {
      if (*counter < nRolls) {
        // how many subspecs each pipeline serves: the remainder of the
        // division is spread over the leading pipelines, one extra each
        size_t pipeline = 0;
        size_t channels = subspecs.size();
        std::vector<size_t> multiplicities(nPipelines);
        for (pipeline = 0; pipeline < nPipelines; pipeline++) {
          multiplicities[pipeline] = channels / (nPipelines - pipeline) + ((channels % (nPipelines - pipeline)) > 0 ? 1 : 0);
          channels -= multiplicities[pipeline];
        }
        // emit one message per subspec; the int payload carries the index of
        // the pipeline instance expected to serve that subspec, walking the
        // pipelines round-robin and consuming their multiplicities
        size_t index = 0;
        auto end = subspecs.size();
        for (pipeline = 0; index < end; index++) {
          if (multiplicities[pipeline] == 0) {
            continue;
          }
          ctx.outputs().make<int>(Output{"TST", "TRIGGER", subspecs[index]}) = pipeline;
          multiplicities[pipeline++]--;
          if (pipeline >= nPipelines) {
            pipeline = 0;
          }
        }
        ASSERT_ERROR(index == subspecs.size());
        (*counter)++;
      }
      if (*counter == nRolls) {
        // all rolls produced: propagate end-of-stream and quit this device
        ctx.services().get<ControlService>().endOfStream();
        ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me);
      }
    }}});

  // the final consumer
  // map of bindings is used to check the channel names, note that the object is captured by
  // reference in mergeInputs which is a helper executed at construction of DataProcessorSpec,
  // while the AlgorithmSpec stores a lambda to be called later on, and the object must be
  // passed by copy or move in order to have a valid object upon invocation
  std::unordered_map<o2::header::DataHeader::SubSpecificationType, std::string> bindings;
  workflowSpecs.emplace_back(DataProcessorSpec{
    "consumer",
    mergeInputs({{"datain", "TST", "DATA", 0, Lifetime::Timeframe},
                 {"metain", "TST", "META", 0, Lifetime::Timeframe}},
                subspecs.size(),
                [&subspecs, &bindings](InputSpec& input, size_t index) {
                  // make the binding unique per replicated input ("datain0", ...)
                  input.binding += std::to_string(index);
                  // remember which binding name serves which subspec (datain only)
                  if (input.binding.compare(0, 6, "datain") == 0) {
                    bindings[subspecs[index]] = input.binding;
                  }
                }),
    Outputs(),
    AlgorithmSpec{adaptStateful([checkMap, bindings = std::move(bindings)](CallbackService& callbacks) {
      callbacks.set<CallbackService::Id::EndOfStream>([checkMap](EndOfStreamContext& ctx) {
        for (auto const& [subspec, pipeline] : *checkMap) {
          // we require all checks to be invalidated
          ASSERT_ERROR(pipeline == -1);
        }
        checkMap->clear();
      });
      callbacks.set<CallbackService::Id::Stop>([checkMap]() {
        // end-of-stream must have run and cleared the map before Stop
        ASSERT_ERROR(checkMap->size() == 0);
      });
      return adaptStateless([checkMap, bindings = std::move(bindings)](InputRecord& inputs) {
        bool haveDataIn = false;
        size_t index = 0;
        for (auto const& input : inputs) {
          if (!DataRefUtils::isValid(input)) {
            continue;
          }
          LOG(info) << "consuming : " << *input.spec << ": " << *((int*)input.payload);
          auto const* dataheader = DataRefUtils::getHeader<o2::header::DataHeader*>(input);
          if (input.spec->binding.compare(0, 6, "datain") == 0) {
            if (input.spec->binding != bindings.at(dataheader->subSpecification)) {
              LOG(error) << "data with subspec " << dataheader->subSpecification << " at unexpected binding " << input.spec->binding << ", expected " << bindings.at(dataheader->subSpecification);
            }
            haveDataIn = true;
            // the payload must be the pipeline index recorded for this subspec
            ASSERT_ERROR(checkMap->at(dataheader->subSpecification) == inputs.get<int>(input.spec->binding.c_str()));
            // keep a backup before invalidating; the backup is used in the check below, which can throw and therefore
            // must be after invalidation
            auto pipeline = checkMap->at(dataheader->subSpecification);
            // invalidate, we check in the end of stream callback that all are invalidated
            (*checkMap)[dataheader->subSpecification] = -1;
            // check if we can access channels by binding
            if (inputs.isValid(bindings.at(dataheader->subSpecification))) {
              ASSERT_ERROR(inputs.get<int>(bindings.at(dataheader->subSpecification)) == pipeline);
            }
          }
        }
        // we require each input cycle to have data on datain channel
        ASSERT_ERROR(haveDataIn);
      });
    })}});

  return workflowSpecs;
}
std::ostringstream debug
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLboolean * data
Definition glcorearb.h:298
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
Helper class which holds commonly used policies.
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static bool isValid(DataRef const &ref)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
the main header struct
Definition DataHeader.h:618
size_t nPipelines
size_t nParallelChannels
#define ASSERT_ERROR(condition)
size_t nRolls
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< ChannelData > channels