Project
Loading...
Searching...
No Matches
IntegratingMerger.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
18
21
22#include <InfoLogger/InfoLogger.hxx>
23
24#include <Monitoring/MonitoringFactory.h>
25
27#include "Framework/Logger.h"
28
29using namespace o2::framework;
30
31namespace o2::mergers
32{
33
35 : mConfig(config),
36 mSubSpec(subSpec)
37{
38}
39
41{
42 mCyclesSinceReset = 0;
43 mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
44 mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
45
46 // clear the state before starting the run, especially important for START->STOP->START sequence
47 ictx.services().get<CallbackService>().set<CallbackService::Id::Start>([this]() { clear(); });
48
49 // set detector field in infologger
50 try {
51 auto& ilContext = ictx.services().get<AliceO2::InfoLogger::InfoLoggerContext>();
52 ilContext.setField(AliceO2::InfoLogger::InfoLoggerContext::FieldName::Detector, mConfig.detectorName);
53 } catch (const RuntimeErrorRef& err) {
54 LOG(warn) << "Could not find the DPL InfoLogger Context.";
55 }
56}
57
59{
60 // we have to avoid mistaking the timer input with data inputs.
61 auto* timerHeader = ctx.inputs().get("timer-publish").header;
62
63 for (const DataRef& ref : InputRecordWalker(ctx.inputs())) {
64 if (ref.header != timerHeader) {
66 merge(mMergedObjectLastCycle, std::move(other));
67 mDeltasMerged++;
68 }
69 }
70
71 if (shouldFinishCycle(ctx.inputs())) {
72 finishCycle(ctx.outputs());
73 }
74}
75
76bool IntegratingMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
77{
79 return inputs.isValid("timer-publish");
80 } else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) {
81 return mDeltasMerged > 0 && mDeltasMerged % mConfig.publicationDecision.param.decision.begin()->first == 0;
82 } else {
83 throw std::runtime_error("unsupported publication decision parameter");
84 }
85}
86
87void IntegratingMerger::finishCycle(DataAllocator& outputs)
88{
89 mCyclesSinceReset++;
90
92 publishMovingWindow(outputs);
93 }
94
95 if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
96 merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
97 }
98 mMergedObjectLastCycle = std::monostate{};
99 mTotalDeltasMerged += mDeltasMerged;
100
101 publishIntegral(outputs);
102
104 mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
105 clear();
106 }
107
108 mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
109 mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
110 mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
111 mDeltasMerged = 0;
112}
113
114void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other)
115{
116 if (std::holds_alternative<std::monostate>(target)) {
117 LOG(debug) << "Received the first input object in the run or after the last delta reset";
118 target = std::move(other);
119 other = std::monostate{};
120 } else if (std::holds_alternative<TObjectPtr>(target)) {
121 // We expect that if the first object was TObject, then all should.
122 auto targetAsTObject = std::get<TObjectPtr>(target);
123 auto otherAsTObject = std::get<TObjectPtr>(other);
124 algorithm::merge(targetAsTObject.get(), otherAsTObject.get());
125 } else if (std::holds_alternative<MergeInterfacePtr>(target)) {
126 // We expect that if the first object inherited MergeInterface, then all should.
127 auto otherAsMergeInterface = std::get<MergeInterfacePtr>(other);
128 std::get<MergeInterfacePtr>(target)->merge(otherAsMergeInterface.get());
129 } else if (std::holds_alternative<VectorOfTObjectPtrs>(target)) {
130 // We expect that if the first object was Vector of TObjects, then all should.
131 auto targetAsVector = std::get<VectorOfTObjectPtrs>(target);
132 const auto otherAsVector = std::get<VectorOfTObjectPtrs>(other);
133 algorithm::merge(targetAsVector, otherAsVector);
134 } else {
135 LOG(error) << "The target variant has an unrecognized value";
136 }
137}
138
140{
141 finishCycle(eosContext.outputs());
142}
143
144// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.
145void IntegratingMerger::clear()
146{
147 mMergedObjectLastCycle = std::monostate{};
148 mMergedObjectIntegral = std::monostate{};
149 mCyclesSinceReset = 0;
150 mTotalDeltasMerged = 0;
151 mDeltasMerged = 0;
152}
153
154void IntegratingMerger::publishIntegral(framework::DataAllocator& allocator)
155{
156 if (std::holds_alternative<std::monostate>(mMergedObjectIntegral)) {
157 LOG(info) << "No objects received since start or reset, nothing to publish";
158 } else if (object_store_helpers::snapshot(allocator, mSubSpec, mMergedObjectIntegral)) {
159 LOG(info) << "Published the merged object with " << mTotalDeltasMerged << " deltas in total,"
160 << " including " << mDeltasMerged << " in the last cycle.";
161 } else {
162 LOG(error) << "mMergedObjectIntegral' variant has an unrecognized value.";
163 }
164}
165
166void IntegratingMerger::publishMovingWindow(framework::DataAllocator& allocator)
167{
168 if (std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
169 LOG(info) << "No objects received since the last reset, no moving window to publish";
170 } else if (std::holds_alternative<MergeInterfacePtr>(mMergedObjectLastCycle)) {
171 // if there is MergeInterface, we can publish only the selected moving windows
172 if (auto mw = std::get<MergeInterfacePtr>(mMergedObjectLastCycle)->cloneMovingWindow()) {
174 delete mw;
175 }
176 LOG(info) << "Published a moving window with " << mDeltasMerged << " deltas";
177 } else if (std::holds_alternative<TObjectPtr>(mMergedObjectLastCycle)) {
178 // if there is no MergeInterface, we just publish all deltas
180 *std::get<TObjectPtr>(mMergedObjectLastCycle));
181 LOG(info) << "Published a moving window with " << mDeltasMerged << " deltas.";
182 } else if (std::holds_alternative<VectorOfTObjectPtrs>(mMergedObjectLastCycle)) {
183 const auto& mergedVector = std::get<VectorOfTObjectPtrs>(mMergedObjectLastCycle);
184 const auto vectorToSnapshot = object_store_helpers::toRawObserverPointers(mergedVector);
185 allocator.snapshot(framework::OutputRef{MergerBuilder::mergerIntegralOutputBinding(), mSubSpec}, vectorToSnapshot);
186 } else {
187 LOG(error) << "mMergedObjectIntegral' variant has an unrecognized value.";
188 }
189}
190
191} // namespace o2::mergers
A helper class to iterate over all parts of all input routes.
Definition of O2 IntegratingMerger, v0.1.
Algorithms for merging objects.
std::ostringstream debug
void snapshot(const Output &spec, T const &object)
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
IntegratingMerger(const MergerConfig &, const header::DataHeader::SubSpecificationType &)
Default constructor. It expects Merger configuration and subSpec of output channel.
void endOfStream(framework::EndOfStreamContext &eosContext) override
Callback for CallbackService::Id::EndOfStream.
void run(framework::ProcessingContext &ctx) override
IntegratingMerger process callback.
void init(framework::InitContext &ctx) override
IntegratingMerger init callback.
static std::string mergerIntegralOutputBinding()
static std::string mergerMovingWindowOutputBinding()
GLenum target
Definition glcorearb.h:1641
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void merge(TObject *const target, TObject *const other)
A function which merges TObjects.
ObjectStore extractObjectFrom(const framework::DataRef &ref)
Takes a DataRef, deserializes it (if type is supported) and puts into an ObjectStore.
VectorOfRawTObjects toRawObserverPointers(const VectorOfTObjectPtrs &)
Helper function that converts vector of smart pointers to the vector of raw pointers that is serializ...
bool snapshot(framework::DataAllocator &allocator, const header::DataHeader::SubSpecificationType subSpec, const ObjectStore &mergedObject)
Used in FullHistoryMerger's and IntegratingMerger's publish function. Checks mergedObject for every st...
std::variant< std::monostate, TObjectPtr, VectorOfTObjectPtrs, MergeInterfacePtr > ObjectStore
Definition ObjectStore.h:45
uint32_t SubSpecificationType
Definition DataHeader.h:620
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< PublishMovingWindow > publishMovingWindow
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"