256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
262 .
name =
"arrow-backend",
// Running totals for this pass, all starting from zero. The byte, message and
// timeframe totals are accumulated with `+=` further down, and most of them are
// then published through the driver metrics.
// NOTE(review): the accumulation sites for the three timeslice totals are not
// visible in this excerpt - confirm they are filled in before being published.
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
// Local aliases for the members of `sm` used by the code below: the driver-level
// metrics store, the per-device metrics stores, and the device specs and infos.
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
// Convenience wrapper: forwards to DeviceMetricsHelper::createNumericMetric<uint64_t>
// with `driverMetrics` (captured by reference), so call sites only pass the metric name.
290 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
// Same wrapper for `int`-valued driver metrics.
293 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
// Handles for the driver-level metrics. They are block-scope `static`, so each
// initializer runs only the first time control reaches it. The string literal is
// the name under which the metric is created in `driverMetrics`.
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// `int`-valued metrics for the two offered resources (shared memory and
// timeslices); these handles are passed to offerResources() further down.
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
// Aggregated Arrow byte/message counters and timeframe counters.
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
// Timeslice counters, created through the local helper lambdas instead of
// calling DeviceMetricsHelper directly.
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric(
"total-timeslices-in-fly");
// Bookkeeping metrics: byte delta, change counter and AOD reader signalling.
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
// Becomes true as soon as any inspected per-device metric has its `changed` flag set.
329 bool changed =
false;
// Largest timestamp found among the samples inspected in the loop below.
331 size_t lastTimestamp = 0;
// Visit each device's metrics store; `mi` is also used to index `allIndices`.
333 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
// `changed` is indexed with the same index as `metrics` below, so the two
// containers must have the same length; bail out if they do not. The message
// names the two sizes that are actually compared.
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
338 auto&
indices = allIndices[mi];
341 assert(
index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[
index];
344 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
345 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
348 totalBytesCreated +=
value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
353 assert(
index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[
index];
356 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
357 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
360 shmOfferBytesConsumed +=
value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
365 assert(
index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[
index];
368 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
369 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
371 totalBytesDestroyed +=
value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
377 assert(
index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[
index];
380 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
381 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
383 totalBytesExpired +=
value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
389 assert(
index < deviceMetrics.metrics.size());
391 changed |= deviceMetrics.changed[
index];
392 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
393 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
395 totalMessagesCreated +=
value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
401 assert(
index < deviceMetrics.metrics.size());
403 changed |= deviceMetrics.changed[
index];
404 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
405 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
407 totalMessagesDestroyed +=
value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
413 assert(
index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[
index];
416 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
417 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
419 totalTimeframesRead +=
value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
425 assert(
index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[
index];
428 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
429 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
431 totalTimeframesConsumed +=
value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
437 assert(
index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[
index];
440 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
441 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
443 totalTimeframesExpired +=
value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Block-scope static counter, so its value persists between executions of this code.
// NOTE(review): the statements that update it are not visible in this excerpt.
451 static uint64_t unchangedCount = 0;
// Publish the aggregated totals as driver metrics, all stamped with `timestamp`.
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in flight = read minus consumed, narrowed to int for the int-valued metric.
461 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
// Timeslices in flight = started minus done, narrowed to int as above.
465 totalTimeslicesInFlyMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
// NOTE(review): the operands are int64_t while this metric was created as uint64_t;
// a negative difference would presumably wrap - confirm it cannot occur.
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the metric created as "changed-metrics-count" is fed `unchangedCount` -
// confirm the naming mismatch is intended.
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
473 .
name =
"shared memory",
475 .api =
"/shm-offer {}",
479 .metricOfferScaleFactor = 1000000,
483 .unit =
"timeslices",
484 .api =
"/timeslice-offer {}",
488 .metricOfferScaleFactor = 1,
491 .
available = shmResourceSpec.maxAvailable,
494 .
available = timesliceResourceSpec.maxAvailable,
497 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// Initial counters: enoughCount is 1 (and lowCount 0) when `available - minQuantum`
// is positive, otherwise the other way round.
// NOTE(review): this initializer is textually identical to the one right before it
// and reads shmResourceState / shmResourceSpec. If it initializes the timeslice
// statistics (its declaration line is not visible here), it presumably should read
// timesliceResourceState / timesliceResourceSpec instead - confirm.
501 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
520 auto totalMessages = 0;
522 for (
auto& input : ctx.
inputs()) {
523 if (input.header ==
nullptr) {
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
534 bool forwarded = std::ranges::any_of(ctx.
services().
get<
DeviceSpec const>().
forwards, [&dh](
auto const& forward) { return DataSpecUtils::match(forward.matcher, *dh); });
537 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
538 dh->dataOrigin.str, dh->dataDescription.str);
542 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
543 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
544 totalBytes += payloadSize;
547 arrow->updateBytesDestroyed(totalBytes);
549 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
550 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
551 arrow->updateMessagesDestroyed(totalMessages);
555 stats.processCommandQueue(); },
// Reader count, parsed from the string-valued "readers" option.
// NOTE(review): std::stoll yields long long, which is narrowed to int here.
558 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// When "aod-memory-rate-limit" is present and not defaulted, its value is divided
// by 1000000 before being stored (the setup log message below prints maxMemory as MB).
559 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
560 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// 2000 per reader; presumably the fallback branch of the check above (the line
// separating the two assignments is not visible in this excerpt).
562 config->maxMemory = readers * 2000;
// Likewise, "timeframes-rate-limit" only sets maxTimeframes when given explicitly.
564 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
565 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
569 static bool once =
false;
574 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
575 config->maxMemory, config->maxTimeframes, readers);
576 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// Locate the internal AOD helper devices in the workflow by name prefix. Each
// result is an iterator into `workflow` and equals workflow.end() when no spec
// matches, which is what the checks further down test for.
// NOTE(review): assuming node.specs is a std::vector, these iterators stop being
// valid once an element at or before them is erased from `workflow`.
580 auto& workflow =
node.specs;
581 auto spawner = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-spawner"); });
582 auto analysisCCDB = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-ccdb"); });
583 auto builder = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-index-builder"); });
584 auto reader = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-reader"); });
585 auto writer = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-writer"); });
588 dec.requestedDYNs.clear();
589 dec.providedDYNs.clear();
590 dec.providedTIMs.clear();
591 dec.requestedTIMs.clear();
596 if (builder != workflow.end()) {
598 dec.requestedIDXs.clear();
599 for (
auto& d : workflow | views::exclude_by_name(builder->name)) {
605 builder->inputs.clear();
606 builder->outputs.clear();
613 if (
spawner != workflow.end()) {
615 for (
auto& d : workflow | views::exclude_by_name(
spawner->name)) {
623 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
624 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
625 dec.spawnerInputs.clear();
627 views::filter_not_matching(dec.providedDYNs) |
638 if (analysisCCDB != workflow.end()) {
639 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
643 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
644 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
646 dec.analysisCCDBInputs.clear();
647 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) |
sinks::append_to{dec.analysisCCDBInputs};
650 analysisCCDB->outputs.clear();
651 analysisCCDB->inputs.clear();
// Drop the writer spec when one is present. Erasing from `workflow` invalidates
// iterators at or after the erased element and moves end(), so `reader` (looked
// up before this erase) is re-acquired here; otherwise the following
// `reader != workflow.end()` check and `reader->` accesses could use a stale
// iterator, e.g. when no reader exists and `reader` still holds the old end().
658 if (writer != workflow.end()) {
659 workflow.erase(writer);
reader = std::ranges::find_if(workflow, [](DataProcessorSpec const& spec) { return spec.name.starts_with("internal-dpl-aod-reader"); });
662 if (reader != workflow.end()) {
665 for (
auto& d : workflow) {
667 views::partial_match_filter(AODOrigins) |
672 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
675 reader->outputs.erase(o_end, reader->outputs.end());
676 if (reader->outputs.empty()) {
678 workflow.erase(reader);
681 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](
auto const&
x) { return x.name ==
"mctracks-to-aod"; });
682 if (mctracks2aod == workflow.end()) {
// Move the spec named "internal-dpl-injected-dummy-sink" to the back of the
// workflow: a copy is appended first, then the original at index i is erased.
// NOTE(review): whatever follows the erase (e.g. a `break`) is not visible in
// this excerpt, so it is unclear whether the scan continues after the move.
691 for (
size_t i = 0;
i < workflow.size(); ++
i) {
692 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
693 workflow.push_back(workflow[
i]);
694 workflow.erase(workflow.begin() +
i);