11#ifndef O2_FRAMEWORK_DATARELAYER_H_
12#define O2_FRAMEWORK_DATARELAYER_H_
30#include <fairmq/FwdDecls.h>
103 std::vector<InputRoute>
const& routes,
134 std::unique_ptr<fair::mq::Message>* messages,
137 size_t nPayloads = 1,
201 std::vector<MessageSet> mCache;
208 std::vector<size_t> mDistinctRoutesIndex;
209 std::vector<InputSpec> mInputs;
210 std::vector<data_matcher::DataDescriptorMatcher> mInputMatchers;
211 std::vector<data_matcher::VariableContext> mVariableContextes;
212 std::vector<CacheEntryStatus> mCachedStateMetrics;
213 std::vector<PruneOp> mPruneOps;
#define O2_LOCKABLE_NAMED(T, V, N)
o2::monitoring::Monitoring Monitoring
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associated with a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated with a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
size_t getCacheSize() const
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated with a given slot.
static constexpr ServiceKind service_kind
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
size_t getNumberOfTimeslices() const
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
size_t getNumberOfUniqueInputs() const
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated with a given slot.
void clear()
Remove all pending messages.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
GLint GLint GLsizei GLint GLenum GLenum type
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
CompletionOp
Action to take with the InputRecord:
CompletionPolicy::CompletionOp op
Type type
What was the outcome of the relay operation.
@ Invalid
Ownership of the data has been taken.
@ Backpressured
The incoming data was not valid and has been dropped.
@ Dropped
The incoming data was not relayed, because we are backpressured.