Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
22#include "Framework/DataRef.h"
24#include "Framework/InputSpan.h"
26#include "Framework/Logger.h"
27#include "Framework/PartRef.h"
33#include "DataRelayerHelpers.h"
34#include "InputRouteHelpers.h"
40
43
44#include <Monitoring/Metric.h>
45#include <Monitoring/Monitoring.h>
46
47#include <fairlogger/Logger.h>
48#include <fairmq/Channel.h>
49#include <functional>
50#if __has_include(<fairmq/shmem/Message.h>)
51#include <fairmq/shmem/Message.h>
52#endif
53#include <fmt/format.h>
54#include <fmt/ostream.h>
55#include <gsl/span>
56#include <string>
57
58using namespace o2::framework::data_matcher;
61using Verbosity = o2::monitoring::Verbosity;
62
64// Stream which keeps track of the calibration lifetime logic
66
67namespace o2::framework
68{
69
// Sentinel used by matchToContext() when an incoming header matches no input
// route. NOTE(review): it is an int (-1) while matchToContext() returns
// size_t — callers rely on the implicit wrap-around when comparing; confirm.
70constexpr int INVALID_INPUT = -1;
71
// DataRelayer constructor: wires up the completion policy, timeslice index,
// route matchers and lane count, then publishes the distinct input queries as
// the "data_queries" state.
// NOTE(review): this listing is a Doxygen extract — the first signature
// line(s) (the CompletionPolicy / TimesliceIndex parameters, original lines
// 72/74) are missing from this view.
73 std::vector<InputRoute> const& routes,
75 ServiceRegistryRef services)
76 : mContext{services},
77 mTimesliceIndex{index},
78 mCompletionPolicy{policy},
79 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
80 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
81 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
82{
83 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
84
// Pipeline length: taken from the defaults unless the policy provides its own
// relayer configuration callback, which then has full control.
85 if (policy.configureRelayer == nullptr) {
86 static int pipelineLength = DefaultsHelpers::pipelineLength();
87 setPipelineLength(pipelineLength);
88 } else {
89 policy.configureRelayer(*this);
90 }
91
92 // The queries are all the same, so we only have width 1
93 auto numInputTypes = mDistinctRoutesIndex.size();
94 auto& states = services.get<DataProcessingStates>();
95 std::string queries = "";
// Build a ";"-separated description of each distinct input route.
// NOTE(review): `i` is short while numInputTypes is size_t — fine only while
// there are fewer than 32768 distinct routes.
96 for (short i = 0; i < numInputTypes; ++i) {
97 char buffer[128];
98 assert(mDistinctRoutesIndex[i] < routes.size());
99 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
100 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
101 DataSpecUtils::describe(buffer, 127, matcher);
102 queries += std::string_view(buffer, strlen(buffer));
103 queries += ";";
104 }
// NOTE(review): the declaration of `stateId` (original line 105) is missing
// from this extract.
106 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
107 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
108 states.processCommandQueue();
109}
110
// Return the timeslice currently associated with a slot, as recorded in the
// slot's variable context.
// NOTE(review): the signature line (original line 111) is missing from this
// extract; per the body it receives a TimesliceSlot `slot`.
112{
113 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
114 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
115 return VariableContextHelpers::getTimeslice(variables);
116}
117
// Process all "dangling" inputs, i.e. inputs which are not fed by incoming
// messages but by ExpirationHandlers (timers, conditions, ...).
// For each valid slot and handler pair this:
//   * skips cells which already have a header or a payload,
//   * skips handlers without a checker, or whose freshly created slot does
//     not match the current one,
//   * builds an InputSpan over the partial record and asks the checker
//     whether the cell may be expired,
//   * if so, invokes the handler to materialize the data and marks the slot
//     dirty so completion is re-evaluated.
// Returns ActivityStats counting new and expired slots.
118DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
119 ServiceRegistryRef services, bool createNew)
120{
121 LOGP(debug, "DataRelayer::processDanglingInputs");
122 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
123 auto& deviceProxy = services.get<FairMQDeviceProxy>();
124
125 ActivityStats activity;
// Fast exit: nothing to expire if there are no handlers at all.
127 if (expirationHandlers.empty()) {
128 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
129 return activity;
130 }
131 // Create any slot for the time based fields
132 std::vector<TimesliceSlot> slotsCreatedByHandlers;
133 if (createNew) {
134 LOGP(debug, "Creating new slot");
135 for (auto& handler : expirationHandlers) {
136 LOGP(debug, "handler.creator for {}", handler.name);
137 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
138 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
139 }
140 }
141 // Count how many slots are not invalid
142 auto validSlots = 0;
143 for (auto slot : slotsCreatedByHandlers) {
144 if (slot.index == TimesliceSlot::INVALID) {
145 continue;
146 }
147 validSlots++;
148 }
149 if (validSlots > 0) {
150 activity.newSlots++;
151 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
152 } else {
153 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
154 }
155 // Outer loop, we process all the records because the fact that the record
156 // expires is independent from having received data for it.
// Per-reason skip counters, reported in the final debug log.
157 int headerPresent = 0;
158 int payloadPresent = 0;
159 int noCheckers = 0;
160 int badSlot = 0;
161 int checkerDenied = 0;
162 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
163 TimesliceSlot slot{ti};
164 if (mTimesliceIndex.isValid(slot) == false) {
165 continue;
166 }
167 assert(mDistinctRoutesIndex.empty() == false);
168 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
169 auto timestamp = VariableContextHelpers::getTimeslice(variables);
170 // We iterate on all the hanlders checking if they need to be expired.
171 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
172 auto& expirator = expirationHandlers[ei];
173 // We check that no data is already there for the given cell
174 // it is enough to check the first element
175 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
176 if (part.size() > 0 && part.header(0) != nullptr) {
177 headerPresent++;
178 continue;
179 }
180 if (part.size() > 0 && part.payload(0) != nullptr) {
181 payloadPresent++;
182 continue;
183 }
184 // We check that the cell can actually be expired.
185 if (!expirator.checker) {
186 noCheckers++;
187 continue;
188 }
// NOTE(review): when createNew is false, slotsCreatedByHandlers is empty and
// this indexes out of bounds for any handler that has a checker — presumably
// callers guarantee createNew, or checker-less handlers, in that case; confirm.
189 if (slotsCreatedByHandlers[ei] != slot) {
190 badSlot++;
191 continue;
192 }
193
// View over the numInputTypes cache cells belonging to cache line `li`.
194 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
195 auto offset = li * numInputTypes;
196 assert(cache.size() >= offset + numInputTypes);
197 auto const start = cache.data() + offset;
198 auto const end = cache.data() + offset + numInputTypes;
199 return {start, end};
200 };
201
202 auto partial = getPartialRecord(ti);
203 // TODO: get the data ref from message model
// Accessor returning a (possibly empty) DataRef for part `part` of input `idx`.
204 auto getter = [&partial](size_t idx, size_t part) {
205 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
206 auto header = partial[idx].header(part).get();
207 auto payload = partial[idx].payload(part).get();
208 return DataRef{nullptr,
209 reinterpret_cast<const char*>(header->GetData()),
210 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
211 payload ? payload->GetSize() : 0};
212 }
213 return DataRef{};
214 };
215 auto nPartsGetter = [&partial](size_t idx) {
216 return partial[idx].size();
217 };
// The shared-memory refcount accessor only exists when FairMQ ships the
// shmem Message header; otherwise the InputSpan gets a null getter.
218#if __has_include(<fairmq/shmem/Message.h>)
219 auto refCountGetter = [&partial](size_t idx) -> int {
220 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
221 return header.GetRefCount();
222 };
223#else
224 std::function<int(size_t)> refCountGetter = nullptr;
225#endif
226 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
227 // Setup the input span
228
// Ask the checker whether this cell may be expired right now.
229 if (expirator.checker(services, timestamp.value, span) == false) {
230 checkerDenied++;
231 continue;
232 }
233
234 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
235 assert(expirator.handler);
// The handler materializes the expired data into newRef, which then replaces
// the empty cache cell; the slot is marked dirty so completion is re-checked.
236 PartRef newRef;
237 expirator.handler(services, newRef, variables);
238 part.reset(std::move(newRef));
239 activity.expiredSlots++;
240
241 mTimesliceIndex.markAsDirty(slot, true);
242 assert(part.header(0) != nullptr);
243 assert(part.payload(0) != nullptr);
244 }
245 }
246 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
247 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
248 return activity;
249}
250
254size_t matchToContext(void const* data,
255 std::vector<DataDescriptorMatcher> const& matchers,
256 std::vector<size_t> const& index,
257 VariableContext& context)
258{
259 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
260 auto& matcher = matchers[index[ri]];
261
262 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
263 context.commit();
264 return ri;
265 }
266 context.discard();
267 }
268 return INVALID_INPUT;
269}
270
// Publish the current values of a slot's matching variables as a
// DataProcessingStates entry (id = CONTEXT_VARIABLES_BASE + slot index).
// NOTE(review): the signature line(s) (original lines before 274) are missing
// from this extract; per the body it receives a VariableContext `context`,
// a TimesliceSlot `slot` and the DataProcessingStates `states`.
274{
275 static const std::string nullstring{"null"};
276
// The state is rendered as a ";"-separated list of the MAX_MATCHING_VARIABLE
// variable values (uint64_t, uint32_t or string; empty segment when unset).
// NOTE(review): `state` below is a function-local static reused across calls —
// not thread-safe; presumably publish() only runs on a single thread; confirm.
277 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
278 auto& states = *static_cast<DataProcessingStates*>(context);
279 static std::string state = "";
280 state.clear();
281 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
282 auto var = variables.get(i);
283 if (auto pval = std::get_if<uint64_t>(&var)) {
284 state += std::to_string(*pval);
285 } else if (auto pval = std::get_if<uint32_t>(&var)) {
286 state += std::to_string(*pval);
287 } else if (auto pval2 = std::get_if<std::string>(&var)) {
288 state += *pval2;
289 } else {
290 }
291 state += ";";
292 }
293 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
294 .size = (int)state.size(),
295 .data = state.data()});
296 },
297 &states, slot);
298}
299
// Record the new oldest possible input timeslice (as reported by `channel`)
// and queue for pruning every cache slot which can no longer be completed,
// logging what is being dropped and what was missing.
// NOTE(review): the signature line (original line 300) is missing from this
// extract; per the body it receives the proposed TimesliceId and a ChannelIndex.
301{
302 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
303 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
// Debug escape hatch: keep obsolete timeslices around instead of dropping.
304 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
305 if (dontDrop) {
306 return;
307 }
// One iteration per cache line (slot); mCache holds mInputs.size() cells per line.
308 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
309 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
310 auto timestamp = VariableContextHelpers::getTimeslice(variables);
311 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
312 if (valid) {
313 if (mTimesliceIndex.isValid({si})) {
314 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
315 }
316 continue;
317 }
// The slot is obsolete: queue it for pruning and report what gets dropped.
318 mPruneOps.push_back(PruneOp{si});
319 bool didDrop = false;
320 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
321 auto& input = mInputs[mi];
322 auto& element = mCache[si * mInputs.size() + mi];
323 if (element.size() != 0) {
324 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
325 didDrop = true;
326 auto& state = mContext.get<DeviceState>();
// NOTE(review): original line 327 (the condition pairing the stop-transition
// warning below with the error branch) is missing from this extract — the
// if/else pairing as shown is incomplete.
328 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
329 } else {
330 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
331 }
332 } else {
333 LOGP(debug,
334 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
335 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
336 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
337 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
338 }
339 }
340 }
341 // We did drop some data. Let's print what was missing.
342 if (didDrop) {
343 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
344 auto& input = mInputs[mi];
// Timers are expected to be absent, so they are not reported as missing.
345 if (input.lifetime == Lifetime::Timer) {
346 continue;
347 }
348 auto& element = mCache[si * mInputs.size() + mi];
349 if (element.size() == 0) {
350 auto& state = mContext.get<DeviceState>();
// NOTE(review): original line 351 is missing from this extract.
352 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
353 O2_SIGNPOST_ID_GENERATE(cid, calibration);
354 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data", "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
355 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
356 } else {
357 LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
358 }
359 } else {
// NOTE(review): `state` here is only declared in the element.size() == 0
// branch above — presumably a declaration was lost in this extract; confirm
// against the full source.
360 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
361 O2_SIGNPOST_ID_GENERATE(cid, calibration);
362 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data", "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
363 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
364 } else {
365 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
366 }
367 }
368 }
369 }
370 }
371 }
372}
373
378
// Flush all queued prune operations (mPruneOps): prune the cache line of each
// recorded slot, then clear the queue.
// NOTE(review): the signature line (original line 379) is missing from this
// extract; per the call below it receives the drop callback `onDrop`.
380{
381 for (auto& op : mPruneOps) {
382 this->pruneCache(op.slot, onDrop);
383 }
384 mPruneOps.clear();
385}
386
// Prune the cache line associated with a slot: if an onDrop callback is
// provided, hand it any non-empty MessageSets first (so dropped data can be
// reported / forwarded), then clear every cell of the line and mark its
// state metrics as EMPTY.
// NOTE(review): the signature line (original line 387) is missing from this
// extract; per usage it receives the TimesliceSlot and the onDrop callback.
388{
389 // We need to prune the cache from the old stuff, if any. Otherwise we
390 // simply store the payload in the cache and we mark relevant bit in the
391 // hence the first if.
392 auto pruneCache = [&onDrop,
393 &cache = mCache,
394 &cachedStateMetrics = mCachedStateMetrics,
395 numInputTypes = mDistinctRoutesIndex.size(),
396 &index = mTimesliceIndex,
397 ref = mContext](TimesliceSlot slot) {
398 if (onDrop) {
399 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
400 // State of the computation
401 std::vector<MessageSet> dropped(numInputTypes);
// Move any pending message sets out of the cache line into `dropped`.
402 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
403 auto cacheId = slot.index * numInputTypes + ai;
404 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
405 // TODO: in the original implementation of the cache, there have been only two messages per entry,
406 // check if the 2 above corresponds to the number of messages.
407 if (cache[cacheId].size() > 0) {
408 dropped[ai] = std::move(cache[cacheId]);
409 }
410 }
// Only invoke the callback when something was actually dropped.
411 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
412 if (anyDropped) {
413 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
414 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
415 onDrop(slot, dropped, oldestPossibleTimeslice);
416 }
417 }
418 assert(cache.empty() == false);
419 assert(index.size() * numInputTypes == cache.size());
420 // Prune old stuff from the cache, hopefully deleting it...
421 // We set the current slot to the timeslice value, so that old stuff
422 // will be ignored.
423 assert(numInputTypes * slot.index < cache.size());
424 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
425 cache[ai].clear();
426 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
427 }
428 };
429
430 pruneCache(slot);
431}
432
// Decide whether the message starting at `first` carries calibration data.
// NOTE(review): this Doxygen extract is truncated — the function body is
// missing its return statement (original line 436); as shown, a non-void
// function without a return would be undefined behavior. Presumably it tests
// a flag on the extracted DataProcessingHeader — confirm against the full
// source before relying on this listing.
433bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
434{
435 auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
437}
438
// Relay an incoming message set (header + nPayloads payloads, possibly
// repeated nMessages times) into the correct timeslice slot of the cache.
// Strategy: match the header against slots with a partial match, then against
// invalid slots, and as a last resort evict the LRU slot. Returns a
// RelayChoice describing what happened (WillRelay / Dropped / Backpressured /
// Invalid).
// NOTE(review): this Doxygen extract is truncated — the return type line
// (original line 439), the declarations of `slot` and `action`, the `case`
// labels of both switch statements and a few return values are missing; the
// code below is annotated as-is.
440 DataRelayer::relay(void const* rawHeader,
441 std::unique_ptr<fair::mq::Message>* messages,
442 InputInfo const& info,
443 size_t nMessages,
444 size_t nPayloads,
445 std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
446{
447 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
448 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
449 // IMPLEMENTATION DETAILS
450 //
451 // This returns true if a given slot is available for the current number of lanes
452 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
453 return (slot.index % maxLanes) == (currentLane % maxLanes);
454 };
455 // This returns the identifier for the given input. We use a separate
456 // function because while it's trivial now, the actual matchmaking will
457 // become more complicated when we will start supporting ranges.
458 auto getInputTimeslice = [&matchers = mInputMatchers,
459 &distinctRoutes = mDistinctRoutesIndex,
460 &rawHeader,
461 &index = mTimesliceIndex](VariableContext& context)
462 -> std::tuple<int, TimesliceId> {
// Match the raw header against the distinct input matchers.
465 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
466
467 if (input == INVALID_INPUT) {
// NOTE(review): the tuple contents returned here (original lines 469-470)
// are missing from this extract.
468 return {
471 };
472 }
// By convention variable 0 of the context holds the timeslice.
475 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
476 TimesliceId timeslice{*pval};
477 return {input, timeslice};
478 }
479 // If we get here it means we need to push something out of the cache.
// NOTE(review): the tuple contents returned here (original lines 481-482)
// are missing from this extract.
480 return {
483 };
484 };
485
486 // Actually save the header / payload in the slot
487 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
488 &messages,
489 &nMessages,
490 &nPayloads,
491 &cache = mCache,
492 &services = mContext,
493 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
494 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
495 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
496 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
497 timeslice.value, slot.index,
498 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
499 auto cacheIdx = numInputTypes * slot.index + input;
500 MessageSet& target = cache[cacheIdx];
501 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
502 // TODO: make sure that multiple parts can only be added within the same call of
503 // DataRelayer::relay
504 assert(nPayloads > 0);
505 size_t saved = 0;
506 for (size_t mi = 0; mi < nMessages; ++mi) {
507 assert(mi + nPayloads < nMessages);
508 // We are in calibration mode and the data does not have the calibration bit set.
509 // We do not store it.
// NOTE(review): the opening `if (...) {` of this drop branch (original line
// 510) is missing from this extract.
511 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
512 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
513 "Dropping incoming %zu messages because they are data processing.", nPayloads);
514 // Actually dropping messages.
515 for (size_t i = mi; i < mi + nPayloads + 1; i++) {
516 auto discard = std::move(messages[i]);
517 }
518 mi += nPayloads;
519 continue;
520 }
// Store header + payloads into the cache cell as a single message set.
521 target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
522 mi += nPayloads;
523 saved += nPayloads;
524 }
525 return saved;
526 };
527
// Translate the TimesliceIndex action into bookkeeping statistics.
// NOTE(review): the `case` labels of this switch (original lines 533, 536,
// 540, 543, 547) are missing from this extract.
528 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
529 auto& stats = ref.get<DataProcessingStats>();
530
531 // Update statistics for what happened
532 switch (action) {
534 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
535 break;
537 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
538 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
539 break;
541 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
542 break;
544 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
545 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
546 break;
548 break;
549 }
550 };
551
552 // OUTER LOOP
553 //
554 // This is the actual outer loop processing input as part of a given
555 // timeslice. All the other implementation details are hidden by the lambdas
556 auto input = INVALID_INPUT;
557 auto timeslice = TimesliceId{TimesliceId::INVALID};
// NOTE(review): the declaration of `slot` (original line 558) is missing from
// this extract.
559 auto& index = mTimesliceIndex;
560
561 bool needsCleaning = false;
562 // First look for matching slots which already have some
563 // partial match.
564 for (size_t ci = 0; ci < index.size(); ++ci) {
565 slot = TimesliceSlot{ci};
566 if (!isSlotInLane(slot)) {
567 continue;
568 }
569 if (index.isValid(slot) == false) {
570 continue;
571 }
572 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
573 if (input != INVALID_INPUT) {
574 break;
575 }
576 }
577
578 // If we did not find anything, look for slots which
579 // are invalid.
580 if (input == INVALID_INPUT) {
581 for (size_t ci = 0; ci < index.size(); ++ci) {
582 slot = TimesliceSlot{ci};
583 if (index.isValid(slot) == true) {
584 continue;
585 }
586 if (!isSlotInLane(slot)) {
587 continue;
588 }
589 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
590 if (input != INVALID_INPUT) {
// An invalid slot may still hold stale data, so it must be pruned first.
591 needsCleaning = true;
592 break;
593 }
594 }
595 }
596
597 auto& stats = mContext.get<DataProcessingStats>();
// Happy path: an existing (or freshly matched) slot can take the data.
599 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
600 if (needsCleaning) {
601 this->pruneCache(slot, onDrop);
602 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
603 }
604 size_t saved = saveInSlot(timeslice, input, slot, info);
605 if (saved == 0) {
606 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
607 }
608 index.publishSlot(slot);
609 index.markAsDirty(slot, true);
610 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
611 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
612 }
613
// No slot matched: retry the match with a pristine context to find out
// whether the input itself or its timeslice is the problem.
616 VariableContext pristineContext;
617 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
618
// Helper producing a human readable description of the data header, used by
// the error messages below.
619 auto DataHeaderInfo = [&rawHeader]() {
620 std::string error;
621 // extract header from message model
622 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
623 if (dh) {
624 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
625 } else {
626 error += "invalid header";
627 }
628 return error;
629 };
630
631 if (input == INVALID_INPUT) {
632 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
633 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
634 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
// Release all messages: nobody can consume them.
635 for (size_t pi = 0; pi < nMessages; ++pi) {
636 messages[pi].reset(nullptr);
637 }
638 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
639 }
640
641 if (TimesliceId::isValid(timeslice) == false) {
642 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
643 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
644 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
645 for (size_t pi = 0; pi < nMessages; ++pi) {
646 messages[pi].reset(nullptr);
647 }
648 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
649 }
650
651 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
// Evict the least-recently-used slot to make room for this timeslice.
// NOTE(review): the declaration of `action` (original line 652) is missing
// from this extract.
653 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
654 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
655 if (action != TimesliceIndex::ActionTaken::Wait) {
656 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
657 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
658 }
659
660 updateStatistics(action);
661
// NOTE(review): the `case` labels of this switch (original lines 663, 665,
// 675, 683-684) are missing from this extract.
662 switch (action) {
664 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
// Rate-limited warning: reported on the 1st, 10th, 100th, ... obsolete message.
666 static std::atomic<size_t> obsoleteCount = 0;
667 static std::atomic<size_t> mult = 1;
668 if ((obsoleteCount++ % (1 * mult)) == 0) {
669 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
670 if (obsoleteCount > mult * 10) {
671 mult = mult * 10;
672 }
673 }
674 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
676 LOG(warning) << "Incoming data is invalid, not relaying.";
677 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
678 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
679 for (size_t pi = 0; pi < nMessages; ++pi) {
680 messages[pi].reset(nullptr);
681 }
682 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
685 // At this point the variables match the new input but the
686 // cache still holds the old data, so we prune it.
687 this->pruneCache(slot, onDrop);
688 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
689 size_t saved = saveInSlot(timeslice, input, slot, info);
690 if (saved == 0) {
691 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
692 }
693 index.publishSlot(slot);
694 index.markAsDirty(slot, true);
695 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
696 }
698}
699
// Scan all dirty cache lines and, via the completion policy callback, decide
// for each whether it is ready to be consumed / processed / discarded /
// waited on; records with an associated timeslice are appended to `completed`.
// NOTE(review): this Doxygen extract is truncated — the `case` labels of the
// switch below (original lines 798-833) are missing; the branches are
// annotated as-is.
700void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
701{
702 LOGP(debug, "DataRelayer::getReadyToProcess");
703 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
704
705 // THE STATE
706 const auto& cache = mCache;
707 const auto numInputTypes = mDistinctRoutesIndex.size();
708 //
709 // THE IMPLEMENTATION DETAILS
710 //
711 // We use this to bail out early from the check as soon as we find something
712 // which we know is not complete.
713 auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
714 auto offset = li * numInputTypes;
715 assert(cache.size() >= offset + numInputTypes);
716 auto const start = cache.data() + offset;
717 auto const end = cache.data() + offset + numInputTypes;
718 return {start, end};
719 };
720
721 // These two are trivial, but in principle the whole loop could be parallelised
722 // or vectorised so "completed" could be a thread local variable which needs
723 // merging at the end.
724 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
725 if (timeslice) {
726 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
727 completed.emplace_back(RecordAction{li, {*timeslice}, op});
728 } else {
// NOTE(review): this format string lacks a {} placeholder for li.index.
729 LOGP(debug, "No timeslice associated with slot ", li.index);
730 }
731 };
732
733 // THE OUTER LOOP
734 //
735 // To determine if a line is complete, we iterate on all the arguments
736 // and check if they are ready. We do it this way, because in the end
737 // the number of inputs is going to be small and having a more complex
738 // structure will probably result in a larger footprint in any case.
739 // Also notice that ai == inputsNumber only when we reach the end of the
740 // iteration, that means we have found all the required bits.
741 //
742 // Notice that the only time numInputTypes is 0 is when we are a dummy
743 // device created as a source for timers / conditions.
744 if (numInputTypes == 0) {
745 LOGP(debug, "numInputTypes == 0, returning.");
746 return;
747 }
748 size_t cacheLines = cache.size() / numInputTypes;
749 assert(cacheLines * numInputTypes == cache.size());
// Per-action counters, reported in the final debug log.
750 int countConsume = 0;
751 int countConsumeExisting = 0;
752 int countProcess = 0;
753 int countDiscard = 0;
754 int countWait = 0;
755 int notDirty = 0;
756
// Iterate newest-to-oldest over all cache lines.
757 for (int li = cacheLines - 1; li >= 0; --li) {
758 TimesliceSlot slot{(size_t)li};
759 // We only check the cachelines which have been updated by an incoming
760 // message.
761 if (mTimesliceIndex.isDirty(slot) == false) {
762 notDirty++;
763 continue;
764 }
// NOTE(review): "police" in the runtime string below is a typo for "policy" —
// fixing it would change an emitted message, so it is only flagged here.
765 if (!mCompletionPolicy.callbackFull) {
766 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
767 }
768 auto partial = getPartialRecord(li);
769 // TODO: get the data ref from message model
// Accessor returning a (possibly empty) DataRef for part `part` of input `idx`.
770 auto getter = [&partial](size_t idx, size_t part) {
771 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
772 auto header = partial[idx].header(part).get();
773 auto payload = partial[idx].payload(part).get();
774 return DataRef{nullptr,
775 reinterpret_cast<const char*>(header->GetData()),
776 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
777 payload ? payload->GetSize() : 0};
778 }
779 return DataRef{};
780 };
781 auto nPartsGetter = [&partial](size_t idx) {
782 return partial[idx].size();
783 };
// Shared-memory refcount accessor, only available with FairMQ shmem support.
784#if __has_include(<fairmq/shmem/Message.h>)
785 auto refCountGetter = [&partial](size_t idx) -> int {
786 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
787 return header.GetRefCount();
788 };
789#else
790 std::function<int(size_t)> refCountGetter = nullptr;
791#endif
792 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
// Ask the completion policy what to do with this (partial) record.
793 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
794
795 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
796 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
797 switch (action) {
799 countConsume++;
800 updateCompletionResults(slot, timeslice, action);
801 mTimesliceIndex.markAsDirty(slot, false);
802 break;
804 // This is just like Consume, but we also mark all slots as dirty
805 countConsume++;
807 updateCompletionResults(slot, timeslice, action);
808 mTimesliceIndex.rescan();
809 break;
811 countConsumeExisting++;
812 updateCompletionResults(slot, timeslice, action);
813 mTimesliceIndex.markAsDirty(slot, false);
814 break;
816 countProcess++;
817 updateCompletionResults(slot, timeslice, action);
818 mTimesliceIndex.markAsDirty(slot, false);
819 break;
821 countDiscard++;
822 updateCompletionResults(slot, timeslice, action);
823 mTimesliceIndex.markAsDirty(slot, false);
824 break;
826 countWait++;
827 mTimesliceIndex.markAsDirty(slot, true);
829 break;
831 countWait++;
832 mTimesliceIndex.markAsDirty(slot, false);
833 break;
834 }
835 }
836 mTimesliceIndex.updateOldestPossibleOutput(false);
837 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
838 notDirty, countConsume, countConsumeExisting, countProcess,
839 countDiscard, countWait);
840}
841
// Transition the cached state metric of every input of a slot from oldStatus
// to newStatus; entries currently in a different state are left untouched.
// NOTE(review): the signature line (original line 842) is missing from this
// extract; per the body it receives the TimesliceSlot and the two
// CacheEntryStatus values (oldStatus, newStatus).
843{
844 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
845 const auto numInputTypes = mDistinctRoutesIndex.size();
846
// Flip a single (slot, input) metric only if it is currently in oldStatus.
847 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
848 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
849 auto cacheId = s.index * numInputTypes + arg;
850 if (cachedStateMetrics[cacheId] == oldStatus) {
851 cachedStateMetrics[cacheId] = newStatus;
852 }
853 };
854
855 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
856 markInputDone(slot, ai, oldStatus, newStatus);
857 }
858}
859
860std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
861{
862 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
863
864 const auto numInputTypes = mDistinctRoutesIndex.size();
865 // State of the computation
866 std::vector<MessageSet> messages(numInputTypes);
867 auto& cache = mCache;
868 auto& index = mTimesliceIndex;
869
870 // Nothing to see here, this is just to make the outer loop more understandable.
871 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
872 return;
873 };
874
875 // We move ownership so that the cache can be reused once the computation is
876 // finished. We mark the given cache slot invalid, so that it can be reused
877 // This means we can still handle old messages if there is still space in the
878 // cache where to put them.
879 auto moveHeaderPayloadToOutput = [&messages,
880 &cachedStateMetrics = mCachedStateMetrics,
881 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
882 auto cacheId = s.index * numInputTypes + arg;
883 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
884 // TODO: in the original implementation of the cache, there have been only two messages per entry,
885 // check if the 2 above corresponds to the number of messages.
886 if (cache[cacheId].size() > 0) {
887 messages[arg] = std::move(cache[cacheId]);
888 }
889 index.markAsInvalid(s);
890 };
891
892 // An invalid set of arguments is a set of arguments associated to an invalid
893 // timeslice, so I can simply do that. I keep the assertion there because in principle
894 // we should have dispatched the timeslice already!
895 // FIXME: what happens when we have enough timeslices to hit the invalid one?
896 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
897 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
898 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
899 cache[ai].clear();
900 }
901 index.markAsInvalid(s);
902 };
903
904 // Outer loop here.
905 jumpToCacheEntryAssociatedWith(slot);
906 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
907 moveHeaderPayloadToOutput(slot, ai);
908 }
909 invalidateCacheFor(slot);
910
911 return messages;
912}
913
914std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
915{
916 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
917
918 const auto numInputTypes = mDistinctRoutesIndex.size();
919 // State of the computation
920 std::vector<MessageSet> messages(numInputTypes);
921 auto& cache = mCache;
922 auto& index = mTimesliceIndex;
923
924 // Nothing to see here, this is just to make the outer loop more understandable.
925 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
926 return;
927 };
928
929 // We move ownership so that the cache can be reused once the computation is
930 // finished. We mark the given cache slot invalid, so that it can be reused
931 // This means we can still handle old messages if there is still space in the
932 // cache where to put them.
933 auto copyHeaderPayloadToOutput = [&messages,
934 &cachedStateMetrics = mCachedStateMetrics,
935 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
936 auto cacheId = s.index * numInputTypes + arg;
937 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
938 // TODO: in the original implementation of the cache, there have been only two messages per entry,
939 // check if the 2 above corresponds to the number of messages.
940 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
941 auto& header = cache[cacheId].header(pi);
942 auto&& newHeader = header->GetTransport()->CreateMessage();
943 newHeader->Copy(*header);
944 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
945 }
946 };
947
948 // Outer loop here.
949 jumpToCacheEntryAssociatedWith(slot);
950 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
951 copyHeaderPayloadToOutput(slot, ai);
952 }
953
954 return std::move(messages);
955}
956
958{
959 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
960
961 for (auto& cache : mCache) {
962 cache.clear();
963 }
964 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
965 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
966 }
967}
968
969size_t
971{
972 return mCache.size() / mDistinctRoutesIndex.size();
973}
974
980{
981 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
982
983 mTimesliceIndex.resize(s);
984 mVariableContextes.resize(s);
986}
987
989{
990 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
991
992 auto numInputTypes = mDistinctRoutesIndex.size();
993 // FIXME: many of the DataRelayer function rely on allocated cache, so its
994 // maybe misleading to have the allocation in a function primarily for
995 // metrics publishing, do better in setPipelineLength?
996 mCache.resize(numInputTypes * mTimesliceIndex.size());
997 auto& states = mContext.get<DataProcessingStates>();
998
999 mCachedStateMetrics.resize(mCache.size());
1000
1001 // There is maximum 16 variables available. We keep them row-wise so that
1002 // that we can take mod 16 of the index to understand which variable we
1003 // are talking about.
1004 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
1006 .name = fmt::format("matcher_variables/{}", i),
1007 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
1008 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
1009 .sendInitialValue = true,
1010 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1011 });
1012 }
1013
1014 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
1016 .name = fmt::format("data_relayer/{}", ci),
1017 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
1018 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
1019 .sendInitialValue = true,
1020 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1021 });
1022 }
1023}
1024
// NOTE(review): Doxygen extraction artifact — the signature line (orig. 1025,
// one of DataRelayer's per-slot getters, e.g. getTimesliceForSlot or
// getFirstTFOrbitForSlot) and the hyperlinked `return` statement (orig. 1028)
// were dropped from this rendering; only the mutex acquisition survived.
// Restore the full body from the original DataRelayer.cxx before editing.
1026{
1027 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1029}
1030
// NOTE(review): Doxygen extraction artifact — the signature line (orig. 1031,
// one of DataRelayer's per-slot getters) and the hyperlinked `return`
// statement (orig. 1034) were dropped from this rendering; only the mutex
// acquisition survived. Restore the full body from the original
// DataRelayer.cxx before editing.
1032{
1033 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1035}
1036
1038{
1039 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1040 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
1041}
1042
// NOTE(review): Doxygen extraction artifact — the signature line (orig. 1043,
// one of DataRelayer's per-slot getters, possibly getCreationTimeForSlot) and
// the hyperlinked `return` statement (orig. 1046) were dropped from this
// rendering; only the mutex acquisition survived. Restore the full body from
// the original DataRelayer.cxx before editing.
1044{
1045 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1047}
1048
1050{
1051 if (!mContext.get<DriverConfig const>().driverHasGUI) {
1052 return;
1053 }
1054 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1055 auto& states = mContext.get<DataProcessingStates>();
1056 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1057 auto slot = TimesliceSlot{ci};
1058 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1059 states);
1060 }
1061 char relayerSlotState[1024];
1062 // The number of timeslices is encoded in each state
1063 // We serialise the state of a Timeslot in a given state.
1064 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1065 char* buffer = relayerSlotState + written;
1066 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1067 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1068 int index = si * mTimesliceIndex.size() + ci;
1069 int value = static_cast<int>(mCachedStateMetrics[index]);
1070 buffer[si] = value + '0';
1071 // Anything which is done is actually already empty,
1072 // so after we report it we mark it as such.
1073 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1074 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1075 }
1076 }
1077 buffer[mDistinctRoutesIndex.size()] = '\0';
1078 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1079 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1080 }
1081}
1082
1083} // namespace o2::framework
benchmark::State & state
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::ostringstream debug
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"