Project
Loading...
Searching...
No Matches
DataRelayer.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATARELAYER_H_
12#define O2_FRAMEWORK_DATARELAYER_H_
13
21#include "Framework/Tracing.h"
24
25#include <cstddef>
26#include <mutex>
27#include <vector>
28#include <functional>
29
30#include <fairmq/FwdDecls.h>
31
32namespace o2::monitoring
33{
34class Monitoring;
35}
36
37namespace o2::framework
38{
39
40enum struct CacheEntryStatus : int { // Status tag for a cache entry; the old/new status arguments of DataRelayer::updateCacheStatus() and the element type of mCachedStateMetrics.
41 EMPTY, // Enumerators carry no explicit initializers, so they take consecutive values starting at 0.
42 PENDING,
43 RUNNING,
44 DONE
45};
46
48{
49 public:
56 struct RelayChoice {
57 enum struct Type {
58 WillRelay,
59 Invalid,
61 Dropped
62 };
65 // The timeslice affected by the given operation.
67 };
68
70 int newSlots = 0;
71 int expiredSlots = 0;
72 };
73
74 struct PruneOp {
76 };
77
83
84 enum struct InputType : int { // Kind tag passed to the InputInfo constructor and stored in InputInfo::type.
85 Invalid = 0, // Values are pinned explicitly (0-3). NOTE(review): presumably because they are exchanged outside this class — confirm.
86 Data = 1,
87 SourceInfo = 2,
88 DomainInfo = 3
89 };
90
91 struct InputInfo { // Descriptor handed to DataRelayer::relay() as `info`.
92 InputInfo(size_t p, size_t s, InputType t, ChannelIndex i) // Copies each argument into the member of the same role.
93 : position(p), size(s), type(t), index(i)
94 {
95 }
96 size_t position; // NOTE(review): presumably the offset of the first message of this input — confirm against the caller.
97 size_t size; // NOTE(review): presumably the number of messages belonging to this input — confirm against the caller.
100 };
101
103 std::vector<InputRoute> const& routes,
106
112 ActivityStats processDanglingInputs(std::vector<ExpirationHandler> const&,
113 ServiceRegistryRef context, bool createNew);
114
115 using OnDropCallback = std::function<void(TimesliceSlot, std::vector<MessageSet>&, TimesliceIndex::OldestOutputInfo info)>;
116
120 void pruneCache(TimesliceSlot slot, OnDropCallback onDrop = nullptr); // Prune the cache for a given slot; onDrop is optional and defaults to nullptr.
121
133 RelayChoice relay(void const* rawHeader,
134 std::unique_ptr<fair::mq::Message>* messages,
135 InputInfo const& info,
136 size_t nMessages,
137 size_t nPayloads = 1,
138 OnDropCallback onDrop = nullptr);
139
142 void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel);
143
147
149 void getReadyToProcess(std::vector<RecordAction>& completed);
150
154 std::vector<MessageSet> consumeAllInputsForTimeslice(TimesliceSlot id);
155 std::vector<MessageSet> consumeExistingInputsForTimeslice(TimesliceSlot id);
156
158 [[nodiscard]] size_t getParallelTimeslices() const; // Returns how many timeslices we can handle in parallel.
159
161 void setPipelineLength(size_t s); // Tune the maximum number of in flight timeslices this can handle.
162
164 void sendContextState(); // Send metrics with the VariableContext information.
165 void publishMetrics();
166
171
174 void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus);
180 uint32_t getRunNumberForSlot(TimesliceSlot slot); // Get the runNumber associated with a given slot.
184 void clear(); // Remove all pending messages.
185
188 void rescan() { mTimesliceIndex.rescan(); } // Mark all the cachelines as invalid, e.g. due to an out of band event; forwards to TimesliceIndex::rescan().
189
190 [[nodiscard]] size_t getCacheSize() const { return mCache.size(); } // Number of MessageSet elements in mCache.
191 [[nodiscard]] size_t getNumberOfTimeslices() const { return mTimesliceIndex.size(); } // Forwards to size() on the referenced TimesliceIndex.
192 [[nodiscard]] size_t getNumberOfUniqueInputs() const { return mDistinctRoutesIndex.size(); } // Number of entries in mDistinctRoutesIndex.
193
194 private:
195 ServiceRegistryRef mContext;
196
201 std::vector<MessageSet> mCache;
202
205 TimesliceIndex& mTimesliceIndex;
206
207 CompletionPolicy mCompletionPolicy;
208 std::vector<size_t> mDistinctRoutesIndex;
209 std::vector<InputSpec> mInputs;
210 std::vector<data_matcher::DataDescriptorMatcher> mInputMatchers;
211 std::vector<data_matcher::VariableContext> mVariableContextes;
212 std::vector<CacheEntryStatus> mCachedStateMetrics;
213 std::vector<PruneOp> mPruneOps;
214 size_t mMaxLanes;
215
216 O2_LOCKABLE_NAMED(std::recursive_mutex, mMutex, "data relayer mutex");
217};
218
219} // namespace o2::framework
220
221#endif // O2_FRAMEWORK_DATARELAYER_H_
int32_t i
#define O2_LOCKABLE_NAMED(T, V, N)
Definition Tracing.h:19
o2::monitoring::Monitoring Monitoring
std::vector< MessageSet > consumeExistingInputsForTimeslice(TimesliceSlot id)
uint32_t getFirstTFOrbitForSlot(TimesliceSlot slot)
Get the firstTForbit associated with a given slot.
void updateCacheStatus(TimesliceSlot slot, CacheEntryStatus oldStatus, CacheEntryStatus newStatus)
std::function< void(TimesliceSlot, std::vector< MessageSet > &, TimesliceIndex::OldestOutputInfo info)> OnDropCallback
uint32_t getRunNumberForSlot(TimesliceSlot slot)
Get the runNumber associated with a given slot.
void prunePending(OnDropCallback)
Prune all the pending entries in the cache.
size_t getCacheSize() const
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
size_t getParallelTimeslices() const
Returns how many timeslices we can handle in parallel.
std::vector< MessageSet > consumeAllInputsForTimeslice(TimesliceSlot id)
void pruneCache(TimesliceSlot slot, OnDropCallback onDrop=nullptr)
Prune the cache for a given slot.
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnDropCallback onDrop=nullptr)
void setOldestPossibleInput(TimesliceId timeslice, ChannelIndex channel)
uint64_t getCreationTimeForSlot(TimesliceSlot slot)
Get the creation time associated with a given slot.
static constexpr ServiceKind service_kind
Definition DataRelayer.h:53
void sendContextState()
Send metrics with the VariableContext information.
TimesliceId getTimesliceForSlot(TimesliceSlot slot)
size_t getNumberOfTimeslices() const
ActivityStats processDanglingInputs(std::vector< ExpirationHandler > const &, ServiceRegistryRef context, bool createNew)
size_t getNumberOfUniqueInputs() const
TimesliceIndex::OldestOutputInfo getOldestPossibleOutput() const
uint32_t getFirstTFCounterForSlot(TimesliceSlot slot)
Get the firstTFCounter associated with a given slot.
void clear()
Remove all pending messages.
void rescan()
Mark all the cachelines as invalid, e.g. due to an out of band event.
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
CompletionOp
Action to take with the InputRecord:
InputInfo(size_t p, size_t s, InputType t, ChannelIndex i)
Definition DataRelayer.h:92
CompletionPolicy::CompletionOp op
Definition DataRelayer.h:81
Type type
What was the outcome of the relay operation.
Definition DataRelayer.h:64
@ WillRelay
Ownership of the data has been taken.
@ Invalid
The incoming data was not valid and has been dropped.
@ Backpressured
The incoming data was not relayed, because we are backpressured.
@ Dropped