Project
Loading...
Searching...
No Matches
DataProcessingStats.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
12#define O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
13
14#include "DeviceState.h"
16#include <atomic>
17#include <cstdint>
18#include <array>
19#include <memory>
20#include <numeric>
21#include <mutex>
22#include <utility>
23
24namespace o2::framework
25{
26
74
77 // Parameters for the default behaviour
80 };
81
83
84 DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
85 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp,
86 DefaultConfig config);
87
89 constexpr static unsigned short MAX_METRICS = 1 << 15;
90 constexpr static short MAX_CMDS = 64;
91
92 enum struct Op : char {
93 Nop,
94 Set,
98 Add,
99 Sub,
100 Max,
101 Min
102 };
103
104 // Kind of the metric. This is used to know how to interpret the value
105 enum struct Kind : char {
106 Int,
107 UInt64,
108 Double,
109 Rate,
112 Unknown,
113 };
114
115 // The scope for a given metric. DPL is used for the DPL Monitoring GUI,
116 // Online is used for the online monitoring.
117 enum struct Scope : char {
118 DPL,
119 Online
120 };
121
122 // This is what the user passes. Notice that there is no
123 // need to specify the timestamp, because we calculate it for them
124 // using the delta between the last update and the current time.
125 struct CommandSpec {
126 unsigned short id = 0;
128 int64_t value = 0;
129 };
130
131 // This is the structure to keep track of local updates to the stats.
132 // Each command will be queued in a buffer and then flushed to the
133 // global stats either when the buffer is full (after MAX_CMDS commands)
134 // or when the queue is flushed explicitly via the processQueue() method.
135 struct Command {
136 unsigned short id = 0; // StatsId of the metric to update
137 int64_t value = 0; // Value to update the metric with
138 int64_t timestamp = 0; // Timestamp of the update
139 Op op = Op::Nop; // Operation to perform to do the update
140 };
141
142 // This structure is used to keep track of the last updates
143 // for each of the metrics. This can be used to defer the need
144 // to flush the buffers to the remote end, so that we do not need to
145 // send metrics synchronously but we can do e.g. as a batch update.
146 // It also prevents that we send the same metric multiple times, because
147 // we keep track of the time of the last update.
148 struct UpdateInfo {
149 int64_t timestamp = 0; // When the update actually took place
150 int64_t lastPublished = 0; // When the update was last published
151 };
152
153 struct MetricSpec {
154 // Id of the metric. It must match the index in the metrics array.
155 // Name of the metric
156 std::string name = "";
157 // Wether or not the metric is enabled
158 bool enabled = true;
159 int metricId = -1;
165 int64_t defaultValue = 0;
170 uint64_t maxRefreshLatency = -1;
173 bool sendInitialValue = false;
174 };
175
176 void registerMetric(MetricSpec const& spec);
177
178 // Update some stats as specified by the @cmd cmd
179 void updateStats(CommandSpec cmd);
180
181 char const* findMetricNameById(ProcessingStatsId id) const;
184 void processCommandQueue();
185
186 void flushChangedMetrics(std::function<void(MetricSpec const&, int64_t, int64_t)> const& callback);
187
188 std::atomic<size_t> statesSize = 0;
189
190 std::array<Command, MAX_CMDS> cmds = {};
191 std::array<int64_t, MAX_METRICS> metrics = {};
192 std::array<bool, MAX_METRICS> updated = {};
193 std::array<std::string, MAX_METRICS> metricsNames = {};
194 std::array<UpdateInfo, MAX_METRICS> updateInfos = {};
195 std::array<MetricSpec, MAX_METRICS> metricSpecs = {};
196 std::array<int64_t, MAX_METRICS> lastPublishedMetrics = {};
197 std::vector<int> availableMetrics;
198 // for fast check for AVAILABLE_MANAGED_SHM metric which is only provided for readout-proxy
199 bool hasAvailSHMMetric = false;
200 // How many commands have been committed to the queue.
201 std::atomic<int> insertedCmds = 0;
202 // The insertion point for the next command.
203 std::atomic<int> nextCmd = 0;
204 // How many commands are currently in flight.
205 std::atomic<int> pendingCmds = 0;
207 int64_t lastMetrics = 0;
208 // This is the mutex to protect the queue of commands.
209 std::mutex mMutex;
210
211 // Function to retrieve an aritrary base for the realtime clock.
212 std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase;
213 // Function to retrieve the timestamp from the value returned by getRealtimeBase.
214 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp;
215 // The value of the uv_hrtime() at the last update.
216 int64_t realTimeBase = 0;
217 // The value of the uv_now() at the last update.
218 int64_t initialTimeOffset = 0;
219
220 // Invoke to make sure that the updatedMetricsTotal is updated.
230
231 // Telemetry for the metric updates and pushes
232 std::atomic<int64_t> updatedMetricsLapse = 0;
240};
241
242} // namespace o2::framework
243
244#endif // O2_FRAMEWORK_DATAPROCESSINGSTATS_H_
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
ServiceKind
The kind of service we are asking for.
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
Helper struct to hold statistics about the data processing happening.
std::array< bool, MAX_METRICS > updated
char const * findMetricNameById(ProcessingStatsId id) const
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Max
Subtract the value from the current value.
@ Min
Set the value to the maximum of the current value and the specified value.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Sub
Add the value to the current value.
@ Add
Update the rate of the metric given the amount since the last time.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
static constexpr unsigned short MAX_METRICS
std::array< UpdateInfo, MAX_METRICS > updateInfos
static constexpr ServiceKind service_kind