Project
Loading...
Searching...
No Matches
MergerInfrastructureBuilder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
20
22
23using namespace o2::framework;
24
25namespace o2::mergers
26{
27
// Constructor member-initializer list and empty body (the signature line, listing line 28,
// is missing from this extract). Both output specs are initialized with the
// header::gDataOriginInvalid / header::gDataDescriptionInvalid placeholders.
29 : mOutputSpecIntegral{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
30 mOutputSpecMovingWindow{header::gDataOriginInvalid, header::gDataDescriptionInvalid}
31{
32}
33
// Stores `name` in mInfrastructureName.
// NOTE(review): the signature line (listing line 34) is missing from this extract.
35{
36 mInfrastructureName = name;
37}
38
// Stores `inputs` in mInputs.
// NOTE(review): the signature line (listing line 39) is missing from this extract.
40{
41 mInputs = inputs;
42}
43
// Stores `outputSpec` in mOutputSpecIntegral.
// NOTE(review): the signature line (listing line 44) is missing from this extract.
45{
46 mOutputSpecIntegral = outputSpec;
47}
48
// Stores `outputSpec` in mOutputSpecMovingWindow.
// NOTE(review): the signature line (listing line 49) is missing from this extract.
50{
51 mOutputSpecMovingWindow = outputSpec;
52}
53
// Stores `config` in mConfig.
// NOTE(review): the signature line (listing line 54) is missing from this extract.
55{
56 mConfig = config;
57}
58
// Checks the builder settings and collects a description of every problem found.
//
// Returns an empty string when no problem was detected. Otherwise returns one
// newline-terminated message per problem, each starting with
// "MergerInfrastructureBuilder error: ".
59std::string MergerInfrastructureBuilder::validateConfig()
60{
61 std::string error;
62 const std::string preamble = "MergerInfrastructureBuilder error: ";
63 if (mInfrastructureName.empty()) {
64 error += preamble + "the infrastructure name is empty\n";
65 }
66 if (mInputs.empty()) {
67 error += preamble + "no inputs specified\n";
68 }
69 if (DataSpecUtils::validate(mOutputSpecIntegral) == false) {
70 error += preamble + "invalid output\n";
71 }
72
// NumberOfLayers and ReductionFactor both carry their parameter as the `int` alternative of the variant.
// The range checks in the else-branch only call std::get<int> after the matching topology value was
// confirmed, so they are skipped (short-circuit) for any other topology size.
73 if ((mConfig.topologySize.value == TopologySize::NumberOfLayers || mConfig.topologySize.value == TopologySize::ReductionFactor) && !std::holds_alternative<int>(mConfig.topologySize.param)) {
74 error += preamble + "TopologySize::NumberOfLayers and TopologySize::ReductionFactor require a single int as parameter\n";
75 } else {
76 if (mConfig.topologySize.value == TopologySize::NumberOfLayers && std::get<int>(mConfig.topologySize.param) < 1) {
77 error += preamble + "number of layers less than 1 (" + std::to_string(std::get<int>(mConfig.topologySize.param)) + ")\n";
78 }
79 if (mConfig.topologySize.value == TopologySize::ReductionFactor && std::get<int>(mConfig.topologySize.param) < 2) {
80 error += preamble + "reduction factor smaller than 2 (" + std::to_string(std::get<int>(mConfig.topologySize.param)) + ")\n";
81 }
82 }
// NOTE(review): listing line 83 is missing from this extract; it presumably opens the
// TopologySize::MergersPerLayer guard that is closed at listing line 94 - confirm against the original file.
84 if (!std::holds_alternative<std::vector<size_t>>(mConfig.topologySize.param)) {
85 error += preamble + "TopologySize::MergersPerLayer require std::vector<size_t> as parameter\n";
86 } else {
// Bind by const reference: the vector is only inspected, so copying it out of the variant is unnecessary.
87 const auto& mergersPerLayer = std::get<std::vector<size_t>>(mConfig.topologySize.param);
88 if (mergersPerLayer.empty()) {
89 error += preamble + "TopologySize::MergersPerLayer was used, but the provided vector is empty\n";
90 } else if (mergersPerLayer.back() != 1) {
// The count has to be converted explicitly: std::string has no operator+ taking an integer.
91 error += preamble + "Last Merger layer should consist of one Merger, " + std::to_string(mergersPerLayer.back()) + " was used\n";
92 }
93 }
94 }
95
// NOTE(review): the conditions guarding the next three messages (listing lines 96, 100 and 104)
// are missing from this extract - confirm against the original file.
97 error += preamble + "ParallelismType::RoundRobin does not apply to InputObjectsTimespan::FullHistory\n";
98 }
99
101 error += preamble + "MergedObjectTimespan::LastDifference does not apply to InputObjectsTimespan::FullHistory\n";
102 }
103
105 error += preamble + "PublishMovingWindow::Yes is not supported with InputObjectsTimespan::FullHistory\n";
106 }
107
// An input matching the integral output is reported once per matching input.
108 for (const auto& input : mInputs) {
109 if (DataSpecUtils::match(input, mOutputSpecIntegral)) {
// Terminated with '\n' like every other message, so consecutive errors do not run together.
110 error += preamble + "output '" + DataSpecUtils::label(mOutputSpecIntegral) + "' matches input '" + DataSpecUtils::label(input) + "'. That will cause a circular dependency!\n";
111 }
112 }
113
114 if (mConfig.detectorName.empty()) {
115 error += preamble + "detector name is empty\n";
116 }
117
118 return error;
119}
120
// Body of the function that builds the layered Merger topology and returns it as `workflow`.
// NOTE(review): the signature line (listing line 121) and the declaration of `workflow`
// (listing line 127) are missing from this extract - confirm against the original file.
//
// Throws std::runtime_error carrying the validateConfig() text when that text is not empty.
122{
123 if (std::string error = validateConfig(); !error.empty()) {
124 throw std::runtime_error(error);
125 }
126
// The inputs of the first Merger layer are the user-provided inputs; the variable is
// overwritten with each layer's outputs at the end of every outer loop iteration.
128 auto layerInputs = mInputs;
129
130 // preparing some numbers
131 const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
// True when any of the configured labels has the value "expendable".
132 const bool expendable = std::ranges::any_of(mConfig.labels, [](const auto& label) { return label.value == "expendable"; });
133
134 // topology generation
135 MergerBuilder mergerBuilder;
136 mergerBuilder.setName(mInfrastructureName);
137 mergerBuilder.setOutputSpecMovingWindow(mOutputSpecMovingWindow);
138
// Element 0 of mergersPerLayer is the number of inputs, so Merger layers start at index 1.
139 for (size_t layer = 1; layer < mergersPerLayer.size(); layer++) {
140
141 size_t numberOfMergers = mergersPerLayer[layer];
// With ParallelismType::SplitInputs the layer's inputs are divided among `numberOfMergers`
// separately built Mergers; otherwise a single Merger is built for the layer and
// `numberOfMergers` is handed to setTimePipeline() instead.
142 size_t splitInputsMergers = mConfig.parallelismType.value == ParallelismType::SplitInputs ? numberOfMergers : 1;
143 size_t timePipelineVal = mConfig.parallelismType.value == ParallelismType::SplitInputs ? 1 : numberOfMergers;
144 size_t inputsPerMerger = layerInputs.size() / splitInputsMergers;
145 size_t inputsPerMergerRemainder = layerInputs.size() % splitInputsMergers;
146
// Each layer works on its own copy of the configuration.
147 MergerConfig layerConfig = mConfig;
148 if (layer < mergersPerLayer.size() - 1) {
// NOTE(review): the statements described by the two comments below (listing lines 150 and 152)
// are missing from this extract.
149 // in intermediate layers we should reset the results, so the same data is not added many times.
151 // we also expect moving windows to be published only by the last layer
153 }
154
155 framework::Inputs nextLayerInputs;
156 auto inputsRangeBegin = layerInputs.begin();
157
158 for (size_t m = 0; m < splitInputsMergers; m++) {
159
160 mergerBuilder.setTopologyPosition(layer, m);
161 mergerBuilder.setTimePipeline(timePipelineVal);
162
// The first `inputsPerMergerRemainder` Mergers take one extra input, so that every input
// of the layer is assigned to exactly one Merger.
163 auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (m < inputsPerMergerRemainder);
164 mergerBuilder.setInputSpecs(framework::Inputs(inputsRangeBegin, inputsRangeEnd));
165
166 if (layer > 1 && !expendable) {
167 // we optimize the latency of higher Merger layers by publishing an object as soon as we get the expected number of inputs.
168 // we can do that safely only if tasks are not expendable, i.e. we are guaranteed that workflow stops if a Merger crashes.
169 const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
170 assert(inputNumber != 0);
171 layerConfig.publicationDecision = {PublicationDecision::EachNArrivals, inputNumber};
172 }
173 if (layer == mergersPerLayer.size() - 1) {
174 // the last layer => use the specified external OutputSpec
175 mergerBuilder.setOutputSpec(mOutputSpecIntegral);
176 }
177 mergerBuilder.setConfig(layerConfig);
178 auto merger = mergerBuilder.buildSpec();
179
// The first output of the Merger just built becomes an input, bound as "in", of the next layer.
180 auto input = DataSpecUtils::matchingInput(merger.outputs.at(0));
181 input.binding = "in";
182 nextLayerInputs.push_back(input);
183
184 workflow.emplace_back(std::move(merger));
185 inputsRangeBegin = inputsRangeEnd;
186 }
187 layerInputs = nextLayerInputs; // todo: could be optimised with pointers
188 }
189
190 return workflow;
191}
192
// Computes how many Mergers each layer of the topology should have.
//
// `inputs` is the number of inputs entering the first layer.
// Returns a vector whose element 0 is `inputs` itself; the following elements are the
// Merger counts of the consecutive layers, derived from mConfig.topologySize.
193std::vector<size_t> MergerInfrastructureBuilder::computeNumberOfMergersPerLayer(const size_t inputs) const
194{
195 std::vector<size_t> mergersPerLayer{inputs};
// NOTE(review): listing line 196 is missing from this extract; it presumably opens the
// TopologySize::NumberOfLayers branch of the if/else chain continued below - confirm.
197 // The number of mergers in layer i is:
198 //   |V|_i = ceil( |V|_0 ^ ((L - i) / L) )
199 // where:
200 //   L     - number of layers
201 //   i     - layer index (0 - input layer)
202 //   |V|_i - number of mergers in i layer
203 //
204
205 size_t L = std::get<int>(mConfig.topologySize.param);
// For i == L the exponent is 0, so the last pushed layer size is 1.
206 for (size_t i = 1; i <= L; i++) {
207 mergersPerLayer.push_back(static_cast<size_t>(ceil(pow(inputs, (L - i) / static_cast<double>(L)))));
208 }
209
210 } else if (mConfig.topologySize.value == TopologySize::ReductionFactor) {
211 // The number of mergers in layer i is:
212 //   |V|_i = ceil( |V|_(i-1) / R )
213 // where:
214 //   R     - reduction factor
215 //   i     - layer index (0 - input layer)
216 //   |V|_i - number of mergers in i layer
217
// R is held as a double so that the division below is a floating-point division.
218 double R = std::get<int>(mConfig.topologySize.param);
219 size_t Mi, prevMi = inputs;
// At least one layer is always added; layers keep being added until one has at most one Merger.
220 do {
221 Mi = static_cast<size_t>(ceil(prevMi / R));
222 mergersPerLayer.push_back(Mi);
223 prevMi = Mi;
224 } while (Mi > 1);
225 } else { // mConfig.topologySize.value == TopologySize::MergersPerLayer
// The user-provided layer sizes are appended unchanged after the input count.
226 auto mergersPerLayerConfig = std::get<std::vector<size_t>>(mConfig.topologySize.param);
227 mergersPerLayer.insert(mergersPerLayer.cend(), mergersPerLayerConfig.begin(), mergersPerLayerConfig.end());
228 }
229
230 return mergersPerLayer;
231}
232
// Generates the Merger infrastructure and appends all of its specs at the end of `workflow`.
// NOTE(review): the signature line (listing line 233) is missing from this extract;
// `workflow` is presumably a parameter taken by reference - confirm.
234{
235 auto mergersInfrastructure = generateInfrastructure();
236 workflow.insert(std::end(workflow), std::begin(mergersInfrastructure), std::end(mergersInfrastructure));
237}
238
239} // namespace o2::mergers
int32_t i
Algorithms for merging objects.
Definition of O2 MergerInfrastructureBuilder, v0.1.
A builder class to generate a DataProcessorSpec of one Merger.
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setTopologyPosition(size_t layer, size_t id)
framework::DataProcessorSpec buildSpec()
void setTimePipeline(size_t timepipeline)
void setConfig(MergerConfig)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
const GLfloat * m
Definition glcorearb.h:4066
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLenum GLuint GLint GLint layer
Definition glcorearb.h:1310
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static bool validate(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static std::string label(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::vector< o2::framework::DataProcessorLabel > labels
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< ParallelismType > parallelismType
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan