22#include <InfoLogger/InfoLogger.hxx>
24#include <Monitoring/MonitoringFactory.h>
42 mCyclesSinceReset = 0;
43 mCollector = monitoring::MonitoringFactory::Get(mConfig.
monitoringUrl);
44 mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
51 auto& ilContext = ictx.
services().
get<AliceO2::InfoLogger::InfoLoggerContext>();
52 ilContext.setField(AliceO2::InfoLogger::InfoLoggerContext::FieldName::Detector, mConfig.
detectorName);
54 LOG(warn) <<
"Could not find the DPL InfoLogger Context.";
61 auto* timerHeader = ctx.
inputs().
get(
"timer-publish").header;
64 if (
ref.header != timerHeader) {
66 merge(mMergedObjectLastCycle, std::move(
other));
71 if (shouldFinishCycle(ctx.
inputs())) {
79 return inputs.
isValid(
"timer-publish");
81 return mDeltasMerged > 0 && mDeltasMerged % mConfig.
publicationDecision.param.decision.begin()->first == 0;
83 throw std::runtime_error(
"unsupported publication decision parameter");
92 publishMovingWindow(outputs);
95 if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
96 merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
98 mMergedObjectLastCycle = std::monostate{};
99 mTotalDeltasMerged += mDeltasMerged;
101 publishIntegral(outputs);
108 mCollector->send({mTotalDeltasMerged,
"total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
109 mCollector->send({mDeltasMerged,
"deltas_merged_since_last_publication"});
110 mCollector->send({mCyclesSinceReset,
"cycles_since_reset"});
116 if (std::holds_alternative<std::monostate>(
target)) {
117 LOG(
debug) <<
"Received the first input object in the run or after the last delta reset";
119 other = std::monostate{};
120 }
else if (std::holds_alternative<TObjectPtr>(
target)) {
122 auto targetAsTObject = std::get<TObjectPtr>(
target);
123 auto otherAsTObject = std::get<TObjectPtr>(
other);
125 }
else if (std::holds_alternative<MergeInterfacePtr>(
target)) {
127 auto otherAsMergeInterface = std::get<MergeInterfacePtr>(
other);
128 std::get<MergeInterfacePtr>(
target)->merge(otherAsMergeInterface.get());
129 }
else if (std::holds_alternative<VectorOfTObjectPtrs>(
target)) {
131 auto targetAsVector = std::get<VectorOfTObjectPtrs>(
target);
132 const auto otherAsVector = std::get<VectorOfTObjectPtrs>(
other);
135 LOG(error) <<
"The target variant has an unrecognized value";
141 finishCycle(eosContext.
outputs());
145void IntegratingMerger::clear()
147 mMergedObjectLastCycle = std::monostate{};
148 mMergedObjectIntegral = std::monostate{};
149 mCyclesSinceReset = 0;
150 mTotalDeltasMerged = 0;
156 if (std::holds_alternative<std::monostate>(mMergedObjectIntegral)) {
157 LOG(info) <<
"No objects received since start or reset, nothing to publish";
159 LOG(info) <<
"Published the merged object with " << mTotalDeltasMerged <<
" deltas in total,"
160 <<
" including " << mDeltasMerged <<
" in the last cycle.";
162 LOG(error) <<
"mMergedObjectIntegral' variant has an unrecognized value.";
168 if (std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
169 LOG(info) <<
"No objects received since the last reset, no moving window to publish";
170 }
else if (std::holds_alternative<MergeInterfacePtr>(mMergedObjectLastCycle)) {
172 if (
auto mw = std::get<MergeInterfacePtr>(mMergedObjectLastCycle)->cloneMovingWindow()) {
176 LOG(info) <<
"Published a moving window with " << mDeltasMerged <<
" deltas";
177 }
else if (std::holds_alternative<TObjectPtr>(mMergedObjectLastCycle)) {
180 *std::get<TObjectPtr>(mMergedObjectLastCycle));
181 LOG(info) <<
"Published a moving window with " << mDeltasMerged <<
" deltas.";
182 }
else if (std::holds_alternative<VectorOfTObjectPtrs>(mMergedObjectLastCycle)) {
183 const auto& mergedVector = std::get<VectorOfTObjectPtrs>(mMergedObjectLastCycle);
187 LOG(error) <<
"mMergedObjectIntegral' variant has an unrecognized value.";
Definition of O2 IntegratingMerger, v0.1.
Algorithms for merging objects.
void snapshot(const Output &spec, T const &object)
DataAllocator & outputs()
ServiceRegistryRef services()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
IntegratingMerger(const MergerConfig &, const header::DataHeader::SubSpecificationType &)
Default constructor. It expects Merger configuration and subSpec of output channel.
void endOfStream(framework::EndOfStreamContext &eosContext) override
Callback for CallbackService::Id::EndOfStream.
void run(framework::ProcessingContext &ctx) override
IntegratingMerger process callback.
void init(framework::InitContext &ctx) override
IntegratingMerger init callback.
static std::string mergerIntegralOutputBinding()
static std::string mergerMovingWindowOutputBinding()
Defining PrimaryVertex explicitly as messageable.
void merge(TObject *const target, TObject *const other)
A function which merges TObjects.
ObjectStore extractObjectFrom(const framework::DataRef &ref)
Takes a DataRef, deserializes it (if type is supported) and puts into an ObjectStore.
VectorOfRawTObjects toRawObserverPointers(const VectorOfTObjectPtrs &)
Helper function that converts vector of smart pointers to the vector of raw pointers that is serializ...
bool snapshot(framework::DataAllocator &allocator, const header::DataHeader::SubSpecificationType subSpec, const ObjectStore &mergedObject)
Used in FullHistorMerger's and IntegratingMerger's publish function. Checks mergedObject for every st...
std::variant< std::monostate, TObjectPtr, VectorOfTObjectPtrs, MergeInterfacePtr > ObjectStore
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< PublishMovingWindow > publishMovingWindow
std::string monitoringUrl
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"