#include <Monitoring/Metric.h>
#include <Monitoring/Monitoring.h>
#include <fairlogger/Logger.h>
#include <fairmq/Channel.h>
#include <fairmq/shmem/Message.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
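// DataRelayer constructor (excerpt): copies the distinct route matchers into mInputs
// and registers the "data_queries" state with DataProcessingStates.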
                       std::vector<InputRoute> const& routes,
  mTimesliceIndex{index},
  mCompletionPolicy{policy},
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
auto numInputTypes = mDistinctRoutesIndex.size();
std::string queries = "";
for (short i = 0; i < numInputTypes; ++i) {
  assert(mDistinctRoutesIndex[i] < routes.size());
  mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
  auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;
states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
states.processCommandQueue();

std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
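// DataRelayer::processDanglingInputs (excerpt): let the ExpirationHandlers create new
// slots and fill inputs which will never arrive on their own (e.g. conditions, timers).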
LOGP(debug, "DataRelayer::processDanglingInputs");
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

if (expirationHandlers.empty()) {
  LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");

std::vector<TimesliceSlot> slotsCreatedByHandlers;
LOGP(debug, "Creating new slot");
for (auto& handler : expirationHandlers) {
  LOGP(debug, "handler.creator for {}", handler.name);
  auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
  slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));

for (auto slot : slotsCreatedByHandlers) {
if (validSlots > 0) {
  LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
  LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");

int headerPresent = 0;
int payloadPresent = 0;
int checkerDenied = 0;
for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
  if (mTimesliceIndex.isValid(slot) == false) {
  assert(mDistinctRoutesIndex.empty() == false);
  for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
    auto& expirator = expirationHandlers[ei];
    auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
    if (part.size() > 0 && part.header(0) != nullptr) {
    if (part.size() > 0 && part.payload(0) != nullptr) {
    if (!expirator.checker) {
    if (slotsCreatedByHandlers[ei] != slot) {
    auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> std::span<MessageSet const> {
      auto offset = li * numInputTypes;
      assert(cache.size() >= offset + numInputTypes);
      auto const end = cache.data() + offset + numInputTypes;
    auto partial = getPartialRecord(ti);
    auto getter = [&partial](size_t idx, size_t part) {
      if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
        auto header = partial[idx].header(part).get();
        auto payload = partial[idx].payload(part).get();
                reinterpret_cast<const char*>(header->GetData()),
                reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                payload ? payload->GetSize() : 0};
    auto nPartsGetter = [&partial](size_t idx) {
      return partial[idx].size();
    auto refCountGetter = [&partial](size_t idx) -> int {
      auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
      return header.GetRefCount();
    InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
    if (expirator.checker(services, timestamp.value, span) == false) {
    assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
    assert(expirator.handler);
    expirator.handler(services, newRef, variables);
    part.reset(std::move(newRef));
    activity.expiredSlots++;
    assert(part.header(0) != nullptr);
    assert(part.payload(0) != nullptr);
LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
     headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
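// matchToContext (excerpt): run every distinct DataDescriptorMatcher against the
// incoming header and bind the matched variables into the VariableContext.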
                      std::vector<DataDescriptorMatcher> const& matchers,
                      std::vector<size_t> const& index,
for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
  auto& matcher = matchers[index[ri]];
  if (matcher.match(reinterpret_cast<char const*>(data), context)) {
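// sendVariableContextMetrics (excerpt): serialise each matched variable of a slot
// into a string and publish it via DataProcessingStates.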
static const std::string nullstring{"null"};
static std::string state = "";
  auto var = variables.get(i);
  if (auto pval = std::get_if<uint64_t>(&var)) {
  } else if (auto pval = std::get_if<uint32_t>(&var)) {
  } else if (auto pval2 = std::get_if<std::string>(&var)) {
                   .data = state.data()});
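// DataRelayer::setOldestPossibleInput (excerpt): once a channel declares an oldest
// possible timeslice, invalidate older slots and drop data that can never complete.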
LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
  auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
  if (mTimesliceIndex.isValid({si})) {
    LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
  mPruneOps.push_back(PruneOp{si});
  bool didDrop = false;
  for (size_t mi = 0; mi < mInputs.size(); ++mi) {
    auto& input = mInputs[mi];
    auto& element = mCache[si * mInputs.size() + mi];
    if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
        LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
             DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
        LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
             DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
           "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
           "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
  for (size_t mi = 0; mi < mInputs.size(); ++mi) {
    auto& input = mInputs[mi];
    if (input.lifetime == Lifetime::Timer) {
    auto& element = mCache[si * mInputs.size() + mi];
      O2_SIGNPOST_EVENT_EMIT(calibration, cid, "expected_missing_data",
                             "Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
      LOGP(info, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
           DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
      O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "expected_missing_data",
                                  "Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
      LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
           DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
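// prunePending / pruneCache (excerpt): apply the recorded PruneOps, handing any
// still-cached MessageSets of a pruned slot to the OnDropCallback before clearing them.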
for (auto& op : mPruneOps) {

  &cachedStateMetrics = mCachedStateMetrics,
  numInputTypes = mDistinctRoutesIndex.size(),
  &index = mTimesliceIndex,
  auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
  std::vector<MessageSet> dropped(numInputTypes);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    auto cacheId = slot.index * numInputTypes + ai;
    if (cache[cacheId].size() > 0) {
      dropped[ai] = std::move(cache[cacheId]);
  bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
    onDrop(slot, dropped, oldestPossibleTimeslice);
  assert(cache.empty() == false);
  assert(index.size() * numInputTypes == cache.size());
  assert(numInputTypes * slot.index < cache.size());
  for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {

auto* dph = o2::header::get<DataProcessingHeader*>(first->GetData());
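// DataRelayer::relay (excerpt): match an incoming header/payload set to an input route
// and a timeslice, find (or make room for) a slot in the TimesliceIndex, and store the
// messages there, dropping obsolete or invalid data.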
            std::unique_ptr<fair::mq::Message>* messages,
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

return (slot.index % maxLanes) == (currentLane % maxLanes);

auto getInputTimeslice = [&matchers = mInputMatchers,
                          &distinctRoutes = mDistinctRoutesIndex,
  -> std::tuple<int, TimesliceId> {
  auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);
  if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
  return {input, timeslice};

auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
                   &services = mContext,
  O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
                         fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
                         timeslice.value, slot.index,
  auto cacheIdx = numInputTypes * slot.index + input;
  assert(nPayloads > 0);
  for (size_t mi = 0; mi < nMessages; ++mi) {
    assert(mi + nPayloads < nMessages);
      "Dropping incoming %zu messages because they are data processing.", nPayloads);
      for (size_t i = mi; i < mi + nPayloads + 1; i++) {
        auto discard = std::move(messages[i]);
    target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);

auto& index = mTimesliceIndex;
bool needsCleaning = false;
for (size_t ci = 0; ci < index.size(); ++ci) {
  if (!isSlotInLane(slot)) {
  if (index.isValid(slot) == false) {
  std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
for (size_t ci = 0; ci < index.size(); ++ci) {
  if (index.isValid(slot) == true) {
  if (!isSlotInLane(slot)) {
  std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
    needsCleaning = true;
  mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
  size_t saved = saveInSlot(timeslice, input, slot, info);
  index.publishSlot(slot);
  index.markAsDirty(slot, true);

std::tie(input, timeslice) = getInputTimeslice(pristineContext);
auto DataHeaderInfo = [&rawHeader]() {
  const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
    error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
    error += "invalid header";
  LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
  LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
  "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
updateStatistics(action);
  static std::atomic<size_t> obsoleteCount = 0;
  static std::atomic<size_t> mult = 1;
  if ((obsoleteCount++ % (1 * mult)) == 0) {
    LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
    if (obsoleteCount > mult * 10) {
  LOG(warning) << "Incoming data is invalid, not relaying.";
  for (size_t pi = 0; pi < nMessages; ++pi) {
    messages[pi].reset(nullptr);
mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
size_t saved = saveInSlot(timeslice, input, slot, info);
index.publishSlot(slot);
index.markAsDirty(slot, true);
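// DataRelayer::getReadyToProcess (excerpt): walk the dirty cache lines, build an
// InputSpan for each and ask the CompletionPolicy whether to consume, process,
// discard or wait.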
LOGP(debug, "DataRelayer::getReadyToProcess");
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

const auto& cache = mCache;
const auto numInputTypes = mDistinctRoutesIndex.size();
auto getPartialRecord = [&cache, &numInputTypes](int li) -> std::span<MessageSet const> {
  auto offset = li * numInputTypes;
  assert(cache.size() >= offset + numInputTypes);
  auto const end = cache.data() + offset + numInputTypes;

LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
LOGP(debug, "No timeslice associated with slot {}", li.index);
if (numInputTypes == 0) {
  LOGP(debug, "numInputTypes == 0, returning.");
size_t cacheLines = cache.size() / numInputTypes;
assert(cacheLines * numInputTypes == cache.size());
int countConsume = 0;
int countConsumeExisting = 0;
int countProcess = 0;
int countDiscard = 0;
for (int li = cacheLines - 1; li >= 0; --li) {
  if (mTimesliceIndex.isDirty(slot) == false) {
  throw runtime_error_f("Completion policy %s has no callback set", mCompletionPolicy.name.c_str());
  auto partial = getPartialRecord(li);
  auto getter = [&partial](size_t idx, size_t part) {
    if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
      auto header = partial[idx].header(part).get();
      auto payload = partial[idx].payload(part).get();
              reinterpret_cast<const char*>(header->GetData()),
              reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
              payload ? payload->GetSize() : 0};
  auto nPartsGetter = [&partial](size_t idx) {
    return partial[idx].size();
  auto refCountGetter = [&partial](size_t idx) -> int {
    auto& header = static_cast<const fair::mq::shmem::Message&>(*partial[idx].header(0));
    return header.GetRefCount();
  InputSpan span{getter, nPartsGetter, refCountGetter, static_cast<size_t>(partial.size())};
  auto timeslice = std::get_if<uint64_t>(&variables.get(0));
    updateCompletionResults(slot, timeslice, action);
    updateCompletionResults(slot, timeslice, action);
    countConsumeExisting++;
    updateCompletionResults(slot, timeslice, action);
    updateCompletionResults(slot, timeslice, action);
    updateCompletionResults(slot, timeslice, action);
LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
     notDirty, countConsume, countConsumeExisting, countProcess,
     countDiscard, countWait);
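// DataRelayer::updateCacheStatus (excerpt): flip the cached per-input state metric
// from oldStatus to newStatus for every input of the given slot.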
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
const auto numInputTypes = mDistinctRoutesIndex.size();
auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
  auto cacheId = s.index * numInputTypes + arg;
  if (cachedStateMetrics[cacheId] == oldStatus) {
    cachedStateMetrics[cacheId] = newStatus;
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
  markInputDone(slot, ai, oldStatus, newStatus);
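// DataRelayer::consumeAllInputsForTimeslice (excerpt): move every cached MessageSet
// of the slot into the returned vector and invalidate the slot in the TimesliceIndex.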
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
const auto numInputTypes = mDistinctRoutesIndex.size();
std::vector<MessageSet> messages(numInputTypes);
auto& cache = mCache;
auto& index = mTimesliceIndex;

auto moveHeaderPayloadToOutput = [&messages,
                                  &cachedStateMetrics = mCachedStateMetrics,
  auto cacheId = s.index * numInputTypes + arg;
  if (cache[cacheId].size() > 0) {
    messages[arg] = std::move(cache[cacheId]);
  index.markAsInvalid(s);
for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
  assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
  index.markAsInvalid(s);

jumpToCacheEntryAssociatedWith(slot);
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
  moveHeaderPayloadToOutput(slot, ai);
invalidateCacheFor(slot);
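// DataRelayer::consumeExistingInputsForTimeslice (excerpt): like the full consume,
// but each cached header is duplicated via its transport while the payload is moved out.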
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
const auto numInputTypes = mDistinctRoutesIndex.size();
std::vector<MessageSet> messages(numInputTypes);
auto& cache = mCache;
auto& index = mTimesliceIndex;

auto copyHeaderPayloadToOutput = [&messages,
                                  &cachedStateMetrics = mCachedStateMetrics,
  auto cacheId = s.index * numInputTypes + arg;
  for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
    auto& header = cache[cacheId].header(pi);
    auto&& newHeader = header->GetTransport()->CreateMessage();
    newHeader->Copy(*header);
    messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});

jumpToCacheEntryAssociatedWith(slot);
for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
  copyHeaderPayloadToOutput(slot, ai);
return std::move(messages);
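// DataRelayer::clear / getParallelTimeslices (excerpt): drop all pending messages and
// report how many timeslices fit in the cache (cache size / number of distinct inputs).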
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
for (auto& cache : mCache) {
for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {

return mCache.size() / mDistinctRoutesIndex.size();
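// DataRelayer::setPipelineLength (excerpt): resize the TimesliceIndex, the variable
// contexts and (below) the cache to match the number of in-flight timeslices.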
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
mTimesliceIndex.resize(s);
mVariableContextes.resize(s);
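// Registration of the per-context "matcher_variables/N" and per-slot "data_relayer/N"
// states with DataProcessingStates follows.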
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
auto numInputTypes = mDistinctRoutesIndex.size();
mCache.resize(numInputTypes * mTimesliceIndex.size());
mCachedStateMetrics.resize(mCache.size());
for (size_t i = 0; i < mVariableContextes.size(); ++i) {
    .name = fmt::format("matcher_variables/{}", i),
    .minPublishInterval = 500,
    .sendInitialValue = true,
for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
    .name = fmt::format("data_relayer/{}", ci),
    .minPublishInterval = 800,
    .sendInitialValue = true,
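// The scoped_lock lines below likely belong to per-slot accessors such as
// getTimesliceForSlot and getCreationTimeForSlot, which guard the TimesliceIndex
// with the same recursive mutex.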
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
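// Closing fragment: publish each slot's VariableContext (sendContextState) and pack
// the per-slot, per-input cache states into a character buffer for the relayer state.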
std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {

char relayerSlotState[1024];
int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
char* buffer = relayerSlotState + written;
for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
  for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
    int index = ci * mDistinctRoutesIndex.size() + si;
    int value = static_cast<int>(mCachedStateMetrics[index]);
  buffer[mDistinctRoutesIndex.size()] = '\0';
auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());