Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
50
51namespace o2::framework
52{
53class EndOfStreamContext;
54class ProcessingContext;
55
/// Possible states of the rate limiting logic.
/// The numeric values are spelled out explicitly so they stay stable.
enum struct RateLimitingState {
  // Nothing has been received yet.
  UNKNOWN = 0,
  // Information was received, but no new timeframe was requested.
  STARTED = 1,
  // Information was received and a new timeframe was requested, but it is not accounted for yet.
  CHANGED = 2,
  // A new metric was received and we are below the limit.
  BELOW_LIMIT = 3,
  // The iteration following one spent in BELOW_LIMIT.
  NEXT_ITERATION_FROM_BELOW = 4,
  // A new metric was received and we are above the limit.
  ABOVE_LIMIT = 5,
  // NOTE(review): meaning not documented in the original — TODO confirm.
  EMPTY = 6,
};
65
67 int64_t maxMemory = 2000;
68 int64_t maxTimeframes = 1000;
69};
70
72 size_t arrowBytesCreated = -1;
76 size_t arrowBytesExpired = -1;
78 size_t timeframesRead = -1;
79 size_t timeframesConsumed = -1;
80 size_t timeframesExpired = -1;
81 // Timeslices counting
82 size_t timeslicesStarted = -1;
83 size_t timeslicesExpired = -1;
84 size_t timeslicesDone = -1;
85};
86
87std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
88{
89 std::vector<MetricIndices> results;
90
91 for (auto& info : allDevicesMetrics) {
92 results.emplace_back(MetricIndices{
93 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
94 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
95 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
96 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
97 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
98 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
99 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
100 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
101 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
102 .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
103 .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
104 .timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
105 });
106 }
107 return results;
108}
109
111 int64_t available;
112 int64_t offered = 0;
113 int64_t lastDeviceOffered = 0;
114};
116 int64_t enoughCount;
117 int64_t lowCount;
118};
120 char const* name;
121 char const* unit;
122 char const* api;
123 int64_t maxAvailable;
124 int64_t maxQuantum;
125 int64_t minQuantum;
127};
128
129auto offerResources(ResourceState& resourceState,
130 ResourceSpec const& resourceSpec,
131 ResourceStats& resourceStats,
132 std::vector<DeviceSpec> const& specs,
133 std::vector<DeviceInfo> const& infos,
134 DevicesManager& manager,
135 int64_t offerConsumedCurrentValue,
136 int64_t offerExpiredCurrentValue,
137 int64_t acquiredResourceCurrentValue,
138 int64_t disposedResourceCurrentValue,
139 size_t timestamp,
140 DeviceMetricsInfo& driverMetrics,
141 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
142 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
143 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
144 void* signpostId) -> void
145{
146 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
149 int64_t lastCandidate = -1;
150 int64_t possibleOffer = resourceSpec.minQuantum;
151
152 for (size_t di = 0; di < specs.size(); di++) {
153 if (resourceState.available < possibleOffer) {
154 if (resourceStats.lowCount == 0) {
155 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
156 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
157 resourceSpec.name, resourceState.available, resourceSpec.unit,
158 possibleOffer, resourceSpec.unit,
159 resourceState.offered, resourceSpec.unit);
160 }
161 resourceStats.lowCount++;
162 resourceStats.enoughCount = 0;
163 break;
164 } else {
165 if (resourceStats.enoughCount == 0) {
166 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
167 "We are back in a state where we enough %{public}s: %llu %{public}s",
168 resourceSpec.name,
169 resourceState.available,
170 resourceSpec.unit);
171 }
172 resourceStats.lowCount = 0;
173 resourceStats.enoughCount++;
174 }
175 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
176
177 auto& info = infos[candidate];
178 // Do not bother for inactive devices
179 // FIXME: there is probably a race condition if the device died and we did not
180 // took notice yet...
181 if (info.active == false || info.readyToQuit) {
182 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
183 "Device %s is inactive not offering %{public}s to it.",
184 specs[candidate].name.c_str(), resourceSpec.name);
185 continue;
186 }
187 if (specs[candidate].name != "internal-dpl-aod-reader") {
188 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
189 "Device %s is not a reader. Not offering %{public}s to it.",
190 specs[candidate].name.c_str(),
191 resourceSpec.name);
192 continue;
193 }
194 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
195 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
196 "Offering %llu %{public}s out of %llu to %{public}s",
197 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
198 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
199 resourceState.available -= possibleOffer;
200 resourceState.offered += possibleOffer;
201 lastCandidate = candidate;
202 }
203 // We had at least a valid candidate, so
204 // next time we offer to the next device.
205 if (lastCandidate >= 0) {
206 resourceState.lastDeviceOffered = lastCandidate + 1;
207 }
208
209 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
210 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
211 // and the amount which we know was given back.
212 static int64_t lastResourceOfferConsumed = 0;
213 static int64_t lastUnusedOfferedResource = 0;
214 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
215 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
216 "Offer consumed so far %llu", offerConsumedCurrentValue);
217 lastResourceOfferConsumed = offerConsumedCurrentValue;
218 }
219 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
220 if (lastUnusedOfferedResource != unusedOfferedResource) {
221 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
222 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
223 resourceSpec.name,
224 unusedOfferedResource, resourceState.offered,
225 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
226 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
227 resourceSpec.metricOfferScaleFactor);
228 lastUnusedOfferedResource = unusedOfferedResource;
229 }
230 // availableSharedMemory is the amount of memory which we know is available to be offered.
231 // We subtract the amount which we know was already offered but it's unused and we then balance how
232 // much was created with how much was destroyed.
233 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
234 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
235 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
236
237 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
238};
239
240auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
241 int64_t& totalMetricValue, size_t& lastTimestamp) {
242 assert(index < deviceMetrics.metrics.size());
243 changed |= deviceMetrics.changed[index];
244 MetricInfo info = deviceMetrics.metrics[index];
245 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
246 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
247 auto value = (int64_t)data[(info.pos - 1) % data.size()];
248 totalMetricValue += value;
249 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
250 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
251};
252
254{
255 using o2::monitoring::Metric;
256 using o2::monitoring::Monitoring;
257 using o2::monitoring::tags::Key;
258 using o2::monitoring::tags::Value;
259
260 return ServiceSpec{
261 .name = "arrow-backend",
263 .configure = CommonServices::noConfiguration(),
268 .metricHandling = [](ServiceRegistryRef registry,
269 ServiceMetricsInfo const& sm,
270 size_t timestamp) {
271 int64_t totalBytesCreated = 0;
272 int64_t shmOfferBytesConsumed = 0;
273 int64_t totalBytesDestroyed = 0;
274 int64_t totalBytesExpired = 0;
275 int64_t totalMessagesCreated = 0;
276 int64_t totalMessagesDestroyed = 0;
277 int64_t totalTimeframesRead = 0;
278 int64_t totalTimeframesConsumed = 0;
279 int64_t totalTimeframesExpired = 0;
280 int64_t totalTimeslicesStarted = 0;
281 int64_t totalTimeslicesDone = 0;
282 int64_t totalTimeslicesExpired = 0;
283 auto &driverMetrics = sm.driverMetricsInfo;
284 auto &allDeviceMetrics = sm.deviceMetricsInfos;
285 auto &specs = sm.deviceSpecs;
286 auto &infos = sm.deviceInfos;
287
288 // Aggregated driver metrics for timeslice rate limiting
289 auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
290 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
291 };
292 auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
293 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
294 };
295
296 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
297 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
298 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
299 // These are really to monitor the rate limiting
300 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
301 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
302 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
303 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
304 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
305 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
306
307 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
308 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
309 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
310 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
311 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
312 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
313 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
314
315 static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
316 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
317 static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
318 static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
319
320 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
321 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
322 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
323 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
324 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
325 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
326 auto& manager = registry.get<DevicesManager>();
327
328 bool changed = false;
329
330 size_t lastTimestamp = 0;
331 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
332 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
333 auto& deviceMetrics = allDeviceMetrics[mi];
334 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
335 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
336 }
337 auto& indices = allIndices[mi];
338 {
339 size_t index = indices.arrowBytesCreated;
340 assert(index < deviceMetrics.metrics.size());
341 changed |= deviceMetrics.changed[index];
342 MetricInfo info = deviceMetrics.metrics[index];
343 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
344 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
346 auto value = (int64_t)data[(info.pos - 1) % data.size()];
347 totalBytesCreated += value;
348 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
349 }
350 {
351 size_t index = indices.shmOfferBytesConsumed;
352 assert(index < deviceMetrics.metrics.size());
353 changed |= deviceMetrics.changed[index];
354 MetricInfo info = deviceMetrics.metrics[index];
355 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
356 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
358 auto value = (int64_t)data[(info.pos - 1) % data.size()];
359 shmOfferBytesConsumed += value;
360 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
361 }
362 {
363 size_t index = indices.arrowBytesDestroyed;
364 assert(index < deviceMetrics.metrics.size());
365 changed |= deviceMetrics.changed[index];
366 MetricInfo info = deviceMetrics.metrics[index];
367 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
368 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
369 auto value = (int64_t)data[(info.pos - 1) % data.size()];
370 totalBytesDestroyed += value;
371 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
372 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
373 }
374 {
375 size_t index = indices.arrowBytesExpired;
376 assert(index < deviceMetrics.metrics.size());
377 changed |= deviceMetrics.changed[index];
378 MetricInfo info = deviceMetrics.metrics[index];
379 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
380 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
381 auto value = (int64_t)data[(info.pos - 1) % data.size()];
382 totalBytesExpired += value;
383 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
384 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
385 }
386 {
387 size_t index = indices.arrowMessagesCreated;
388 assert(index < deviceMetrics.metrics.size());
389 MetricInfo info = deviceMetrics.metrics[index];
390 changed |= deviceMetrics.changed[index];
391 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
392 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
393 auto value = (int64_t)data[(info.pos - 1) % data.size()];
394 totalMessagesCreated += value;
395 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
396 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
397 }
398 {
399 size_t index = indices.arrowMessagesDestroyed;
400 assert(index < deviceMetrics.metrics.size());
401 MetricInfo info = deviceMetrics.metrics[index];
402 changed |= deviceMetrics.changed[index];
403 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
404 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
405 auto value = (int64_t)data[(info.pos - 1) % data.size()];
406 totalMessagesDestroyed += value;
407 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
408 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
409 }
410 {
411 size_t index = indices.timeframesRead;
412 assert(index < deviceMetrics.metrics.size());
413 changed |= deviceMetrics.changed[index];
414 MetricInfo info = deviceMetrics.metrics[index];
415 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
416 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
417 auto value = (int64_t)data[(info.pos - 1) % data.size()];
418 totalTimeframesRead += value;
419 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
420 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
421 }
422 {
423 size_t index = indices.timeframesConsumed;
424 assert(index < deviceMetrics.metrics.size());
425 changed |= deviceMetrics.changed[index];
426 MetricInfo info = deviceMetrics.metrics[index];
427 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
428 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
429 auto value = (int64_t)data[(info.pos - 1) % data.size()];
430 totalTimeframesConsumed += value;
431 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
432 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
433 }
434 {
435 size_t index = indices.timeframesExpired;
436 assert(index < deviceMetrics.metrics.size());
437 changed |= deviceMetrics.changed[index];
438 MetricInfo info = deviceMetrics.metrics[index];
439 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
440 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
441 auto value = (int64_t)data[(info.pos - 1) % data.size()];
442 totalTimeframesExpired += value;
443 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
444 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
445 }
446 processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
447 processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
448 processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
449 }
450 static uint64_t unchangedCount = 0;
451 if (changed) {
452 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
453 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
454 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
455 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
456 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
457 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
458 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
459 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
460 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
461 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
462 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
463 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
464 totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
465 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
466 } else {
467 unchangedCount++;
468 }
469 changedCountMetric(driverMetrics, unchangedCount, timestamp);
470
471 static const ResourceSpec shmResourceSpec{
472 .name = "shared memory",
473 .unit = "MB",
474 .api = "/shm-offer {}",
475 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
476 .maxQuantum = 100,
477 .minQuantum = 50,
478 .metricOfferScaleFactor = 1000000,
479 };
480 static const ResourceSpec timesliceResourceSpec{
481 .name = "timeslice",
482 .unit = "timeslices",
483 .api = "/timeslice-offer {}",
484 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
485 .maxQuantum = 1,
486 .minQuantum = 1,
487 .metricOfferScaleFactor = 1,
488 };
489 static ResourceState shmResourceState{
490 .available = shmResourceSpec.maxAvailable,
491 };
492 static ResourceState timesliceResourceState{
493 .available = timesliceResourceSpec.maxAvailable,
494 };
495 static ResourceStats shmResourceStats{
496 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
497 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
498 };
499 static ResourceStats timesliceResourceStats{
500 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
501 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
502 };
503
504 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
505 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
506 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
507 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
508 (void*)&sm);
509
510 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
511 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
512 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
513 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
514 (void*)&sm); },
515 .postDispatching = [](ProcessingContext& ctx, void* service) {
517 auto* arrow = reinterpret_cast<ArrowContext*>(service);
518 auto totalBytes = 0;
519 auto totalMessages = 0;
520 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
521 for (auto& input : ctx.inputs()) {
522 if (input.header == nullptr) {
523 continue;
524 }
525 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
526 auto payloadSize = DataRefUtils::getPayloadSize(input);
527 if (dh->serialization != o2::header::gSerializationMethodArrow) {
528 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
529 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
530 dh->dataOrigin.str, dh->dataDescription.str);
531 continue;
532 }
533 bool forwarded = std::ranges::any_of(ctx.services().get<DeviceSpec const>().forwards, [&dh](auto const& forward) { return DataSpecUtils::match(forward.matcher, *dh); });
534 if (forwarded) {
535 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
536 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
537 dh->dataOrigin.str, dh->dataDescription.str);
538 continue;
539 }
540 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
541 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
542 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
543 totalBytes += payloadSize;
544 totalMessages += 1;
545 }
546 arrow->updateBytesDestroyed(totalBytes);
547 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
548 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
549 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
550 arrow->updateMessagesDestroyed(totalMessages);
551 auto& stats = ctx.services().get<DataProcessingStats>();
552 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
553 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
554 stats.processCommandQueue(); },
555 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
556 auto config = new RateLimitConfig{};
557 int readers = std::stoll(dc.options["readers"].as<std::string>());
558 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
559 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
560 } else {
561 config->maxMemory = readers * 2000;
562 }
563 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
564 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
565 } else {
566 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength(dc);
567 }
568 static bool once = false;
569 // Until we guarantee this is called only once...
570 if (!once) {
571 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
572 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
573 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
574 config->maxMemory, config->maxTimeframes, readers);
575 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
576 once = true;
577 } },
578 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
579 auto& workflow = node.specs;
580 auto& dec = ctx.services().get<DanglingEdgesContext>();
581 dec.requestedAODs.clear();
582 dec.requestedDYNs.clear();
583
584 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
585 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
586
587 auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
588 if (builder != workflow.end()) {
589 // collect currently requested IDXs
590 dec.requestedIDXs.clear();
591 dec.providedIDXs.clear();
592 for (auto& d : workflow | views::exclude_by_name(builder->name)) {
593 d.inputs |
594 views::filter_with_params_by_name("index-records") |
595 sinks::update_input_list{dec.requestedIDXs};
596 d.outputs |
597 views::filter_with_params_by_name("index-records") |
598 sinks::update_output_list{dec.providedIDXs};
599 }
600 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
601 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
602 dec.builderInputs.clear();
603 dec.requestedIDXs |
604 views::filter_not_matching(dec.providedIDXs) |
605 sinks::append_to{dec.builderInputs};
606 // recreate inputs and outputs
607 builder->inputs.clear();
608 builder->outputs.clear();
609 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.builderInputs, dec.requestedAODs, dec.requestedDYNs, *builder);
610 if (!builder->inputs.empty()) {
611 // load real AlgorithmSpec before deployment
612 builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
613 }
614 }
615
616 auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
617 if (analysisCCDB != workflow.end()) {
618 dec.requestedTIMs.clear();
619 dec.providedTIMs.clear();
620 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
621 d.inputs |
622 views::filter_with_params_by_name_starting("ccdb:") |
623 sinks::update_input_list{dec.requestedTIMs};
624 d.outputs |
625 views::filter_with_params_by_name_starting("ccdb:") |
626 sinks::append_to{dec.providedTIMs};
627 }
628 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
629 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
630 // Use ranges::to<std::vector<>> in C++23...
631 dec.analysisCCDBInputs.clear();
632 dec.requestedTIMs |
633 views::filter_not_matching(dec.providedTIMs) |
634 sinks::append_to{dec.analysisCCDBInputs};
635
636 // recreate inputs and outputs
637 analysisCCDB->outputs.clear();
638 analysisCCDB->inputs.clear();
639 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
640 // load real AlgorithmSpec before deployment
641 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
642 }
643
644 auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
645 if (spawner != workflow.end()) {
646 dec.providedDYNs.clear();
647 // collect currently requested DYNs
648 for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
649 d.inputs |
650 views::filter_with_params_by_name("projectors") |
651 sinks::update_input_list{dec.requestedDYNs};
652 d.outputs |
653 views::filter_with_params_by_name("projectors") |
654 sinks::append_to{dec.providedDYNs};
655 }
656 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
657 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
658 dec.spawnerInputs.clear();
659 dec.requestedDYNs |
660 views::filter_not_matching(dec.providedDYNs) |
661 sinks::append_to{dec.spawnerInputs};
662 // recreate inputs and outputs
663 spawner->outputs.clear();
664 spawner->inputs.clear();
665 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
666 if (!spawner->inputs.empty()) {
667 // load real AlgorithmSpec before deployment
668 spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
669 }
670 }
671
672 auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
673 if (writer != workflow.end()) {
674 workflow.erase(writer);
675 }
676
677 // removing writer would invalidate the reader iterator if it was created before
678 auto reader = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-reader"); });
679
680 if (reader != workflow.end()) {
681 // If reader and/or builder were adjusted, remove unneeded outputs
682 // update currently requested AODs
683 for (auto& d : workflow) {
684 d.inputs |
685 views::partial_match_filter(AODOrigins) |
686 sinks::update_input_list{dec.requestedAODs};
687 }
688
689 // remove unmatched outputs
690 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
691 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(dec.requestedAODs.begin(), dec.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
692 });
693 reader->outputs.erase(o_end, reader->outputs.end());
694 if (reader->outputs.empty()) {
695 // nothing to read
696 workflow.erase(reader);
697 } else {
698 // load reader algorithm before deployment
699 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
700 if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
701 reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
702 } // otherwise the algorithm was set in injectServiceDevices
703 }
704 }
705
707
708 // Move the dummy sink at the end, if needed
709 for (size_t i = 0; i < workflow.size(); ++i) {
710 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
711 workflow.push_back(workflow[i]);
712 workflow.erase(workflow.begin() + i);
713 break;
714 }
715 } },
716 .kind = ServiceKind::Global};
717}
718
// Service factory for "arrow-slicing-cache-def": a global (ServiceKind::Global)
// service holding an ArrowTableSlicingCacheDef, default-created through
// CommonServices::simpleServiceInit.
// NOTE(review): the signature (source line 719) is not rendered in this listing;
// presumably it is ArrowSupport::arrowTableSlicingCacheDefSpec() — confirm.
720{
721 return ServiceSpec{
722 .name = "arrow-slicing-cache-def",
723 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
724 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
725 .kind = ServiceKind::Global};
726}
727
// Service factory for "arrow-slicing-cache": a per-stream (ServiceKind::Stream)
// service owning an ArrowTableSlicingCache built from the global
// ArrowTableSlicingCacheDef service.
// NOTE(review): the signature (source line 728) and source line 736 (presumably
// the end of the init lambda closing the ServiceHandle) are not rendered in this
// listing — confirm against the real source.
729{
730 return ServiceSpec{
731 .name = "arrow-slicing-cache",
732 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
// init: construct the cache from the sorted (bindingsKeys) and unsorted
// (bindingsKeysUnsorted) lists stored in ArrowTableSlicingCacheDef.
733 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
734 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
735 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
737 .configure = CommonServices::noConfiguration(),
// preProcessing: for every enabled cache entry whose binding is found among the
// current inputs (getPos >= 0), refresh the entry from that input's Arrow table.
// A non-ok status from the update is turned into an exception via runtime_error_f.
738 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
739 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
740 auto& caches = service->bindingsKeys;
741 for (auto i = 0u; i < caches.size(); ++i) {
742 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
743 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].matcher)->asArrowTable());
744 if (!status.ok()) {
745 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
746 }
747 }
748 }
// Same refresh for the unsorted cache entries, using updateCacheEntryUnsorted.
// NOTE(review): this error message is lower-case while the sorted one above is
// capitalized; consider aligning them.
749 auto& unsortedCaches = service->bindingsKeysUnsorted;
750 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
751 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
752 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].matcher)->asArrowTable());
753 if (!status.ok()) {
754 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
755 }
756 }
757 } },
758 .kind = ServiceKind::Stream};
759}
760
761} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength(unsigned int minLength)
Get the maximum number of timeslices in the queue.
std::vector< ForwardRoute > forwards
Definition DeviceSpec.h:64
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
the main header struct
Definition DataHeader.h:620