Project
Loading...
Searching...
No Matches
DataProcessingStats.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
12#define O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
13
14#include "DeviceState.h"
16#include <atomic>
17#include <cstdint>
18#include <array>
19#include <memory>
20#include <numeric>
21#include <mutex>
22#include <utility>
23
24namespace o2::framework
25{
26
70
73 // Parameters for the default behaviour
76 };
77
79
// Constructor.
// getRealtimeBase: callback taking two int64_t references (base, offset); per the
//                  comment on the same-named member, it retrieves an arbitrary base
//                  for the realtime clock.
// getTimestamp:    callback mapping a (base, offset) pair to an int64_t timestamp.
// config:          a DefaultConfig; NOTE(review): its fields are not visible in this
//                  listing -- confirm against the full header.
80 DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
81 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
82 DefaultConfig config);
83
85 constexpr static unsigned short MAX_METRICS = 1 << 15; // 32768; size of the per-metric arrays (metrics, updated, metricSpecs, ...)
86 constexpr static short MAX_CMDS = 64; // Size of the cmds command buffer
87
// Operation applied to a metric by a Command.
// NOTE(review): this listing skips its lines 91-93, so further enumerators may
// exist between Set and Add -- confirm against the full header.
88 enum struct Op : char {
89 Nop, // No operation; the default value of Command::op
90 Set, // Set the value to the specified value
94 Add, // Add the value to the current value
95 Sub, // Subtract the value from the current value
96 Max, // Set the value to the maximum of the current value and the specified value
97 Min // Presumably the minimum counterpart of Max -- confirm against the implementation
98 };
99
100 // Kind of the metric. This is used to know how to interpret the value
// NOTE(review): this listing skips its lines 106-107, so text between Rate and
// Unknown may be missing here -- confirm against the full header.
101 enum struct Kind : char {
102 Int, // Presumably: interpret the value as an integer -- confirm
103 UInt64, // Presumably: interpret the value as an unsigned 64-bit integer -- confirm
104 Double, // Presumably: interpret the value as a double -- confirm
105 Rate, // NOTE(review): how a rate is represented is not visible here -- confirm
108 Unknown, // Presumably: kind not specified -- confirm
109 };
110
111 // The scope for a given metric. DPL is used for the DPL Monitoring GUI,
112 // Online is used for the online monitoring.
113 enum struct Scope : char {
114 DPL, // Used for the DPL Monitoring GUI
115 Online // Used for the online monitoring
116 };
117
118 // This is what the user passes. Notice that there is no
119 // need to specify the timestamp, because we calculate it for them
120 // using the delta between the last update and the current time.
// NOTE(review): this listing skips its line 123, so a member may be missing
// between id and value -- confirm against the full header.
121 struct CommandSpec {
122 unsigned short id = 0; // Presumably the id of the metric to update (same type as Command::id) -- confirm
124 int64_t value = 0; // Presumably the value to update the metric with (cf. Command::value) -- confirm
125 };
126
127 // This is the structure to keep track of local updates to the stats.
128 // Each command will be queued in a buffer and then flushed to the
129 // global stats either when the buffer is full (after MAX_CMDS commands)
130 // or when the queue is flushed explicitly via the processCommandQueue() method.
131 struct Command {
132 unsigned short id = 0; // StatsId of the metric to update
133 int64_t value = 0; // Value to update the metric with
134 int64_t timestamp = 0; // Timestamp of the update; per the CommandSpec comment it is calculated for the user, not passed in
135 Op op = Op::Nop; // Operation to perform to do the update; defaults to Nop
136 };
137
138 // This structure is used to keep track of the last updates
139 // for each of the metrics. This can be used to defer the need
140 // to flush the buffers to the remote end, so that we do not need to
141 // send metrics synchronously but we can do e.g. as a batch update.
142 // It also prevents that we send the same metric multiple times, because
143 // we keep track of the time of the last update.
144 struct UpdateInfo {
145 int64_t timestamp = 0; // When the update actually took place
146 int64_t lastPublished = 0; // When the update was last published
147 };
148
// Description of a single metric.
// NOTE(review): this listing skips several of its lines between the members
// below, so comments or members may be missing -- confirm against the full header.
149 struct MetricSpec {
150 // Id of the metric (presumably describes metricId below -- confirm). It must match the index in the metrics array.
151 // Name of the metric
152 std::string name = "";
153 // Whether or not the metric is enabled
154 bool enabled = true;
155 int metricId = -1; // Defaults to -1, which is not a valid array index
161 int64_t defaultValue = 0; // The default value for the metric
166 uint64_t maxRefreshLatency = -1; // -1 converts to the largest uint64_t value; units are not visible here -- TODO confirm
169 bool sendInitialValue = false; // NOTE(review): presumably whether the initial value gets published -- confirm
170 };
171
// NOTE(review): presumably stores spec at the slot given by spec.metricId (the
// MetricSpec comment says the id must match the index in the metrics array) -- confirm.
172 void registerMetric(MetricSpec const& spec);
173 // Update some stats as specified by the command @a cmd. Per the CommandSpec comment, the caller does not specify a timestamp.
174 void updateStats(CommandSpec cmd);
175
// NOTE(review): presumably the explicit flush of the queued commands mentioned in
// the comment above struct Command -- confirm against the implementation.
178 void processCommandQueue();
179
// Takes a callback receiving a MetricSpec and two int64_t arguments.
// NOTE(review): the meaning and order of the two int64_t arguments, and which
// metrics count as "changed", are not visible here -- TODO confirm.
180 void flushChangedMetrics(std::function<void(MetricSpec const&, int64_t, int64_t)> const& callback);
181
182 std::atomic<size_t> statesSize = 0;
183
184 std::array<Command, MAX_CMDS> cmds = {}; // Buffer of queued commands, MAX_CMDS slots (see the comment above struct Command)
185 std::array<int64_t, MAX_METRICS> metrics = {}; // Metric values; per the MetricSpec comment, indexed by metric id
186 std::array<bool, MAX_METRICS> updated = {}; // NOTE(review): presumably a per-metric "changed" flag -- confirm
187 std::array<std::string, MAX_METRICS> metricsNames = {}; // One name slot per metric
188 std::array<UpdateInfo, MAX_METRICS> updateInfos = {}; // Per-metric update / last-published timestamps (see UpdateInfo)
189 std::array<MetricSpec, MAX_METRICS> metricSpecs = {}; // One MetricSpec slot per metric
190 std::array<int64_t, MAX_METRICS> lastPublishedMetrics = {}; // NOTE(review): presumably the value last published for each metric -- confirm
191 std::vector<int> availableMetrics; // NOTE(review): presumably the ids of the registered metrics -- confirm
192 // How many commands have been committed to the queue.
193 std::atomic<int> insertedCmds = 0;
194 // The insertion point for the next command.
195 std::atomic<int> nextCmd = 0;
196 // How many commands are currently in flight.
197 std::atomic<int> pendingCmds = 0;
199 int64_t lastMetrics = 0;
200 // This is the mutex to protect the queue of commands.
201 std::mutex mMutex;
202
203 // Function to retrieve an arbitrary base for the realtime clock.
204 std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase;
205 // Function to retrieve the timestamp from the value returned by getRealtimeBase.
206 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp;
207 // The value of the uv_hrtime() at the last update.
208 int64_t realTimeBase = 0;
209 // The value of the uv_now() at the last update.
210 int64_t initialTimeOffset = 0;
211
212 // Invoke to make sure that the updatedMetricsTotal is updated.
222
223 // Telemetry for the metric updates and pushes
224 std::atomic<int64_t> updatedMetricsLapse = 0;
232};
233
234} // namespace o2::framework
235
236#endif // O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ServiceKind
The kind of service we are asking for.
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
Helper struct to hold statistics about the data processing happening.
std::array< bool, MAX_METRICS > updated
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value if it is positive.
@ InstantaneousRate
Update the rate of the metric given the amount since the last time.
@ Max
Set the value to the maximum of the current value and the specified value.
@ Min
Set the value to the minimum of the current value and the specified value.
@ CumulativeRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Sub
Subtract the value from the current value.
@ Add
Add the value to the current value.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
static constexpr unsigned short MAX_METRICS
std::array< UpdateInfo, MAX_METRICS > updateInfos
static constexpr ServiceKind service_kind