256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
262 .
name =
"arrow-backend",
// Aggregate counters for this pass, all starting from zero.
// The byte, message and timeframe totals are summed from the per-device
// metrics further down; the timeslice totals are presumably filled the same
// way (their accumulation is not visible in this excerpt).
int64_t totalBytesCreated{0};
int64_t shmOfferBytesConsumed{0};
int64_t totalBytesDestroyed{0};
int64_t totalBytesExpired{0};
int64_t totalMessagesCreated{0};
int64_t totalMessagesDestroyed{0};
int64_t totalTimeframesRead{0};
int64_t totalTimeframesConsumed{0};
int64_t totalTimeframesExpired{0};
int64_t totalTimeslicesStarted{0};
int64_t totalTimeslicesDone{0};
int64_t totalTimeslicesExpired{0};
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
290 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
293 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
314 static auto totalTimeframesInFlightMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-flight");
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
319 static auto totalTimeslicesInFlightMetric = createIntDriverMetric(
"total-timeslices-in-flight");
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
329 bool changed =
false;
331 size_t lastTimestamp = 0;
333 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
338 auto&
indices = allIndices[mi];
341 assert(
index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[
index];
344 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
345 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
348 totalBytesCreated +=
value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
353 assert(
index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[
index];
356 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
357 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
360 shmOfferBytesConsumed +=
value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
365 assert(
index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[
index];
368 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
369 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
371 totalBytesDestroyed +=
value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
377 assert(
index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[
index];
380 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
381 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
383 totalBytesExpired +=
value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
389 assert(
index < deviceMetrics.metrics.size());
391 changed |= deviceMetrics.changed[
index];
392 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
393 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
395 totalMessagesCreated +=
value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
401 assert(
index < deviceMetrics.metrics.size());
403 changed |= deviceMetrics.changed[
index];
404 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
405 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
407 totalMessagesDestroyed +=
value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
413 assert(
index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[
index];
416 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
417 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
419 totalTimeframesRead +=
value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
425 assert(
index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[
index];
428 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
429 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
431 totalTimeframesConsumed +=
value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
437 assert(
index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[
index];
440 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
441 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
443 totalTimeframesExpired +=
value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
451 static uint64_t unchangedCount = 0;
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
461 totalTimeframesInFlightMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
465 totalTimeslicesInFlightMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
473 .
name =
"shared memory",
475 .api =
"/shm-offer {}",
479 .metricOfferScaleFactor = 1000000,
483 .unit =
"timeslices",
484 .api =
"/timeslice-offer {}",
488 .metricOfferScaleFactor = 1,
491 .
available = shmResourceSpec.maxAvailable,
494 .
available = timesliceResourceSpec.maxAvailable,
497 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
501 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
520 auto totalMessages = 0;
522 for (
auto& input : ctx.
inputs()) {
523 if (input.header ==
nullptr) {
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
534 bool forwarded = std::ranges::any_of(ctx.
services().
get<
DeviceSpec const>().
forwards, [&dh](
auto const& forward) { return DataSpecUtils::match(forward.matcher, *dh); });
537 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
538 dh->dataOrigin.str, dh->dataDescription.str);
542 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
543 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
544 totalBytes += payloadSize;
547 arrow->updateBytesDestroyed(totalBytes);
549 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
550 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
551 arrow->updateMessagesDestroyed(totalMessages);
555 stats.processCommandQueue(); },
558 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
559 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
560 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
562 config->maxMemory = readers * 2000;
564 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
565 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
569 static bool once =
false;
574 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
575 config->maxMemory, config->maxTimeframes, readers);
576 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
580 auto& workflow =
node.specs;
583 dec.requestedDYNs.clear();
588 auto builder = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-index-builder"); });
589 if (builder != workflow.end()) {
591 dec.requestedIDXs.clear();
592 dec.providedIDXs.clear();
593 for (
auto& d : workflow | views::exclude_by_name(builder->name)) {
595 views::filter_with_params_by_name(
"index-records") |
598 views::filter_with_params_by_name(
"index-records") |
601 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
602 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
603 dec.builderInputs.clear();
605 views::filter_not_matching(dec.providedIDXs) |
608 builder->inputs.clear();
609 builder->outputs.clear();
611 if (!builder->inputs.empty()) {
617 auto analysisCCDB = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-ccdb"); });
618 if (analysisCCDB != workflow.end()) {
619 dec.requestedTIMs.clear();
620 dec.providedTIMs.clear();
621 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
623 views::filter_with_params_by_name_starting(
"ccdb:") |
626 views::filter_with_params_by_name_starting(
"ccdb:") |
629 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
630 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
632 dec.analysisCCDBInputs.clear();
634 views::filter_not_matching(dec.providedTIMs) |
638 analysisCCDB->outputs.clear();
639 analysisCCDB->inputs.clear();
646 for (
auto& input : dec.analysisCCDBInputs) {
647 for (
auto&
m : input.metadata | std::views::filter(checks::has_params_with_name_starting(
"ccdb:"))) {
649 bool foundFirst =
false;
650 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
651 for (
auto& opt : d.options) {
652 if (opt.name ==
m.name) {
657 LOGP(warn,
"Task '{}' declares Configurable '{}' = '{}' which conflicts "
658 "with an earlier value '{}'; earlier value will be used.",
659 d.name, opt.name, opt.defaultValue.asString(),
673 auto spawner = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-spawner"); });
674 if (
spawner != workflow.end()) {
675 dec.providedDYNs.clear();
677 for (
auto& d : workflow | views::exclude_by_name(
spawner->name)) {
679 views::filter_with_params_by_name(
"projectors") |
682 views::filter_with_params_by_name(
"projectors") |
685 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
686 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
687 dec.spawnerInputs.clear();
689 views::filter_not_matching(dec.providedDYNs) |
695 if (!
spawner->inputs.empty()) {
701 auto writer = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-writer"); });
702 if (writer != workflow.end()) {
703 workflow.erase(writer);
707 auto reader = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-reader"); });
709 if (reader != workflow.end()) {
712 for (
auto& d : workflow) {
714 views::partial_match_filter(AODOrigins) |
719 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
722 reader->outputs.erase(o_end, reader->outputs.end());
723 if (reader->outputs.empty()) {
725 workflow.erase(reader);
728 auto tfnsource = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
730 return DataSpecUtils::match(output,
"TFN",
"TFNumber", 0);
733 if (tfnsource == workflow.end()) {
742 for (
size_t i = 0;
i < workflow.size(); ++
i) {
743 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
744 workflow.push_back(workflow[
i]);
745 workflow.erase(workflow.begin() +
i);