Project
Loading...
Searching...
No Matches
DataProcessingStates.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGSTATES_H_
12#define O2_FRAMEWORK_DATAPROCESSINGSTATES_H_
13
14#include "DeviceState.h"
17#include <atomic>
18#include <cstdint>
19#include <array>
20#include <numeric>
21#include <mutex>
22#include <utility>
23
24namespace o2::framework
25{
26
30 static std::function<void(int64_t& base, int64_t& offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t* loop);
31 static std::function<int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator();
32};
33
// Helper enum naming some known state ids. The underlying type is short,
// the same type used for CommandSpec::id and StateSpec::stateId.
// NOTE(review): this listing jumps from line 37 to line 41, so further
// enumerators may exist in the original header — confirm.
35enum struct ProcessingStateId : short {
36 DUMMY_STATE = 0,
37 DATA_QUERIES = 1,
41};
42
50 DataProcessingStates(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
51 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp);
52
72
74 constexpr static int STATES_BUFFER_SIZE = 1 << 18;
75 constexpr static int MAX_STATES = 2048;
76
77 // Request describing a single state update; updateState() takes one by value.
78 struct CommandSpec {
79 short id = -1; // Id of the state to update. Defaults to -1.
80 int size = 0; // Size of the state payload that data points to (presumably in bytes — confirm).
81 char const* data = nullptr; // Pointer to the beginning of the state payload. Raw pointer: nothing in this struct manages its lifetime.
82 };
83
84 // This is the structure to keep track of local updates to the states.
85 // Notice how the states are kept in a single buffer in reverse order
86 // and that the actual payload is stored after the header.
87 // This way we can simply flush the buffer by iterating from the
88 // previous insertion point forward, skipping things which have an
89 // older timestamp. Besides the generation, we can also keep track of
91 short id = 0; // The id of the state to update
92 int size = 0; // The size of the state
93 int64_t timestamp = 0; // Timestamp of the update
94 };
95
96 // This structure is used to keep track of the last updates
97 // for each of the metrics. This can be used to defer the need
98 // to flush the buffers to the remote end, so that we do not need to
99 // send metrics synchronously but can send them e.g. as a batch update.
100 // It also prevents sending the same metric multiple times, because
101 // we keep track of the time of the last update.
// NOTE(review): the wording above says "metrics" while this header deals with
// states (updateInfos holds MAX_STATES of these) — presumably the same
// reasoning applies to states; confirm.
102 struct UpdateInfo {
103 int64_t timestamp = 0; // When the update actually took place
104 int64_t lastPublished = 0; // When the update was last published
105 int64_t generation = -1; // The generation which did the update. Defaults to -1.
106 };
107
// Description of a state; registerState() takes one by const reference.
108 struct StateSpec {
109 // Name of the state (originally worded "Name of the metric").
110 // Defaults to the empty string.
111 std::string name = "";
112 short stateId = -1; // Id of the state. The original comment says it must match the index in the metrics array (presumably the per-state arrays such as statesViews — confirm).
117 uint64_t maxRefreshLatency = -1; // -1 converts to the largest uint64_t value.
120 bool sendInitialValue = false; // presumably: publish the state once right after registration — confirm.
124 bool defaultEnabled = true; // presumably: initial value for this state's entry in `enabled` — confirm.
125 };
126
// Locates the bytes of one state inside statesBuffer. state() builds its
// std::string_view from `first` and `size`.
127 struct StateView {
129 int first = 0; // Offset from statesBuffer.data() at which the state begins (as used by state()).
131 short size = 0; // Number of chars state() exposes for this state.
133 short capacity = 0; // presumably the space reserved for this state in the buffer; not read by the code visible here — confirm.
134 };
135
// Returns a read-only view of the bytes currently stored for state @a id.
// @param id index into statesViews. It is not range-checked: statesViews is a
//           std::array of MAX_STATES entries accessed with operator[], so the
//           caller must pass a value in [0, MAX_STATES).
// @return a std::string_view of view.size chars starting at
//         statesBuffer.data() + view.first. The view does not own the data:
//         it points into statesBuffer (a std::vector<char>), so it is only
//         valid while that storage is neither reallocated nor overwritten.
136 std::string_view state(int64_t id) const
137 {
138 auto& view = statesViews[id];
139
140 return std::string_view(statesBuffer.data() + view.first, view.size);
141 }
142
143 void registerState(StateSpec const& spec);
144 // Update a state as specified by the @a state command.
145 void updateState(CommandSpec state);
146 // Flush the states which are pending on the intermediate buffer.
147 void processCommandQueue();
148
149 void flushChangedStates(std::function<void(std::string const&, int64_t, std::string_view)> const& callback);
150 void repack();
151
152 std::atomic<size_t> statesSize;
153
156 std::array<char, STATES_BUFFER_SIZE> store = {};
158 std::vector<char> statesBuffer;
160 std::array<StateView, MAX_STATES> statesViews = {};
161 // Whether or not a given state has been updated.
162 std::array<bool, MAX_STATES> updated = {};
163 // Whether or not a given state is enabled
164 std::array<bool, MAX_STATES> enabled = {};
165 std::array<std::string, MAX_STATES> stateNames = {};
166 std::array<UpdateInfo, MAX_STATES> updateInfos;
167 std::array<StateSpec, MAX_STATES> stateSpecs;
168 // The last insertion point for the next state.
169 // We use this to actually process the command buffer
170 // because nextState is already pointing to the next
171 // insertion point.
172 std::atomic<int> lastInsertedState = 0;
173 // The insertion point for the next state. Notice we
174 // insert in the buffer backwards, so that on flush we iterate
175 // from the last insertion point forward.
176 std::atomic<int> nextState = STATES_BUFFER_SIZE;
177 // How many commands are currently in flight.
178 std::atomic<int> pendingStates = 0;
180 int64_t lastMetrics = 0;
181
182 // Function to retrieve an arbitrary base for the realtime clock.
183 std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase;
184 // Function to retrieve the timestamp from the value returned by getRealtimeBase.
185 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp;
186 // The value of the uv_hrtime() at the last update.
187 int64_t realTimeBase = 0;
188 // The value of the uv_now() at the last update.
189 int64_t initialTimeOffset = 0;
190
191 // Invoke to make sure that the updatedMetricsTotal is updated.
201
202 // How many times we have invoked the processing of the command queue.
203 // Notice that we use this to order the updates, so that we need
204 // to update only once per generation, because the items are
205 // inserted in the buffer in reverse time order and only
206 // the more recent update is interesting for us.
207 std::atomic<int64_t> generation = 0;
208 // Telemetry for the metric updates and pushes
209 std::atomic<int64_t> updatedMetricsLapse = 0;
217 std::vector<short> registeredStates = {};
218};
219
220} // namespace o2::framework
221
222#endif // O2_FRAMEWORK_DATAPROCESSINGSTATES_H_
benchmark::State & state
struct uv_loop_s uv_loop_t
GLsizeiptr size
Definition glcorearb.h:659
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint id
Definition glcorearb.h:650
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
ProcessingStateId
Helper struct to define some known states.
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
std::string_view state(int64_t id) const
std::array< StateView, MAX_STATES > statesViews
The views for all the states, indexed by the state id.
static constexpr ServiceKind service_kind
std::array< StateSpec, MAX_STATES > stateSpecs
std::array< std::string, MAX_STATES > stateNames
DataProcessingStates(DataProcessingStates const &other)
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
std::array< char, STATES_BUFFER_SIZE > store
void registerState(StateSpec const &spec)
std::array< bool, MAX_STATES > updated
void flushChangedStates(std::function< void(std::string const &, int64_t, std::string_view)> const &callback)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
std::vector< char > statesBuffer
The buffer where we store the state before flushing it.
std::array< UpdateInfo, MAX_STATES > updateInfos
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator()
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
VectorOfTObjectPtrs other