[Doxygen source-listing page chrome for DataRelayer.cxx — site navigation text, not part of the source file.]
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
22#include "Framework/DataRef.h"
24#include "Framework/InputSpan.h"
26#include "Framework/Logger.h"
27#include "Framework/PartRef.h"
33#include "DataRelayerHelpers.h"
34#include "InputRouteHelpers.h"
41
44
45#include <Monitoring/Metric.h>
46#include <Monitoring/Monitoring.h>
47
48#include <fairlogger/Logger.h>
49#include <fairmq/Channel.h>
50#include <functional>
51#include <fairmq/shmem/Message.h>
52#include <fairmq/Device.h>
53#include <fmt/format.h>
54#include <fmt/ostream.h>
55#include <span>
56#include <string>
57
58using namespace o2::framework::data_matcher;
61using Verbosity = o2::monitoring::Verbosity;
62
64// Stream which keeps track of the calibration lifetime logic
66
67namespace o2::framework
68{
69
70constexpr int INVALID_INPUT = -1;
71
73 std::vector<InputRoute> const& routes,
75 ServiceRegistryRef services,
76 int pipelineLength)
77 : mContext{services},
78 mTimesliceIndex{index},
79 mCompletionPolicy{policy},
80 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
81 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
82 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
83{
84 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
85
86 if (policy.configureRelayer == nullptr) {
87 if (pipelineLength == -1) {
88 auto getPipelineLengthHelper = [&services]() {
89 try {
90 return DefaultsHelpers::pipelineLength(*services.get<RawDeviceService>().device()->fConfig);
91 } catch (...) {
93 }
94 };
95 static int detectedPipelineLength = getPipelineLengthHelper();
96 pipelineLength = detectedPipelineLength;
97 }
98 setPipelineLength(pipelineLength);
99 } else {
100 policy.configureRelayer(*this);
101 }
102
103 // The queries are all the same, so we only have width 1
104 auto numInputTypes = mDistinctRoutesIndex.size();
105 auto& states = services.get<DataProcessingStates>();
106 std::string queries = "";
107 for (short i = 0; i < numInputTypes; ++i) {
108 char buffer[128];
109 assert(mDistinctRoutesIndex[i] < routes.size());
110 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
111 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
112 DataSpecUtils::describe(buffer, 127, matcher);
113 queries += std::string_view(buffer, strlen(buffer));
114 queries += ";";
115 }
116 auto stateId = (short)ProcessingStateId::DATA_QUERIES;
117 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
118 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
119 states.processCommandQueue();
120}
121
123{
124 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
125 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
126 return VariableContextHelpers::getTimeslice(variables);
127}
128
129DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
130 ServiceRegistryRef services, bool createNew)
131{
132 LOGP(debug, "DataRelayer::processDanglingInputs");
133 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
134 auto& deviceProxy = services.get<FairMQDeviceProxy>();
135
136 ActivityStats activity;
138 if (expirationHandlers.empty()) {
139 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
140 return activity;
141 }
142 // Create any slot for the time based fields
143 std::vector<TimesliceSlot> slotsCreatedByHandlers;
144 if (createNew) {
145 LOGP(debug, "Creating new slot");
146 for (auto& handler : expirationHandlers) {
147 LOGP(debug, "handler.creator for {}", handler.name);
148 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
149 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
150 }
151 }
152 // Count how many slots are not invalid
153 auto validSlots = 0;
154 for (auto slot : slotsCreatedByHandlers) {
155 if (slot.index == TimesliceSlot::INVALID) {
156 continue;
157 }
158 validSlots++;
159 }
160 if (validSlots > 0) {
161 activity.newSlots++;
162 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
163 } else {
164 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
165 }
166 // Outer loop, we process all the records because the fact that the record
167 // expires is independent from having received data for it.
168 int headerPresent = 0;
169 int payloadPresent = 0;
170 int noCheckers = 0;
171 int badSlot = 0;
172 int checkerDenied = 0;
173 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
174 TimesliceSlot slot{ti};
175 if (mTimesliceIndex.isValid(slot) == false) {
176 continue;
177 }
178 assert(mDistinctRoutesIndex.empty() == false);
179 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
180 auto timestamp = VariableContextHelpers::getTimeslice(variables);
181 // We iterate on all the hanlders checking if they need to be expired.
182 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
183 auto& expirator = expirationHandlers[ei];
184 // We check that no data is already there for the given cell
185 // it is enough to check the first element
186 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
187 if (part.size() > 0 && part.header(0) != nullptr) {
188 headerPresent++;
189 continue;
190 }
191 if (part.size() > 0 && part.payload(0) != nullptr) {
192 payloadPresent++;
193 continue;
194 }
195 // We check that the cell can actually be expired.
196 if (!expirator.checker) {
197 noCheckers++;
198 continue;
199 }
200 if (slotsCreatedByHandlers[ei] != slot) {
201 badSlot++;
202 continue;
203 }
204
205 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
206 auto offset = li * numInputTypes;
207 assert(cache.size() >= offset + numInputTypes);
208 auto const start = cache.data() + offset;
209 auto const end = cache.data() + offset + numInputTypes;
210 return {start, end};
211 };
212
213 auto partial = getPartialRecord(ti);
214 // TODO: get the data ref from message model
215 auto getter = [&partial](size_t idx, size_t part) {
216 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
217 auto header = partial[idx].header(part).get();
218 auto payload = partial[idx].payload(part).get();
219 return DataRef{nullptr,
220 reinterpret_cast<const char*>(header->GetData()),
221 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
222 payload ? payload->GetSize() : 0};
223 }
224 return DataRef{};
225 };
226 auto nPartsGetter = [&partial](size_t idx) {
227 return partial[idx].size();
228 };
229 auto refCountGetter = [&partial](size_t idx) -> int {
230 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
231 return header.GetRefCount();
232 };
233 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
234 // Setup the input span
235
236 if (expirator.checker(services, timestamp.value, span) == false) {
237 checkerDenied++;
238 continue;
239 }
240
241 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
242 assert(expirator.handler);
243 PartRef newRef;
244 expirator.handler(services, newRef, variables);
245 part.reset(std::move(newRef));
246 activity.expiredSlots++;
247
248 mTimesliceIndex.markAsDirty(slot, true);
249 assert(part.header(0) != nullptr);
250 assert(part.payload(0) != nullptr);
251 }
252 }
253 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
254 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
255 return activity;
256}
257
261size_t matchToContext(void const* data,
262 std::vector<DataDescriptorMatcher> const& matchers,
263 std::vector<size_t> const& index,
264 VariableContext& context)
265{
266 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
267 auto& matcher = matchers[index[ri]];
268
269 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
270 context.commit();
271 return ri;
272 }
273 context.discard();
274 }
275 return INVALID_INPUT;
276}
277
281{
282 static const std::string nullstring{"null"};
283
284 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
285 auto& states = *static_cast<DataProcessingStates*>(context);
286 static std::string state = "";
287 state.clear();
288 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
289 auto var = variables.get(i);
290 if (auto pval = std::get_if<uint64_t>(&var)) {
291 state += std::to_string(*pval);
292 } else if (auto pval = std::get_if<uint32_t>(&var)) {
293 state += std::to_string(*pval);
294 } else if (auto pval2 = std::get_if<std::string>(&var)) {
295 state += *pval2;
296 } else {
297 }
298 state += ";";
299 }
300 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
301 .size = (int)state.size(),
302 .data = state.data()});
303 },
304 &states, slot);
305}
306
308{
309 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
310 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
311 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
312 if (dontDrop) {
313 return;
314 }
315 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
316 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
317 auto timestamp = VariableContextHelpers::getTimeslice(variables);
318 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
319 if (valid) {
320 if (mTimesliceIndex.isValid({si})) {
321 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
322 }
323 continue;
324 }
325 mPruneOps.push_back(PruneOp{si});
326 bool didDrop = false;
327 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
328 auto& input = mInputs[mi];
329 auto& element = mCache[si * mInputs.size() + mi];
330 if (element.size() != 0) {
331 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
332 didDrop = true;
333 auto& state = mContext.get<DeviceState>();
335 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
336 } else {
337 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
338 }
339 } else {
340 LOGP(debug,
341 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
342 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
343 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
344 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
345 }
346 }
347 }
348 // We did drop some data. Let's print what was missing.
349 if (didDrop) {
350 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
351 auto& input = mInputs[mi];
352 if (input.lifetime == Lifetime::Timer) {
353 continue;
354 }
355 auto& element = mCache[si * mInputs.size() + mi];
356 if (element.size() == 0) {
357 auto& state = mContext.get<DeviceState>();
359 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
360 O2_SIGNPOST_ID_GENERATE(cid, calibration);
361 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data", "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
362 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
363 } else {
364 LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
365 }
366 } else {
367 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
368 O2_SIGNPOST_ID_GENERATE(cid, calibration);
369 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data", "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
370 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
371 } else {
372 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
373 }
374 }
375 }
376 }
377 }
378 }
379}
380
385
387{
388 for (auto& op : mPruneOps) {
389 this->pruneCache(op.slot, onDrop);
390 }
391 mPruneOps.clear();
392}
393
395{
396 // We need to prune the cache from the old stuff, if any. Otherwise we
397 // simply store the payload in the cache and we mark relevant bit in the
398 // hence the first if.
399 auto pruneCache = [&onDrop,
400 &cache = mCache,
401 &cachedStateMetrics = mCachedStateMetrics,
402 numInputTypes = mDistinctRoutesIndex.size(),
403 &index = mTimesliceIndex,
404 ref = mContext](TimesliceSlot slot) {
405 if (onDrop) {
406 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
407 // State of the computation
408 std::vector<MessageSet> dropped(numInputTypes);
409 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
410 auto cacheId = slot.index * numInputTypes + ai;
411 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
412 // TODO: in the original implementation of the cache, there have been only two messages per entry,
413 // check if the 2 above corresponds to the number of messages.
414 if (cache[cacheId].size() > 0) {
415 dropped[ai] = std::move(cache[cacheId]);
416 }
417 }
418 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
419 if (anyDropped) {
420 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
421 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
422 onDrop(slot, dropped, oldestPossibleTimeslice);
423 }
424 }
425 assert(cache.empty() == false);
426 assert(index.size() * numInputTypes == cache.size());
427 // Prune old stuff from the cache, hopefully deleting it...
428 // We set the current slot to the timeslice value, so that old stuff
429 // will be ignored.
430 assert(numInputTypes * slot.index < cache.size());
431 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
432 cache[ai].clear();
433 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
434 }
435 };
436
437 pruneCache(slot);
438}
439
440bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
441{
442 auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
444}
445
447 DataRelayer::relay(void const* rawHeader,
448 std::unique_ptr<fair::mq::Message>* messages,
449 InputInfo const& info,
450 size_t nMessages,
451 size_t nPayloads,
452 OnInsertionCallback onInsertion,
453 OnDropCallback onDrop)
454{
455 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
456 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
457 // IMPLEMENTATION DETAILS
458 //
459 // This returns true if a given slot is available for the current number of lanes
460 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
461 return (slot.index % maxLanes) == (currentLane % maxLanes);
462 };
463 // This returns the identifier for the given input. We use a separate
464 // function because while it's trivial now, the actual matchmaking will
465 // become more complicated when we will start supporting ranges.
466 auto getInputTimeslice = [&matchers = mInputMatchers,
467 &distinctRoutes = mDistinctRoutesIndex,
468 &rawHeader,
469 &index = mTimesliceIndex](VariableContext& context)
470 -> std::tuple<int, TimesliceId> {
473 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
474
475 if (input == INVALID_INPUT) {
476 return {
479 };
480 }
483 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
484 TimesliceId timeslice{*pval};
485 return {input, timeslice};
486 }
487 // If we get here it means we need to push something out of the cache.
488 return {
491 };
492 };
493
494 // Actually save the header / payload in the slot
495 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
496 &messages,
497 &nMessages,
498 &nPayloads,
499 &onInsertion,
500 &cache = mCache,
501 &services = mContext,
502 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
503 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
504 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
505 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
506 timeslice.value, slot.index,
507 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
508 auto cacheIdx = numInputTypes * slot.index + input;
509 MessageSet& target = cache[cacheIdx];
510 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
511 // TODO: make sure that multiple parts can only be added within the same call of
512 // DataRelayer::relay
513 assert(nPayloads > 0);
514 size_t saved = 0;
515 // It's guaranteed we will see all these messages only once, so we can
516 // do the forwarding here.
517 auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
518 if (onInsertion) {
519 onInsertion(services, allMessages);
520 }
521 for (size_t mi = 0; mi < nMessages; ++mi) {
522 assert(mi + nPayloads < nMessages);
523 // We are in calibration mode and the data does not have the calibration bit set.
524 // We do not store it.
526 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
527 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
528 "Dropping incoming %zu messages because they are data processing.", nPayloads);
529 // Actually dropping messages.
530 for (size_t i = mi; i < mi + nPayloads + 1; i++) {
531 auto discard = std::move(messages[i]);
532 }
533 mi += nPayloads;
534 continue;
535 }
536 auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
537 // Notice this will split [(header, payload), (header, payload)] multiparts
538 // in N different subParts for the message spec.
539 target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
540 mi += nPayloads;
541 saved += nPayloads;
542 }
543 return saved;
544 };
545
546 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
547 auto& stats = ref.get<DataProcessingStats>();
548
549 // Update statistics for what happened
550 switch (action) {
552 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
553 break;
555 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
556 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
557 break;
559 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
560 break;
562 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
563 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
564 break;
566 break;
567 }
568 };
569
570 // OUTER LOOP
571 //
572 // This is the actual outer loop processing input as part of a given
573 // timeslice. All the other implementation details are hidden by the lambdas
574 auto input = INVALID_INPUT;
575 auto timeslice = TimesliceId{TimesliceId::INVALID};
577 auto& index = mTimesliceIndex;
578
579 bool needsCleaning = false;
580 // First look for matching slots which already have some
581 // partial match.
582 for (size_t ci = 0; ci < index.size(); ++ci) {
583 slot = TimesliceSlot{ci};
584 if (!isSlotInLane(slot)) {
585 continue;
586 }
587 if (index.isValid(slot) == false) {
588 continue;
589 }
590 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
591 if (input != INVALID_INPUT) {
592 break;
593 }
594 }
595
596 // If we did not find anything, look for slots which
597 // are invalid.
598 if (input == INVALID_INPUT) {
599 for (size_t ci = 0; ci < index.size(); ++ci) {
600 slot = TimesliceSlot{ci};
601 if (index.isValid(slot) == true) {
602 continue;
603 }
604 if (!isSlotInLane(slot)) {
605 continue;
606 }
607 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
608 if (input != INVALID_INPUT) {
609 needsCleaning = true;
610 break;
611 }
612 }
613 }
614
615 auto& stats = mContext.get<DataProcessingStats>();
617 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
618 if (needsCleaning) {
619 this->pruneCache(slot, onDrop);
620 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
621 }
622 size_t saved = saveInSlot(timeslice, input, slot, info);
623 if (saved == 0) {
624 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
625 }
626 index.publishSlot(slot);
627 index.markAsDirty(slot, true);
628 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
629 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
630 }
631
634 VariableContext pristineContext;
635 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
636
637 auto DataHeaderInfo = [&rawHeader]() {
638 std::string error;
639 // extract header from message model
640 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
641 if (dh) {
642 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
643 } else {
644 error += "invalid header";
645 }
646 return error;
647 };
648
649 if (input == INVALID_INPUT) {
650 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
651 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
652 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
653 for (size_t pi = 0; pi < nMessages; ++pi) {
654 messages[pi].reset(nullptr);
655 }
656 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
657 }
658
659 if (TimesliceId::isValid(timeslice) == false) {
660 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
661 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
662 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
663 for (size_t pi = 0; pi < nMessages; ++pi) {
664 messages[pi].reset(nullptr);
665 }
666 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
667 }
668
669 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
671 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
672 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
673 if (action != TimesliceIndex::ActionTaken::Wait) {
674 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
675 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
676 }
677
678 updateStatistics(action);
679
680 switch (action) {
682 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
684 static std::atomic<size_t> obsoleteCount = 0;
685 static std::atomic<size_t> mult = 1;
686 if ((obsoleteCount++ % (1 * mult)) == 0) {
687 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
688 if (obsoleteCount > mult * 10) {
689 mult = mult * 10;
690 }
691 }
692 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
694 LOG(warning) << "Incoming data is invalid, not relaying.";
695 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
696 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
697 for (size_t pi = 0; pi < nMessages; ++pi) {
698 messages[pi].reset(nullptr);
699 }
700 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
703 // At this point the variables match the new input but the
704 // cache still holds the old data, so we prune it.
705 this->pruneCache(slot, onDrop);
706 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
707 size_t saved = saveInSlot(timeslice, input, slot, info);
708 if (saved == 0) {
709 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
710 }
711 index.publishSlot(slot);
712 index.markAsDirty(slot, true);
713 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
714 }
716}
717
718void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
719{
720 LOGP(debug, "DataRelayer::getReadyToProcess");
721 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
722
723 // THE STATE
724 const auto& cache = mCache;
725 const auto numInputTypes = mDistinctRoutesIndex.size();
726 //
727 // THE IMPLEMENTATION DETAILS
728 //
729 // We use this to bail out early from the check as soon as we find something
730 // which we know is not complete.
731 auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
732 auto offset = li * numInputTypes;
733 assert(cache.size() >= offset + numInputTypes);
734 auto const start = cache.data() + offset;
735 auto const end = cache.data() + offset + numInputTypes;
736 return {start, end};
737 };
738
739 // These two are trivial, but in principle the whole loop could be parallelised
740 // or vectorised so "completed" could be a thread local variable which needs
741 // merging at the end.
742 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
743 if (timeslice) {
744 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
745 completed.emplace_back(RecordAction{li, {*timeslice}, op});
746 } else {
747 LOGP(debug, "No timeslice associated with slot ", li.index);
748 }
749 };
750
751 // THE OUTER LOOP
752 //
753 // To determine if a line is complete, we iterate on all the arguments
754 // and check if they are ready. We do it this way, because in the end
755 // the number of inputs is going to be small and having a more complex
756 // structure will probably result in a larger footprint in any case.
757 // Also notice that ai == inputsNumber only when we reach the end of the
758 // iteration, that means we have found all the required bits.
759 //
760 // Notice that the only time numInputTypes is 0 is when we are a dummy
761 // device created as a source for timers / conditions.
762 if (numInputTypes == 0) {
763 LOGP(debug, "numInputTypes == 0, returning.");
764 return;
765 }
766 size_t cacheLines = cache.size() / numInputTypes;
767 assert(cacheLines * numInputTypes == cache.size());
768 int countConsume = 0;
769 int countConsumeExisting = 0;
770 int countProcess = 0;
771 int countDiscard = 0;
772 int countWait = 0;
773 int notDirty = 0;
774
775 for (int li = cacheLines - 1; li >= 0; --li) {
776 TimesliceSlot slot{(size_t)li};
777 // We only check the cachelines which have been updated by an incoming
778 // message.
779 if (mTimesliceIndex.isDirty(slot) == false) {
780 notDirty++;
781 continue;
782 }
783 if (!mCompletionPolicy.callbackFull) {
784 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
785 }
786 auto partial = getPartialRecord(li);
787 // TODO: get the data ref from message model
788 auto getter = [&partial](size_t idx, size_t part) {
789 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
790 auto header = partial[idx].header(part).get();
791 auto payload = partial[idx].payload(part).get();
792 return DataRef{nullptr,
793 reinterpret_cast<const char*>(header->GetData()),
794 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
795 payload ? payload->GetSize() : 0};
796 }
797 return DataRef{};
798 };
799 auto nPartsGetter = [&partial](size_t idx) {
800 return partial[idx].size();
801 };
802 auto refCountGetter = [&partial](size_t idx) -> int {
803 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
804 return header.GetRefCount();
805 };
806 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
807 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
808
809 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
810 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
811 switch (action) {
813 countConsume++;
814 updateCompletionResults(slot, timeslice, action);
815 mTimesliceIndex.markAsDirty(slot, false);
816 break;
818 // This is just like Consume, but we also mark all slots as dirty
819 countConsume++;
821 updateCompletionResults(slot, timeslice, action);
822 mTimesliceIndex.rescan();
823 break;
825 countConsumeExisting++;
826 updateCompletionResults(slot, timeslice, action);
827 mTimesliceIndex.markAsDirty(slot, false);
828 break;
830 countProcess++;
831 updateCompletionResults(slot, timeslice, action);
832 mTimesliceIndex.markAsDirty(slot, false);
833 break;
835 countDiscard++;
836 updateCompletionResults(slot, timeslice, action);
837 mTimesliceIndex.markAsDirty(slot, false);
838 break;
840 countWait++;
841 mTimesliceIndex.markAsDirty(slot, true);
843 break;
845 countWait++;
846 mTimesliceIndex.markAsDirty(slot, false);
847 break;
848 }
849 }
850 mTimesliceIndex.updateOldestPossibleOutput(false);
851 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
852 notDirty, countConsume, countConsumeExisting, countProcess,
853 countDiscard, countWait);
854}
855
857{
858 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
859 const auto numInputTypes = mDistinctRoutesIndex.size();
860
861 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
862 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
863 auto cacheId = s.index * numInputTypes + arg;
864 if (cachedStateMetrics[cacheId] == oldStatus) {
865 cachedStateMetrics[cacheId] = newStatus;
866 }
867 };
868
869 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
870 markInputDone(slot, ai, oldStatus, newStatus);
871 }
872}
873
874std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
875{
876 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
877
878 const auto numInputTypes = mDistinctRoutesIndex.size();
879 // State of the computation
880 std::vector<MessageSet> messages(numInputTypes);
881 auto& cache = mCache;
882 auto& index = mTimesliceIndex;
883
884 // Nothing to see here, this is just to make the outer loop more understandable.
885 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
886 return;
887 };
888
889 // We move ownership so that the cache can be reused once the computation is
890 // finished. We mark the given cache slot invalid, so that it can be reused
891 // This means we can still handle old messages if there is still space in the
892 // cache where to put them.
893 auto moveHeaderPayloadToOutput = [&messages,
894 &cachedStateMetrics = mCachedStateMetrics,
895 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
896 auto cacheId = s.index * numInputTypes + arg;
897 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
898 // TODO: in the original implementation of the cache, there have been only two messages per entry,
899 // check if the 2 above corresponds to the number of messages.
900 if (cache[cacheId].size() > 0) {
901 messages[arg] = std::move(cache[cacheId]);
902 }
903 index.markAsInvalid(s);
904 };
905
906 // An invalid set of arguments is a set of arguments associated to an invalid
907 // timeslice, so I can simply do that. I keep the assertion there because in principle
908 // we should have dispatched the timeslice already!
909 // FIXME: what happens when we have enough timeslices to hit the invalid one?
910 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
911 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
912 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
913 cache[ai].clear();
914 }
915 index.markAsInvalid(s);
916 };
917
918 // Outer loop here.
919 jumpToCacheEntryAssociatedWith(slot);
920 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
921 moveHeaderPayloadToOutput(slot, ai);
922 }
923 invalidateCacheFor(slot);
924
925 return messages;
926}
927
928std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
929{
930 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
931
932 const auto numInputTypes = mDistinctRoutesIndex.size();
933 // State of the computation
934 std::vector<MessageSet> messages(numInputTypes);
935 auto& cache = mCache;
936 auto& index = mTimesliceIndex;
937
938 // Nothing to see here, this is just to make the outer loop more understandable.
939 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
940 return;
941 };
942
943 // We move ownership so that the cache can be reused once the computation is
944 // finished. We mark the given cache slot invalid, so that it can be reused
945 // This means we can still handle old messages if there is still space in the
946 // cache where to put them.
947 auto copyHeaderPayloadToOutput = [&messages,
948 &cachedStateMetrics = mCachedStateMetrics,
949 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
950 auto cacheId = s.index * numInputTypes + arg;
951 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
952 // TODO: in the original implementation of the cache, there have been only two messages per entry,
953 // check if the 2 above corresponds to the number of messages.
954 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
955 auto& header = cache[cacheId].header(pi);
956 auto&& newHeader = header->GetTransport()->CreateMessage();
957 newHeader->Copy(*header);
958 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
959 }
960 };
961
962 // Outer loop here.
963 jumpToCacheEntryAssociatedWith(slot);
964 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
965 copyHeaderPayloadToOutput(slot, ai);
966 }
967
968 return std::move(messages);
969}
970
972{
973 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
974
975 for (auto& cache : mCache) {
976 cache.clear();
977 }
978 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
979 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
980 }
981}
982
983size_t
985{
986 return mCache.size() / mDistinctRoutesIndex.size();
987}
988
994{
995 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
996
997 mTimesliceIndex.resize(s);
998 mVariableContextes.resize(s);
1000}
1001
1003{
1004 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1005
1006 auto numInputTypes = mDistinctRoutesIndex.size();
1007 // FIXME: many of the DataRelayer function rely on allocated cache, so its
1008 // maybe misleading to have the allocation in a function primarily for
1009 // metrics publishing, do better in setPipelineLength?
1010 mCache.resize(numInputTypes * mTimesliceIndex.size());
1011 auto& states = mContext.get<DataProcessingStates>();
1012
1013 mCachedStateMetrics.resize(mCache.size());
1014
1015 // There is maximum 16 variables available. We keep them row-wise so that
1016 // that we can take mod 16 of the index to understand which variable we
1017 // are talking about.
1018 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
1020 .name = fmt::format("matcher_variables/{}", i),
1021 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
1022 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
1023 .sendInitialValue = true,
1024 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1025 });
1026 }
1027
1028 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
1030 .name = fmt::format("data_relayer/{}", ci),
1031 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
1032 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
1033 .sendInitialValue = true,
1034 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1035 });
1036 }
1037}
1038
1040{
1041 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1043}
1044
1046{
1047 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1049}
1050
1052{
1053 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1054 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
1055}
1056
1058{
1059 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1061}
1062
1064{
1065 if (!mContext.get<DriverConfig const>().driverHasGUI) {
1066 return;
1067 }
1068 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1069 auto& states = mContext.get<DataProcessingStates>();
1070 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1071 auto slot = TimesliceSlot{ci};
1072 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1073 states);
1074 }
1075 char relayerSlotState[1024];
1076 // The number of timeslices is encoded in each state
1077 // We serialise the state of a Timeslot in a given state.
1078 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1079 char* buffer = relayerSlotState + written;
1080 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1081 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1082 int index = ci * mDistinctRoutesIndex.size() + si;
1083 int value = static_cast<int>(mCachedStateMetrics[index]);
1084 buffer[si] = value + '0';
1085 // Anything which is done is actually already empty,
1086 // so after we report it we mark it as such.
1087 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1088 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1089 }
1090 }
1091 buffer[mDistinctRoutesIndex.size()] = '\0';
1092 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1093 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1094 }
1095}
1096
1097} // namespace o2::framework
std::vector< framework::ConcreteDataMatcher > matchers
benchmark::State & state
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
atype::type element
std::ostringstream debug
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef, int)
std::function< void(ServiceRegistryRef &, std::span< fair::mq::MessagePtr > &)> OnInsertionCallback
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
virtual fair::mq::Device * device()=0
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:619
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"