255 using o2::monitoring::Metric;
256 using o2::monitoring::Monitoring;
257 using o2::monitoring::tags::Key;
258 using o2::monitoring::tags::Value;
261 .
name =
"arrow-backend",
// Running totals for one pass, all starting at zero. The totals whose
// accumulation is visible in this excerpt (bytes, messages, timeframes) are
// summed over every entry of allDeviceMetrics in the loop further down and
// are then published as driver-level metrics.
// NOTE(review): the accumulation of the three totalTimeslices* counters is
// not visible in this excerpt.
271 int64_t totalBytesCreated = 0;
272 int64_t shmOfferBytesConsumed = 0;
273 int64_t totalBytesDestroyed = 0;
274 int64_t totalBytesExpired = 0;
275 int64_t totalMessagesCreated = 0;
276 int64_t totalMessagesDestroyed = 0;
277 int64_t totalTimeframesRead = 0;
278 int64_t totalTimeframesConsumed = 0;
279 int64_t totalTimeframesExpired = 0;
280 int64_t totalTimeslicesStarted = 0;
281 int64_t totalTimeslicesDone = 0;
282 int64_t totalTimeslicesExpired = 0;
// Short reference aliases for the members of `sm` used throughout this code.
283 auto &driverMetrics = sm.driverMetricsInfo;
284 auto &allDeviceMetrics = sm.deviceMetricsInfos;
285 auto &specs = sm.deviceSpecs;
286 auto &infos = sm.deviceInfos;
// Shorthand for DeviceMetricsHelper::createNumericMetric<uint64_t> applied to
// driverMetrics (captured by reference); `name` is the metric name.
289 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
290 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
// Same shorthand as above, but for metrics created with value type `int`.
292 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
293 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
// Handles for the driver-level metrics. They are function-local statics, so
// each one is initialized only the first time control passes through its
// declaration. Each handle is later invoked as
// handle(driverMetrics, value, timestamp) to publish a value.
// NOTE(review): because of the static initialization, the `driverMetrics`
// reference seen on the first pass is the one handed to createNumericMetric;
// confirm the returned handle does not retain it.
296 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
297 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
298 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// Offer bookkeeping metrics; these are created with value type `int`.
300 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
301 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
302 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
303 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
304 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
305 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
// Aggregated byte / message / timeframe counters.
307 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
308 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
309 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
310 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
311 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
312 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
313 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
// Timeslice counters; these go through the local helper lambdas instead of
// calling DeviceMetricsHelper directly.
315 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
316 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
317 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
318 static auto totalTimeslicesInFlyMetric = createIntDriverMetric(
"total-timeslices-in-fly");
// Derived and diagnostic metrics.
320 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
321 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
322 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
323 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
324 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
325 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
// `changed` collects (bitwise OR) the per-metric changed flags inspected
// below; `lastTimestamp` keeps the largest timestamp read from the stores.
328 bool changed =
false;
330 size_t lastTimestamp = 0;
// Visit every device's metrics, indexed by `mi`.
332 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
333 auto& deviceMetrics = allDeviceMetrics[mi];
// Consistency guard: the `changed` flags and the `metrics` descriptors must
// have the same number of entries, otherwise std::runtime_error is thrown.
// NOTE(review): the message text names deviceMetrics.size() and
// allDeviceMetrics.size(), which is not what the condition compares.
334 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
335 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
// Per-device lookup entry from allIndices; it is used on lines not visible
// in this excerpt.
337 auto&
indices = allIndices[mi];
// --- bytes created ---
// `index`, `info` and `value` are defined on lines not visible in this
// excerpt. The asserts bound-check the metric index and the uint64 store
// index; they are compiled out when NDEBUG is defined.
340 assert(
index < deviceMetrics.metrics.size());
341 changed |= deviceMetrics.changed[
index];
343 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
344 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
347 totalBytesCreated +=
value;
// Reads the timestamp slot just before info.pos, wrapped by the store size
// (presumably the most recently written slot of a ring buffer - confirm).
// NOTE(review): if info.pos is unsigned and 0, (info.pos - 1) wraps around
// before the modulo; confirm the result is the intended last slot.
348 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- shm offer bytes consumed --- (same steps as the group above)
352 assert(
index < deviceMetrics.metrics.size());
353 changed |= deviceMetrics.changed[
index];
355 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
356 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
359 shmOfferBytesConsumed +=
value;
360 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- bytes destroyed ---
// Steps: bound-check the indices, fold the changed flag, add `value` to the
// total, then advance lastTimestamp from the timestamp store. `index`,
// `info` and `value` come from lines not visible in this excerpt.
364 assert(
index < deviceMetrics.metrics.size());
365 changed |= deviceMetrics.changed[
index];
367 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
368 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
370 totalBytesDestroyed +=
value;
371 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
// Slot before info.pos, wrapped by the store size.
372 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- bytes expired --- (same steps as the group above)
376 assert(
index < deviceMetrics.metrics.size());
377 changed |= deviceMetrics.changed[
index];
379 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
380 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
382 totalBytesExpired +=
value;
383 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
384 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- messages created ---
// Steps: bound-check the indices, fold the changed flag, add `value` to the
// total, then advance lastTimestamp from the timestamp store. `index`,
// `info` and `value` come from lines not visible in this excerpt.
388 assert(
index < deviceMetrics.metrics.size());
390 changed |= deviceMetrics.changed[
index];
391 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
392 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
394 totalMessagesCreated +=
value;
395 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
// Slot before info.pos, wrapped by the store size.
396 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- messages destroyed --- (same steps as the group above)
400 assert(
index < deviceMetrics.metrics.size());
402 changed |= deviceMetrics.changed[
index];
403 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
404 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
406 totalMessagesDestroyed +=
value;
407 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
408 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- timeframes read ---
// Steps: bound-check the indices, fold the changed flag, add `value` to the
// total, then advance lastTimestamp from the timestamp store. `index`,
// `info` and `value` come from lines not visible in this excerpt.
412 assert(
index < deviceMetrics.metrics.size());
413 changed |= deviceMetrics.changed[
index];
415 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
416 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
418 totalTimeframesRead +=
value;
419 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
// Slot before info.pos, wrapped by the store size.
420 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- timeframes consumed --- (same steps as the group above)
424 assert(
index < deviceMetrics.metrics.size());
425 changed |= deviceMetrics.changed[
index];
427 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
428 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
430 totalTimeframesConsumed +=
value;
431 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
432 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// --- timeframes expired --- (same steps as the group above)
// NOTE(review): no use of totalTimeframesExpired after this point is visible
// in this excerpt.
436 assert(
index < deviceMetrics.metrics.size());
437 changed |= deviceMetrics.changed[
index];
439 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
440 auto&
data = deviceMetrics.uint64Metrics[info.storeIdx];
442 totalTimeframesExpired +=
value;
443 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
444 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) %
data.size()]);
// Function-local static: the counter keeps its value between passes. The
// code that updates it is on lines not visible in this excerpt.
450 static uint64_t unchangedCount = 0;
// Publish the aggregated totals on the driver metrics at `timestamp`
// (defined on a line not visible in this excerpt).
452 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
453 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
454 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
455 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
456 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
457 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
458 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
459 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// "In fly" = read minus consumed; the int64_t difference is narrowed to int
// with a C-style cast.
460 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
461 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
462 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
463 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
// Same idea for timeslices: started minus done, narrowed to int.
464 totalTimeslicesInFlyMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
// Delta = created - expired - destroyed, computed in signed 64-bit.
// NOTE(review): the receiving metric was created with uint64_t; a negative
// delta would presumably be converted to a large unsigned value - confirm.
465 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the metric named "changed-metrics-count" is fed
// `unchangedCount`; confirm the naming is intentional.
469 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// Designated initializers for objects whose declaration lines are not
// visible in this excerpt. First group: the entry named "shared memory",
// with api string "/shm-offer {}" and an offer scale factor of 1000000.
472 .
name =
"shared memory",
474 .api =
"/shm-offer {}",
478 .metricOfferScaleFactor = 1000000,
// Second group: the entry with unit "timeslices", api string
// "/timeslice-offer {}" and an offer scale factor of 1.
482 .unit =
"timeslices",
483 .api =
"/timeslice-offer {}",
487 .metricOfferScaleFactor = 1,
// Initial `available` values are taken from each spec's maxAvailable.
490 .
available = shmResourceSpec.maxAvailable,
493 .
available = timesliceResourceSpec.maxAvailable,
// enoughCount/lowCount start as complementary 0/1 flags depending on whether
// (available - minQuantum) > 0.
// NOTE(review): if these operands are unsigned, the subtraction wraps and
// the test is true whenever the two values differ - confirm the types.
496 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
497 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// NOTE(review): this second initializer repeats the shm expressions verbatim.
// If it belongs to the timeslice stats object (its declaration line is not
// visible here), it presumably should read timesliceResourceState and
// timesliceResourceSpec instead - confirm.
500 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
501 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// offerResources is invoked once per resource kind with the matching
// state/spec/stats, totals and metric handles. The tail of each argument
// list is on lines not visible in this excerpt.
504 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
505 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
506 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
507 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
510 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
511 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
512 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
513 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
// Message counter for this pass; `auto` deduces `int` from the literal 0.
// NOTE(review): no increment of totalMessages is visible in this excerpt;
// confirm it is updated before being passed to updateMessagesDestroyed.
519 auto totalMessages = 0;
// Walk all inputs of the processing context.
521 for (
auto& input : ctx.
inputs()) {
// Inputs without a header are tested for here (the branch body is not
// visible in this excerpt).
522 if (input.header ==
nullptr) {
525 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
// Log text used for messages that are not of kind arrow (the guarding
// condition is on a line not visible here).
529 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
530 dh->dataOrigin.str, dh->dataDescription.str);
// `forwarded` is true when any entry of the DeviceSpec's `forwards` list
// has a matcher that matches this data header.
533 bool forwarded = std::ranges::any_of(ctx.
services().
get<
DeviceSpec const>().
forwards, [&dh](
auto const& forward) { return DataSpecUtils::match(forward.matcher, *dh); });
536 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
537 dh->dataOrigin.str, dh->dataDescription.str);
// Remaining messages: their payload size is logged in MB (divided by
// 1000000.) and added to totalBytes.
541 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
542 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
543 totalBytes += payloadSize;
// Report the bytes summed above to the `arrow` object and log both this
// pass's amount and the running total from bytesDestroyed().
546 arrow->updateBytesDestroyed(totalBytes);
548 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
549 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
550 arrow->updateMessagesDestroyed(totalMessages);
554 stats.processCommandQueue(); },
// Number of readers, parsed from the "readers" option string.
// std::stoll throws std::invalid_argument / std::out_of_range on bad input.
// NOTE(review): the long long result is narrowed to int here.
557 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// An explicitly set (non-defaulted) "aod-memory-rate-limit" is divided by
// 1000000 before being stored; the log line further down prints maxMemory
// as MB.
558 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
559 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// Otherwise the memory limit scales with the reader count: 2000 per reader.
561 config->maxMemory = readers * 2000;
// An explicitly set "timeframes-rate-limit" overrides maxTimeframes.
563 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
564 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Function-local static flag; how it is used is on lines not visible here
// (presumably to emit the log line only once - confirm).
568 static bool once =
false;
573 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
574 config->maxMemory, config->maxTimeframes, readers);
// Make the populated RateLimitConfig available through the service registry.
575 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// `workflow` aliases node.specs, the list of DataProcessorSpec adjusted below.
579 auto& workflow =
node.specs;
582 dec.requestedDYNs.clear();
// --- index builder ---
// Find the processor whose name starts with "internal-dpl-aod-index-builder".
587 auto builder = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-index-builder"); });
588 if (builder != workflow.end()) {
// Reset the requested/provided IDX lists, then scan every other processor
// (the builder itself is excluded by name). The statements that refill the
// lists are only partly visible; they filter on the parameter name
// "index-records".
590 dec.requestedIDXs.clear();
591 dec.providedIDXs.clear();
592 for (
auto& d : workflow | views::exclude_by_name(builder->name)) {
594 views::filter_with_params_by_name(
"index-records") |
597 views::filter_with_params_by_name(
"index-records") |
// Sort both lists with the project comparators, then recompute the builder
// inputs, dropping entries matched by providedIDXs.
600 std::ranges::sort(dec.requestedIDXs, inputSpecLessThan);
601 std::ranges::sort(dec.providedIDXs, outputSpecLessThan);
602 dec.builderInputs.clear();
604 views::filter_not_matching(dec.providedIDXs) |
// The builder's own inputs/outputs are cleared here; their repopulation is
// on lines not visible in this excerpt.
607 builder->inputs.clear();
608 builder->outputs.clear();
610 if (!builder->inputs.empty()) {
// --- analysis CCDB ---
// Same pattern for the processor whose name starts with
// "internal-dpl-aod-ccdb", using the TIM lists and parameter names that
// start with "ccdb:".
616 auto analysisCCDB = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-ccdb"); });
617 if (analysisCCDB != workflow.end()) {
618 dec.requestedTIMs.clear();
619 dec.providedTIMs.clear();
620 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
622 views::filter_with_params_by_name_starting(
"ccdb:") |
625 views::filter_with_params_by_name_starting(
"ccdb:") |
628 std::ranges::sort(dec.requestedTIMs, inputSpecLessThan);
629 std::ranges::sort(dec.providedTIMs, outputSpecLessThan);
631 dec.analysisCCDBInputs.clear();
633 views::filter_not_matching(dec.providedTIMs) |
637 analysisCCDB->outputs.clear();
638 analysisCCDB->inputs.clear();
// --- spawner ---
// Find the processor whose name starts with "internal-dpl-aod-spawner".
644 auto spawner = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-spawner"); });
645 if (
spawner != workflow.end()) {
// Only providedDYNs is cleared at this point.
646 dec.providedDYNs.clear();
// Scan every processor except the spawner itself. The statements that
// refill the DYN lists are only partly visible; they filter on the
// parameter name "projectors".
648 for (
auto& d : workflow | views::exclude_by_name(
spawner->name)) {
650 views::filter_with_params_by_name(
"projectors") |
653 views::filter_with_params_by_name(
"projectors") |
// Sort both lists with the project comparators, then recompute the spawner
// inputs, dropping entries matched by providedDYNs.
656 std::ranges::sort(dec.requestedDYNs, inputSpecLessThan);
657 std::ranges::sort(dec.providedDYNs, outputSpecLessThan);
658 dec.spawnerInputs.clear();
660 views::filter_not_matching(dec.providedDYNs) |
// The body of this non-empty-inputs branch is on lines not visible here.
666 if (!
spawner->inputs.empty()) {
// --- writer ---
// A processor whose name starts with "internal-dpl-aod-writer" is removed
// from the workflow if present.
// NOTE(review): if `workflow` is a std::vector (presumably - confirm), this
// erase invalidates iterators at or after `writer`, including ones obtained
// earlier; the reader lookup below happens only after the erase.
672 auto writer = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-writer"); });
673 if (writer != workflow.end()) {
674 workflow.erase(writer);
// --- reader ---
// Find the processor whose name starts with "internal-dpl-aod-reader".
678 auto reader = std::ranges::find_if(workflow, [](
DataProcessorSpec const& spec) {
return spec.
name.starts_with(
"internal-dpl-aod-reader"); });
680 if (reader != workflow.end()) {
// Scan all processors; the visible part of the pipeline filters by partial
// match against AODOrigins.
683 for (
auto& d : workflow) {
685 views::partial_match_filter(AODOrigins) |
// Erase-remove idiom on the reader outputs; the predicate body is on lines
// not visible in this excerpt.
690 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
693 reader->outputs.erase(o_end, reader->outputs.end());
// A reader left without outputs is dropped from the workflow.
694 if (reader->outputs.empty()) {
696 workflow.erase(reader);
// Look for a processor named exactly "mctracks-to-aod"; the branch taken
// when it is absent has its body on lines not visible in this excerpt.
699 auto mctracks2aod = std::find_if(workflow.begin(), workflow.end(), [](
auto const&
x) { return x.name ==
"mctracks-to-aod"; });
700 if (mctracks2aod == workflow.end()) {
// Move the processor named "internal-dpl-injected-dummy-sink" to the end of
// the workflow: append a copy, then erase the original at index i.
// NOTE(review): after the erase, index i refers to the following element,
// so a subsequent ++i would skip it. No `break` is visible in this excerpt;
// confirm the loop exits after the move.
709 for (
size_t i = 0;
i < workflow.size(); ++
i) {
710 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
711 workflow.push_back(workflow[
i]);
712 workflow.erase(workflow.begin() +
i);