Project
Loading...
Searching...
No Matches
o2::framework::DataRelayer Class Reference

#include <DataRelayer.h>

Classes

struct  ActivityStats
 
struct  InputInfo
 
struct  PruneOp
 
struct  RecordAction
 
struct  RelayChoice
 

Public Types

enum struct  InputType : int { Invalid = 0 , Data = 1 , SourceInfo = 2 , DomainInfo = 3 }
 
using OnDropCallback = std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)>
 

Public Member Functions

 DataRelayer (CompletionPolicy const &, std::vector< InputRoute > const &routes, TimesliceIndex &, ServiceRegistryRef)
 
ActivityStats processDanglingInputs (std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
 
void prunePending (OnDropCallback)
 Prune all the pending entries in the cache.
 
void pruneCache (TimesliceSlot slot, OnDropCallback onDrop=nullptr)
 Prune the cache for a given slot.
 
RelayChoice relay (void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
 
void setOldestPossibleInput (TimesliceId timeslice, ChannelIndex channel)
 
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput () const
 
void getReadyToProcess (std::vector< RecordAction > &completed)
 
std::vector< MessageSetconsumeAllInputsForTimeslice (TimesliceSlot id)
 
std::vector< MessageSetconsumeExistingInputsForTimeslice (TimesliceSlot id)
 
size_t getParallelTimeslices () const
 Returns how many timeslices we can handle in parallel.
 
void setPipelineLength (size_t s)
 Tune the maximum number of in flight timeslices this can handle.
 
void sendContextState ()
 Send metrics with the VariableContext information.
 
void publishMetrics ()
 
TimesliceId getTimesliceForSlot (TimesliceSlot slot)
 
void updateCacheStatus (TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
 
uint32_t getFirstTFOrbitForSlot (TimesliceSlot slot)
 Get the firstTForbit associate to a given slot.
 
uint32_t getFirstTFCounterForSlot (TimesliceSlot slot)
 Get the firstTFCounter associate to a given slot.
 
uint32_t getRunNumberForSlot (TimesliceSlot slot)
 Get the runNumber associated to a given slot.
 
uint64_t getCreationTimeForSlot (TimesliceSlot slot)
 Get the creation time associated to a given slot.
 
void clear ()
 Remove all pending messages.
 
void rescan ()
 
size_t getCacheSize () const
 
size_t getNumberOfTimeslices () const
 
size_t getNumberOfUniqueInputs () const
 

Static Public Attributes

static constexpr ServiceKind service_kind = ServiceKind::Global
 

Detailed Description

Definition at line 47 of file DataRelayer.h.

Member Typedef Documentation

◆ OnDropCallback

Definition at line 115 of file DataRelayer.h.

Member Enumeration Documentation

◆ InputType

Enumerator
Invalid 
Data 
SourceInfo 
DomainInfo 

Definition at line 84 of file DataRelayer.h.

Constructor & Destructor Documentation

◆ DataRelayer()

o2::framework::DataRelayer::DataRelayer ( CompletionPolicy const &  policy,
std::vector< InputRoute > const &  routes,
TimesliceIndex index,
ServiceRegistryRef  services 
)

Definition at line 64 of file DataRelayer.cxx.

Member Function Documentation

◆ clear()

void o2::framework::DataRelayer::clear ( )

Remove all pending messages.

Definition at line 914 of file DataRelayer.cxx.

◆ consumeAllInputsForTimeslice()

std::vector< o2::framework::MessageSet > o2::framework::DataRelayer::consumeAllInputsForTimeslice ( TimesliceSlot  id)

Returns an input registry associated to the given timeslice and gives ownership to the caller. This is because once the inputs are out of the DataRelayer they need to be deleted once the processing is concluded.

Definition at line 817 of file DataRelayer.cxx.

◆ consumeExistingInputsForTimeslice()

std::vector< o2::framework::MessageSet > o2::framework::DataRelayer::consumeExistingInputsForTimeslice ( TimesliceSlot  id)

Definition at line 871 of file DataRelayer.cxx.

◆ getCacheSize()

size_t o2::framework::DataRelayer::getCacheSize ( ) const
inline

Definition at line 190 of file DataRelayer.h.

◆ getCreationTimeForSlot()

uint64_t o2::framework::DataRelayer::getCreationTimeForSlot ( TimesliceSlot  slot)

Get the creation time associated to a given slot.

Definition at line 1000 of file DataRelayer.cxx.

◆ getFirstTFCounterForSlot()

uint32_t o2::framework::DataRelayer::getFirstTFCounterForSlot ( TimesliceSlot  slot)

Get the firstTFCounter associate to a given slot.

Definition at line 988 of file DataRelayer.cxx.

◆ getFirstTFOrbitForSlot()

uint32_t o2::framework::DataRelayer::getFirstTFOrbitForSlot ( TimesliceSlot  slot)

Get the firstTForbit associate to a given slot.

Definition at line 982 of file DataRelayer.cxx.

◆ getNumberOfTimeslices()

size_t o2::framework::DataRelayer::getNumberOfTimeslices ( ) const
inline

Definition at line 191 of file DataRelayer.h.

◆ getNumberOfUniqueInputs()

size_t o2::framework::DataRelayer::getNumberOfUniqueInputs ( ) const
inline

Definition at line 192 of file DataRelayer.h.

◆ getOldestPossibleOutput()

TimesliceIndex::OldestOutputInfo o2::framework::DataRelayer::getOldestPossibleOutput ( ) const

This is to retrieve the oldest possible timeslice this relayer can possibly have in output.

Definition at line 346 of file DataRelayer.cxx.

◆ getParallelTimeslices()

size_t o2::framework::DataRelayer::getParallelTimeslices ( ) const

Returns how many timeslices we can handle in parallel.

Definition at line 927 of file DataRelayer.cxx.

◆ getReadyToProcess()

void o2::framework::DataRelayer::getReadyToProcess ( std::vector< RecordAction > &  completed)
Returns
the actions ready to be performed.

Definition at line 665 of file DataRelayer.cxx.

◆ getRunNumberForSlot()

uint32_t o2::framework::DataRelayer::getRunNumberForSlot ( TimesliceSlot  slot)

Get the runNumber associated to a given slot.

Definition at line 994 of file DataRelayer.cxx.

◆ getTimesliceForSlot()

TimesliceId o2::framework::DataRelayer::getTimesliceForSlot ( TimesliceSlot  slot)

Get timeslice associated to a given slot. Notice how this avoids exposing the timesliceIndex directly so that we can mutex on it.

Definition at line 103 of file DataRelayer.cxx.

◆ processDanglingInputs()

DataRelayer::ActivityStats o2::framework::DataRelayer::processDanglingInputs ( std::vector< ExpirationHandler > const &  expirationHandlers,
ServiceRegistryRef  context,
bool  createNew 
)

This invokes the appropriate InputRoute::danglingChecker on every entry in the cache and if it returns true, it creates a new cache entry by invoking the associated InputRoute::expirationHandler. createNew true if the dangling inputs are allowed to create new slots.

Returns
true if there were expirations, false if not.

Nothing to do if nothing can expire.

Definition at line 110 of file DataRelayer.cxx.

◆ pruneCache()

void o2::framework::DataRelayer::pruneCache ( TimesliceSlot  slot,
OnDropCallback  onDrop = nullptr 
)

Prune the cache for a given slot.

Definition at line 359 of file DataRelayer.cxx.

◆ prunePending()

void o2::framework::DataRelayer::prunePending ( OnDropCallback  onDrop)

Prune all the pending entries in the cache.

Definition at line 351 of file DataRelayer.cxx.

◆ publishMetrics()

void o2::framework::DataRelayer::publishMetrics ( )

Definition at line 945 of file DataRelayer.cxx.

◆ relay()

DataRelayer::RelayChoice o2::framework::DataRelayer::relay ( void const *  rawHeader,
std::unique_ptr< fair::mq::Message > *  messages,
InputInfo const &  info,
size_t  nMessages,
size_t  nPayloads = 1,
OnDropCallback  onDrop = nullptr 
)

This is to relay a whole set of fair::mq::Messages, all which are part of the same set of split parts. rawHeader raw header pointer messages pointer to array of messages nMessages size of the array nPayloads number of payploads in the message sequence, default is 1 which is the standard header-payload message pair, in this case nMessages / 2 pairs will be inserted and considered separate parts onDrop function to be called if an message is dropped Notice that we expect that the header is an O2 Header Stack

FIXME: for the moment we only use the first context and reset between one invokation and the other.

The first argument is always matched against the data start time, so we can assert it's the same as the dph->startTime

If we get a valid result, we can store the message in cache.

If not, we find which timeslice we really were looking at and see if we can prune something from the cache.

Definition at line 412 of file DataRelayer.cxx.

◆ rescan()

void o2::framework::DataRelayer::rescan ( )
inline

Rescan the whole data to see if there is anything new we should do, e.g. as consequnce of an OOB event.

Definition at line 188 of file DataRelayer.h.

◆ sendContextState()

void o2::framework::DataRelayer::sendContextState ( )

Send metrics with the VariableContext information.

Definition at line 1006 of file DataRelayer.cxx.

◆ setOldestPossibleInput()

void o2::framework::DataRelayer::setOldestPossibleInput ( TimesliceId  timeslice,
ChannelIndex  channel 
)

This is to set the oldest possible timeslice this relayer can possibly see on an input channel channel.

Definition at line 284 of file DataRelayer.cxx.

◆ setPipelineLength()

void o2::framework::DataRelayer::setPipelineLength ( size_t  s)

Tune the maximum number of in flight timeslices this can handle.

Tune the maximum number of in flight timeslices this can handle. Notice that in case we have time pipelining we need to count the actual number of different types, without taking into account the time pipelining.

Definition at line 936 of file DataRelayer.cxx.

◆ updateCacheStatus()

void o2::framework::DataRelayer::updateCacheStatus ( TimesliceSlot  slot,
CacheEntryStatus  oldStatus,
CacheEntryStatus  newStatus 
)

Mark a given slot as done so that the GUI can reflect that.

Definition at line 799 of file DataRelayer.cxx.

Member Data Documentation

◆ service_kind

constexpr ServiceKind o2::framework::DataRelayer::service_kind = ServiceKind::Global
staticconstexpr

DataRelayer is thread safe because we have a lock around each method and there is no particular order in which methods need to be called.

Definition at line 53 of file DataRelayer.h.


The documentation for this class was generated from the following files: