256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
262 .
name =
"arrow-backend",
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
290 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
293 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric(
"total-timeslices-in-fly");
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
329 bool changed =
false;
331 size_t lastTimestamp = 0;
333 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
338 auto&
indices = allIndices[mi];
341 assert(
index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[
index];
344 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
345 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
348 totalBytesCreated +=
value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
353 assert(
index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[
index];
356 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
357 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
360 shmOfferBytesConsumed +=
value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
365 assert(
index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[
index];
368 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
369 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
371 totalBytesDestroyed +=
value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
377 assert(
index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[
index];
380 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
381 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
383 totalBytesExpired +=
value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
389 assert(
index < deviceMetrics.metrics.size());
391 changed |= deviceMetrics.changed[
index];
392 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
393 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
395 totalMessagesCreated +=
value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
401 assert(
index < deviceMetrics.metrics.size());
403 changed |= deviceMetrics.changed[
index];
404 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
405 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
407 totalMessagesDestroyed +=
value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
413 assert(
index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[
index];
416 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
417 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
419 totalTimeframesRead +=
value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
425 assert(
index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[
index];
428 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
429 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
431 totalTimeframesConsumed +=
value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
437 assert(
index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[
index];
440 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
441 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
443 totalTimeframesExpired +=
value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
451 static uint64_t unchangedCount = 0;
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
461 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
465 totalTimeslicesInFlyMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
473 .
name =
"shared memory",
475 .api =
"/shm-offer {}",
479 .metricOfferScaleFactor = 1000000,
483 .unit =
"timeslices",
484 .api =
"/timeslice-offer {}",
488 .metricOfferScaleFactor = 1,
491 .
available = shmResourceSpec.maxAvailable,
494 .
available = timesliceResourceSpec.maxAvailable,
497 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
501 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
520 auto totalMessages = 0;
522 for (
auto& input : ctx.
inputs()) {
523 if (input.header ==
nullptr) {
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
534 bool forwarded =
false;
543 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
544 dh->dataOrigin.str, dh->dataDescription.str);
548 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
549 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
550 totalBytes += payloadSize;
553 arrow->updateBytesDestroyed(totalBytes);
555 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
556 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
557 arrow->updateMessagesDestroyed(totalMessages);
561 stats.processCommandQueue(); },
564 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
565 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
566 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
568 config->maxMemory = readers * 500;
570 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
571 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
575 static bool once =
false;
580 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
581 config->maxMemory, config->maxTimeframes, readers);
582 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
586 auto& workflow =
node.specs;
587 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
588 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
589 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
590 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
591 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
594 ac.requestedDYNs.clear();
595 ac.providedDYNs.clear();
596 ac.providedTIMs.clear();
597 ac.requestedTIMs.clear();
603 if (builder != workflow.end()) {
605 ac.requestedIDXs.clear();
606 for (
auto& d : workflow) {
607 if (d.name == builder->name) {
610 for (
auto&
i : d.inputs) {
618 builder->inputs.clear();
619 builder->outputs.clear();
626 if (
spawner != workflow.end()) {
628 for (
auto& d : workflow) {
632 for (
auto const&
i : d.inputs) {
638 for (
auto const& o : d.outputs) {
640 ac.providedDYNs.emplace_back(o);
644 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
645 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
646 ac.spawnerInputs.clear();
647 for (
auto& input : ac.requestedDYNs) {
648 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
649 ac.spawnerInputs.emplace_back(input);
661 if (analysisCCDB != workflow.end()) {
662 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
666 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
667 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
669 ac.analysisCCDBInputs.clear();
670 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
673 analysisCCDB->outputs.clear();
674 analysisCCDB->inputs.clear();
682 if (writer != workflow.end()) {
683 workflow.erase(writer);
686 if (reader != workflow.end()) {
689 for (
auto& d : workflow) {
690 for (
auto const&
i : d.inputs) {
699 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
702 reader->outputs.erase(o_end, reader->outputs.end());
703 if (reader->outputs.empty()) {
705 workflow.erase(reader);
720 ac.outputsInputsAOD.clear();
722 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
724 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
725 if (!
ds.empty() || isDangling[ii]) {
726 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
732 if (!ac.outputsInputsAOD.empty()) {
734 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
735 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
739 for (
size_t i = 0;
i < workflow.size(); ++
i) {
740 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
741 workflow.push_back(workflow[
i]);
742 workflow.erase(workflow.begin() +
i);