Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
22#include "Framework/DataRef.h"
24#include "Framework/InputSpan.h"
26#include "Framework/Logger.h"
27#include "Framework/PartRef.h"
33#include "DataRelayerHelpers.h"
34#include "InputRouteHelpers.h"
42
45
46#include <Monitoring/Metric.h>
47#include <Monitoring/Monitoring.h>
48
49#include <fairlogger/Logger.h>
50#include <fairmq/Channel.h>
51#include <functional>
52#include <fairmq/shmem/Message.h>
53#include <fairmq/Device.h>
54#include <fmt/format.h>
55#include <fmt/ostream.h>
56#include <span>
57#include <string>
58
59using namespace o2::framework::data_matcher;
62using Verbosity = o2::monitoring::Verbosity;
63
65// Stream which keeps track of the calibration lifetime logic
67
68namespace o2::framework
69{
70
71constexpr int INVALID_INPUT = -1;
72
// DataRelayer constructor (Doxygen-extracted listing: original source lines 73 and 75
// — the start of the signature, presumably the policy and timeslice-index parameters,
// given the mCompletionPolicy{policy}/mTimesliceIndex{index} initializers — are
// missing from this capture, as is line 93, the catch-block return value).
// Configures the pipeline length, either via policy.configureRelayer, an explicit
// pipelineLength, or auto-detection from the device configuration, then publishes
// the distinct input queries as a "data_queries" DataProcessingStates state.
 74 std::vector<InputRoute> const& routes,
 76 ServiceRegistryRef services,
 77 int pipelineLength)
 78 : mContext{services},
 79 mTimesliceIndex{index},
 80 mCompletionPolicy{policy},
 81 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
 82 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
 83 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
 84{
// All public state of the relayer is guarded by mMutex (recursive, so that
// member functions may call each other while holding it).
 85 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
 86
 87 if (policy.configureRelayer == nullptr) {
 88 if (pipelineLength == -1) {
// Auto-detect the pipeline length from the FairMQ device configuration.
// NOTE(review): the catch-all's return value is on missing line 93 — presumably a default.
 89 auto getPipelineLengthHelper = [&services]() {
 90 try {
 91 return DefaultsHelpers::pipelineLength(*services.get<RawDeviceService>().device()->fConfig);
 92 } catch (...) {
 94 }
 95 };
// static: detect once per process, even if several relayers are constructed.
 96 static int detectedPipelineLength = getPipelineLengthHelper();
 97 pipelineLength = detectedPipelineLength;
 98 }
 99 setPipelineLength(pipelineLength);
 100 } else {
// The policy takes full control of the relayer configuration.
 101 policy.configureRelayer(*this);
 102 }
 103
 104 // The queries are all the same, so we only have width 1
 105 auto numInputTypes = mDistinctRoutesIndex.size();
 106 auto& states = services.get<DataProcessingStates>();
// Build a ";"-separated textual description of every distinct input query.
 107 std::string queries = "";
 108 for (short i = 0; i < numInputTypes; ++i) {
 109 char buffer[128];
 110 assert(mDistinctRoutesIndex[i] < routes.size());
 111 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
 112 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
 113 DataSpecUtils::describe(buffer, 127, matcher);
 114 queries += std::string_view(buffer, strlen(buffer));
 115 queries += ";";
 116 }
 117 auto stateId = (short)ProcessingStateId::DATA_QUERIES;
 118 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
 119 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
 120 states.processCommandQueue();
 121}
122
// Returns the timeslice currently associated with the given slot, read from the
// slot's variable context. (Signature on original line 123 is missing from this
// capture — presumably TimesliceId DataRelayer::getTimesliceForSlot(TimesliceSlot slot).)
 124{
 125 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
 126 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
 127 return VariableContextHelpers::getTimeslice(variables);
 128}
129
// Runs the expiration machinery over every valid timeslice slot: lets the
// expiration handlers create new slots (when createNew is set), then, for each
// (slot, handler) pair whose cache cell is still empty, asks the handler's
// checker whether the cell may be expired and, if so, lets the handler
// synthesize a (header, payload) pair into the cache and marks the slot dirty.
// Returns counters of new/expired slots.
// NOTE(review): Doxygen-extracted listing; original line 138 is missing here.
130DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
131 ServiceRegistryRef services, bool createNew)
132{
133 LOGP(debug, "DataRelayer::processDanglingInputs");
134 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
135 auto& deviceProxy = services.get<FairMQDeviceProxy>();
136
137 ActivityStats activity;
// Nothing to do if no handler was registered.
139 if (expirationHandlers.empty()) {
140 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
141 return activity;
142 }
143 // Create any slot for the time based fields
144 std::vector<TimesliceSlot> slotsCreatedByHandlers;
145 if (createNew) {
146 LOGP(debug, "Creating new slot");
147 for (auto& handler : expirationHandlers) {
148 LOGP(debug, "handler.creator for {}", handler.name);
149 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
150 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
151 }
152 }
153 // Count how many slots are not invalid
154 auto validSlots = 0;
155 for (auto slot : slotsCreatedByHandlers) {
156 if (slot.index == TimesliceSlot::INVALID) {
157 continue;
158 }
159 validSlots++;
160 }
161 if (validSlots > 0) {
162 activity.newSlots++;
163 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
164 } else {
165 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
166 }
167 // Outer loop, we process all the records because the fact that the record
168 // expires is independent from having received data for it.
// The counters below only feed the summary debug log at the end.
169 int headerPresent = 0;
170 int payloadPresent = 0;
171 int noCheckers = 0;
172 int badSlot = 0;
173 int checkerDenied = 0;
174 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
175 TimesliceSlot slot{ti};
176 if (mTimesliceIndex.isValid(slot) == false) {
177 continue;
178 }
179 assert(mDistinctRoutesIndex.empty() == false);
180 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
181 auto timestamp = VariableContextHelpers::getTimeslice(variables);
182 // We iterate on all the hanlders checking if they need to be expired.
183 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
184 auto& expirator = expirationHandlers[ei];
185 // We check that no data is already there for the given cell
186 // it is enough to check the first element
// Cache layout: row per slot, one cell per distinct route.
187 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
188 if (!part.empty() && (part | get_header{0}) != nullptr) {
189 headerPresent++;
190 continue;
191 }
192 if (!part.empty() && (part | get_payload{0, 0}) != nullptr) {
193 payloadPresent++;
194 continue;
195 }
196 // We check that the cell can actually be expired.
197 if (!expirator.checker) {
198 noCheckers++;
199 continue;
200 }
// Only the handler that created this slot may expire it.
201 if (slotsCreatedByHandlers[ei] != slot) {
202 badSlot++;
203 continue;
204 }
205
// View over the full row of the cache for slot li (non-owning).
206 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<std::vector<fair::mq::MessagePtr> const> {
207 auto offset = li * numInputTypes;
208 assert(cache.size() >= offset + numInputTypes);
209 auto const start = cache.data() + offset;
210 auto const end = cache.data() + offset + numInputTypes;
211 return {start, end};
212 };
213
214 auto partial = getPartialRecord(ti);
// The four getters below adapt the raw cache row to the InputSpan interface.
215 auto nPartsGetter = [&partial](size_t idx) {
216 return partial[idx] | count_parts{};
217 };
// NOTE(review): assumes shmem transport — the cast to fair::mq::shmem::Message
// for the ref count is unchecked; confirm against the transport in use.
218 auto refCountGetter = [&partial](size_t idx) -> int {
219 auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx] | get_header{0}));
220 return header.GetRefCount();
221 };
222 auto indicesGetter = [&partial](size_t idx, DataRefIndices indices) -> DataRef {
223 if (!partial[idx].empty()) {
224 auto const& headerMsg = partial[idx][indices.headerIdx];
225 auto const& payloadMsg = partial[idx][indices.payloadIdx];
226 if (headerMsg) {
227 return DataRef{nullptr,
228 reinterpret_cast<const char*>(headerMsg->GetData()),
229 payloadMsg ? reinterpret_cast<char const*>(payloadMsg->GetData()) : nullptr,
230 payloadMsg ? payloadMsg->GetSize() : 0};
231 }
232 }
233 return DataRef{};
234 };
235 auto nextIndicesGetter = [&partial](size_t idx, DataRefIndices current) -> DataRefIndices {
236 auto next = partial[idx] | get_next_pair{current};
237 return next.headerIdx < partial[idx].size() ? next : DataRefIndices{size_t(-1), size_t(-1)};
238 };
239 InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, static_cast<size_t>(partial.size())};
240 // Setup the input span
241
242 if (expirator.checker(services, timestamp.value, span) == false) {
243 checkerDenied++;
244 continue;
245 }
246
247 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
248 assert(expirator.handler);
// The handler fabricates the expired (header, payload) pair in-place.
249 PartRef newRef;
250 expirator.handler(services, newRef, variables);
251 part.clear();
252 part.emplace_back(std::move(newRef.header));
253 part.emplace_back(std::move(newRef.payload));
254 activity.expiredSlots++;
255
// Mark dirty so getReadyToProcess() re-evaluates this slot.
256 mTimesliceIndex.markAsDirty(slot, true);
257 assert((part | get_header{0}) != nullptr);
258 assert((part | get_payload{0, 0}) != nullptr);
259 }
260 }
261 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
262 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
263 return activity;
264}
265
269size_t matchToContext(void const* data,
270 std::vector<DataDescriptorMatcher> const& matchers,
271 std::vector<size_t> const& index,
272 VariableContext& context)
273{
274 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
275 auto& matcher = matchers[index[ri]];
276
277 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
278 context.commit();
279 return ri;
280 }
281 context.discard();
282 }
283 return INVALID_INPUT;
284}
285
// Publishes the slot's matching variables as a DataProcessingStates entry.
// (Doxygen-extracted listing: the signature on original lines 286-288 is missing
// here — from the body it takes a VariableContext& context, a TimesliceSlot slot
// and a DataProcessingStates& states.)
 289{
 290 static const std::string nullstring{"null"};
 291
// publish() invokes the callback with our DataProcessingStates as the opaque context.
 292 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
 293 auto& states = *static_cast<DataProcessingStates*>(context);
// static buffer reused across calls; cleared on entry, so contents never leak
// between invocations (note this assumes single-threaded use of the callback).
 294 static std::string state = "";
 295 state.clear();
// Serialize each matching variable as "<value>;" — empty value for unset/monostate.
 296 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
 297 auto var = variables.get(i);
 298 if (auto pval = std::get_if<uint64_t>(&var)) {
 299 state += std::to_string(*pval);
 300 } else if (auto pval = std::get_if<uint32_t>(&var)) {
 301 state += std::to_string(*pval);
 302 } else if (auto pval2 = std::get_if<std::string>(&var)) {
 303 state += *pval2;
 304 } else {
 305 }
 306 state += ";";
 307 }
