Project
Loading...
Searching...
No Matches
MergerBuilder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
21#include <Monitoring/Monitoring.h>
23
27
28using namespace o2::framework;
29
30namespace o2::mergers
31{
32
33MergerBuilder::MergerBuilder() : mName("INVALID"),
34 mInputSpecs{},
35 mOutputSpecIntegral{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
36 mOutputSpecMovingWindow{header::gDataOriginInvalid, header::gDataDescriptionInvalid},
37 mConfig{}
38{
39}
40
42{
43 mName = name;
44}
45
47{
48 mLayer = layer;
49 mId = id;
50}
51
52void MergerBuilder::setTimePipeline(size_t timepipeline)
53{
54 mTimePipeline = timepipeline;
55}
56
58{
59 mInputSpecs = inputs;
60}
61
63{
64 mOutputSpecIntegral = outputSpec;
66}
67
69{
70 mOutputSpecMovingWindow = outputSpec;
71 mOutputSpecMovingWindow.binding = {MergerBuilder::mergerMovingWindowOutputBinding()};
72}
73
75{
76 mConfig = config;
77}
78
80{
82
83 merger.name = mConfig.detectorName + "-" + mergerIdString() + "-" + mName + std::to_string(mLayer) + "l-" + std::to_string(mId);
84
85 merger.inputs = mInputSpecs;
86
87 merger.outputs.push_back(mOutputSpecIntegral);
89 if (DataSpecUtils::validate(mOutputSpecIntegral) == false) {
90 // inner layer => generate output spec according to scheme
91 subSpec = mergerSubSpec(mLayer, mId);
95 subSpec, // it serves as a unique merger output ID
96 Lifetime::Sporadic};
97 } else {
98 // last layer
99 merger.outputs[0].binding = {mergerIntegralOutputBinding()};
100 }
101
103 merger.outputs.push_back(mOutputSpecMovingWindow);
104 }
105
107 merger.algorithm = framework::adaptFromTask<IntegratingMerger>(mConfig, subSpec);
108 } else {
109 merger.algorithm = framework::adaptFromTask<FullHistoryMerger>(mConfig, subSpec);
110 }
111
112 // Create the TimerSpec for cycleDurations
113 std::vector<o2::framework::TimerSpec> timers;
114 for (auto& [cycleDuration, validity] : mConfig.publicationDecision.param.decision) {
115 timers.push_back({cycleDuration * 1000000000 /*µs*/, validity});
116 }
117
118 merger.inputs.push_back({"timer-publish", "TMR", mergerDataDescription(mName), mergerSubSpec(mLayer, mId), framework::Lifetime::Timer, timerSpecs(timers)});
119 merger.labels.push_back(mergerLabel());
120 merger.labels.insert(merger.labels.end(), mConfig.labels.begin(), mConfig.labels.end());
121 std::sort(merger.labels.begin(), merger.labels.end());
122 merger.labels.erase(std::unique(merger.labels.begin(), merger.labels.end()), merger.labels.end());
123 merger.maxInputTimeslices = mTimePipeline;
124
125 return std::move(merger);
126}
127
128void MergerBuilder::customizeInfrastructure(std::vector<framework::CompletionPolicy>& policies)
129{
130 auto matcher = [label = mergerLabel()](framework::DeviceSpec const& device) {
131 return std::find(device.labels.begin(), device.labels.end(), label) != device.labels.end();
132 };
133 // each merger's name contains the common label and should always consume
134 policies.emplace_back(CompletionPolicyHelpers::consumeWhenAny("MergerCompletionPolicy", matcher));
135}
136
137} // namespace o2::mergers
Definition of O2 FullHistoryMerger, v0.1.
Definition of O2 IntegratingMerger, v0.1.
o2::header::DataHeader::SubSpecificationType SubSpecificationType
static std::string mergerIdString()
static header::DataOrigin mergerDataOrigin()
void setOutputSpecMovingWindow(const framework::OutputSpec &outputSpec)
void setTopologyPosition(size_t layer, size_t id)
static std::string mergerIntegralOutputBinding()
static framework::DataProcessorLabel mergerLabel()
static header::DataHeader::SubSpecificationType mergerSubSpec(size_t layer, size_t id)
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
framework::DataProcessorSpec buildSpec()
MergerBuilder()
Default constructor.
void setTimePipeline(size_t timepipeline)
void setConfig(MergerConfig)
static std::string mergerMovingWindowOutputBinding()
void setOutputSpec(const framework::OutputSpec &outputSpec)
static header::DataDescription mergerDataDescription(std::string name)
void setInputSpecs(const framework::Inputs &)
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLenum GLuint GLint GLint layer
Definition glcorearb.h:1310
GLuint id
Definition glcorearb.h:650
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > Inputs
std::vector< ConfigParamSpec > timerSpecs(std::vector< TimerSpec > intervals)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
std::vector< DataProcessorLabel > labels
static bool validate(InputSpec const &input)
static std::optional< header::DataHeader::SubSpecificationType > getOptionalSubSpec(OutputSpec const &spec)
Get the subspec, if available.
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::vector< o2::framework::DataProcessorLabel > labels
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< InputObjectsTimespan > inputObjectTimespan