Project
Loading...
Searching...
No Matches
MergerInfrastructureBuilder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
20
22
23using namespace o2::framework;
24
25namespace o2::mergers
26{
27
29 : mOutputSpecIntegral{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
30 mOutputSpecMovingWindow{header::gDataOriginInvalid, header::gDataDescriptionInvalid}
31{
32}
33
35{
36 mInfrastructureName = name;
37}
38
40{
41 mInputs = inputs;
42}
43
45{
46 mOutputSpecIntegral = outputSpec;
47}
48
50{
51 mOutputSpecMovingWindow = outputSpec;
52}
53
55{
56 mConfig = config;
57}
58
// Checks the builder's current settings and collects every problem found.
//
// Returns a string with one message per detected problem, each prefixed with
// "MergerInfrastructureBuilder error: ". An empty string means that no problem
// was detected. The function does not stop at the first problem: all checks are
// run and their messages are concatenated.
//
// NOTE(review): this listing is missing several original lines inside this
// function (the numbering jumps 82->84, 95->97, 99->101 and 103->105), so the
// conditions guarding some of the messages below are not visible here.
59std::string MergerInfrastructureBuilder::validateConfig()
60{
61 std::string error;
62 const std::string preamble = "MergerInfrastructureBuilder error: ";
   // basic completeness checks: name, inputs and the integral output
63 if (mInfrastructureName.empty()) {
64 error += preamble + "the infrastructure name is empty\n";
65 }
66 if (mInputs.empty()) {
67 error += preamble + "no inputs specified\n";
68 }
69 if (DataSpecUtils::validate(mOutputSpecIntegral) == false) {
70 error += preamble + "invalid output\n";
71 }
72
   // NumberOfLayers and ReductionFactor both need the variant parameter to hold an int.
   // The else branch is also entered for other topologySize values, but the two inner
   // conditions test the value first, so std::get<int> is only evaluated when the
   // variant is known to hold an int.
73 if ((mConfig.topologySize.value == TopologySize::NumberOfLayers || mConfig.topologySize.value == TopologySize::ReductionFactor) && !std::holds_alternative<int>(mConfig.topologySize.param)) {
74 error += preamble + "TopologySize::NumberOfLayers and TopologySize::ReductionFactor require a single int as parameter\n";
75 } else {
76 if (mConfig.topologySize.value == TopologySize::NumberOfLayers && std::get<int>(mConfig.topologySize.param) < 1) {
77 error += preamble + "number of layers less than 1 (" + std::to_string(std::get<int>(mConfig.topologySize.param)) + ")\n";
78 }
79 if (mConfig.topologySize.value == TopologySize::ReductionFactor && std::get<int>(mConfig.topologySize.param) < 2) {
80 error += preamble + "reduction factor smaller than 2 (" + std::to_string(std::get<int>(mConfig.topologySize.param)) + ")\n";
81 }
82 }
   // NOTE(review): original line 83 is absent from this listing. Judging by the messages
   // and by the extra closing brace at line 94, it presumably opens a check that
   // topologySize.value is TopologySize::MergersPerLayer - confirm against the real file.
84 if (!std::holds_alternative<std::vector<size_t>>(mConfig.topologySize.param)) {
85 error += preamble + "TopologySize::MergersPerLayer require std::vector<size_t> as parameter\n";
86 } else {
   // taken by value, so this is a copy of the configured vector
87 auto mergersPerLayer = std::get<std::vector<size_t>>(mConfig.topologySize.param);
88 if (mergersPerLayer.empty()) {
89 error += preamble + "TopologySize::MergersPerLayer was used, but the provided vector is empty\n";
90 } else if (mergersPerLayer.back() != 1) {
   // NOTE(review): the other messages in this function convert numbers with std::to_string;
   // here mergersPerLayer.back() (a size_t) is appended directly - confirm that this
   // compiles and prints the number as intended.
91 error += preamble + "Last Merger layer should consist of one Merger, " + mergersPerLayer.back() + " was used\n";
92 }
93 }
94 }
95
   // NOTE(review): the conditions for the next three messages (original lines 96, 100
   // and 104) are absent from this listing. The messages suggest that each one rejects
   // a setting combined with InputObjectsTimespan::FullHistory - confirm against the real file.
97 error += preamble + "ParallelismType::RoundRobin does not apply to InputObjectsTimespan::FullHistory\n";
98 }
99
101 error += preamble + "MergedObjectTimespan::LastDifference does not apply to InputObjectsTimespan::FullHistory\n";
102 }
103
105 error += preamble + "PublishMovingWindow::Yes is not supported with InputObjectsTimespan::FullHistory\n";
106 }
107
   // an input which matches the integral output is reported as a circular dependency