// One state entry per slot, offset from CONTEXT_VARIABLES_BASE.
 308 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
 309 .size = (int)state.size(),
 310 .data = state.data()});
 311 },
 312 &states, slot);
 313}
314
// Propagates the new oldest-possible input timeslice to the timeslice index and
// schedules pruning of every slot whose data became obsolete, logging what is
// being dropped and what was still missing. (Doxygen-extracted listing: the
// signature on original line 315 is missing — from the body it takes the
// proposed TimesliceId and the ChannelIndex it came from. Lines 342 and 366
// are also missing, which is why `state` appears used before a visible
// declaration in the second half.)
 316{
 317 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
 318 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
// Escape hatch for debugging: keep obsolete timeslices alive.
 319 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
 320 if (dontDrop) {
 321 return;
 322 }
// One iteration per cache line (slot); mCache is laid out slot-major.
 323 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
 324 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
 325 auto timestamp = VariableContextHelpers::getTimeslice(variables);
 326 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
 327 if (valid) {
 328 if (mTimesliceIndex.isValid({si})) {
 329 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
 330 }
 331 continue;
 332 }
// Slot is obsolete: queue it for pruning (executed later via prunePending).
 333 mPruneOps.push_back(PruneOp{si});
 334 bool didDrop = false;
 335 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
 336 auto& input = mInputs[mi];
 337 auto& element = mCache[si * mInputs.size() + mi];
 338 if (!element.empty()) {
// The dummy sink and Condition inputs are expected to be incomplete; others are real losses.
 339 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
 340 didDrop = true;
 341 auto& state = mContext.get<DeviceState>();
 343 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 344 } else {
 345 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 346 }
 347 } else {
 348 LOGP(debug,
 349 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
 350 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
 351 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
 352 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
 353 }
 354 }
 355 }
 356 // We did drop some data. Let's print what was missing.
 357 if (didDrop) {
 358 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
 359 auto& input = mInputs[mi];
// Timers are synthesized locally, never "missing".
 360 if (input.lifetime == Lifetime::Timer) {
 361 continue;
 362 }
 363 auto& element = mCache[si * mInputs.size() + mi];
 364 if (element.empty()) {
 365 auto& state = mContext.get<DeviceState>();
 367 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
 368 O2_SIGNPOST_ID_GENERATE(cid, calibration);
 369 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data", "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
 370 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 371 } else {
 372 LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 373 }
 374 } else {
 375 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
 376 O2_SIGNPOST_ID_GENERATE(cid, calibration);
 377 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data", "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
 378 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 379 } else {
 380 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
 381 }
 382 }
 383 }
 384 }
 385 }
 386 }
 387}
388
// Executes all pruning operations queued by setOldestPossibleInput, then clears
// the queue. (Signature on original line 394 is missing from this capture —
// from the body it takes the OnDropCallback onDrop forwarded to pruneCache.)
393
395{
396 for (auto& op : mPruneOps) {
397 this->pruneCache(op.slot, onDrop);
398 }
399 mPruneOps.clear();
400}
401
// Empties one slot's cache line: hands any still-present messages to onDrop
// (if provided), then clears every cell and resets its state metric to EMPTY.
// (Signature on original line 402 is missing from this capture — from the body
// it takes a TimesliceSlot and an OnDropCallback.)
 403{
 404 // We need to prune the cache from the old stuff, if any. Otherwise we
 405 // simply store the payload in the cache and we mark relevant bit in the
 406 // hence the first if.
 407 auto pruneCache = [&onDrop,
 408 &cache = mCache,
 409 &cachedStateMetrics = mCachedStateMetrics,
 410 numInputTypes = mDistinctRoutesIndex.size(),
 411 &index = mTimesliceIndex,
 412 ref = mContext](TimesliceSlot slot) {
 413 if (onDrop) {
 414 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
 415 // State of the computation
// Move every non-empty cell out of the cache so ownership passes to onDrop.
 416 std::vector<std::vector<fair::mq::MessagePtr>> dropped(numInputTypes);
 417 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
 418 auto cacheId = slot.index * numInputTypes + ai;
 419 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
 420 // TODO: in the original implementation of the cache, there have been only two messages per entry,
 421 // check if the 2 above corresponds to the number of messages.
 422 if (!cache[cacheId].empty()) {
 423 dropped[ai] = std::move(cache[cacheId]);
 424 }
 425 }
// Only invoke the callback when something was actually dropped.
 426 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return !m.empty(); });
 427 if (anyDropped) {
 428 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
 429 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
 430 onDrop(slot, dropped, oldestPossibleTimeslice);
 431 }
 432 }
 433 assert(cache.empty() == false);
 434 assert(index.size() * numInputTypes == cache.size());
 435 // Prune old stuff from the cache, hopefully deleting it...
 436 // We set the current slot to the timeslice value, so that old stuff
 437 // will be ignored.
 438 assert(numInputTypes * slot.index < cache.size());
 439 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
 440 cache[ai].clear();
 441 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
 442 }
 443 };
 444
 445 pruneCache(slot);
 446}
