#include <Monitoring/Metric.h>
#include <Monitoring/Monitoring.h>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/shmem/Message.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
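// DataRelayer constructor (fragment): record the matcher of each distinct input route,
// assemble the data-query string and register it as the "data_queries" state.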
                         std::vector<InputRoute> const& routes,
    mTimesliceIndex{index},
    mCompletionPolicy{policy},
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  auto numInputTypes = mDistinctRoutesIndex.size();
  std::string queries = "";
  for (short i = 0; i < numInputTypes; ++i) {
    assert(mDistinctRoutesIndex[i] < routes.size());
    mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
    auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
  states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
  states.processCommandQueue();
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
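// processDanglingInputs: give each ExpirationHandler a chance to create new slots and
// to expire inputs that would otherwise never arrive.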
  LOGP(debug, "DataRelayer::processDanglingInputs");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  if (expirationHandlers.empty()) {
    LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");
  std::vector<TimesliceSlot> slotsCreatedByHandlers;
    LOGP(debug, "Creating new slot");
    for (auto& handler : expirationHandlers) {
      LOGP(debug, "handler.creator for {}", handler.name);
      auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
      slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
  for (auto slot : slotsCreatedByHandlers) {
  if (validSlots > 0) {
    LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
    LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");
  int headerPresent = 0;
  int payloadPresent = 0;
  int checkerDenied = 0;
  for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
    if (mTimesliceIndex.isValid(slot) == false) {
    assert(mDistinctRoutesIndex.empty() == false);
    for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
      auto& expirator = expirationHandlers[ei];
      auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
      if (part.size() > 0 && part.header(0) != nullptr) {
      if (part.size() > 0 && part.payload(0) != nullptr) {
      if (!expirator.checker) {
      if (slotsCreatedByHandlers[ei] != slot) {
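      // Build a read-only InputSpan over the cache line of this slot so the expiration
      // checker can inspect the parts that are already present.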
  auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    auto const end = cache.data() + offset + numInputTypes;
      auto partial = getPartialRecord(ti);
      auto getter = [&partial](size_t idx, size_t part) {
        if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
          auto header = partial[idx].header(part).get();
          auto payload = partial[idx].payload(part).get();
                  reinterpret_cast<const char*>(header->GetData()),
                  reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                  payload ? payload->GetSize() : 0};
      auto nPartsGetter = [&partial](size_t idx) {
        return partial[idx].size();
      auto refCountGetter = [&partial](size_t idx) -> int {
        auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
        return header.GetRefCount();
      InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
      if (expirator.checker(services, timestamp.value, span) == false) {
      assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
      assert(expirator.handler);
      expirator.handler(services, newRef, variables);
      part.reset(std::move(newRef));
      activity.expiredSlots++;
      assert(part.header(0) != nullptr);
      assert(part.payload(0) != nullptr);
  LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
       headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
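// matchToContext: try every DataDescriptorMatcher referenced by `index` against the raw
// header data; a successful match fills the VariableContext and identifies the input.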
                      std::vector<DataDescriptorMatcher> const& matchers,
                      std::vector<size_t> const& index,
  for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
    if (matcher.match(reinterpret_cast<char const*>(data), context)) {
  static const std::string nullstring{"null"};
  static std::string state = "";
    auto var = variables.get(i);
    if (auto pval = std::get_if<uint64_t>(&var)) {
    } else if (auto pval = std::get_if<uint32_t>(&var)) {
    } else if (auto pval2 = std::get_if<std::string>(&var)) {
                       .data = state.data()});
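// setOldestPossibleInput: any slot whose timestamp is older than the new oldest possible
// timeslice can never complete; schedule it for pruning and report what is being dropped
// (the DPL_DONT_DROP_OLD_TIMESLICE environment variable appears to bypass the drop).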
  LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
  static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
  for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
    auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
      if (mTimesliceIndex.isValid({si})) {
        LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
    mPruneOps.push_back(PruneOp{si});
    bool didDrop = false;
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      auto& element = mCache[si * mInputs.size() + mi];
        if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
            LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
                 DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
            LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
                 DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
               "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}. "
               "Because the Lifetime::Timeframe data is not there and not expected (e.g. due to sampling), we drop non-sampled, non-timeframe data (e.g. Conditions).",
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      if (input.lifetime == Lifetime::Timer) {
      auto& element = mCache[si * mInputs.size() + mi];
          O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data",
                                 "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
          LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data",
                                      "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
          LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
  for (auto& op : mPruneOps) {
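// pruneCache / prunePending: move whatever still sits in the cache line of a pruned slot
// into a temporary `dropped` vector and hand it to the onDrop callback.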
                     &cachedStateMetrics = mCachedStateMetrics,
                     numInputTypes = mDistinctRoutesIndex.size(),
                     &index = mTimesliceIndex,
    auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
    std::vector<MessageSet> dropped(numInputTypes);
    for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
      auto cacheId = slot.index * numInputTypes + ai;
      if (cache[cacheId].size() > 0) {
        dropped[ai] = std::move(cache[cacheId]);
    bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
      O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
      onDrop(slot, dropped, oldestPossibleTimeslice);
    assert(cache.empty() == false);
    assert(index.size() * numInputTypes == cache.size());
    assert(numInputTypes * slot.index < cache.size());
    for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
  auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
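// relay(): match the incoming raw header to an input route, pick (or recycle) a timeslice
// slot in the caller's lane, and store the header/payload pairs in the cache.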
                                std::unique_ptr<fair::mq::Message>* messages,
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
    return (slot.index % maxLanes) == (currentLane % maxLanes);
  auto getInputTimeslice = [&matchers = mInputMatchers,
                            &distinctRoutes = mDistinctRoutesIndex,
    -> std::tuple<int, TimesliceId> {
      if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
        return {input, timeslice};
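  // saveInSlot: append the header/payload pairs of the matched input to the cache entry of
  // the chosen slot; the full message span is also offered to the onInsertion callback.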
  auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
                     &services = mContext,
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
                           fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
                           timeslice.value, slot.index,
    auto cacheIdx = numInputTypes * slot.index + input;
    assert(nPayloads > 0);
      auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
      onInsertion(services, allMessages);
    for (size_t mi = 0; mi < nMessages; ++mi) {
      assert(mi + nPayloads < nMessages);
                               "Dropping incoming %zu messages because they are data processing.", nPayloads);
        for (size_t i = mi; i < mi + nPayloads + 1; i++) {
          auto discard = std::move(messages[i]);
      auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
      target.add([&span](size_t i) -> fair::mq::MessagePtr& { return span[i]; }, nPayloads + 1);
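  // Two passes over the timeslice index: first look for a valid slot in this lane that is
  // already bound to the incoming timeslice, then look for an invalid (free) slot to claim,
  // marking it as needing cleaning before use.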
  auto& index = mTimesliceIndex;
  bool needsCleaning = false;
  for (size_t ci = 0; ci < index.size(); ++ci) {
    if (!isSlotInLane(slot)) {
    if (index.isValid(slot) == false) {
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
  for (size_t ci = 0; ci < index.size(); ++ci) {
    if (index.isValid(slot) == true) {
    if (!isSlotInLane(slot)) {
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
      needsCleaning = true;
      mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
    size_t saved = saveInSlot(timeslice, input, slot, info);
    index.publishSlot(slot);
    index.markAsDirty(slot, true);
  std::tie(input, timeslice) = getInputTimeslice(pristineContext);
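  // Error paths: if the header matches no input route, or no timeslice can be derived from
  // it, log the problem and release the incoming messages.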
  auto DataHeaderInfo = [&rawHeader]() {
    const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
      error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
      error += "invalid header";
    LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);
    LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);
  std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
  uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
                          "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
  updateStatistics(action);
      static std::atomic<size_t> obsoleteCount = 0;
      static std::atomic<size_t> mult = 1;
      if ((obsoleteCount++ % (1 * mult)) == 0) {
        LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
        if (obsoleteCount > mult * 10) {
      LOG(warning) << "Incoming data is invalid, not relaying.";
      for (size_t pi = 0; pi < nMessages; ++pi) {
        messages[pi].reset(nullptr);
      mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
      size_t saved = saveInSlot(timeslice, input, slot, info);
      index.publishSlot(slot);
      index.markAsDirty(slot, true);
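// getReadyToProcess: for every dirty slot, build an InputSpan over its cache line and let
// the CompletionPolicy callback decide whether to Consume, Process, Wait or Discard it.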
  LOGP(debug, "DataRelayer::getReadyToProcess");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto& cache = mCache;
  const auto numInputTypes = mDistinctRoutesIndex.size();
  auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    auto const end = cache.data() + offset + numInputTypes;
      LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
      LOGP(debug, "No timeslice associated with slot {}", li.index);
  if (numInputTypes == 0) {
    LOGP(debug, "numInputTypes == 0, returning.");
  size_t cacheLines = cache.size() / numInputTypes;
  assert(cacheLines * numInputTypes == cache.size());
  int countConsume = 0;
  int countConsumeExisting = 0;
  int countProcess = 0;
  int countDiscard = 0;
  for (int li = cacheLines - 1; li >= 0; --li) {
    if (mTimesliceIndex.isDirty(slot) == false) {
      throw runtime_error_f("Completion policy %s has no callback set", mCompletionPolicy.name.c_str());
    auto partial = getPartialRecord(li);
    auto getter = [&partial](size_t idx, size_t part) {
      if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
        auto header = partial[idx].header(part).get();
        auto payload = partial[idx].payload(part).get();
                reinterpret_cast<const char*>(header->GetData()),
                reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                payload ? payload->GetSize() : 0};
    auto nPartsGetter = [&partial](size_t idx) {
      return partial[idx].size();
    auto refCountGetter = [&partial](size_t idx) -> int {
      auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
      return header.GetRefCount();
    InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
    auto timeslice = std::get_if<uint64_t>(&variables.get(0));
        updateCompletionResults(slot, timeslice, action);
        updateCompletionResults(slot, timeslice, action);
        countConsumeExisting++;
        updateCompletionResults(slot, timeslice, action);
        updateCompletionResults(slot, timeslice, action);
        updateCompletionResults(slot, timeslice, action);
  LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
       notDirty, countConsume, countConsumeExisting, countProcess, countDiscard, countWait);
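// updateCacheStatus: flip the cached per-input state metric of the slot from oldStatus to
// newStatus for every input of that slot.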
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();
  auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    if (cachedStateMetrics[cacheId] == oldStatus) {
      cachedStateMetrics[cacheId] = newStatus;
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    markInputDone(slot, ai, oldStatus, newStatus);
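// consumeAllInputsForTimeslice: move every MessageSet of the slot out of the cache into the
// returned record and invalidate the slot.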
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;
  auto moveHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    if (cache[cacheId].size() > 0) {
      messages[arg] = std::move(cache[cacheId]);
    index.markAsInvalid(s);
    for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
      assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
    index.markAsInvalid(s);
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    moveHeaderPayloadToOutput(slot, ai);
  invalidateCacheFor(slot);
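// consumeExistingInputsForTimeslice: for every cached part, duplicate the header message via
// its transport and move the payload into the returned record.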
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;
  auto copyHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
      auto& header = cache[cacheId].header(pi);
      auto&& newHeader = header->GetTransport()->CreateMessage();
      newHeader->Copy(*header);
      messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    copyHeaderPayloadToOutput(slot, ai);
  return std::move(messages);
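// clear(): drop all cached messages and invalidate every slot of the timeslice index.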
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  for (auto& cache : mCache) {
  for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {
  return mCache.size() / mDistinctRoutesIndex.size();
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  mTimesliceIndex.resize(s);
  mVariableContextes.resize(s);
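// Size the cache to (number of input types) x (pipeline length) and register one state per
// variable context ("matcher_variables/<i>") and one per cache line ("data_relayer/<ci>").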
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  auto numInputTypes = mDistinctRoutesIndex.size();
  mCache.resize(numInputTypes * mTimesliceIndex.size());
  mCachedStateMetrics.resize(mCache.size());
  for (size_t i = 0; i < mVariableContextes.size(); ++i) {
      .name = fmt::format("matcher_variables/{}", i),
      .minPublishInterval = 500,
      .sendInitialValue = true,
  for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
      .name = fmt::format("data_relayer/{}", ci),
      .minPublishInterval = 800,
      .sendInitialValue = true,
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
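  // Encode the relayer state as a string: the number of slots first, then one value per
  // (slot, input) cache entry written into a fixed-size buffer.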
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
  char relayerSlotState[1024];
  int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
  char* buffer = relayerSlotState + written;
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
      int index = ci * mDistinctRoutesIndex.size() + si;
      int value = static_cast<int>(mCachedStateMetrics[index]);
    buffer[mDistinctRoutesIndex.size()] = '\0';
    auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());