29 : mOutputSpecIntegral{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
30 mOutputSpecMovingWindow{header::gDataOriginInvalid, header::gDataDescriptionInvalid}
36 mInfrastructureName =
name;
46 mOutputSpecIntegral = outputSpec;
51 mOutputSpecMovingWindow = outputSpec;
59std::string MergerInfrastructureBuilder::validateConfig()
62 const std::string preamble =
"MergerInfrastructureBuilder error: ";
63 if (mInfrastructureName.empty()) {
64 error += preamble +
"the infrastructure name is empty\n";
66 if (mInputs.empty()) {
67 error += preamble +
"no inputs specified\n";
70 error += preamble +
"invalid output\n";
74 error += preamble +
"TopologySize::NumberOfLayers and TopologySize::ReductionFactor require a single int as parameter\n";
85 error += preamble +
"TopologySize::MergersPerLayer require std::vector<size_t> as parameter\n";
88 if (mergersPerLayer.empty()) {
89 error += preamble +
"TopologySize::MergersPerLayer was used, but the provided vector is empty\n";
90 }
else if (mergersPerLayer.back() != 1) {
91 error += preamble +
"Last Merger layer should consist of one Merger, " + mergersPerLayer.back() +
" was used\n";
97 error += preamble +
"ParallelismType::RoundRobin does not apply to InputObjectsTimespan::FullHistory\n";
101 error += preamble +
"MergedObjectTimespan::LastDifference does not apply to InputObjectsTimespan::FullHistory\n";
105 error += preamble +
"PublishMovingWindow::Yes is not supported with InputObjectsTimespan::FullHistory\n";
108 for (
const auto& input : mInputs) {
115 error += preamble +
"detector name is empty";
123 if (std::string error = validateConfig(); !error.empty()) {
124 throw std::runtime_error(error);
128 auto layerInputs = mInputs;
131 const auto mergersPerLayer = computeNumberOfMergersPerLayer(layerInputs.size());
132 const bool expendable = std::ranges::any_of(mConfig.
labels, [](
const auto&
label) { return label.value ==
"expendable"; });
136 mergerBuilder.
setName(mInfrastructureName);
141 size_t numberOfMergers = mergersPerLayer[
layer];
144 size_t inputsPerMerger = layerInputs.size() / splitInputsMergers;
145 size_t inputsPerMergerRemainder = layerInputs.size() % splitInputsMergers;
148 if (
layer < mergersPerLayer.size() - 1) {
156 auto inputsRangeBegin = layerInputs.begin();
158 for (
size_t m = 0;
m < splitInputsMergers;
m++) {
163 auto inputsRangeEnd = inputsRangeBegin + inputsPerMerger + (
m < inputsPerMergerRemainder);
166 if (
layer > 1 && !expendable) {
169 const auto inputNumber = std::distance(inputsRangeBegin, inputsRangeEnd);
170 assert(inputNumber != 0);
173 if (
layer == mergersPerLayer.size() - 1) {
181 input.binding =
"in";
182 nextLayerInputs.push_back(input);
184 workflow.emplace_back(std::move(merger));
185 inputsRangeBegin = inputsRangeEnd;
187 layerInputs = nextLayerInputs;
193std::vector<size_t> MergerInfrastructureBuilder::computeNumberOfMergersPerLayer(
const size_t inputs)
const
195 std::vector<size_t> mergersPerLayer{inputs};
206 for (
size_t i = 1;
i <= L;
i++) {
207 mergersPerLayer.push_back(
static_cast<size_t>(ceil(pow(inputs, (L -
i) /
static_cast<double>(L)))));
219 size_t Mi, prevMi = inputs;
221 Mi =
static_cast<size_t>(ceil(prevMi /
R));
222 mergersPerLayer.push_back(Mi);
226 auto mergersPerLayerConfig = std::get<std::vector<size_t>>(mConfig.
topologySize.
param);
227 mergersPerLayer.insert(mergersPerLayer.cend(), mergersPerLayerConfig.begin(), mergersPerLayerConfig.end());
230 return mergersPerLayer;
236 workflow.insert(std::end(workflow), std::begin(mergersInfrastructure), std::end(mergersInfrastructure));
Algorithms for merging objects.
Definition of O2 MergerInfrastructureBuilder, v0.1.
A builder class to generate a DataProcessorSpec of one Merger.
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setTopologyPosition(size_t layer, size_t id)
void setName(std::string)
framework::DataProcessorSpec buildSpec()
void setTimePipeline(size_t timepipeline)
void setConfig(MergerConfig)
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &)
MergerInfrastructureBuilder()
Default constructor.
void setInfrastructureName(std::string name)
void setOutputSpec(const framework::OutputSpec &outputSpec)
framework::WorkflowSpec generateInfrastructure()
void setConfig(MergerConfig config)
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
GLenum GLuint GLint GLint layer
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::string to_string(gsl::span< T, Size > span)
static bool validate(InputSpec const &input)
static InputSpec matchingInput(OutputSpec const &spec)
static std::string label(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::vector< o2::framework::DataProcessorLabel > labels
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< ParallelismType > parallelismType
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan