45#include <Monitoring/Metric.h>
46#include <Monitoring/Monitoring.h>
48#include <fairlogger/Logger.h>
49#include <fairmq/Channel.h>
51#include <fairmq/shmem/Message.h>
52#include <fairmq/Device.h>
53#include <fmt/format.h>
54#include <fmt/ostream.h>
73 std::vector<InputRoute>
const&
routes,
78 mTimesliceIndex{
index},
79 mCompletionPolicy{policy},
84 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
87 if (pipelineLength == -1) {
88 auto getPipelineLengthHelper = [&services]() {
95 static int detectedPipelineLength = getPipelineLengthHelper();
96 pipelineLength = detectedPipelineLength;
104 auto numInputTypes = mDistinctRoutesIndex.size();
106 std::string queries =
"";
107 for (
short i = 0;
i < numInputTypes; ++
i) {
109 assert(mDistinctRoutesIndex[
i] <
routes.size());
110 mInputs.push_back(
routes[mDistinctRoutesIndex[
i]].matcher);
111 auto& matcher =
routes[mDistinctRoutesIndex[
i]].matcher;
117 states.registerState({.name =
"data_queries", .stateId = stateId, .sendInitialValue =
true, .defaultEnabled =
true});
119 states.processCommandQueue();
124 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
132 LOGP(
debug,
"DataRelayer::processDanglingInputs");
133 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
138 if (expirationHandlers.empty()) {
139 LOGP(
debug,
"DataRelayer::processDanglingInputs: No expiration handlers");
143 std::vector<TimesliceSlot> slotsCreatedByHandlers;
145 LOGP(
debug,
"Creating new slot");
146 for (
auto& handler : expirationHandlers) {
147 LOGP(
debug,
"handler.creator for {}", handler.name);
148 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
149 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
154 for (
auto slot : slotsCreatedByHandlers) {
160 if (validSlots > 0) {
162 LOGP(
debug,
"DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
164 LOGP(
debug,
"DataRelayer::processDanglingInputs: no slots created by handler");
168 int headerPresent = 0;
169 int payloadPresent = 0;
172 int checkerDenied = 0;
173 for (
size_t ti = 0; ti < mTimesliceIndex.
size(); ++ti) {
175 if (mTimesliceIndex.
isValid(slot) ==
false) {
178 assert(mDistinctRoutesIndex.empty() ==
false);
182 for (
size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
183 auto& expirator = expirationHandlers[ei];
186 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
187 if (part.size() > 0 && part.header(0) !=
nullptr) {
191 if (part.size() > 0 && part.payload(0) !=
nullptr) {
196 if (!expirator.checker) {
200 if (slotsCreatedByHandlers[ei] != slot) {
205 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](
int li) -> std::span<MessageSet const> {
206 auto offset = li * numInputTypes;
207 assert(cache.size() >=
offset + numInputTypes);
209 auto const end = cache.data() +
offset + numInputTypes;
213 auto partial = getPartialRecord(ti);
215 auto getter = [&partial](
size_t idx,
size_t part) {
216 if (partial[idx].
size() > 0 && partial[idx].header(part).get()) {
217 auto header = partial[idx].header(part).get();
218 auto payload = partial[idx].payload(part).get();
220 reinterpret_cast<const char*
>(header->GetData()),
221 reinterpret_cast<char const*
>(payload ? payload->GetData() :
nullptr),
222 payload ? payload->GetSize() : 0};
226 auto nPartsGetter = [&partial](
size_t idx) {
227 return partial[idx].size();
229 auto refCountGetter = [&partial](
size_t idx) ->
int {
230 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*partial[idx].header(0));
231 return header.GetRefCount();
233 InputSpan span{getter, nPartsGetter, refCountGetter,
static_cast<size_t>(partial.size())};
236 if (expirator.checker(services, timestamp.value, span) ==
false) {
241 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
242 assert(expirator.handler);
244 expirator.handler(services, newRef, variables);
245 part.reset(std::move(newRef));
246 activity.expiredSlots++;
249 assert(part.header(0) !=
nullptr);
250 assert(part.payload(0) !=
nullptr);
253 LOGP(
debug,
"DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
254 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
262 std::vector<DataDescriptorMatcher>
const&
matchers,
263 std::vector<size_t>
const&
index,
266 for (
size_t ri = 0, re =
index.size(); ri < re; ++ri) {
269 if (matcher.match(
reinterpret_cast<char const*
>(
data), context)) {
282 static const std::string nullstring{
"null"};
286 static std::string
state =
"";
289 auto var = variables.
get(
i);
290 if (
auto pval = std::get_if<uint64_t>(&var)) {
292 }
else if (
auto pval = std::get_if<uint32_t>(&var)) {
294 }
else if (
auto pval2 = std::get_if<std::string>(&var)) {
302 .data =
state.data()});
310 LOGP(
debug,
"DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
311 static bool dontDrop = getenv(
"DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv(
"DPL_DONT_DROP_OLD_TIMESLICE"));
315 for (
size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
318 auto valid = mTimesliceIndex.
validateSlot({si}, newOldest.timeslice);
320 if (mTimesliceIndex.
isValid({si})) {
321 LOGP(
debug,
"Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
325 mPruneOps.push_back(
PruneOp{si});
326 bool didDrop =
false;
327 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
328 auto& input = mInputs[mi];
329 auto&
element = mCache[si * mInputs.size() + mi];
331 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.
name !=
"internal-dpl-injected-dummy-sink") {
335 LOGP(warning,
"Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
337 LOGP(error,
"Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
341 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
342 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
350 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
351 auto& input = mInputs[mi];
352 if (input.lifetime == Lifetime::Timer) {
355 auto&
element = mCache[si * mInputs.size() + mi];
361 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"expected_missing_data",
"Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
364 LOGP(info,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
369 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"expected_missing_data",
"Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
372 LOGP(error,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
388 for (
auto&
op : mPruneOps) {
401 &cachedStateMetrics = mCachedStateMetrics,
402 numInputTypes = mDistinctRoutesIndex.size(),
403 &
index = mTimesliceIndex,
406 auto oldestPossibleTimeslice =
index.getOldestPossibleOutput();
408 std::vector<MessageSet> dropped(numInputTypes);
409 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
410 auto cacheId = slot.
index * numInputTypes + ai;
414 if (cache[cacheId].
size() > 0) {
415 dropped[ai] = std::move(cache[cacheId]);
418 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](
auto&
m) { return m.size(); });
421 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"pruneCache",
"Dropping stuff from slot %zu with timeslice %zu", slot.
index, oldestPossibleTimeslice.timeslice.value);
422 onDrop(slot, dropped, oldestPossibleTimeslice);
425 assert(cache.empty() ==
false);
426 assert(
index.size() * numInputTypes == cache.size());
430 assert(numInputTypes * slot.
index < cache.size());
431 for (
size_t ai = slot.
index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
442 auto* dph = o2::header::get<DataProcessingHeader*>(
first->GetData());
448 std::unique_ptr<fair::mq::Message>* messages,
455 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
461 return (slot.index % maxLanes) == (currentLane % maxLanes);
466 auto getInputTimeslice = [&
matchers = mInputMatchers,
467 &distinctRoutes = mDistinctRoutesIndex,
470 -> std::tuple<int, TimesliceId> {
483 if (
auto pval = std::get_if<uint64_t>(&context.get(0))) {
485 return {input, timeslice};
495 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
501 &services = mContext,
504 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"saveInSlot",
"saving %{public}s@%zu in slot %zu from %{public}s",
505 fmt::format(
"{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
506 timeslice.value, slot.index,
508 auto cacheIdx = numInputTypes * slot.index + input;
513 assert(nPayloads > 0);
517 auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
519 onInsertion(services, allMessages);
521 for (
size_t mi = 0; mi < nMessages; ++mi) {
522 assert(mi + nPayloads < nMessages);
528 "Dropping incoming %zu messages because they are data processing.", nPayloads);
530 for (
size_t i = mi;
i < mi + nPayloads + 1;
i++) {
531 auto discard = std::move(messages[
i]);
536 auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
539 target.add([&span](
size_t i) -> fair::mq::MessagePtr& {
return span[
i]; }, nPayloads + 1);
577 auto&
index = mTimesliceIndex;
579 bool needsCleaning =
false;
582 for (
size_t ci = 0; ci <
index.size(); ++ci) {
584 if (!isSlotInLane(slot)) {
587 if (
index.isValid(slot) ==
false) {
590 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
599 for (
size_t ci = 0; ci <
index.size(); ++ci) {
601 if (
index.isValid(slot) ==
true) {
604 if (!isSlotInLane(slot)) {
607 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
609 needsCleaning =
true;
620 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
622 size_t saved = saveInSlot(timeslice, input, slot, info);
626 index.publishSlot(slot);
627 index.markAsDirty(slot,
true);
635 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
637 auto DataHeaderInfo = [&rawHeader]() {
640 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
642 error += fmt::format(
"{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
644 error +=
"invalid header";
650 LOG(error) <<
"Could not match incoming data to any input route: " << DataHeaderInfo();
653 for (
size_t pi = 0; pi < nMessages; ++pi) {
654 messages[pi].reset(
nullptr);
660 LOG(error) <<
"Could not determine the timeslice for input: " << DataHeaderInfo();
663 for (
size_t pi = 0; pi < nMessages; ++pi) {
664 messages[pi].reset(
nullptr);
671 std::tie(action, slot) =
index.replaceLRUWith(pristineContext, timeslice);
672 uint64_t
const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.
get(0));
675 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (
int)action, *debugTimestamp);
678 updateStatistics(action);
684 static std::atomic<size_t> obsoleteCount = 0;
685 static std::atomic<size_t> mult = 1;
686 if ((obsoleteCount++ % (1 * mult)) == 0) {
687 LOGP(warning,
"Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
688 if (obsoleteCount > mult * 10) {
694 LOG(warning) <<
"Incoming data is invalid, not relaying.";
697 for (
size_t pi = 0; pi < nMessages; ++pi) {
698 messages[pi].reset(
nullptr);
706 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
707 size_t saved = saveInSlot(timeslice, input, slot, info);
711 index.publishSlot(slot);
712 index.markAsDirty(slot,
true);
720 LOGP(
debug,
"DataRelayer::getReadyToProcess");
721 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
724 const auto& cache = mCache;
725 const auto numInputTypes = mDistinctRoutesIndex.size();
731 auto getPartialRecord = [&cache, &numInputTypes](
int li) -> std::span<MessageSet const> {
732 auto offset = li * numInputTypes;
733 assert(cache.size() >=
offset + numInputTypes);
735 auto const end = cache.data() +
offset + numInputTypes;
744 LOGP(
debug,
"Doing action {} for slot {} (timeslice: {})", (
int)
op, li.
index, *timeslice);
747 LOGP(
debug,
"No timeslice associated with slot ", li.
index);
762 if (numInputTypes == 0) {
763 LOGP(
debug,
"numInputTypes == 0, returning.");
766 size_t cacheLines = cache.size() / numInputTypes;
767 assert(cacheLines * numInputTypes == cache.size());
768 int countConsume = 0;
769 int countConsumeExisting = 0;
770 int countProcess = 0;
771 int countDiscard = 0;
775 for (
int li = cacheLines - 1; li >= 0; --li) {
779 if (mTimesliceIndex.
isDirty(slot) ==
false) {
784 throw runtime_error_f(
"Completion police %s has no callback set", mCompletionPolicy.
name.c_str());
786 auto partial = getPartialRecord(li);
788 auto getter = [&partial](
size_t idx,
size_t part) {
789 if (partial[idx].
size() > 0 && partial[idx].header(part).get()) {
790 auto header = partial[idx].header(part).get();
791 auto payload = partial[idx].payload(part).get();
793 reinterpret_cast<const char*
>(header->GetData()),
794 reinterpret_cast<char const*
>(payload ? payload->GetData() :
nullptr),
795 payload ? payload->GetSize() : 0};
799 auto nPartsGetter = [&partial](
size_t idx) {
800 return partial[idx].size();
802 auto refCountGetter = [&partial](
size_t idx) ->
int {
803 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*partial[idx].header(0));
804 return header.GetRefCount();
806 InputSpan span{getter, nPartsGetter, refCountGetter,
static_cast<size_t>(partial.size())};
810 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
814 updateCompletionResults(slot, timeslice, action);
821 updateCompletionResults(slot, timeslice, action);
825 countConsumeExisting++;
826 updateCompletionResults(slot, timeslice, action);
831 updateCompletionResults(slot, timeslice, action);
836 updateCompletionResults(slot, timeslice, action);
851 LOGP(
debug,
"DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
852 notDirty, countConsume, countConsumeExisting, countProcess,
853 countDiscard, countWait);
858 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
859 const auto numInputTypes = mDistinctRoutesIndex.size();
861 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
863 auto cacheId = s.index * numInputTypes + arg;
864 if (cachedStateMetrics[cacheId] == oldStatus) {
865 cachedStateMetrics[cacheId] = newStatus;
869 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
870 markInputDone(slot, ai, oldStatus, newStatus);
876 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
878 const auto numInputTypes = mDistinctRoutesIndex.size();
880 std::vector<MessageSet> messages(numInputTypes);
881 auto& cache = mCache;
882 auto&
index = mTimesliceIndex;
893 auto moveHeaderPayloadToOutput = [&messages,
894 &cachedStateMetrics = mCachedStateMetrics,
896 auto cacheId = s.index * numInputTypes + arg;
900 if (cache[cacheId].
size() > 0) {
901 messages[arg] = std::move(cache[cacheId]);
903 index.markAsInvalid(s);
911 for (
size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
912 assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(),
true, [](
bool result,
auto const&
element) { return result && element.get() == nullptr; }));
915 index.markAsInvalid(s);
919 jumpToCacheEntryAssociatedWith(slot);
920 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
921 moveHeaderPayloadToOutput(slot, ai);
923 invalidateCacheFor(slot);
930 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
932 const auto numInputTypes = mDistinctRoutesIndex.size();
934 std::vector<MessageSet> messages(numInputTypes);
935 auto& cache = mCache;
936 auto&
index = mTimesliceIndex;
947 auto copyHeaderPayloadToOutput = [&messages,
948 &cachedStateMetrics = mCachedStateMetrics,
950 auto cacheId = s.index * numInputTypes + arg;
954 for (
size_t pi = 0; pi < cache[cacheId].size(); pi++) {
955 auto& header = cache[cacheId].header(pi);
956 auto&& newHeader = header->GetTransport()->CreateMessage();
957 newHeader->Copy(*header);
958 messages[arg].add(
PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
963 jumpToCacheEntryAssociatedWith(slot);
964 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
965 copyHeaderPayloadToOutput(slot, ai);
968 return std::move(messages);
973 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
975 for (
auto& cache : mCache) {
978 for (
size_t s = 0; s < mTimesliceIndex.
size(); ++s) {
986 return mCache.size() / mDistinctRoutesIndex.size();
995 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
997 mTimesliceIndex.
resize(s);
998 mVariableContextes.resize(s);
1004 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1006 auto numInputTypes = mDistinctRoutesIndex.size();
1010 mCache.resize(numInputTypes * mTimesliceIndex.
size());
1013 mCachedStateMetrics.resize(mCache.size());
1018 for (
size_t i = 0;
i < mVariableContextes.size(); ++
i) {
1020 .
name = fmt::format(
"matcher_variables/{}",
i),
1022 .minPublishInterval = 500,
1023 .sendInitialValue =
true,
1028 for (
int ci = 0; ci < mTimesliceIndex.
size(); ci++) {
1030 .
name = fmt::format(
"data_relayer/{}", ci),
1032 .minPublishInterval = 800,
1033 .sendInitialValue =
true,
1041 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1047 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1053 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1059 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1068 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1070 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1075 char relayerSlotState[1024];
1078 int written = snprintf(relayerSlotState, 1024,
"%d ", (
int)mTimesliceIndex.
size());
1079 char*
buffer = relayerSlotState + written;
1080 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1081 for (
size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1082 int index = ci * mDistinctRoutesIndex.size() + si;
1083 int value =
static_cast<int>(mCachedStateMetrics[
index]);
1091 buffer[mDistinctRoutesIndex.size()] =
'\0';
1092 auto size = (
int)(
buffer - relayerSlotState + mDistinctRoutesIndex.size());
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associated to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef, int)
std::function< void(ServiceRegistryRef &, std::span< fair::mq::MessagePtr > &)> OnInsertionCallback
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated to a given slot.
void clear()
Remove all pending messages.
virtual fair::mq::Device * device()=0
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
We wait for the oldest slot to complete.
@ DropObsolete
An obsolete context is dropped.
@ DropInvalid
An invalid context is not inserted in the index and dropped.
@ ReplaceObsolete
An obsolete slot is used to hold the new context and the old one is dropped.
@ ReplaceUnused
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLsizei const GLfloat * value
constexpr int MAX_MATCHING_VARIABLE
Defining PrimaryVertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
@ DROPPED_INCOMING_MESSAGES
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
@ WillRelay
Ownership of the data has been taken.
@ Invalid
The incoming data was not valid and has been dropped.
@ Backpressured
The incoming data was not relayed, because we are backpressured.
@ Dropped
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@return true if running online
Running state information of a given device.
ProcessingType allowedProcessing
Reference to an inflight part.
static bool isValid(TimesliceId const ×lice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"