Project
Loading...
Searching...
No Matches
FullHistoryMerger.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
21
22#include "Headers/DataHeader.h"
25#include "Framework/Logger.h"
26#include <Monitoring/MonitoringFactory.h>
27#include <InfoLogger/InfoLogger.hxx>
28
29using namespace o2::header;
30using namespace o2::framework;
31using namespace std::chrono;
32
33namespace o2::mergers
34{
35
37 : mConfig(config),
38 mSubSpec(subSpec)
39{
40}
41
43{
44 delete mFirstObjectSerialized.second.header;
45 delete mFirstObjectSerialized.second.payload;
46 delete mFirstObjectSerialized.second.spec;
47}
48
50{
51 mCyclesSinceReset = 0;
52 mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
53 mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
54
55 // clear the state before starting the run, especially important for START->STOP->START sequence
56 ictx.services().get<CallbackService>().set<CallbackService::Id::Start>([this]() { clear(); });
57
58 // set detector field in infologger
59 try {
60 auto& ilContext = ictx.services().get<AliceO2::InfoLogger::InfoLoggerContext>();
61 ilContext.setField(AliceO2::InfoLogger::InfoLoggerContext::FieldName::Detector, mConfig.detectorName);
62 } catch (const RuntimeErrorRef& err) {
63 LOG(warn) << "Could not find the DPL InfoLogger Context.";
64 }
65}
66
68{
69 // we have to avoid mistaking the timer input with data inputs.
70 auto* timerHeader = ctx.inputs().get("timer-publish").header;
71
72 for (const DataRef& ref : InputRecordWalker(ctx.inputs())) {
73 if (ref.header != timerHeader) {
74 updateCache(ref);
75 mUpdatesReceived++;
76 }
77 }
78
79 if (shouldFinishCycle(ctx.inputs())) {
80 mCyclesSinceReset++;
81 mergeCache();
82 publish(ctx.outputs());
83
85 mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
86 clear();
87 }
88 }
89}
90
91bool FullHistoryMerger::shouldFinishCycle(const framework::InputRecord& inputs) const
92{
93 if (mFirstObjectSerialized.first.empty()) {
94 return false;
95 }
96
98 return inputs.isValid("timer-publish");
99 } else if (mConfig.publicationDecision.value == PublicationDecision::EachNArrivals) {
100 return mUpdatesReceived > 0 && mUpdatesReceived % mConfig.publicationDecision.param.decision.begin()->first == 0;
101 } else {
102 throw std::runtime_error("unsupported publication decision parameter");
103 }
104}
105
107{
108 mergeCache();
109 publish(eosContext.outputs());
110}
111
112// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.
113void FullHistoryMerger::clear()
114{
115 mFirstObjectSerialized.first.clear();
116 delete mFirstObjectSerialized.second.header;
117 delete mFirstObjectSerialized.second.payload;
118 delete mFirstObjectSerialized.second.spec;
119 mFirstObjectSerialized.second.header = nullptr;
120 mFirstObjectSerialized.second.payload = nullptr;
121 mFirstObjectSerialized.second.spec = nullptr;
122 mMergedObject = std::monostate{};
123 mCache.clear();
124 mCyclesSinceReset = 0;
125 mTotalObjectsMerged = 0;
126 mObjectsMerged = 0;
127 mTotalUpdatesReceived = 0;
128 mUpdatesReceived = 0;
129}
130
131void FullHistoryMerger::updateCache(const DataRef& ref)
132{
133 auto* dh = DataRefUtils::getHeader<DataHeader*>(ref);
134 auto payloadSize = DataRefUtils::getPayloadSize(ref);
135 std::string sourceID = std::string(dh->dataOrigin.str) + "/" + std::string(dh->dataDescription.str) + "/" + std::to_string(dh->subSpecification);
136
137 // I am not sure if ref.spec is always a concrete spec and not a broader matcher. Comparing it this way should be safer.
138 if (mFirstObjectSerialized.first.empty() || mFirstObjectSerialized.first == sourceID) {
139 // We store one object in the serialized form, so we can take it as the first object to be merged (multiple times).
140 // If we kept it deserialized, we would need to require implementing a clone() method in MergeInterface.
141 LOG(debug) << "Received the first input object in the run or after the last moving window reset";
142
143 delete mFirstObjectSerialized.second.spec;
144 delete mFirstObjectSerialized.second.header;
145 delete mFirstObjectSerialized.second.payload;
146
147 mFirstObjectSerialized.first = sourceID;
148 mFirstObjectSerialized.second.spec = new InputSpec(*ref.spec);
149 mFirstObjectSerialized.second.header = new char[Stack::headerStackSize(reinterpret_cast<std::byte const*>(dh))];
150 memcpy((void*)mFirstObjectSerialized.second.header, ref.header, dh->headerSize);
151 mFirstObjectSerialized.second.payload = new char[payloadSize];
152 memcpy((void*)mFirstObjectSerialized.second.payload, ref.payload, payloadSize);
153
154 } else {
156 }
157}
158
159void FullHistoryMerger::mergeCache()
160{
161 LOG(debug) << "Merging " << mCache.size() + 1 << " objects.";
162
163 if (mFirstObjectSerialized.second.payload == nullptr) {
164 // no objects arrived to the Merger yet, nothing to use.
165 return;
166 }
167
168 mMergedObject = object_store_helpers::extractObjectFrom(mFirstObjectSerialized.second);
169 assert(!std::holds_alternative<std::monostate>(mMergedObject));
170 mObjectsMerged++;
171
172 // We expect that all the objects use the same kind of interface
173 if (std::holds_alternative<TObjectPtr>(mMergedObject)) {
174 auto target = std::get<TObjectPtr>(mMergedObject);
175 for (auto& [name, entry] : mCache) {
176 (void)name;
177 auto other = std::get<TObjectPtr>(entry);
178 algorithm::merge(target.get(), other.get());
179 mObjectsMerged++;
180 }
181
182 } else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
183 auto target = std::get<MergeInterfacePtr>(mMergedObject);
184 for (auto& [name, entry] : mCache) {
185 (void)name;
186 auto other = std::get<MergeInterfacePtr>(entry);
187 target->merge(other.get());
188 mObjectsMerged++;
189 }
190
191 } else if (std::holds_alternative<VectorOfTObjectPtrs>(mMergedObject)) {
192 auto target = std::get<VectorOfTObjectPtrs>(mMergedObject);
193 for (auto& [_, entry] : mCache) {
194 auto other = std::get<VectorOfTObjectPtrs>(entry);
196 mObjectsMerged += target.size();
197 }
198 }
199}
200
201void FullHistoryMerger::publish(framework::DataAllocator& allocator)
202{
203 if (std::holds_alternative<std::monostate>(mMergedObject)) {
204 LOG(info) << "No objects received since start or reset, nothing to publish";
205 } else if (object_store_helpers::snapshot(allocator, mSubSpec, mMergedObject)) {
206 LOG(info) << "Published the merged object containing " << mCache.size() + 1 << " incomplete objects. "
207 << mUpdatesReceived << " updates were received during the last cycle.";
208 } else {
209 throw std::runtime_error("mMergedObjectIntegral' variant has no value.");
210 }
211
212 mTotalObjectsMerged += mObjectsMerged;
213 mTotalUpdatesReceived += mUpdatesReceived;
214 mCollector->send({mTotalObjectsMerged, "total_objects_merged"}, monitoring::DerivedMetricMode::RATE);
215 mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"});
216 mCollector->send({mTotalUpdatesReceived, "total_updates_received"}, monitoring::DerivedMetricMode::RATE);
217 mCollector->send({mUpdatesReceived, "updates_received_since_last_publication"});
218 mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
219 mObjectsMerged = 0;
220 mUpdatesReceived = 0;
221}
222
223} // namespace o2::mergers
Definition of O2 FullHistoryMerger, v0.1.
A helper class to iteratate over all parts of all input routes.
Definition of O2 Mergers merging interface, v0.1.
Algorithms for merging objects.
std::ostringstream debug
ServiceRegistryRef services()
Definition InitContext.h:34
A helper class to iteratate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
FullHistoryMerger(const MergerConfig &, const header::DataHeader::SubSpecificationType &)
Default constructor. It expects Merger configuration and subSpec of output channel.
void endOfStream(framework::EndOfStreamContext &eosContext) override
Callback for CallbackService::Id::EndOfStream.
~FullHistoryMerger() override
Default destructor.
void run(framework::ProcessingContext &ctx) override
FullHistoryMerger process callback.
void init(framework::InitContext &ctx) override
FullHistoryMerger init callback.
GLuint entry
Definition glcorearb.h:5735
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
O2 data header classes and API, v0.1.
Definition DetID.h:49
void merge(TObject *const target, TObject *const other)
A function which merges TObjects.
ObjectStore extractObjectFrom(const framework::DataRef &ref)
Takes a DataRef, deserializes it (if type is supported) and puts into an ObjectStore.
bool snapshot(framework::DataAllocator &allocator, const header::DataHeader::SubSpecificationType subSpec, const ObjectStore &mergedObject)
Used in FullHistorMerger's and IntegratingMerger's publish function. Checks mergedObject for every st...
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
uint32_t SubSpecificationType
Definition DataHeader.h:620
static size_t headerStackSize(std::byte const *buf)
Definition Stack.h:71
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"