Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
15
21#include "Framework/DataRef.h"
23#include "Framework/InputSpan.h"
25#include "Framework/Logger.h"
26#include "Framework/PartRef.h"
32#include "DataRelayerHelpers.h"
33#include "InputRouteHelpers.h"
39
42
43#include <Monitoring/Metric.h>
44#include <Monitoring/Monitoring.h>
45
46#include <fairmq/Channel.h>
47#include <functional>
48#if __has_include(<fairmq/shmem/Message.h>)
49#include <fairmq/shmem/Message.h>
50#endif
51#include <fmt/format.h>
52#include <fmt/ostream.h>
53#include <gsl/span>
54#include <string>
55
56using namespace o2::framework::data_matcher;
59using Verbosity = o2::monitoring::Verbosity;
60
62// Stream which keeps track of the calibration lifetime logic
64
65namespace o2::framework
66{
67
// Sentinel returned by the match-making helpers (see matchToContext below)
// when an incoming message does not correspond to any configured input route.
// Note: matchToContext returns size_t, so callers compare after an implicit
// -1 <-> SIZE_MAX conversion round-trip.
68constexpr int INVALID_INPUT = -1;
69
// NOTE(review): the first line of this constructor's signature (doc line 70,
// carrying the `policy` and `index` parameters referenced below) is missing
// from this extract — confirm against DataRelayer.h.
 71 std::vector<InputRoute> const& routes,
 73 ServiceRegistryRef services)
 74 : mContext{services},
 75 mTimesliceIndex{index},
 76 mCompletionPolicy{policy},
 77 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
 78 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
 79 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
 80{
 81 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
 82
// Either let the policy size/configure the relayer, or fall back to the
// default pipeline length.
 83 if (policy.configureRelayer == nullptr) {
 84 static int pipelineLength = DefaultsHelpers::pipelineLength();
 85 setPipelineLength(pipelineLength);
 86 } else {
 87 policy.configureRelayer(*this);
 88 }
 89
 90 // The queries are all the same, so we only have width 1
 91 auto numInputTypes = mDistinctRoutesIndex.size();
 92 auto& states = services.get<DataProcessingStates>();
 93 std::string queries = "";
// Build a ';'-separated human-readable description of all distinct input
// queries; it is published as the "data_queries" state below.
 94 for (short i = 0; i < numInputTypes; ++i) {
 95 char buffer[128];
 96 assert(mDistinctRoutesIndex[i] < routes.size());
 97 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
 98 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
 99 DataSpecUtils::describe(buffer, 127, matcher);
 100 queries += std::string_view(buffer, strlen(buffer));
 101 queries += ";";
 102 }
// NOTE(review): the declaration of `stateId` (doc line 103) is missing from
// this extract; it is the id used to register and update the state below.
 104 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
 105 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
 106 states.processCommandQueue();
 107}
