#include <Monitoring/Metric.h>
#include <Monitoring/Monitoring.h>
#include <fairmq/Channel.h>
#if __has_include(<fairmq/shmem/Message.h>)
#include <fairmq/shmem/Message.h>
#endif
#include <fmt/format.h>
#include <fmt/ostream.h>
// DataRelayer constructor (excerpt; elided source lines are marked with "// ...").
DataRelayer::DataRelayer(CompletionPolicy const& policy,
                         std::vector<InputRoute> const& routes,
                         TimesliceIndex& index,
                         ServiceRegistryRef services)
  : mTimesliceIndex{index},
    mCompletionPolicy{policy},
    // ...
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  std::string queries = "";
  for (short i = 0; i < numInputTypes; ++i) {
    // ...
    assert(mDistinctRoutesIndex[i] < routes.size());
    mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
    auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
    // ...
  }
  // ...
  states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
  // ...
  states.processCommandQueue();
  // ...
}
// (fragment of a neighbouring member function; surrounding lines elided)
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
// DataRelayer::processDanglingInputs (excerpt; elided source lines are marked "// ..."):
// run the expiration handlers over slots whose remaining inputs will never arrive,
// creating the corresponding pseudo-inputs where needed.
ActivityStats DataRelayer::processDanglingInputs(std::vector<ExpirationHandler> const& expirationHandlers,
                                                 ServiceRegistryRef services, bool createNew)
{
  LOGP(debug, "DataRelayer::processDanglingInputs");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  if (expirationHandlers.empty()) {
    LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
    // ...
  }
  // ...
  std::vector<TimesliceSlot> slotsCreatedByHandlers;
  // ...
  LOGP(debug, "Creating new slot");
  for (auto& handler : expirationHandlers) {
    LOGP(debug, "handler.creator for {}", handler.name);
    auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
    slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
  }
  // ...
  for (auto slot : slotsCreatedByHandlers) {
    // ...
  }
  if (validSlots > 0) {
    // ...
    LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
  } else {
    LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
  }
  int headerPresent = 0;
  int payloadPresent = 0;
  // ...
  int checkerDenied = 0;
  for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
    // ...
    if (mTimesliceIndex.isValid(slot) == false) {
      // ...
    }
    // ...
    assert(mDistinctRoutesIndex.empty() == false);
    // ...
    for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
      auto& expirator = expirationHandlers[ei];
      // ...
      auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
      if (part.size() > 0 && part.header(0) != nullptr) {
        // ...
      }
      if (part.size() > 0 && part.payload(0) != nullptr) {
        // ...
      }
      if (!expirator.checker) {
        // ...
      }
      if (slotsCreatedByHandlers[ei] != slot) {
        // ...
      }
      // Build a read-only InputSpan view over the messages currently cached for this slot,
      // so the expiration checker can inspect what is already there.
      auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
        auto offset = li * numInputTypes;
        assert(cache.size() >= offset + numInputTypes);
        // ...
        auto const end = cache.data() + offset + numInputTypes;
        // ...
      };
      // ...
      auto partial = getPartialRecord(ti);
      // ...
      auto getter = [&partial](size_t idx, size_t part) {
        if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
          auto header = partial[idx].header(part).get();
          auto payload = partial[idx].payload(part).get();
          // ...
          return DataRef{nullptr,
                         reinterpret_cast<const char*>(header->GetData()),
                         reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                         payload ? payload->GetSize() : 0};
        }
        // ...
      };
      auto nPartsGetter = [&partial](size_t idx) {
        return partial[idx].size();
      };
#if __has_include(<fairmq/shmem/Message.h>)
      auto refCountGetter = [&partial](size_t idx) -> int {
        auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
        return header.GetRefCount();
      };
#else
      std::function<int(size_t)> refCountGetter = nullptr;
#endif
      InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
      // ...
      if (expirator.checker(services, timestamp.value, span) == false) {
        // ...
      }
      // ...
      assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
      assert(expirator.handler);
      // ...
      expirator.handler(services, newRef, variables);
      part.reset(std::move(newRef));
      activity.expiredSlots++;
      // ...
      assert(part.header(0) != nullptr);
      assert(part.payload(0) != nullptr);
    }
  }
  LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
       headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
  // ...
}
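// ---------------------------------------------------------------------------
// Illustrative sketch (not part of DataRelayer.cxx): how a span-like view over the
// per-input message sets of one slot can be built from two accessors, mirroring the
// getter / nPartsGetter pair handed to InputSpan above. All names below (ToyMessage,
// ToyMessageSet, ToyRef, ToySpan, makeSpan) are hypothetical stand-ins.
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

struct ToyMessage {
  std::string data;
};

struct ToyMessageSet {
  std::vector<ToyMessage> headers;
  std::vector<ToyMessage> payloads;
  size_t size() const { return headers.size(); }
};

struct ToyRef {
  const char* header = nullptr;
  const char* payload = nullptr;
  size_t payloadSize = 0;
};

struct ToySpan {
  std::function<ToyRef(size_t idx, size_t part)> get; // one (header, payload) pair
  std::function<size_t(size_t idx)> nParts;            // parts cached for input idx
  size_t nInputs = 0;
};

// Build the view over one cache line, i.e. the slice of message sets belonging to one slot.
inline ToySpan makeSpan(std::vector<ToyMessageSet> const& line)
{
  return ToySpan{
    [&line](size_t idx, size_t part) {
      auto const& set = line[idx];
      if (part >= set.size()) {
        return ToyRef{}; // nothing cached yet for this input/part
      }
      return ToyRef{set.headers[part].data.c_str(),
                    set.payloads[part].data.c_str(),
                    set.payloads[part].data.size()};
    },
    [&line](size_t idx) { return line[idx].size(); },
    line.size()};
}
// ---------------------------------------------------------------------------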
// Find the first input matcher (in the order given by `index`) which matches the incoming
// header, filling `context` with the extracted variables on success (excerpt).
size_t matchToContext(void const* data,
                      std::vector<DataDescriptorMatcher> const& matchers,
                      std::vector<size_t> const& index,
                      VariableContext& context)
{
  // ...
  for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
    auto& matcher = matchers[index[ri]];
    // ...
    if (matcher.match(reinterpret_cast<char const*>(data), context)) {
      // ...
    }
  }
  // ...
}
// Serialization of the matcher variables of a slot into a published state string (excerpt).
  static const std::string nullstring{"null"};
  // ...
  static std::string state = "";
  // ...
    auto var = variables.get(i);
    if (auto pval = std::get_if<uint64_t>(&var)) {
      // ...
    } else if (auto pval = std::get_if<uint32_t>(&var)) {
      // ...
    } else if (auto pval2 = std::get_if<std::string>(&var)) {
      // ...
    }
  // ...
    .data = state.data()});
// DataRelayer::setOldestPossibleInput (excerpt): drop cached data which can no longer be
// completed because it is older than the oldest possible incoming timeslice.
  LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
  static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
  // ...
  for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
    // ...
    auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
    // ...
    if (mTimesliceIndex.isValid({si})) {
      LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
      // ...
    }
    // ...
    mPruneOps.push_back(PruneOp{si});
    bool didDrop = false;
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      auto& element = mCache[si * mInputs.size() + mi];
      if (element.size() != 0) {
        if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
          // ...
          LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          // ...
          LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          // ...
        }
        // ...
        "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
        "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
        // ...
      }
    }
    // ...
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      if (input.lifetime == Lifetime::Timer) {
        // ...
      }
      auto& element = mCache[si * mInputs.size() + mi];
      if (element.size() == 0) {
        // ...
        LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
             DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
        // ...
        LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
             DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
        // ...
      }
    }
  }
// DataRelayer::prunePending (excerpt): apply all the queued prune operations.
  for (auto& op : mPruneOps) {
    // ...
  }
// DataRelayer::pruneCache (excerpt): hand whatever is still cached for a slot to the
// onDrop callback and clear the corresponding cache entries. The work happens in a lambda
// whose capture list begins on an elided line:
  // ...
                     &cachedStateMetrics = mCachedStateMetrics,
                     numInputTypes = mDistinctRoutesIndex.size(),
                     &index = mTimesliceIndex,
  // ...
    auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
    // ...
    std::vector<MessageSet> dropped(numInputTypes);
    for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
      auto cacheId = slot.index * numInputTypes + ai;
      // ...
      if (cache[cacheId].size() > 0) {
        dropped[ai] = std::move(cache[cacheId]);
      }
    }
    // ...
    bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
    // ...
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
    onDrop(slot, dropped, oldestPossibleTimeslice);
    // ...
    assert(cache.empty() == false);
    assert(index.size() * numInputTypes == cache.size());
    // ...
    assert(numInputTypes * slot.index < cache.size());
    for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
      // ...
    }
  // ...

// isCalibrationData (excerpt): inspect the DataHeader of the first message of a part.
  auto* dh = o2::header::get<DataHeader*>(first->GetData());
  // ...
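// ---------------------------------------------------------------------------
// Illustrative sketch (not part of DataRelayer.cxx): the cache used above is a flat vector
// addressed as cacheId = slot * numInputTypes + input, i.e. one contiguous "cache line" of
// numInputTypes entries per pipeline slot. The helper below shows the same arithmetic with
// plain integers; the names are hypothetical.
#include <cassert>
#include <cstddef>

struct CacheIndex {
  size_t numInputTypes; // number of distinct input routes of the device
  size_t numSlots;      // pipeline length (in-flight timeslices)

  size_t id(size_t slot, size_t input) const
  {
    assert(slot < numSlots && input < numInputTypes);
    return slot * numInputTypes + input; // same formula as mCache[slot.index * numInputTypes + ai]
  }

  // First entry of the cache line of a slot; the line spans [begin, begin + numInputTypes).
  size_t lineBegin(size_t slot) const { return slot * numInputTypes; }
};
// ---------------------------------------------------------------------------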
// DataRelayer::relay (excerpt): find (or create) the timeslice slot matching the incoming
// header and store the header/payload messages in that slot's cache line.
DataRelayer::RelayChoice DataRelayer::relay(void const* rawHeader,
                                            std::unique_ptr<fair::mq::Message>* messages,
                                            InputInfo const& info,
                                            size_t nMessages,
                                            size_t nPayloads,
                                            OnDropCallback onDrop)
{
  // ...
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  // A slot belongs to this device when its index falls in the device's lane.
  auto isSlotInLane = [/* captures elided */](TimesliceSlot slot) {
    return (slot.index % maxLanes) == (currentLane % maxLanes);
  };
  // ...
  // Match the raw header against the input matchers and extract the timeslice variables.
  auto getInputTimeslice = [&matchers = mInputMatchers,
                            &distinctRoutes = mDistinctRoutesIndex,
                            /* further captures elided */](VariableContext& context)
    -> std::tuple<int, TimesliceId> {
    // ...
    auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
    // ...
    if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
      // ...
      return {input, timeslice};
    }
    // ...
  };
  // ...
  // Move the messages of one input into the cache line of the chosen slot.
  auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
                     // ...
                     &services = mContext,
                     /* further captures elided */](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
    // ...
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
                           fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
                           timeslice.value, slot.index,
                           /* ... */);
    auto cacheIdx = numInputTypes * slot.index + input;
    // ...
    assert(nPayloads > 0);
    // ...
    for (size_t mi = 0; mi < nMessages; ++mi) {
      assert(mi + nPayloads < nMessages);
      // ...
      "Dropping incoming %zu messages because they are data processing.", nPayloads);
      // ...
      for (size_t i = mi; i < mi + nPayloads + 1; i++) {
        auto discard = std::move(messages[i]);
        // ...
      }
      // ...
      target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
      // ...
    }
    // ...
  };
  // ...
  auto& index = mTimesliceIndex;
  // ...
  bool needsCleaning = false;
  // ...
  // First pass: look for a valid slot in this lane which already matches the incoming timeslice.
  for (size_t ci = 0; ci < index.size(); ++ci) {
    // ...
    if (!isSlotInLane(slot)) {
      // ...
    }
    if (index.isValid(slot) == false) {
      // ...
    }
    // ...
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
    // ...
  }
  // Second pass: look for an invalid slot in this lane which can be reused.
  for (size_t ci = 0; ci < index.size(); ++ci) {
    // ...
    if (index.isValid(slot) == true) {
      // ...
    }
    if (!isSlotInLane(slot)) {
      // ...
    }
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
    // ...
    needsCleaning = true;
    // ...
  }
  // ...
  // A matching slot was found: store the data and mark the slot dirty.
  mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
  size_t saved = saveInSlot(timeslice, input, slot, info);
  // ...
  index.publishSlot(slot);
  index.markAsDirty(slot, true);
  // ...
  // No slot matched: evaluate the header against a pristine variable context.
  std::tie(input, timeslice) = getInputTimeslice(pristineContext);
  // ...
  auto DataHeaderInfo = [&rawHeader]() {
    // ...
    const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
    // ...
    error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
    // ...
    error += "invalid header";
    // ...
  };
  // ...
  LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
  // ...
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
  }
  // ...
  LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
  // ...
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
  }
  // ...
  std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
  uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
  // ...
  "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
  // ...
  updateStatistics(action);
  // ...
  // Data older than anything the index can still produce is dropped as obsolete.
  static std::atomic<size_t> obsoleteCount = 0;
  static std::atomic<size_t> mult = 1;
  if ((obsoleteCount++ % (1 * mult)) == 0) {
    LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
    if (obsoleteCount > mult * 10) {
      // ...
    }
  }
  // ...
  LOG(warning) << "Incoming data is invalid, not relaying.";
  // ...
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
  }
  // ...
  // A slot was obtained via replaceLRUWith: store the data there and mark it dirty.
  mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
  size_t saved = saveInSlot(timeslice, input, slot, info);
  // ...
  index.publishSlot(slot);
  index.markAsDirty(slot, true);
  // ...
}
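// ---------------------------------------------------------------------------
// Illustrative sketch (not part of DataRelayer.cxx): the isSlotInLane test used by relay()
// above keeps a slot on a device only when the slot index and the device's lane agree
// modulo the number of lanes, i.e. slots are distributed round-robin across parallel lanes.
// A standalone version with hypothetical names:
#include <cstddef>

inline bool slotBelongsToLane(size_t slotIndex, size_t currentLane, size_t maxLanes)
{
  return (slotIndex % maxLanes) == (currentLane % maxLanes);
}

// Example: with maxLanes == 3 and currentLane == 1, the lane owns slots 1, 4, 7, ...
// ---------------------------------------------------------------------------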
// DataRelayer::getReadyToProcess (excerpt): scan the dirty cache lines and ask the
// completion policy what to do with each partially assembled record.
void DataRelayer::getReadyToProcess(std::vector<RecordAction>& completed)
{
  LOGP(debug, "DataRelayer::getReadyToProcess");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  const auto& cache = mCache;
  const auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  // View over the portion of the cache which holds the messages of one slot.
  auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    // ...
    auto const end = cache.data() + offset + numInputTypes;
    // ...
  };
  // ...
  LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
  // ...
  LOGP(debug, "No timeslice associated with slot {}", li.index);
  // ...
  if (numInputTypes == 0) {
    LOGP(debug, "numInputTypes == 0, returning.");
    // ...
  }
  size_t cacheLines = cache.size() / numInputTypes;
  assert(cacheLines * numInputTypes == cache.size());
  int countConsume = 0;
  int countConsumeExisting = 0;
  int countProcess = 0;
  int countDiscard = 0;
  // ...
  for (int li = cacheLines - 1; li >= 0; --li) {
    // ...
    if (mTimesliceIndex.isDirty(slot) == false) {
      // ...
    }
    // ...
    throw runtime_error_f("Completion policy %s has no callback set", mCompletionPolicy.name.c_str());
    // ...
    auto partial = getPartialRecord(li);
    // ...
    auto getter = [&partial](size_t idx, size_t part) {
      if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
        auto header = partial[idx].header(part).get();
        auto payload = partial[idx].payload(part).get();
        // ...
        return DataRef{nullptr,
                       reinterpret_cast<const char*>(header->GetData()),
                       reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                       payload ? payload->GetSize() : 0};
      }
      // ...
    };
    auto nPartsGetter = [&partial](size_t idx) {
      return partial[idx].size();
    };
#if __has_include(<fairmq/shmem/Message.h>)
    auto refCountGetter = [&partial](size_t idx) -> int {
      auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
      return header.GetRefCount();
    };
#else
    std::function<int(size_t)> refCountGetter = nullptr;
#endif
    InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
    // ...
    auto timeslice = std::get_if<uint64_t>(&variables.get(0));
    // ...
    // Record the result, depending on the action chosen by the completion policy:
    updateCompletionResults(slot, timeslice, action);
    // ...
    updateCompletionResults(slot, timeslice, action);
    // ...
    countConsumeExisting++;
    updateCompletionResults(slot, timeslice, action);
    // ...
    updateCompletionResults(slot, timeslice, action);
    // ...
    updateCompletionResults(slot, timeslice, action);
    // ...
  }
  // ...
  LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
       notDirty, countConsume, countConsumeExisting, countProcess,
       countDiscard, countWait);
}
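// ---------------------------------------------------------------------------
// Illustrative sketch (not part of DataRelayer.cxx): getReadyToProcess above asks the
// completion policy for an action per dirty slot and keeps per-action counters for the
// summary log. A reduced, hypothetical version of that tallying loop:
#include <cstddef>
#include <vector>

enum class ToyOp { Consume, ConsumeExisting, Process, Discard, Wait };

struct ToyTally {
  int consume = 0, consumeExisting = 0, process = 0, discard = 0, wait = 0;
};

template <typename Policy>
ToyTally classifySlots(std::vector<bool> const& dirty, Policy&& policy)
{
  ToyTally tally;
  for (size_t slot = 0; slot < dirty.size(); ++slot) {
    if (!dirty[slot]) {
      continue; // only dirty slots are re-evaluated
    }
    switch (policy(slot)) {
      case ToyOp::Consume: tally.consume++; break;
      case ToyOp::ConsumeExisting: tally.consumeExisting++; break;
      case ToyOp::Process: tally.process++; break;
      case ToyOp::Discard: tally.discard++; break;
      case ToyOp::Wait: tally.wait++; break;
    }
  }
  return tally;
}
// ---------------------------------------------------------------------------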
// DataRelayer::updateCacheStatus (excerpt): move the cached state metric of every input of
// a slot from oldStatus to newStatus.
void DataRelayer::updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
                        /* further captures elided */](TimesliceSlot s, size_t arg, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) {
    auto cacheId = s.index * numInputTypes + arg;
    if (cachedStateMetrics[cacheId] == oldStatus) {
      cachedStateMetrics[cacheId] = newStatus;
    }
  };
  // ...
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    markInputDone(slot, ai, oldStatus, newStatus);
  }
}
// DataRelayer::consumeAllInputsForTimeslice (excerpt): move every cached message of a slot
// to the output and invalidate the slot afterwards.
std::vector<MessageSet> DataRelayer::consumeAllInputsForTimeslice(TimesliceSlot slot)
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  const auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;
  // ...
  auto moveHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
                                    /* further captures elided */](TimesliceSlot s, size_t arg) {
    auto cacheId = s.index * numInputTypes + arg;
    // ...
    if (cache[cacheId].size() > 0) {
      messages[arg] = std::move(cache[cacheId]);
    }
    // ...
    index.markAsInvalid(s);
  };
  // ...
  // (likely part of the invalidateCacheFor helper, whose definition is elided: verify that
  // everything was moved out of the cache line before invalidating the slot)
  for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
    assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true,
                           [](bool result, auto const& element) { return result && element.get() == nullptr; }));
  }
  // ...
  index.markAsInvalid(s);
  // ...
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    moveHeaderPayloadToOutput(slot, ai);
  }
  invalidateCacheFor(slot);
  // ...
}
// DataRelayer::consumeExistingInputsForTimeslice (excerpt): copy the header messages and
// move the payloads out of the cache, leaving the slot itself valid.
std::vector<MessageSet> DataRelayer::consumeExistingInputsForTimeslice(TimesliceSlot slot)
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  const auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;
  // ...
  auto copyHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
                                    /* further captures elided */](TimesliceSlot s, size_t arg) {
    auto cacheId = s.index * numInputTypes + arg;
    // ...
    for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
      auto& header = cache[cacheId].header(pi);
      auto&& newHeader = header->GetTransport()->CreateMessage();
      newHeader->Copy(*header);
      messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
    }
  };
  // ...
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    copyHeaderPayloadToOutput(slot, ai);
  }
  // ...
  return std::move(messages);
}
// DataRelayer::clear (excerpt): drop every pending message and invalidate all slots.
void DataRelayer::clear()
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  for (auto& cache : mCache) {
    // ...
  }
  for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
    // ...
  }
}

// DataRelayer::getParallelTimeslices: one cache line per in-flight timeslice.
size_t DataRelayer::getParallelTimeslices() const
{
  return mCache.size() / mDistinctRoutesIndex.size();
}
// DataRelayer::setPipelineLength (excerpt): tune the number of in-flight timeslices.
void DataRelayer::setPipelineLength(size_t s)
{
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  mTimesliceIndex.resize(s);
  mVariableContextes.resize(s);
  // ...
}
// Resize the cache to the pipeline length and register the per-context / per-slot states
// used for monitoring (excerpt).
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  auto numInputTypes = mDistinctRoutesIndex.size();
  // ...
  mCache.resize(numInputTypes * mTimesliceIndex.size());
  // ...
  mCachedStateMetrics.resize(mCache.size());
  // ...
  for (size_t i = 0; i < mVariableContextes.size(); ++i) {
    // ... (a state is registered for each variable context with:)
    .name = fmt::format("matcher_variables/{}", i),
    // ...
    .minPublishInterval = 500,
    .sendInitialValue = true,
    // ...
  }
  // ...
  for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
    // ... (a state is registered for each slot with:)
    .name = fmt::format("data_relayer/{}", ci),
    // ...
    .minPublishInterval = 800,
    .sendInitialValue = true,
    // ...
  }
// A series of small per-slot accessors and the context-state publishers follow; each of
// them acquires the relayer mutex first (their bodies are elided here).
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  // ...
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    // ...
  }
// Serialize the per-slot input state into a compact string for publication (excerpt):
// the number of slots comes first, then one value per (input, slot) pair.
  char relayerSlotState[1024];
  // ...
  int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
  char* buffer = relayerSlotState + written;
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
      int index = si * mTimesliceIndex.size() + ci;
      int value = static_cast<int>(mCachedStateMetrics[index]);
      // ...
    }
    // ...
    buffer[mDistinctRoutesIndex.size()] = '\0';
    auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
    // ...
  }
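// ---------------------------------------------------------------------------
// Illustrative sketch (not part of DataRelayer.cxx): the snprintf loop above serializes the
// (input x slot) state matrix into a single string, leading with the number of slots. A
// hypothetical standalone version using std::string instead of a fixed char buffer:
#include <cstddef>
#include <string>
#include <vector>

inline std::string serializeRelayerState(std::vector<int> const& stateMetrics,
                                         size_t numSlots, size_t numInputs)
{
  std::string out = std::to_string(numSlots) + " ";
  for (size_t slot = 0; slot < numSlots; ++slot) {
    for (size_t input = 0; input < numInputs; ++input) {
      // Same addressing as above: input * numSlots + slot.
      int value = stateMetrics[input * numSlots + slot];
      out += static_cast<char>('0' + value); // one character per cache entry
    }
    out += ' ';
  }
  return out;
}
// ---------------------------------------------------------------------------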