Project
Loading...
Searching...
No Matches
DataProcessingStats.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/Logger.h"
15#include <uv.h>
16#include <atomic>
17#include <thread>
18
19namespace o2::framework
20{
21
// Constructor: stores the realtime-base and timestamp callbacks and the
// default configuration in the getRealtimeBase, getTimestamp and config
// members. getTimestamp is later called as
// getTimestamp(realTimeBase, initialTimeOffset) by flushChangedMetrics and
// registerMetric.
// NOTE(review): source line 29 (the constructor body) is missing from this
// listing. Presumably it calls getRealtimeBase(realTimeBase, initialTimeOffset)
// so that those members are initialised before first use — confirm against
// the original file.
22DataProcessingStats::DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
23 std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_,
24 DefaultConfig config_)
25 : getTimestamp(getTimestamp_),
26 getRealtimeBase(getRealtimeBase_),
27 config(config_)
28{
30}
31
// updateStats(cmd): validates the metric id carried by cmd, then reserves a
// slot in the fixed-size cmds queue with an atomic fetch_add on nextCmd.
//  - A disabled metric is only logged at debug level and ignored.
//  - The caller that receives exactly slot cmds.size() (the first one past
//    the end) waits for pendingCmds to reach 0, resets insertedCmds and
//    nextCmd to 0, and takes a fresh slot.
//  - Callers that overshoot further wait and then retry recursively.
// Throws (via runtime_error_f) for unregistered or out-of-range ids.
// NOTE(review): the signature line (source line 32) and source lines 47, 53,
// 61, 64, 77-79 and 81 are missing from this listing. The statements that
// store the command into cmds[idx] and maintain pendingCmds/insertedCmds are
// presumably among them — confirm against the original file.
33{
// NOTE(review): metricSpecs[cmd.id] is read on lines 34 and 37 BEFORE the
// range checks on lines 41 and 44 run. metricSpecs and updateInfos are both
// std::array<..., MAX_METRICS>, and operator[] does no bounds checking, so an
// out-of-range id is used as an index first. The range checks should precede
// any indexing.
34 if (metricSpecs[cmd.id].name.empty()) {
35 throw runtime_error_f("MetricID %d was not registered", (int)cmd.id);
36 }
37 if (metricSpecs[cmd.id].enabled == false) {
38 LOGP(debug, "MetricID {} is disabled", (int)cmd.id);
39 return;
40 }
41 if (cmd.id >= metrics.size()) {
42 throw runtime_error_f("MetricID %d is out of range", (int)cmd.id);
43 }
44 if (cmd.id >= updateInfos.size()) {
45 throw runtime_error_f("MetricID %d is out of range", (int)cmd.id);
46 }
// NOTE(review): no mutex is taken anywhere in this function as listed; the
// next comment reads like a leftover TODO. All atomics below use
// std::memory_order_relaxed, which on its own does not order the writes to
// cmds[] with respect to the thread that later processes the queue — confirm
// the missing lines provide the needed synchronisation.
48 // Add a static mutex to protect the queue
49 // Get the next available operation in an atomic way.
50 auto idx = nextCmd.fetch_add(1, std::memory_order_relaxed);
// First caller past the end of the queue: drain pending writers, reset the
// queue counters and retry with a fresh slot (the command is not dropped,
// despite the comment on line 52).
51 if (idx == cmds.size()) {
52 // We abort this command
54 while (pendingCmds.load(std::memory_order_relaxed) > 0) {
55 // We need to wait for all the pending commands to be processed.
56 // This is needed because we are going to flush the queue.
57 // We cannot flush the queue while there are pending commands
58 // as we might end up with a command being processed twice.
59 std::this_thread::sleep_for(std::chrono::milliseconds(1));
60 }
// NOTE(review): source line 61 (missing) presumably performs the actual
// flush of the queue before the counters are reset below — confirm.
62 insertedCmds.store(0, std::memory_order_relaxed);
63 nextCmd.store(0, std::memory_order_relaxed);
65 idx = nextCmd.fetch_add(1, std::memory_order_relaxed);
66 } else if (idx > cmds.size()) {
// NOTE(review): cmds is a std::array<Command, MAX_CMDS>, so cmds.size() is a
// constant and this condition never changes. The idx refreshed inside the
// loop is never consulted, so (for MAX_CMDS > 0) the loop cannot exit and the
// retry on line 72 is unreachable. Presumably the intent was to wait while
// nextCmd.load(...) >= cmds.size() — confirm.
67 while (cmds.size()) {
68 // We need to wait for the flushing of the queue
69 std::this_thread::sleep_for(std::chrono::milliseconds(1));
70 idx = nextCmd.load(std::memory_order_relaxed);
71 }
72 return updateStats(cmd);
73 }
74 // Save the command.
75 assert(idx < cmds.size());
76 assert(cmd.id < metrics.size());
80 // Keep track of the number of commands we have received.
82}
83
// Applies the first insertedCmds queued commands to metrics, updated and
// updateInfos, then resets nextCmd and insertedCmds to 0.
// Semantics per Op, as implemented below:
//  - Nop: nothing.
//  - Set: assign, if the value changed and is not older than the last update.
//  - Add / Sub: add or subtract a non-zero value.
//  - Max / Min: keep the larger / smaller value.
//  - Rate ops: throw unless the metric's kind is Kind::Rate.
// NOTE(review): this function's signature (source line 84) is missing from the
// listing; presumably it is DataProcessingStats::processCommandQueue().
// Source lines 114, 122, 130, 138, 146, 149, 154, 157, 169 and 183 are also
// missing. Lines 149 and 157 are presumably the `case Op::SetIfPositive:` and
// `case Op::InstantaneousRate: {` labels — confirm against the original file.
// NOTE(review): std::iota (<numeric>) and std::stable_sort (<algorithm>) are
// not among the visible includes; presumably they are pulled in transitively.
85{
// NOTE(review): indices are stored as char. If MAX_CMDS exceeds the range of
// char (127 when char is signed), iota/sort below would produce wrong or
// negative indices. The value of MAX_CMDS is not visible here — confirm.
86 std::array<char, MAX_CMDS> order;
87 // The range cannot be larger than the number of commands we have.
88 auto range = insertedCmds.load(std::memory_order_relaxed);
// NOTE(review): this reloads insertedCmds instead of testing `range`; the two
// loads can disagree if another thread inserts in between.
89 if (insertedCmds.load(std::memory_order_relaxed) == 0) {
90 return;
91 }
92
93 std::iota(order.begin(), order.begin() + range, 0);
94 // Shuffle the order in which we will process the commands based
95 // on their timestamp. If two commands are inserted at the same
96 // time, we expect to process them in the order they were inserted.
97 std::stable_sort(order.begin(), order.begin() + range, [this](char a, char b) {
98 return cmds[a].timestamp < cmds[b].timestamp;
99 });
100
101 // Process the commands in the order we have just computed.
// NOTE(review): the loop below indexes cmds[i], not cmds[order[i]]. The
// permutation computed above is therefore never used, and commands are applied
// in insertion order, contrary to the comment on line 101. Presumably this
// should read `auto& cmd = cmds[order[i]];`.
102 for (int i = 0; i < range; ++i) {
103 auto& cmd = cmds[i];
104 assert(cmd.id < updateInfos.size());
105 auto& update = updateInfos[cmd.id];
106 switch (cmd.op) {
107 case Op::Nop:
108 break;
109 case Op::Set:
110 if (cmd.value != metrics[cmd.id] && cmd.timestamp >= update.timestamp) {
111 metrics[cmd.id] = cmd.value;
112 updated[cmd.id] = true;
113 update.timestamp = cmd.timestamp;
115 }
116 break;
// NOTE(review): unlike Set, the Add/Sub/Max/Min cases overwrite
// update.timestamp with cmd.timestamp unconditionally. An older command
// processed later can therefore move update.timestamp backwards.
117 case Op::Add:
118 if (cmd.value) {
119 metrics[cmd.id] += cmd.value;
120 updated[cmd.id] = true;
121 update.timestamp = cmd.timestamp;
123 }
124 break;
125 case Op::Sub:
126 if (cmd.value) {
127 metrics[cmd.id] -= cmd.value;
128 updated[cmd.id] = true;
129 update.timestamp = cmd.timestamp;
131 }
132 break;
133 case Op::Max:
134 if (cmd.value > metrics[cmd.id]) {
135 metrics[cmd.id] = cmd.value;
136 updated[cmd.id] = true;
137 update.timestamp = cmd.timestamp;
139 }
140 break;
141 case Op::Min:
142 if (cmd.value < metrics[cmd.id]) {
143 metrics[cmd.id] = cmd.value;
144 updated[cmd.id] = true;
145 update.timestamp = cmd.timestamp;
147 }
148 break;
// (Case label on missing line 149, presumably Op::SetIfPositive.) Assigns only
// a positive value that is not older than the last update.
150 if (cmd.value > 0 && cmd.timestamp >= update.timestamp) {
151 metrics[cmd.id] = cmd.value;
152 updated[cmd.id] = true;
153 update.timestamp = cmd.timestamp;
155 }
156 break;
// (Case label on missing line 157, presumably Op::InstantaneousRate.)
// Replaces the stored value. flushChangedMetrics later reports
// 1000 * (metrics - lastPublishedMetrics) / elapsed for Rate metrics, so the
// caller is expected to pass a running total.
158 if (metricSpecs[cmd.id].kind != Kind::Rate) {
159 throw runtime_error_f("MetricID %d is not a rate", (int)cmd.id);
160 }
161 // We keep setting the value to the time average of the previous
162 // update period. so that we can compute the average over time
163 // at the moment of publishing.
164 metrics[cmd.id] = cmd.value;
165 updated[cmd.id] = true;
166 if (update.timestamp == 0) {
167 update.timestamp = cmd.timestamp;
168 }
170 } break;
// CumulativeRate: adds the given amount to the stored running total. As with
// the case above, update.timestamp is only set when it is still 0.
171 case Op::CumulativeRate: {
172 if (metricSpecs[cmd.id].kind != Kind::Rate) {
173 throw runtime_error_f("MetricID %d is not a rate", (int)cmd.id);
174 }
175 // We keep setting the value to the time average of the previous
176 // update period. so that we can compute the average over time
177 // at the moment of publishing.
178 metrics[cmd.id] += cmd.value;
179 updated[cmd.id] = true;
180 if (update.timestamp == 0) {
181 update.timestamp = cmd.timestamp;
182 }
184 } break;
185 }
186 }
187 // No one should have tried to insert more commands while processing.
188 assert(range == insertedCmds.load(std::memory_order_relaxed));
189 nextCmd.store(0, std::memory_order_relaxed);
190 insertedCmds.store(0, std::memory_order_relaxed);
191}
192
// flushChangedMetrics(callback): for every id in availableMetrics whose spec
// has a name and is enabled, decides whether to publish now. If so, it
// invokes callback(spec, timestamp, value), records update.lastPublished and
// clears updated[mi].
//  - A metric not updated for longer than spec.maxRefreshLatency is marked
//    updated (with a fresh timestamp) so that it is re-sent.
//  - A metric published less than spec.minPublishInterval ago is skipped but
//    keeps updated[mi] == true, so it is retried on a later flush.
//  - Rate metrics report 1000 * (metrics[mi] - lastPublishedMetrics[mi]) /
//    elapsed, i.e. a per-second rate (timestamps are in milliseconds per the
//    comment on line 227), or 0 when no time has elapsed.
// Finally it logs, at debug level, the average publishing frequency.
// NOTE(review): source lines 195, 234, 235 and 240 are missing from this
// listing. The updates of lastPublishedMetrics and of the
// publishingInvokedTotal / publishedMetricsLapse counters are presumably
// there — confirm against the original file.
193void DataProcessingStats::flushChangedMetrics(std::function<void(DataProcessingStats::MetricSpec const&, int64_t, int64_t)> const& callback)
194{
196 bool publish = false;
197 auto currentTimestamp = getTimestamp(realTimeBase, initialTimeOffset);
198 for (size_t ami = 0; ami < availableMetrics.size(); ++ami) {
199 int mi = availableMetrics[ami];
200 auto& update = updateInfos[mi];
201 MetricSpec& spec = metricSpecs[mi];
202 if (spec.name.empty()) {
203 continue;
204 }
205 if (spec.enabled == false) {
206 LOGP(debug, "Metric {} is disabled", spec.name);
207 continue;
208 }
209 if (updated[mi] == false && currentTimestamp - update.timestamp > spec.maxRefreshLatency) {
210 updated[mi] = true;
211 update.timestamp = currentTimestamp;
212 }
213 if (updated[mi] == false) {
214 continue;
215 }
216 if (currentTimestamp - update.lastPublished < spec.minPublishInterval) {
217 continue;
218 }
219 publish = true;
220 if (spec.kind == Kind::Unknown) {
221 LOGP(fatal, "Metric {} has unknown kind", spec.name);
222 }
223 if (spec.kind == Kind::Rate) {
224 if (currentTimestamp - update.timestamp == 0) {
225 callback(spec, update.timestamp, 0);
226 } else {
227 // Timestamp is in milliseconds, we want to convert to seconds.
228 callback(spec, update.timestamp, (1000 * (metrics[mi] - lastPublishedMetrics[mi])) / (currentTimestamp - update.timestamp));
229 }
230 update.timestamp = currentTimestamp; // We reset the timestamp to the current time.
231 } else {
232 callback(spec, update.timestamp, metrics[mi]);
233 }
236 update.lastPublished = currentTimestamp;
237 updated[mi] = false;
238 }
239 if (publish) {
241 }
// NOTE(review): a function-local static is shared by every DataProcessingStats
// instance and is initialised on the first call from any of them. The
// averages below are therefore relative to that first flush, not to this
// object's construction. uv_hrtime() returns an unsigned nanosecond count
// (libuv docs), which is stored here as int64_t.
242 static int64_t startTime = uv_hrtime();
243 int64_t now = uv_hrtime();
244
245 auto timeDelta = std::max(int64_t(1), now - startTime); // min 1 unit of time to exclude division by 0
// NOTE(review): the multiply and divide happen before conversion to double. If
// these counters are integers, the result is truncated and the product can
// overflow for large counts — consider converting to double first.
246 double averageInvocations = (publishingInvokedTotal * 1000000000) / timeDelta;
247 double averagePublishing = (publishedMetricsLapse * 1000000000) / timeDelta;
248
249 LOGP(debug, "Publishing invoked {} times / s, {} metrics published / s", (int)averageInvocations, (int)averagePublishing);
250}
251
// registerMetric(spec): validates spec, then stores it.
// Validation (throws on failure):
//  - the name is non-empty;
//  - metricId < metricSpecs.size();
//  - the slot is either unused or already holds the same name;
//  - no other id uses the same name.
// Storage:
//  - spec goes into metricSpecs and metricsNames;
//  - metrics[id] is set to spec.defaultValue;
//  - updateInfos[id] is stamped with the current time;
//  - updated[id] is set from spec.sendInitialValue;
//  - the id is appended to availableMetrics.
// Online-scope metrics get minPublishInterval raised to at least
// config.minOnlinePublishInterval.
// NOTE(review): the signature line (source line 252) is missing from this
// listing.
253{
254 if (spec.name.size() == 0) {
255 throw runtime_error("Metric name cannot be empty.");
256 }
257 if (spec.metricId >= metricSpecs.size()) {
// NOTE(review): metricSpecs.size() is a size_t passed for a printf-style %d
// conversion (judging from the %d/%s usage of runtime_error_f), which is a
// mismatched vararg. Use %zu or cast to int.
258 throw runtime_error_f("Metric id %d is out of range. Max is %d", spec.metricId, metricSpecs.size());
259 }
260 if (metricSpecs[spec.metricId].name.size() != 0 && spec.name != metricSpecs[spec.metricId].name) {
261 auto currentName = metricSpecs[spec.metricId].name;
// NOTE(review): the format has two conversions but three arguments are passed.
// spec.name.data() is ignored, so the message never shows the name that was
// being registered.
262 throw runtime_error_f("Metric %d already registered with name %s", spec.metricId, currentName.data(), spec.name.data());
263 }
264 auto currentMetric = std::find_if(metricSpecs.begin(), metricSpecs.end(), [&spec](MetricSpec const& s) { return s.name == spec.name && s.metricId != spec.metricId; });
265 if (currentMetric != metricSpecs.end()) {
266 throw runtime_error_f("Metric %s already registered with id %d. Cannot reregister with %d.", spec.name.data(), currentMetric->metricId, spec.metricId);
267 }
268 metricSpecs[spec.metricId] = spec;
269 metricsNames[spec.metricId] = spec.name;
270 metrics[spec.metricId] = spec.defaultValue;
271 if (metricSpecs[spec.metricId].scope == Scope::Online) {
272 metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval);
273 }
274 int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
275 updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime};
276 updated[spec.metricId] = spec.sendInitialValue;
// NOTE(review): re-registering the same id with the same name passes every
// check above, so the id is pushed into availableMetrics a second time and
// flushChangedMetrics would then visit that metric twice per flush.
277 availableMetrics.push_back(spec.metricId);
278}
279
280} // namespace o2::framework
int32_t i
bool publish(std::string const &filename, std::string const &path, std::string CCDBpath)
Definition GRPTool.cxx:198
std::ostringstream debug
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLenum GLint * range
Definition glcorearb.h:1899
GLintptr offset
Definition glcorearb.h:660
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
std::array< bool, MAX_METRICS > updated
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value if it is positive.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Max
Set the value to the maximum of the current value and the specified value.
@ Min
Set the value to the minimum of the current value and the specified value.
@ CumulativeRate
Update the rate of the metric given the amount since the last time (the amount is added to the running total).
@ Sub
Subtract the value from the current value.
@ Add
Add the value to the current value.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
DataProcessingStats(std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp, DefaultConfig config)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
std::array< UpdateInfo, MAX_METRICS > updateInfos