1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See for details of the copyright holders.
3// All rights not expressly granted are reserved.
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
20#include "Framework/DataRef.h"
22#include "Framework/InputSpan.h"
24#include "Framework/Logger.h"
25#include "Framework/PartRef.h"
31#include "DataRelayerHelpers.h"
32#include "InputRouteHelpers.h"
42#include <Monitoring/Metric.h>
43#include <Monitoring/Monitoring.h>
45#include <fairmq/Channel.h>
46#include <fmt/format.h>
47#include <fmt/ostream.h>
48#include <gsl/span>
49#include <numeric>
50#include <string>
52using namespace o2::framework::data_matcher;
55using Verbosity = o2::monitoring::Verbosity;
59namespace o2::framework
// Sentinel meaning "no input route matched"; returned by matchToContext and
// used as the initial / failure value for route indices in this file.
constexpr int INVALID_INPUT{-1};
65 std::vector<InputRoute> const& routes,
67 ServiceRegistryRef services)
68 : mContext{services},
69 mTimesliceIndex{index},
70 mCompletionPolicy{policy},
71 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
72 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
73 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
75 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
77 if (policy.configureRelayer == nullptr) {
78 static int pipelineLength = DefaultsHelpers::pipelineLength();
79 setPipelineLength(pipelineLength);
80 } else {
81 policy.configureRelayer(*this);
82 }
84 // The queries are all the same, so we only have width 1
85 auto numInputTypes = mDistinctRoutesIndex.size();
86 auto& states = services.get<DataProcessingStates>();
87 std::string queries = "";
88 for (short i = 0; i < numInputTypes; ++i) {
89 char buffer[128];
90 assert(mDistinctRoutesIndex[i] < routes.size());
91 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
92 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
93 DataSpecUtils::describe(buffer, 127, matcher);
94 queries += std::string_view(buffer, strlen(buffer));
95 queries += ";";
96 }
98 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
99 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data =});
100 states.processCommandQueue();
105 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
106 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
107 return VariableContextHelpers::getTimeslice(variables);
110DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
111 ServiceRegistryRef services, bool createNew)
113 LOGP(debug, "DataRelayer::processDanglingInputs");
114 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
115 auto& deviceProxy = services.get<FairMQDeviceProxy>();
117 ActivityStats activity;
119 if (expirationHandlers.empty()) {
120 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
121 return activity;
122 }
123 // Create any slot for the time based fields
124 std::vector<TimesliceSlot> slotsCreatedByHandlers;
125 if (createNew) {
126 LOGP(debug, "Creating new slot");
127 for (auto& handler : expirationHandlers) {
128 LOGP(debug, "handler.creator for {}",;
129 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
130 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
131 }
132 }
133 // Count how many slots are not invalid
134 auto validSlots = 0;
135 for (auto slot : slotsCreatedByHandlers) {
136 if (slot.index == TimesliceSlot::INVALID) {
137 continue;
138 }
139 validSlots++;
140 }
141 if (validSlots > 0) {
142 activity.newSlots++;
143 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
144 } else {
145 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
146 }
147 // Outer loop, we process all the records because the fact that the record
148 // expires is independent from having received data for it.
149 int headerPresent = 0;
150 int payloadPresent = 0;
151 int noCheckers = 0;
152 int badSlot = 0;
153 int checkerDenied = 0;
154 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
155 TimesliceSlot slot{ti};
156 if (mTimesliceIndex.isValid(slot) == false) {
157 continue;
158 }
159 assert(mDistinctRoutesIndex.empty() == false);
160 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
161 auto timestamp = VariableContextHelpers::getTimeslice(variables);
162 // We iterate on all the hanlders checking if they need to be expired.
163 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
164 auto& expirator = expirationHandlers[ei];
165 // We check that no data is already there for the given cell
166 // it is enough to check the first element
167 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
168 if (part.size() > 0 && part.header(0) != nullptr) {
169 headerPresent++;
170 continue;
171 }
172 if (part.size() > 0 && part.payload(0) != nullptr) {
173 payloadPresent++;
174 continue;
175 }
176 // We check that the cell can actually be expired.
177 if (!expirator.checker) {
178 noCheckers++;
179 continue;
180 }
181 if (slotsCreatedByHandlers[ei] != slot) {
182 badSlot++;
183 continue;
184 }
186 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
187 auto offset = li * numInputTypes;
188 assert(cache.size() >= offset + numInputTypes);
189 auto const start = + offset;
190 auto const end = + offset + numInputTypes;
191 return {start, end};
192 };
194 auto partial = getPartialRecord(ti);
195 // TODO: get the data ref from message model
196 auto getter = [&partial](size_t idx, size_t part) {
197 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
198 auto header = partial[idx].header(part).get();
199 auto payload = partial[idx].payload(part).get();
200 return DataRef{nullptr,
201 reinterpret_cast<const char*>(header->GetData()),
202 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
203 payload ? payload->GetSize() : 0};
204 }
205 return DataRef{};
206 };
207 auto nPartsGetter = [&partial](size_t idx) {
208 return partial[idx].size();
209 };
210 InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
211 // Setup the input span
213 if (expirator.checker(services, timestamp.value, span) == false) {
214 checkerDenied++;
215 continue;
216 }
218 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
219 assert(expirator.handler);
220 PartRef newRef;
221 expirator.handler(services, newRef, variables);
222 part.reset(std::move(newRef));
223 activity.expiredSlots++;
225 mTimesliceIndex.markAsDirty(slot, true);
226 assert(part.header(0) != nullptr);
227 assert(part.payload(0) != nullptr);
228 }
229 }
230 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
231 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
232 return activity;
/// Finds the first route, among those listed in `index`, whose matcher accepts `data`.
///
/// For each position `ri`, `matchers[index[ri]]` is run on `data` (passed to
/// the matcher as `char const*`). On success the variables captured in
/// `context` are committed; on failure they are discarded before the next try.
///
/// @param data raw buffer handed to the matchers.
/// @param matchers all input matchers.
/// @param index positions in `matchers` to try, in order.
/// @param context variable context filled in by the matchers.
/// @return the position `ri` within `index` (not within `matchers`) of the
///         first successful match, or INVALID_INPUT if nothing matches.
/// NOTE(review): the return type is size_t while INVALID_INPUT is the int -1,
/// so "no match" comes back as SIZE_MAX. Callers must compare against
/// INVALID_INPUT (which converts the same way) rather than test for < 0.
size_t matchToContext(void const* data,
                      std::vector<DataDescriptorMatcher> const& matchers,
                      std::vector<size_t> const& index,
                      VariableContext& context)
  for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
    auto& matcher = matchers[index[ri]];

    if (matcher.match(reinterpret_cast<char const*>(data), context)) {
      // Keep the variables extracted by this successful match.
      context.commit();
      return ri;
    }
    // Drop any partial variable assignments left by the failed attempt.
    context.discard();
  }
  return INVALID_INPUT;
259 static const std::string nullstring{"null"};
261 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
262 auto& states = *static_cast<DataProcessingStates*>(context);
263 static std::string state = "";
264 state.clear();
265 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
266 auto var = variables.get(i);
267 if (auto pval = std::get_if<uint64_t>(&var)) {
268 state += std::to_string(*pval);
269 } else if (auto pval = std::get_if<uint32_t>(&var)) {
270 state += std::to_string(*pval);
271 } else if (auto pval2 = std::get_if<std::string>(&var)) {
272 state += *pval2;
273 } else {
274 }
275 state += ";";
276 }
277 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
278 .size = (int)state.size(),
279 .data =});
280 },
281 &states, slot);
286 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
287 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value,;
288 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
289 if (dontDrop) {
290 return;
291 }
292 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
293 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
294 auto timestamp = VariableContextHelpers::getTimeslice(variables);
295 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
296 if (valid) {
297 if (mTimesliceIndex.isValid({si})) {
298 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
299 }
300 continue;
301 }
302 mPruneOps.push_back(PruneOp{si});
303 bool didDrop = false;
304 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
305 auto& input = mInputs[mi];
306 auto& element = mCache[si * mInputs.size() + mi];
307 if (element.size() != 0) {
308 if (input.lifetime != Lifetime::Condition && != "internal-dpl-injected-dummy-sink") {
309 didDrop = true;
310 auto& state = mContext.get<DeviceState>();
312 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
313 } else {
314 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
315 }
316 } else {
317 LOGP(debug,
318 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
319 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
320 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
321 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
322 }
323 }
324 }
325 // We did drop some data. Let's print what was missing.
326 if (didDrop) {
327 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
328 auto& input = mInputs[mi];
329 if (input.lifetime == Lifetime::Timer) {
330 continue;
331 }
332 auto& element = mCache[si * mInputs.size() + mi];
333 if (element.size() == 0) {
334 auto& state = mContext.get<DeviceState>();
336 LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
337 } else {
338 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
339 }
340 }
341 }
342 }
343 }
353 for (auto& op : mPruneOps) {
354 this->pruneCache(op.slot, onDrop);
355 }
356 mPruneOps.clear();
361 // We need to prune the cache from the old stuff, if any. Otherwise we
362 // simply store the payload in the cache and we mark relevant bit in the
363 // hence the first if.
364 auto pruneCache = [&onDrop,
365 &cache = mCache,
366 &cachedStateMetrics = mCachedStateMetrics,
367 numInputTypes = mDistinctRoutesIndex.size(),
368 &index = mTimesliceIndex,
369 ref = mContext](TimesliceSlot slot) {
370 if (onDrop) {
371 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
372 // State of the computation
373 std::vector<MessageSet> dropped(numInputTypes);
374 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
375 auto cacheId = slot.index * numInputTypes + ai;
376 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
377 // TODO: in the original implementation of the cache, there have been only two messages per entry,
378 // check if the 2 above corresponds to the number of messages.
379 if (cache[cacheId].size() > 0) {
380 dropped[ai] = std::move(cache[cacheId]);
381 }
382 }
383 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
384 if (anyDropped) {
385 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
386 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
387 onDrop(slot, dropped, oldestPossibleTimeslice);
388 }
389 }
390 assert(cache.empty() == false);
391 assert(index.size() * numInputTypes == cache.size());
392 // Prune old stuff from the cache, hopefully deleting it...
393 // We set the current slot to the timeslice value, so that old stuff
394 // will be ignored.
395 assert(numInputTypes * slot.index < cache.size());
396 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
397 cache[ai].clear();
398 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
399 }
400 };
402 pruneCache(slot);
/// Returns true when the DataHeader found in the buffer of `first` has the
/// DataProcessingHeader::KEEP_AT_EOS_FLAG bit set in `flagsDerivedHeader`.
/// NOTE(review): neither `first` nor the pointer returned by
/// o2::header::get<DataHeader*> is checked for null. This presumably assumes
/// `first` always holds a header stack containing a DataHeader; confirm with
/// callers.
bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
  auto* dh = o2::header::get<DataHeader*>(first->GetData());
  return dh->flagsDerivedHeader & DataProcessingHeader::KEEP_AT_EOS_FLAG;
412 DataRelayer::relay(void const* rawHeader,
413 std::unique_ptr<fair::mq::Message>* messages,
414 InputInfo const& info,
415 size_t nMessages,
416 size_t nPayloads,
417 std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
419 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
420 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
422 //
423 // This returns true if a given slot is available for the current number of lanes
424 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
425 return (slot.index % maxLanes) == (currentLane % maxLanes);
426 };
427 // This returns the identifier for the given input. We use a separate
428 // function because while it's trivial now, the actual matchmaking will
429 // become more complicated when we will start supporting ranges.
430 auto getInputTimeslice = [&matchers = mInputMatchers,
431 &distinctRoutes = mDistinctRoutesIndex,
432 &rawHeader,
433 &index = mTimesliceIndex](VariableContext& context)
434 -> std::tuple<int, TimesliceId> {
437 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
439 if (input == INVALID_INPUT) {
440 return {
443 };
444 }
447 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
448 TimesliceId timeslice{*pval};
449 return {input, timeslice};
450 }
451 // If we get here it means we need to push something out of the cache.
452 return {
455 };
456 };
458 // Actually save the header / payload in the slot
459 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
460 &messages,
461 &nMessages,
462 &nPayloads,
463 &cache = mCache,
464 &services = mContext,
465 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
466 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
467 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
468 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
469 timeslice.value, slot.index,
470 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
471 auto cacheIdx = numInputTypes * slot.index + input;
472 MessageSet& target = cache[cacheIdx];
473 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
474 // TODO: make sure that multiple parts can only be added within the same call of
475 // DataRelayer::relay
476 assert(nPayloads > 0);
477 size_t saved = 0;
478 for (size_t mi = 0; mi < nMessages; ++mi) {
479 assert(mi + nPayloads < nMessages);
480 // We are in calibration mode and the data does not have the calibration bit set.
481 // We do not store it.
483 mi += nPayloads;
484 continue;
485 }
486 target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
487 mi += nPayloads;
488 saved += nPayloads;
489 }
490 return saved;
491 };
493 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
494 auto& stats = ref.get<DataProcessingStats>();
496 // Update statistics for what happened
497 switch (action) {
499 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
500 break;
502 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
503 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
504 break;
506 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
507 break;
509 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
510 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
511 break;
513 break;
514 }
515 };
518 //
519 // This is the actual outer loop processing input as part of a given
520 // timeslice. All the other implementation details are hidden by the lambdas
521 auto input = INVALID_INPUT;
522 auto timeslice = TimesliceId{TimesliceId::INVALID};
524 auto& index = mTimesliceIndex;
526 bool needsCleaning = false;
527 // First look for matching slots which already have some
528 // partial match.
529 for (size_t ci = 0; ci < index.size(); ++ci) {
530 slot = TimesliceSlot{ci};
531 if (!isSlotInLane(slot)) {
532 continue;
533 }
534 if (index.isValid(slot) == false) {
535 continue;
536 }
537 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
538 if (input != INVALID_INPUT) {
539 break;
540 }
541 }
543 // If we did not find anything, look for slots which
544 // are invalid.
545 if (input == INVALID_INPUT) {
546 for (size_t ci = 0; ci < index.size(); ++ci) {
547 slot = TimesliceSlot{ci};
548 if (index.isValid(slot) == true) {
549 continue;
550 }
551 if (!isSlotInLane(slot)) {
552 continue;
553 }
554 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
555 if (input != INVALID_INPUT) {
556 needsCleaning = true;
557 break;
558 }
559 }
560 }
562 auto& stats = mContext.get<DataProcessingStats>();
564 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
565 if (needsCleaning) {
566 this->pruneCache(slot, onDrop);
567 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
568 }
569 size_t saved = saveInSlot(timeslice, input, slot, info);
570 if (saved == 0) {
571 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
572 }
573 index.publishSlot(slot);
574 index.markAsDirty(slot, true);
575 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
576 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
577 }
581 VariableContext pristineContext;
582 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
584 auto DataHeaderInfo = [&rawHeader]() {
585 std::string error;
586 // extract header from message model
587 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
588 if (dh) {
589 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
590 } else {
591 error += "invalid header";
592 }
593 return error;
594 };
596 if (input == INVALID_INPUT) {
597 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
598 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
599 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
600 for (size_t pi = 0; pi < nMessages; ++pi) {
601 messages[pi].reset(nullptr);
602 }
603 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
604 }
606 if (TimesliceId::isValid(timeslice) == false) {
607 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
608 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
609 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
610 for (size_t pi = 0; pi < nMessages; ++pi) {
611 messages[pi].reset(nullptr);
612 }
613 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
614 }
616 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
618 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
619 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
620 if (action != TimesliceIndex::ActionTaken::Wait) {
621 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
622 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
623 }
625 updateStatistics(action);
627 switch (action) {
629 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
631 static std::atomic<size_t> obsoleteCount = 0;
632 static std::atomic<size_t> mult = 1;
633 if ((obsoleteCount++ % (1 * mult)) == 0) {
634 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
635 if (obsoleteCount > mult * 10) {
636 mult = mult * 10;
637 }
638 }
639 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
641 LOG(warning) << "Incoming data is invalid, not relaying.";
642 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
643 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
644 for (size_t pi = 0; pi < nMessages; ++pi) {
645 messages[pi].reset(nullptr);
646 }
647 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
650 // At this point the variables match the new input but the
651 // cache still holds the old data, so we prune it.
652 this->pruneCache(slot, onDrop);
653 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
654 size_t saved = saveInSlot(timeslice, input, slot, info);
655 if (saved == 0) {
656 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
657 }
658 index.publishSlot(slot);
659 index.markAsDirty(slot, true);
661 }
665void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
667 LOGP(debug, "DataRelayer::getReadyToProcess");
668 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
670 // THE STATE
671 const auto& cache = mCache;
672 const auto numInputTypes = mDistinctRoutesIndex.size();
673 //
675 //
676 // We use this to bail out early from the check as soon as we find something
677 // which we know is not complete.
678 auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
679 auto offset = li * numInputTypes;
680 assert(cache.size() >= offset + numInputTypes);
681 auto const start = + offset;
682 auto const end = + offset + numInputTypes;
683 return {start, end};
684 };
686 // These two are trivial, but in principle the whole loop could be parallelised
687 // or vectorised so "completed" could be a thread local variable which needs
688 // merging at the end.
689 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
690 if (timeslice) {
691 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
692 completed.emplace_back(RecordAction{li, {*timeslice}, op});
693 } else {
694 LOGP(debug, "No timeslice associated with slot ", li.index);
695 }
696 };
699 //
700 // To determine if a line is complete, we iterate on all the arguments
701 // and check if they are ready. We do it this way, because in the end
702 // the number of inputs is going to be small and having a more complex
703 // structure will probably result in a larger footprint in any case.
704 // Also notice that ai == inputsNumber only when we reach the end of the
705 // iteration, that means we have found all the required bits.
706 //
707 // Notice that the only time numInputTypes is 0 is when we are a dummy
708 // device created as a source for timers / conditions.
709 if (numInputTypes == 0) {
710 LOGP(debug, "numInputTypes == 0, returning.");
711 return;
712 }
713 size_t cacheLines = cache.size() / numInputTypes;
714 assert(cacheLines * numInputTypes == cache.size());
715 int countConsume = 0;
716 int countConsumeExisting = 0;
717 int countProcess = 0;
718 int countDiscard = 0;
719 int countWait = 0;
720 int notDirty = 0;
722 for (int li = cacheLines - 1; li >= 0; --li) {
723 TimesliceSlot slot{(size_t)li};
724 // We only check the cachelines which have been updated by an incoming
725 // message.
726 if (mTimesliceIndex.isDirty(slot) == false) {
727 notDirty++;
728 continue;
729 }
730 if (!mCompletionPolicy.callbackFull) {
731 throw runtime_error_f("Completion police %s has no callback set",;
732 }
733 auto partial = getPartialRecord(li);
734 // TODO: get the data ref from message model
735 auto getter = [&partial](size_t idx, size_t part) {
736 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
737 auto header = partial[idx].header(part).get();
738 auto payload = partial[idx].payload(part).get();
739 return DataRef{nullptr,
740 reinterpret_cast<const char*>(header->GetData()),
741 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
742 payload ? payload->GetSize() : 0};
743 }
744 return DataRef{};
745 };
746 auto nPartsGetter = [&partial](size_t idx) {
747 return partial[idx].size();
748 };
749 InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
750 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
752 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
753 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
754 switch (action) {
756 countConsume++;
757 updateCompletionResults(slot, timeslice, action);
758 mTimesliceIndex.markAsDirty(slot, false);
759 break;
761 // This is just like Consume, but we also mark all slots as dirty
762 countConsume++;
764 updateCompletionResults(slot, timeslice, action);
765 mTimesliceIndex.rescan();
766 break;
768 countConsumeExisting++;
769 updateCompletionResults(slot, timeslice, action);
770 mTimesliceIndex.markAsDirty(slot, false);
771 break;
773 countProcess++;
774 updateCompletionResults(slot, timeslice, action);
775 mTimesliceIndex.markAsDirty(slot, false);
776 break;
778 countDiscard++;
779 updateCompletionResults(slot, timeslice, action);
780 mTimesliceIndex.markAsDirty(slot, false);
781 break;
783 countWait++;
784 mTimesliceIndex.markAsDirty(slot, true);
786 break;
788 countWait++;
789 mTimesliceIndex.markAsDirty(slot, false);
790 break;
791 }
792 }
793 mTimesliceIndex.updateOldestPossibleOutput(false);
794 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
795 notDirty, countConsume, countConsumeExisting, countProcess,
796 countDiscard, countWait);
801 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
802 const auto numInputTypes = mDistinctRoutesIndex.size();
804 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
805 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
806 auto cacheId = s.index * numInputTypes + arg;
807 if (cachedStateMetrics[cacheId] == oldStatus) {
808 cachedStateMetrics[cacheId] = newStatus;
809 }
810 };
812 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
813 markInputDone(slot, ai, oldStatus, newStatus);
814 }
/// Moves every cached input of `slot` out of the cache and invalidates the slot.
///
/// For each of the numInputTypes entries of the slot, the cache status is set
/// to RUNNING and a non-empty MessageSet is moved into the result. The slot's
/// cache entries are then cleared and the slot is marked invalid in the
/// timeslice index. The whole operation runs under mMutex.
///
/// @param slot the cache line to consume.
/// @return one MessageSet per distinct input route, in route order; entries
///         that were empty in the cache are left default-constructed.
std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  const auto numInputTypes = mDistinctRoutesIndex.size();
  // State of the computation
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;

  // Nothing to see here, this is just to make the outer loop more understandable.
  auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
    return;
  };

  // We move ownership so that the cache can be reused once the computation is
  // finished. We mark the given cache slot invalid, so that it can be reused
  // This means we can still handle old messages if there is still space in the
  // cache where to put them.
  // NOTE(review): index.markAsInvalid(s) runs once per input here and again in
  // invalidateCacheFor below. This looks redundant; confirm markAsInvalid is
  // idempotent before simplifying.
  auto moveHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
                                    &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
    auto cacheId = s.index * numInputTypes + arg;
    cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
    // TODO: in the original implementation of the cache, there have been only two messages per entry,
    // check if the 2 above corresponds to the number of messages.
    if (cache[cacheId].size() > 0) {
      messages[arg] = std::move(cache[cacheId]);
    }
    index.markAsInvalid(s);
  };

  // An invalid set of arguments is a set of arguments associated to an invalid
  // timeslice, so I can simply do that. I keep the assertion there because in principle
  // we should have dispatched the timeslice already!
  // FIXME: what happens when we have enough timeslices to hit the invalid one?
  // The assert checks that every message pointer left in the slot's entries is
  // null (i.e. ownership was moved out above) before the entries are cleared.
  auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
    for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
      assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
      cache[ai].clear();
    }
    index.markAsInvalid(s);
  };

  // Outer loop here.
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    moveHeaderPayloadToOutput(slot, ai);
  }
  invalidateCacheFor(slot);

  return messages;
871std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
873 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
875 const auto numInputTypes = mDistinctRoutesIndex.size();
876 // State of the computation
877 std::vector<MessageSet> messages(numInputTypes);
878 auto& cache = mCache;
879 auto& index = mTimesliceIndex;
881 // Nothing to see here, this is just to make the outer loop more understandable.
882 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
883 return;
884 };
886 // We move ownership so that the cache can be reused once the computation is
887 // finished. We mark the given cache slot invalid, so that it can be reused
888 // This means we can still handle old messages if there is still space in the
889 // cache where to put them.
890 auto copyHeaderPayloadToOutput = [&messages,
891 &cachedStateMetrics = mCachedStateMetrics,
892 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
893 auto cacheId = s.index * numInputTypes + arg;
894 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
895 // TODO: in the original implementation of the cache, there have been only two messages per entry,
896 // check if the 2 above corresponds to the number of messages.
897 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
898 auto& header = cache[cacheId].header(pi);
899 auto&& newHeader = header->GetTransport()->CreateMessage();
900 newHeader->Copy(*header);
901 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
902 }
903 };
905 // Outer loop here.
906 jumpToCacheEntryAssociatedWith(slot);
907 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
908 copyHeaderPayloadToOutput(slot, ai);
909 }
911 return std::move(messages);
916 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
918 for (auto& cache : mCache) {
919 cache.clear();
920 }
921 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
922 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
923 }
929 return mCache.size() / mDistinctRoutesIndex.size();
938 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
940 mTimesliceIndex.resize(s);
941 mVariableContextes.resize(s);
// Size the cache and its status array for the current pipeline length, then
// describe one DataProcessingStates entry per variable context and one per
// timeslice slot (name, stateId, publish interval, GUI default).
// Holds mMutex (a recursive mutex) for the whole scope.
947 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
949 auto numInputTypes = mDistinctRoutesIndex.size();
950 // FIXME: many of the DataRelayer functions rely on the allocated cache, so it is
951 // maybe misleading to have the allocation in a function primarily for
952 // metrics publishing, do better in setPipelineLength?
// One cache entry per (timeslice slot, distinct input route); the status array
// mirrors the cache one-to-one.
953 mCache.resize(numInputTypes * mTimesliceIndex.size());
954 auto& states = mContext.get<DataProcessingStates>();
956 mCachedStateMetrics.resize(mCache.size());
958 // There are at most 16 variables available. We keep them row-wise so that
959 // we can take mod 16 of the index to understand which variable we
960 // are talking about.
// NOTE(review): the call that opens each `{ .name = ..., }` initializer below
// (closed by the `});` lines) is missing from this listing. Restore it from
// the upstream source.
// NOTE(review): `static_cast<short>(BASE + i)` silently truncates if the sum
// does not fit in a short.
961 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
963 .name = fmt::format("matcher_variables/{}", i),
964 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
965 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
966 .sendInitialValue = true,
967 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
968 });
969 }
// One "data_relayer/<slot>" entry per timeslice slot.
// NOTE(review): `int ci` is compared against an unsigned size(); consider size_t.
971 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
973 .name = fmt::format("data_relayer/{}", ci),
974 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
975 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
976 .sendInitialValue = true,
977 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
978 });
979 }
// Hold mMutex (recursive, so re-entrant locking from the same thread does not
// deadlock) until the end of the enclosing scope.
984 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
// Hold mMutex (recursive, so re-entrant locking from the same thread does not
// deadlock) until the end of the enclosing scope.
990 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
996 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
997 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
// Hold mMutex (recursive, so re-entrant locking from the same thread does not
// deadlock) until the end of the enclosing scope.
1002 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1008 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1009 auto& states = mContext.get<DataProcessingStates>();
1010 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1011 auto slot = TimesliceSlot{ci};
1012 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1013 states);
1014 }
1015 char relayerSlotState[1024];
1016 // The number of timeslices is encoded in each state
1017 // We serialise the state of a Timeslot in a given state.
1018 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1019 char* buffer = relayerSlotState + written;
1020 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1021 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1022 int index = si * mTimesliceIndex.size() + ci;
1023 int value = static_cast<int>(mCachedStateMetrics[index]);
1024 buffer[si] = value + '0';
1025 // Anything which is done is actually already empty,
1026 // so after we report it we mark it as such.
1027 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1028 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1029 }
1030 }
1031 buffer[mDistinctRoutesIndex.size()] = '\0';
1032 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1033 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1034 }
1037} // namespace o2::framework
