255 using o2::monitoring::Metric;
256 using o2::monitoring::Monitoring;
257 using o2::monitoring::tags::Key;
258 using o2::monitoring::tags::Value;
// `name` member of a designated initializer whose opening brace lies outside this excerpt;
// it is set to the literal "arrow-backend".
261 .
name =
"arrow-backend",
// Running totals, all starting from zero. Nine of them (bytes created / offer bytes consumed /
// bytes destroyed / bytes expired / messages created / messages destroyed / timeframes read /
// timeframes consumed / timeframes expired) are bumped with `+= value` in the per-device loop
// further down and then published as driver metrics.
271 int64_t totalBytesCreated = 0;
272 int64_t shmOfferBytesConsumed = 0;
273 int64_t totalBytesDestroyed = 0;
274 int64_t totalBytesExpired = 0;
275 int64_t totalMessagesCreated = 0;
276 int64_t totalMessagesDestroyed = 0;
277 int64_t totalTimeframesRead = 0;
278 int64_t totalTimeframesConsumed = 0;
279 int64_t totalTimeframesExpired = 0;
// NOTE(review): no `+=` on the three timeslice totals below is visible in this excerpt;
// they are nevertheless published and passed to offerResources() - confirm where they are updated.
280 int64_t totalTimeslicesStarted = 0;
281 int64_t totalTimeslicesDone = 0;
282 int64_t totalTimeslicesExpired = 0;
// Short aliases into `sm`: the driver's own metrics, the per-device metrics,
// the device specs and the device infos.
283 auto &driverMetrics = sm.driverMetricsInfo;
284 auto &allDeviceMetrics = sm.deviceMetricsInfos;
285 auto &specs = sm.deviceSpecs;
286 auto &infos = sm.deviceInfos;
// Convenience lambda: given a metric name, returns whatever
// DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name) returns.
// driverMetrics is captured by reference. (The closing of the lambda is not part of this excerpt.)
289 auto createUint64DriverMetric = [&driverMetrics](
char const*
name) ->
auto {
290 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
name);
// Same as above, but instantiating createNumericMetric<int>.
292 auto createIntDriverMetric = [&driverMetrics](
char const*
name) ->
auto {
293 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
name);
// Handles for the driver-level metrics. Each one is the result of
// DeviceMetricsHelper::createNumericMetric<T>(driverMetrics, "<metric name>") and is later
// invoked as handle(driverMetrics, value, timestamp).
// They are declared `static`, so each initializer runs only the first time control passes
// through its declaration.
296 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
297 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
298 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// Offer bookkeeping (unused / available / offered), once for shared memory and once for
// timeslices. These are created with T = int, unlike the uint64_t totals around them.
300 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
301 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
302 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
303 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
304 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
305 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
// Arrow byte / message totals and timeframe totals.
307 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
308 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
309 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
310 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
311 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
312 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
// "In fly" is published further down as (read - consumed) cast to int, hence T = int.
313 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
// Timeslice totals, created through the two helper lambdas defined above instead of
// spelling out the createNumericMetric call.
315 static auto totalTimeslicesStartedMetric = createUint64DriverMetric(
"total-timeslices-started");
316 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric(
"total-timeslices-expired");
317 static auto totalTimeslicesDoneMetric = createUint64DriverMetric(
"total-timeslices-done");
318 static auto totalTimeslicesInFlyMetric = createIntDriverMetric(
"total-timeslices-in-fly");
// Remaining handles; only totalBytesDeltaMetric and changedCountMetric are invoked in the
// visible part of this excerpt.
320 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
321 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
322 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
323 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
324 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
325 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
// Becomes true as soon as any metric inspected below is flagged in deviceMetrics.changed.
328 bool changed =
false;
// Largest timestamp among the samples looked at in the stanzas below.
330 size_t lastTimestamp = 0;
// Visit the metrics of every device; `mi` is also used to pick the matching entry of allIndices.
// The declarations of `index`, `info` and `value` used in each stanza fall on lines that are
// not part of this excerpt.
332 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
333 auto& deviceMetrics = allDeviceMetrics[mi];
// Consistency check: the `changed` flags are expected to be parallel to `metrics`.
// NOTE(review): the exception text mentions deviceMetrics.size() / allDeviceMetrics.size(),
// which is not what the condition compares - the message looks stale.
334 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
335 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
337 auto&
indices = allIndices[mi];
// --- Stanza 1: accumulates into totalBytesCreated ---------------------------------------
// Every stanza follows the same shape:
//   * assert (debug builds only) that `index` addresses an existing metric,
//   * fold the metric's changed flag into `changed`,
//   * assert that info.storeIdx addresses an existing uint64 store and bind it as `data`,
//   * add `value` to the running total,
//   * raise lastTimestamp to the timestamp stored at slot (info.pos - 1) modulo data.size().
// NOTE(review): the type of info.pos is not visible here; if it is unsigned and can be 0,
// `info.pos - 1` wraps around before the modulo is applied - confirm this is intended.
340 assert(
index < deviceMetrics.metrics.size());
341 changed |= deviceMetrics.changed[
index];
343 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
344 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
347 totalBytesCreated +=
value;
348 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 2: accumulates into shmOfferBytesConsumed -----------------------------------
352 assert(
index < deviceMetrics.metrics.size());
353 changed |= deviceMetrics.changed[
index];
355 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
356 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
359 shmOfferBytesConsumed +=
value;
360 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 3: accumulates into totalBytesDestroyed -------------------------------------
364 assert(
index < deviceMetrics.metrics.size());
365 changed |= deviceMetrics.changed[
index];
367 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
368 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
370 totalBytesDestroyed +=
value;
371 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
372 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 4: accumulates into totalBytesExpired ---------------------------------------
376 assert(
index < deviceMetrics.metrics.size());
377 changed |= deviceMetrics.changed[
index];
379 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
380 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
382 totalBytesExpired +=
value;
383 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
384 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 5: accumulates into totalMessagesCreated ------------------------------------
388 assert(
index < deviceMetrics.metrics.size());
390 changed |= deviceMetrics.changed[
index];
391 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
392 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
394 totalMessagesCreated +=
value;
395 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
396 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 6: accumulates into totalMessagesDestroyed ----------------------------------
400 assert(
index < deviceMetrics.metrics.size());
402 changed |= deviceMetrics.changed[
index];
403 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
404 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
406 totalMessagesDestroyed +=
value;
407 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
408 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 7: accumulates into totalTimeframesRead -------------------------------------
412 assert(
index < deviceMetrics.metrics.size());
413 changed |= deviceMetrics.changed[
index];
415 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
416 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
418 totalTimeframesRead +=
value;
419 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
420 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 8: accumulates into totalTimeframesConsumed ---------------------------------
424 assert(
index < deviceMetrics.metrics.size());
425 changed |= deviceMetrics.changed[
index];
427 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
428 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
430 totalTimeframesConsumed +=
value;
431 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
432 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// --- Stanza 9: accumulates into totalTimeframesExpired ----------------------------------
436 assert(
index < deviceMetrics.metrics.size());
437 changed |= deviceMetrics.changed[
index];
439 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
440 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
442 totalTimeframesExpired +=
value;
443 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
444 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Counter kept across invocations (function-local static). How it is updated is on lines
// not included in this excerpt.
450 static uint64_t unchangedCount = 0;
// Publish the aggregated totals on the driver metrics. `timestamp` is defined on a line
// that is not part of this excerpt.
452 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
453 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
454 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
455 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
456 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
457 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
458 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
459 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in fly = read - consumed, narrowed to int to match the int-typed handle.
460 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
461 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
462 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
463 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
// Timeslices in fly = started - done, narrowed to int in the same way.
464 totalTimeslicesInFlyMetric(driverMetrics, (
int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
// Delta = created - expired - destroyed, computed on signed int64_t values.
// NOTE(review): totalBytesDeltaMetric is created with T = uint64_t; if this difference can
// go negative it would be published as a very large unsigned number - confirm.
465 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the handle is named "changed count" (metric "changed-metrics-count") but the
// value passed is `unchangedCount` - confirm which of the two names reflects the intent.
469 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// Fragments of several designated initializers; the declarations they belong to are on lines
// not included in this excerpt, so the grouping below is inferred from the field values only.
// Fields that look like the shared-memory resource description:
472 .
name =
"shared memory",
474 .api =
"/shm-offer {}",
478 .metricOfferScaleFactor = 1000000,
// Fields that look like the timeslice resource description (its `name` line is not visible):
482 .unit =
"timeslices",
483 .api =
"/timeslice-offer {}",
487 .metricOfferScaleFactor = 1,
// Initial `available` amounts are taken from the respective spec's maxAvailable.
490 .
available = shmResourceSpec.maxAvailable,
493 .
available = timesliceResourceSpec.maxAvailable,
// First stats initializer: exactly one of enoughCount / lowCount starts at 1, depending on
// whether `available - minQuantum` is strictly positive for shared memory.
496 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
497 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// Second stats initializer.
// NOTE(review): this is token-for-token the same as the previous one and still refers to
// shmResourceState / shmResourceSpec. If it initializes the timeslice stats (as the later
// offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats, ...)
// call suggests), it probably should use timesliceResourceState / timesliceResourceSpec -
// looks like a copy-paste slip, please confirm.
500 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
501 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// Two calls to offerResources() with parallel argument lists (the trailing arguments of both
// calls are on lines not included in this excerpt).
// Timeslice flavour: state/spec/stats for timeslices, then totalTimeframesConsumed,
// totalTimeslicesExpired, totalTimeslicesStarted, totalTimeslicesDone and the three
// timeslice offer metrics (available / unused offered / offered).
504 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
505 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
506 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
507 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
// Shared-memory flavour: same positions filled with shmOfferBytesConsumed, totalBytesExpired,
// totalBytesCreated, totalBytesDestroyed and the three shared-memory offer metrics.
510 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
511 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
512 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
513 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
// Fragment of a callback that walks ctx.inputs() and accounts for the payload bytes being
// released. The enclosing lambda header, the logging calls that own the format strings and
// the declarations of `totalBytes`, `payloadSize` and `arrow` are not part of this excerpt.
// `auto` with a literal 0 deduces `int` here.
// NOTE(review): no increment of totalMessages is visible in this excerpt, yet it is passed to
// updateMessagesDestroyed() below - confirm it is updated on one of the missing lines.
519 auto totalMessages = 0;
521 for (
auto& input : ctx.
inputs()) {
// Inputs without a header are singled out (the body of this branch is not visible).
522 if (input.header ==
nullptr) {
525 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
// Log text for messages that are not of kind arrow.
529 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
530 dh->dataOrigin.str, dh->dataDescription.str);
533 bool forwarded =
false;
// Log text for forwarded messages.
542 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
543 dh->dataOrigin.str, dh->dataDescription.str);
// Log text for messages whose payload is counted; the size is reported divided by 1000000.
547 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
548 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
549 totalBytes += payloadSize;
// Hand the accumulated byte count to the arrow object, then log it next to
// arrow->bytesDestroyed().
552 arrow->updateBytesDestroyed(totalBytes);
554 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
555 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
556 arrow->updateMessagesDestroyed(totalMessages);
// Last statement of a lambda body; the `},` closes it inside an enclosing initializer list.
560 stats.processCommandQueue(); },
// Rate-limit configuration derived from the device options. `config`, `dc` and `registry`
// are defined on lines that are not part of this excerpt.
// The "readers" option is read as a string and parsed with std::stoll (which throws on
// non-numeric or out-of-range text).
// NOTE(review): the long long result is narrowed into an `int`.
563 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// Explicitly provided (present and not defaulted) memory limit: parsed and divided by
// 1000000 using integer division.
564 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
565 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// Presumably the else-branch (the `else` line itself is not visible): 2000 per reader.
567 config->maxMemory = readers * 2000;
// Explicitly provided timeframe limit: parsed and stored as is.
569 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
570 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Function-local static flag; its use is on lines not included in this excerpt.
574 static bool once =
false;
// Log text reporting the configured limits and the number of readers.
579 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
580 config->maxMemory, config->maxTimeframes, readers);
// Make the configuration available as the RateLimitConfig service.
581 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// Work directly on the list of data processor specs held by `node`.
585 auto& workflow =
node.specs;
// Locate the internal AOD-related devices by exact name. Each iterator equals
// workflow.end() when the corresponding device is absent.
586 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
587 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
588 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
589 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
590 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
// Start from empty requested/provided DYN and TIM lists on `ac` (declared outside this
// excerpt); the DYN and TIM lists are sorted further down once they have been refilled.
593 ac.requestedDYNs.clear();
594 ac.providedDYNs.clear();
595 ac.providedTIMs.clear();
596 ac.requestedTIMs.clear();
// Only when an index-builder device is present in the workflow.
602 if (builder != workflow.end()) {
// Reset the list of requested IDX inputs before scanning the workflow again.
604 ac.requestedIDXs.clear();
605 for (
auto& d : workflow) {
// The builder itself is singled out by name (the body of this branch is not visible;
// presumably it is skipped - confirm).
606 if (d.name == builder->name) {
// Inspect every input of the device (the per-input logic is not visible).
609 for (
auto&
i : d.inputs) {
// Drop the builder's current inputs and outputs; what they are replaced with is on lines
// not included in this excerpt.
617 builder->inputs.clear();
618 builder->outputs.clear();
// Only when a spawner device is present in the workflow.
625 if (
spawner != workflow.end()) {
// Scan all devices, looking at their inputs and outputs (the selection conditions are on
// lines not included in this excerpt); selected outputs are recorded in ac.providedDYNs.
627 for (
auto& d : workflow) {
631 for (
auto const&
i : d.inputs) {
637 for (
auto const& o : d.outputs) {
639 ac.providedDYNs.emplace_back(o);
// Sort both lists with their respective comparators.
643 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
644 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// The spawner inputs are rebuilt as: every requested DYN for which no provided DYN
// satisfies DataSpecUtils::match(input, provided).
645 ac.spawnerInputs.clear();
646 for (
auto& input : ac.requestedDYNs) {
647 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
648 ac.spawnerInputs.emplace_back(input);
// Only when the analysis CCDB device is present in the workflow.
660 if (analysisCCDB != workflow.end()) {
// Iterate over the workflow through views::exclude_by_name with the CCDB device's own name
// (presumably skipping that device - confirm against the view's definition); the loop body
// is not visible in this excerpt.
661 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
// Sort both TIM lists with their respective comparators.
665 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
666 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
// Rebuild analysisCCDBInputs from the requested TIMs piped through
// views::filter_not_matching(ac.providedTIMs) - presumably keeping those with no matching
// provided TIM, mirroring the none_of/match logic used for the spawner inputs; confirm.
668 ac.analysisCCDBInputs.clear();
669 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
// Drop the device's current outputs and inputs; what they are replaced with is on lines not
// included in this excerpt.
672 analysisCCDB->outputs.clear();
673 analysisCCDB->inputs.clear();
// Remove the AOD writer device from the workflow if it is present.
// NOTE(review): `reader` (and the other iterators) were obtained before this erase. If
// `workflow` is a std::vector (its type is not visible here), erase() invalidates every
// iterator at or after the erased element, including a previously obtained end(): a `reader`
// located after `writer` would then refer to the wrong element, and a `reader` that was
// end() would no longer compare equal to the new workflow.end(). Consider re-running the
// find_if for `reader` after the erase - confirm.
681 if (writer != workflow.end()) {
682 workflow.erase(writer);
// Only when a reader device is present in the workflow.
685 if (reader != workflow.end()) {
// Scan the inputs of all devices (the per-input logic is not visible in this excerpt).
688 for (
auto& d : workflow) {
689 for (
auto const&
i : d.inputs) {
// Erase-remove idiom on the reader's outputs; the predicate body is not visible.
698 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
701 reader->outputs.erase(o_end, reader->outputs.end());
// A reader left with no outputs is removed from the workflow altogether.
702 if (reader->outputs.empty()) {
704 workflow.erase(reader);
// Rebuild ac.outputsInputsAOD from scratch. `outputsInputs`, `isDangling` and `dod` are
// defined on lines that are not part of this excerpt.
719 ac.outputsInputsAOD.clear();
721 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
// Keep entry ii when dod returns at least one output descriptor for it, or when it is
// flagged in isDangling.
723 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
724 if (!
ds.empty() || isDangling[ii]) {
725 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
// If anything was selected, two extra entries are appended, constructed from
// ("tfn", "TFN", "TFNumber") and ("tff", "TFF", "TFFilename").
731 if (!ac.outputsInputsAOD.empty()) {
733 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
734 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Look for the device named "internal-dpl-injected-dummy-sink" by index.
738 for (
size_t i = 0;
i < workflow.size(); ++
i) {
739 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
// Relocate it to the end of the workflow: append a copy first, then erase the original at
// position i (which shifts the following elements down by one).
// NOTE(review): whether the loop is left right after the move is on a line not included in
// this excerpt; without a `break`, the element shifted into position i would be skipped by
// the `++i` - confirm.
740 workflow.push_back(workflow[
i]);
741 workflow.erase(workflow.begin() +
i);