Project
Loading...
Searching...
No Matches
DataProcessingStats.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
12#define O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
13
14#include "DeviceState.h"
16#include <atomic>
17#include <cstdint>
18#include <array>
19#include <memory>
20#include <numeric>
21#include <mutex>
22#include <utility>
23
24namespace o2::framework
25{
26
77
80 // Parameters for the default behaviour
84
86
87 DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
88 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
89 DefaultConfig config);
90
92 constexpr static unsigned short MAX_METRICS = 1 << 15;
93 constexpr static short MAX_CMDS = 64;
94
95 enum struct Op : char {
96 Nop,
97 Set,
101 Add,
102 Sub,
103 Max,
104 Min
105 };
106
107 // Kind of the metric. This is used to know how to interpret the value
108 enum struct Kind : char {
109 Int,
110 UInt64,
111 Double,
112 Rate,
115 Unknown,
116 };
117
118 // The scope for a given metric. DPL is used for the DPL Monitoring GUI,
119 // Online is used for the online monitoring.
120 enum struct Scope : char {
121 DPL,
122 Online
123 };
124
125 // This is what the user passes. Notice that there is no
126 // need to specify the timestamp, because we calculate it for them
127 // using the delta between the last update and the current time.
128 struct CommandSpec {
129 unsigned short id = 0;
132 };
133
134 // This is the structure to keep track of local updates to the stats.
135 // Each command will be queued in a buffer and then flushed to the
136 // global stats either when the buffer is full (after MAX_CMDS commands)
137 // or when the queue is flushed explicitly via the processQueue() method.
138 struct Command {
139 unsigned short id = 0; // StatsId of the metric to update
140 int64_t value = 0; // Value to update the metric with
141 int64_t timestamp = 0; // Timestamp of the update
142 Op op = Op::Nop; // Operation to perform to do the update
143 };
144
145 // This structure is used to keep track of the last updates
146 // for each of the metrics. This can be used to defer the need
147 // to flush the buffers to the remote end, so that we do not need to
148 // send metrics synchronously but we can do e.g. as a batch update.
149 // It also prevents that we send the same metric multiple times, because
150 // we keep track of the time of the last update.
151 struct UpdateInfo {
152 int64_t timestamp = 0; // When the update actually took place
153 int64_t lastPublished = 0; // When the update was last published
154 };
155
156 struct MetricSpec {
157 // Id of the metric. It must match the index in the metrics array.
158 // Name of the metric
159 std::string name = "";
160 // Wether or not the metric is enabled
161 bool enabled = true;
162 int metricId = -1;
173 uint64_t maxRefreshLatency = -1;
176 bool sendInitialValue = false;
177 };
178
179 void registerMetric(MetricSpec const& spec);
180
181 // Update some stats as specified by the @cmd cmd
182 void updateStats(CommandSpec cmd);
183
184 char const* findMetricNameById(ProcessingStatsId id) const;
187 void processCommandQueue();
188
189 void flushChangedMetrics(std::function<void(MetricSpec const&, int64_t, int64_t)> const& callback);
190
191 std::atomic<size_t> statesSize = 0;
192
193 std::array<Command, MAX_CMDS> cmds = {};
194 std::array<int64_t, MAX_METRICS> metrics = {};
195 std::array<bool, MAX_METRICS> updated = {};
196 std::array<std::string, MAX_METRICS> metricsNames = {};
197 std::array<UpdateInfo, MAX_METRICS> updateInfos = {};
198 std::array<MetricSpec, MAX_METRICS> metricSpecs = {};
199 std::array<int64_t, MAX_METRICS> lastPublishedMetrics = {};
200 std::vector<int> availableMetrics;
201 // for fast check for AVAILABLE_MANAGED_SHM metric which is only provided for readout-proxy
202 bool hasAvailSHMMetric = false;
203 // How many commands have been committed to the queue.
204 std::atomic<int> insertedCmds = 0;
205 // The insertion point for the next command.
206 std::atomic<int> nextCmd = 0;
207 // How many commands are currently in flight.
208 std::atomic<int> pendingCmds = 0;
211 // This is the mutex to protect the queue of commands.
212 std::mutex mMutex;
213
214 // Function to retrieve an aritrary base for the realtime clock.
215 std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase;
216 // Function to retrieve the timestamp from the value returned by getRealtimeBase.
218 // The value of the uv_hrtime() at the last update.
220 // The value of the uv_now() at the last update.
222
223 // Invoke to make sure that the updatedMetricsTotal is updated.
233
234 // Telemetry for the metric updates and pushes
235 std::atomic<int64_t> updatedMetricsLapse = 0;
243};
244
245} // namespace o2::framework
246
247#endif // O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
Helper struct to hold statistics about the data processing happening.
std::array< bool, MAX_METRICS > updated
char const * findMetricNameById(ProcessingStatsId id) const
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Max
Subtract the value from the current value.
@ Min
Set the value to the maximum of the current value and the specified value.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Sub
Add the value to the current value.
@ Add
Update the rate of the metric given the amount since the last time.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
static constexpr unsigned short MAX_METRICS
std::array< UpdateInfo, MAX_METRICS > updateInfos
static constexpr ServiceKind service_kind