Project
Loading...
Searching...
No Matches
DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
15
20#include "Framework/DataRef.h"
22#include "Framework/InputSpan.h"
24#include "Framework/Logger.h"
25#include "Framework/PartRef.h"
31#include "DataRelayerHelpers.h"
32#include "InputRouteHelpers.h"
38
41
42#include <Monitoring/Metric.h>
43#include <Monitoring/Monitoring.h>
44
45#include <fairmq/Channel.h>
46#include <fmt/format.h>
47#include <fmt/ostream.h>
48#include <gsl/span>
49#include <numeric>
50#include <string>
51
52using namespace o2::framework::data_matcher;
55using Verbosity = o2::monitoring::Verbosity;
56
58
59namespace o2::framework
60{
61
// Sentinel for "no input route matched".
// NOTE(review): matchToContext() returns this -1 through a size_t return
// type (wrapping to SIZE_MAX); callers compare it back against the int
// constant, which round-trips correctly but a typed constant would be
// clearer — confirm before changing the return type.
62constexpr int INVALID_INPUT = -1;
63
// DataRelayer constructor: captures the routes/policy/index, sizes the
// pipeline, and publishes a human-readable description of every distinct
// input query as the "data_queries" state.
// NOTE(review): this listing is lossy — the declarator line naming the
// constructor and its first parameters (the `policy` and `index` arguments
// bound in the initializer list below) is missing; presumably
// DataRelayer::DataRelayer(CompletionPolicy const&, std::vector<InputRoute>
// const&, TimesliceIndex&, ServiceRegistryRef) — confirm against the header.
65 std::vector<InputRoute> const& routes,
67 ServiceRegistryRef services)
68 : mContext{services},
69 mTimesliceIndex{index},
70 mCompletionPolicy{policy},
71 mDistinctRoutesIndex{DataRelayerHelpers::createDistinctRouteIndex(routes)},
72 mInputMatchers{DataRelayerHelpers::createInputMatchers(routes)},
73 mMaxLanes{InputRouteHelpers::maxLanes(routes)}
74{
 // All relayer state is guarded by mMutex (recursive: public methods call
 // each other, e.g. relay() -> pruneCache()).
75 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
76
 // Either the completion policy sizes the pipeline, or we fall back to the
 // process-wide default length.
77 if (policy.configureRelayer == nullptr) {
78 static int pipelineLength = DefaultsHelpers::pipelineLength();
79 setPipelineLength(pipelineLength);
80 } else {
81 policy.configureRelayer(*this);
82 }
83
84 // The queries are all the same, so we only have width 1
85 auto numInputTypes = mDistinctRoutesIndex.size();
86 auto& states = services.get<DataProcessingStates>();
 // Build a ';'-separated description of every distinct input query.
87 std::string queries = "";
 // NOTE(review): loop index is `short` compared against a size_t count —
 // fine for realistic route counts but size_t would be safer.
88 for (short i = 0; i < numInputTypes; ++i) {
89 char buffer[128];
90 assert(mDistinctRoutesIndex[i] < routes.size());
91 mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
92 auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
93 DataSpecUtils::describe(buffer, 127, matcher);
94 queries += std::string_view(buffer, strlen(buffer));
95 queries += ";";
96 }
 // NOTE(review): the line defining `stateId` (doxygen line 97) is missing
 // from this listing — confirm against the repository copy.
98 states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
99 states.updateState(DataProcessingStates::CommandSpec{.id = stateId, .size = (int)queries.size(), .data = queries.data()});
100 states.processCommandQueue();
101}
102
// Return the timeslice currently associated with a cache slot, read from
// matching variable 0 of the slot's variable context.
// NOTE(review): the signature line (doxygen 103) is missing from this
// listing; presumably TimesliceId DataRelayer::getTimesliceForSlot(
// TimesliceSlot slot) — confirm against the header.
104{
105 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
106 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
107 return VariableContextHelpers::getTimeslice(variables);
108}
109
// Walk every valid cache line and give each ExpirationHandler a chance to
// (a) create new slots for time-driven inputs (when createNew is set) and
// (b) synthesize data for cells that would otherwise never be filled.
// Returns counters describing how many slots were created/expired.
110DataRelayer::ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
111 ServiceRegistryRef services, bool createNew)
112{
113 LOGP(debug, "DataRelayer::processDanglingInputs");
114 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
115 auto& deviceProxy = services.get<FairMQDeviceProxy>();
116
117 ActivityStats activity;
 // Nothing to do when no input can expire.
119 if (expirationHandlers.empty()) {
120 LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
121 return activity;
122 }
123 // Create any slot for the time based fields
124 std::vector<TimesliceSlot> slotsCreatedByHandlers;
125 if (createNew) {
126 LOGP(debug, "Creating new slot");
127 for (auto& handler : expirationHandlers) {
128 LOGP(debug, "handler.creator for {}", handler.name);
129 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
130 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
131 }
132 }
133 // Count how many slots are not invalid
134 auto validSlots = 0;
135 for (auto slot : slotsCreatedByHandlers) {
136 if (slot.index == TimesliceSlot::INVALID) {
137 continue;
138 }
139 validSlots++;
140 }
141 if (validSlots > 0) {
142 activity.newSlots++;
143 LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
144 } else {
145 LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
146 }
147 // Outer loop, we process all the records because the fact that the record
148 // expires is independent from having received data for it.
149 int headerPresent = 0;
150 int payloadPresent = 0;
151 int noCheckers = 0;
152 int badSlot = 0;
153 int checkerDenied = 0;
154 for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
155 TimesliceSlot slot{ti};
156 if (mTimesliceIndex.isValid(slot) == false) {
157 continue;
158 }
159 assert(mDistinctRoutesIndex.empty() == false);
160 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
161 auto timestamp = VariableContextHelpers::getTimeslice(variables);
162 // We iterate on all the hanlders checking if they need to be expired.
163 for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
164 auto& expirator = expirationHandlers[ei];
165 // We check that no data is already there for the given cell
166 // it is enough to check the first element
167 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
168 if (part.size() > 0 && part.header(0) != nullptr) {
169 headerPresent++;
170 continue;
171 }
172 if (part.size() > 0 && part.payload(0) != nullptr) {
173 payloadPresent++;
174 continue;
175 }
176 // We check that the cell can actually be expired.
177 if (!expirator.checker) {
178 noCheckers++;
179 continue;
180 }
 // NOTE(review): when createNew == false, slotsCreatedByHandlers is empty
 // and this indexes out of bounds — verify against the repository copy
 // whether a guard was dropped by the listing.
181 if (slotsCreatedByHandlers[ei] != slot) {
182 badSlot++;
183 continue;
184 }
185
 // Build a read-only view of the cache line so the checker can inspect
 // what has arrived so far for this timeslice.
186 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
187 auto offset = li * numInputTypes;
188 assert(cache.size() >= offset + numInputTypes);
189 auto const start = cache.data() + offset;
190 auto const end = cache.data() + offset + numInputTypes;
191 return {start, end};
192 };
193
194 auto partial = getPartialRecord(ti);
195 // TODO: get the data ref from message model
196 auto getter = [&partial](size_t idx, size_t part) {
197 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
198 auto header = partial[idx].header(part).get();
199 auto payload = partial[idx].payload(part).get();
200 return DataRef{nullptr,
201 reinterpret_cast<const char*>(header->GetData()),
202 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
203 payload ? payload->GetSize() : 0};
204 }
205 return DataRef{};
206 };
207 auto nPartsGetter = [&partial](size_t idx) {
208 return partial[idx].size();
209 };
210 InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
211 // Setup the input span
212
213 if (expirator.checker(services, timestamp.value, span) == false) {
214 checkerDenied++;
215 continue;
216 }
217
218 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
219 assert(expirator.handler);
 // The handler synthesizes the expired part; the slot is marked dirty so
 // getReadyToProcess() re-evaluates it.
