46#include <Monitoring/Metric.h>
47#include <Monitoring/Monitoring.h>
49#include <fairlogger/Logger.h>
50#include <fairmq/Channel.h>
52#include <fairmq/shmem/Message.h>
53#include <fairmq/Device.h>
54#include <fmt/format.h>
55#include <fmt/ostream.h>
74 std::vector<InputRoute>
const&
routes,
79 mTimesliceIndex{
index},
80 mCompletionPolicy{policy},
85 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
88 if (pipelineLength == -1) {
89 auto getPipelineLengthHelper = [&services]() {
96 static int detectedPipelineLength = getPipelineLengthHelper();
97 pipelineLength = detectedPipelineLength;
105 auto numInputTypes = mDistinctRoutesIndex.size();
107 std::string queries =
"";
108 for (
short i = 0;
i < numInputTypes; ++
i) {
110 assert(mDistinctRoutesIndex[
i] <
routes.size());
111 mInputs.push_back(
routes[mDistinctRoutesIndex[
i]].matcher);
112 auto& matcher =
routes[mDistinctRoutesIndex[
i]].matcher;
118 states.registerState({.name =
"data_queries", .stateId = stateId, .sendInitialValue =
true, .defaultEnabled =
true});
120 states.processCommandQueue();
125 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
133 LOGP(
debug,
"DataRelayer::processDanglingInputs");
134 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
139 if (expirationHandlers.empty()) {
140 LOGP(
debug,
"DataRelayer::processDanglingInputs: No expiration handlers");
144 std::vector<TimesliceSlot> slotsCreatedByHandlers;
146 LOGP(
debug,
"Creating new slot");
147 for (
auto& handler : expirationHandlers) {
148 LOGP(
debug,
"handler.creator for {}", handler.name);
149 auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
150 slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
155 for (
auto slot : slotsCreatedByHandlers) {
161 if (validSlots > 0) {
163 LOGP(
debug,
"DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
165 LOGP(
debug,
"DataRelayer::processDanglingInputs: no slots created by handler");
169 int headerPresent = 0;
170 int payloadPresent = 0;
173 int checkerDenied = 0;
174 for (
size_t ti = 0; ti < mTimesliceIndex.
size(); ++ti) {
176 if (mTimesliceIndex.
isValid(slot) ==
false) {
179 assert(mDistinctRoutesIndex.empty() ==
false);
183 for (
size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
184 auto& expirator = expirationHandlers[ei];
187 auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
188 if (!part.empty() && (part |
get_header{0}) !=
nullptr) {
192 if (!part.empty() && (part |
get_payload{0, 0}) !=
nullptr) {
197 if (!expirator.checker) {
201 if (slotsCreatedByHandlers[ei] != slot) {
206 auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](
int li) -> std::span<std::vector<fair::mq::MessagePtr>
const> {
207 auto offset = li * numInputTypes;
208 assert(cache.size() >=
offset + numInputTypes);
210 auto const end = cache.data() +
offset + numInputTypes;
214 auto partial = getPartialRecord(ti);
215 auto nPartsGetter = [&partial](
size_t idx) {
218 auto refCountGetter = [&partial](
size_t idx) ->
int {
219 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*(partial[idx] |
get_header{0}));
220 return header.GetRefCount();
223 if (!partial[idx].
empty()) {
224 auto const& headerMsg = partial[idx][
indices.headerIdx];
225 auto const& payloadMsg = partial[idx][
indices.payloadIdx];
228 reinterpret_cast<const char*
>(headerMsg->GetData()),
229 payloadMsg ?
reinterpret_cast<char const*
>(payloadMsg->GetData()) :
nullptr,
230 payloadMsg ? payloadMsg->GetSize() : 0};
237 return next.headerIdx < partial[idx].size() ? next :
DataRefIndices{size_t(-1), size_t(-1)};
239 InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter,
static_cast<size_t>(partial.size())};
242 if (expirator.checker(services, timestamp.value, span) ==
false) {
247 assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
248 assert(expirator.handler);
250 expirator.handler(services, newRef, variables);
252 part.emplace_back(std::move(newRef.
header));
253 part.emplace_back(std::move(newRef.
payload));
254 activity.expiredSlots++;
261 LOGP(
debug,
"DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
262 headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
270 std::vector<DataDescriptorMatcher>
const&
matchers,
271 std::vector<size_t>
const&
index,
274 for (
size_t ri = 0, re =
index.size(); ri < re; ++ri) {
277 if (matcher.match(
reinterpret_cast<char const*
>(
data), context)) {
290 static const std::string nullstring{
"null"};
294 static std::string
state =
"";
297 auto var = variables.
get(
i);
298 if (
auto pval = std::get_if<uint64_t>(&var)) {
300 }
else if (
auto pval = std::get_if<uint32_t>(&var)) {
302 }
else if (
auto pval2 = std::get_if<std::string>(&var)) {
310 .data =
state.data()});
318 LOGP(
debug,
"DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
319 static bool dontDrop = getenv(
"DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv(
"DPL_DONT_DROP_OLD_TIMESLICE"));
323 for (
size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
328 if (mTimesliceIndex.
isValid({si})) {
329 LOGP(
debug,
"Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
333 mPruneOps.push_back(
PruneOp{si});
334 bool didDrop =
false;
335 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
336 auto& input = mInputs[mi];
337 auto&
element = mCache[si * mInputs.size() + mi];
339 if (input.lifetime != Lifetime::Condition && mCompletionPolicy.
name !=
"internal-dpl-injected-dummy-sink") {
343 LOGP(warning,
"Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
345 LOGP(error,
"Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
349 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
350 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
358 for (
size_t mi = 0; mi < mInputs.size(); ++mi) {
359 auto& input = mInputs[mi];
360 if (input.lifetime == Lifetime::Timer) {
363 auto&
element = mCache[si * mInputs.size() + mi];
369 O2_SIGNPOST_EVENT_EMIT(calibration, cid,
"expected_missing_data",
"Expected missing %{public}s (lifetime:%d) while dropping non-calibration data in slot %zu with timestamp %zu < %zu.",
372 LOGP(info,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
377 O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid,
"expected_missing_data",
"Not processing in calibration mode: missing %s (lifetime:%d) while dropping incomplete data in slot %zu with timestamp %zu < %zu.",
380 LOGP(error,
"Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
396 for (
auto&
op : mPruneOps) {
409 &cachedStateMetrics = mCachedStateMetrics,
410 numInputTypes = mDistinctRoutesIndex.size(),
411 &
index = mTimesliceIndex,
414 auto oldestPossibleTimeslice =
index.getOldestPossibleOutput();
416 std::vector<std::vector<fair::mq::MessagePtr>> dropped(numInputTypes);
417 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
418 auto cacheId = slot.
index * numInputTypes + ai;
422 if (!cache[cacheId].
empty()) {
423 dropped[ai] = std::move(cache[cacheId]);
426 bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](
auto&
m) { return !m.empty(); });
429 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"pruneCache",
"Dropping stuff from slot %zu with timeslice %zu", slot.
index, oldestPossibleTimeslice.timeslice.value);
430 onDrop(slot, dropped, oldestPossibleTimeslice);
433 assert(cache.empty() ==
false);
434 assert(
index.size() * numInputTypes == cache.size());
438 assert(numInputTypes * slot.
index < cache.size());
439 for (
size_t ai = slot.
index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
450 auto* dph = o2::header::get<DataProcessingHeader*>(
first->GetData());
456 std::unique_ptr<fair::mq::Message>* messages,
463 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
469 return (slot.index % maxLanes) == (currentLane % maxLanes);
474 auto getInputTimeslice = [&
matchers = mInputMatchers,
475 &distinctRoutes = mDistinctRoutesIndex,
478 -> std::tuple<int, TimesliceId> {
491 if (
auto pval = std::get_if<uint64_t>(&context.get(0))) {
493 return {input, timeslice};
503 auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
509 &services = mContext,
512 O2_SIGNPOST_EVENT_EMIT(data_relayer, aid,
"saveInSlot",
"saving %{public}s@%zu in slot %zu from %{public}s",
513 fmt::format(
"{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
514 timeslice.value, slot.index,
516 auto cacheIdx = numInputTypes * slot.index + input;
517 auto&
target = cache[cacheIdx];
521 assert(nPayloads > 0);
525 auto allMessages = std::span<fair::mq::MessagePtr>(messages, messages + nMessages);
527 onInsertion(services, allMessages);
529 for (
size_t mi = 0; mi < nMessages; ++mi) {
530 assert(mi + nPayloads < nMessages);
536 "Dropping incoming %zu messages because they are data processing.", nPayloads);
538 for (
size_t i = mi;
i < mi + nPayloads + 1;
i++) {
539 auto discard = std::move(messages[
i]);
544 auto span = std::span<fair::mq::MessagePtr>(messages + mi, messages + mi + nPayloads + 1);
547 for (
size_t i = 0;
i < nPayloads + 1; ++
i) {
548 target.emplace_back(std::move(span[
i]));
587 auto&
index = mTimesliceIndex;
589 bool needsCleaning =
false;
592 for (
size_t ci = 0; ci <
index.size(); ++ci) {
594 if (!isSlotInLane(slot)) {
597 if (
index.isValid(slot) ==
false) {
600 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
609 for (
size_t ci = 0; ci <
index.size(); ++ci) {
611 if (
index.isValid(slot) ==
true) {
614 if (!isSlotInLane(slot)) {
617 std::tie(input, timeslice) = getInputTimeslice(
index.getVariablesForSlot(slot));
619 needsCleaning =
true;
630 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
632 size_t saved = saveInSlot(timeslice, input, slot, info);
636 index.publishSlot(slot);
637 index.markAsDirty(slot,
true);
645 std::tie(input, timeslice) = getInputTimeslice(pristineContext);
647 auto DataHeaderInfo = [&rawHeader]() {
650 const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
652 error += fmt::format(
"{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
654 error +=
"invalid header";
660 LOG(error) <<
"Could not match incoming data to any input route: " << DataHeaderInfo();
663 for (
size_t pi = 0; pi < nMessages; ++pi) {
664 messages[pi].reset(
nullptr);
670 LOG(error) <<
"Could not determine the timeslice for input: " << DataHeaderInfo();
673 for (
size_t pi = 0; pi < nMessages; ++pi) {
674 messages[pi].reset(
nullptr);
681 std::tie(action, slot) =
index.replaceLRUWith(pristineContext, timeslice);
682 uint64_t
const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.
get(0));
685 "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (
int)action, *debugTimestamp);
688 updateStatistics(action);
694 static std::atomic<size_t> obsoleteCount = 0;
695 static std::atomic<size_t> mult = 1;
696 if ((obsoleteCount++ % (1 * mult)) == 0) {
697 LOGP(warning,
"Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
698 if (obsoleteCount > mult * 10) {
704 LOG(warning) <<
"Incoming data is invalid, not relaying.";
707 for (
size_t pi = 0; pi < nMessages; ++pi) {
708 messages[pi].reset(
nullptr);
716 mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](
const auto&
x) { return x.slot == slot; }), mPruneOps.end());
717 size_t saved = saveInSlot(timeslice, input, slot, info);
721 index.publishSlot(slot);
722 index.markAsDirty(slot,
true);
730 LOGP(
debug,
"DataRelayer::getReadyToProcess");
731 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
734 const auto& cache = mCache;
735 const auto numInputTypes = mDistinctRoutesIndex.size();
741 auto getPartialRecord = [&cache, &numInputTypes](
int li) -> std::span<std::vector<fair::mq::MessagePtr>
const> {
742 auto offset = li * numInputTypes;
743 assert(cache.size() >=
offset + numInputTypes);
745 auto const end = cache.data() +
offset + numInputTypes;
754 LOGP(
debug,
"Doing action {} for slot {} (timeslice: {})", (
int)
op, li.
index, *timeslice);
757 LOGP(
debug,
"No timeslice associated with slot ", li.
index);
772 if (numInputTypes == 0) {
773 LOGP(
debug,
"numInputTypes == 0, returning.");
776 size_t cacheLines = cache.size() / numInputTypes;
777 assert(cacheLines * numInputTypes == cache.size());
778 int countConsume = 0;
779 int countConsumeExisting = 0;
780 int countProcess = 0;
781 int countDiscard = 0;
785 for (
int li = cacheLines - 1; li >= 0; --li) {
789 if (mTimesliceIndex.
isDirty(slot) ==
false) {
794 throw runtime_error_f(
"Completion police %s has no callback set", mCompletionPolicy.
name.c_str());
796 auto partial = getPartialRecord(li);
797 auto nPartsGetter = [&partial](
size_t idx) {
800 auto refCountGetter = [&partial](
size_t idx) ->
int {
801 auto& header =
static_cast<const fair::mq::shmem::Message&
>(*(partial[idx] |
get_header{0}));
802 return header.GetRefCount();
805 if (!partial[idx].
empty()) {
806 auto const& headerMsg = partial[idx][
indices.headerIdx];
807 auto const& payloadMsg = partial[idx][
indices.payloadIdx];
810 reinterpret_cast<const char*
>(headerMsg->GetData()),
811 payloadMsg ?
reinterpret_cast<char const*
>(payloadMsg->GetData()) :
nullptr,
812 payloadMsg ? payloadMsg->GetSize() : 0};
819 return next.headerIdx < partial[idx].size() ? next :
DataRefIndices{size_t(-1), size_t(-1)};
821 InputSpan span{nPartsGetter, refCountGetter, indicesGetter, nextIndicesGetter,
static_cast<size_t>(partial.size())};
825 auto timeslice = std::get_if<uint64_t>(&variables.get(0));
829 updateCompletionResults(slot, timeslice, action);
836 updateCompletionResults(slot, timeslice, action);
840 countConsumeExisting++;
841 updateCompletionResults(slot, timeslice, action);
846 updateCompletionResults(slot, timeslice, action);
851 updateCompletionResults(slot, timeslice, action);
866 LOGP(
debug,
"DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
867 notDirty, countConsume, countConsumeExisting, countProcess,
868 countDiscard, countWait);
873 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
874 const auto numInputTypes = mDistinctRoutesIndex.size();
876 auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
878 auto cacheId = s.index * numInputTypes + arg;
879 if (cachedStateMetrics[cacheId] == oldStatus) {
880 cachedStateMetrics[cacheId] = newStatus;
884 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
885 markInputDone(slot, ai, oldStatus, newStatus);
891 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
893 const auto numInputTypes = mDistinctRoutesIndex.size();
895 std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
896 auto& cache = mCache;
897 auto&
index = mTimesliceIndex;
908 auto moveHeaderPayloadToOutput = [&messages,
909 &cachedStateMetrics = mCachedStateMetrics,
911 auto cacheId = s.index * numInputTypes + arg;
915 if (!cache[cacheId].
empty()) {
916 messages[arg] = std::move(cache[cacheId]);
918 index.markAsInvalid(s);
926 for (
size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
927 assert(std::accumulate(cache[ai].begin(), cache[ai].
end(),
true, [](
bool result,
auto const&
element) {
return result &&
element.get() ==
nullptr; }));
930 index.markAsInvalid(s);
934 jumpToCacheEntryAssociatedWith(slot);
935 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
936 moveHeaderPayloadToOutput(slot, ai);
938 invalidateCacheFor(slot);
945 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
947 const auto numInputTypes = mDistinctRoutesIndex.size();
949 std::vector<std::vector<fair::mq::MessagePtr>> messages(numInputTypes);
950 auto& cache = mCache;
951 auto&
index = mTimesliceIndex;
962 auto copyHeaderPayloadToOutput = [&messages,
963 &cachedStateMetrics = mCachedStateMetrics,
965 auto cacheId = s.index * numInputTypes + arg;
969 for (
size_t pi = 0; pi < (cache[cacheId] |
count_parts{}); pi++) {
970 auto& header = cache[cacheId] |
get_header{pi};
971 auto&& newHeader = header->GetTransport()->CreateMessage();
972 newHeader->Copy(*header);
973 messages[arg].emplace_back(std::move(newHeader));
974 messages[arg].emplace_back(std::move(cache[cacheId] |
get_payload{pi, 0}));
979 jumpToCacheEntryAssociatedWith(slot);
980 for (
size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
981 copyHeaderPayloadToOutput(slot, ai);
984 return std::move(messages);
989 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
991 for (
auto& cache : mCache) {
994 for (
size_t s = 0; s < mTimesliceIndex.
size(); ++s) {
1002 return mCache.size() / mDistinctRoutesIndex.size();
1011 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1013 mTimesliceIndex.
resize(s);
1014 mVariableContextes.resize(s);
1020 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1022 auto numInputTypes = mDistinctRoutesIndex.size();
1026 mCache.resize(numInputTypes * mTimesliceIndex.
size());
1029 mCachedStateMetrics.resize(mCache.size());
1034 for (
size_t i = 0;
i < mVariableContextes.size(); ++
i) {
1036 .
name = fmt::format(
"matcher_variables/{}",
i),
1038 .minPublishInterval = 500,
1039 .sendInitialValue =
true,
1044 for (
int ci = 0; ci < mTimesliceIndex.
size(); ci++) {
1046 .
name = fmt::format(
"data_relayer/{}", ci),
1048 .minPublishInterval = 800,
1049 .sendInitialValue =
true,
1057 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1063 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1069 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1075 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1084 std::scoped_lock<
O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
1086 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1091 char relayerSlotState[1024];
1094 int written = snprintf(relayerSlotState, 1024,
"%d ", (
int)mTimesliceIndex.
size());
1095 char*
buffer = relayerSlotState + written;
1096 for (
size_t ci = 0; ci < mTimesliceIndex.
size(); ++ci) {
1097 for (
size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
1098 int index = ci * mDistinctRoutesIndex.size() + si;
1099 int value =
static_cast<int>(mCachedStateMetrics[
index]);
1107 buffer[mDistinctRoutesIndex.size()] =
'\0';
1108 auto size = (
int)(
buffer - relayerSlotState + mDistinctRoutesIndex.size());
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< OutputRoute > routes
#define O2_BUILTIN_UNREACHABLE
o2::monitoring::Verbosity Verbosity
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associate to a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated to a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
void getReadyToProcess(std::vector< RecordAction > &completed)
std::function< void(TimesliceSlot, std::vector< std::vector< fair::mq::MessagePtr > > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
std::vector< std::vector< fair::mq::MessagePtr > > consumeAllInputsForTimeslice(TimesliceSlot id)
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
DataRelayer(CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef, int)
std::function< void(ServiceRegistryRef &, std::span< fair::mq::MessagePtr > &)> OnInsertionCallback
std::vector< std::vector< fair::mq::MessagePtr > > consumeExistingInputsForTimeslice(TimesliceSlot id)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated to a given slot.
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associate to a given slot.
void clear()
Remove all pending messages.
virtual fair::mq::Device * device()=0
void markAsDirty(TimesliceSlot slot, bool value)
data_matcher::VariableContext & getPublishedVariablesForSlot(TimesliceSlot slot)
OldestInputInfo setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
OldestOutputInfo getOldestPossibleOutput() const
bool isDirty(TimesliceSlot const &slot) const
InputChannelInfo const & getChannelInfo(ChannelIndex channel) const
ActionTaken
The outcome for the processing of a given timeslot.
@ Wait
An obsolete slot is used to hold the new context and the old one is dropped.
@ DropObsolete
An invalid context is not inserted in the index and dropped.
@ DropInvalid
We wait for the oldest slot to complete.
@ ReplaceObsolete
An unused / invalid slot is used to hold the new context.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
bool validateSlot(TimesliceSlot slot, TimesliceId currentOldest)
bool isValid(TimesliceSlot const &slot) const
void markAsInvalid(TimesliceSlot slot)
OldestOutputInfo updateOldestPossibleOutput(bool rewinded)
data_matcher::VariableContext & getVariablesForSlot(TimesliceSlot slot)
void publish(void(*callback)(VariableContext const &, TimesliceSlot slot, void *context), void *context, TimesliceSlot slot)
ContextElement::Value const & get(size_t pos) const
GLsizei const GLfloat * value
GLsizei GLenum const void * indices
constexpr int MAX_MATCHING_VARIABLE
Defining ITS Vertex explicitly as messageable.
size_t matchToContext(void const *data, std::vector< DataDescriptorMatcher > const &matchers, std::vector< size_t > const &index, VariableContext &context)
constexpr int INVALID_INPUT
bool isCalibrationData(std::unique_ptr< fair::mq::Message > &first)
void sendVariableContextMetrics(VariableContext &context, TimesliceSlot slot, DataProcessingStates &states)
@ NoTransition
No pending transitions.
@ DROPPED_INCOMING_MESSAGES
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
static constexpr int INVALID
std::string name
Name of the policy itself.
CompletionOp
Action to take with the InputRecord:
@ Retry
Like Wait but mark the cacheline as dirty.
CallbackConfigureRelayer configureRelayer
CallbackFull callbackFull
Actual policy which decides what to do with a partial InputRecord, extended version.
Helper struct to hold statistics about the data processing happening.
@ Add
Update the rate of the metric given the amount since the last time.
Type type
What was the outcome of the relay operation.
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength(unsigned int minLength)
get max number of timeslices in the queue
static bool onlineDeploymentMode()
@true if running online
Running state information of a given device.
ProcessingType allowedProcessing
Reference to an inflight part.
std::unique_ptr< fair::mq::Message > header
std::unique_ptr< fair::mq::Message > payload
static bool isValid(TimesliceId const ×lice)
static constexpr uint64_t INVALID
static bool isValid(TimesliceSlot const &slot)
static constexpr uint64_t INVALID
static uint32_t getRunNumber(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFCounter(data_matcher::VariableContext const &variables)
static uint64_t getCreationTime(data_matcher::VariableContext const &variables)
static uint32_t getFirstTFOrbit(data_matcher::VariableContext const &variables)
static TimesliceId getTimeslice(data_matcher::VariableContext const &variables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"