44#include <Monitoring/Metric.h>
45#include <Monitoring/Monitoring.h>
47#include <fairlogger/Logger.h>
48#include <fairmq/Channel.h>
50#if __has_include(<fairmq/shmem/Message.h>)
51#include <fairmq/shmem/Message.h>
53#include <fmt/format.h>
54#include <fmt/ostream.h>
73 std::vector<InputRoute>
const& routes,
77 mTimesliceIndex{
index},
78 mCompletionPolicy{policy},
83 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
93 auto numInputTypes = mDistinctRoutesIndex.size();
95 std::string queries =
"";
96 for (
short i = 0;
i < numInputTypes; ++
i) {
98 assert(mDistinctRoutesIndex[
i] < routes.size());
99 mInputs.push_back(routes[mDistinctRoutesIndex[
i]].matcher);
100 auto& matcher = routes[mDistinctRoutesIndex[
i]].matcher;
106 states.registerState({.name =
"data_queries", .stateId = stateId, .sendInitialValue =
true, .defaultEnabled =
true});
108 states.processCommandQueue();
113 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
121 LOGP(
debug,
"DataRelayer::processDanglingInputs");
122 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
127 if (expirationHandlers.empty()) {
128 LOGP(
debug,
"DataRelayer::processDanglingInputs: No expiration handlers");
132 std::vector<TimesliceSlot> slotsCreatedByHandlers;
134 LOGP(
debug,
"Creating new slot");
135 for (
auto& handler : expirationHandlers) {
136 LOGP(
debug,
"handler.creator for {}", handler.name);
137 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
138 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
143 for (
auto slot : slotsCreatedByHandlers) {
149 if (validSlots > 0) {
151 LOGP(
debug,
"DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
153 LOGP(
debug,
"DataRelayer::processDanglingInputs: no slots created by handler");
157 int headerPresent = 0;
158 int payloadPresent = 0;
161 int checkerDenied = 0;
162 for (
size_t ti = 0; ti < mTimesliceIndex.
size(); ++ti) {
164 if (mTimesliceIndex.
isValid(slot) ==
false) {
167 assert(mDistinctRoutesIndex.empty() ==
false);
171 for (
size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
172 auto& expirator = expirationHandlers[ei];
175 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
176 if (part.size() > 0 && part.header(0) !=
nullptr) {
180 if (part.size() > 0 && part.payload(0) !=
nullptr) {
185 if (!expirator.checker) {
189 if (slotsCreatedByHandlers[ei] != slot) {
194 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](
int li) -> gsl::span<MessageSet const> {
195 auto offset = li * numInputTypes;
196 assert(cache.size() >=
offset + numInputTypes);
198 auto const end = cache.data() +
offset + numInputTypes;
202 auto partial = getPartialRecord(ti);
204 auto getter = [&partial](
size_t idx,
size_t part) {
205 if (partial[idx].
size() > 0 && partial[idx].header(part).get()) {
206 auto header = partial[idx].header(part).get();
207 auto payload = partial[idx].payload(part).get();
209 reinterpret_cast<const char*
>(header->GetData()),
210 reinterpret_cast<char const*
>(payload ? payload->GetData() :
nullptr),
211 payload ? payload->GetSize() : 0};
215 auto nPartsGetter = [&partial](
size_t idx) {
216 return partial[idx].size();
218#if __has_include(<fairmq/shmem/Message.h>)
219 auto refCountGetter = [&partial](
size_t idx) ->
int {
220 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*partial[idx].header(0));
221 return header.GetRefCount();
224 std::function<
int(
size_t)> refCountGetter =
nullptr;
226 InputSpan span{getter, nPartsGetter, refCountGetter,
static_cast<size_t>(partial.size())};
229 if (expirator.checker(services, timestamp.value, span) ==
false) {
234 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
235 assert(expirator.handler);
237 expirator.handler(services, newRef, variables);
238 part.reset(std::move(newRef));
239 activity.expiredSlots++;
242 assert(part.header(0) !=
nullptr);
243 assert(part.payload(0) !=
nullptr);
246 LOGP(
debug,
"DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
247 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
255 std::vector<DataDescriptorMatcher>
const& matchers,
256 std::vector<size_t>
const&
index,
259 for (
size_t ri = 0, re =
index.size(); ri < re; ++ri) {
260 auto& matcher = matchers[
index[ri]];
262 if (matcher.match(
reinterpret_cast<char const*
>(
data), context)) {
275 static const std::string nullstring{
"null"};
279 static std::string
state =
"";
282 auto var = variables.
get(
i);
283 if (
auto pval = std::get_if<uint64_t>(&var)) {
285 }
else if (
auto pval = std::get_if<uint32_t>(&var)) {
287 }
else if (
auto pval2 = std::get_if<std::string>(&var)) {
295 .data =
state.data()});
303 LOGP(
debug,
"DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
304 static bool dontDrop = getenv(
"DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv(
"DPL_DONT_DROP_OLD_TIMESLICE"));
308 for (
size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
311 auto valid = mTimesliceIndex.
validateSlot({si}, newOldest.timeslice);
313 if (mTimesliceIndex.
isValid({si})) {
314 LOGP(
debug,
"Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
318 mPruneOps.push_back(
PruneOp{si});
319 bool didDrop =
false;
320 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
321 auto& input = mInputs[mi];
322 auto& element = mCache[si * mInputs.size() + mi];
323 if (element.size() != 0) {
324 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.
name !=
"internal-dpl-injected-dummy-sink") {
328 LOGP(warning,
"Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
330 LOGP(error,
"Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
334 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
335 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
343 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
344 auto& input = mInputs[mi];
345 if (input.lifetime == Lifetime::Timer) {
348 auto& element = mCache[si * mInputs.size() + mi];
349 if (element.size() == 0) {
354 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"expected_missing_data",
"Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
357 LOGP(info,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
362 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"expected_missing_data",
"Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
365 LOGP(error,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
381 for (
auto&
op : mPruneOps) {
394 &cachedStateMetrics = mCachedStateMetrics,
395 numInputTypes = mDistinctRoutesIndex.size(),
396 &
index = mTimesliceIndex,
399 auto oldestPossibleTimeslice =
index.getOldestPossibleOutput();
401 std::vector<MessageSet> dropped(numInputTypes);
402 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
403 auto cacheId = slot.
index * numInputTypes + ai;
407 if (cache[cacheId].
size() > 0) {
408 dropped[ai] = std::move(cache[cacheId]);
411 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](
auto&
m) { return m.size(); });
414 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"pruneCache",
"Dropping stuff from slot %zu with timeslice %zu", slot.
index, oldestPossibleTimeslice.timeslice.value);
415 onDrop(slot, dropped, oldestPossibleTimeslice);
418 assert(cache.empty() ==
false);
419 assert(
index.size() * numInputTypes == cache.size());
423 assert(numInputTypes * slot.
index < cache.size());
424 for (
size_t ai = slot.
index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
435 auto* dph = o2::header::get<DataProcessingHeader*>(
first->GetData());
441 std::unique_ptr<fair::mq::Message>* messages,
447 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
453 return (slot.index % maxLanes) == (currentLane % maxLanes);
458 auto getInputTimeslice = [&matchers = mInputMatchers,
459 &distinctRoutes = mDistinctRoutesIndex,
462 -> std::tuple<int, TimesliceId> {
465 auto input =
matchToContext(rawHeader, matchers, distinctRoutes, context);
475 if (
auto pval = std::get_if<uint64_t>(&context.get(0))) {
477 return {input, timeslice};
487 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
492 &services = mContext,
495 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"saveInSlot",
"saving %{public}s@%zu in slot %zu from %{public}s",
496 fmt::format(
"{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
497 timeslice.value, slot.index,
499 auto cacheIdx = numInputTypes * slot.index + input;
504 assert(nPayloads > 0);
506 for (
size_t mi = 0; mi < nMessages; ++mi) {
507 assert(mi + nPayloads < nMessages);
513 "Dropping incoming %zu messages because they are data processing.", nPayloads);
515 for (
size_t i = mi;
i < mi + nPayloads + 1;
i++) {
516 auto discard = std::move(messages[
i]);
521 target.add([&messages, &mi](
size_t i) -> fair::mq::MessagePtr& {
return messages[mi +
i]; }, nPayloads + 1);
559 auto&
index = mTimesliceIndex;
561 bool needsCleaning =
false;
564 for (
size_t ci = 0; ci <
index.size(); ++ci) {
566 if (!isSlotInLane(slot)) {
569 if (
index.isValid(slot) ==
false) {
572 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
581 for (
size_t ci = 0; ci <
index.size(); ++ci) {
583 if (
index.isValid(slot) ==
true) {
586 if (!isSlotInLane(slot)) {
589 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
591 needsCleaning =
true;
602 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
604 size_t saved = saveInSlot(timeslice, input, slot, info);
608 index.publishSlot(slot);
609 index.markAsDirty(slot,
true);
617 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
619 auto DataHeaderInfo = [&rawHeader]() {
622 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
624 error += fmt::format(
"{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
626 error +=
"invalid header";
632 LOG(error) <<
"Could not match incoming data to any input route: " << DataHeaderInfo();
635 for (
size_t pi = 0; pi < nMessages; ++pi) {
636 messages[pi].reset(
nullptr);
642 LOG(error) <<
"Could not determine the timeslice for input: " << DataHeaderInfo();
645 for (
size_t pi = 0; pi < nMessages; ++pi) {
646 messages[pi].reset(
nullptr);
653 std::tie(action, slot) =
index.replaceLRUWith(pristineContext, timeslice);
654 uint64_t
const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.
get(0));
657 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (
int)action, *debugTimestamp);
660 updateStatistics(action);
666 static std::atomic<size_t> obsoleteCount = 0;
667 static std::atomic<size_t> mult = 1;
668 if ((obsoleteCount++ % (1 * mult)) == 0) {
669 LOGP(warning,
"Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
670 if (obsoleteCount > mult * 10) {
676 LOG(warning) <<
"Incoming data is invalid, not relaying.";
679 for (
size_t pi = 0; pi < nMessages; ++pi) {
680 messages[pi].reset(
nullptr);
688 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
689 size_t saved = saveInSlot(timeslice, input, slot, info);
693 index.publishSlot(slot);
694 index.markAsDirty(slot,
true);
702 LOGP(
debug,
"DataRelayer::getReadyToProcess");
703 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
706 const auto& cache = mCache;
707 const auto numInputTypes = mDistinctRoutesIndex.size();
713 auto getPartialRecord = [&cache, &numInputTypes](
int li) -> gsl::span<MessageSet const> {
714 auto offset = li * numInputTypes;
715 assert(cache.size() >=
offset + numInputTypes);
717 auto const end = cache.data() +
offset + numInputTypes;
726 LOGP(
debug,
"Doing action {} for slot {} (timeslice: {})", (
int)
op, li.
index, *timeslice);
729 LOGP(
debug,
"No timeslice associated with slot ", li.
index);
744 if (numInputTypes == 0) {
745 LOGP(
debug,
"numInputTypes == 0, returning.");
748 size_t cacheLines = cache.size() / numInputTypes;
749 assert(cacheLines * numInputTypes == cache.size());
750 int countConsume = 0;
751 int countConsumeExisting = 0;
752 int countProcess = 0;
753 int countDiscard = 0;
757 for (
int li = cacheLines - 1; li >= 0; --li) {
761 if (mTimesliceIndex.
isDirty(slot) ==
false) {
766 throw runtime_error_f(
"Completion police %s has no callback set", mCompletionPolicy.
name.c_str());
768 auto partial = getPartialRecord(li);
770 auto getter = [&partial](
size_t idx,
size_t part) {
771 if (partial[idx].
size() > 0 && partial[idx].header(part).get()) {
772 auto header = partial[idx].header(part).get();
773 auto payload = partial[idx].payload(part).get();
775 reinterpret_cast<const char*
>(header->GetData()),
776 reinterpret_cast<char const*
>(payload ? payload->GetData() :
nullptr),
777 payload ? payload->GetSize() : 0};
781 auto nPartsGetter = [&partial](
size_t idx) {
782 return partial[idx].size();
784#if __has_include(<fairmq/shmem/Message.h>)
785 auto refCountGetter = [&partial](
size_t idx) ->
int {
786 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*partial[idx].header(0));
787 return header.GetRefCount();
790 std::function<
int(
size_t)> refCountGetter =
nullptr;
792 InputSpan span{getter, nPartsGetter, refCountGetter,
static_cast<size_t>(partial.size())};
796 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
800 updateCompletionResults(slot, timeslice, action);
807 updateCompletionResults(slot, timeslice, action);
811 countConsumeExisting++;
812 updateCompletionResults(slot, timeslice, action);
817 updateCompletionResults(slot, timeslice, action);
822 updateCompletionResults(slot, timeslice, action);
837 LOGP(
debug,
"DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
838 notDirty, countConsume, countConsumeExisting, countProcess,
839 countDiscard, countWait);
844 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
845 const auto numInputTypes = mDistinctRoutesIndex.size();
847 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
849 auto cacheId = s.index * numInputTypes + arg;
850 if (cachedStateMetrics[cacheId] == oldStatus) {
851 cachedStateMetrics[cacheId] = newStatus;
855 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
856 markInputDone(slot, ai, oldStatus, newStatus);
862 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
864 const auto numInputTypes = mDistinctRoutesIndex.size();
866 std::vector<MessageSet> messages(numInputTypes);
867 auto& cache = mCache;
868 auto&
index = mTimesliceIndex;
879 auto moveHeaderPayloadToOutput = [&messages,
880 &cachedStateMetrics = mCachedStateMetrics,
882 auto cacheId = s.index * numInputTypes + arg;
886 if (cache[cacheId].
size() > 0) {
887 messages[arg] = std::move(cache[cacheId]);
889 index.markAsInvalid(s);
897 for (
size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
898 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(),
true, [](
bool result,
auto const& element) { return result && element.get() == nullptr; }));
901 index.markAsInvalid(s);
905 jumpToCacheEntryAssociatedWith(slot);
906 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
907 moveHeaderPayloadToOutput(slot, ai);
909 invalidateCacheFor(slot);
916 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
918 const auto numInputTypes = mDistinctRoutesIndex.size();
920 std::vector<MessageSet> messages(numInputTypes);
921 auto& cache = mCache;
922 auto&
index = mTimesliceIndex;
933 auto copyHeaderPayloadToOutput = [&messages,
934 &cachedStateMetrics = mCachedStateMetrics,
936 auto cacheId = s.index * numInputTypes + arg;
940 for (
size_t pi = 0; pi < cache[cacheId].size(); pi++) {
941 auto& header = cache[cacheId].header(pi);
942 auto&& newHeader = header->GetTransport()->CreateMessage();
943 newHeader->Copy(*header);
944 messages[arg].add(
PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
949 jumpToCacheEntryAssociatedWith(slot);
950 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
951 copyHeaderPayloadToOutput(slot, ai);
954 return std::move(messages);
959 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
961 for (
auto& cache : mCache) {
964 for (
size_t s = 0; s < mTimesliceIndex.
size(); ++s) {
972 return mCache.size() / mDistinctRoutesIndex.size();
981 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
983 mTimesliceIndex.
resize(s);
984 mVariableContextes.resize(s);
990 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
992 auto numInputTypes = mDistinctRoutesIndex.size();
996 mCache.resize(numInputTypes * mTimesliceIndex.
size());
999 mCachedStateMetrics.resize(mCache.size());
1004 for (
size_t i = 0;
i < mVariableContextes.size(); ++
i) {
1006 .
name = fmt::format(
"matcher_variables/{}",
i),
1008 .minPublishInterval = 500,
1009 .sendInitialValue =
true,
1014 for (
int ci = 0; ci < mTimesliceIndex.
size(); ci++) {
1016 .
name = fmt::format(
"data_relayer/{}", ci),
1018 .minPublishInterval = 800,
1019 .sendInitialValue =
true,
1027 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1033 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1039 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1045 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1054 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1056 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1061 char relayerSlotState[1024];
1064 int written = snprintf(relayerSlotState, 1024,
"%d ", (
int)mTimesliceIndex.
size());
1065 char*
buffer = relayerSlotState + written;
1066 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1067 for (
size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1068 int index = si * mTimesliceIndex.
size() + ci;
1069 int value =
static_cast<int>(mCachedStateMetrics[
index]);
1077 buffer[mDistinctRoutesIndex.size()] =
'\0';
1078 auto size = (
int)(
buffer - relayerSlotState + mDistinctRoutesIndex.size());
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associated with a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated with a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated with a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated with a given slot.
void clear()
Remove all pending messages.
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLsizei const GLfloat * value
constexpr int MAX_MATCHING_VARIABLE
Defining PrimaryVertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
@ DROPPED_INCOMING_MESSAGES
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
ProcessingType allowedProcessing
Reference to an inflight part.
static bool isValid(TimesliceId const &timeslice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"