220 PartRef newRef;
221 expirator.handler(services, newRef, variables);
222 part.reset(std::move(newRef));
223 activity.expiredSlots++;
224
225 mTimesliceIndex.markAsDirty(slot, true);
226 assert(part.header(0) != nullptr);
227 assert(part.payload(0) != nullptr);
228 }
229 }
230 LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
231 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
232 return activity;
233}
234
238size_t matchToContext(void const* data,
239 std::vector<DataDescriptorMatcher> const& matchers,
240 std::vector<size_t> const& index,
241 VariableContext& context)
242{
243 for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
244 auto& matcher = matchers[index[ri]];
245
246 if (matcher.match(reinterpret_cast<char const*>(data), context)) {
247 context.commit();
248 return ri;
249 }
250 context.discard();
251 }
252 return INVALID_INPUT;
253}
254
// Publish the current matching-variable values of a slot as a ';'-separated
// string state (CONTEXT_VARIABLES_BASE + slot.index).
// NOTE(review): the signature line (doxygen 257) is missing from this
// listing; presumably void sendVariableContextMetrics(VariableContext&
// context, TimesliceSlot slot, DataProcessingStates& states) — confirm.
258{
 // NOTE(review): `nullstring` is unused in the visible body — possibly
 // referenced by a line the listing dropped (the empty else below).
259 static const std::string nullstring{"null"};
260
261 context.publish([](VariableContext const& variables, TimesliceSlot slot, void* context) {
262 auto& states = *static_cast<DataProcessingStates*>(context);
 // NOTE(review): function-local static buffer reused across calls — not
 // thread-safe if publish() can run concurrently; confirm single-threaded use.
263 static std::string state = "";
264 state.clear();
265 for (size_t i = 0; i < MAX_MATCHING_VARIABLE; ++i) {
266 auto var = variables.get(i);
267 if (auto pval = std::get_if<uint64_t>(&var)) {
268 state += std::to_string(*pval);
269 } else if (auto pval = std::get_if<uint32_t>(&var)) {
270 state += std::to_string(*pval);
271 } else if (auto pval2 = std::get_if<std::string>(&var)) {
272 state += *pval2;
273 } else {
274 }
275 state += ";";
276 }
277 states.updateState({.id = short((int)ProcessingStateId::CONTEXT_VARIABLES_BASE + slot.index),
278 .size = (int)state.size(),
279 .data = state.data()});
280 },
281 &states, slot);
282}
283
// Propagate a new oldest-possible input timeslice: any slot whose data is
// now older than the oldest possible timeslice can never complete, so it is
// scheduled for pruning and diagnostics are emitted about what was dropped.
// NOTE(review): this listing is lossy — the signature line (doxygen 284,
// presumably void DataRelayer::setOldestPossibleInput(TimesliceId proposed,
// ChannelIndex channel)) and the transition-state conditions at doxygen
// lines 311 and 335 are missing, so the braces below do not balance as
// shown. Confirm against the repository copy before editing.
285{
286 auto newOldest = mTimesliceIndex.setOldestPossibleInput(proposed, channel);
287 LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
 // Debug escape hatch: keep stale timeslices alive when the env var is set.
288 static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
289 if (dontDrop) {
290 return;
291 }
 // One iteration per cache line (mCache is lines x inputs).
292 for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
293 auto& variables = mTimesliceIndex.getVariablesForSlot({si});
294 auto timestamp = VariableContextHelpers::getTimeslice(variables);
295 auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
296 if (valid) {
297 if (mTimesliceIndex.isValid({si})) {
298 LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
299 }
300 continue;
301 }
 // Slot can never complete: defer the actual cache cleanup to
 // prunePendingInputs() and report what is being thrown away.
302 mPruneOps.push_back(PruneOp{si});
303 bool didDrop = false;
304 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
305 auto& input = mInputs[mi];
306 auto& element = mCache[si * mInputs.size() + mi];
307 if (element.size() != 0) {
308 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
309 didDrop = true;
310 auto& state = mContext.get<DeviceState>();
312 LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
313 } else {
314 LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
315 }
316 } else {
317 LOGP(debug,
318 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
319 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
320 DataSpecUtils::describe(input), si, timestamp.value, newOldest.timeslice.value,
321 mTimesliceIndex.getChannelInfo(channel).channel->GetName());
322 }
323 }
324 }
325 // We did drop some data. Let's print what was missing.
326 if (didDrop) {
327 for (size_t mi = 0; mi < mInputs.size(); ++mi) {
328 auto& input = mInputs[mi];
329 if (input.lifetime == Lifetime::Timer) {
330 continue;
331 }
332 auto& element = mCache[si * mInputs.size() + mi];
333 if (element.size() == 0) {
334 auto& state = mContext.get<DeviceState>();
336 LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
337 } else {
338 LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.", DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
339 }
340 }
341 }
342 }
343 }
344}
345
350
// Apply (and then forget) all prune operations deferred by
// setOldestPossibleInput(), invoking onDrop for each pruned slot.
// NOTE(review): the signature line (doxygen 351) is missing from this
// listing; presumably void DataRelayer::prunePendingInputs(OnDropCallback
// onDrop) — confirm against the header.
352{
353 for (auto& op : mPruneOps) {
354 this->pruneCache(op.slot, onDrop);
355 }
356 mPruneOps.clear();
357}
358
// Drop everything cached for one slot: ownership of the still-present
// messages is handed to onDrop (if any), then the cache line is cleared and
// its per-entry metrics reset to EMPTY.
// NOTE(review): the signature line (doxygen 359) is missing from this
// listing; presumably void DataRelayer::pruneCache(TimesliceSlot slot,
// OnDropCallback onDrop) — confirm against the header.
360{
361 // We need to prune the cache from the old stuff, if any. Otherwise we
362 // simply store the payload in the cache and we mark relevant bit in the
363 // hence the first if.
364 auto pruneCache = [&onDrop,
365 &cache = mCache,
366 &cachedStateMetrics = mCachedStateMetrics,
367 numInputTypes = mDistinctRoutesIndex.size(),
368 &index = mTimesliceIndex,
369 ref = mContext](TimesliceSlot slot) {
370 if (onDrop) {
371 auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
372 // State of the computation
373 std::vector<MessageSet> dropped(numInputTypes);
374 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
375 auto cacheId = slot.index * numInputTypes + ai;
376 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
377 // TODO: in the original implementation of the cache, there have been only two messages per entry,
378 // check if the 2 above corresponds to the number of messages.
379 if (cache[cacheId].size() > 0) {
380 dropped[ai] = std::move(cache[cacheId]);
381 }
382 }
 // Only invoke the callback when something was actually dropped.
383 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
384 if (anyDropped) {
385 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
386 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
387 onDrop(slot, dropped, oldestPossibleTimeslice);
388 }
389 }
390 assert(cache.empty() == false);
391 assert(index.size() * numInputTypes == cache.size());
392 // Prune old stuff from the cache, hopefully deleting it...
393 // We set the current slot to the timeslice value, so that old stuff
394 // will be ignored.
395 assert(numInputTypes * slot.index < cache.size());
396 for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
397 cache[ai].clear();
398 cachedStateMetrics[ai] = CacheEntryStatus::EMPTY;
399 }
400 };
401
402 pruneCache(slot);
403}
404
405bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
406{
407 auto* dh = o2::header::get<DataHeader*>(first->GetData());
408 return dh->flagsDerivedHeader & DataProcessingHeader::KEEP_AT_EOS_FLAG;
409}
410
// Relay an incoming set of messages (one header + nPayloads payload parts,
// possibly repeated) into the slot cache: match the header to an input
// route, find or allocate a slot in the matching lane for its timeslice,
// store the parts, and report the outcome as a RelayChoice.
// NOTE(review): this listing is lossy — the return-type line (doxygen 411,
// DataRelayer::RelayChoice), the `TimesliceSlot slot;` and `ActionTaken
// action;` declarations (523, 617), the values inside the braced returns of
// getInputTimeslice (441-442, 453-454), the calibration-mode condition in
// saveInSlot (482), and every `case` label of both switch statements are
// missing. Confirm against the repository copy before editing anything here.
412 DataRelayer::relay(void const* rawHeader,
413 std::unique_ptr<fair::mq::Message>* messages,
414 InputInfo const& info,
415 size_t nMessages,
416 size_t nPayloads,
417 std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo)> onDrop)
418{
419 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
420 DataProcessingHeader const* dph = o2::header::get<DataProcessingHeader*>(rawHeader);
421 // IMPLEMENTATION DETAILS
422 //
423 // This returns true if a given slot is available for the current number of lanes
424 auto isSlotInLane = [currentLane = dph->startTime, maxLanes = mMaxLanes](TimesliceSlot slot) {
425 return (slot.index % maxLanes) == (currentLane % maxLanes);
426 };
427 // This returns the identifier for the given input. We use a separate
428 // function because while it's trivial now, the actual matchmaking will
429 // become more complicated when we will start supporting ranges.
430 auto getInputTimeslice = [&matchers = mInputMatchers,
431 &distinctRoutes = mDistinctRoutesIndex,
432 &rawHeader,
433 &index = mTimesliceIndex](VariableContext& context)
434 -> std::tuple<int, TimesliceId> {
437 auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
438
439 if (input == INVALID_INPUT) {
440 return {
443 };
444 }
447 if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
448 TimesliceId timeslice{*pval};
449 return {input, timeslice};
450 }
451 // If we get here it means we need to push something out of the cache.
452 return {
455 };
456 };
457
458 // Actually save the header / payload in the slot
459 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
460 &messages,
461 &nMessages,
462 &nPayloads,
463 &cache = mCache,
464 &services = mContext,
465 numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
466 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
467 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
468 fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
469 timeslice.value, slot.index,
470 info.index.value == ChannelIndex::INVALID ? "invalid" : services.get<FairMQDeviceProxy>().getInputChannel(info.index)->GetName().c_str());
471 auto cacheIdx = numInputTypes * slot.index + input;
472 MessageSet& target = cache[cacheIdx];
473 cachedStateMetrics[cacheIdx] = CacheEntryStatus::PENDING;
474 // TODO: make sure that multiple parts can only be added within the same call of
475 // DataRelayer::relay
476 assert(nPayloads > 0);
477 size_t saved = 0;
478 for (size_t mi = 0; mi < nMessages; ++mi) {
479 assert(mi + nPayloads < nMessages);
480 // We are in calibration mode and the data does not have the calibration bit set.
481 // We do not store it.
483 mi += nPayloads;
484 continue;
485 }
486 target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
487 mi += nPayloads;
488 saved += nPayloads;
489 }
490 return saved;
491 };
492
 // Bump the per-process statistics matching the action the index took.
493 auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
494 auto& stats = ref.get<DataProcessingStats>();
495
496 // Update statistics for what happened
497 switch (action) {
499 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
500 break;
502 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
503 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
504 break;
506 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
507 break;
509 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
510 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), DataProcessingStats::Op::Add, (int)1});
511 break;
513 break;
514 }
515 };
516
517 // OUTER LOOP
518 //
519 // This is the actual outer loop processing input as part of a given
520 // timeslice. All the other implementation details are hidden by the lambdas
521 auto input = INVALID_INPUT;
522 auto timeslice = TimesliceId{TimesliceId::INVALID};
524 auto& index = mTimesliceIndex;
525
526 bool needsCleaning = false;
527 // First look for matching slots which already have some
528 // partial match.
529 for (size_t ci = 0; ci < index.size(); ++ci) {
530 slot = TimesliceSlot{ci};
531 if (!isSlotInLane(slot)) {
532 continue;
533 }
534 if (index.isValid(slot) == false) {
535 continue;
536 }
537 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
538 if (input != INVALID_INPUT) {
539 break;
540 }
541 }
542
543 // If we did not find anything, look for slots which
544 // are invalid.
545 if (input == INVALID_INPUT) {
546 for (size_t ci = 0; ci < index.size(); ++ci) {
547 slot = TimesliceSlot{ci};
548 if (index.isValid(slot) == true) {
549 continue;
550 }
551 if (!isSlotInLane(slot)) {
552 continue;
553 }
554 std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
555 if (input != INVALID_INPUT) {
556 needsCleaning = true;
557 break;
558 }
559 }
560 }
561
562 auto& stats = mContext.get<DataProcessingStats>();
 // Happy path: a suitable slot exists, store the messages there.
564 if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
565 if (needsCleaning) {
566 this->pruneCache(slot, onDrop);
567 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
568 }
569 size_t saved = saveInSlot(timeslice, input, slot, info);
570 if (saved == 0) {
571 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
572 }
573 index.publishSlot(slot);
574 index.markAsDirty(slot, true);
575 stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
576 return RelayChoice{.type = RelayChoice::Type::WillRelay, .timeslice = timeslice};
577 }
578
 // No slot matched: retry the match with a fresh variable context so the
 // timeslice index can decide whether to evict an LRU slot.
581 VariableContext pristineContext;
582 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
583
584 auto DataHeaderInfo = [&rawHeader]() {
585 std::string error;
586 // extract header from message model
587 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
588 if (dh) {
589 error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
590 } else {
591 error += "invalid header";
592 }
593 return error;
594 };
595
596 if (input == INVALID_INPUT) {
597 LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
598 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
599 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
600 for (size_t pi = 0; pi < nMessages; ++pi) {
601 messages[pi].reset(nullptr);
602 }
603 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
604 }
605
606 if (TimesliceId::isValid(timeslice) == false) {
607 LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
608 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
609 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
610 for (size_t pi = 0; pi < nMessages; ++pi) {
611 messages[pi].reset(nullptr);
612 }
613 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
614 }
615
616 O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
618 std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
619 uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
620 if (action != TimesliceIndex::ActionTaken::Wait) {
621 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot",
622 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
623 }
624
625 updateStatistics(action);
626
 // NOTE(review): the case labels of this switch were dropped by the
 // listing; each branch below belongs to a distinct ActionTaken value.