108 for (const auto& input : mInputs) {
109 if (DataSpecUtils::match(input, mOutputSpecIntegral)) {
   // NOTE(review): unlike the messages above, this one has no trailing "\n", so any
   // message appended after it continues on the same line.
110 error += preamble + "output '" + DataSpecUtils::label(mOutputSpecIntegral) + "' matches input '" + DataSpecUtils::label(input) + "'. That will cause a circular dependency!";
111 }
112 }
113
114 if (mConfig.detectorName.empty()) {
   // NOTE(review): no trailing "\n" here either.
115 error += preamble + "detector name is empty";
116 }
117
118 return error;
119}
120
122{
123 if (std::string error = validateConfig(); !error.empty()) {
124 throw std::runtime_error(error);
125 }
126
128 auto layerInputs = mInputs;
129
130 // preparing some numbers
131 const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
132 const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; });
133
134 // topology generation
135 MergerBuilder mergerBuilder;
136 mergerBuilder.setName(mInfrastructureName);
137 mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow);
138
139 size_t timePipelinePreviousLayer = 1;
140 for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) {
141
142 size_t numberOfMergers = mergersPerLayer[layer];
143 size_t splitInputsMergers = mConfig.parallelismType.value == ParallelismType::SplitInputs ? numberOfMergers : 1;
144 size_t timePipelineVal = mConfig.parallelismType.value == ParallelismType::SplitInputs ? 1 : numberOfMergers;
145 size_t inputsPerMerger = layerInputs.size() / splitInputsMergers;
146 size_t inputsPerMergerRemainder = layerInputs.size() % splitInputsMergers;
147
148 MergerConfig layerConfig = mConfig;
149 if (layer < mergersPerLayer.size() - 1) {
150 // in intermediate layers we should reset the results, so the same data is not added many times.
152 // we also expect moving windows to be published only by the last layer
154 }
155
156 framework::Inputs nextLayerInputs;
157 auto inputsRangeBegin = layerInputs.begin();
158
159 for (size_t m = 0; m < splitInputsMergers; m++) {
160
161 mergerBuilder.setTopologyPosition(layer, m);
162 mergerBuilder.setTimePipeline(timePipelineVal);
163
164 auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder);
165 mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd));
166
167 if (layer > 1 && !expendable) {
168 // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
169 // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
170
171 // The formula below takes into account both ways of splitting inputs - by consuming a subset of InputSpecs and by using time-pipelined data processors.
172 const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal;
173 assert(inputNumber != 0);
174 layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
175 }
176 if (layer == mergersPerLayer.size() - 1) {
177 // the last layer => use the specified external OutputSpec
178 mergerBuilder.setOutputSpec(mOutputSpecIntegral);
179 }
180 mergerBuilder.setConfig(layerConfig);
181 auto merger = mergerBuilder.buildSpec();
182
183 auto input = DataSpecUtils::matchingInput(merger.outputs.at(0));
184 input.binding = "in";
185 nextLayerInputs.push_back(input);
186
187 workflow.emplace_back(std::move(merger));
188 inputsRangeBegin = inputsRangeEnd;
189 }
190 layerInputs = nextLayerInputs; // todo: could be optimised with pointers
191 timePipelinePreviousLayer = timePipelineVal;
192 }
193
194 return workflow;
195}
196
// Computes how many Mergers each layer of the topology should contain.
//
// inputs - the number of inputs entering the first layer.
//
// Returns a vector whose element 0 is `inputs` itself and whose following elements
// are the numbers of Mergers in consecutive layers. How these are obtained depends
// on mConfig.topologySize: a fixed number of layers, a fixed reduction factor, or
// an explicit list provided in the configuration.
//
// The std::get calls below assume that the variant holds the matching alternative;
// validateConfig() in this file reports an error when it does not.
//
// NOTE(review): original line 200 is absent from this listing. The "} else if" at
// line 214 shows that it opens an if; presumably it tests for
// TopologySize::NumberOfLayers - confirm against the real file.
197std::vector<size_t> MergerInfrastructureBuilder::computeNumberOfMergersPerLayer(const size_t inputs) const
198{
   // layer 0 is the input layer
199 std::vector<size_t> mergersPerLayer{inputs};
201 // Number of mergers in layer i when the number of layers is fixed:
202 //   M_i = ceil( |V_0| ^ ((L - i) / L) )   where:
203 //     L     - number of layers
204 //     i     - layer index (0 - input layer)
205 //     |V_0| - number of inputs (size of layer 0)
206 //     M_i   - number of mergers in i layer
207 //
208
   // the int parameter is converted to size_t here; validateConfig() reports values below 1
209 size_t L = std::get<int>(mConfig.topologySize.param);
210 for (size_t i = 1; i <= L; i++) {
   // for i == L the exponent is 0, so the last layer gets exactly one Merger
211 mergersPerLayer.push_back(static_cast<size_t>(ceil(pow(inputs, (L - i) / static_cast<double>(L)))));
212 }
213
214 } else if (mConfig.topologySize.value == TopologySize::ReductionFactor) {
215 // Number of mergers in layer i when the reduction factor is fixed:
216 //   M_i = ceil( M_(i-1) / R )   where:
217 //     R   - reduction factor
218 //     i   - layer index (0 - input layer)
219 //     M_i - number of mergers in i layer (M_0 is the number of inputs)
220 //
221
   // R is kept as a double so that the division below is not an integer division
222 double R = std::get<int>(mConfig.topologySize.param);
223 size_t Mi, prevMi = inputs;
   // do-while: at least one layer is always added; layers are added until one has at most one Merger.
   // Termination relies on R being greater than 1; validateConfig() reports reduction factors below 2.
224 do {
225 Mi = static_cast<size_t>(ceil(prevMi / R));
226 mergersPerLayer.push_back(Mi);
227 prevMi = Mi;
228 } while (Mi > 1);
229 } else { // mConfig.topologySize.value == TopologySize::MergersPerLayer
   // the configured list is copied and appended after the input-layer entry
230 auto mergersPerLayerConfig = std::get<std::vector<size_t>>(mConfig.topologySize.param);
231 mergersPerLayer.insert(mergersPerLayer.cend(), mergersPerLayerConfig.begin(), mergersPerLayerConfig.end());
232 }
233
234 return mergersPerLayer;
235}
236
238{
239 auto mergersInfrastructure = generateInfrastructure();
240 workflow.insert(std::end(workflow), std::begin(mergersInfrastructure), std::end(mergersInfrastructure));
241}
242
243} // namespace o2::mergers
int32_t i
Algorithms for merging objects.
Definition of O2 MergerInfrastructureBuilder, v0.1.
A builder class to generate a DataProcessorSpec of one Merger.
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setTopologyPosition(size_t layer, size_t id)
framework::DataProcessorSpec buildSpec()
void setTimePipeline(size_t timepipeline)
void setConfig(MergerConfig)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
const GLfloat * m
Definition glcorearb.h:4066
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLenum GLuint GLint GLint layer
Definition glcorearb.h:1310
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static bool validate(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static std::string label(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::vector< o2::framework::DataProcessorLabel > labels
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< ParallelismType > parallelismType
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan