#include <Monitoring/Metric.h>
#include <Monitoring/Monitoring.h>
#include <fairmq/Channel.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
                         std::vector<InputRoute> const& routes,
  : mTimesliceIndex{index},
    mCompletionPolicy{policy},

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  auto numInputTypes = mDistinctRoutesIndex.size();
  std::string queries = "";
  for (short i = 0; i < numInputTypes; ++i) {
    assert(mDistinctRoutesIndex[i] < routes.size());
    mInputs.push_back(routes[mDistinctRoutesIndex[i]].matcher);
    auto& matcher = routes[mDistinctRoutesIndex[i]].matcher;

  states.registerState({.name = "data_queries", .stateId = stateId, .sendInitialValue = true, .defaultEnabled = true});
  states.processCommandQueue();
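  // Note: the fragments above come from the DataRelayer constructor. It derives the set of
  // distinct input routes, keeps their matchers in mInputs and, as far as one can tell from
  // the registerState() call, publishes the concatenated input queries as the "data_queries"
  // state so that monitoring/GUI consumers can inspect what this device subscribes to.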
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  LOGP(debug, "DataRelayer::processDanglingInputs");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  if (expirationHandlers.empty()) {
    LOGP(debug, "DataRelayer::processDanglingInputs: No expiration handlers");

  std::vector<TimesliceSlot> slotsCreatedByHandlers;
    LOGP(debug, "Creating new slot");
    for (auto& handler : expirationHandlers) {
      LOGP(debug, "handler.creator for {}", handler.name);
      auto channelIndex = deviceProxy.getInputChannelIndex(handler.routeIndex);
      slotsCreatedByHandlers.push_back(handler.creator(services, channelIndex));
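      // Each expiration handler may create a brand-new timeslice slot (typically for timer,
      // enumeration or condition-like inputs). The slots created here are remembered so that,
      // further down, a handler is only allowed to act on the slot it created itself
      // (see the slotsCreatedByHandlers[ei] != slot check below).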
  for (auto slot : slotsCreatedByHandlers) {

  if (validSlots > 0) {
    LOGP(debug, "DataRelayer::processDanglingInputs: {} slots created by handler", validSlots);
    LOGP(debug, "DataRelayer::processDanglingInputs: no slots created by handler");

  int headerPresent = 0;
  int payloadPresent = 0;
  int checkerDenied = 0;
  for (size_t ti = 0; ti < mTimesliceIndex.size(); ++ti) {
    if (mTimesliceIndex.isValid(slot) == false) {
    assert(mDistinctRoutesIndex.empty() == false);
    for (size_t ei = 0; ei < expirationHandlers.size(); ++ei) {
      auto& expirator = expirationHandlers[ei];
      auto& part = mCache[ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value];
      if (part.size() > 0 && part.header(0) != nullptr) {
      if (part.size() > 0 && part.payload(0) != nullptr) {
      if (!expirator.checker) {
      if (slotsCreatedByHandlers[ei] != slot) {
  auto getPartialRecord = [&cache = mCache, numInputTypes = mDistinctRoutesIndex.size()](int li) -> gsl::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    auto const end = cache.data() + offset + numInputTypes;

      auto partial = getPartialRecord(ti);
      auto getter = [&partial](size_t idx, size_t part) {
        if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
          auto header = partial[idx].header(part).get();
          auto payload = partial[idx].payload(part).get();
                         reinterpret_cast<const char*>(header->GetData()),
                         reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                         payload ? payload->GetSize() : 0};
      auto nPartsGetter = [&partial](size_t idx) {
        return partial[idx].size();
      InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
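      // The InputSpan built here is a lazy, non-owning view over one cache line: `getter`
      // materialises a DataRef from the stored header/payload messages on demand and
      // `nPartsGetter` reports how many parts each input currently holds. The same pattern
      // appears again in getReadyToProcess() when the completion policy is evaluated.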
      if (expirator.checker(services, timestamp.value, span) == false) {

      assert(ti * mDistinctRoutesIndex.size() + expirator.routeIndex.value < mCache.size());
      assert(expirator.handler);
      expirator.handler(services, newRef, variables);
      part.reset(std::move(newRef));
      activity.expiredSlots++;
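      // When the checker approves, the handler synthesises the expired input (for instance a
      // condition object) into newRef, which then replaces the cache entry for this
      // route/slot, and the activity counter records that a slot was completed this way.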
      assert(part.header(0) != nullptr);
      assert(part.payload(0) != nullptr);

  LOGP(debug, "DataRelayer::processDanglingInputs headerPresent:{}, payloadPresent:{}, noCheckers:{}, badSlot:{}, checkerDenied:{}",
       headerPresent, payloadPresent, noCheckers, badSlot, checkerDenied);
                      std::vector<DataDescriptorMatcher> const& matchers,
                      std::vector<size_t> const& index,

  for (size_t ri = 0, re = index.size(); ri < re; ++ri) {
    auto& matcher = matchers[index[ri]];
    if (matcher.match(reinterpret_cast<char const*>(data), context)) {
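  // matchToContext walks the matchers of the distinct routes (addressed through `index`) and
  // runs each DataDescriptorMatcher against the raw header bytes; presumably the position of
  // the first route that matches is returned, and INVALID_INPUT when none does.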
  static const std::string nullstring{"null"};
  static std::string state = "";
    auto var = variables.get(i);
    if (auto pval = std::get_if<uint64_t>(&var)) {
    } else if (auto pval = std::get_if<uint32_t>(&var)) {
    } else if (auto pval2 = std::get_if<std::string>(&var)) {
                      .data = state.data()});
  LOGP(debug, "DataRelayer::setOldestPossibleInput {} from channel {}", newOldest.timeslice.value, newOldest.channel.value);
  static bool dontDrop = getenv("DPL_DONT_DROP_OLD_TIMESLICE") && atoi(getenv("DPL_DONT_DROP_OLD_TIMESLICE"));
  for (size_t si = 0; si < mCache.size() / mInputs.size(); ++si) {
    auto valid = mTimesliceIndex.validateSlot({si}, newOldest.timeslice);
    if (mTimesliceIndex.isValid({si})) {
      LOGP(debug, "Keeping slot {} because data has timestamp {} while oldest possible timestamp is {}", si, timestamp.value, newOldest.timeslice.value);
    mPruneOps.push_back(PruneOp{si});
    bool didDrop = false;
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      auto& element = mCache[si * mInputs.size() + mi];
      if (element.size() != 0) {
        if (input.lifetime != Lifetime::Condition && mCompletionPolicy.name != "internal-dpl-injected-dummy-sink") {
            LOGP(warning, "Stop transition requested. Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it will never be completed.",
                 DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
            LOGP(error, "Dropping incomplete {} Lifetime::{} data in slot {} with timestamp {} < {} as it can never be completed.",
                 DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
318 "Silently dropping data {} in pipeline slot {} because it has timeslice {} < {} after receiving data from channel {}."
319 "Because Lifetime::Timeframe data not there and not expected (e.g. due to sampling) we drop non sampled, non timeframe data (e.g. Conditions).",
    for (size_t mi = 0; mi < mInputs.size(); ++mi) {
      auto& input = mInputs[mi];
      if (input.lifetime == Lifetime::Timer) {
      auto& element = mCache[si * mInputs.size() + mi];
      if (element.size() == 0) {
          LOGP(warning, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
          LOGP(error, "Missing {} (lifetime:{}) while dropping incomplete data in slot {} with timestamp {} < {}.",
               DataSpecUtils::describe(input), input.lifetime, si, timestamp.value, newOldest.timeslice.value);
  for (auto& op : mPruneOps) {
                 &cachedStateMetrics = mCachedStateMetrics,
                 numInputTypes = mDistinctRoutesIndex.size(),
                 &index = mTimesliceIndex,

    auto oldestPossibleTimeslice = index.getOldestPossibleOutput();
    std::vector<MessageSet> dropped(numInputTypes);
    for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
      auto cacheId = slot.index * numInputTypes + ai;
      if (cache[cacheId].size() > 0) {
        dropped[ai] = std::move(cache[cacheId]);
    bool anyDropped = std::any_of(dropped.begin(), dropped.end(), [](auto& m) { return m.size(); });
      O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "pruneCache", "Dropping stuff from slot %zu with timeslice %zu", slot.index, oldestPossibleTimeslice.timeslice.value);
      onDrop(slot, dropped, oldestPossibleTimeslice);
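    // Pruning a slot moves every non-empty MessageSet of that cache line into `dropped`;
    // only when something was actually still pending (anyDropped) is the onDrop callback
    // invoked, so the owner can account for or forward the data being discarded.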
  assert(cache.empty() == false);
  assert(index.size() * numInputTypes == cache.size());
  assert(numInputTypes * slot.index < cache.size());
  for (size_t ai = slot.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {

  auto* dh = o2::header::get<DataHeader*>(first->GetData());
                                           std::unique_ptr<fair::mq::Message>* messages,

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  return (slot.index % maxLanes) == (currentLane % maxLanes);
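  // Lane check used by relay(): a slot is handled by this pipeline lane only when its index
  // and the current lane agree modulo maxLanes; slots belonging to other lanes are skipped
  // when searching for a home for the incoming message.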
  auto getInputTimeslice = [&matchers = mInputMatchers,
                            &distinctRoutes = mDistinctRoutesIndex,
    -> std::tuple<int, TimesliceId> {
      auto input = matchToContext(rawHeader, matchers, distinctRoutes, context);

      if (auto pval = std::get_if<uint64_t>(&context.get(0))) {
      return {input, timeslice};
  auto saveInSlot = [&cachedStateMetrics = mCachedStateMetrics,
                     &services = mContext,
    O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
                           fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
                           timeslice.value, slot.index,
    auto cacheIdx = numInputTypes * slot.index + input;
    assert(nPayloads > 0);
    for (size_t mi = 0; mi < nMessages; ++mi) {
      assert(mi + nPayloads < nMessages);
      target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
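    // The cache is a flat array addressed as slot.index * numInputTypes + input. Incoming
    // multipart data arrives as groups of one header followed by nPayloads payload messages,
    // and target.add() adopts each group (nPayloads + 1 messages) into the MessageSet of this
    // cache entry without copying the underlying fair::mq buffers.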
  auto& index = mTimesliceIndex;

  bool needsCleaning = false;
  for (size_t ci = 0; ci < index.size(); ++ci) {
    if (!isSlotInLane(slot)) {
    if (index.isValid(slot) == false) {
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));

  for (size_t ci = 0; ci < index.size(); ++ci) {
    if (index.isValid(slot) == true) {
    if (!isSlotInLane(slot)) {
    std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
      needsCleaning = true;

    mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
    size_t saved = saveInSlot(timeslice, input, slot, info);
    index.publishSlot(slot);
    index.markAsDirty(slot, true);
  std::tie(input, timeslice) = getInputTimeslice(pristineContext);

  auto DataHeaderInfo = [&rawHeader]() {
    const auto* dh = o2::header::get<o2::header::DataHeader*>(rawHeader);
      error += fmt::format("{}/{}/{}", dh->dataOrigin, dh->dataDescription, dh->subSpecification);
      error += "invalid header";

    LOG(error) << "Could not match incoming data to any input route: " << DataHeaderInfo();
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);

    LOG(error) << "Could not determine the timeslice for input: " << DataHeaderInfo();
    for (size_t pi = 0; pi < nMessages; ++pi) {
      messages[pi].reset(nullptr);
  std::tie(action, slot) = index.replaceLRUWith(pristineContext, timeslice);
  uint64_t const* debugTimestamp = std::get_if<uint64_t>(&pristineContext.get(0));
                            "Slot %zu updated with %zu using action %d, %" PRIu64, slot.index, timeslice.value, (int)action, *debugTimestamp);
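  // No existing slot matched this timeslice: the TimesliceIndex is asked to recycle its
  // least-recently-used slot for the new context. The ActionTaken it returns drives what
  // follows: store the data, drop it as obsolete, or drop it as invalid.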
  updateStatistics(action);
      static std::atomic<size_t> obsoleteCount = 0;
      static std::atomic<size_t> mult = 1;
      if ((obsoleteCount++ % (1 * mult)) == 0) {
        LOGP(warning, "Over {} incoming messages are already obsolete, not relaying.", obsoleteCount.load());
        if (obsoleteCount > mult * 10) {

      LOG(warning) << "Incoming data is invalid, not relaying.";
      for (size_t pi = 0; pi < nMessages; ++pi) {
        messages[pi].reset(nullptr);

    mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
    size_t saved = saveInSlot(timeslice, input, slot, info);
    index.publishSlot(slot);
    index.markAsDirty(slot, true);
  LOGP(debug, "DataRelayer::getReadyToProcess");
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  const auto& cache = mCache;
  const auto numInputTypes = mDistinctRoutesIndex.size();
  auto getPartialRecord = [&cache, &numInputTypes](int li) -> gsl::span<MessageSet const> {
    auto offset = li * numInputTypes;
    assert(cache.size() >= offset + numInputTypes);
    auto const end = cache.data() + offset + numInputTypes;
    LOGP(debug, "Doing action {} for slot {} (timeslice: {})", (int)op, li.index, *timeslice);
    LOGP(debug, "No timeslice associated with slot {}", li.index);
  if (numInputTypes == 0) {
    LOGP(debug, "numInputTypes == 0, returning.");

  size_t cacheLines = cache.size() / numInputTypes;
  assert(cacheLines * numInputTypes == cache.size());
  int countConsume = 0;
  int countConsumeExisting = 0;
  int countProcess = 0;
  int countDiscard = 0;
  for (int li = cacheLines - 1; li >= 0; --li) {
    if (mTimesliceIndex.isDirty(slot) == false) {
      throw runtime_error_f("Completion policy %s has no callback set", mCompletionPolicy.name.c_str());
    auto partial = getPartialRecord(li);
    auto getter = [&partial](size_t idx, size_t part) {
      if (partial[idx].size() > 0 && partial[idx].header(part).get()) {
        auto header = partial[idx].header(part).get();
        auto payload = partial[idx].payload(part).get();
                       reinterpret_cast<const char*>(header->GetData()),
                       reinterpret_cast<char const*>(payload ? payload->GetData() : nullptr),
                       payload ? payload->GetSize() : 0};
    auto nPartsGetter = [&partial](size_t idx) {
      return partial[idx].size();
    InputSpan span{getter, nPartsGetter, static_cast<size_t>(partial.size())};
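    // For every dirty cache line an InputSpan view is rebuilt (same getter pattern as in
    // processDanglingInputs) and handed to the completion policy callback; the CompletionOp
    // it returns (Consume, ConsumeExisting, Process, Discard, Wait, ...) is what the counters
    // below tally and what updateCompletionResults records for the slot.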
    auto timeslice = std::get_if<uint64_t>(&variables.get(0));

        updateCompletionResults(slot, timeslice, action);

        updateCompletionResults(slot, timeslice, action);

        countConsumeExisting++;
        updateCompletionResults(slot, timeslice, action);

        updateCompletionResults(slot, timeslice, action);

        updateCompletionResults(slot, timeslice, action);
  LOGP(debug, "DataRelayer::getReadyToProcess results notDirty:{}, consume:{}, consumeExisting:{}, process:{}, discard:{}, wait:{}",
       notDirty, countConsume, countConsumeExisting, countProcess,
       countDiscard, countWait);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  const auto numInputTypes = mDistinctRoutesIndex.size();

  auto markInputDone = [&cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    if (cachedStateMetrics[cacheId] == oldStatus) {
      cachedStateMetrics[cacheId] = newStatus;

  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    markInputDone(slot, ai, oldStatus, newStatus);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  const auto numInputTypes = mDistinctRoutesIndex.size();
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;

  auto moveHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    if (cache[cacheId].size() > 0) {
      messages[arg] = std::move(cache[cacheId]);
    index.markAsInvalid(s);

  for (size_t ai = s.index * numInputTypes, ae = ai + numInputTypes; ai != ae; ++ai) {
    assert(std::accumulate(cache[ai].messages.begin(), cache[ai].messages.end(), true, [](bool result, auto const& element) { return result && element.get() == nullptr; }));
  index.markAsInvalid(s);

  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    moveHeaderPayloadToOutput(slot, ai);
  invalidateCacheFor(slot);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  const auto numInputTypes = mDistinctRoutesIndex.size();
  std::vector<MessageSet> messages(numInputTypes);
  auto& cache = mCache;
  auto& index = mTimesliceIndex;

  auto copyHeaderPayloadToOutput = [&messages,
                                    &cachedStateMetrics = mCachedStateMetrics,
    auto cacheId = s.index * numInputTypes + arg;
    for (size_t pi = 0; pi < cache[cacheId].size(); pi++) {
      auto& header = cache[cacheId].header(pi);
      auto&& newHeader = header->GetTransport()->CreateMessage();
      newHeader->Copy(*header);
      messages[arg].add(PartRef{std::move(newHeader), std::move(cache[cacheId].payload(pi))});
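      // Unlike the move-based variant in consumeAllInputsForTimeslice above, this copy path
      // duplicates each header into a freshly created transport message and moves only the
      // payload out, so the slot apparently remains usable for a later full consumption.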
  jumpToCacheEntryAssociatedWith(slot);
  for (size_t ai = 0, ae = numInputTypes; ai != ae; ++ai) {
    copyHeaderPayloadToOutput(slot, ai);
  return std::move(messages);
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  for (auto& cache : mCache) {
  for (size_t s = 0; s < mTimesliceIndex.size(); ++s) {

  return mCache.size() / mDistinctRoutesIndex.size();
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  mTimesliceIndex.resize(s);
  mVariableContextes.resize(s);

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  auto numInputTypes = mDistinctRoutesIndex.size();
  mCache.resize(numInputTypes * mTimesliceIndex.size());
  mCachedStateMetrics.resize(mCache.size());
  for (size_t i = 0; i < mVariableContextes.size(); ++i) {
      .name = fmt::format("matcher_variables/{}", i),
      .minPublishInterval = 500,
      .sendInitialValue = true,

  for (int ci = 0; ci < mTimesliceIndex.size(); ci++) {
      .name = fmt::format("data_relayer/{}", ci),
      .minPublishInterval = 800,
      .sendInitialValue = true,
  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);

  std::scoped_lock<O2_LOCKABLE(std::recursive_mutex)> lock(mMutex);
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {

  char relayerSlotState[1024];
  int written = snprintf(relayerSlotState, 1024, "%d ", (int)mTimesliceIndex.size());
  char* buffer = relayerSlotState + written;
  for (size_t ci = 0; ci < mTimesliceIndex.size(); ++ci) {
    for (size_t si = 0; si < mDistinctRoutesIndex.size(); ++si) {
      int index = si * mTimesliceIndex.size() + ci;
      int value = static_cast<int>(mCachedStateMetrics[index]);
    buffer[mDistinctRoutesIndex.size()] = '\0';
    auto size = (int)(buffer - relayerSlotState + mDistinctRoutesIndex.size());
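    // The relayer state is serialised into a flat character buffer: the number of slots
    // first, then one character per (route, slot) cache entry taken from mCachedStateMetrics,
    // each row '\0'-terminated; the resulting string is presumably what gets published as the
    // "data_relayer/<ci>" state registered in setPipelineLength above.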