447
// Tells whether the message carries calibration data, judging from its
// DataProcessingHeader. NOTE(review): original line 451 — the return
// expression (presumably testing a flag on dph) — is missing from this
// Doxygen capture; only the header extraction is visible.
448bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
449{
450 auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
452}
453
// Relays an incoming (header, payload...) message set into the slot cache.
// Flow: find a slot whose variable context already matches (same lane), else an
// invalid slot; if neither works, ask the TimesliceIndex to evict an LRU slot
// and act on the resulting ActionTaken. Returns a RelayChoice describing what
// happened (WillRelay / Dropped / Backpressured / Invalid).
// NOTE(review): Doxygen-extracted listing. The return type (line 454), the
// contents of two return-tuple expressions (485-486, 497-498), the calibration
// condition (533), the `slot`/`action` declarations (586, 680) and every `case`
// label of the two switch statements (561/564/568/571/575, 691/693/703/711-712/725)
// are missing from this capture.
 455 DataRelayer::relay(void const* rawHeader,
456 std::unique_ptr<fair::mq::Message>* messages,
457 InputInfo const& info,
458 size_t nMessages,
459 size_t nPayloads,
460 OnInsertionCallback onInsertion,
461 OnDropCallback onDrop)
462{
463 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
464 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
465 // IMPLEMENTATION DETAILS
466 //
467 // This returns true if a given slot is available for the current number of lanes
468 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
469 return (slot.index % maxLanes) == (currentLane % maxLanes);
470 };
471 // This returns the identifier for the given input. We use a separate
472 // function because while it's trivial now, the actual matchmaking will
473 // become more complicated when we will start supporting ranges.
474 auto getInputTimeslice = [&matchers = mInputMatchers,
475 &distinctRoutes = mDistinctRoutesIndex,
476 &rawHeader,
477 &index = mTimesliceIndex](VariableContext& context)
478 -> std::tuple<int, TimesliceId> {
481 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
482
483 if (input == INVALID_INPUT) {
484 return {
487 };
488 }
// Variable 0 of the matched context holds the timeslice by convention.
491 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
492 TimesliceId timeslice{*pval};
493 return {input, timeslice};
494 }
495 // If we get here it means we need to push something out of the cache.
496 return {
499 };
500 };
501
502 // Actually save the header / payload in the slot
503 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
504 &messages,
505 &nMessages,
506 &nPayloads,
507 &onInsertion,
508 &cache = mCache,
509 &services = mContext,
510 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
511 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
512 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
513 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
514 timeslice.value, slot.index,
515 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
516 auto cacheIdx = numInputTypes * slot.index + input;
517 auto& target = cache[cacheIdx];
518 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
519 // TODO: make sure that multiple parts can only be added within the same call of
520 // DataRelayer::relay
521 assert(nPayloads > 0);
522 size_t saved = 0;
523 // It's guaranteed we will see all these messages only once, so we can
524 // do the forwarding here.
525 auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
526 if (onInsertion) {
527 onInsertion(services, allMessages);
528 }
// Walk the flat message array in (1 header + nPayloads) strides.
529 for (size_t mi = 0; mi < nMessages; ++mi) {
530 assert(mi + nPayloads < nMessages);
531 // We are in calibration mode and the data does not have the calibration bit set.
532 // We do not store it.
534 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
535 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
536 "Dropping incoming %zu messages because they are data processing.", nPayloads);
537 // Actually dropping messages.
// Moving into a local that immediately dies releases each message.
538 for (size_t i = mi; i < mi + nPayloads + 1; i++) {
539 auto discard = std::move(messages[i]);
540 }
541 mi += nPayloads;
542 continue;
543 }
544 auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
545 // Notice this will split [(header, payload), (header, payload)] multiparts
546 // in N different subParts for the message spec.
547 for (size_t i = 0; i < nPayloads + 1; ++i) {
548 target.emplace_back(std::move(span[i]));
549 }
550 mi += nPayloads;
551 saved += nPayloads;
552 }
553 return saved;
554 };
555
// Translate the index's ActionTaken into the matching bookkeeping metrics.
556 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
557 auto& stats = ref.get<DataProcessingStats>();
558
559 // Update statistics for what happened
560 switch (action) {
562 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
563 break;
565 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
566 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
567 break;
569 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
570 break;
572 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
573 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
574 break;
576 break;
577 }
578 };
579
580 // OUTER LOOP
581 //
582 // This is the actual outer loop processing input as part of a given
583 // timeslice. All the other implementation details are hidden by the lambdas
584 auto input = INVALID_INPUT;
585 auto timeslice = TimesliceId{TimesliceId::INVALID};
587 auto& index = mTimesliceIndex;
588
589 bool needsCleaning = false;
590 // First look for matching slots which already have some
591 // partial match.
592 for (size_t ci = 0; ci < index.size(); ++ci) {
593 slot = TimesliceSlot{ci};
594 if (!isSlotInLane(slot)) {
595 continue;
596 }
597 if (index.isValid(slot) == false) {
598 continue;
599 }
600 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
601 if (input != INVALID_INPUT) {
602 break;
603 }
604 }
605
606 // If we did not find anything, look for slots which
607 // are invalid.
608 if (input == INVALID_INPUT) {
609 for (size_t ci = 0; ci < index.size(); ++ci) {
610 slot = TimesliceSlot{ci};
611 if (index.isValid(slot) == true) {
612 continue;
613 }
614 if (!isSlotInLane(slot)) {
615 continue;
616 }
617 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
618 if (input != INVALID_INPUT) {
// Reusing an invalid slot: its stale cache contents must be pruned first.
619 needsCleaning = true;
620 break;
621 }
622 }
623 }
624
625 auto& stats = mContext.get<DataProcessingStats>();
// Fast path: we found a usable slot, store the data and publish.
627 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
628 if (needsCleaning) {
629 this->pruneCache(slot, onDrop);
630 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
631 }
632 size_t saved = saveInSlot(timeslice, input, slot, info);
633 if (saved == 0) {
634 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
635 }
636 index.publishSlot(slot);
637 index.markAsDirty(slot, true);
638 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
639 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
640 }
641
// Slow path: re-match against a pristine context to decide whether an LRU
// slot should be replaced for this timeslice.
644 VariableContext pristineContext;
645 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
646
647 auto DataHeaderInfo = [&rawHeader]() {
648 std::string error;
649 // extract header from message model
650 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
651 if (dh) {
652 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
653 } else {
654 error += "invalid header";
655 }
656 return error;
657 };
658
659 if (input == INVALID_INPUT) {
660 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
661 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
662 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
663 for (size_t pi = 0; pi < nMessages; ++pi) {
664 messages[pi].reset(nullptr);
665 }
666 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
667 }
668
669 if (TimesliceId::isValid(timeslice) == false) {
670 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
671 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
672 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
673 for (size_t pi = 0; pi < nMessages; ++pi) {
674 messages[pi].reset(nullptr);
675 }
676 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
677 }
678
679 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
681 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
682 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
683 if (action != TimesliceIndex::ActionTaken::Wait) {
684 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
685 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
686 }
687
688 updateStatistics(action);
689
// Case labels are missing from this capture; each branch below corresponds
// to one TimesliceIndex::ActionTaken value.
690 switch (action) {
692 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
// Rate-limited warning: log on the 1st, 10th, 100th, ... obsolete message.
694 static std::atomic<size_t> obsoleteCount = 0;
695 static std::atomic<size_t> mult = 1;
696 if ((obsoleteCount++ % (1 * mult)) == 0) {
697 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
698 if (obsoleteCount > mult * 10) {
699 mult = mult * 10;
700 }
701 }
702 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
704 LOG(warning) << "Incoming data is invalid, not relaying.";
705 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
706 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
707 for (size_t pi = 0; pi < nMessages; ++pi) {
708 messages[pi].reset(nullptr);
709 }
710 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
713 // At this point the variables match the new input but the
714 // cache still holds the old data, so we prune it.
715 this->pruneCache(slot, onDrop);
716 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
717 size_t saved = saveInSlot(timeslice, input, slot, info);
718 if (saved == 0) {
719 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
720 }
721 index.publishSlot(slot);
722 index.markAsDirty(slot, true);
723 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
724 }
726}
727
// Scans every dirty cache line, builds an InputSpan view over it and asks the
// completion policy what to do; completed/processable records are appended to
// `completed` and the slot's dirty flag is updated per the chosen action.
// NOTE(review): Doxygen-extracted listing — the `case` labels of the action
// switch (original lines 827/832/835/839/844/849/854/857/859) are missing;
// each branch corresponds to one CompletionPolicy::CompletionOp value.
728void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
729{
730 LOGP(debug, "DataRelayer::getReadyToProcess");
731 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
732
733 // THE STATE
734 const auto& cache = mCache;
735 const auto numInputTypes = mDistinctRoutesIndex.size();
736 //
737 // THE IMPLEMENTATION DETAILS
738 //
739 // We use this to bail out early from the check as soon as we find something
740 // which we know is not complete.
741 auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<std::vector<fair::mq::MessagePtr> const> {
742 auto offset = li * numInputTypes;
743 assert(cache.size() >= offset + numInputTypes);
744 auto const start = cache.data() + offset;
745 auto const end = cache.data() + offset + numInputTypes;
746 return {start, end};
747 };
748
749 // These two are trivial, but in principle the whole loop could be parallelised
750 // or vectorised so "completed" could be a thread local variable which needs
751 // merging at the end.
752 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
753 if (timeslice) {
754 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
755 completed.emplace_back(RecordAction{li, {*timeslice}, op});
756 } else {
757 LOGP(debug, "No timeslice associated with slot ", li.index);
758 }
759 };
760
761 // THE OUTER LOOP
762 //
763 // To determine if a line is complete, we iterate on all the arguments
764 // and check if they are ready. We do it this way, because in the end
765 // the number of inputs is going to be small and having a more complex
766 // structure will probably result in a larger footprint in any case.
767 // Also notice that ai == inputsNumber only when we reach the end of the
768 // iteration, that means we have found all the required bits.
769 //
770 // Notice that the only time numInputTypes is 0 is when we are a dummy
771 // device created as a source for timers / conditions.
772 if (numInputTypes == 0) {
773 LOGP(debug, "numInputTypes == 0, returning.");
774 return;
775 }
776 size_t cacheLines = cache.size() / numInputTypes;
777 assert(cacheLines * numInputTypes == cache.size());
// Per-action counters, only used for the debug summary at the end.
778 int countConsume = 0;
779 int countConsumeExisting = 0;
780 int countProcess = 0;
781 int countDiscard = 0;
782 int countWait = 0;
783 int notDirty = 0;
784
// Iterate newest-to-oldest slot.
785 for (int li = cacheLines - 1; li >= 0; --li) {
786 TimesliceSlot slot{(size_t)li};
787 // We only check the cachelines which have been updated by an incoming
788 // message.
789 if (mTimesliceIndex.isDirty(slot) == false) {
790 notDirty++;
791 continue;
792 }
793 if (!mCompletionPolicy.callbackFull) {
794 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
795 }
// Same InputSpan adaptation as in processDanglingInputs: wrap the cache
// row in non-owning getters the completion policy can inspect.
796 auto partial = getPartialRecord(li);
797 auto nPartsGetter = [&partial](size_t idx) {
798 return partial[idx] | count_parts{};
799 };
// NOTE(review): unchecked cast assumes shmem messages — confirm transport.
800 auto refCountGetter = [&partial](size_t idx) -> int {
801 auto& header = static_cast<const fair::mq::shmem::Message&>(*(partial[idx] | get_header{0}));
802 return header.GetRefCount();
803 };
804 auto indicesGetter = [&partial](size_t idx, DataRefIndices indices) -> DataRef {
805 if (!partial[idx].empty()) {
806 auto const& headerMsg = partial[idx][indices.headerIdx];
807 auto const& payloadMsg = partial[idx][indices.payloadIdx];
808 if (headerMsg) {
809 return DataRef{nullptr,
810 reinterpret_cast<const char*>(headerMsg->GetData()),
811 payloadMsg ? reinterpret_cast<char const*>(payloadMsg->GetData()) : nullptr,
812 payloadMsg ? payloadMsg->GetSize() : 0};
813 }
814 }
815 return DataRef{};
816 };
817 auto nextIndicesGetter = [&partial](size_t idx, DataRefIndices current) -> DataRefIndices {
818 auto next = partial[idx] | get_next_pair{current};
819 return next.headerIdx < partial[idx].size() ? next : DataRefIndices{size_t(-1), size_t(-1)};
820 };
821 InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter, static_cast<size_t>(partial.size())};
822 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
823
824 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
825 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
826 switch (action) {
828 countConsume++;
829 updateCompletionResults(slot, timeslice, action);
830 mTimesliceIndex.markAsDirty(slot, false);
831 break;
833 // This is just like Consume, but we also mark all slots as dirty
834 countConsume++;
836 updateCompletionResults(slot, timeslice, action);
837 mTimesliceIndex.rescan();
838 break;
840 countConsumeExisting++;
841 updateCompletionResults(slot, timeslice, action);
842 mTimesliceIndex.markAsDirty(slot, false);
843 break;
845 countProcess++;
846 updateCompletionResults(slot, timeslice, action);
847 mTimesliceIndex.markAsDirty(slot, false);
848 break;
850 countDiscard++;
851 updateCompletionResults(slot, timeslice, action);
852 mTimesliceIndex.markAsDirty(slot, false);
853 break;
855 countWait++;
856 mTimesliceIndex.markAsDirty(slot, true);
858 break;
860 countWait++;
861 mTimesliceIndex.markAsDirty(slot, false);
862 break;
863 }
864 }
865 mTimesliceIndex.updateOldestPossibleOutput(false);
866 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
867 notDirty, countConsume, countConsumeExisting, countProcess,
868 countDiscard, countWait);
869}
870
// NOTE(review): the signature line (doxygen 871) was lost in extraction; per the
// declaration index this is
//   void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
// Transitions every per-route cache-state metric of `slot` from oldStatus to
// newStatus; entries in any other state are left untouched.
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();

  // Conditional transition for a single (slot, route) entry: the cache metrics
  // are stored row-wise, one row of numInputTypes entries per slot.
  auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
                        &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
    auto cacheId = s.index * numInputTypes + arg;
    if (cachedStateMetrics[cacheId] == oldStatus) {
      cachedStateMetrics[cacheId] = newStatus;
    }
  };

  // Apply the transition to every distinct input route of the slot.
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    markInputDone(slot, ai, oldStatus, newStatus);
  }
}
888
889std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
890{
891 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
892
893 const auto numInputTypes = mDistinctRoutesIndex.size();
894 // State of the computation
895 std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
896 auto& cache = mCache;
897 auto& index = mTimesliceIndex;
898
899 // Nothing to see here, this is just to make the outer loop more understandable.
900 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
901 return;
902 };
903
904 // We move ownership so that the cache can be reused once the computation is
905 // finished. We mark the given cache slot invalid, so that it can be reused
906 // This means we can still handle old messages if there is still space in the
907 // cache where to put them.
908 auto moveHeaderPayloadToOutput = [&messages,
909 &cachedStateMetrics = mCachedStateMetrics,
910 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
911 auto cacheId = s.index * numInputTypes + arg;
912 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
913 // TODO: in the original implementation of the cache, there have been only two messages per entry,
914 // check if the 2 above corresponds to the number of messages.
915 if (!cache[cacheId].empty()) {
916 messages[arg] = std::move(cache[cacheId]);
917 }
918 index.markAsInvalid(s);
919 };
920
921 // An invalid set of arguments is a set of arguments associated to an invalid
922 // timeslice, so I can simply do that. I keep the assertion there because in principle
923 // we should have dispatched the timeslice already!
924 // FIXME: what happens when we have enough timeslices to hit the invalid one?
925 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
926 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
927 assert(std::accumulate(cache[ai].begin(), cache[ai].end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
928 cache[ai].clear();
929 }
930 index.markAsInvalid(s);
931 };
932
933 // Outer loop here.
934 jumpToCacheEntryAssociatedWith(slot);
935 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
936 moveHeaderPayloadToOutput(slot, ai);
937 }
938 invalidateCacheFor(slot);
939
940 return messages;
941}
942
943std::vector<std::vector<fair::mq::MessagePtr>> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
944{
945 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
946
947 const auto numInputTypes = mDistinctRoutesIndex.size();
948 // State of the computation
949 std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
950 auto& cache = mCache;
951 auto& index = mTimesliceIndex;
952
953 // Nothing to see here, this is just to make the outer loop more understandable.
954 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
955 return;
956 };
957
958 // We move ownership so that the cache can be reused once the computation is
959 // finished. We mark the given cache slot invalid, so that it can be reused
960 // This means we can still handle old messages if there is still space in the
961 // cache where to put them.
962 auto copyHeaderPayloadToOutput = [&messages,
963 &cachedStateMetrics = mCachedStateMetrics,
964 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
965 auto cacheId = s.index * numInputTypes + arg;
966 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
967 // TODO: in the original implementation of the cache, there have been only two messages per entry,
968 // check if the 2 above corresponds to the number of messages.
969 for (size_t pi = 0; pi < (cache[cacheId] | count_parts{}); pi++) {
970 auto& header = cache[cacheId] | get_header{pi};
971 auto&& newHeader = header->GetTransport()->CreateMessage();
972 newHeader->Copy(*header);
973 messages[arg].emplace_back(std::move(newHeader));
974 messages[arg].emplace_back(std::move(cache[cacheId] | get_payload{pi, 0}));
975 }
976 };
977
978 // Outer loop here.
979 jumpToCacheEntryAssociatedWith(slot);
980 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
981 copyHeaderPayloadToOutput(slot, ai);
982 }
983
984 return std::move(messages);
985}
986
// NOTE(review): the signature line (doxygen 987) was lost in extraction; per the
// declaration index this is `void DataRelayer::clear()` — "Remove all pending messages".
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  // Drop every queued message for every (slot, route) cache entry...
  for (auto& cache : mCache) {
    cache.clear();
  }
  // ...and mark every timeslice slot invalid so all cachelines can be reused.
  for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
    mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
  }
}
998
size_t
// NOTE(review): the `DataRelayer::getParallelTimeslices() const` line (doxygen
// 1000) was lost in extraction; the declaration index documents it as
// "Returns how many timeslices we can handle in parallel."
{
  // The cache holds one entry per (slot, distinct route), so dividing by the
  // number of distinct routes yields the number of in-flight timeslice slots.
  return mCache.size() / mDistinctRoutesIndex.size();
}
1004
// NOTE(review): the signature line was lost in extraction; per the declaration
// index this is `void DataRelayer::setPipelineLength(size_t s)` — "Tune the
// maximum number of in flight timeslices this can handle."
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  // Both the timeslice index and the matcher variable contexts are sized to
  // the number of parallel timeslices.
  mTimesliceIndex.resize(s);
  mVariableContextes.resize(s);
  // NOTE(review): doxygen line 1015 was dropped in extraction — one statement
  // is missing here; confirm against the repository.
}
1017
// NOTE(review): the signature line (doxygen 1018) was lost in extraction —
// presumably `void DataRelayer::publishMetrics()`; confirm against the repository.
// Allocates the message cache / state-metric storage and registers the GUI
// states for matcher variables and relayer slots.
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  auto numInputTypes = mDistinctRoutesIndex.size();
  // FIXME: many of the DataRelayer function rely on allocated cache, so its
  // maybe misleading to have the allocation in a function primarily for
  // metrics publishing, do better in setPipelineLength?
  mCache.resize(numInputTypes * mTimesliceIndex.size());
  auto& states = mContext.get<DataProcessingStates>();

  // One state metric per cache entry, mirroring mCache's layout.
  mCachedStateMetrics.resize(mCache.size());

  // There is maximum 16 variables available. We keep them row-wise so that
  // that we can take mod 16 of the index to understand which variable we
  // are talking about.
  for (size_t i = 0; i < mVariableContextes.size(); ++i) {
    // NOTE(review): the line opening this braced initializer (doxygen 1035,
    // presumably a states.registerState({ call) was lost in extraction.
      .name = fmt::format("matcher_variables/{}", i),
      .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
      .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
      .sendInitialValue = true,
      .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
    });
  }

  for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
    // NOTE(review): the line opening this braced initializer (doxygen 1045,
    // presumably a states.registerState({ call) was lost in extraction.
      .name = fmt::format("data_relayer/{}", ci),
      .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
      .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
      .sendInitialValue = true,
      .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
    });
  }
}
1054
// NOTE(review): both the signature (doxygen 1055) and the body statement
// (doxygen 1058) were lost in extraction. The declaration index lists
// `void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)`,
// which presumably forwarded to mTimesliceIndex — confirm against the repository.
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
}
1060
// NOTE(review): both the signature (doxygen 1061) and the return statement
// (doxygen 1064) were lost in extraction. The declaration index lists
// `TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const`,
// which presumably forwarded to mTimesliceIndex — confirm against the repository.
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
}
1066
// NOTE(review): the signature line (doxygen 1067) was lost in extraction; per
// the declaration index this is `uint32_t DataRelayer::getRunNumberForSlot(TimesliceSlot slot)`
// — "Get the runNumber associated to a given slot".
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // The run number lives in the matcher variable context bound to the slot.
  return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
}
1072
// NOTE(review): both the signature (doxygen 1073) and the return statement
// (doxygen 1076) were lost in extraction. By position this is presumably
// `uint64_t DataRelayer::getCreationTimeForSlot(TimesliceSlot slot)`
// ("Get the creation time associated to a given slot") — confirm against the repository.
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
}
1078
// NOTE(review): the signature line (doxygen 1079) was lost in extraction; per
// the declaration index this is `void DataRelayer::sendContextState()` —
// "Send metrics with the VariableContext information".
{
  // These states exist only for the GUI; skip all work when no GUI is attached.
  if (!mContext.get<DriverConfig const>().driverHasGUI) {
    return;
  }
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  auto& states = mContext.get<DataProcessingStates>();
  // Publish the matcher variable context of every slot.
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    auto slot = TimesliceSlot{ci};
    sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
                               states);
  }
  // NOTE(review): fixed 1024-byte scratch buffer — this overflows if the
  // numeric prefix plus mDistinctRoutesIndex.size() characters exceed it;
  // confirm an upper bound on the number of distinct routes.
  char relayerSlotState[1024];
  // The number of timeslices is encoded in each state
  // We serialise the state of a Timeslot in a given state.
  int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
  char* buffer = relayerSlotState + written;
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    // One digit per route: the numeric value of the CacheEntryStatus.
    for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
      int index = ci * mDistinctRoutesIndex.size() + si;
      int value = static_cast<int>(mCachedStateMetrics[index]);
      buffer[si] = value + '0';
      // Anything which is done is actually already empty,
      // so after we report it we mark it as such.
      if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
        mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
      }
    }
    buffer[mDistinctRoutesIndex.size()] = '\0';
    auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
    states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
  }
}
1112
1113} // namespace o2::framework
std::vector< framework::ConcreteDataMatcher > matchers
benchmark::State & state
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
atype::type element
std::ostringstream debug
int32_t i
uint32_t op
bool valid
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_LOCKABLE(T)
Definition Tracing.h:20
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTFOrbit associated with a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
std::function< void(TimesliceSlot, std::vector< std::vector< fair::mq::MessagePtr > > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
std::vector< std::vector< fair::mq::MessagePtr > > consumeAllInputsForTimeslice(TimesliceSlot id)
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef, int)
std::function< void(ServiceRegistryRef &, std::span< fair::mq::MessagePtr > &)> OnInsertionCallback
std::vector< std::vector< fair::mq::MessagePtr > > consumeExistingInputsForTimeslice(TimesliceSlot id)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated with a given slot.
void clear()
Remove all pending messages.
virtual fair::mq::Device * device()=0
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void empty(int)
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
std::unique_ptr< fair::mq::Message > header
Definition PartRef.h:25
std::unique_ptr< fair::mq::Message > payload
Definition PartRef.h:26
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"