29 : mOutputSpecIntegral{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
30 mOutputSpecMovingWindow{header::gDataOriginInvalid, header::gDataDescriptionInvalid}
36 mInfrastructureName =
name;
46 mOutputSpecIntegral = outputSpec;
51 mOutputSpecMovingWindow = outputSpec;
59std::string MergerInfrastructureBuilder::validateConfig()
62 const std::string preamble =
"MergerInfrastructureBuilder error: ";
63 if (mInfrastructureName.empty()) {
64 error += preamble +
"the infrastructure name is empty\n";
66 if (mInputs.empty()) {
67 error += preamble +
"no inputs specified\n";
70 error += preamble +
"invalid output\n";
74 error += preamble +
"TopologySize::NumberOfLayers and TopologySize::ReductionFactor require a single int as parameter\n";
85 error += preamble +
"TopologySize::MergersPerLayer require std::vector<size_t> as parameter\n";
88 if (mergersPerLayer.empty()) {
89 error += preamble +
"TopologySize::MergersPerLayer was used, but the provided vector is empty\n";
90 }
else if (mergersPerLayer.back() != 1) {
91 error += preamble +
"Last Merger layer should consist of one Merger, " + mergersPerLayer.back() +
" was used\n";
97 error += preamble +
"ParallelismType::RoundRobin does not apply to InputObjectsTimespan::FullHistory\n";
101 error += preamble +
"MergedObjectTimespan::LastDifference does not apply to InputObjectsTimespan::FullHistory\n";
105 error += preamble +
"PublishMovingWindow::Yes is not supported with InputObjectsTimespan::FullHistory\n";
108 for (
const auto& input : mInputs) {
115 error += preamble +
"detector name is empty";
123 if (std::string error = validateConfig(); !error.empty()) {
124 throw std::runtime_error(error);
128 auto layerInputs = mInputs;
131 const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
132 const bool expendable = std::ranges::any_of(mConfig.
labels, [](
const auto&
label) { return label.value ==
"expendable"; });
136 mergerBuilder.
setName(mInfrastructureName);
139 size_t timePipelinePreviousLayer = 1;
142 size_t numberOfMergers = mergersPerLayer[
layer];
145 size_t inputsPerMerger = layerInputs.size() / splitInputsMergers;
146 size_t inputsPerMergerRemainder = layerInputs.size() % splitInputsMergers;
149 if (
layer < mergersPerLayer.size() - 1) {
157 auto inputsRangeBegin = layerInputs.begin();
159 for (
size_t m = 0;
m < splitInputsMergers;
m++) {
164 auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (
m < inputsPerMergerRemainder);
167 if (
layer > 1 && !expendable) {
172 const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd) * timePipelinePreviousLayer / timePipelineVal;
173 assert(inputNumber != 0);
176 if (
layer == mergersPerLayer.size() - 1) {
184 input.binding =
"in";
185 nextLayerInputs.push_back(input);
187 workflow.emplace_back(std::move(merger));
188 inputsRangeBegin = inputsRangeEnd;
190 layerInputs = nextLayerInputs;
191 timePipelinePreviousLayer = timePipelineVal;
197std::vector<size_t> MergerInfrastructureBuilder::computeNumberOfMergersPerLayer(
const size_t inputs)
const
199 std::vector<size_t> mergersPerLayer{inputs};
210 for (
size_t i = 1;
i <= L;
i++) {
211 mergersPerLayer.push_back(
static_cast<size_t>(ceil(pow(inputs, (L -
i) /
static_cast<double>(L)))));
223 size_t Mi, prevMi = inputs;
225 Mi =
static_cast<size_t>(ceil(prevMi /
R));
226 mergersPerLayer.push_back(Mi);
230 auto mergersPerLayerConfig = std::get<std::vector<size_t>>(mConfig.
topologySize.
param);
231 mergersPerLayer.insert(mergersPerLayer.cend(), mergersPerLayerConfig.begin(), mergersPerLayerConfig.end());
234 return mergersPerLayer;
240 workflow.insert(std::end(workflow), std::begin(mergersInfrastructure), std::end(mergersInfrastructure));
Algorithms for merging objects.
Definition of O2 MergerInfrastructureBuilder, v0.1.
A builder class to generate a DataProcessorSpec of one Merger.
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setTopologyPosition(size_t layer, size_t id)
void setName(std::string)
framework::DataProcessorSpec buildSpec()
void setTimePipeline(size_t timepipeline)
void setConfig(MergerConfig)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &)
MergerInfrastructureBuilder()
Default constructor.
void setInfrastructureName(std::string name)
void setOutputSpec(const framework::OutputSpec &outputSpec)
framework::WorkflowSpec generateInfrastructure()
void setConfig(MergerConfig config)
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
GLenum GLuint GLint GLint layer
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
static bool validate(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static std::string label(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::vector< o2::framework::DataProcessorLabel > labels
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< ParallelismType > parallelismType
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan