Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
50
51namespace o2::framework
52{
53
54class EndOfStreamContext;
55class ProcessingContext;
56
/// States of the driver-side rate limiting state machine.
/// Values are explicit so that they stay stable when reported as numbers.
enum class RateLimitingState {
  UNKNOWN = 0,                  ///< No information received yet.
  STARTED = 1,                  ///< Information received, new timeframe not requested.
  CHANGED = 2,                  ///< Information received, new timeframe requested but not yet accounted.
  BELOW_LIMIT = 3,              ///< New metric received, we are below limit.
  NEXT_ITERATION_FROM_BELOW = 4, ///< Iteration when previously in BELOW_LIMIT.
  ABOVE_LIMIT = 5,              ///< New metric received, we are above limit.
  EMPTY = 6,                    ///< NOTE(review): meaning not documented in the visible code.
};
66
68 int64_t maxMemory = 2000;
69 int64_t maxTimeframes = 1000;
70};
71
73 size_t arrowBytesCreated = -1;
77 size_t arrowBytesExpired = -1;
79 size_t timeframesRead = -1;
80 size_t timeframesConsumed = -1;
81 size_t timeframesExpired = -1;
82 // Timeslices counting
83 size_t timeslicesStarted = -1;
84 size_t timeslicesExpired = -1;
85 size_t timeslicesDone = -1;
86};
87
88std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
89{
90 std::vector<MetricIndices> results;
91
92 for (auto& info : allDevicesMetrics) {
93 results.emplace_back(MetricIndices{
94 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
95 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
96 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
97 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
98 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
99 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
100 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
101 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
102 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
103 .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
104 .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
105 .timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
106 });
107 }
108 return results;
109}
110
112 int64_t available;
113 int64_t offered = 0;
114 int64_t lastDeviceOffered = 0;
115};
117 int64_t enoughCount;
118 int64_t lowCount;
119};
121 char const* name;
122 char const* unit;
123 char const* api;
124 int64_t maxAvailable;
125 int64_t maxQuantum;
126 int64_t minQuantum;
128};
129
130auto offerResources(ResourceState& resourceState,
131 ResourceSpec const& resourceSpec,
132 ResourceStats& resourceStats,
133 std::vector<DeviceSpec> const& specs,
134 std::vector<DeviceInfo> const& infos,
135 DevicesManager& manager,
136 int64_t offerConsumedCurrentValue,
137 int64_t offerExpiredCurrentValue,
138 int64_t acquiredResourceCurrentValue,
139 int64_t disposedResourceCurrentValue,
140 size_t timestamp,
141 DeviceMetricsInfo& driverMetrics,
142 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
143 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
144 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
145 void* signpostId) -> void
146{
147 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
150 int64_t lastCandidate = -1;
151 int64_t possibleOffer = resourceSpec.minQuantum;
152
153 for (size_t di = 0; di < specs.size(); di++) {
154 if (resourceState.available < possibleOffer) {
155 if (resourceStats.lowCount == 0) {
156 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
157 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
158 resourceSpec.name, resourceState.available, resourceSpec.unit,
159 possibleOffer, resourceSpec.unit,
160 resourceState.offered, resourceSpec.unit);
161 }
162 resourceStats.lowCount++;
163 resourceStats.enoughCount = 0;
164 break;
165 } else {
166 if (resourceStats.enoughCount == 0) {
167 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
168 "We are back in a state where we enough %{public}s: %llu %{public}s",
169 resourceSpec.name,
170 resourceState.available,
171 resourceSpec.unit);
172 }
173 resourceStats.lowCount = 0;
174 resourceStats.enoughCount++;
175 }
176 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
177
178 auto& info = infos[candidate];
179 // Do not bother for inactive devices
180 // FIXME: there is probably a race condition if the device died and we did not
181 // took notice yet...
182 if (info.active == false || info.readyToQuit) {
183 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
184 "Device %s is inactive not offering %{public}s to it.",
185 specs[candidate].name.c_str(), resourceSpec.name);
186 continue;
187 }
188 if (specs[candidate].name != "internal-dpl-aod-reader") {
189 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
190 "Device %s is not a reader. Not offering %{public}s to it.",
191 specs[candidate].name.c_str(),
192 resourceSpec.name);
193 continue;
194 }
195 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
196 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
197 "Offering %llu %{public}s out of %llu to %{public}s",
198 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
199 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
200 resourceState.available -= possibleOffer;
201 resourceState.offered += possibleOffer;
202 lastCandidate = candidate;
203 }
204 // We had at least a valid candidate, so
205 // next time we offer to the next device.
206 if (lastCandidate >= 0) {
207 resourceState.lastDeviceOffered = lastCandidate + 1;
208 }
209
210 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
211 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
212 // and the amount which we know was given back.
213 static int64_t lastResourceOfferConsumed = 0;
214 static int64_t lastUnusedOfferedResource = 0;
215 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
216 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
217 "Offer consumed so far %llu", offerConsumedCurrentValue);
218 lastResourceOfferConsumed = offerConsumedCurrentValue;
219 }
220 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
221 if (lastUnusedOfferedResource != unusedOfferedResource) {
222 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
223 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
224 resourceSpec.name,
225 unusedOfferedResource, resourceState.offered,
226 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
227 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
228 resourceSpec.metricOfferScaleFactor);
229 lastUnusedOfferedResource = unusedOfferedResource;
230 }
231 // availableSharedMemory is the amount of memory which we know is available to be offered.
232 // We subtract the amount which we know was already offered but it's unused and we then balance how
233 // much was created with how much was destroyed.
234 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
235 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
236 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
237
238 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
239};
240
241auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
242 int64_t& totalMetricValue, size_t& lastTimestamp) {
243 assert(index < deviceMetrics.metrics.size());
244 changed |= deviceMetrics.changed[index];
245 MetricInfo info = deviceMetrics.metrics[index];
246 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
247 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
248 auto value = (int64_t)data[(info.pos - 1) % data.size()];
249 totalMetricValue += value;
250 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
251 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
252};
253
255{
256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
260
261 return ServiceSpec{
262 .name = "arrow-backend",
264 .configure = CommonServices::noConfiguration(),
269 .metricHandling = [](ServiceRegistryRef registry,
270 ServiceMetricsInfo const& sm,
271 size_t timestamp) {
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
288
289 // Aggregated driver metrics for timeslice rate limiting
290 auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
292 };
293 auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
295 };
296
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
300 // These are really to monitor the rate limiting
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
307
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
315
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
320
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
327 auto& manager = registry.get<DevicesManager>();
328
329 bool changed = false;
330
331 size_t lastTimestamp = 0;
332 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
333 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
337 }
338 auto& indices = allIndices[mi];
339 {
340 size_t index = indices.arrowBytesCreated;
341 assert(index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[index];
343 MetricInfo info = deviceMetrics.metrics[index];
344 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
345 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
347 auto value = (int64_t)data[(info.pos - 1) % data.size()];
348 totalBytesCreated += value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
350 }
351 {
352 size_t index = indices.shmOfferBytesConsumed;
353 assert(index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[index];
355 MetricInfo info = deviceMetrics.metrics[index];
356 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
357 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
359 auto value = (int64_t)data[(info.pos - 1) % data.size()];
360 shmOfferBytesConsumed += value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
362 }
363 {
364 size_t index = indices.arrowBytesDestroyed;
365 assert(index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[index];
367 MetricInfo info = deviceMetrics.metrics[index];
368 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
369 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
370 auto value = (int64_t)data[(info.pos - 1) % data.size()];
371 totalBytesDestroyed += value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
374 }
375 {
376 size_t index = indices.arrowBytesExpired;
377 assert(index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[index];
379 MetricInfo info = deviceMetrics.metrics[index];
380 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
381 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
382 auto value = (int64_t)data[(info.pos - 1) % data.size()];
383 totalBytesExpired += value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
386 }
387 {
388 size_t index = indices.arrowMessagesCreated;
389 assert(index < deviceMetrics.metrics.size());
390 MetricInfo info = deviceMetrics.metrics[index];
391 changed |= deviceMetrics.changed[index];
392 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
393 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
394 auto value = (int64_t)data[(info.pos - 1) % data.size()];
395 totalMessagesCreated += value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
398 }
399 {
400 size_t index = indices.arrowMessagesDestroyed;
401 assert(index < deviceMetrics.metrics.size());
402 MetricInfo info = deviceMetrics.metrics[index];
403 changed |= deviceMetrics.changed[index];
404 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
405 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
406 auto value = (int64_t)data[(info.pos - 1) % data.size()];
407 totalMessagesDestroyed += value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
410 }
411 {
412 size_t index = indices.timeframesRead;
413 assert(index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[index];
415 MetricInfo info = deviceMetrics.metrics[index];
416 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
417 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
418 auto value = (int64_t)data[(info.pos - 1) % data.size()];
419 totalTimeframesRead += value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
422 }
423 {
424 size_t index = indices.timeframesConsumed;
425 assert(index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[index];
427 MetricInfo info = deviceMetrics.metrics[index];
428 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
429 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
430 auto value = (int64_t)data[(info.pos - 1) % data.size()];
431 totalTimeframesConsumed += value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
434 }
435 {
436 size_t index = indices.timeframesExpired;
437 assert(index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[index];
439 MetricInfo info = deviceMetrics.metrics[index];
440 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
441 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
442 auto value = (int64_t)data[(info.pos - 1) % data.size()];
443 totalTimeframesExpired += value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
446 }
447 processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
448 processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
449 processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
450 }
451 static uint64_t unchangedCount = 0;
452 if (changed) {
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
461 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
465 totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
467 } else {
468 unchangedCount++;
469 }
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
471
472 static const ResourceSpec shmResourceSpec{
473 .name = "shared memory",
474 .unit = "MB",
475 .api = "/shm-offer {}",
476 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
477 .maxQuantum = 100,
478 .minQuantum = 50,
479 .metricOfferScaleFactor = 1000000,
480 };
481 static const ResourceSpec timesliceResourceSpec{
482 .name = "timeslice",
483 .unit = "timeslices",
484 .api = "/timeslice-offer {}",
485 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
486 .maxQuantum = 1,
487 .minQuantum = 1,
488 .metricOfferScaleFactor = 1,
489 };
490 static ResourceState shmResourceState{
491 .available = shmResourceSpec.maxAvailable,
492 };
493 static ResourceState timesliceResourceState{
494 .available = timesliceResourceSpec.maxAvailable,
495 };
496 static ResourceStats shmResourceStats{
497 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
499 };
500 static ResourceStats timesliceResourceStats{
501 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
503 };
504
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
509 (void*)&sm);
510
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
515 (void*)&sm); },
516 .postDispatching = [](ProcessingContext& ctx, void* service) {
518 auto* arrow = reinterpret_cast<ArrowContext*>(service);
519 auto totalBytes = 0;
520 auto totalMessages = 0;
521 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
522 for (auto& input : ctx.inputs()) {
523 if (input.header == nullptr) {
524 continue;
525 }
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
527 auto payloadSize = DataRefUtils::getPayloadSize(input);
528 if (dh->serialization != o2::header::gSerializationMethodArrow) {
529 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
532 continue;
533 }
534 bool forwarded = std::ranges::any_of(ctx.services().get<DeviceSpec const>().forwards, [&dh](auto const& forward) { return DataSpecUtils::match(forward.matcher, *dh); });
535 if (forwarded) {
536 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
537 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
538 dh->dataOrigin.str, dh->dataDescription.str);
539 continue;
540 }
541 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
542 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
543 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
544 totalBytes += payloadSize;
545 totalMessages += 1;
546 }
547 arrow->updateBytesDestroyed(totalBytes);
548 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
549 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
550 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
551 arrow->updateMessagesDestroyed(totalMessages);
552 auto& stats = ctx.services().get<DataProcessingStats>();
553 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
554 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
555 stats.processCommandQueue(); },
556 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
557 auto config = new RateLimitConfig{};
558 int readers = std::stoll(dc.options["readers"].as<std::string>());
559 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
560 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
561 } else {
562 config->maxMemory = readers * 2000;
563 }
564 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
565 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
566 } else {
567 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
568 }
569 static bool once = false;
570 // Until we guarantee this is called only once...
571 if (!once) {
572 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
573 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
574 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
575 config->maxMemory, config->maxTimeframes, readers);
576 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
577 once = true;
578 } },
579 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
580 auto& workflow = node.specs;
581 auto spawner = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-spawner"); });
582 auto analysisCCDB = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-ccdb"); });
583 auto builder = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-index-builder"); });
584 auto reader = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-reader"); });
585 auto writer = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-writer"); });
586 auto& dec = ctx.services().get<DanglingEdgesContext>();
587 dec.requestedAODs.clear();
588 dec.requestedDYNs.clear();
589 dec.providedDYNs.clear();
590 dec.providedTIMs.clear();
591 dec.requestedTIMs.clear();
592
593 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
594 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
595
596 if (builder != workflow.end()) {
597 // collect currently requested IDXs
598 dec.requestedIDXs.clear();
599 for (auto& d : workflow | views::exclude_by_name(builder->name)) {
600 d.inputs |
601 views::partial_match_filter(header::DataOrigin{"IDX"}) |
602 sinks::update_input_list{dec.requestedIDXs};
603 }
604 // recreate inputs and outputs
605 builder->inputs.clear();
606 builder->outputs.clear();
607
608 // load real AlgorithmSpec before deployment
609 builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
610 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder);
611 }
612
613 if (spawner != workflow.end()) {
614 // collect currently requested DYNs
615 for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
616 d.inputs |
617 views::partial_match_filter(header::DataOrigin{"DYN"}) |
618 sinks::update_input_list{dec.requestedDYNs};
619 d.outputs |
620 views::partial_match_filter(header::DataOrigin{"DYN"}) |
621 sinks::append_to{dec.providedDYNs};
622 }
623 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
624 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
625 dec.spawnerInputs.clear();
626 dec.requestedDYNs |
627 views::filter_not_matching(dec.providedDYNs) |
628 sinks::append_to{dec.spawnerInputs};
629 // recreate inputs and outputs
630 spawner->outputs.clear();
631 spawner->inputs.clear();
632
633 // load real AlgorithmSpec before deployment
634 spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
635 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
636 }
637
638 if (analysisCCDB != workflow.end()) {
639 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
640 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{dec.requestedTIMs};
641 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{dec.providedTIMs};
642 }
643 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
644 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
645 // Use ranges::to<std::vector<>> in C++23...
646 dec.analysisCCDBInputs.clear();
647 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
648
649 // recreate inputs and outputs
650 analysisCCDB->outputs.clear();
651 analysisCCDB->inputs.clear();
652 // load real AlgorithmSpec before deployment
653 // FIXME how can I make the lookup depend on DYN tables as well??
654 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
655 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
656 }
657
658 if (writer != workflow.end()) {
659 workflow.erase(writer);
660 }
661
662 if (reader != workflow.end()) {
663 // If reader and/or builder were adjusted, remove unneeded outputs
664 // update currently requested AODs
665 for (auto& d : workflow) {
666 d.inputs |
667 views::partial_match_filter(AODOrigins) |
668 sinks::update_input_list{dec.requestedAODs};
669 }
670
671 // remove unmatched outputs
672 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
673 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(dec.requestedAODs.begin(), dec.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
674 });
675 reader->outputs.erase(o_end, reader->outputs.end());
676 if (reader->outputs.empty()) {
677 // nothing to read
678 workflow.erase(reader);
679 } else {
680 // load reader algorithm before deployment
681 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](auto const& x) { return x.name == "mctracks-to-aod"; });
682 if (mctracks2aod == workflow.end()) { // add normal reader algorithm only if no on-the-fly generator is injected
683 reader->algorithm = CommonDataProcessors::wrapWithTimesliceConsumption(PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx));
684 } // otherwise the algorithm was set in injectServiceDevices
685 }
686 }
687
689
690 // Move the dummy sink at the end, if needed
691 for (size_t i = 0; i < workflow.size(); ++i) {
692 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
693 workflow.push_back(workflow[i]);
694 workflow.erase(workflow.begin() + i);
695 break;
696 }
697 } },
698 .kind = ServiceKind::Global};
699}
700
702{
703 return ServiceSpec{
704 .name = "arrow-slicing-cache-def",
705 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
706 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
707 .kind = ServiceKind::Global};
708}
709
711{
712 return ServiceSpec{
713 .name = "arrow-slicing-cache",
714 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
715 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
716 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
717 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
719 .configure = CommonServices::noConfiguration(),
720 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
721 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
722 auto& caches = service->bindingsKeys;
723 for (auto i = 0u; i < caches.size(); ++i) {
724 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
725 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].matcher)->asArrowTable());
726 if (!status.ok()) {
727 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
728 }
729 }
730 }
731 auto& unsortedCaches = service->bindingsKeysUnsorted;
732 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
733 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
734 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].matcher)->asArrowTable());
735 if (!status.ok()) {
736 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
737 }
738 }
739 } },
740 .kind = ServiceKind::Stream};
741}
742
743} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as a arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static unsigned int pipelineLength()
get max number of timeslices in the queue
std::vector< ForwardRoute > forwards
Definition DeviceSpec.h:64
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static void injectAODWriter(WorkflowSpec &workflow, ConfigContext const &ctx)
the main header struct
Definition DataHeader.h:619