26#include <Monitoring/MonitoringFactory.h>
27#include <InfoLogger/InfoLogger.hxx>
31using namespace std::chrono;
44 delete mFirstObjectSerialized.second.header;
45 delete mFirstObjectSerialized.second.payload;
46 delete mFirstObjectSerialized.second.spec;
51 mCyclesSinceReset = 0;
52 mCollector = monitoring::MonitoringFactory::Get(mConfig.
monitoringUrl);
53 mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
60 auto& ilContext = ictx.
services().
get<AliceO2::InfoLogger::InfoLoggerContext>();
61 ilContext.setField(AliceO2::InfoLogger::InfoLoggerContext::FieldName::Detector, mConfig.
detectorName);
63 LOG(warn) <<
"Could not find the DPL InfoLogger Context.";
70 auto* timerHeader = ctx.
inputs().
get(
"timer-publish").header;
73 if (
ref.header != timerHeader) {
79 if (shouldFinishCycle(ctx.
inputs())) {
93 if (mFirstObjectSerialized.first.empty()) {
98 return inputs.
isValid(
"timer-publish");
100 return mUpdatesReceived > 0 && mUpdatesReceived % mConfig.
publicationDecision.param.decision.begin()->first == 0;
102 throw std::runtime_error(
"unsupported publication decision parameter");
113void FullHistoryMerger::clear()
115 mFirstObjectSerialized.first.clear();
116 delete mFirstObjectSerialized.second.header;
117 delete mFirstObjectSerialized.second.payload;
118 delete mFirstObjectSerialized.second.spec;
119 mFirstObjectSerialized.second.header =
nullptr;
120 mFirstObjectSerialized.second.payload =
nullptr;
121 mFirstObjectSerialized.second.spec =
nullptr;
122 mMergedObject = std::monostate{};
124 mCyclesSinceReset = 0;
125 mTotalObjectsMerged = 0;
127 mTotalUpdatesReceived = 0;
128 mUpdatesReceived = 0;
131void FullHistoryMerger::updateCache(
const DataRef&
ref)
133 auto* dh = DataRefUtils::getHeader<DataHeader*>(
ref);
135 std::string sourceID = std::string(dh->dataOrigin.str) +
"/" + std::string(dh->dataDescription.str) +
"/" +
std::to_string(dh->subSpecification);
138 if (mFirstObjectSerialized.first.empty() || mFirstObjectSerialized.first == sourceID) {
141 LOG(
debug) <<
"Received the first input object in the run or after the last moving window reset";
143 delete mFirstObjectSerialized.second.spec;
144 delete mFirstObjectSerialized.second.header;
145 delete mFirstObjectSerialized.second.payload;
147 mFirstObjectSerialized.first = sourceID;
148 mFirstObjectSerialized.second.spec =
new InputSpec(*
ref.spec);
149 mFirstObjectSerialized.second.header =
new char[
Stack::headerStackSize(
reinterpret_cast<std::byte const*
>(dh))];
150 memcpy((
void*)mFirstObjectSerialized.second.header,
ref.header, dh->headerSize);
151 mFirstObjectSerialized.second.payload =
new char[payloadSize];
152 memcpy((
void*)mFirstObjectSerialized.second.payload,
ref.payload, payloadSize);
159void FullHistoryMerger::mergeCache()
161 LOG(
debug) <<
"Merging " << mCache.size() + 1 <<
" objects.";
163 if (mFirstObjectSerialized.second.payload ==
nullptr) {
169 assert(!std::holds_alternative<std::monostate>(mMergedObject));
173 if (std::holds_alternative<TObjectPtr>(mMergedObject)) {
174 auto target = std::get<TObjectPtr>(mMergedObject);
182 }
else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
183 auto target = std::get<MergeInterfacePtr>(mMergedObject);
186 auto other = std::get<MergeInterfacePtr>(
entry);
191 }
else if (std::holds_alternative<VectorOfTObjectPtrs>(mMergedObject)) {
192 auto target = std::get<VectorOfTObjectPtrs>(mMergedObject);
193 for (
auto& [_,
entry] : mCache) {
194 auto other = std::get<VectorOfTObjectPtrs>(
entry);
196 mObjectsMerged +=
target.size();
203 if (std::holds_alternative<std::monostate>(mMergedObject)) {
204 LOG(info) <<
"No objects received since start or reset, nothing to publish";
206 LOG(info) <<
"Published the merged object containing " << mCache.size() + 1 <<
" incomplete objects. "
207 << mUpdatesReceived <<
" updates were received during the last cycle.";
209 throw std::runtime_error(
"mMergedObjectIntegral' variant has no value.");
212 mTotalObjectsMerged += mObjectsMerged;
213 mTotalUpdatesReceived += mUpdatesReceived;
214 mCollector->send({mTotalObjectsMerged,
"total_objects_merged"}, monitoring::DerivedMetricMode::RATE);
215 mCollector->send({mObjectsMerged,
"objects_merged_since_last_publication"});
216 mCollector->send({mTotalUpdatesReceived,
"total_updates_received"}, monitoring::DerivedMetricMode::RATE);
217 mCollector->send({mUpdatesReceived,
"updates_received_since_last_publication"});
218 mCollector->send({mCyclesSinceReset,
"cycles_since_reset"});
220 mUpdatesReceived = 0;
Definition of O2 FullHistoryMerger, v0.1.
Definition of O2 Mergers merging interface, v0.1.
Algorithms for merging objects.
DataAllocator & outputs()
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
FullHistoryMerger(const MergerConfig &, const header::DataHeader::SubSpecificationType &)
Default constructor. It expects the Merger configuration and the subSpec of the output channel.
void endOfStream(framework::EndOfStreamContext &eosContext) override
Callback for CallbackService::Id::EndOfStream.
~FullHistoryMerger() override
Default destructor.
void run(framework::ProcessingContext &ctx) override
FullHistoryMerger process callback.
void init(framework::InitContext &ctx) override
FullHistoryMerger init callback.
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
void merge(TObject *const target, TObject *const other)
A function which merges TObjects.
ObjectStore extractObjectFrom(const framework::DataRef &ref)
Takes a DataRef, deserializes it (if type is supported) and puts into an ObjectStore.
bool snapshot(framework::DataAllocator &allocator, const header::DataHeader::SubSpecificationType subSpec, const ObjectStore &mergedObject)
Used in FullHistoryMerger's and IntegratingMerger's publish function. Checks mergedObject for every st...
std::string to_string(gsl::span< T, Size > span)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
std::string monitoringUrl
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"