627 switch (action) {
629 return RelayChoice{.type = RelayChoice::Type::Backpressured, .timeslice = timeslice};
631 static std::atomic<size_t> obsoleteCount = 0;
632 static std::atomic<size_t> mult = 1;
633 if ((obsoleteCount++ % (1 * mult)) == 0) {
634 LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
635 if (obsoleteCount > mult * 10) {
636 mult = mult * 10;
637 }
638 }
639 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
641 LOG(warning) << "Incoming data is invalid, not relaying.";
642 stats.updateStats({static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), DataProcessingStats::Op::Add, (int)1});
643 stats.updateStats({static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), DataProcessingStats::Op::Add, (int)1});
644 for (size_t pi = 0; pi < nMessages; ++pi) {
645 messages[pi].reset(nullptr);
646 }
647 return RelayChoice{.type = RelayChoice::Type::Invalid, .timeslice = timeslice};
650 // At this point the variables match the new input but the
651 // cache still holds the old data, so we prune it.
652 this->pruneCache(slot, onDrop);
653 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
654 size_t saved = saveInSlot(timeslice, input, slot, info);
655 if (saved == 0) {
656 return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
657 }
658 index.publishSlot(slot);
659 index.markAsDirty(slot, true);
661 }
663}
664
// Scan all dirty cache lines, ask the completion policy what to do with
// each, and append the resulting (slot, timeslice, action) records to
// `completed` for the data processor to act upon.
// NOTE(review): the `case` labels of the switch below (doxygen 755, 760,
// 767, 772, 777, 782, 787 — presumably Consume, ConsumeAndRescan,
// ConsumeExisting, Process, Discard, Wait, Retry) were dropped by this
// listing; confirm against the repository copy before editing.
665void DataRelayer::getReadyToProcess(std::vector<DataRelayer::RecordAction>& completed)
666{
667 LOGP(debug, "DataRelayer::getReadyToProcess");
668 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
669
670 // THE STATE
671 const auto& cache = mCache;
672 const auto numInputTypes = mDistinctRoutesIndex.size();
673 //
674 // THE IMPLEMENTATION DETAILS
675 //
676 // We use this to bail out early from the check as soon as we find something
677 // which we know is not complete.
678 auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
679 auto offset = li * numInputTypes;
680 assert(cache.size() >= offset + numInputTypes);
681 auto const start = cache.data() + offset;
682 auto const end = cache.data() + offset + numInputTypes;
683 return {start, end};
684 };
685
686 // These two are trivial, but in principle the whole loop could be parallelised
687 // or vectorised so "completed" could be a thread local variable which needs
688 // merging at the end.
689 auto updateCompletionResults = [&completed](TimesliceSlot li, uint64_t const* timeslice, CompletionPolicy::CompletionOp op) {
690 if (timeslice) {
691 LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
692 completed.emplace_back(RecordAction{li, {*timeslice}, op});
693 } else {
 // NOTE(review): format string lacks a `{}` placeholder for li.index.
694 LOGP(debug, "No timeslice associated with slot ", li.index);
695 }
696 };
697
698 // THE OUTER LOOP
699 //
700 // To determine if a line is complete, we iterate on all the arguments
701 // and check if they are ready. We do it this way, because in the end
702 // the number of inputs is going to be small and having a more complex
703 // structure will probably result in a larger footprint in any case.
704 // Also notice that ai == inputsNumber only when we reach the end of the
705 // iteration, that means we have found all the required bits.
706 //
707 // Notice that the only time numInputTypes is 0 is when we are a dummy
708 // device created as a source for timers / conditions.
709 if (numInputTypes == 0) {
710 LOGP(debug, "numInputTypes == 0, returning.");
711 return;
712 }
713 size_t cacheLines = cache.size() / numInputTypes;
714 assert(cacheLines * numInputTypes == cache.size());
715 int countConsume = 0;
716 int countConsumeExisting = 0;
717 int countProcess = 0;
718 int countDiscard = 0;
719 int countWait = 0;
720 int notDirty = 0;
721
722 for (int li = cacheLines - 1; li >= 0; --li) {
723 TimesliceSlot slot{(size_t)li};
724 // We only check the cachelines which have been updated by an incoming
725 // message.
726 if (mTimesliceIndex.isDirty(slot) == false) {
727 notDirty++;
728 continue;
729 }
 // NOTE(review): "police" in the message below is a typo for "policy";
 // cannot be corrected in a comment-only pass since it is a runtime string.
730 if (!mCompletionPolicy.callbackFull) {
731 throw runtime_error_f("Completion police %s has no callback set", mCompletionPolicy.name.c_str());
732 }
733 auto partial = getPartialRecord(li);
734 // TODO: get the data ref from message model
735 auto getter = [&partial](size_t idx, size_t part) {
736 if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
737 auto header = partial[idx].header(part).get();
738 auto payload = partial[idx].payload(part).get();
739 return DataRef{nullptr,
740 reinterpret_cast<const char*>(header->GetData()),
741 reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
742 payload ? payload->GetSize() : 0};
743 }
744 return DataRef{};
745 };
746 auto nPartsGetter = [&partial](size_t idx) {
747 return partial[idx].size();
748 };
749 InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
750 CompletionPolicy::CompletionOp action = mCompletionPolicy.callbackFull(span, mInputs, mContext);
751
752 auto& variables = mTimesliceIndex.getVariablesForSlot(slot);
753 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
754 switch (action) {
756 countConsume++;
757 updateCompletionResults(slot, timeslice, action);
758 mTimesliceIndex.markAsDirty(slot, false);
759 break;
761 // This is just like Consume, but we also mark all slots as dirty
762 countConsume++;
764 updateCompletionResults(slot, timeslice, action);
765 mTimesliceIndex.rescan();
766 break;
768 countConsumeExisting++;
769 updateCompletionResults(slot, timeslice, action);
770 mTimesliceIndex.markAsDirty(slot, false);
771 break;
773 countProcess++;
774 updateCompletionResults(slot, timeslice, action);
775 mTimesliceIndex.markAsDirty(slot, false);
776 break;
778 countDiscard++;
779 updateCompletionResults(slot, timeslice, action);
780 mTimesliceIndex.markAsDirty(slot, false);
781 break;
783 countWait++;
784 mTimesliceIndex.markAsDirty(slot, true);
786 break;
788 countWait++;
789 mTimesliceIndex.markAsDirty(slot, false);
790 break;
791 }
792 }
793 mTimesliceIndex.updateOldestPossibleOutput(false);
794 LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
795 notDirty, countConsume, countConsumeExisting, countProcess,
796 countDiscard, countWait);
797}
798
// Transition the cached-entry metric of every input of the given slot from
// oldStatus to newStatus; entries in any other state are left untouched,
// which makes the call idempotent for partially consumed records.
// NOTE(review): the signature line (doxygen 799) is missing from this
// listing; presumably void DataRelayer::updateCacheStatus(TimesliceSlot
// slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) — confirm.
800{
801 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
802 const auto numInputTypes = mDistinctRoutesIndex.size();
803
804 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
805 &numInputTypes](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
806 auto cacheId = s.index * numInputTypes + arg;
807 if (cachedStateMetrics[cacheId] == oldStatus) {
808 cachedStateMetrics[cacheId] = newStatus;
809 }
810 };
811
812 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
813 markInputDone(slot, ai, oldStatus, newStatus);
814 }
815}
816
817std::vector<o2::framework::MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
818{
819 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
820
821 const auto numInputTypes = mDistinctRoutesIndex.size();
822 // State of the computation
823 std::vector<MessageSet> messages(numInputTypes);
824 auto& cache = mCache;
825 auto& index = mTimesliceIndex;
826
827 // Nothing to see here, this is just to make the outer loop more understandable.
828 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
829 return;
830 };
831
832 // We move ownership so that the cache can be reused once the computation is
833 // finished. We mark the given cache slot invalid, so that it can be reused
834 // This means we can still handle old messages if there is still space in the
835 // cache where to put them.
836 auto moveHeaderPayloadToOutput = [&messages,
837 &cachedStateMetrics = mCachedStateMetrics,
838 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
839 auto cacheId = s.index * numInputTypes + arg;
840 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
841 // TODO: in the original implementation of the cache, there have been only two messages per entry,
842 // check if the 2 above corresponds to the number of messages.
843 if (cache[cacheId].size() > 0) {
844 messages[arg] = std::move(cache[cacheId]);
845 }
846 index.markAsInvalid(s);
847 };
848
849 // An invalid set of arguments is a set of arguments associated to an invalid
850 // timeslice, so I can simply do that. I keep the assertion there because in principle
851 // we should have dispatched the timeslice already!
852 // FIXME: what happens when we have enough timeslices to hit the invalid one?
853 auto invalidateCacheFor = [&numInputTypes, &index, &cache](TimesliceSlot s) {
854 for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
855 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
856 cache[ai].clear();
857 }
858 index.markAsInvalid(s);
859 };
860
861 // Outer loop here.
862 jumpToCacheEntryAssociatedWith(slot);
863 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
864 moveHeaderPayloadToOutput(slot, ai);
865 }
866 invalidateCacheFor(slot);
867
868 return messages;
869}
870
871std::vector<o2::framework::MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
872{
873 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
874
875 const auto numInputTypes = mDistinctRoutesIndex.size();
876 // State of the computation
877 std::vector<MessageSet> messages(numInputTypes);
878 auto& cache = mCache;
879 auto& index = mTimesliceIndex;
880
881 // Nothing to see here, this is just to make the outer loop more understandable.
882 auto jumpToCacheEntryAssociatedWith = [](TimesliceSlot) {
883 return;
884 };
885
886 // We move ownership so that the cache can be reused once the computation is
887 // finished. We mark the given cache slot invalid, so that it can be reused
888 // This means we can still handle old messages if there is still space in the
889 // cache where to put them.
890 auto copyHeaderPayloadToOutput = [&messages,
891 &cachedStateMetrics = mCachedStateMetrics,
892 &cache, &index, &numInputTypes](TimesliceSlot s, size_t arg) {
893 auto cacheId = s.index * numInputTypes + arg;
894 cachedStateMetrics[cacheId] = CacheEntryStatus::RUNNING;
895 // TODO: in the original implementation of the cache, there have been only two messages per entry,
896 // check if the 2 above corresponds to the number of messages.
897 for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
898 auto& header = cache[cacheId].header(pi);
899 auto&& newHeader = header->GetTransport()->CreateMessage();
900 newHeader->Copy(*header);
901 messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
902 }
903 };
904
905 // Outer loop here.
906 jumpToCacheEntryAssociatedWith(slot);
907 for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
908 copyHeaderPayloadToOutput(slot, ai);
909 }
910
911 return std::move(messages);
912}
913
// Remove all pending messages: every cache entry is cleared and every
// timeslice slot is marked invalid, resetting the relayer to an empty state
// (purpose per the class member documentation of clear()).
// NOTE(review): the signature line (doxygen line 914, presumably
// `void DataRelayer::clear()`) was lost in the HTML extraction of this listing.
915
916 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
917
// Drop the message parts held by each cache entry. NB: the loop variable
// `cache` is a single entry of mCache, not the whole cache — confusing name.
918 for (auto& cache : mCache) {
919 cache.clear();
920 }
// Invalidate every slot so nothing previously relayed is still considered valid.
921 for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
922 mTimesliceIndex.markAsInvalid(TimesliceSlot{s});
923 }
924}
925
// Returns how many timeslices can be handled in parallel: the total number of
// cache entries divided by the number of distinct input routes (the cache is
// laid out as one row of `numInputTypes` entries per timeslice slot).
// NOTE(review): the name/qualifier line (doxygen line 927) was lost in the
// HTML extraction; per the member index this is
// `size_t DataRelayer::getParallelTimeslices() const`.
926size_t
928
929 return mCache.size() / mDistinctRoutesIndex.size();
930}
931
// Tune the maximum number of in-flight timeslices: resizes the timeslice
// index and the per-slot matcher variable contexts to the requested length.
// NOTE(review): doxygen lines 932-936 (doc comment and signature,
// `void DataRelayer::setPipelineLength(size_t s)` per the member index) and
// line 942 (a fully hyperlinked trailing statement — possibly a call such as
// publishMetrics(); TODO confirm against the repository) were lost in the
// HTML extraction of this listing.
937
938 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
939
940 mTimesliceIndex.resize(s);
941 mVariableContextes.resize(s);
943}
944
// (Re)allocates the message cache and the cached-state metrics to match the
// current pipeline length (numInputTypes x number of slots), then registers
// the DataProcessingStates used to publish matcher-variable and data-relayer
// states to the GUI.
// NOTE(review): the signature line (doxygen line 945) and the two opening
// lines of the registration calls (962 and 972 — presumably
// `states.registerState({` given the brace-initializer bodies that follow)
// were lost in the HTML extraction of this listing.
946
947 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
948
949 auto numInputTypes = mDistinctRoutesIndex.size();
950 // FIXME: many of the DataRelayer function rely on allocated cache, so its
951 // maybe misleading to have the allocation in a function primarily for
952 // metrics publishing, do better in setPipelineLength?
953 mCache.resize(numInputTypes * mTimesliceIndex.size());
954 auto& states = mContext.get<DataProcessingStates>();
955
// One metrics entry per cache entry.
956 mCachedStateMetrics.resize(mCache.size());
957
958 // There is maximum 16 variables available. We keep them row-wise so that
959 // that we can take mod 16 of the index to understand which variable we
960 // are talking about.
// Register one state per matcher-variable context; published only when the
// driver has a GUI attached.
961 for (size_t i = 0; i < mVariableContextes.size(); ++i) {
963 .name = fmt::format("matcher_variables/{}", i),
964 .stateId = static_cast<short>((short)(ProcessingStateId::CONTEXT_VARIABLES_BASE) + i),
965 .minPublishInterval = 500, // if we publish too often we flood the GUI and we are not able to read it in any case
966 .sendInitialValue = true,
967 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
968 });
969 }
970
// Register one serialized relayer state per timeslice slot.
// NOTE(review): `int ci` compared against the unsigned mTimesliceIndex.size()
// mixes signed/unsigned (the sibling loops here use size_t) — harmless for
// realistic sizes but worth normalizing.
971 for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
973 .name = fmt::format("data_relayer/{}", ci),
974 .stateId = static_cast<short>((short)(ProcessingStateId::DATA_RELAYER_BASE) + (short)ci),
975 .minPublishInterval = 800, // if we publish too often we flood the GUI and we are not able to read it in any case
976 .sendInitialValue = true,
977 .defaultEnabled = mContext.get<DriverConfig const>().driverHasGUI,
978 });
979 }
980}
981
// Get the TimesliceId associated to a given slot
// (`TimesliceId DataRelayer::getTimesliceForSlot(TimesliceSlot slot)` per the
// member index).
// NOTE(review): the signature line (982) and the return statement (985 —
// presumably `return VariableContextHelpers::getTimeslice(
// mTimesliceIndex.getVariablesForSlot(slot));`, by analogy with
// getRunNumberForSlot below and the VariableContextHelpers::getTimeslice
// member listed in the index — TODO confirm) were lost in the HTML extraction.
983
984 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
986}
987
// Get the firstTForbit associated to a given slot
// (`uint32_t DataRelayer::getFirstTFOrbitForSlot(TimesliceSlot slot)` per the
// member index).
// NOTE(review): the signature line (988) and the return statement (991 —
// presumably `return VariableContextHelpers::getFirstTFOrbit(
// mTimesliceIndex.getVariablesForSlot(slot));`, by analogy with
// getRunNumberForSlot below — TODO confirm) were lost in the HTML extraction.
989
990 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
992}
993
// Get the run number associated to a given slot, decoded from the slot's
// matcher variable context.
// NOTE(review): the signature line (doxygen line 994,
// `uint32_t DataRelayer::getRunNumberForSlot(TimesliceSlot slot)` per the
// member index) was lost in the HTML extraction of this listing.
995
996 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
997 return VariableContextHelpers::getRunNumber(mTimesliceIndex.getVariablesForSlot(slot));
998}
999
// Get the creation time associated to a given slot
// (`uint64_t DataRelayer::getCreationTimeForSlot(TimesliceSlot slot)` per the
// member index).
// NOTE(review): the signature line (1000) and the return statement (1003 —
// presumably `return VariableContextHelpers::getCreationTime(
// mTimesliceIndex.getVariablesForSlot(slot));`, by analogy with
// getRunNumberForSlot above — TODO confirm) were lost in the HTML extraction.
1001
1002 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1004}
1005
// Send metrics with the VariableContext information (per the member docs):
// publishes each slot's variable context, then serializes the per-slot input
// cache status into a digit string and pushes it as the DATA_RELAYER_BASE+ci
// processing state.
// NOTE(review): the signature line (doxygen line 1006,
// `void DataRelayer::sendContextState()` per the member index) was lost in
// the HTML extraction of this listing.
1007
1008 std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1009 auto& states = mContext.get<DataProcessingStates>();
// First, publish the matcher variable context of every slot.
1010 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1011 auto slot = TimesliceSlot{ci};
1012 sendVariableContextMetrics(mTimesliceIndex.getPublishedVariablesForSlot(slot), slot,
1013 states);
1014 }
// Scratch buffer reused for every slot's serialized state; the slot count is
// written once as a prefix, then the per-route digits are rewritten per slot.
1015 char relayerSlotState[1024];
1016 // The number of timeslices is encoded in each state
1017 // We serialise the state of a Timeslot in a given state.
1018 int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
1019 char* buffer = relayerSlotState + written;
1020 for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
1021 for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
// NOTE(review): this indexes mCachedStateMetrics as si * numSlots + ci,
// while the consume paths compute cacheId = slot.index * numInputTypes + arg
// — the two layouts are transposed. Confirm which orientation is intended.
1022 int index = si * mTimesliceIndex.size() + ci;
// Each entry's status is encoded as a single ASCII digit.
1023 int value = static_cast<int>(mCachedStateMetrics[index]);
1024 buffer[si] = value + '0';
1025 // Anything which is done is actually already empty,
1026 // so after we report it we mark it as such.
1027 if (mCachedStateMetrics[index] == CacheEntryStatus::DONE) {
1028 mCachedStateMetrics[index] = CacheEntryStatus::EMPTY;
1029 }
1030 }
// Terminate after the per-route digits; `size` counts prefix + digits and
// excludes the trailing NUL.
1031 buffer[mDistinctRoutesIndex.size()] = '\0';
1032 auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
1033 states.updateState({.id = short((int)ProcessingStateId::DATA_RELAYER_BASE + ci), .size = size, .data = relayerSlotState});
1034 }
1035}
1036
1037} // namespace o2::framework
benchmark::State & state
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
int32_t i
uint32_t op
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_LOCKABLE(T)
Definition Tracing.h:20
std::ostringstream debug
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint start
Definition glcorearb.h:469
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
static constexpr int32_t KEEP_AT_EOS_FLAG
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
Definition DeviceState.h:34
ProcessingType allowedProcessing
Definition DeviceState.h:79
fair::mq::Channel * channel
Definition ChannelInfo.h:51
Reference to an inflight part.
Definition PartRef.h:24
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
the main header struct
Definition DataHeader.h:618
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"