Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
22#include "Framework/DataRef.h"
24#include "Framework/InputSpan.h"
26#include "Framework/Logger.h"
27#include "Framework/PartRef.h"
33#include "DataRelayerHelpers.h"
34#include "InputRouteHelpers.h"
40
43
44#include <Monitoring/Metric.h>
45#include <Monitoring/Monitoring.h>
46
47#include <fairlogger/Logger.h>
48#include <fairmq/Channel.h>
49#include <functional>
50#include <fairmq/shmem/Message.h>
51#include <fmt/format.h>
52#include <fmt/ostream.h>
53#include <span>
54#include <string>
55
56using namespace o2::framework::data_matcher;
59using Verbosity = o2::monitoring::Verbosity;
60
62// Stream which keeps track of the calibration lifetime logic
64
65namespace o2::framework
66{
67
68constexpr int INVALID_INPUT = -1;
69
// DataRelayer constructor: binds the relayer to its service registry and
// derives the per-input bookkeeping (distinct route index, input matchers,
// maximum number of parallel lanes) from the declared input routes.
// NOTE(review): the opening line(s) of the signature were lost in extraction —
// the `policy` and `index` parameters referenced by the member initializers
// below must be restored from the original DataRelayer.cxx.
                         std::vector<InputRoute> const& routes,
                         ServiceRegistryRef services)
  : mContext{services},
    mTimesliceIndex{index},
    mCompletionPolicy{policy},
    mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
    mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
    mMaxLanes{InputRouteHelpers::maxLanes(routes)}
{
  // All relayer state is guarded by a recursive mutex, taken for the whole ctor.
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  if (policy.configureRelayer == nullptr) {
    // No custom configuration hook: fall back to the default pipeline length.
    static int pipelineLength = DefaultsHelpers::pipelineLength();
    setPipelineLength(pipelineLength);
  } else {
    // The completion policy fully takes over relayer configuration.
    policy.configureRelayer(*this);
  }

  // The queries are all the same, so we only have width 1
  // Build a ";"-separated, human-readable description of every distinct input
  // query and publish it once as the "data_queries" processing state.
  auto numInputTypes = mDistinctRoutesIndex.size();
  auto& states = services.get<DataProcessingStates>();
  std::string queries = "";
  for (short i = 0; i < numInputTypes; ++i) {
    char buffer[128];
    assert(mDistinctRoutesIndex[i] < routes.size());
    mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
    auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
    // describe() writes a NUL-terminated summary into buffer (max 127 chars).
    DataSpecUtils::describe(buffer, 127, matcher);
    queries += std::string_view(buffer, strlen(buffer));
    queries += ";";
  }
  auto stateId = (short)ProcessingStateId::DATA_QUERIES;
  states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
  states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
  states.processCommandQueue();
}
108
// Return the timeslice currently bound to the given slot, as recorded in the
// slot's matching-variable context.
// NOTE(review): the signature line was lost in extraction; from the body this
// is the slot -> TimesliceId accessor (presumably
// DataRelayer::getTimesliceForSlot(TimesliceSlot slot) — confirm against the
// original source).
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
  return VariableContextHelpers::getTimeslice(variables);
}
115
// Walk all timeslice slots and let the expiration handlers create and/or fill
// entries for time-driven inputs (timers, conditions, ...) which will never
// arrive as real messages. Returns statistics about how many slots were
// created and how many entries were expired (filled in) by handlers.
DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
                                                              ServiceRegistryRef services, bool createNew)
{
  LOGP(debug, "DataRelayer::processDanglingInputs");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  auto& deviceProxy = services.get<FairMQDeviceProxy>();

  ActivityStats activity;
  // Nothing to do if there are no expiration handlers at all.
  if (expirationHandlers.empty()) {
    LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
    return activity;
  }
  // Create any slot for the time based fields
  std::vector<TimesliceSlot> slotsCreatedByHandlers;
  if (createNew) {
    LOGP(debug, "Creating new slot");
    // Each handler may allocate a slot for its route; INVALID means it chose
    // not to (or could not) create one this round.
    for (auto& handler : expirationHandlers) {
      LOGP(debug, "handler.creator for {}", handler.name);
      auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
      slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
    }
  }
  // Count how many slots are not invalid
  auto validSlots = 0;
  for (auto slot : slotsCreatedByHandlers) {
    if (slot.index == TimesliceSlot::INVALID) {
      continue;
    }
    validSlots++;
  }
  if (validSlots > 0) {
    activity.newSlots++;
    LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
  } else {
    LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
  }
  // Outer loop, we process all the records because the fact that the record
  // expires is independent from having received data for it.
  // Counters below are only for the summary debug line at the end.
  int headerPresent = 0;
  int payloadPresent = 0;
  int noCheckers = 0;
  int badSlot = 0;
  int checkerDenied = 0;
  for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
    TimesliceSlot slot{ti};
    if (mTimesliceIndex.isValid(slot) == false) {
      continue;
    }
    assert(mDistinctRoutesIndex.empty() == false);
    auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
    auto timestamp = VariableContextHelpers::getTimeslice(variables);
    // We iterate on all the hanlders checking if they need to be expired.
    for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
      auto& expirator = expirationHandlers[ei];
      // We check that no data is already there for the given cell
      // it is enough to check the first element
      auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
      if (part.size() > 0 && part.header(0) != nullptr) {
        headerPresent++;
        continue;
      }
      if (part.size() > 0 && part.payload(0) != nullptr) {
        payloadPresent++;
        continue;
      }
      // We check that the cell can actually be expired.
      if (!expirator.checker) {
        noCheckers++;
        continue;
      }
      // NOTE(review): when createNew == false, slotsCreatedByHandlers is empty
      // and this indexes past the end — presumably handlers with a checker
      // imply createNew was requested; confirm the invariant upstream.
      if (slotsCreatedByHandlers[ei] != slot) {
        badSlot++;
        continue;
      }

      // View over the cache line (one MessageSet per distinct input) for slot li.
      auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
        auto offset = li * numInputTypes;
        assert(cache.size() >= offset + numInputTypes);
        auto const start = cache.data() + offset;
        auto const end = cache.data() + offset + numInputTypes;
        return {start, end};
      };

      auto partial = getPartialRecord(ti);
      // TODO: get the data ref from message model
      // DataRef accessor for part `part` of input `idx`; empty DataRef when the
      // header is not there. Payload may legitimately be null (header-only).
      auto getter = [&partial](size_t idx, size_t part) {
        if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
          auto header = partial[idx].header(part).get();
          auto payload = partial[idx].payload(part).get();
          return DataRef{nullptr,
                         reinterpret_cast<const char*>(header->GetData()),
                         reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                         payload ? payload->GetSize() : 0};
        }
        return DataRef{};
      };
      auto nPartsGetter = [&partial](size_t idx) {
        return partial[idx].size();
      };
      // NOTE(review): downcast to the shmem message assumes a shared-memory
      // transport; calling this on an empty input dereferences a null header —
      // presumably only invoked for populated entries; confirm.
      auto refCountGetter = [&partial](size_t idx) -> int {
        auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
        return header.GetRefCount();
      };
      InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
      // Setup the input span

      // Let the handler's checker veto expiration for this slot/timestamp.
      if (expirator.checker(services, timestamp.value, span) == false) {
        checkerDenied++;
        continue;
      }

      assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
      assert(expirator.handler);
      // The handler synthesizes the (header, payload) pair for this input.
      PartRef newRef;
      expirator.handler(services, newRef, variables);
      part.reset(std::move(newRef));
      activity.expiredSlots++;

      // Mark the slot dirty so completion is re-evaluated.
      mTimesliceIndex.markAsDirty(slot, true);
      assert(part.header(0) != nullptr);
      assert(part.payload(0) != nullptr);
    }
  }
  LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
       headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
  return activity;
}
244
248size_t matchToContext(void const* data,
249 std::vector<DataDescriptorMatcher> const& matchers,
250 std::vector<size_t> const& index,
251 VariableContext& context)
252{
253 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
254 auto& matcher = matchers[index[ri]];
255
256 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
257 context.commit();
258 return ri;
259 }
260 context.discard();
261 }
262 return INVALID_INPUT;
263}
264
// Publish the matching-variable context of a slot as a processing state:
// the first MAX_MATCHING_VARIABLE variables are rendered into a ";"-separated
// string and pushed under CONTEXT_VARIABLES_BASE + slot.index.
// NOTE(review): the signature line was lost in extraction; the body reads
// `context`, `slot` and `states` as parameters — restore from the original.
{
  static const std::string nullstring{"null"};

  context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
    auto& states = *static_cast<DataProcessingStates*>(context);
    // NOTE(review): function-local static buffer reused across calls to avoid
    // reallocations — not thread-safe if publish() can run concurrently; confirm.
    static std::string state = "";
    state.clear();
    for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
      auto var = variables.get(i);
      if (auto pval = std::get_if<uint64_t>(&var)) {
        state += std::to_string(*pval);
      } else if (auto pval = std::get_if<uint32_t>(&var)) {
        state += std::to_string(*pval);
      } else if (auto pval2 = std::get_if<std::string>(&var)) {
        state += *pval2;
      } else {
        // Unset variable: contributes an empty field before the separator.
      }
      state += ";";
    }
    states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
                        .size = (int)state.size(),
                        .data = state.data()});
  },
                  &states, slot);
}
293
// Record the new oldest possible input timeslice (proposed via @a channel) and
// schedule pruning of any cache slot whose data can no longer be completed,
// logging what is dropped and what was missing.
// NOTE(review): the signature line was lost in extraction (presumably
// DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex
// channel), judging by the body — confirm against the original source).
{
  auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
  LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
  // Debug escape hatch: keep obsolete timeslices around instead of dropping.
  static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
  if (dontDrop) {
    return;
  }
  // One iteration per cache line (i.e. per timeslice slot).
  for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
    auto& variables = mTimesliceIndex.getVariablesForSlot({si});
    auto timestamp = VariableContextHelpers::getTimeslice(variables);
    auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
    if (valid) {
      if (mTimesliceIndex.isValid({si})) {
        LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
      }
      continue;
    }
    // This slot can never complete: defer the actual cache pruning.
    mPruneOps.push_back(PruneOp{si});
    bool didDrop = false;
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      auto& element = mCache[si * mInputs.size() + mi];
      if (element.size() != 0) {
        if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
          didDrop = true;
          auto& state = mContext.get<DeviceState>();
          // NOTE(review): a condition line on `state` (selecting between the
          // warning and error paths below) was lost in extraction — restore it.
          LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
        } else {
          LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
        }
      } else {
        LOGP(debug,
             "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
             "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
             DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
             mTimesliceIndex.getChannelInfo(channel).channel->GetName());
      }
    }
    // We did drop some data. Let's print what was missing.
    if (didDrop) {
      for (size_t mi = 0; mi < mInputs.size(); ++mi) {
        auto& input = mInputs[mi];
        // Timers are expected to be absent; do not report them.
        if (input.lifetime == Lifetime::Timer) {
          continue;
        }
        auto& element = mCache[si * mInputs.size() + mi];
        if (element.size() == 0) {
          auto& state = mContext.get<DeviceState>();
          if (state.allowedProcessing == DeviceState::CalibrationOnly) {
            O2_SIGNPOST_ID_GENERATE(cid, calibration);
            O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data", "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
                                   DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          } else {
            LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          }
        } else {
          // NOTE(review): `state` is declared only in the sibling branch above
          // and is not in scope here — a declaration line was likely lost in
          // extraction; restore it from the original source.
          if (state.allowedProcessing == DeviceState::CalibrationOnly) {
            O2_SIGNPOST_ID_GENERATE(cid, calibration);
            O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data", "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
                                        DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          } else {
            LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          }
        }
      }
    }
  }
}
367
372
// Apply every deferred prune operation recorded in mPruneOps, forwarding the
// @a onDrop callback so dropped data can be reported, then clear the queue.
// NOTE(review): the signature line was lost in extraction (presumably
// DataRelayer::prunePending(OnDropCallback onDrop), judging by the body).
{
  for (auto& op : mPruneOps) {
    this->pruneCache(op.slot, onDrop);
  }
  mPruneOps.clear();
}
380
// Clear the cache line associated with a slot. If @a onDrop is set, any
// still-cached messages are first moved out and handed to the callback so the
// drop can be accounted for / reported.
// NOTE(review): the signature line was lost in extraction (presumably
// DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop), judging
// by the body and the call sites above).
{
  // We need to prune the cache from the old stuff, if any. Otherwise we
  // simply store the payload in the cache and we mark relevant bit in the
  // hence the first if.
  auto pruneCache = [&onDrop,
                     &cache = mCache,
                     &cachedStateMetrics = mCachedStateMetrics,
                     numInputTypes = mDistinctRoutesIndex.size(),
                     &index = mTimesliceIndex,
                     ref = mContext](TimesliceSlot slot) {
    if (onDrop) {
      auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
      // State of the computation
      // Move every still-cached MessageSet of this line into `dropped` so the
      // callback can inspect/forward it before destruction.
      std::vector<MessageSet> dropped(numInputTypes);
      for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
        auto cacheId = slot.index * numInputTypes + ai;
        cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
        // TODO: in the original implementation of the cache, there have been only two messages per entry,
        // check if the 2 above corresponds to the number of messages.
        if (cache[cacheId].size() > 0) {
          dropped[ai] = std::move(cache[cacheId]);
        }
      }
      bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
      if (anyDropped) {
        O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
        O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
        onDrop(slot, dropped, oldestPossibleTimeslice);
      }
    }
    assert(cache.empty() == false);
    assert(index.size() * numInputTypes == cache.size());
    // Prune old stuff from the cache, hopefully deleting it...
    // We set the current slot to the timeslice value, so that old stuff
    // will be ignored.
    assert(numInputTypes * slot.index < cache.size());
    for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
      cache[ai].clear();
      cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
    }
  };

  pruneCache(slot);
}
426
// Decide whether the first message of a part carries calibration data, based
// on its DataProcessingHeader.
// NOTE(review): the return statement was lost in extraction — as written the
// function falls off the end (undefined behavior); restore the check on `dph`
// from the original DataRelayer.cxx.
bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
{
  auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
}
432
// Relay an incoming multipart (1 header + nPayloads payloads, possibly
// repeated) into the slot cache: match it to an input route, find (or create)
// the slot for its timeslice, and store the messages there. Returns a
// RelayChoice describing what happened (WillRelay / Dropped / Backpressured /
// Invalid).
// NOTE(review): the return-type line (DataRelayer::RelayChoice, judging by the
// returned values) was lost in extraction.
  DataRelayer::relay(void const* rawHeader,
                     std::unique_ptr<fair::mq::Message>* messages,
                     InputInfo const& info,
                     size_t nMessages,
                     size_t nPayloads,
                     OnInsertionCallback onInsertion,
                     OnDropCallback onDrop)
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
  // IMPLEMENTATION DETAILS
  //
  // This returns true if a given slot is available for the current number of lanes
  auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
    return (slot.index % maxLanes) == (currentLane % maxLanes);
  };
  // This returns the identifier for the given input. We use a separate
  // function because while it's trivial now, the actual matchmaking will
  // become more complicated when we will start supporting ranges.
  auto getInputTimeslice = [&matchers = mInputMatchers,
                            &distinctRoutes = mDistinctRoutesIndex,
                            &rawHeader,
                            &index = mTimesliceIndex](VariableContext& context)
    -> std::tuple<int, TimesliceId> {
    auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);

    if (input == INVALID_INPUT) {
      return {
        // NOTE(review): the returned pair (INVALID_INPUT + an invalid
        // TimesliceId, per the tuple type) was stripped by extraction.
      };
    }
    // Variable 0 of the context conventionally carries the timeslice.
    if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
      TimesliceId timeslice{*pval};
      return {input, timeslice};
    }
    // If we get here it means we need to push something out of the cache.
    return {
      // NOTE(review): returned values stripped by extraction.
    };
  };

  // Actually save the header / payload in the slot
  auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
                     &messages,
                     &nMessages,
                     &nPayloads,
                     &onInsertion,
                     &cache = mCache,
                     &services = mContext,
                     numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
    O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
                           fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
                           timeslice.value, slot.index,
                           info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
    auto cacheIdx = numInputTypes * slot.index + input;
    MessageSet& target = cache[cacheIdx];
    cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
    // TODO: make sure that multiple parts can only be added within the same call of
    // DataRelayer::relay
    assert(nPayloads > 0);
    size_t saved = 0;
    // It's guaranteed we will see all these messages only once, so we can
    // do the forwarding here.
    auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
    if (onInsertion) {
      onInsertion(services, allMessages);
    }
    for (size_t mi = 0; mi < nMessages; ++mi) {
      assert(mi + nPayloads < nMessages);
      // We are in calibration mode and the data does not have the calibration bit set.
      // We do not store it.
      // NOTE(review): the `if (...)` condition opening this drop branch was
      // lost in extraction — an opening line belongs before the signpost below.
      O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
      O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
                             "Dropping incoming %zu messages because they are data processing.", nPayloads);
      // Actually dropping messages.
      for (size_t i = mi; i < mi + nPayloads + 1; i++) {
        auto discard = std::move(messages[i]);
      }
      mi += nPayloads;
      continue;
    }
    auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
    // Notice this will split [(header, payload), (header, payload)] multiparts
    // in N different subParts for the message spec.
    target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
    mi += nPayloads;
    saved += nPayloads;
    }
    return saved;
  };

  // Map the TimesliceIndex action onto the processing-stats counters.
  auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
    auto& stats = ref.get<DataProcessingStats>();

    // Update statistics for what happened
    // NOTE(review): the `case TimesliceIndex::ActionTaken::...:` labels of this
    // switch were lost in extraction; each group below belongs to one action.
    switch (action) {
        stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
        break;
        stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
        stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
        break;
        stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
        break;
        stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
        stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
        break;
        break;
    }
  };

  // OUTER LOOP
  //
  // This is the actual outer loop processing input as part of a given
  // timeslice. All the other implementation details are hidden by the lambdas
  auto input = INVALID_INPUT;
  auto timeslice = TimesliceId{TimesliceId::INVALID};
  // NOTE(review): the declaration of `slot` (used throughout below) sat on a
  // line lost in extraction — restore `TimesliceSlot slot;` (or equivalent).
  auto& index = mTimesliceIndex;

  bool needsCleaning = false;
  // First look for matching slots which already have some
  // partial match.
  for (size_t ci = 0; ci < index.size(); ++ci) {
    slot = TimesliceSlot{ci};
    if (!isSlotInLane(slot)) {
      continue;
    }
    if (index.isValid(slot) == false) {
      continue;
    }
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
    if (input != INVALID_INPUT) {
      break;
    }
  }

  // If we did not find anything, look for slots which
  // are invalid.
  if (input == INVALID_INPUT) {
    for (size_t ci = 0; ci < index.size(); ++ci) {
      slot = TimesliceSlot{ci};
      if (index.isValid(slot) == true) {
        continue;
      }
      if (!isSlotInLane(slot)) {
        continue;
      }
      std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
      if (input != INVALID_INPUT) {
        // Reusing an invalidated slot: its stale cache must be pruned first.
        needsCleaning = true;
        break;
      }
    }
  }

  auto& stats = mContext.get<DataProcessingStats>();
  // Happy path: an existing (or invalid-but-reusable) slot was found.
  if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
    if (needsCleaning) {
      this->pruneCache(slot, onDrop);
      mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
    }
    size_t saved = saveInSlot(timeslice, input, slot, info);
    if (saved == 0) {
      return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
    }
    index.publishSlot(slot);
    index.markAsDirty(slot, true);
    stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
    return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
  }

  // No slot matched: retry the match with a pristine context so we can decide
  // which slot to evict / replace.
  VariableContext pristineContext;
  std::tie(input, timeslice) = getInputTimeslice(pristineContext);

  // Helper producing a human-readable origin/description/subspec for errors.
  auto DataHeaderInfo = [&rawHeader]() {
    std::string error;
    // extract header from message model
    const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
    if (dh) {
      error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
    } else {
      error += "invalid header";
    }
    return error;
  };

  if (input == INVALID_INPUT) {
    LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
    stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
    stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);
    }
    return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
  }

  if (TimesliceId::isValid(timeslice) == false) {
    LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
    stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
    stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);
    }
    return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
  }

  O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
  // NOTE(review): the declaration of `action` (TimesliceIndex::ActionTaken)
  // sat on a line lost in extraction.
  std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
  uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
  if (action != TimesliceIndex::ActionTaken::Wait) {
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
                           "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
  }

  updateStatistics(action);

  // NOTE(review): the `case TimesliceIndex::ActionTaken::...:` labels of this
  // switch were lost in extraction; each group below belongs to one action
  // (Wait -> Backpressured, obsolete -> Dropped, invalid -> Invalid,
  // replace -> store and relay).
  switch (action) {
      return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
      // Rate-limited warning: threshold grows x10 each time it is hit.
      static std::atomic<size_t> obsoleteCount = 0;
      static std::atomic<size_t> mult = 1;
      if ((obsoleteCount++ % (1 * mult)) == 0) {
        LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
        if (obsoleteCount > mult * 10) {
          mult = mult * 10;
        }
      }
      return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
      LOG(warning) << "Incoming data is invalid, not relaying.";
      stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
      stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
      for (size_t pi = 0; pi < nMessages; ++pi) {
        messages[pi].reset(nullptr);
      }
      return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
      // At this point the variables match the new input but the
      // cache still holds the old data, so we prune it.
      this->pruneCache(slot, onDrop);
      mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
      size_t saved = saveInSlot(timeslice, input, slot, info);
      if (saved == 0) {
        return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
      }
      index.publishSlot(slot);
      index.markAsDirty(slot, true);
      return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
  }
}
704
// Scan every dirty cache line, ask the completion policy what to do with it,
// and append the resulting (slot, timeslice, op) actions to @a completed.
void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
{
  LOGP(debug, "DataRelayer::getReadyToProcess");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  // THE STATE
  const auto& cache = mCache;
  const auto numInputTypes = mDistinctRoutesIndex.size();
  //
  // THE IMPLEMENTATION DETAILS
  //
  // We use this to bail out early from the check as soon as we find something
  // which we know is not complete.
  auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    auto const start = cache.data() + offset;
    auto const end = cache.data() + offset + numInputTypes;
    return {start, end};
  };

  // These two are trivial, but in principle the whole loop could be parallelised
  // or vectorised so "completed" could be a thread local variable which needs
  // merging at the end.
  auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
    if (timeslice) {
      LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
      completed.emplace_back(RecordAction{li, {*timeslice}, op});
    } else {
      // NOTE(review): missing "{}" placeholder — li.index is never printed.
      LOGP(debug, "No timeslice associated with slot ", li.index);
    }
  };

  // THE OUTER LOOP
  //
  // To determine if a line is complete, we iterate on all the arguments
  // and check if they are ready. We do it this way, because in the end
  // the number of inputs is going to be small and having a more complex
  // structure will probably result in a larger footprint in any case.
  // Also notice that ai == inputsNumber only when we reach the end of the
  // iteration, that means we have found all the required bits.
  //
  // Notice that the only time numInputTypes is 0 is when we are a dummy
  // device created as a source for timers / conditions.
  if (numInputTypes == 0) {
    LOGP(debug, "numInputTypes == 0, returning.");
    return;
  }
  size_t cacheLines = cache.size() / numInputTypes;
  assert(cacheLines * numInputTypes == cache.size());
  // Counters below feed only the summary debug line at the end.
  int countConsume = 0;
  int countConsumeExisting = 0;
  int countProcess = 0;
  int countDiscard = 0;
  int countWait = 0;
  int notDirty = 0;

  // Iterate newest-first over cache lines.
  for (int li = cacheLines - 1; li >= 0; --li) {
    TimesliceSlot slot{(size_t)li};
    // We only check the cachelines which have been updated by an incoming
    // message.
    if (mTimesliceIndex.isDirty(slot) == false) {
      notDirty++;
      continue;
    }
    if (!mCompletionPolicy.callbackFull) {
      // NOTE(review): "police" should read "policy" in this message (runtime
      // string left untouched in this documentation pass).
      throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
    }
    auto partial = getPartialRecord(li);
    // TODO: get the data ref from message model
    // Per-part DataRef accessor; empty DataRef when the header is absent.
    auto getter = [&partial](size_t idx, size_t part) {
      if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
        auto header = partial[idx].header(part).get();
        auto payload = partial[idx].payload(part).get();
        return DataRef{nullptr,
                       reinterpret_cast<const char*>(header->GetData()),
                       reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                       payload ? payload->GetSize() : 0};
      }
      return DataRef{};
    };
    auto nPartsGetter = [&partial](size_t idx) {
      return partial[idx].size();
    };
    // NOTE(review): downcast assumes shared-memory transport and a non-null
    // header for part 0 — confirm callers only use this on populated inputs.
    auto refCountGetter = [&partial](size_t idx) -> int {
      auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
      return header.GetRefCount();
    };
    InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
    CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);

    auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
    auto timeslice = std::get_if<uint64_t>(&variables.get(0));
    // NOTE(review): the `case CompletionPolicy::CompletionOp::...:` labels of
    // this switch were lost in extraction; judging from the counters the groups
    // below handle Consume, ConsumeAndRescan, ConsumeExisting, Process,
    // Discard, and the two wait flavours respectively.
    switch (action) {
        countConsume++;
        updateCompletionResults(slot, timeslice, action);
        mTimesliceIndex.markAsDirty(slot, false);
        break;
        // This is just like Consume, but we also mark all slots as dirty
        countConsume++;
        updateCompletionResults(slot, timeslice, action);
        mTimesliceIndex.rescan();
        break;
        countConsumeExisting++;
        updateCompletionResults(slot, timeslice, action);
        mTimesliceIndex.markAsDirty(slot, false);
        break;
        countProcess++;
        updateCompletionResults(slot, timeslice, action);
        mTimesliceIndex.markAsDirty(slot, false);
        break;
        countDiscard++;
        updateCompletionResults(slot, timeslice, action);
        mTimesliceIndex.markAsDirty(slot, false);
        break;
        countWait++;
        mTimesliceIndex.markAsDirty(slot, true);
        break;
        countWait++;
        mTimesliceIndex.markAsDirty(slot, false);
        break;
    }
  }
  mTimesliceIndex.updateOldestPossibleOutput(false);
  LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
       notDirty, countConsume, countConsumeExisting, countProcess,
       countDiscard, countWait);
}
842
// Transition every input entry of a slot whose cached status equals
// `oldStatus` to `newStatus` (entries in any other state are left untouched).
// NOTE(review): the signature line was lost in extraction (presumably
// DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus
// oldStatus, CacheEntryStatus newStatus), judging by the body).
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();

  // Conditionally flip one (slot, input) entry of the status metrics.
  auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
                        &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
    auto cacheId = s.index * numInputTypes + arg;
    if (cachedStateMetrics[cacheId] == oldStatus) {
      cachedStateMetrics[cacheId] = newStatus;
    }
  };

  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    markInputDone(slot, ai, oldStatus, newStatus);
  }
}
860
861std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
862{
863 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
864
865 const auto numInputTypes = mDistinctRoutesIndex.size();
866 // State of the computation
867 std::vector<MessageSet> messages(numInputTypes);
868 auto& cache = mCache;
869 auto& index = mTimesliceIndex;
870
871 // Nothing to see here, this is just to make the outer loop more understandable.
872 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
873 return;
874 };
875
876 // We move ownership so that the cache can be reused once the computation is
877 // finished. We mark the given cache slot invalid, so that it can be reused
878 // This means we can still handle old messages if there is still space in the
879 // cache where to put them.
880 auto moveHeaderPayloadToOutput = [&messages,
881 &cachedStateMetrics = mCachedStateMetrics,
882 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
883 auto cacheId = s.index * numInputTypes + arg;
884 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
885 // TODO: in the original implementation of the cache, there have been only two messages per entry,
886 // check if the 2 above corresponds to the number of messages.
887 if (cache[cacheId].size() > 0) {
888 messages[arg] = std::move(cache[cacheId]);
889 }
890 index.markAsInvalid(s);
891 };
892
893 // An invalid set of arguments is a set of arguments associated to an invalid
894 // timeslice, so I can simply do that. I keep the assertion there because in principle
895 // we should have dispatched the timeslice already!
896 // FIXME: what happens when we have enough timeslices to hit the invalid one?
897 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
898 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
899 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
900 cache[ai].clear();
901 }
902 index.markAsInvalid(s);
903 };
904
905 // Outer loop here.
906 jumpToCacheEntryAssociatedWith(slot);
907 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
908 moveHeaderPayloadToOutput(slot, ai);
909 }
910 invalidateCacheFor(slot);
911
912 return messages;
913}
914
915std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
916{
917 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
918
919 const auto numInputTypes = mDistinctRoutesIndex.size();
920 // State of the computation
921 std::vector<MessageSet> messages(numInputTypes);
922 auto& cache = mCache;
923 auto& index = mTimesliceIndex;
924
925 // Nothing to see here, this is just to make the outer loop more understandable.
926 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
927 return;
928 };
929
930 // We move ownership so that the cache can be reused once the computation is
931 // finished. We mark the given cache slot invalid, so that it can be reused
932 // This means we can still handle old messages if there is still space in the
933 // cache where to put them.
934 auto copyHeaderPayloadToOutput = [&messages,
935 &cachedStateMetrics = mCachedStateMetrics,
936 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
937 auto cacheId = s.index * numInputTypes + arg;
938 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
939 // TODO: in the original implementation of the cache, there have been only two messages per entry,
940 // check if the 2 above corresponds to the number of messages.
941 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
942 auto& header = cache[cacheId].header(pi);
943 auto&& newHeader = header->GetTransport()->CreateMessage();
944 newHeader->Copy(*header);
945 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
946 }
947 };
948
949 // Outer loop here.
950 jumpToCacheEntryAssociatedWith(slot);
951 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
952 copyHeaderPayloadToOutput(slot, ai);
953 }
954
955 return std::move(messages);
956}
957
959{
960 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
961
962 for (auto& cache : mCache) {
963 cache.clear();
964 }
965 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
966 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
967 }
968}
969
970size_t
972{
973 return mCache.size() / mDistinctRoutesIndex.size();
974}
975
981{
982 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
983
984 mTimesliceIndex.resize(s);
985 mVariableContextes.resize(s);
987}
988
990{
991 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
992
993 auto numInputTypes = mDistinctRoutesIndex.size();
994 // FIXME: many of the DataRelayer function rely on allocated cache, so its
995 // maybe misleading to have the allocation in a function primarily for
996 // metrics publishing, do better in setPipelineLength?
997 mCache.resize(numInputTypes * mTimesliceIndex.size());
998 auto& states = mContext.get<DataProcessingStates>();
999
1000 mCachedStateMetrics.resize(mCache.size());
1001
1002 // There is maximum 16 variables available. We keep them row-wise so that
1003 // that we can take mod 16 of the index to understand which variable we
1004 // are talking about.
1005 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
1007 .name = fmt::format("matcher_variables/{}", i),
1008 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
1009 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
1010 .sendInitialValue = true,
1011 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1012 });
1013 }
1014
1015 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
1017 .name = fmt::format("data_relayer/{}", ci),
1018 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
1019 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
1020 .sendInitialValue = true,
1021 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1022 });
1023 }
1024}
1025
1027{
1028 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1030}
1031
1033{
1034 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1036}
1037
1039{
1040 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1041 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
1042}
1043
1045{
1046 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1048}
1049
1051{
1052 if (!mContext.get<DriverConfig const>().driverHasGUI) {
1053 return;
1054 }
1055 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1056 auto& states = mContext.get<DataProcessingStates>();
1057 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1058 auto slot = TimesliceSlot{ci};
1059 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1060 states);
1061 }
1062 char relayerSlotState[1024];
1063 // The number of timeslices is encoded in each state
1064 // We serialise the state of a Timeslot in a given state.
1065 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1066 char* buffer = relayerSlotState + written;
1067 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1068 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1069 int index = ci * mDistinctRoutesIndex.size() + si;
1070 int value = static_cast<int>(mCachedStateMetrics[index]);
1071 buffer[si] = value + '0';
1072 // Anything which is done is actually already empty,
1073 // so after we report it we mark it as such.
1074 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1075 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1076 }
1077 }
1078 buffer[mDistinctRoutesIndex.size()] = '\0';
1079 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1080 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1081 }
1082}
1083
1084} // namespace o2::framework
std::vector< framework::ConcreteDataMatcher > matchers
benchmark::State & state
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
atype::type element
std::ostringstream debug
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
std::function< void(ServiceRegistryRef &, std::span< fair::mq::MessagePtr > &)> OnInsertionCallback
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:619
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"