Project
Loading...
Searching...
No Matches
WorkflowSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
15
#include <algorithm>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>

20namespace o2::framework
21{
22
24 size_t maxIndex,
25 std::function<void(DataProcessorSpec&, size_t)> amendCallback)
26{
27 WorkflowSpec results;
28 results.reserve(maxIndex);
29 for (size_t i = 0; i < maxIndex; ++i) {
30 results.push_back(original);
31 results.back().name = original.name + "_" + std::to_string(i);
32 results.back().rank = i;
33 results.back().nSlots = maxIndex;
34 amendCallback(results.back(), i);
35 }
36 return results;
37}
38
40 size_t maxIndex,
41 std::function<void(DataProcessorSpec&, size_t)> amendCallback)
42{
43 WorkflowSpec results;
44 results.reserve(specs.size() * maxIndex);
45 for (auto& spec : specs) {
46 auto result = parallel(spec, maxIndex, amendCallback);
47 results.insert(results.end(), result.begin(), result.end());
48 }
49
50 return results;
51}
52
54 size_t nPipelines,
55 std::function<size_t()> getNumberOfSubspecs,
56 std::function<size_t(size_t)> getSubSpec)
57{
59 size_t numberOfSubspecs = getNumberOfSubspecs();
60 if (numberOfSubspecs < nPipelines) {
61 // no need to create more pipelines than the number of parallel Ids, in that case
62 // each pipeline serves one id
63 nPipelines = numberOfSubspecs;
64 }
65 for (auto process : specs) {
66 size_t channels = numberOfSubspecs;
67 size_t inputMultiplicity = numberOfSubspecs / nPipelines;
68 if (numberOfSubspecs % nPipelines) {
69 // some processes will get one more channel to handle all channels
70 inputMultiplicity += 1;
71 }
72 auto amendProcess = [numberOfSubspecs, nPipelines, &channels, &inputMultiplicity, getSubSpec](DataProcessorSpec& spec, size_t pipeline) {
73 auto inputs = std::move(spec.inputs);
74 auto outputs = std::move(spec.outputs);
75 spec.inputs.reserve(inputMultiplicity);
76 spec.outputs.reserve(inputMultiplicity);
77 for (size_t inputNo = 0; inputNo < inputMultiplicity; ++inputNo) {
78 for (auto& input : inputs) {
79 spec.inputs.push_back(input);
80 spec.inputs.back().binding += std::to_string(inputNo);
81 DataSpecUtils::updateMatchingSubspec(spec.inputs.back(), getSubSpec(pipeline + inputNo * nPipelines));
82 }
83 for (auto& output : outputs) {
84 spec.outputs.push_back(output);
85 spec.outputs.back().binding.value += std::to_string(inputNo);
86 // FIXME: this will be unneeded once we have a subSpec-less variant...
87 DataSpecUtils::updateMatchingSubspec(spec.outputs.back(), getSubSpec(pipeline + inputNo * nPipelines));
88 }
89 }
90 channels -= inputMultiplicity;
91 if (inputMultiplicity > numberOfSubspecs / nPipelines &&
92 (channels % (nPipelines - (pipeline + 1))) == 0) {
93 // if the remaining ids can be distributed equally among the remaining pipelines
94 // we can decrease multiplicity
95 inputMultiplicity = numberOfSubspecs / nPipelines;
96 }
97 };
98
99 if (nPipelines > 1) {
100 // add multiple processes and distribute inputs among them
101 auto amendedProcessors = parallel(process, nPipelines, amendProcess);
102 result.insert(result.end(), amendedProcessors.begin(), amendedProcessors.end());
103 } else if (nPipelines == 1) {
104 // add one single process with all the inputs
105 amendProcess(process, 0);
106 result.push_back(process);
107 }
108 }
109 return result;
110}
111
113 size_t maxIndex,
114 std::function<void(InputSpec&, size_t)> amendCallback)
115{
116 Inputs results;
117 results.reserve(maxIndex);
118 for (size_t i = 0; i < maxIndex; ++i) {
119 results.push_back(original);
120 amendCallback(results.back(), i);
121 }
122 return results;
123}
124
126 size_t maxIndex,
127 std::function<void(InputSpec&, size_t)> amendCallback)
128{
129 Inputs results;
130 results.reserve(inputs.size() * maxIndex);
131 for (size_t i = 0; i < maxIndex; ++i) {
132 for (auto const& original : inputs) {
133 results.push_back(original);
134 amendCallback(results.back(), i);
135 }
136 }
137 return results;
138}
139
141 size_t count)
142{
143 if (original.maxInputTimeslices != 1) {
144 std::runtime_error("You can time slice only once");
145 }
146 original.maxInputTimeslices = count;
147 return original;
148}
149
152std::vector<InputSpec> select(const char* matcher)
153{
154 return DataDescriptorQueryBuilder::parse(matcher);
155}
156
157namespace workflow
158{
159WorkflowSpec combine(char const* name, std::vector<DataProcessorSpec> const& specs, bool doIt)
160{
161 if (!doIt) {
162 return specs;
163 }
164
165 DataProcessorSpec combined;
166 combined.name = name;
167 // add all the inputs to combined
168 for (auto& spec : specs) {
169 for (auto& input : spec.inputs) {
170 combined.inputs.push_back(input);
171 }
172 for (auto& output : spec.outputs) {
173 combined.outputs.push_back(output);
174 }
175 for (auto& option : spec.options) {
176 combined.options.push_back(option);
177 }
178 for (auto& label : spec.labels) {
179 combined.labels.push_back(label);
180 }
181 for (auto& metadatum : spec.metadata) {
182 combined.metadata.push_back(metadatum);
183 }
184 for (auto& service : spec.requiredServices) {
185 // Insert in the final list of services
186 // only if a spec with the same name is not there
187 // already.
188 bool found = false;
189 for (auto& existing : combined.requiredServices) {
190 if (existing.name == service.name) {
191 found = true;
192 break;
193 }
194 }
195 if (!found) {
196 combined.requiredServices.push_back(service);
197 }
198 }
199 }
200
201 combined.algorithm = AlgorithmSpec{[specs](InitContext& ctx) {
202 std::vector<AlgorithmSpec::ProcessCallback> callbacks;
203 for (auto& spec : specs) {
204 if (spec.algorithm.onInit) {
205 callbacks.push_back(spec.algorithm.onInit(ctx));
206 } else if (spec.algorithm.onProcess) {
207 callbacks.push_back(spec.algorithm.onProcess);
208 }
209 }
210 return [callbacks](ProcessingContext& context) {
211 for (auto& callback : callbacks) {
212 callback(context);
213 }
214 };
215 }};
216 return {combined};
217}
218} // namespace workflow
219
220} // namespace o2::framework
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
WorkflowSpec combine(const char *name, std::vector< DataProcessorSpec > const &specs, bool doIt)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
WorkflowSpec parallelPipeline(const WorkflowSpec &specs, size_t nPipelines, std::function< size_t()> getNumberOfSubspecs, std::function< size_t(size_t)> getSubSpec)
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > select(char const *matcher="")
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static std::vector< InputSpec > parse(const char *s="")
std::vector< DataProcessorMetadata > metadata
std::vector< ServiceSpec > requiredServices
std::vector< DataProcessorLabel > labels
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
size_t nPipelines
std::vector< ChannelData > channels