Project
Loading...
Searching...
No Matches
DataProcessingStats.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
12#define O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
13
14#include "DeviceState.h"
16#include <atomic>
17#include <cstdint>
18#include <array>
19#include <memory>
20#include <numeric>
21#include <mutex>
22#include <utility>
23
24namespace o2::framework
25{
26
79
82 // Parameters for the default behaviour
86
88
89 DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
90 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
91 DefaultConfig config);
92
94 constexpr static unsigned short MAX_METRICS = 1 << 15;
95 constexpr static short MAX_CMDS = 64;
96
// Operation to perform on a metric when an update command is applied.
// NOTE(review): this rendered listing skips source lines 100-102; the member
// index of this page also names SetIfPositive, CumulativeRate and
// InstantaneousRate, which are presumably declared there - confirm against
// the real header.
97 enum struct Op : char {
98 Nop, // Default operation of a Command; presumably "do nothing" - confirm.
99 Set, // Set the value to the specified value.
103 Add, // Add the value to the current value.
104 Sub, // Subtract the value from the current value.
105 Max, // Set the value to the maximum of the current value and the specified value.
106 Min // Presumably: set the value to the minimum of the current value and the specified value - confirm.
107 };
108
109 // Kind of the metric. This is used to know how to interpret the value
110 enum struct Kind : char {
111 Int,
112 UInt64,
113 Double,
114 Rate,
117 Unknown,
118 };
119
120 // The scope for a given metric. DPL is used for the DPL Monitoring GUI,
121 // Online is used for the online monitoring.
122 enum struct Scope : char {
123 DPL,
124 Online
125 };
126
127 // This is what the user passes. Notice that there is no
128 // need to specify the timestamp, because we calculate it for them
129 // using the delta between the last update and the current time.
130 struct CommandSpec {
131 unsigned short id = 0;
134 };
135
136 // This is the structure to keep track of local updates to the stats.
137 // Each command will be queued in a buffer and then flushed to the
138 // global stats either when the buffer is full (after MAX_CMDS commands)
139 // or when the queue is flushed explicitly via the processQueue() method.
140 struct Command {
141 unsigned short id = 0; // StatsId of the metric to update
142 int64_t value = 0; // Value to update the metric with
143 int64_t timestamp = 0; // Timestamp of the update
144 Op op = Op::Nop; // Operation to perform to do the update
145 };
146
147 // This structure is used to keep track of the last updates
148 // for each of the metrics. This can be used to defer the need
149 // to flush the buffers to the remote end, so that we do not need to
150 // send metrics synchronously but we can do e.g. as a batch update.
151 // It also prevents us from sending the same metric multiple times, because
152 // we keep track of the time of the last update.
153 struct UpdateInfo {
154 int64_t timestamp = 0; // When the update actually took place
155 int64_t lastPublished = 0; // When the update was last published
156 };
157
// Description of a single metric, as passed to registerMetric().
// NOTE(review): this rendered listing skips source lines 165-174 and 176-177,
// so the real struct may have more members than are visible here.
158 struct MetricSpec {
159 // The id of the metric (metricId below) must match the index in the metrics array.
160 // Name of the metric
161 std::string name = "";
162 // Whether or not the metric is enabled
163 bool enabled = true;
164 int metricId = -1; // Id of the metric; defaults to -1.
175 uint64_t maxRefreshLatency = -1; // -1 converts to the largest uint64_t value; presumably "no limit" - confirm.
178 bool sendInitialValue = false;
179 };
180
181 void registerMetric(MetricSpec const& spec);
182
183 // Update some stats as specified by the command @a cmd
184 void updateStats(CommandSpec cmd);
185
186 char const* findMetricNameById(ProcessingStatsId id) const;
189 void processCommandQueue();
190
191 void flushChangedMetrics(std::function<void(MetricSpec const&, int64_t, int64_t)> const& callback);
192
193 std::atomic<size_t> statesSize = 0;
194
195 std::array<Command, MAX_CMDS> cmds = {};
196 std::array<int64_t, MAX_METRICS> metrics = {};
197 std::array<bool, MAX_METRICS> updated = {};
198 std::array<std::string, MAX_METRICS> metricsNames = {};
199 std::array<UpdateInfo, MAX_METRICS> updateInfos = {};
200 std::array<MetricSpec, MAX_METRICS> metricSpecs = {};
201 std::array<int64_t, MAX_METRICS> lastPublishedMetrics = {};
202 std::vector<int> availableMetrics;
203 // for fast check for AVAILABLE_MANAGED_SHM metric which is only provided for readout-proxy
204 bool hasAvailSHMMetric = false;
205 // How many commands have been committed to the queue.
206 std::atomic<int> insertedCmds = 0;
207 // The insertion point for the next command.
208 std::atomic<int> nextCmd = 0;
209 // How many commands are currently in flight.
210 std::atomic<int> pendingCmds = 0;
213 // This is the mutex to protect the queue of commands.
214 std::mutex mMutex;
215
216 // Function to retrieve an arbitrary base for the realtime clock.
217 std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase;
218 // Function to retrieve the timestamp from the value returned by getRealtimeBase.
220 // The value of the uv_hrtime() at the last update.
222 // The value of the uv_now() at the last update.
224
225 // Invoke to make sure that the updatedMetricsTotal is updated.
235
236 // Telemetry for the metric updates and pushes
237 std::atomic<int64_t> updatedMetricsLapse = 0;
245};
246
247} // namespace o2::framework
248
249#endif // O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
ServiceKind
The kind of service we are asking for.
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
Helper struct to hold statistics about the data processing happening.
std::array< bool, MAX_METRICS > updated
char const * findMetricNameById(ProcessingStatsId id) const
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value if it is positive.
@ InstantaneousRate
Update the rate of the metric given the amount since the last time.
@ Max
Set the value to the maximum of the current value and the specified value.
@ Min
Set the value to the minimum of the current value and the specified value.
@ CumulativeRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Sub
Subtract the value from the current value.
@ Add
Add the value to the current value.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
static constexpr unsigned short MAX_METRICS
std::array< UpdateInfo, MAX_METRICS > updateInfos
static constexpr ServiceKind service_kind