235 using o2::monitoring::Metric;
236 using o2::monitoring::Monitoring;
237 using o2::monitoring::tags::Key;
238 using o2::monitoring::tags::Value;
241 .
name =
"arrow-backend",
// Accumulators, all starting at zero. Each one is incremented with
// `+= value` further down.
251 int64_t totalBytesCreated = 0;
252 int64_t shmOfferBytesConsumed = 0;
253 int64_t totalBytesDestroyed = 0;
254 int64_t totalBytesExpired = 0;
255 int64_t totalMessagesCreated = 0;
256 int64_t totalMessagesDestroyed = 0;
257 int64_t totalTimeframesRead = 0;
258 int64_t totalTimeframesConsumed = 0;
259 int64_t totalTimeframesExpired = 0;
// Local aliases for the members of `sm` that the code below works on.
260 auto &driverMetrics = sm.driverMetricsInfo;
261 auto &allDeviceMetrics = sm.deviceMetricsInfos;
262 auto &specs = sm.deviceSpecs;
263 auto &infos = sm.deviceInfos;
// Handles for the driver-level metrics. Every handle is a function-local
// `static`, so DeviceMetricsHelper::createNumericMetric<T>() runs only the
// first time control passes through the declaration. The second argument is
// the metric name. The handles are invoked further down as
// `handle(driverMetrics, value, timestamp)`.
// NOTE(review): the accumulators passed to the uint64_t handles are declared
// as int64_t - confirm that the signed-to-unsigned conversion is intended.
265 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
266 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
267 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// Handles instantiated with `int`; these six are the ones handed to the
// offerResources() calls further down.
269 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
270 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
271 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
272 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
273 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
274 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
// Handles for the aggregated arrow / timeframe totals.
276 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
277 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
278 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
279 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
280 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
281 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
// Instantiated with `int`: it is fed the difference read - consumed, cast to int.
282 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
283 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
284 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
// Handles whose names carry the "aod-" prefix.
285 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
286 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
287 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
288 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
// `changed` collects, by bitwise OR further down, the `changed` flags of the
// metrics that are inspected for each device.
291 bool changed =
false;
// Largest timestamp seen so far; raised with std::max further down.
293 size_t lastTimestamp = 0;
// Visit the metrics of every device, by position.
295 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
296 auto& deviceMetrics = allDeviceMetrics[mi];
// Sanity check: there must be exactly one `changed` flag per metric.
// The exception text names the two sizes that are actually compared, so the
// diagnostic matches the failing condition.
297 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
298 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
// Entry of allIndices at the same position `mi` as the current device.
300 auto&
indices = allIndices[mi];
// The nine groups that follow are identical except for the accumulator they
// feed. Each group:
//  - asserts that `index` is a valid position in deviceMetrics.metrics and
//    ORs that metric's `changed` flag into `changed`;
//  - asserts that info.storeIdx is a valid position in
//    deviceMetrics.uint64Metrics and binds `data` and `timestamps` to the
//    store at that position;
//  - adds `value` to the accumulator;
//  - raises lastTimestamp to timestamps[(info.pos - 1) % data.size()] when
//    that entry is larger.
// The bounds checks are `assert`s, so they disappear when NDEBUG is defined.
// NOTE(review): `index`, `info` and `value` come from the surrounding code.
// If info.pos is unsigned and can be 0 here, `info.pos - 1` wraps around
// before the modulo is applied - confirm that pos >= 1 at this point.

// Accumulate into totalBytesCreated.
303 assert(
index < deviceMetrics.metrics.size());
304 changed |= deviceMetrics.changed[
index];
306 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
307 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
308 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
310 totalBytesCreated +=
value;
311 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into shmOfferBytesConsumed.
315 assert(
index < deviceMetrics.metrics.size());
316 changed |= deviceMetrics.changed[
index];
318 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
319 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
320 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
322 shmOfferBytesConsumed +=
value;
323 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalBytesDestroyed.
327 assert(
index < deviceMetrics.metrics.size());
328 changed |= deviceMetrics.changed[
index];
330 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
331 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
333 totalBytesDestroyed +=
value;
334 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
335 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalBytesExpired.
339 assert(
index < deviceMetrics.metrics.size());
340 changed |= deviceMetrics.changed[
index];
342 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
343 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
345 totalBytesExpired +=
value;
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
347 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalMessagesCreated.
351 assert(
index < deviceMetrics.metrics.size());
353 changed |= deviceMetrics.changed[
index];
354 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
355 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
357 totalMessagesCreated +=
value;
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
359 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalMessagesDestroyed.
363 assert(
index < deviceMetrics.metrics.size());
365 changed |= deviceMetrics.changed[
index];
366 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
367 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
369 totalMessagesDestroyed +=
value;
370 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
371 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalTimeframesRead.
375 assert(
index < deviceMetrics.metrics.size());
376 changed |= deviceMetrics.changed[
index];
378 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
379 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
381 totalTimeframesRead +=
value;
382 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
383 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalTimeframesConsumed.
387 assert(
index < deviceMetrics.metrics.size());
388 changed |= deviceMetrics.changed[
index];
390 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
391 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
393 totalTimeframesConsumed +=
value;
394 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
395 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Accumulate into totalTimeframesExpired.
399 assert(
index < deviceMetrics.metrics.size());
400 changed |= deviceMetrics.changed[
index];
402 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
403 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
405 totalTimeframesExpired +=
value;
406 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
407 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Function-local static counter: it keeps its value between executions of
// this code.
410 static uint64_t unchangedCount = 0;
// Hand every accumulated total to its driver-metric handle, all with the same
// `timestamp`.
412 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
413 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
414 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
415 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
416 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
417 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
418 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
419 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in flight = read - consumed, narrowed to int with a C-style cast.
420 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
// Bytes delta = created - expired - destroyed.
421 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the handle created under the name "changed-metrics-count"
// receives `unchangedCount` - confirm that this pairing is intended.
425 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// Designated initializers for the resource named "shared memory": offers go
// through the "/shm-offer {}" api string, with a metricOfferScaleFactor of
// 1000000.
428 .
name =
"shared memory",
430 .api =
"/shm-offer {}",
434 .metricOfferScaleFactor = 1000000,
// Designated initializers for the resource whose unit is "timeslices": api
// string "/timeslice-offer {}", metricOfferScaleFactor of 1.
438 .unit =
"timeslices",
439 .api =
"/timeslice-offer {}",
443 .metricOfferScaleFactor = 1,
// Initial `available` amount, taken from shmResourceSpec.maxAvailable.
446 .
available = shmResourceSpec.maxAvailable,
// Initial `available` amount, taken from timesliceResourceSpec.maxAvailable.
449 .
available = timesliceResourceSpec.maxAvailable,
// enoughCount and lowCount are complementary: exactly one of them is 1,
// depending on whether `available - minQuantum > 0`.
452 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
453 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// NOTE(review): this initializer repeats the previous one token for token and
// reads shmResourceState / shmResourceSpec again. If it initializes the
// timeslice statistics (a timesliceResourceStats object is passed to
// offerResources() further down), it probably should read
// timesliceResourceState / timesliceResourceSpec instead - confirm.
456 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
457 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// Offer timeslices: passes the timeslice state/spec/stats, the device specs
// and infos, `manager`, four accumulated totals, `timestamp`, driverMetrics
// and the three timeslice metric handles.
// NOTE(review): totalTimeframesConsumed appears twice among the four totals
// (first and fourth), whereas the shared-memory call passes four distinct
// accumulators in the same positions - confirm that this is intended.
460 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
461 specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
462 totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
463 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
// Offer shared memory: same argument layout, with the shm state/spec/stats,
// the byte accumulators and the three shared-memory metric handles.
466 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
467 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
468 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
469 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
// `auto` deduces `int` from the literal 0.
// NOTE(review): no statement that increments totalMessages is visible before
// it is passed to updateMessagesDestroyed() - confirm it is updated in the
// loop.
475 auto totalMessages = 0;
// Walk over every input of the context; inputs whose header is null are
// tested for first.
477 for (
auto& input : ctx.
inputs()) {
478 if (input.header ==
nullptr) {
// DataHeader of the input; its dataOrigin / dataDescription are used in the
// log messages below.
481 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
485 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
486 dh->dataOrigin.str, dh->dataDescription.str);
489 bool forwarded =
false;
498 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
499 dh->dataOrigin.str, dh->dataDescription.str);
// The size is logged in MB (payloadSize / 1000000.) and then added, in bytes,
// to the running total.
503 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
504 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
505 totalBytes += payloadSize;
// Report the destroyed bytes to `arrow`, then log both this batch and the
// value returned by arrow->bytesDestroyed(), each divided by 1000000.
508 arrow->updateBytesDestroyed(totalBytes);
510 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
511 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
512 arrow->updateMessagesDestroyed(totalMessages);
516 stats.processCommandQueue(); },
// Number of readers, parsed from the string-valued "readers" option.
// std::stoll returns long long, so the result is implicitly narrowed to int;
// it throws std::invalid_argument / std::out_of_range on unparsable input.
519 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// "aod-memory-rate-limit" is honoured only when it is present and was not
// left at its default. The parsed value is divided by 1000000 before being
// stored (the log message below reports maxMemory in MB).
520 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
521 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// 500 per reader - presumably the fallback used when the option was not set
// explicitly; confirm against the enclosing branch.
523 config->maxMemory = readers * 500;
// Same presence / not-defaulted test for "timeframes-rate-limit"; the value is
// stored without scaling.
525 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
526 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Function-local static flag, initially false.
530 static bool once =
false;
535 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
536 config->maxMemory, config->maxTimeframes, readers);
// Register `config` in the service registry as the RateLimitConfig service.
537 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// `workflow` aliases node.specs; all edits below act on it in place.
541 auto& workflow =
node.specs;
// Look up the internal AOD devices by name. Each iterator equals
// workflow.end() when no device with that name is present.
542 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
543 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
544 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
545 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
546 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
// Empty the requested / provided DYN and TIM lists of `ac`.
549 ac.requestedDYNs.clear();
550 ac.providedDYNs.clear();
551 ac.providedTIMs.clear();
552 ac.requestedTIMs.clear();
// Index builder present: empty requestedIDXs, walk the devices (the name
// comparison singles out the builder itself) and their inputs, and empty the
// builder's own inputs and outputs.
558 if (builder != workflow.end()) {
560 ac.requestedIDXs.clear();
561 for (
auto& d : workflow) {
562 if (d.name == builder->name) {
565 for (
auto&
i : d.inputs) {
573 builder->inputs.clear();
574 builder->outputs.clear();
// Spawner present: walk the inputs and outputs of every device; outputs are
// appended to providedDYNs.
581 if (
spawner != workflow.end()) {
583 for (
auto& d : workflow) {
587 for (
auto const&
i : d.inputs) {
593 for (
auto const& o : d.outputs) {
595 ac.providedDYNs.emplace_back(o);
// Sort both lists, then rebuild spawnerInputs as the requested DYN inputs for
// which DataSpecUtils::match() succeeds with none of the provided DYN outputs.
599 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
600 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
601 ac.spawnerInputs.clear();
602 for (
auto& input : ac.requestedDYNs) {
603 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
604 ac.spawnerInputs.emplace_back(input);
// CCDB device present: iterate over the workflow piped through
// views::exclude_by_name(analysisCCDB->name) - presumably every device except
// the CCDB one.
616 if (analysisCCDB != workflow.end()) {
617 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
// Sort both TIM lists, then refill analysisCCDBInputs from requestedTIMs
// piped through views::filter_not_matching(providedTIMs).
621 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
622 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
624 ac.analysisCCDBInputs.clear();
625 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
628 analysisCCDB->outputs.clear();
629 analysisCCDB->inputs.clear();
// Drop the writer device when there is one.
637 if (writer != workflow.end()) {
638 workflow.erase(writer);
// NOTE(review): `reader` was obtained before workflow.erase(writer). If
// `workflow` is a std::vector and the reader sits at or after the writer's
// position, that erase invalidates this iterator - confirm the ordering or
// look the reader up again.
641 if (reader != workflow.end()) {
644 for (
auto& d : workflow) {
645 for (
auto const&
i : d.inputs) {
// remove_if + erase: drop from the reader every output for which the
// predicate returns true.
654 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
657 reader->outputs.erase(o_end, reader->outputs.end());
// A reader left without outputs is removed from the workflow.
658 if (reader->outputs.empty()) {
660 workflow.erase(reader);
// Rebuild ac.outputsInputsAOD from scratch.
675 ac.outputsInputsAOD.clear();
// `ii` is an unsigned int (literal 0u). An entry of outputsInputs is kept when
// `dod` returns at least one data output descriptor for it, or when the
// matching isDangling entry is true.
677 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
679 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
680 if (!
ds.empty() || isDangling[ii]) {
681 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
// When at least one entry was kept, two more are appended, constructed in
// place from ("tfn", "TFN", "TFNumber") and ("tff", "TFF", "TFFilename").
687 if (!ac.outputsInputsAOD.empty()) {
689 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
690 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Move the device named "internal-dpl-injected-dummy-sink" to the end of the
// workflow: a copy is appended first, then the element at the original
// position is erased.
694 for (
size_t i = 0;
i < workflow.size(); ++
i) {
695 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
696 workflow.push_back(workflow[
i]);
697 workflow.erase(workflow.begin() +
i);