Project
Loading...
Searching...
No Matches
test_DataProcessingStats.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include <catch_amalgamated.hpp>
17#include <uv.h>
18
19using namespace o2::framework;
20
27
28using namespace o2::framework;
29
// Walks through the basic DataProcessingStats workflow: registering metrics,
// queueing updates with updateStats(), applying them with
// processCommandQueue(), reporting them with flushChangedMetrics(), and
// finally each update operation in turn.
30TEST_CASE("DataProcessingStats")
31{
// NOTE(review): the listing jumps from line 31 to line 33, so the beginning of
// the `stats` declaration is not visible in this extract; the next line is
// only the tail of that constructor call. Restore it from the original file.
33 TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
34
// First registration of "dummy_metric" is expected to succeed.
36 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
// Same name under a different id: expected to throw.
38 REQUIRE_THROWS(stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric2}));
// Different name under an id that is already taken: expected to throw.
40 REQUIRE_THROWS(stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric}));
// Empty name: expected to throw.
42 REQUIRE_THROWS(stats.registerMetric({.name = "", .metricId = ZeroSize}));
43
44 stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2});
45 REQUIRE(stats.metricsNames[DummyMetric] == "dummy_metric");
46 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 1});
// `Missing` is never registered in this test, so updating it must throw.
47 REQUIRE_THROWS(stats.updateStats({Missing, DataProcessingStats::Op::Add, 1}));
// Exactly one command is pending; only the "updated" lapse counter moved.
48 REQUIRE(stats.nextCmd.load() == 1);
49 REQUIRE(stats.updatedMetricsLapse.load() == 1);
50 REQUIRE(stats.pushedMetricsLapse == 0);
51 REQUIRE(stats.publishedMetricsLapse == 0);
52 // Queue was not yet processed here
53 REQUIRE(stats.metrics[DummyMetric] == 0);
54 REQUIRE(stats.updated[DummyMetric] == false);
// NOTE(review): this compares the metric value with `false` (i.e. 0); the
// neighbouring lines suggest `stats.updated[DummyMetric2]` or `== 0` may have
// been intended - confirm.
55 REQUIRE(stats.metrics[DummyMetric2] == false);
// Processing the queue applies the pending command and empties the queue.
56 stats.processCommandQueue();
57 REQUIRE(stats.updatedMetricsLapse.load() == 1);
58 REQUIRE(stats.pushedMetricsLapse == 1);
59 REQUIRE(stats.publishedMetricsLapse == 0);
60 REQUIRE(stats.nextCmd.load() == 0);
61 REQUIRE(stats.metrics[DummyMetric] == 1);
62 REQUIRE(stats.updated[DummyMetric] == true);
63 REQUIRE(stats.metrics[DummyMetric2] == 0);
64 REQUIRE(stats.updated[DummyMetric2] == false);
65 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 1});
66 REQUIRE(stats.nextCmd.load() == 1);
67 // Queue was not yet processed here
68 REQUIRE(stats.metrics[DummyMetric] == 1);
69 // This is true because we have not flushed it yet
70 REQUIRE(stats.updated[DummyMetric] == true);
// NOTE(review): same value-versus-`false` comparison as above - confirm intent.
71 REQUIRE(stats.metrics[DummyMetric2] == false);
72 stats.processCommandQueue();
73 REQUIRE(stats.metrics[DummyMetric] == 2);
74 REQUIRE(stats.updated[DummyMetric] == true);
75 REQUIRE(stats.metrics[DummyMetric2] == 0);
76 REQUIRE(stats.updated[DummyMetric2] == false);
77
// Flush callback used throughout the test: it only records the name of every
// metric it is invoked for; timestamp and value are ignored.
78 std::vector<std::string> updated;
79 auto simpleFlush = [&updated](DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) {
80 updated.emplace_back(spec.name);
81 };
82
// The first flush reports only "dummy_metric", the one metric that changed.
83 stats.flushChangedMetrics(simpleFlush);
84 REQUIRE(stats.updatedMetricsLapse.load() == 2);
85 REQUIRE(stats.pushedMetricsLapse == 2);
86 REQUIRE(stats.publishedMetricsLapse == 1);
87 REQUIRE(updated.size() == 1);
88 REQUIRE(updated[0] == "dummy_metric");
// A second flush with no new changes reports nothing more and the updated
// flags are cleared.
89 stats.flushChangedMetrics(simpleFlush);
90 REQUIRE(stats.updated[DummyMetric] == false);
91 REQUIRE(stats.updated[DummyMetric2] == false);
92 REQUIRE(updated.size() == 1);
93 REQUIRE(updated[0] == "dummy_metric");
// Two commands queued back to back: 2 - 1 + 2 = 3 once processed.
94 stats.updateStats({DummyMetric, DataProcessingStats::Op::Sub, 1});
95 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 2});
96 REQUIRE(stats.nextCmd.load() == 2);
97 stats.processCommandQueue();
98 REQUIRE(stats.updatedMetricsLapse.load() == 4);
99 REQUIRE(stats.pushedMetricsLapse == 4);
100 REQUIRE(stats.publishedMetricsLapse == 1);
101 REQUIRE(stats.nextCmd.load() == 0);
102 REQUIRE(stats.updated[DummyMetric] == true);
103 REQUIRE(stats.updated[DummyMetric2] == false);
104 REQUIRE(stats.metrics[DummyMetric] == 3);
// Add followed by Set: the Set decides the final value (1). Until the queue is
// processed only the "updated" lapse counter advances.
105 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 2});
106 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 1});
107 REQUIRE(stats.updatedMetricsLapse.load() == 6);
108 REQUIRE(stats.pushedMetricsLapse == 4);
109 REQUIRE(stats.publishedMetricsLapse == 1);
110 stats.processCommandQueue();
111 REQUIRE(stats.updatedMetricsLapse.load() == 6);
112 REQUIRE(stats.pushedMetricsLapse == 6);
113 REQUIRE(stats.publishedMetricsLapse == 1);
114 REQUIRE(stats.updated[DummyMetric] == true);
115 REQUIRE(stats.metrics[DummyMetric] == 1);
116 stats.flushChangedMetrics(simpleFlush);
117 REQUIRE(stats.updated[DummyMetric] == false);
118 REQUIRE(stats.metrics[DummyMetric] == 1);
119
120 // Setting the same value does not change the updated flag
121 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 1});
122 stats.processCommandQueue();
123 REQUIRE(stats.updatedMetricsLapse.load() == 7);
124 REQUIRE(stats.pushedMetricsLapse == 6);
125 REQUIRE(stats.publishedMetricsLapse == 2);
126 REQUIRE(stats.metrics[DummyMetric] == 1);
127 REQUIRE(stats.updated[DummyMetric] == false);
128
// Queue 65 increments without processing explicitly. Afterwards only one
// command is pending and the metric already reads 65, so the earlier ones were
// applied during the loop - presumably the queue drains itself once it reaches
// its capacity (64 entries?) - TODO confirm against DataProcessingStats.
129 REQUIRE(stats.nextCmd.load() == 0);
130 for (size_t i = 0; i < 65; ++i) {
131 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 1});
132 }
133 REQUIRE(stats.nextCmd.load() == 1);
134 REQUIRE(stats.metrics[DummyMetric] == 65);
135 REQUIRE(stats.updated[DummyMetric] == true);
136 stats.processCommandQueue();
137 stats.flushChangedMetrics(simpleFlush);
138 REQUIRE(stats.metrics[DummyMetric] == 66);
139 REQUIRE(stats.updated[DummyMetric] == false);
140
// One pass over the update operations: Set, Add, Sub, Min and SetIfPositive.
141 SECTION("Test all operations")
142 {
143 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 100});
144 stats.processCommandQueue();
145 REQUIRE(stats.metrics[DummyMetric] == 100);
146 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 1});
147 stats.processCommandQueue();
148 REQUIRE(stats.metrics[DummyMetric] == 101);
149 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 3});
150 stats.processCommandQueue();
151 REQUIRE(stats.metrics[DummyMetric] == 104);
// Flush first so the updated flag starts out cleared; adding 0 must then leave
// both the value and the flag untouched.
152 stats.flushChangedMetrics(simpleFlush);
153 stats.updateStats({DummyMetric, DataProcessingStats::Op::Add, 0});
154 stats.processCommandQueue();
155 REQUIRE(stats.metrics[DummyMetric] == 104);
156 REQUIRE(stats.updated[DummyMetric] == false);
157
158 stats.updateStats({DummyMetric, DataProcessingStats::Op::Sub, 1});
159 stats.processCommandQueue();
160 REQUIRE(stats.metrics[DummyMetric] == 103);
161 stats.updateStats({DummyMetric, DataProcessingStats::Op::Sub, 2});
162 stats.processCommandQueue();
163 REQUIRE(stats.metrics[DummyMetric] == 101);
// Min with a larger operand (102) keeps the current value (101) ...
164 stats.updateStats({DummyMetric, DataProcessingStats::Op::Min, 102});
165 stats.processCommandQueue();
166 REQUIRE(stats.metrics[DummyMetric] == 101);
// ... while a smaller operand (10) replaces it.
167 stats.updateStats({DummyMetric, DataProcessingStats::Op::Min, 10});
168 stats.processCommandQueue();
169 REQUIRE(stats.metrics[DummyMetric] == 10);
// SetIfPositive stores a positive operand ...
170 stats.updateStats({DummyMetric, DataProcessingStats::Op::SetIfPositive, 11});
171 stats.processCommandQueue();
172 REQUIRE(stats.metrics[DummyMetric] == 11);
// ... and leaves the value alone for a negative one.
173 stats.updateStats({DummyMetric, DataProcessingStats::Op::SetIfPositive, -10});
174 stats.processCommandQueue();
175 REQUIRE(stats.metrics[DummyMetric] == 11);
176 }
177}
178
179// Here we artificially create a situation where the metrics are not added in
180// time order (e.g. two different threads) but we still make sure that the
181// metrics are updated correctly.
182TEST_CASE("DataProcessingStatsOutOfOrder")
183{
184 auto realtimeTime = [](int64_t& base, int64_t& offset) {
185 base = 10;
186 offset = 1000;
187 };
188 auto cpuTime = [](int64_t base, int64_t offset) -> int64_t {
189 static int count = 0;
190 int64_t value[] = {0, 1000, 999, 998};
191 return base + value[count++] - offset;
192 };
193 DataProcessingStats stats(realtimeTime, cpuTime, {});
194 // Notice this will consume one value in the cpuTime.
195 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric});
196 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2});
197 // In reality this should have a lower timestamp than the previous one
198 // so it will be committed before.
199 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 1});
200 stats.processCommandQueue();
201 REQUIRE(stats.metrics[DummyMetric] == 2);
202
203 // In realtiy this should have a lower timestamp than the first
204 // so we do not set it, even if it happens after a processCommandQueue.
205 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 3});
206 stats.processCommandQueue();
207 REQUIRE(stats.metrics[DummyMetric] == 2);
208}
209
211TEST_CASE("DataProcessingStatsInstantaneousRate")
212{
213 auto realtimeConfigurator = [](int64_t& base, int64_t& offset) {
214 base = 0;
215 offset = 0;
216 };
217 auto cpuTimeConfigurator = [](int64_t base, int64_t offset) -> int64_t {
218 static int count = 0;
219 int64_t value[] = {0, 1000, 2000, 5000, 10000};
220 return base + value[count++] - offset;
221 };
222
223 // I want to push deltas since the last update and have the immediate time
224 // averaged being stored.
225 DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
226 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
227 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 0);
228 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
229 // Fake to be after 1 second
230 stats.updateStats({DummyMetric, DataProcessingStats::Op::InstantaneousRate, 2000});
231 stats.processCommandQueue();
232 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
233 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
234 // Faked to be after 2 seconds
235 stats.updateStats({DummyMetric, DataProcessingStats::Op::InstantaneousRate, 2000});
236 stats.processCommandQueue();
237 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
238 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
239 // Faked to be after 5 seconds
240 stats.updateStats({DummyMetric, DataProcessingStats::Op::InstantaneousRate, 6000});
241 stats.processCommandQueue();
242 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
243 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
244 REQUIRE(stats.metrics[DummyMetric] == 6000);
245
246 stats.updateStats({DummyMetric, DataProcessingStats::Op::InstantaneousRate, 5000});
247 stats.processCommandQueue();
248 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
249 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 0);
250 REQUIRE(stats.metrics[DummyMetric] == 5000);
251}
252
254TEST_CASE("DataProcessingStatsCumulativeRate")
255{
256 auto realtimeConfigurator = [](int64_t& base, int64_t& offset) {
257 base = 1000;
258 offset = 0;
259 };
260 int64_t count = 0;
261 auto cpuTimeConfigurator = [&count](int64_t base, int64_t offset) -> int64_t {
262 int64_t value[] = {0, 1000, 2000, 5000, 10000};
263 return base + value[count++] - offset;
264 };
265
266 // I want to push deltas since the last update and have the immediate time
267 // averaged being stored.
268 DataProcessingStats stats(realtimeConfigurator, cpuTimeConfigurator, {});
269 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .kind = DataProcessingStats::Kind::Rate});
270 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
271 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
272 REQUIRE(stats.metrics[DummyMetric] == 0);
273 // Fake to be after 1 second
274 stats.updateStats({DummyMetric, DataProcessingStats::Op::CumulativeRate, 2000});
275 stats.processCommandQueue();
276 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
277 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
278 REQUIRE(stats.metrics[DummyMetric] == 2000);
279 // Faked to be after 2 seconds
280 stats.updateStats({DummyMetric, DataProcessingStats::Op::CumulativeRate, 2000});
281 stats.processCommandQueue();
282 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
283 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
284 REQUIRE(stats.metrics[DummyMetric] == 4000);
285 // Faked to be after 5 seconds
286 stats.updateStats({DummyMetric, DataProcessingStats::Op::CumulativeRate, 6000});
287 stats.processCommandQueue();
288 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
289 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
290 REQUIRE(stats.metrics[DummyMetric] == 10000);
291
292 stats.updateStats({DummyMetric, DataProcessingStats::Op::CumulativeRate, 1000});
293 stats.processCommandQueue();
294 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
295 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
296 REQUIRE(stats.metrics[DummyMetric] == 11000);
297}
298
299TEST_CASE("DataProcessingStatsPublishing")
300{
301 auto realtimeTimestamp = [](int64_t& base, int64_t& offset) {
302 base = 1000;
303 offset = 0;
304 };
305 int64_t count = 0;
306 auto cpuTimeTimestamp = [&count](int64_t base, int64_t offset) -> int64_t {
307 int64_t value[] = {0, 1000, 1001, 2001, 2002, 3000, 5000, 10000, 11000, 12000};
308 return base + value[count++] - offset;
309 };
310
311 // I want to push deltas since the last update and have the immediate time
312 // averaged being stored.
313 DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
314 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 5000});
315 stats.registerMetric({.name = "dummy_metric2", .metricId = DummyMetric2, .minPublishInterval = 2000});
316 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
317 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
318 REQUIRE(stats.metrics[DummyMetric] == 0);
319
320 std::vector<std::string> updated;
321 auto simpleFlush = [&updated](o2::framework::DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) {
322 updated.emplace_back(spec.name);
323 };
324
325 // Fake to be after 1 second
326 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 2000});
327 stats.updateStats({DummyMetric2, DataProcessingStats::Op::Set, 1000});
328 REQUIRE(stats.updateInfos[0].timestamp == 1000);
329 REQUIRE(stats.updateInfos[1].timestamp == 2000);
330 stats.processCommandQueue();
331 REQUIRE(count == 4);
332
333 stats.flushChangedMetrics(simpleFlush);
334 REQUIRE(count == 5);
335 REQUIRE(updated.empty());
336 stats.flushChangedMetrics(simpleFlush);
337 REQUIRE(count == 6);
338 REQUIRE(updated.size() == 1);
339 updated.clear();
340 stats.flushChangedMetrics(simpleFlush);
341 REQUIRE(updated.size() == 1);
342}
343
344TEST_CASE("DataProcessingStatsPublishingRepeated")
345{
346 auto realtimeTimestamp = [](int64_t& base, int64_t& offset) {
347 base = 1000;
348 offset = 0;
349 };
350 int64_t count = 0;
351 static int64_t timestamps[] = {0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 19000, 20000};
352 auto cpuTimeTimestamp = [&count](int64_t base, int64_t offset) -> int64_t {
353 return base + timestamps[count++] - offset;
354 };
355
356 // I want to push deltas since the last update and have the immediate time
357 // averaged being stored.
358 DataProcessingStats stats(realtimeTimestamp, cpuTimeTimestamp, {});
359 stats.registerMetric({.name = "dummy_metric", .metricId = DummyMetric, .minPublishInterval = 3000, .maxRefreshLatency = 9000});
360 REQUIRE(stats.updateInfos[DummyMetric].timestamp == 1000);
361 REQUIRE(stats.updateInfos[DummyMetric].lastPublished == 1000);
362 REQUIRE(stats.metrics[DummyMetric] == 0);
363
364 std::vector<std::string> updated;
365 auto simpleFlush = [&updated](o2::framework::DataProcessingStats::MetricSpec const& spec, int64_t timestamp, int64_t value) {
366 updated.emplace_back(spec.name);
367 };
368
369 // Fake to be after 1 second
370 stats.updateStats({DummyMetric, DataProcessingStats::Op::Set, 1000});
371 REQUIRE(stats.updateInfos[0].timestamp == 1000);
372 stats.flushChangedMetrics(simpleFlush);
373 REQUIRE(count == 3);
374 REQUIRE(updated.empty());
375 stats.processCommandQueue();
376 stats.flushChangedMetrics(simpleFlush);
377 REQUIRE(count == 4);
378 CHECK(updated.size() == 1);
379 stats.processCommandQueue();
380 stats.flushChangedMetrics(simpleFlush);
381 REQUIRE(count == 5);
382 CHECK(updated.size() == 1);
383 stats.processCommandQueue();
384 stats.flushChangedMetrics(simpleFlush);
385 REQUIRE(count == 6);
386 CHECK(updated.size() == 1);
387 stats.processCommandQueue();
388 stats.flushChangedMetrics(simpleFlush);
389 REQUIRE(count == 7);
390 CHECK(updated.size() == 1);
391 stats.processCommandQueue();
392 stats.flushChangedMetrics(simpleFlush);
393 CHECK(updated.size() == 1);
394 REQUIRE(count == 8);
395 stats.processCommandQueue();
396 stats.flushChangedMetrics(simpleFlush);
397 CHECK(updated.size() == 1);
398 REQUIRE(count == 9);
399 stats.processCommandQueue();
400 stats.flushChangedMetrics(simpleFlush);
401 REQUIRE(updated.size() == 2);
402 REQUIRE(count == 10);
403}
int32_t i
#define CHECK
GLint GLsizei count
Definition glcorearb.h:399
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLintptr offset
Definition glcorearb.h:660
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void clean_all_runtime_errors()
Helper struct to hold statistics about the data processing happening.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
TEST_CASE("DataProcessingStats")