256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
262 .
name =
"arrow-backend",
// Signed 64-bit totals, all starting from zero. The visible parts of the
// per-device loop below add to the byte, message and timeframe totals; where
// the timeslice totals are updated is not visible in this excerpt.
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
// Short aliases (references, no copies) for members of `sm`.
// `sm` itself is declared on a line not shown here.
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
290 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
293 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
// Driver-level metrics. Each `static` is initialised only the first time
// control passes through its declaration, by calling
// DeviceMetricsHelper::createNumericMetric<T>(driverMetrics, "<name>").
// The resulting objects are later invoked as
// metric(driverMetrics, value, timestamp) to publish a value.
// NOTE(review): because these are static, they are created against whatever
// `driverMetrics` referred to on the first pass -- confirm that it always
// aliases the same object.
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// Offer bookkeeping for shared memory and timeslices; these are registered
// with value type `int` rather than `uint64_t`.
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
// Aggregated arrow byte/message and timeframe counters.
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
// Timeslice counters, created through the createUint64DriverMetric /
// createIntDriverMetric lambdas declared above, which forward to the same
// createNumericMetric<uint64_t> / createNumericMetric<int> calls.
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric(
"total-timeslices-in-fly");
// Remaining driver metrics. Only totalBytesDeltaMetric and changedCountMetric
// are published in the visible part of this excerpt; the use of the others is
// on lines not shown.
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
329 bool changed =
false;
331 size_t lastTimestamp = 0;
// Walk every device's metrics by index; `mi` also selects the matching entry
// of `allIndices` below.
333 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
// Sanity check: the `changed` flags and the `metrics` list must have the same
// length.
// NOTE(review): the exception text below talks about
// deviceMetrics.size() / allDeviceMetrics.size(), which is not the condition
// actually tested (changed.size() vs metrics.size()) -- the message looks
// stale and should probably be corrected.
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
338 auto&
indices = allIndices[mi];
// `index`, `info` and `value` used from here on are defined on lines that are
// not part of this excerpt.
// Debug-build bounds check before `index` is used to read the changed flag.
341 assert(
index < deviceMetrics.metrics.size());
// Record whether any contributing metric is flagged as changed.
342 changed |= deviceMetrics.changed[
index];
344 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
345 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
348 totalBytesCreated +=
value;
// Keep the largest timestamp seen so far. The slot read is
// (info.pos - 1) modulo data.size() -- presumably the most recently written
// entry of a wrap-around store; TODO confirm.
// NOTE(review): if info.pos can be 0, (info.pos - 1) relies on the type of
// `pos` (wrap-around if unsigned, negative index if signed) -- verify.
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
353 assert(
index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[
index];
356 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
357 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
360 shmOfferBytesConsumed +=
value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
365 assert(
index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[
index];
368 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
369 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
371 totalBytesDestroyed +=
value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
377 assert(
index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[
index];
380 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
381 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
383 totalBytesExpired +=
value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
389 assert(
index < deviceMetrics.metrics.size());
391 changed |= deviceMetrics.changed[
index];
392 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
393 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
395 totalMessagesCreated +=
value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
401 assert(
index < deviceMetrics.metrics.size());
403 changed |= deviceMetrics.changed[
index];
404 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
405 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
407 totalMessagesDestroyed +=
value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
413 assert(
index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[
index];
416 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
417 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
419 totalTimeframesRead +=
value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
425 assert(
index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[
index];
428 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
429 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
431 totalTimeframesConsumed +=
value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
437 assert(
index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[
index];
440 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
441 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
443 totalTimeframesExpired +=
value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Persists across invocations (static). Where it is incremented or reset is
// on lines not shown in this excerpt.
451 static uint64_t unchangedCount = 0;
// Publish the aggregated totals on the driver metrics, all with the same
// `timestamp` (defined on a line not shown here).
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// "In fly" timeframes = read minus consumed, narrowed from int64_t to int.
461 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
// "In fly" timeslices = started minus done, narrowed from int64_t to int.
465 totalTimeslicesInFlyMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
// Bytes delta = created - expired - destroyed, computed on int64_t values.
// NOTE(review): the metric was created with value type uint64_t; if the
// difference can go negative it would presumably be converted to a very large
// unsigned number -- confirm against createNumericMetric's call signature.
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the metric registered as "changed-metrics-count" is fed
// `unchangedCount`; either the name or the value looks inverted -- confirm
// which one is intended.
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
473 .
name =
"shared memory",
475 .api =
"/shm-offer {}",
479 .metricOfferScaleFactor = 1000000,
483 .unit =
"timeslices",
484 .api =
"/timeslice-offer {}",
488 .metricOfferScaleFactor = 1,
491 .
available = shmResourceSpec.maxAvailable,
494 .
available = timesliceResourceSpec.maxAvailable,
// First stats initialiser: enoughCount is 1 and lowCount is 0 when
// shmResourceState.available exceeds shmResourceSpec.minQuantum, and the
// other way round otherwise. The declaration this belongs to is on a line not
// shown here.
497 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// NOTE(review): this second initialiser repeats the shm expressions verbatim.
// If it initialises the timeslice stats (the preceding fragments pair
// shmResourceSpec with timesliceResourceSpec in the same order), it should
// presumably use timesliceResourceState / timesliceResourceSpec instead --
// looks like a copy-paste slip; confirm against the declaration line.
501 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
520 auto totalMessages = 0;
522 for (
auto& input : ctx.
inputs()) {
523 if (input.header ==
nullptr) {
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
534 bool forwarded =
false;
543 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
544 dh->dataOrigin.str, dh->dataDescription.str);
548 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
549 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
550 totalBytes += payloadSize;
553 arrow->updateBytesDestroyed(totalBytes);
555 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
556 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
557 arrow->updateMessagesDestroyed(totalMessages);
561 stats.processCommandQueue(); },
// Number of readers, parsed from the string-valued "readers" option.
// NOTE(review): std::stoll returns long long and is narrowed to int here;
// it also throws std::invalid_argument / std::out_of_range on malformed
// input, which is not handled in the visible lines.
564 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// Use "aod-memory-rate-limit" only when it was explicitly set (present and
// not defaulted). The parsed value is divided by 1000000; the log message
// further down reports maxMemory in MB, so the option is presumably in bytes.
565 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
566 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// Presumably the fallback branch (its `else` is on a line not shown):
// 2000 per reader, in the same unit as above.
568 config->maxMemory = readers * 2000;
// Same "explicitly set" test for the timeframe limit; the value is used as
// parsed, without scaling.
570 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
571 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Static flag initialised to false; the code that tests and sets it is on
// lines not shown here.
575 static bool once =
false;
580 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
581 config->maxMemory, config->maxTimeframes, readers);
582 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
586 auto& workflow =
node.specs;
587 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
588 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
589 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
590 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
591 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
594 dec.requestedDYNs.clear();
595 dec.providedDYNs.clear();
596 dec.providedTIMs.clear();
597 dec.requestedTIMs.clear();
602 if (builder != workflow.end()) {
604 dec.requestedIDXs.clear();
605 for (
auto& d : workflow | views::exclude_by_name(builder->name)) {
611 builder->inputs.clear();
612 builder->outputs.clear();
619 if (
spawner != workflow.end()) {
621 for (
auto& d : workflow | views::exclude_by_name(
spawner->name)) {
629 std::sort(dec.requestedDYNs.begin(), dec.requestedDYNs.end(), inputSpecLessThan);
630 std::sort(dec.providedDYNs.begin(), dec.providedDYNs.end(), outputSpecLessThan);
631 dec.spawnerInputs.clear();
633 views::filter_not_matching(dec.providedDYNs) |
644 if (analysisCCDB != workflow.end()) {
645 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
649 std::sort(dec.requestedTIMs.begin(), dec.requestedTIMs.end(), inputSpecLessThan);
650 std::sort(dec.providedTIMs.begin(), dec.providedTIMs.end(), outputSpecLessThan);
652 dec.analysisCCDBInputs.clear();
653 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) |
sinks::append_to{dec.analysisCCDBInputs};
656 analysisCCDB->outputs.clear();
657 analysisCCDB->inputs.clear();
664 if (writer != workflow.end()) {
665 workflow.erase(writer);
668 if (reader != workflow.end()) {
671 for (
auto& d : workflow) {
673 views::partial_match_filter(AODOrigins) |
678 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
681 reader->outputs.erase(o_end, reader->outputs.end());
682 if (reader->outputs.empty()) {
684 workflow.erase(reader);
701 dec.outputsInputsAOD.clear();
703 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
705 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
706 if (!
ds.empty() || isDangling[ii]) {
707 dec.outputsInputsAOD.emplace_back(outputsInputs[ii]);
713 if (!dec.outputsInputsAOD.empty()) {
715 dec.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
716 dec.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Find the spec named "internal-dpl-injected-dummy-sink" and move it to the
// back of `workflow`: a copy is appended first, then the original at index i
// is erased.
// NOTE(review): erase shifts the following elements left while the loop keeps
// using `i`; unless a `break` follows on a line not shown here, the next
// element is skipped and the appended copy is matched again at the end --
// confirm the loop exits right after the move.
720 for (
size_t i = 0;
i < workflow.size(); ++
i) {
721 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
722 workflow.push_back(workflow[
i]);
723 workflow.erase(workflow.begin() +
i);