234 using o2::monitoring::Metric;
235 using o2::monitoring::Monitoring;
236 using o2::monitoring::tags::Key;
237 using o2::monitoring::tags::Value;
240 .
name =
"arrow-backend",
250 int64_t totalBytesCreated = 0;
251 int64_t shmOfferBytesConsumed = 0;
252 int64_t totalBytesDestroyed = 0;
253 int64_t totalBytesExpired = 0;
254 int64_t totalMessagesCreated = 0;
255 int64_t totalMessagesDestroyed = 0;
256 int64_t totalTimeframesRead = 0;
257 int64_t totalTimeframesConsumed = 0;
258 int64_t totalTimeframesExpired = 0;
259 auto &driverMetrics = sm.driverMetricsInfo;
260 auto &allDeviceMetrics = sm.deviceMetricsInfos;
261 auto &specs = sm.deviceSpecs;
262 auto &infos = sm.deviceInfos;
264 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
265 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
266 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
268 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
269 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-timeslices");
270 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
271 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-timeslices");
272 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
273 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-timeslices");
275 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
276 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
277 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
278 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
279 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
280 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
281 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
282 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
283 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
284 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
285 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
286 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
287 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
290 bool changed =
false;
292 size_t lastTimestamp = 0;
294 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
295 auto& deviceMetrics = allDeviceMetrics[mi];
296 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
297 throw std::runtime_error(
"deviceMetrics.size() != allDeviceMetrics.size()");
299 auto&
indices = allIndices[mi];
302 assert(
index < deviceMetrics.metrics.size());
303 changed |= deviceMetrics.changed[
index];
305 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
306 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
307 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
309 totalBytesCreated +=
value;
310 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
314 assert(
index < deviceMetrics.metrics.size());
315 changed |= deviceMetrics.changed[
index];
317 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
318 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
319 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
321 shmOfferBytesConsumed +=
value;
322 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
326 assert(
index < deviceMetrics.metrics.size());
327 changed |= deviceMetrics.changed[
index];
329 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
330 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
332 totalBytesDestroyed +=
value;
333 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
334 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
338 assert(
index < deviceMetrics.metrics.size());
339 changed |= deviceMetrics.changed[
index];
341 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
342 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
344 totalBytesExpired +=
value;
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
346 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
350 assert(
index < deviceMetrics.metrics.size());
352 changed |= deviceMetrics.changed[
index];
353 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
354 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
356 totalMessagesCreated +=
value;
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
358 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
362 assert(
index < deviceMetrics.metrics.size());
364 changed |= deviceMetrics.changed[
index];
365 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
366 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
368 totalMessagesDestroyed +=
value;
369 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
370 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
374 assert(
index < deviceMetrics.metrics.size());
375 changed |= deviceMetrics.changed[
index];
377 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
378 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
380 totalTimeframesRead +=
value;
381 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
382 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
386 assert(
index < deviceMetrics.metrics.size());
387 changed |= deviceMetrics.changed[
index];
389 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
390 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
392 totalTimeframesConsumed +=
value;
393 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
394 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
398 assert(
index < deviceMetrics.metrics.size());
399 changed |= deviceMetrics.changed[
index];
401 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
402 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
404 totalTimeframesExpired +=
value;
405 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
406 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
409 static uint64_t unchangedCount = 0;
411 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
412 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
413 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
414 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
415 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
416 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
417 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
418 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
419 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
420 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
424 changedCountMetric(driverMetrics, unchangedCount, timestamp);
427 .
name =
"shared memory",
429 .api =
"/shm-offer {}",
433 .metricOfferScaleFactor = 1000000,
437 .unit =
"timeslices",
438 .api =
"/timeslice-offer {}",
442 .metricOfferScaleFactor = 1,
445 .
available = shmResourceSpec.maxAvailable,
448 .
available = timesliceResourceSpec.maxAvailable,
451 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
452 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
455 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
456 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
459 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
460 specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
461 totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
462 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
465 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
466 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
467 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
468 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
474 auto totalMessages = 0;
476 for (
auto& input : ctx.
inputs()) {
477 if (input.header ==
nullptr) {
480 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
484 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
485 dh->dataOrigin.str, dh->dataDescription.str);
488 bool forwarded =
false;
497 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
498 dh->dataOrigin.str, dh->dataDescription.str);
502 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
503 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
504 totalBytes += payloadSize;
507 arrow->updateBytesDestroyed(totalBytes);
509 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
510 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
511 arrow->updateMessagesDestroyed(totalMessages);
515 stats.processCommandQueue(); },
518 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
519 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
520 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
522 config->maxMemory = readers * 500;
524 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].defaulted() ==
false) {
525 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
527 config->maxTimeframes = readers;
529 static bool once =
false;
534 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
535 config->maxMemory, config->maxTimeframes, readers);
536 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
540 auto& workflow = node.
specs;
541 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
542 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
543 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
544 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
545 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
548 ac.requestedDYNs.clear();
549 ac.providedDYNs.clear();
550 ac.providedTIMs.clear();
551 ac.requestedTIMs.clear();
557 if (builder != workflow.end()) {
559 ac.requestedIDXs.clear();
560 for (
auto& d : workflow) {
561 if (d.name == builder->name) {
564 for (
auto&
i : d.inputs) {
572 builder->inputs.clear();
573 builder->outputs.clear();
580 if (
spawner != workflow.end()) {
582 for (
auto& d : workflow) {
586 for (
auto const&
i : d.inputs) {
592 for (
auto const& o : d.outputs) {
594 ac.providedDYNs.emplace_back(o);
598 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
599 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
600 ac.spawnerInputs.clear();
601 for (
auto& input : ac.requestedDYNs) {
602 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
603 ac.spawnerInputs.emplace_back(input);
615 if (analysisCCDB != workflow.end()) {
616 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
620 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
621 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
623 ac.analysisCCDBInputs.clear();
624 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
627 analysisCCDB->outputs.clear();
628 analysisCCDB->inputs.clear();
636 if (writer != workflow.end()) {
637 workflow.erase(writer);
640 if (reader != workflow.end()) {
643 for (
auto& d : workflow) {
644 for (
auto const&
i : d.inputs) {
653 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
656 reader->outputs.erase(o_end, reader->outputs.end());
657 if (reader->outputs.empty()) {
659 workflow.erase(reader);
674 ac.outputsInputsAOD.clear();
676 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
678 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
679 if (!
ds.empty() || isDangling[ii]) {
680 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
686 if (!ac.outputsInputsAOD.empty()) {
688 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
689 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
693 for (
size_t i = 0;
i < workflow.size(); ++
i) {
694 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
695 workflow.push_back(workflow[
i]);
696 workflow.erase(workflow.begin() +
i);