Project
Loading...
Searching...
No Matches
DataProcessingStats.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/Logger.h"
15#include <uv.h>
16#include <atomic>
17#include <thread>
18
19namespace o2::framework
20{
21
// Construct the stats tracker. The two helpers are injected so callers (and
// tests) control the clock: getRealtimeBase_ yields the reference time base,
// getTimestamp_ converts (base, offset) into the millisecond timestamps used
// throughout; config_ carries publishing defaults (e.g. the
// minOnlinePublishInterval floor applied in registerMetric).
//
// NOTE(review): original line 29 (the constructor body) was lost in the
// extraction this listing was recovered from. Presumably it called
// getRealtimeBase(realTimeBase, initialTimeOffset) to seed the members that
// getTimestamp(realTimeBase, initialTimeOffset) reads later in this file —
// confirm against upstream before relying on this.
22DataProcessingStats::DataProcessingStats(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase_,
23                                         std::function<int64_t(int64_t base, int64_t offset)> getTimestamp_,
24                                         DefaultConfig config_)
25  : getTimestamp(getTimestamp_),
26    getRealtimeBase(getRealtimeBase_),
27    config(config_)
28{
30}
31
33{
34 for (auto& spec : metricSpecs) {
35 if (spec.metricId == (int)id) {
36 return spec.name.c_str();
37 }
38 }
39 return nullptr;
40}
41
// Enqueue a single metric-update command into the lock-free command buffer.
// Commands are buffered here by (possibly multiple) writers and applied in
// bulk later by the queue-processing routine.
//
// NOTE(review): the signature line (original line 42) was lost in extraction;
// per the rest of the file this is void DataProcessingStats::updateStats(CommandSpec cmd).
43{
// NOTE(review): metricSpecs[cmd.id] is indexed by these two checks BEFORE the
// range checks below run — an out-of-range id would access out of bounds
// before being diagnosed. The range checks should come first.
44 if (metricSpecs[cmd.id].name.empty()) {
45 throw runtime_error_f("MetricID %d was not registered", (int)cmd.id);
46 }
47 if (metricSpecs[cmd.id].enabled == false) {
48 LOGP(debug, "MetricID {} is disabled", (int)cmd.id);
49 return;
50 }
51 if (cmd.id >= metrics.size()) {
52 throw runtime_error_f("MetricID %d is out of range", (int)cmd.id);
53 }
54 if (cmd.id >= updateInfos.size()) {
55 throw runtime_error_f("MetricID %d is out of range", (int)cmd.id);
56 }
58 // Add a static mutex to protect the queue
59 // Get the next available operation in an atomic way.
60 auto idx = nextCmd.fetch_add(1, std::memory_order_relaxed);
// Queue exactly full: this writer is responsible for draining and resetting it.
61 if (idx == cmds.size()) {
62 // We abort this command
// NOTE(review): original line 63 was lost in extraction (presumably a
// pendingCmds adjustment) — confirm against upstream.
64 while (pendingCmds.load(std::memory_order_relaxed) > 0) {
65 // We need to wait for all the pending commands to be processed.
66 // This is needed because we are going to flush the queue.
67 // We cannot flush the queue while there are pending commands
68 // as we might end up with a command being processed twice.
69 std::this_thread::sleep_for(std::chrono::milliseconds(1));
70 }
// NOTE(review): original line 71 was lost in extraction (presumably the
// actual flush of the queue before the counters are reset) — confirm upstream.
72 insertedCmds.store(0, std::memory_order_relaxed);
73 nextCmd.store(0, std::memory_order_relaxed);
75 idx = nextCmd.fetch_add(1, std::memory_order_relaxed);
76 } else if (idx > cmds.size()) {
// Another writer is resetting the queue: wait, then retry via recursion.
// NOTE(review): cmds is a fixed-size std::array, so cmds.size() is a nonzero
// constant and this loop condition can never become false as written — it
// looks like it should compare a loaded counter (e.g. nextCmd) against
// cmds.size(); confirm against upstream.
77 while (cmds.size()) {
78 // We need to wait for the flushing of the queue
79 std::this_thread::sleep_for(std::chrono::milliseconds(1));
80 idx = nextCmd.load(std::memory_order_relaxed);
81 }
82 return updateStats(cmd);
83 }
84 // Save the command.
85 assert(idx < cmds.size());
86 assert(cmd.id < metrics.size());
// NOTE(review): original lines 87-89 and 91 were lost in extraction — they
// presumably stored cmd (with its timestamp) into cmds[idx] and incremented
// insertedCmds / adjusted pendingCmds. Restore from upstream.
90 // Keep track of the number of commands we have received.
92}
93
95{
96 std::array<char, MAX_CMDS> order;
97 // The range cannot be larger than the number of commands we have.
98 auto range = insertedCmds.load(std::memory_order_relaxed);
99 if (insertedCmds.load(std::memory_order_relaxed) == 0) {
100 return;
101 }
102
103 std::iota(order.begin(), order.begin() + range, 0);
104 // Shuffle the order in which we will process the commands based
105 // on their timestamp. If two commands are inserted at the same
106 // time, we expect to process them in the order they were inserted.
107 std::stable_sort(order.begin(), order.begin() + range, [this](char a, char b) {
108 return cmds[a].timestamp < cmds[b].timestamp;
109 });
110
111 // Process the commands in the order we have just computed.
112 for (int i = 0; i < range; ++i) {
113 auto& cmd = cmds[i];
114 assert(cmd.id < updateInfos.size());
115 auto& update = updateInfos[cmd.id];
116 switch (cmd.op) {
117 case Op::Nop:
118 break;
119 case Op::Set:
120 if (cmd.value != metrics[cmd.id] && cmd.timestamp >= update.timestamp) {
121 metrics[cmd.id] = cmd.value;
122 updated[cmd.id] = true;
123 update.timestamp = cmd.timestamp;
125 }
126 break;
127 case Op::Add:
128 if (cmd.value) {
129 metrics[cmd.id] += cmd.value;
130 updated[cmd.id] = true;
131 update.timestamp = cmd.timestamp;
133 }
134 break;
135 case Op::Sub:
136 if (cmd.value) {
137 metrics[cmd.id] -= cmd.value;
138 updated[cmd.id] = true;
139 update.timestamp = cmd.timestamp;
141 }
142 break;
143 case Op::Max:
144 if (cmd.value > metrics[cmd.id]) {
145 metrics[cmd.id] = cmd.value;
146 updated[cmd.id] = true;
147 update.timestamp = cmd.timestamp;
149 }
150 break;
151 case Op::Min:
152 if (cmd.value < metrics[cmd.id]) {
153 metrics[cmd.id] = cmd.value;
154 updated[cmd.id] = true;
155 update.timestamp = cmd.timestamp;
157 }
158 break;
160 if (cmd.value > 0 && cmd.timestamp >= update.timestamp) {
161 metrics[cmd.id] = cmd.value;
162 updated[cmd.id] = true;
163 update.timestamp = cmd.timestamp;
165 }
166 break;
168 if (metricSpecs[cmd.id].kind != Kind::Rate) {
169 throw runtime_error_f("MetricID %d is not a rate", (int)cmd.id);
170 }
171 // We keep setting the value to the time average of the previous
172 // update period. so that we can compute the average over time
173 // at the moment of publishing.
174 metrics[cmd.id] = cmd.value;
175 updated[cmd.id] = true;
176 if (update.timestamp == 0) {
177 update.timestamp = cmd.timestamp;
178 }
180 } break;
181 case Op::CumulativeRate: {
182 if (metricSpecs[cmd.id].kind != Kind::Rate) {
183 throw runtime_error_f("MetricID %d is not a rate", (int)cmd.id);
184 }
185 // We keep setting the value to the time average of the previous
186 // update period. so that we can compute the average over time
187 // at the moment of publishing.
188 metrics[cmd.id] += cmd.value;
189 updated[cmd.id] = true;
190 if (update.timestamp == 0) {
191 update.timestamp = cmd.timestamp;
192 }
194 } break;
195 }
196 }
197 // No one should have tried to insert more commands while processing.
198 assert(range == insertedCmds.load(std::memory_order_relaxed));
199 nextCmd.store(0, std::memory_order_relaxed);
200 insertedCmds.store(0, std::memory_order_relaxed);
201}
202
// Invoke @a callback(spec, timestamp, value) for every metric that changed
// since its last publication (or whose maxRefreshLatency expired), honouring
// each spec's minPublishInterval, and record when each metric was published.
// Rate metrics are converted to a per-second rate over the elapsed window.
//
// NOTE(review): several single lines were lost in the extraction this listing
// was recovered from (original lines 205, 243-245, 250). Among them,
// presumably, the refresh of lastPublishedMetrics[mi] that the Rate
// computation below relies on, and the publishing counters
// (publishingInvokedTotal / publishedMetricsLapse) read at the bottom.
// Confirm against upstream before relying on this body.
203void DataProcessingStats::flushChangedMetrics(std::function<void(DataProcessingStats::MetricSpec const&, int64_t, int64_t)> const& callback)
204{
206 bool publish = false;
207 auto currentTimestamp = getTimestamp(realTimeBase, initialTimeOffset);
208 for (size_t ami = 0; ami < availableMetrics.size(); ++ami) {
209 int mi = availableMetrics[ami];
210 auto& update = updateInfos[mi];
211 MetricSpec& spec = metricSpecs[mi];
212 if (spec.name.empty()) {
213 continue;
214 }
215 if (spec.enabled == false) {
216 LOGP(debug, "Metric {} is disabled", spec.name);
217 continue;
218 }
// Force a refresh of stale metrics even when the value did not change.
219 if (updated[mi] == false && currentTimestamp - update.timestamp > spec.maxRefreshLatency) {
220 updated[mi] = true;
221 update.timestamp = currentTimestamp;
222 }
223 if (updated[mi] == false) {
224 continue;
225 }
// Per-metric rate limiting.
226 if (currentTimestamp - update.lastPublished < spec.minPublishInterval) {
227 continue;
228 }
229 publish = true;
230 if (spec.kind == Kind::Unknown) {
231 LOGP(fatal, "Metric {} has unknown kind", spec.name);
232 }
233 if (spec.kind == Kind::Rate) {
234 if (currentTimestamp - update.timestamp == 0) {
235 callback(spec, update.timestamp, 0);
236 } else {
237 // Timestamp is in milliseconds, we want to convert to seconds.
238 callback(spec, update.timestamp, (1000 * (metrics[mi] - lastPublishedMetrics[mi])) / (currentTimestamp - update.timestamp));
239 }
240 update.timestamp = currentTimestamp; // We reset the timestamp to the current time.
241 } else {
242 callback(spec, update.timestamp, metrics[mi]);
243 }
246 update.lastPublished = currentTimestamp;
247 updated[mi] = false;
248 }
249 if (publish) {
251 }
// uv_hrtime() is monotonic and in nanoseconds; startTime is latched on the
// first invocation so the averages below are over the process lifetime.
252 static int64_t startTime = uv_hrtime();
253 int64_t now = uv_hrtime();
254
255 auto timeDelta = std::max(int64_t(1), now - startTime); // min 1 unit of time to exclude division by 0
256 double averageInvocations = (publishingInvokedTotal * 1000000000) / timeDelta;
257 double averagePublishing = (publishedMetricsLapse * 1000000000) / timeDelta;
258
259 LOGP(debug, "Publishing invoked {} times / s, {} metrics published / s", (int)averageInvocations, (int)averagePublishing);
260}
261
263{
264 if (spec.name.size() == 0) {
265 throw runtime_error("Metric name cannot be empty.");
266 }
267 if (spec.metricId >= metricSpecs.size()) {
268 throw runtime_error_f("Metric id %d is out of range. Max is %d", spec.metricId, metricSpecs.size());
269 }
270 if (metricSpecs[spec.metricId].name.size() != 0 && spec.name != metricSpecs[spec.metricId].name) {
271 auto currentName = metricSpecs[spec.metricId].name;
272 throw runtime_error_f("Metric %d already registered with name %s", spec.metricId, currentName.data(), spec.name.data());
273 }
274 auto currentMetric = std::find_if(metricSpecs.begin(), metricSpecs.end(), [&spec](MetricSpec const& s) { return s.name == spec.name && s.metricId != spec.metricId; });
275 if (currentMetric != metricSpecs.end()) {
276 throw runtime_error_f("Metric %s already registered with id %d. Cannot reregister with %d.", spec.name.data(), currentMetric->metricId, spec.metricId);
277 }
278 metricSpecs[spec.metricId] = spec;
279 metricsNames[spec.metricId] = spec.name;
280 metrics[spec.metricId] = spec.defaultValue;
281 if (metricSpecs[spec.metricId].scope == Scope::Online) {
282 metricSpecs[spec.metricId].minPublishInterval = std::max(metricSpecs[spec.metricId].minPublishInterval, config.minOnlinePublishInterval);
283 }
284 int64_t currentTime = getTimestamp(realTimeBase, initialTimeOffset);
285 updateInfos[spec.metricId] = UpdateInfo{currentTime, currentTime};
286 updated[spec.metricId] = spec.sendInitialValue;
287 availableMetrics.push_back(spec.metricId);
288}
289
290} // namespace o2::framework
std::ostringstream debug
int32_t i
bool publish(std::string const &filename, std::string const &path, std::string CCDBpath)
Definition GRPTool.cxx:198
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLenum GLint * range
Definition glcorearb.h:1899
GLintptr offset
Definition glcorearb.h:660
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLuint id
Definition glcorearb.h:650
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
RuntimeErrorRef runtime_error_f(const char *,...)
int64_t minPublishInterval
How many milliseconds must have passed since the last publishing.
int64_t defaultValue
The default value for the metric.
std::array< bool, MAX_METRICS > updated
char const * findMetricNameById(ProcessingStatsId id) const
void registerMetric(MetricSpec const &spec)
std::array< MetricSpec, MAX_METRICS > metricSpecs
std::function< int64_t(int64_t base, int64_t offset)> getTimestamp
@ SetIfPositive
Set the value to the specified value.
@ InstantaneousRate
Update the rate of the metric given the cumulative value since last time it got published.
@ Max
Subtract the value from the current value.
@ Min
Set the value to the maximum of the current value and the specified value.
@ CumulativeRate
Set the value to the specified value if it is positive.
@ Sub
Add the value to the current value.
@ Add
Update the rate of the metric given the amount since the last time.
std::array< std::string, MAX_METRICS > metricsNames
std::array< Command, MAX_CMDS > cmds
void flushChangedMetrics(std::function< void(MetricSpec const &, int64_t, int64_t)> const &callback)
std::array< int64_t, MAX_METRICS > lastPublishedMetrics
DataProcessingStats(std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase, std::function< int64_t(int64_t base, int64_t offset)> getTimestamp, DefaultConfig config)
std::function< void(int64_t &base, int64_t &offset)> getRealtimeBase
std::array< UpdateInfo, MAX_METRICS > updateInfos