108
// NOTE(review): the signature for this method (doc line 109) is missing from
// this extract. The body looks up the timeslice bound to a slot's variable
// context, so it presumably implements DataRelayer::getTimesliceForSlot —
// TODO confirm against the header.
110{
// Guard all relayer state behind the (recursive) relayer mutex.
111 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
112 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
113 return VariableContextHelpers::getTimeslice(variables);
114}
115
// Walk over all valid cache slots and run the expiration handlers on cells
// which have no data yet: optionally create new slots (when createNew is
// true), then for each (slot, handler) pair ask the handler's checker whether
// the cell may be expired and, if so, let the handler synthesise the part.
// Returns counters of newly created and expired slots.
116DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
117 ServiceRegistryRef services, bool createNew)
118{
119 LOGP(debug, "DataRelayer::processDanglingInputs");
120 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
121 auto& deviceProxy = services.get<FairMQDeviceProxy>();
122
123 ActivityStats activity;
125 if (expirationHandlers.empty()) {
126 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
127 return activity;
128 }
129 // Create any slot for the time based fields
130 std::vector<TimesliceSlot> slotsCreatedByHandlers;
131 if (createNew) {
132 LOGP(debug, "Creating new slot");
133 for (auto& handler : expirationHandlers) {
134 LOGP(debug, "handler.creator for {}", handler.name);
135 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
136 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
137 }
138 }
139 // Count how many slots are not invalid
140 auto validSlots = 0;
141 for (auto slot : slotsCreatedByHandlers) {
142 if (slot.index == TimesliceSlot::INVALID) {
143 continue;
144 }
145 validSlots++;
146 }
147 if (validSlots > 0) {
148 activity.newSlots++;
149 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
150 } else {
151 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
152 }
153 // Outer loop, we process all the records because the fact that the record
154 // expires is independent from having received data for it.
155 int headerPresent = 0;
156 int payloadPresent = 0;
157 int noCheckers = 0;
158 int badSlot = 0;
159 int checkerDenied = 0;
160 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
161 TimesliceSlot slot{ti};
162 if (mTimesliceIndex.isValid(slot) == false) {
163 continue;
164 }
165 assert(mDistinctRoutesIndex.empty() == false);
166 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
167 auto timestamp = VariableContextHelpers::getTimeslice(variables);
168 // We iterate on all the hanlders checking if they need to be expired.
169 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
170 auto& expirator = expirationHandlers[ei];
171 // We check that no data is already there for the given cell
172 // it is enough to check the first element
173 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
174 if (part.size() > 0 && part.header(0) != nullptr) {
175 headerPresent++;
176 continue;
177 }
178 if (part.size() > 0 && part.payload(0) != nullptr) {
179 payloadPresent++;
180 continue;
181 }
182 // We check that the cell can actually be expired.
183 if (!expirator.checker) {
184 noCheckers++;
185 continue;
186 }
// NOTE(review): slotsCreatedByHandlers is only populated when createNew is
// true; when createNew is false and a handler has a checker, this index
// access is out of bounds — verify the caller always passes createNew=true
// for configurations with checkers, or guard this access.
187 if (slotsCreatedByHandlers[ei] != slot) {
188 badSlot++;
189 continue;
190 }
191
// View over the cache line of this slot (one MessageSet per distinct route).
192 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
193 auto offset = li * numInputTypes;
194 assert(cache.size() >= offset + numInputTypes);
195 auto const start = cache.data() + offset;
196 auto const end = cache.data() + offset + numInputTypes;
197 return {start, end};
198 };
199
200 auto partial = getPartialRecord(ti);
201 // TODO: get the data ref from message model
202 auto getter = [&partial](size_t idx, size_t part) {
203 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
204 auto header = partial[idx].header(part).get();
205 auto payload = partial[idx].payload(part).get();
206 return DataRef{nullptr,
207 reinterpret_cast<const char*>(header->GetData()),
208 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
209 payload ? payload->GetSize() : 0};
210 }
211 return DataRef{};
212 };
213 auto nPartsGetter = [&partial](size_t idx) {
214 return partial[idx].size();
215 };
// Ref-count introspection is only available with shared-memory messages.
216#if __has_include(<fairmq/shmem/Message.h>)
217 auto refCountGetter = [&partial](size_t idx) -> int {
218 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
219 return header.GetRefCount();
220 };
221#else
222 std::function<int(size_t)> refCountGetter = nullptr;
223#endif
224 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
225 // Setup the input span
226
// Ask the handler whether this cell may be expired for the given timestamp.
227 if (expirator.checker(services, timestamp.value, span) == false) {
228 checkerDenied++;
229 continue;
230 }
231
232 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
233 assert(expirator.handler);
// Let the handler create the synthetic part and store it in the cache cell.
234 PartRef newRef;
235 expirator.handler(services, newRef, variables);
236 part.reset(std::move(newRef));
237 activity.expiredSlots++;
238
// Mark the slot dirty so getReadyToProcess re-evaluates its completion.
239 mTimesliceIndex.markAsDirty(slot, true);
240 assert(part.header(0) != nullptr);
241 assert(part.payload(0) != nullptr);
242 }
243 }
244 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
245 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
246 return activity;
247}
248
252size_t matchToContext(void const* data,
253 std::vector<DataDescriptorMatcher> const& matchers,
254 std::vector<size_t> const& index,
255 VariableContext& context)
256{
257 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
258 auto& matcher = matchers[index[ri]];
259
260 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
261 context.commit();
262 return ri;
263 }
264 context.discard();
265 }
266 return INVALID_INPUT;
267}
268
// NOTE(review): the signature for this function (doc lines 268-271) is
// missing from this extract; the body publishes the matching-variable values
// of a slot as a DataProcessingStates entry, so it presumably implements
// the variable-context metrics publisher — TODO confirm against the header.
272{
// NOTE(review): nullstring is not referenced in the visible body.
273 static const std::string nullstring{"null"};
274
// Serialise all matching variables of the slot as a ';'-separated string and
// push it as the per-slot CONTEXT_VARIABLES_BASE state.
275 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
276 auto& states = *static_cast<DataProcessingStates*>(context);
// Reused buffer to avoid reallocating on every publish; cleared each call.
277 static std::string state = "";
278 state.clear();
279 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
280 auto var = variables.get(i);
281 if (auto pval = std::get_if<uint64_t>(&var)) {
282 state += std::to_string(*pval);
283 } else if (auto pval = std::get_if<uint32_t>(&var)) {
284 state += std::to_string(*pval);
285 } else if (auto pval2 = std::get_if<std::string>(&var)) {
286 state += *pval2;
287 } else {
// Unset / unsupported variable types contribute an empty field.
288 }
289 state += ";";
290 }
291 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
292 .size = (int)state.size(),
293 .data = state.data()});
294 },
295 &states, slot);
296}
297
// NOTE(review): the signature for this method (doc line 298, presumably
// DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex
// channel)) is missing from this extract — confirm against the header.
299{
// Advance the oldest-possible-input watermark and invalidate every slot
// whose timeslice can no longer be completed, queueing it for pruning.
300 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
301 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
// Debug escape hatch: keep old timeslices around when the env var is set.
302 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
303 if (dontDrop) {
304 return;
305 }
// One iteration per cache line (slot); mCache holds mInputs.size() entries
// per slot.
306 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
307 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
308 auto timestamp = VariableContextHelpers::getTimeslice(variables);
309 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
310 if (valid) {
311 if (mTimesliceIndex.isValid({si})) {
312 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
313 }
314 continue;
315 }
// The slot is obsolete: schedule it for pruning and report what gets lost.
316 mPruneOps.push_back(PruneOp{si});
317 bool didDrop = false;
318 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
319 auto& input = mInputs[mi];
320 auto& element = mCache[si * mInputs.size() + mi];
321 if (element.size() != 0) {
322 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
323 didDrop = true;
324 auto& state = mContext.get<DeviceState>();
// NOTE(review): doc line 325 (the condition selecting between the warning
// and error branches below, apparently based on the device state fetched
// above) is missing from this extract.
326 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
327 } else {
328 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
329 }
330 } else {
331 LOGP(debug,
332 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
333 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
334 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
335 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
336 }
337 }
338 }
339 // We did drop some data. Let's print what was missing.
340 if (didDrop) {
341 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
342 auto& input = mInputs[mi];
343 if (input.lifetime == Lifetime::Timer) {
344 continue;
345 }
346 auto& element = mCache[si * mInputs.size() + mi];
347 if (element.size() == 0) {
348 auto& state = mContext.get<DeviceState>();
// NOTE(review): doc line 349 (the condition selecting warning vs error
// below) is missing from this extract.
350 LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
351 } else {
352 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
353 }
354 }
355 }
356 }
357 }
358}
359
364
// NOTE(review): the signature (doc line 365) is missing from this extract;
// the body drains the queued prune operations, so it presumably implements
// DataRelayer::prunePending(OnDropCallback onDrop) — TODO confirm.
366{
// Execute every prune scheduled by setOldestPossibleInput, then forget them.
367 for (auto& op : mPruneOps) {
368 this->pruneCache(op.slot, onDrop);
369 }
370 mPruneOps.clear();
371}
372
// NOTE(review): the signature (doc line 373) is missing from this extract;
// given the captures below this is presumably
// DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop) —
// TODO confirm against the header.
374{
375 // We need to prune the cache from the old stuff, if any. Otherwise we
376 // simply store the payload in the cache and we mark relevant bit in the
377 // hence the first if.
378 auto pruneCache = [&onDrop,
379 &cache = mCache,
380 &cachedStateMetrics = mCachedStateMetrics,
381 numInputTypes = mDistinctRoutesIndex.size(),
382 &index = mTimesliceIndex,
383 ref = mContext](TimesliceSlot slot) {
// If a drop callback was provided, move the cache line's contents out and
// hand them to the callback before clearing, so the caller can account for
// (or forward) the dropped messages.
384 if (onDrop) {
385 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
386 // State of the computation
387 std::vector<MessageSet> dropped(numInputTypes);
388 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
389 auto cacheId = slot.index * numInputTypes + ai;
390 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
391 // TODO: in the original implementation of the cache, there have been only two messages per entry,
392 // check if the 2 above corresponds to the number of messages.
393 if (cache[cacheId].size() > 0) {
394 dropped[ai] = std::move(cache[cacheId]);
395 }
396 }
397 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
398 if (anyDropped) {
399 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
400 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
401 onDrop(slot, dropped, oldestPossibleTimeslice);
402 }
403 }
404 assert(cache.empty() == false);
405 assert(index.size() * numInputTypes == cache.size());
406 // Prune old stuff from the cache, hopefully deleting it...
407 // We set the current slot to the timeslice value, so that old stuff
408 // will be ignored.
409 assert(numInputTypes * slot.index < cache.size());
410 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
411 cache[ai].clear();
412 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
413 }
414 };
415
416 pruneCache(slot);
417}
418
419bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
420{
421 auto* dh = o2::header::get<DataHeader*>(first->GetData());
422 return dh->flagsDerivedHeader & DataProcessingHeader::KEEP_AT_EOS_FLAG;
423}
424
// Relay an incoming header+payload sequence into the slot cache: match the
// header to an input route, find (or create via LRU replacement) a slot for
// its timeslice, and store the messages there. Returns a RelayChoice which
// tells the caller whether the data was relayed, dropped, backpressured or
// invalid.
// NOTE(review): the return type line (doc line 425, presumably
// DataRelayer::RelayChoice) is missing from this extract.
426 DataRelayer::relay(void const* rawHeader,
427 std::unique_ptr<fair::mq::Message>* messages,
428 InputInfo const& info,
429 size_t nMessages,
430 size_t nPayloads,
431 std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
432{
433 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
434 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
435 // IMPLEMENTATION DETAILS
436 //
437 // This returns true if a given slot is available for the current number of lanes
438 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
439 return (slot.index % maxLanes) == (currentLane % maxLanes);
440 };
441 // This returns the identifier for the given input. We use a separate
442 // function because while it's trivial now, the actual matchmaking will
443 // become more complicated when we will start supporting ranges.
444 auto getInputTimeslice = [&matchers = mInputMatchers,
445 &distinctRoutes = mDistinctRoutesIndex,
446 &rawHeader,
447 &index = mTimesliceIndex](VariableContext& context)
448 -> std::tuple<int, TimesliceId> {
451 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
452
453 if (input == INVALID_INPUT) {
// NOTE(review): the tuple elements of this return (doc lines 455-456) are
// missing from this extract; presumably {INVALID_INPUT, invalid timeslice}.
454 return {
457 };
458 }
// NOTE(review): doc lines 459-460 are missing from this extract.
461 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
462 TimesliceId timeslice{*pval};
463 return {input, timeslice};
464 }
465 // If we get here it means we need to push something out of the cache.
// NOTE(review): the tuple elements of this return (doc lines 467-468) are
// missing from this extract.
466 return {
469 };
470 };
471
472 // Actually save the header / payload in the slot
473 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
474 &messages,
475 &nMessages,
476 &nPayloads,
477 &cache = mCache,
478 &services = mContext,
479 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
480 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
481 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
482 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
483 timeslice.value, slot.index,
484 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
485 auto cacheIdx = numInputTypes * slot.index + input;
486 MessageSet& target = cache[cacheIdx];
487 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
488 // TODO: make sure that multiple parts can only be added within the same call of
489 // DataRelayer::relay
490 assert(nPayloads > 0);
491 size_t saved = 0;
492 for (size_t mi = 0; mi < nMessages; ++mi) {
493 assert(mi + nPayloads < nMessages);
494 // We are in calibration mode and the data does not have the calibration bit set.
495 // We do not store it.
// NOTE(review): the `if` condition opening this drop branch (doc line 496)
// is missing from this extract; the brace at doc line 506 closes it.
497 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
498 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
499 "Dropping incoming %zu messages because they are data processing.", nPayloads);
500 // Actually dropping messages.
501 for (size_t i = mi; i < mi + nPayloads + 1; i++) {
502 auto discard = std::move(messages[i]);
503 }
504 mi += nPayloads;
505 continue;
506 }
// Store header + its nPayloads payloads as one part in the MessageSet.
507 target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
508 mi += nPayloads;
509 saved += nPayloads;
510 }
511 return saved;
512 };
513
514 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
515 auto& stats = ref.get<DataProcessingStats>();
516
517 // Update statistics for what happened
// NOTE(review): the `case` labels of this switch (doc lines 519, 522, 526,
// 529, 533 — the TimesliceIndex::ActionTaken enumerators) are missing from
// this extract.
518 switch (action) {
520 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
521 break;
523 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
524 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
525 break;
527 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
528 break;
530 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
531 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
532 break;
534 break;
535 }
536 };
537
538 // OUTER LOOP
539 //
540 // This is the actual outer loop processing input as part of a given
541 // timeslice. All the other implementation details are hidden by the lambdas
542 auto input = INVALID_INPUT;
543 auto timeslice = TimesliceId{TimesliceId::INVALID};
// NOTE(review): the declaration of `slot` (doc line 544) is missing from
// this extract.
545 auto& index = mTimesliceIndex;
546
547 bool needsCleaning = false;
548 // First look for matching slots which already have some
549 // partial match.
550 for (size_t ci = 0; ci < index.size(); ++ci) {
551 slot = TimesliceSlot{ci};
552 if (!isSlotInLane(slot)) {
553 continue;
554 }
555 if (index.isValid(slot) == false) {
556 continue;
557 }
558 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
559 if (input != INVALID_INPUT) {
560 break;
561 }
562 }
563
564 // If we did not find anything, look for slots which
565 // are invalid.
566 if (input == INVALID_INPUT) {
567 for (size_t ci = 0; ci < index.size(); ++ci) {
568 slot = TimesliceSlot{ci};
569 if (index.isValid(slot) == true) {
570 continue;
571 }
572 if (!isSlotInLane(slot)) {
573 continue;
574 }
575 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
576 if (input != INVALID_INPUT) {
577 needsCleaning = true;
578 break;
579 }
580 }
581 }
582
583 auto& stats = mContext.get<DataProcessingStats>();
// Happy path: a suitable slot was found, store the messages there.
585 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
586 if (needsCleaning) {
587 this->pruneCache(slot, onDrop);
588 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
589 }
590 size_t saved = saveInSlot(timeslice, input, slot, info);
591 if (saved == 0) {
592 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
593 }
594 index.publishSlot(slot);
595 index.markAsDirty(slot, true);
596 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
597 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
598 }
599
// No slot available yet: re-match against a pristine context so we can make
// room via LRU replacement below.
602 VariableContext pristineContext;
603 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
604
605 auto DataHeaderInfo = [&rawHeader]() {
606 std::string error;
607 // extract header from message model
608 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
609 if (dh) {
610 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
611 } else {
612 error += "invalid header";
613 }
614 return error;
615 };
616
617 if (input == INVALID_INPUT) {
618 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
619 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
620 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
621 for (size_t pi = 0; pi < nMessages; ++pi) {
622 messages[pi].reset(nullptr);
623 }
624 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
625 }
626
627 if (TimesliceId::isValid(timeslice) == false) {
628 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
629 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
630 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
631 for (size_t pi = 0; pi < nMessages; ++pi) {
632 messages[pi].reset(nullptr);
633 }
634 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
635 }
636
637 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
// NOTE(review): the declaration of `action` (doc line 638) is missing from
// this extract.
639 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
640 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
641 if (action != TimesliceIndex::ActionTaken::Wait) {
642 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
643 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
644 }
645
646 updateStatistics(action);
647
// NOTE(review): the `case` labels of this switch (doc lines 649, 651, 661,
// 669-670) and the closing lines 681/683 are missing from this extract; the
// branches below handle Wait/backpressure, obsolete data, invalid data and
// the replace+relay path respectively.
648 switch (action) {
650 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
652 static std::atomic<size_t> obsoleteCount = 0;
653 static std::atomic<size_t> mult = 1;
// Rate-limit the obsolete-data warning: report at 1, 10, 100, ... messages.
654 if ((obsoleteCount++ % (1 * mult)) == 0) {
655 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
656 if (obsoleteCount > mult * 10) {
657 mult = mult * 10;
658 }
659 }
660 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
662 LOG(warning) << "Incoming data is invalid, not relaying.";
663 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
664 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
665 for (size_t pi = 0; pi < nMessages; ++pi) {
666 messages[pi].reset(nullptr);
667 }
668 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
671 // At this point the variables match the new input but the
672 // cache still holds the old data, so we prune it.
673 this->pruneCache(slot, onDrop);
674 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
675 size_t saved = saveInSlot(timeslice, input, slot, info);
676 if (saved == 0) {
677 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
678 }
679 index.publishSlot(slot);
680 index.markAsDirty(slot, true);
682 }
684}
685
// Scan all dirty cache lines, run the completion policy on each and append a
// RecordAction to `completed` for every slot that is ready for (some form
// of) processing; finally refresh the oldest-possible-output watermark.
686void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
687{
688 LOGP(debug, "DataRelayer::getReadyToProcess");
689 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
690
691 // THE STATE
692 const auto& cache = mCache;
693 const auto numInputTypes = mDistinctRoutesIndex.size();
694 //
695 // THE IMPLEMENTATION DETAILS
696 //
697 // We use this to bail out early from the check as soon as we find something
698 // which we know is not complete.
699 auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
700 auto offset = li * numInputTypes;
701 assert(cache.size() >= offset + numInputTypes);
702 auto const start = cache.data() + offset;
703 auto const end = cache.data() + offset + numInputTypes;
704 return {start, end};
705 };
706
707 // These two are trivial, but in principle the whole loop could be parallelised
708 // or vectorised so "completed" could be a thread local variable which needs
709 // merging at the end.
710 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
711 if (timeslice) {
712 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
713 completed.emplace_back(RecordAction{li, {*timeslice}, op});
714 } else {
715 LOGP(debug, "No timeslice associated with slot ", li.index);
716 }
717 };
718
719 // THE OUTER LOOP
720 //
721 // To determine if a line is complete, we iterate on all the arguments
722 // and check if they are ready. We do it this way, because in the end
723 // the number of inputs is going to be small and having a more complex
724 // structure will probably result in a larger footprint in any case.
725 // Also notice that ai == inputsNumber only when we reach the end of the
726 // iteration, that means we have found all the required bits.
727 //
728 // Notice that the only time numInputTypes is 0 is when we are a dummy
729 // device created as a source for timers / conditions.
730 if (numInputTypes == 0) {
731 LOGP(debug, "numInputTypes == 0, returning.");
732 return;
733 }
734 size_t cacheLines = cache.size() / numInputTypes;
735 assert(cacheLines * numInputTypes == cache.size());
// Per-action counters, reported in the debug summary at the end.
736 int countConsume = 0;
737 int countConsumeExisting = 0;
738 int countProcess = 0;
739 int countDiscard = 0;
740 int countWait = 0;
741 int notDirty = 0;
742
743 for (int li = cacheLines - 1; li >= 0; --li) {
744 TimesliceSlot slot{(size_t)li};
745 // We only check the cachelines which have been updated by an incoming
746 // message.
747 if (mTimesliceIndex.isDirty(slot) == false) {
748 notDirty++;
749 continue;
750 }
751 if (!mCompletionPolicy.callbackFull) {
752 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
753 }
754 auto partial = getPartialRecord(li);
755 // TODO: get the data ref from message model
756 auto getter = [&partial](size_t idx, size_t part) {
757 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
758 auto header = partial[idx].header(part).get();
759 auto payload = partial[idx].payload(part).get();
760 return DataRef{nullptr,
761 reinterpret_cast<const char*>(header->GetData()),
762 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
763 payload ? payload->GetSize() : 0};
764 }
765 return DataRef{};
766 };
767 auto nPartsGetter = [&partial](size_t idx) {
768 return partial[idx].size();
769 };
// Ref-count introspection is only available with shared-memory messages.
770#if __has_include(<fairmq/shmem/Message.h>)
771 auto refCountGetter = [&partial](size_t idx) -> int {
772 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
773 return header.GetRefCount();
774 };
775#else
776 std::function<int(size_t)> refCountGetter = nullptr;
777#endif
778 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
779 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
780
781 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
782 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
// NOTE(review): the `case` labels of this switch (doc lines 784, 789, 796,
// 801, 806, 811, 816 — the CompletionPolicy::CompletionOp enumerators such
// as Consume / ConsumeExisting / Process / Discard / Wait / Retry) are
// missing from this extract.
783 switch (action) {
785 countConsume++;
786 updateCompletionResults(slot, timeslice, action);
787 mTimesliceIndex.markAsDirty(slot, false);
788 break;
790 // This is just like Consume, but we also mark all slots as dirty
791 countConsume++;
793 updateCompletionResults(slot, timeslice, action);
794 mTimesliceIndex.rescan();
795 break;
797 countConsumeExisting++;
798 updateCompletionResults(slot, timeslice, action);
799 mTimesliceIndex.markAsDirty(slot, false);
800 break;
802 countProcess++;
803 updateCompletionResults(slot, timeslice, action);
804 mTimesliceIndex.markAsDirty(slot, false);
805 break;
807 countDiscard++;
808 updateCompletionResults(slot, timeslice, action);
809 mTimesliceIndex.markAsDirty(slot, false);
810 break;
812 countWait++;
813 mTimesliceIndex.markAsDirty(slot, true);
815 break;
817 countWait++;
818 mTimesliceIndex.markAsDirty(slot, false);
819 break;
820 }
821 }
822 mTimesliceIndex.updateOldestPossibleOutput(false);
823 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
824 notDirty, countConsume, countConsumeExisting, countProcess,
825 countDiscard, countWait);
826}
827
// NOTE(review): the signature (doc line 828) is missing from this extract;
// given the body this is presumably
// DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus
// oldStatus, CacheEntryStatus newStatus) — TODO confirm against the header.
829{
830 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
831 const auto numInputTypes = mDistinctRoutesIndex.size();
832
// Transition a single cache entry's metric from oldStatus to newStatus;
// entries in any other state are left untouched.
833 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
834 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
835 auto cacheId = s.index * numInputTypes + arg;
836 if (cachedStateMetrics[cacheId] == oldStatus) {
837 cachedStateMetrics[cacheId] = newStatus;
838 }
839 };
840
// Apply the transition to every input of the slot's cache line.
841 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
842 markInputDone(slot, ai, oldStatus, newStatus);
843 }
844}
845
846std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
847{
848 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
849
850 const auto numInputTypes = mDistinctRoutesIndex.size();
851 // State of the computation
852 std::vector<MessageSet> messages(numInputTypes);
853 auto& cache = mCache;
854 auto& index = mTimesliceIndex;
855
856 // Nothing to see here, this is just to make the outer loop more understandable.
857 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
858 return;
859 };
860
861 // We move ownership so that the cache can be reused once the computation is
862 // finished. We mark the given cache slot invalid, so that it can be reused
863 // This means we can still handle old messages if there is still space in the
864 // cache where to put them.
865 auto moveHeaderPayloadToOutput = [&messages,
866 &cachedStateMetrics = mCachedStateMetrics,
867 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
868 auto cacheId = s.index * numInputTypes + arg;
869 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
870 // TODO: in the original implementation of the cache, there have been only two messages per entry,
871 // check if the 2 above corresponds to the number of messages.
872 if (cache[cacheId].size() > 0) {
873 messages[arg] = std::move(cache[cacheId]);
874 }
875 index.markAsInvalid(s);
876 };
877
878 // An invalid set of arguments is a set of arguments associated to an invalid
879 // timeslice, so I can simply do that. I keep the assertion there because in principle
880 // we should have dispatched the timeslice already!
881 // FIXME: what happens when we have enough timeslices to hit the invalid one?
882 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
883 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
884 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
885 cache[ai].clear();
886 }
887 index.markAsInvalid(s);
888 };
889
890 // Outer loop here.
891 jumpToCacheEntryAssociatedWith(slot);
892 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
893 moveHeaderPayloadToOutput(slot, ai);
894 }
895 invalidateCacheFor(slot);
896
897 return messages;
898}
899
900std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
901{
902 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
903
904 const auto numInputTypes = mDistinctRoutesIndex.size();
905 // State of the computation
906 std::vector<MessageSet> messages(numInputTypes);
907 auto& cache = mCache;
908 auto& index = mTimesliceIndex;
909
910 // Nothing to see here, this is just to make the outer loop more understandable.
911 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
912 return;
913 };
914
915 // We move ownership so that the cache can be reused once the computation is
916 // finished. We mark the given cache slot invalid, so that it can be reused
917 // This means we can still handle old messages if there is still space in the
918 // cache where to put them.
919 auto copyHeaderPayloadToOutput = [&messages,
920 &cachedStateMetrics = mCachedStateMetrics,
921 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
922 auto cacheId = s.index * numInputTypes + arg;
923 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
924 // TODO: in the original implementation of the cache, there have been only two messages per entry,
925 // check if the 2 above corresponds to the number of messages.
926 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
927 auto& header = cache[cacheId].header(pi);
928 auto&& newHeader = header->GetTransport()->CreateMessage();
929 newHeader->Copy(*header);
930 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
931 }
932 };
933
934 // Outer loop here.
935 jumpToCacheEntryAssociatedWith(slot);
936 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
937 copyHeaderPayloadToOutput(slot, ai);
938 }
939
940 return std::move(messages);
941}
942
// DataRelayer::clear — drop every pending message and invalidate all slots.
// NOTE(review): the signature line (943) is not visible in this extract.
944 {
945 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
946
// Release every cached (slot, route) message set.
947 for (auto& cache : mCache) {
948 cache.clear();
949 }
// Mark every timeslice slot as reusable.
950 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
951 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
952 }
953 }
954
// Number of timeslices that can be in flight at once: total cache entries
// divided by the number of distinct input routes per timeslice.
955 size_t
// NOTE(review): the declarator line (956, `DataRelayer::getParallelTimeslices()
// const`) is not visible in this extract.
957 {
958 return mCache.size() / mDistinctRoutesIndex.size();
959 }
960
// Resize the pipeline to hold `s` in-flight timeslices: the timeslice index
// and the per-slot variable contexts are resized together.
// NOTE(review): the signature lines (961-965) are not visible in this extract.
966 {
967 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
968
969 mTimesliceIndex.resize(s);
970 mVariableContextes.resize(s);
// NOTE(review): line 971 is not visible in this extract — presumably a call to
// re-publish metrics after the resize; confirm against the full source.
972 }
973
// Allocate the message cache / per-entry state metrics and register the
// DataProcessingStates entries used to publish relayer state (e.g. to the GUI).
// NOTE(review): the signature line (974) is not visible in this extract.
975 {
976 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
977
978 auto numInputTypes = mDistinctRoutesIndex.size();
979 // FIXME: many of the DataRelayer function rely on allocated cache, so its
980 // maybe misleading to have the allocation in a function primarily for
981 // metrics publishing, do better in setPipelineLength?
982 mCache.resize(numInputTypes * mTimesliceIndex.size());
983 auto& states = mContext.get<DataProcessingStates>();
984
// One state metric per cache entry (slot x route).
985 mCachedStateMetrics.resize(mCache.size());
986
987 // There is maximum 16 variables available. We keep them row-wise so that
988 // that we can take mod 16 of the index to understand which variable we
989 // are talking about.
990 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
// NOTE(review): the opening of the state-registration call (line 991) is not
// visible in this extract; the braced initializer below configures one
// matcher_variables state per variable context.
992 .name = fmt::format("matcher_variables/{}", i),
993 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
994 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
995 .sendInitialValue = true,
996 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
997 });
998 }
999
// One data_relayer state per timeslice slot.
1000 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
// NOTE(review): the opening of the state-registration call (line 1001) is not
// visible in this extract.
1002 .name = fmt::format("data_relayer/{}", ci),
1003 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
1004 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
1005 .sendInitialValue = true,
1006 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1007 });
1008 }
1009 }
1010
// Per-slot getter body. NOTE(review): both the signature (line 1011) and the
// return statement (line 1014) are missing from this extract; judging by the
// surrounding definitions this is presumably getTimesliceForSlot — confirm
// against the full source.
1012 {
1013 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1015 }
1016
// Per-slot getter body. NOTE(review): both the signature (line 1017) and the
// return statement (line 1020) are missing from this extract; presumably
// getCreationTimeForSlot — confirm against the full source.
1018 {
1019 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1021 }
1022
// Return the run number recorded in the variable context of `slot`.
// NOTE(review): the signature line (1023) is not visible in this extract.
1024 {
1025 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1026 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
1027 }
1028
// Per-slot getter body. NOTE(review): both the signature (line 1029) and the
// return statement (line 1032) are missing from this extract; presumably
// getFirstTFOrbitForSlot — confirm against the full source.
1030 {
1031 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1033 }
1034
// Publish the per-slot matcher variables and the serialized relayer-slot
// state to DataProcessingStates.
// NOTE(review): the signature line (1035) is not visible in this extract.
1036 {
1037 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1038 auto& states = mContext.get<DataProcessingStates>();
// Publish the matcher variables of every slot.
1039 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1040 auto slot = TimesliceSlot{ci};
1041 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1042 states);
1043 }
1044 char relayerSlotState[1024];
1045 // The number of timeslices is encoded in each state
1046 // We serialise the state of a Timeslot in a given state.
1047 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1048 char* buffer = relayerSlotState + written;
// For each slot, encode the status of every route as one ASCII digit and push
// the resulting string as the DATA_RELAYER_BASE + ci state.
1049 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1050 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
// Metrics are stored column-major here: route-major stride over slots.
1051 int index = si * mTimesliceIndex.size() + ci;
1052 int value = static_cast<int>(mCachedStateMetrics[index]);
1053 buffer[si] = value + '0';
1054 // Anything which is done is actually already empty,
1055 // so after we report it we mark it as such.
1056 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1057 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1058 }
1059 }
1060 buffer[mDistinctRoutesIndex.size()] = '\0';
1061 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1062 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1063 }
1064 }
1065
1066} // namespace o2::framework
benchmark::State & state
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::ostringstream debug
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"