Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16
22#include "Framework/DataRef.h"
24#include "Framework/InputSpan.h"
26#include "Framework/Logger.h"
27#include "Framework/PartRef.h"
33#include "DataRelayerHelpers.h"
34#include "InputRouteHelpers.h"
40
43
44#include <Monitoring/Metric.h>
45#include <Monitoring/Monitoring.h>
46
47#include <fairlogger/Logger.h>
48#include <fairmq/Channel.h>
49#include <functional>
50#include <fairmq/shmem/Message.h>
51#include <fmt/format.h>
52#include <fmt/ostream.h>
53#include <span>
54#include <string>
55
56using namespace o2::framework::data_matcher;
59using Verbosity = o2::monitoring::Verbosity;
60
62// Stream which keeps track of the calibration lifetime logic
64
65namespace o2::framework
66{
67
// Sentinel returned by the matchmaking (see matchToContext below) when an
// incoming message does not correspond to any registered input route.
68constexpr int INVALID_INPUT = -1;
69
// DataRelayer constructor.
// NOTE(review): this is a Doxygen text dump; the first signature line(s)
// (original lines 70/72, presumably the CompletionPolicy and TimesliceIndex
// parameters) are missing from this extraction.
71 std::vector<InputRoute> const& routes,
73 ServiceRegistryRef services)
74 : mContext{services},
75 mTimesliceIndex{index},
76 mCompletionPolicy{policy},
77 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
78 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
79 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
80{
81 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
82
// Let the policy size the pipeline if it wants to; otherwise fall back to the
// framework default length. The `static` caches the default across instances.
83 if (policy.configureRelayer == nullptr) {
84 static int pipelineLength = DefaultsHelpers::pipelineLength();
85 setPipelineLength(pipelineLength);
86 } else {
87 policy.configureRelayer(*this);
88 }
89
90 // The queries are all the same, so we only have width 1
// Build a ";"-separated human readable description of every distinct input
// route and publish it as the "data_queries" state.
91 auto numInputTypes = mDistinctRoutesIndex.size();
92 auto& states = services.get<DataProcessingStates>();
93 std::string queries = "";
94 for (short i = 0; i < numInputTypes; ++i) {
95 char buffer[128];
96 assert(mDistinctRoutesIndex[i] < routes.size());
97 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
98 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
99 DataSpecUtils::describe(buffer, 127, matcher);
100 queries += std::string_view(buffer, strlen(buffer));
101 queries += ";";
102 }
103 auto stateId = (short)ProcessingStateId::DATA_QUERIES;
104 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
105 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
106 states.processCommandQueue();
107}
108
// Returns the timeslice currently bound to the given slot's variable context.
// NOTE(review): the signature line (original line 109, presumably
// `TimesliceId DataRelayer::getTimesliceForSlot(TimesliceSlot slot)`) is
// missing from this extraction — confirm against the upstream source.
110{
111 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
112 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
113 return VariableContextHelpers::getTimeslice(variables);
114}
115
// Processes inputs which are not driven by incoming data (timers, conditions,
// ...): optionally creates new slots via the handlers' creators, then walks
// every valid cache line and lets each ExpirationHandler expire/populate the
// matching cache cell. Returns how many slots were created / expired.
// NOTE(review): Doxygen dump — original line 124 is missing here.
116DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
117 ServiceRegistryRef services, bool createNew)
118{
119 LOGP(debug, "DataRelayer::processDanglingInputs");
120 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
121 auto& deviceProxy = services.get<FairMQDeviceProxy>();
122
123 ActivityStats activity;
// Nothing to do if no input can ever expire.
125 if (expirationHandlers.empty()) {
126 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
127 return activity;
128 }
129 // Create any slot for the time based fields
130 std::vector<TimesliceSlot> slotsCreatedByHandlers;
131 if (createNew) {
132 LOGP(debug, "Creating new slot");
133 for (auto& handler : expirationHandlers) {
134 LOGP(debug, "handler.creator for {}", handler.name);
135 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
136 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
137 }
138 }
139 // Count how many slots are not invalid
140 auto validSlots = 0;
141 for (auto slot : slotsCreatedByHandlers) {
142 if (slot.index == TimesliceSlot::INVALID) {
143 continue;
144 }
145 validSlots++;
146 }
147 if (validSlots > 0) {
148 activity.newSlots++;
149 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
150 } else {
151 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
152 }
153 // Outer loop, we process all the records because the fact that the record
154 // expires is independent from having received data for it.
// Counters only used for the summary debug log at the end.
155 int headerPresent = 0;
156 int payloadPresent = 0;
157 int noCheckers = 0;
158 int badSlot = 0;
159 int checkerDenied = 0;
160 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
161 TimesliceSlot slot{ti};
162 if (mTimesliceIndex.isValid(slot) == false) {
163 continue;
164 }
165 assert(mDistinctRoutesIndex.empty() == false);
166 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
167 auto timestamp = VariableContextHelpers::getTimeslice(variables);
168 // We iterate on all the hanlders checking if they need to be expired.
169 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
170 auto& expirator = expirationHandlers[ei];
171 // We check that no data is already there for the given cell
172 // it is enough to check the first element
173 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
174 if (part.size() > 0 && part.header(0) != nullptr) {
175 headerPresent++;
176 continue;
177 }
178 if (part.size() > 0 && part.payload(0) != nullptr) {
179 payloadPresent++;
180 continue;
181 }
182 // We check that the cell can actually be expired.
183 if (!expirator.checker) {
184 noCheckers++;
185 continue;
186 }
// NOTE(review): when createNew == false, slotsCreatedByHandlers is empty and
// slotsCreatedByHandlers[ei] indexes past the end — verify against upstream
// whether a guard was lost in this extraction.
187 if (slotsCreatedByHandlers[ei] != slot) {
188 badSlot++;
189 continue;
190 }
191
// Builds a view over the numInputTypes cache cells which form record `li`.
192 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
193 auto offset = li * numInputTypes;
194 assert(cache.size() >= offset + numInputTypes);
195 auto const start = cache.data() + offset;
196 auto const end = cache.data() + offset + numInputTypes;
197 return {start, end};
198 };
199
200 auto partial = getPartialRecord(ti);
201 // TODO: get the data ref from message model
202 auto getter = [&partial](size_t idx, size_t part) {
203 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
204 auto header = partial[idx].header(part).get();
205 auto payload = partial[idx].payload(part).get();
206 return DataRef{nullptr,
207 reinterpret_cast<const char*>(header->GetData()),
208 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
209 payload ? payload->GetSize() : 0};
210 }
211 return DataRef{};
212 };
213 auto nPartsGetter = [&partial](size_t idx) {
214 return partial[idx].size();
215 };
// NOTE(review): refCountGetter dereferences header(0) unconditionally and
// downcasts to a shmem message — assumes shared-memory transport; verify.
216 auto refCountGetter = [&partial](size_t idx) -> int {
217 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
218 return header.GetRefCount();
219 };
220 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
221 // Setup the input span
222
// The checker decides whether the cell should really expire for this
// timestamp; if not, move on to the next handler.
223 if (expirator.checker(services, timestamp.value, span) == false) {
224 checkerDenied++;
225 continue;
226 }
227
228 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
229 assert(expirator.handler);
// The handler synthesises the expired part and we store it in the cache,
// marking the slot dirty so completion is re-evaluated.
230 PartRef newRef;
231 expirator.handler(services, newRef, variables);
232 part.reset(std::move(newRef));
233 activity.expiredSlots++;
234
235 mTimesliceIndex.markAsDirty(slot, true);
236 assert(part.header(0) != nullptr);
237 assert(part.payload(0) != nullptr);
238 }
239 }
240 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
241 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
242 return activity;
243}
244
248size_t matchToContext(void const* data,
249 std::vector<DataDescriptorMatcher> const& matchers,
250 std::vector<size_t> const& index,
251 VariableContext& context)
252{
253 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
254 auto& matcher = matchers[index[ri]];
255
256 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
257 context.commit();
258 return ri;
259 }
260 context.discard();
261 }
262 return INVALID_INPUT;
263}
264
// Publishes the matching-variable context of a slot as a ";"-separated state
// string (one entry per matching variable, empty for unset/unsupported types).
// NOTE(review): the signature line(s) (original lines 265-267) are missing
// from this extraction — presumably
// `void sendVariableContextMetrics(VariableContext& context, TimesliceSlot slot, DataProcessingStates& states)`.
268{
269 static const std::string nullstring{"null"};
270
271 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
272 auto& states = *static_cast<DataProcessingStates*>(context);
// Reused buffer: static to avoid re-allocating on every publish.
273 static std::string state = "";
274 state.clear();
275 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
276 auto var = variables.get(i);
277 if (auto pval = std::get_if<uint64_t>(&var)) {
278 state += std::to_string(*pval);
279 } else if (auto pval = std::get_if<uint32_t>(&var)) {
280 state += std::to_string(*pval);
281 } else if (auto pval2 = std::get_if<std::string>(&var)) {
282 state += *pval2;
283 } else {
284 }
// Each variable is terminated by ";" even when empty, keeping positions fixed.
285 state += ";";
286 }
287 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
288 .size = (int)state.size(),
289 .data = state.data()});
290 },
291 &states, slot);
292}
293
// Propagates a new oldest-possible-input timeslice to the index and schedules
// for pruning every cache line whose timestamp can no longer be completed,
// logging what was dropped and what was missing.
// NOTE(review): Doxygen dump — the signature line and original lines 321/345
// are missing, so the brace structure and the scope of `state`/`didDrop`
// below do not line up exactly as written; confirm against upstream.
295{
296 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
297 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
// Debug escape hatch: keep all timeslices alive regardless of age.
298 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
299 if (dontDrop) {
300 return;
301 }
// One iteration per cache line (slot).
302 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
303 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
304 auto timestamp = VariableContextHelpers::getTimeslice(variables);
305 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
306 if (valid) {
307 if (mTimesliceIndex.isValid({si})) {
308 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
309 }
310 continue;
311 }
// Slot is now too old: remember to prune it and report what gets lost.
312 mPruneOps.push_back(PruneOp{si});
313 bool didDrop = false;
314 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
315 auto& input = mInputs[mi];
316 auto& element = mCache[si * mInputs.size() + mi];
317 if (element.size() != 0) {
318 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
319 didDrop = true;
320 auto& state = mContext.get<DeviceState>();
322 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
323 } else {
324 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
325 }
326 } else {
327 LOGP(debug,
328 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
329 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
330 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
331 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
332 }
333 }
334 }
335 // We did drop some data. Let's print what was missing.
336 if (didDrop) {
337 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
338 auto& input = mInputs[mi];
339 if (input.lifetime == Lifetime::Timer) {
340 continue;
341 }
342 auto& element = mCache[si * mInputs.size() + mi];
343 if (element.size() == 0) {
344 auto& state = mContext.get<DeviceState>();
346 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
347 O2_SIGNPOST_ID_GENERATE(cid, calibration);
348 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data", "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
349 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
350 } else {
351 LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
352 }
353 } else {
354 if (state.allowedProcessing == DeviceState::CalibrationOnly) {
355 O2_SIGNPOST_ID_GENERATE(cid, calibration);
356 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data", "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
357 DataSpecUtils::describe(input).c_str(), (int)input.lifetime, si, timestamp.value, newOldest.timeslice.value);
358 } else {
359 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
360 }
361 }
362 }
363 }
364 }
365 }
366}
367
372
// Executes all pending prune operations recorded by setOldestPossibleInput,
// invoking pruneCache (with the onDrop callback) for each scheduled slot,
// then clears the queue.
// NOTE(review): the signature line (original line 373) is missing from this
// extraction — presumably `void DataRelayer::prunePending(OnDropCallback onDrop)`.
374{
375 for (auto& op : mPruneOps) {
376 this->pruneCache(op.slot, onDrop);
377 }
378 mPruneOps.clear();
379}
380
// Empties all cache cells of a slot. If an onDrop callback is given, any
// still-cached messages are first moved out and handed to it (so the caller
// can account for / forward the dropped data); afterwards every cell of the
// slot is cleared and its state metric reset to EMPTY.
// NOTE(review): the signature line (original line 381) is missing from this
// extraction — presumably
// `void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)`.
382{
383 // We need to prune the cache from the old stuff, if any. Otherwise we
384 // simply store the payload in the cache and we mark relevant bit in the
385 // hence the first if.
386 auto pruneCache = [&onDrop,
387 &cache = mCache,
388 &cachedStateMetrics = mCachedStateMetrics,
389 numInputTypes = mDistinctRoutesIndex.size(),
390 &index = mTimesliceIndex,
391 ref = mContext](TimesliceSlot slot) {
392 if (onDrop) {
393 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
394 // State of the computation
395 std::vector<MessageSet> dropped(numInputTypes);
396 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
397 auto cacheId = slot.index * numInputTypes + ai;
398 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
399 // TODO: in the original implementation of the cache, there have been only two messages per entry,
400 // check if the 2 above corresponds to the number of messages.
401 if (cache[cacheId].size() > 0) {
402 dropped[ai] = std::move(cache[cacheId]);
403 }
404 }
// Only invoke the callback when something was actually held in the cache.
405 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
406 if (anyDropped) {
407 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
408 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
409 onDrop(slot, dropped, oldestPossibleTimeslice);
410 }
411 }
412 assert(cache.empty() == false);
413 assert(index.size() * numInputTypes == cache.size());
414 // Prune old stuff from the cache, hopefully deleting it...
415 // We set the current slot to the timeslice value, so that old stuff
416 // will be ignored.
417 assert(numInputTypes * slot.index < cache.size());
418 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
419 cache[ai].clear();
420 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
421 }
422 };
423
424 pruneCache(slot);
425}
426
// Inspects the DataProcessingHeader of the first message of a part to decide
// whether it is calibration data.
// NOTE(review): the function body is truncated in this extraction — the
// `return` statement (original line 430) using `dph` is missing, so as
// written this falls off the end of a bool function (UB). Restore from the
// upstream source before relying on it.
427bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
428{
429 auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
431}
432
// DataRelayer::relay — the entry point for incoming data: matches the raw
// header against the input routes, picks (or evicts) a timeslice slot, stores
// the (header, payload...) message sequence in the cache and reports the
// outcome (WillRelay / Backpressured / Dropped / Invalid) to the caller.
// NOTE(review): Doxygen dump — several original lines are missing from this
// extraction (the return type line 433, the declarations of `slot` and
// `action`, the values inside the `return { ... }` of getInputTimeslice, the
// calibration-mode condition at line 504 and every
// `case TimesliceIndex::ActionTaken::...:` label), so braces and control flow
// below do not line up exactly as written.
434 DataRelayer::relay(void const* rawHeader,
435 std::unique_ptr<fair::mq::Message>* messages,
436 InputInfo const& info,
437 size_t nMessages,
438 size_t nPayloads,
439 std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
440{
441 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
442 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
443 // IMPLEMENTATION DETAILS
444 //
445 // This returns true if a given slot is available for the current number of lanes
446 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
447 return (slot.index % maxLanes) == (currentLane % maxLanes);
448 };
449 // This returns the identifier for the given input. We use a separate
450 // function because while it's trivial now, the actual matchmaking will
451 // become more complicated when we will start supporting ranges.
452 auto getInputTimeslice = [&matchers = mInputMatchers,
453 &distinctRoutes = mDistinctRoutesIndex,
454 &rawHeader,
455 &index = mTimesliceIndex](VariableContext& context)
456 -> std::tuple<int, TimesliceId> {
459 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
460
461 if (input == INVALID_INPUT) {
462 return {
465 };
466 }
// By convention the timeslice is bound to matching variable 0.
469 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
470 TimesliceId timeslice{*pval};
471 return {input, timeslice};
472 }
473 // If we get here it means we need to push something out of the cache.
474 return {
477 };
478 };
479
480 // Actually save the header / payload in the slot
481 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
482 &messages,
483 &nMessages,
484 &nPayloads,
485 &cache = mCache,
486 &services = mContext,
487 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
488 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
489 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
490 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
491 timeslice.value, slot.index,
492 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
493 auto cacheIdx = numInputTypes * slot.index + input;
494 MessageSet& target = cache[cacheIdx];
495 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
496 // TODO: make sure that multiple parts can only be added within the same call of
497 // DataRelayer::relay
498 assert(nPayloads > 0);
499 size_t saved = 0;
// Walk the sequence as (1 header + nPayloads payloads) groups.
500 for (size_t mi = 0; mi < nMessages; ++mi) {
501 assert(mi + nPayloads < nMessages);
502 // We are in calibration mode and the data does not have the calibration bit set.
503 // We do not store it.
505 O2_SIGNPOST_ID_FROM_POINTER(cid, calibration, &services.get<DataProcessorContext>());
506 O2_SIGNPOST_EVENT_EMIT(calibration, cid, "calibration",
507 "Dropping incoming %zu messages because they are data processing.", nPayloads);
508 // Actually dropping messages.
509 for (size_t i = mi; i < mi + nPayloads + 1; i++) {
510 auto discard = std::move(messages[i]);
511 }
512 mi += nPayloads;
513 continue;
514 }
515 target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
516 mi += nPayloads;
517 saved += nPayloads;
518 }
519 return saved;
520 };
521
// Maps the index action onto the bookkeeping metrics.
522 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
523 auto& stats = ref.get<DataProcessingStats>();
524
525 // Update statistics for what happened
526 switch (action) {
528 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
529 break;
531 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
532 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
533 break;
535 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
536 break;
538 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
539 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
540 break;
542 break;
543 }
544 };
545
546 // OUTER LOOP
547 //
548 // This is the actual outer loop processing input as part of a given
549 // timeslice. All the other implementation details are hidden by the lambdas
550 auto input = INVALID_INPUT;
551 auto timeslice = TimesliceId{TimesliceId::INVALID};
553 auto& index = mTimesliceIndex;
554
555 bool needsCleaning = false;
556 // First look for matching slots which already have some
557 // partial match.
558 for (size_t ci = 0; ci < index.size(); ++ci) {
559 slot = TimesliceSlot{ci};
560 if (!isSlotInLane(slot)) {
561 continue;
562 }
563 if (index.isValid(slot) == false) {
564 continue;
565 }
566 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
567 if (input != INVALID_INPUT) {
568 break;
569 }
570 }
571
572 // If we did not find anything, look for slots which
573 // are invalid.
574 if (input == INVALID_INPUT) {
575 for (size_t ci = 0; ci < index.size(); ++ci) {
576 slot = TimesliceSlot{ci};
577 if (index.isValid(slot) == true) {
578 continue;
579 }
580 if (!isSlotInLane(slot)) {
581 continue;
582 }
583 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
584 if (input != INVALID_INPUT) {
585 needsCleaning = true;
586 break;
587 }
588 }
589 }
590
591 auto& stats = mContext.get<DataProcessingStats>();
// Happy path: a suitable slot exists, store the messages there directly.
593 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
594 if (needsCleaning) {
595 this->pruneCache(slot, onDrop);
596 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
597 }
598 size_t saved = saveInSlot(timeslice, input, slot, info);
599 if (saved == 0) {
600 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
601 }
602 index.publishSlot(slot);
603 index.markAsDirty(slot, true);
604 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
605 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
606 }
607
// No slot available: re-run the matchmaking on a clean context to find out
// what the data actually is before attempting an LRU eviction.
610 VariableContext pristineContext;
611 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
612
613 auto DataHeaderInfo = [&rawHeader]() {
614 std::string error;
615 // extract header from message model
616 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
617 if (dh) {
618 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
619 } else {
620 error += "invalid header";
621 }
622 return error;
623 };
624
625 if (input == INVALID_INPUT) {
626 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
627 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
628 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
629 for (size_t pi = 0; pi < nMessages; ++pi) {
630 messages[pi].reset(nullptr);
631 }
632 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
633 }
634
635 if (TimesliceId::isValid(timeslice) == false) {
636 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
637 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
638 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
639 for (size_t pi = 0; pi < nMessages; ++pi) {
640 messages[pi].reset(nullptr);
641 }
642 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
643 }
644
645 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
647 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
648 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
649 if (action != TimesliceIndex::ActionTaken::Wait) {
650 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
651 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
652 }
653
654 updateStatistics(action);
655
// NOTE(review): the case labels of this switch (Wait / DropObsolete /
// DropInvalid / ReplaceObsolete / ...) were lost in the extraction.
656 switch (action) {
658 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
// Rate-limited warning: log on the 1st, 10th, 100th, ... obsolete message.
660 static std::atomic<size_t> obsoleteCount = 0;
661 static std::atomic<size_t> mult = 1;
662 if ((obsoleteCount++ % (1 * mult)) == 0) {
663 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
664 if (obsoleteCount > mult * 10) {
665 mult = mult * 10;
666 }
667 }
668 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
670 LOG(warning) << "Incoming data is invalid, not relaying.";
671 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
672 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
673 for (size_t pi = 0; pi < nMessages; ++pi) {
674 messages[pi].reset(nullptr);
675 }
676 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
679 // At this point the variables match the new input but the
680 // cache still holds the old data, so we prune it.
681 this->pruneCache(slot, onDrop);
682 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
683 size_t saved = saveInSlot(timeslice, input, slot, info);
684 if (saved == 0) {
685 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
686 }
687 index.publishSlot(slot);
688 index.markAsDirty(slot, true);
689 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
690 }
692}
693
// Scans every dirty cache line, builds an InputSpan over its parts and asks
// the completion policy what to do; completed records are appended to
// `completed` and the slot's dirty flag is updated accordingly.
// NOTE(review): Doxygen dump — the `case CompletionPolicy::CompletionOp::...:`
// labels of the switch below were lost in the extraction.
694void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
695{
696 LOGP(debug, "DataRelayer::getReadyToProcess");
697 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
698
699 // THE STATE
700 const auto& cache = mCache;
701 const auto numInputTypes = mDistinctRoutesIndex.size();
702 //
703 // THE IMPLEMENTATION DETAILS
704 //
705 // We use this to bail out early from the check as soon as we find something
706 // which we know is not complete.
707 auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
708 auto offset = li * numInputTypes;
709 assert(cache.size() >= offset + numInputTypes);
710 auto const start = cache.data() + offset;
711 auto const end = cache.data() + offset + numInputTypes;
712 return {start, end};
713 };
714
715 // These two are trivial, but in principle the whole loop could be parallelised
716 // or vectorised so "completed" could be a thread local variable which needs
717 // merging at the end.
718 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
719 if (timeslice) {
720 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
721 completed.emplace_back(RecordAction{li, {*timeslice}, op});
722 } else {
// NOTE(review): format string lacks a "{}" placeholder for li.index.
723 LOGP(debug, "No timeslice associated with slot ", li.index);
724 }
725 };
726
727 // THE OUTER LOOP
728 //
729 // To determine if a line is complete, we iterate on all the arguments
730 // and check if they are ready. We do it this way, because in the end
731 // the number of inputs is going to be small and having a more complex
732 // structure will probably result in a larger footprint in any case.
733 // Also notice that ai == inputsNumber only when we reach the end of the
734 // iteration, that means we have found all the required bits.
735 //
736 // Notice that the only time numInputTypes is 0 is when we are a dummy
737 // device created as a source for timers / conditions.
738 if (numInputTypes == 0) {
739 LOGP(debug, "numInputTypes == 0, returning.");
740 return;
741 }
742 size_t cacheLines = cache.size() / numInputTypes;
743 assert(cacheLines * numInputTypes == cache.size());
// Counters only used for the summary debug log at the end.
744 int countConsume = 0;
745 int countConsumeExisting = 0;
746 int countProcess = 0;
747 int countDiscard = 0;
748 int countWait = 0;
749 int notDirty = 0;
750
// Iterate newest-to-oldest so the most recent timeslices are acted on first.
751 for (int li = cacheLines - 1; li >= 0; --li) {
752 TimesliceSlot slot{(size_t)li};
753 // We only check the cachelines which have been updated by an incoming
754 // message.
755 if (mTimesliceIndex.isDirty(slot) == false) {
756 notDirty++;
757 continue;
758 }
// NOTE(review): "police" is a typo for "policy" in this runtime string.
759 if (!mCompletionPolicy.callbackFull) {
760 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
761 }
762 auto partial = getPartialRecord(li);
763 // TODO: get the data ref from message model
764 auto getter = [&partial](size_t idx, size_t part) {
765 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
766 auto header = partial[idx].header(part).get();
767 auto payload = partial[idx].payload(part).get();
768 return DataRef{nullptr,
769 reinterpret_cast<const char*>(header->GetData()),
770 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
771 payload ? payload->GetSize() : 0};
772 }
773 return DataRef{};
774 };
775 auto nPartsGetter = [&partial](size_t idx) {
776 return partial[idx].size();
777 };
778 auto refCountGetter = [&partial](size_t idx) -> int {
779 auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
780 return header.GetRefCount();
781 };
782 InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
783 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
784
785 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
786 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
787 switch (action) {
789 countConsume++;
790 updateCompletionResults(slot, timeslice, action);
791 mTimesliceIndex.markAsDirty(slot, false);
792 break;
794 // This is just like Consume, but we also mark all slots as dirty
795 countConsume++;
797 updateCompletionResults(slot, timeslice, action);
798 mTimesliceIndex.rescan();
799 break;
801 countConsumeExisting++;
802 updateCompletionResults(slot, timeslice, action);
803 mTimesliceIndex.markAsDirty(slot, false);
804 break;
806 countProcess++;
807 updateCompletionResults(slot, timeslice, action);
808 mTimesliceIndex.markAsDirty(slot, false);
809 break;
811 countDiscard++;
812 updateCompletionResults(slot, timeslice, action);
813 mTimesliceIndex.markAsDirty(slot, false);
814 break;
816 countWait++;
817 mTimesliceIndex.markAsDirty(slot, true);
819 break;
821 countWait++;
822 mTimesliceIndex.markAsDirty(slot, false);
823 break;
824 }
825 }
826 mTimesliceIndex.updateOldestPossibleOutput(false);
827 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
828 notDirty, countConsume, countConsumeExisting, countProcess,
829 countDiscard, countWait);
830}
831
// Transitions every cache-entry status metric of a slot from `oldStatus` to
// `newStatus`; entries in any other state are left untouched.
// NOTE(review): the signature line (original line 832) is missing from this
// extraction — presumably
// `void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)`.
833{
834 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
835 const auto numInputTypes = mDistinctRoutesIndex.size();
836
837 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
838 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
839 auto cacheId = s.index * numInputTypes + arg;
// Only flip entries that are currently in the expected state.
840 if (cachedStateMetrics[cacheId] == oldStatus) {
841 cachedStateMetrics[cacheId] = newStatus;
842 }
843 };
844
845 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
846 markInputDone(slot, ai, oldStatus, newStatus);
847 }
848}
849
850std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
851{
852 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
853
854 const auto numInputTypes = mDistinctRoutesIndex.size();
855 // State of the computation
856 std::vector<MessageSet> messages(numInputTypes);
857 auto& cache = mCache;
858 auto& index = mTimesliceIndex;
859
860 // Nothing to see here, this is just to make the outer loop more understandable.
861 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
862 return;
863 };
864
865 // We move ownership so that the cache can be reused once the computation is
866 // finished. We mark the given cache slot invalid, so that it can be reused
867 // This means we can still handle old messages if there is still space in the
868 // cache where to put them.
869 auto moveHeaderPayloadToOutput = [&messages,
870 &cachedStateMetrics = mCachedStateMetrics,
871 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
872 auto cacheId = s.index * numInputTypes + arg;
873 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
874 // TODO: in the original implementation of the cache, there have been only two messages per entry,
875 // check if the 2 above corresponds to the number of messages.
876 if (cache[cacheId].size() > 0) {
877 messages[arg] = std::move(cache[cacheId]);
878 }
879 index.markAsInvalid(s);
880 };
881
882 // An invalid set of arguments is a set of arguments associated to an invalid
883 // timeslice, so I can simply do that. I keep the assertion there because in principle
884 // we should have dispatched the timeslice already!
885 // FIXME: what happens when we have enough timeslices to hit the invalid one?
886 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
887 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
888 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
889 cache[ai].clear();
890 }
891 index.markAsInvalid(s);
892 };
893
894 // Outer loop here.
895 jumpToCacheEntryAssociatedWith(slot);
896 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
897 moveHeaderPayloadToOutput(slot, ai);
898 }
899 invalidateCacheFor(slot);
900
901 return messages;
902}
903
904std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
905{
906 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
907
908 const auto numInputTypes = mDistinctRoutesIndex.size();
909 // State of the computation
910 std::vector<MessageSet> messages(numInputTypes);
911 auto& cache = mCache;
912 auto& index = mTimesliceIndex;
913
914 // Nothing to see here, this is just to make the outer loop more understandable.
915 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
916 return;
917 };
918
919 // We move ownership so that the cache can be reused once the computation is
920 // finished. We mark the given cache slot invalid, so that it can be reused
921 // This means we can still handle old messages if there is still space in the
922 // cache where to put them.
923 auto copyHeaderPayloadToOutput = [&messages,
924 &cachedStateMetrics = mCachedStateMetrics,
925 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
926 auto cacheId = s.index * numInputTypes + arg;
927 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
928 // TODO: in the original implementation of the cache, there have been only two messages per entry,
929 // check if the 2 above corresponds to the number of messages.
930 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
931 auto& header = cache[cacheId].header(pi);
932 auto&& newHeader = header->GetTransport()->CreateMessage();
933 newHeader->Copy(*header);
934 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
935 }
936 };
937
938 // Outer loop here.
939 jumpToCacheEntryAssociatedWith(slot);
940 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
941 copyHeaderPayloadToOutput(slot, ai);
942 }
943
944 return std::move(messages);
945}
946
948{
949 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
950
951 for (auto& cache : mCache) {
952 cache.clear();
953 }
954 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
955 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
956 }
957}
958
959size_t
961{
962 return mCache.size() / mDistinctRoutesIndex.size();
963}
964
970{
971 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
972
973 mTimesliceIndex.resize(s);
974 mVariableContextes.resize(s);
976}
977
979{
980 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
981
982 auto numInputTypes = mDistinctRoutesIndex.size();
983 // FIXME: many of the DataRelayer function rely on allocated cache, so its
984 // maybe misleading to have the allocation in a function primarily for
985 // metrics publishing, do better in setPipelineLength?
986 mCache.resize(numInputTypes * mTimesliceIndex.size());
987 auto& states = mContext.get<DataProcessingStates>();
988
989 mCachedStateMetrics.resize(mCache.size());
990
991 // There is maximum 16 variables available. We keep them row-wise so that
992 // that we can take mod 16 of the index to understand which variable we
993 // are talking about.
994 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
996 .name = fmt::format("matcher_variables/{}", i),
997 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
998 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
999 .sendInitialValue = true,
1000 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1001 });
1002 }
1003
1004 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
1006 .name = fmt::format("data_relayer/{}", ci),
1007 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
1008 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
1009 .sendInitialValue = true,
1010 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
1011 });
1012 }
1013}
1014
1016{
1017 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1019}
1020
1022{
1023 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1025}
1026
1028{
1029 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1030 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
1031}
1032
1034{
1035 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1037}
1038
1040{
1041 if (!mContext.get<DriverConfig const>().driverHasGUI) {
1042 return;
1043 }
1044 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1045 auto& states = mContext.get<DataProcessingStates>();
1046 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1047 auto slot = TimesliceSlot{ci};
1048 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1049 states);
1050 }
1051 char relayerSlotState[1024];
1052 // The number of timeslices is encoded in each state
1053 // We serialise the state of a Timeslot in a given state.
1054 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1055 char* buffer = relayerSlotState + written;
1056 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1057 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1058 int index = ci * mDistinctRoutesIndex.size() + si;
1059 int value = static_cast<int>(mCachedStateMetrics[index]);
1060 buffer[si] = value + '0';
1061 // Anything which is done is actually already empty,
1062 // so after we report it we mark it as such.
1063 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1064 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1065 }
1066 }
1067 buffer[mDistinctRoutesIndex.size()] = '\0';
1068 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1069 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1070 }
1071}
1072
1073} // namespace o2::framework
benchmark::State & state
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
atype::type element
std::ostringstream debug
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:619
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"