![]() |
Project
|
#include <DataRelayer.h>
Classes | |
struct | ActivityStats |
struct | InputInfo |
struct | PruneOp |
struct | RecordAction |
struct | RelayChoice |
Public Types | |
enum struct | InputType : int { Invalid = 0 , Data = 1 , SourceInfo = 2 , DomainInfo = 3 } |
using | OnDropCallback = std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> |
Public Member Functions | |
DataRelayer (CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef) | |
ActivityStats | processDanglingInputs (std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew) |
void | prunePending (OnDropCallback) |
Prune all the pending entries in the cache. | |
void | pruneCache (TimesliceSlot slot, OnDropCallback onDrop=nullptr) |
Prune the cache for a given slot. | |
RelayChoice | relay (void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr) |
void | setOldestPossibleInput (TimesliceId timeslice, ChannelIndex channel) |
TimesliceIndex::OldestOutputInfo | getOldestPossibleOutput () const |
void | getReadyToProcess (std::vector< RecordAction > &completed) |
std::vector< MessageSet > | consumeAllInputsForTimeslice (TimesliceSlot id) |
std::vector< MessageSet > | consumeExistingInputsForTimeslice (TimesliceSlot id) |
size_t | getParallelTimeslices () const |
Returns how many timeslices we can handle in parallel. | |
void | setPipelineLength (size_t s) |
Tune the maximum number of in flight timeslices this can handle. | |
void | sendContextState () |
Send metrics with the VariableContext information. | |
void | publishMetrics () |
TimesliceId | getTimesliceForSlot (TimesliceSlot slot) |
void | updateCacheStatus (TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus) |
uint32_t | getFirstTFOrbitForSlot (TimesliceSlot slot) |
Get the firstTForbit associate to a given slot. | |
uint32_t | getFirstTFCounterForSlot (TimesliceSlot slot) |
Get the firstTFCounter associate to a given slot. | |
uint32_t | getRunNumberForSlot (TimesliceSlot slot) |
Get the runNumber associated to a given slot. | |
uint64_t | getCreationTimeForSlot (TimesliceSlot slot) |
Get the creation time associated to a given slot. | |
void | clear () |
Remove all pending messages. | |
void | rescan () |
size_t | getCacheSize () const |
size_t | getNumberOfTimeslices () const |
size_t | getNumberOfUniqueInputs () const |
Static Public Attributes | |
static constexpr ServiceKind | service_kind = ServiceKind::Global |
Definition at line 47 of file DataRelayer.h.
using o2::framework::DataRelayer::OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)> |
Definition at line 115 of file DataRelayer.h.
|
strong |
Enumerator | |
---|---|
Invalid | |
Data | |
SourceInfo | |
DomainInfo |
Definition at line 84 of file DataRelayer.h.
o2::framework::DataRelayer::DataRelayer | ( | CompletionPolicy const & | policy, |
std::vector< InputRoute > const & | routes, | ||
TimesliceIndex & | index, | ||
ServiceRegistryRef | services | ||
) |
Definition at line 64 of file DataRelayer.cxx.
void o2::framework::DataRelayer::clear | ( | ) |
Remove all pending messages.
Definition at line 914 of file DataRelayer.cxx.
std::vector< o2::framework::MessageSet > o2::framework::DataRelayer::consumeAllInputsForTimeslice | ( | TimesliceSlot | id | ) |
Returns an input registry associated to the given timeslice and gives ownership to the caller. This is because once the inputs are out of the DataRelayer they need to be deleted once the processing is concluded.
Definition at line 817 of file DataRelayer.cxx.
std::vector< o2::framework::MessageSet > o2::framework::DataRelayer::consumeExistingInputsForTimeslice | ( | TimesliceSlot | id | ) |
Definition at line 871 of file DataRelayer.cxx.
|
inline |
Definition at line 190 of file DataRelayer.h.
uint64_t o2::framework::DataRelayer::getCreationTimeForSlot | ( | TimesliceSlot | slot | ) |
Get the creation time associated to a given slot.
Definition at line 1000 of file DataRelayer.cxx.
uint32_t o2::framework::DataRelayer::getFirstTFCounterForSlot | ( | TimesliceSlot | slot | ) |
Get the firstTFCounter associate to a given slot.
Definition at line 988 of file DataRelayer.cxx.
uint32_t o2::framework::DataRelayer::getFirstTFOrbitForSlot | ( | TimesliceSlot | slot | ) |
Get the firstTForbit associate to a given slot.
Definition at line 982 of file DataRelayer.cxx.
|
inline |
Definition at line 191 of file DataRelayer.h.
|
inline |
Definition at line 192 of file DataRelayer.h.
TimesliceIndex::OldestOutputInfo o2::framework::DataRelayer::getOldestPossibleOutput | ( | ) | const |
This is to retrieve the oldest possible timeslice this relayer can possibly have in output.
Definition at line 346 of file DataRelayer.cxx.
size_t o2::framework::DataRelayer::getParallelTimeslices | ( | ) | const |
Returns how many timeslices we can handle in parallel.
Definition at line 927 of file DataRelayer.cxx.
void o2::framework::DataRelayer::getReadyToProcess | ( | std::vector< RecordAction > & | completed | ) |
Definition at line 665 of file DataRelayer.cxx.
uint32_t o2::framework::DataRelayer::getRunNumberForSlot | ( | TimesliceSlot | slot | ) |
Get the runNumber associated to a given slot.
Definition at line 994 of file DataRelayer.cxx.
TimesliceId o2::framework::DataRelayer::getTimesliceForSlot | ( | TimesliceSlot | slot | ) |
Get timeslice associated to a given slot. Notice how this avoids exposing the timesliceIndex directly so that we can mutex on it.
Definition at line 103 of file DataRelayer.cxx.
DataRelayer::ActivityStats o2::framework::DataRelayer::processDanglingInputs | ( | std::vector< ExpirationHandler > const & | expirationHandlers, |
ServiceRegistryRef | context, | ||
bool | createNew | ||
) |
This invokes the appropriate InputRoute::danglingChecker
on every entry in the cache and if it returns true, it creates a new cache entry by invoking the associated InputRoute::expirationHandler
. createNew true if the dangling inputs are allowed to create new slots.
Nothing to do if nothing can expire.
Definition at line 110 of file DataRelayer.cxx.
void o2::framework::DataRelayer::pruneCache | ( | TimesliceSlot | slot, |
OnDropCallback | onDrop = nullptr |
||
) |
Prune the cache for a given slot.
Definition at line 359 of file DataRelayer.cxx.
void o2::framework::DataRelayer::prunePending | ( | OnDropCallback | onDrop | ) |
Prune all the pending entries in the cache.
Definition at line 351 of file DataRelayer.cxx.
void o2::framework::DataRelayer::publishMetrics | ( | ) |
Definition at line 945 of file DataRelayer.cxx.
DataRelayer::RelayChoice o2::framework::DataRelayer::relay | ( | void const * | rawHeader, |
std::unique_ptr< fair::mq::Message > * | messages, | ||
InputInfo const & | info, | ||
size_t | nMessages, | ||
size_t | nPayloads = 1 , |
||
OnDropCallback | onDrop = nullptr |
||
) |
This is to relay a whole set of fair::mq::Messages, all which are part of the same set of split parts. rawHeader raw header pointer messages pointer to array of messages nMessages size of the array nPayloads number of payploads in the message sequence, default is 1 which is the standard header-payload message pair, in this case nMessages / 2 pairs will be inserted and considered separate parts onDrop function to be called if an message is dropped Notice that we expect that the header is an O2 Header Stack
FIXME: for the moment we only use the first context and reset between one invokation and the other.
The first argument is always matched against the data start time, so we can assert it's the same as the dph->startTime
If we get a valid result, we can store the message in cache.
If not, we find which timeslice we really were looking at and see if we can prune something from the cache.
Definition at line 412 of file DataRelayer.cxx.
|
inline |
Rescan the whole data to see if there is anything new we should do, e.g. as consequnce of an OOB event.
Definition at line 188 of file DataRelayer.h.
void o2::framework::DataRelayer::sendContextState | ( | ) |
Send metrics with the VariableContext information.
Definition at line 1006 of file DataRelayer.cxx.
void o2::framework::DataRelayer::setOldestPossibleInput | ( | TimesliceId | timeslice, |
ChannelIndex | channel | ||
) |
This is to set the oldest possible timeslice this relayer can possibly see on an input channel channel.
Definition at line 284 of file DataRelayer.cxx.
void o2::framework::DataRelayer::setPipelineLength | ( | size_t | s | ) |
Tune the maximum number of in flight timeslices this can handle.
Tune the maximum number of in flight timeslices this can handle. Notice that in case we have time pipelining we need to count the actual number of different types, without taking into account the time pipelining.
Definition at line 936 of file DataRelayer.cxx.
void o2::framework::DataRelayer::updateCacheStatus | ( | TimesliceSlot | slot, |
CacheEntryStatus | oldStatus, | ||
CacheEntryStatus | newStatus | ||
) |
Mark a given slot as done so that the GUI can reflect that.
Definition at line 799 of file DataRelayer.cxx.
|
staticconstexpr |
DataRelayer is thread safe because we have a lock around each method and there is no particular order in which methods need to be called.
Definition at line 53 of file DataRelayer.h.