237 using o2::monitoring::Metric;
238 using o2::monitoring::Monitoring;
239 using o2::monitoring::tags::Key;
240 using o2::monitoring::tags::Value;
243 .
name =
"arrow-backend",
// Driver-wide accumulators. They start at zero here, are summed over every
// device's metrics in the loop further down, and are then published as
// driver-level metrics. They are signed 64-bit even though most of the
// metrics they feed are created with uint64_t storage.
253 int64_t totalBytesCreated = 0;
254 int64_t shmOfferBytesConsumed = 0;
255 int64_t totalBytesDestroyed = 0;
256 int64_t totalBytesExpired = 0;
257 int64_t totalMessagesCreated = 0;
258 int64_t totalMessagesDestroyed = 0;
259 int64_t totalTimeframesRead = 0;
260 int64_t totalTimeframesConsumed = 0;
// Short aliases (references, no copies) into the state object `sm`.
261 auto &driverMetrics = sm.driverMetricsInfo;
262 auto &allDeviceMetrics = sm.deviceMetricsInfos;
263 auto &specs = sm.deviceSpecs;
264 auto &infos = sm.deviceInfos;
// Handles used to publish driver-level metrics. Every handle is a
// function-local static, so each createNumericMetric call below runs only the
// first time control passes through here; later passes reuse the handle.
// NOTE(review): the handles are therefore created against the driverMetrics
// object seen on the first pass - confirm it is always the same object.
// Most metrics use uint64_t storage; the shared-memory gauges and the
// in-flight timeframe count use int.
266 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
267 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
268 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
// int-typed shared-memory gauges.
269 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
270 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
271 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
272 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
273 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
274 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
275 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
276 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
277 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
// int-typed: published later as (timeframes read - timeframes consumed).
278 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
279 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
280 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
281 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
282 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
283 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
// Unlike its siblings this handle has no "Metric" suffix in its name.
284 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
287 bool changed =
false;
289 size_t lastTimestamp = 0;
291 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
292 auto& deviceMetrics = allDeviceMetrics[mi];
// Sanity check: the per-metric `changed` flags must have exactly one entry per
// metric descriptor, because the code below indexes `changed` with the same
// index it bounds-checks against `metrics`.
293 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
// The message names the two containers that are actually compared above
// (the previous text referred to unrelated sizes).
294 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
// Indices previously collected for the device at position `mi`.
296 auto&
indices = allIndices[mi];
// `index`, `info` and `value` are assigned on lines not visible here.
299 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
300 changed |= deviceMetrics.changed[
index];
302 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
// Value store of this metric; in the visible lines only its size is used.
303 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
304 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Add this device's contribution to the driver-wide total of bytes created.
306 totalBytesCreated +=
value;
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
// NOTE(review): if info.pos is unsigned and can be 0, (pos - 1) wraps around;
// the modulo then lands on the last slot only when data.size() is a power of
// two - confirm.
307 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
311 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
312 changed |= deviceMetrics.changed[
index];
314 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
315 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
316 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Add this device's contribution to the total of offered shared-memory bytes consumed.
318 shmOfferBytesConsumed +=
value;
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
319 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
323 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
324 changed |= deviceMetrics.changed[
index];
326 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
327 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide total of bytes destroyed.
329 totalBytesDestroyed +=
value;
330 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
331 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
335 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
336 changed |= deviceMetrics.changed[
index];
338 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
339 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide total of bytes expired.
341 totalBytesExpired +=
value;
342 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
343 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
347 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
349 changed |= deviceMetrics.changed[
index];
350 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
351 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide count of messages created.
353 totalMessagesCreated +=
value;
354 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
355 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
359 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
361 changed |= deviceMetrics.changed[
index];
362 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
363 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide count of messages destroyed.
365 totalMessagesDestroyed +=
value;
366 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
367 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
371 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
372 changed |= deviceMetrics.changed[
index];
374 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
375 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide count of timeframes read.
377 totalTimeframesRead +=
value;
378 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
379 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// `index`, `info` and `value` are assigned on lines not visible here.
383 assert(
index < deviceMetrics.metrics.size());
// Remember whether any of the inspected metrics changed.
384 changed |= deviceMetrics.changed[
index];
386 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
387 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
// Add this device's contribution to the driver-wide count of timeframes consumed.
389 totalTimeframesConsumed +=
value;
390 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
// Track the newest timestamp seen, read from slot (pos - 1) modulo the store size.
391 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Counter that persists across invocations (function-local static).
394 static uint64_t unchangedCount = 0;
// Publish the aggregated totals as driver-level metrics, all stamped with the
// same `timestamp` (assigned on a line not visible here).
396 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
397 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
398 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
399 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
400 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
401 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
402 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
403 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in flight = read - consumed, narrowed from int64_t to int
// because this metric is published as an int.
404 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
// Net bytes = created - expired - destroyed.
405 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): a handle named changedCountMetric is fed `unchangedCount` -
// confirm which of the two names reflects the intended meaning.
409 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// Timeframe limit: this branch is taken only when maxTimeframes is non-zero
// and the number of timeframes read but not yet consumed exceeds it. The body
// of the branch is not visible here.
411 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
// Field initializers describing the shared-memory resource and its state; the
// enclosing object declarations are not visible here.
415 .
name =
"shared memory",
417 .api =
"/shm-offer {}",
421 .metricOfferScaleFactor = 1000000,
424 .
available = shmResourceSpec.maxAvailable,
// enoughCount and lowCount are complementary: exactly one of them starts at 1.
// NOTE(review): if `available` and `minQuantum` are unsigned, the subtraction
// wraps and `> 0` is true whenever the two differ (even when available is the
// smaller one) - confirm the types, or compare the two values directly.
427 .
enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
428 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
// Hand the aggregated byte counters and the shared-memory metric handles to
// offerResources (argument list continues on lines not visible here).
431 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
432 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
433 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
434 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
// `auto` deduces int from the literal 0.
440 auto totalMessages = 0;
// Walk every input of the current context.
442 for (
auto& input : ctx.
inputs()) {
// Inputs without a header get special handling (body not visible here).
443 if (input.header ==
nullptr) {
446 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
// Arguments of a log call whose opening is not visible here: non-arrow
// messages are not accounted.
450 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
451 dh->dataOrigin.str, dh->dataDescription.str);
454 bool forwarded =
false;
// Log arguments: forwarded messages do not give their memory back.
463 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
464 dh->dataOrigin.str, dh->dataDescription.str);
// Log arguments: the payload size is reported in units of 10^6 bytes.
468 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
469 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
// Accumulate the payload size into the byte total.
470 totalBytes += payloadSize;
// Report the accumulated bytes to the `arrow` object.
473 arrow->updateBytesDestroyed(totalBytes);
475 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
476 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
// Report the message count; where totalMessages is incremented is not visible here.
477 arrow->updateMessagesDestroyed(totalMessages);
// Final statement of a callback body; the trailing `},` closes it.
481 stats.processCommandQueue(); },
// Number of readers, parsed from the "readers" option string. std::stoll
// returns long long, which is narrowed to int here; it throws
// std::invalid_argument / std::out_of_range on unparsable input.
484 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// Memory limit: used only when the option is present and was not left at its
// default. The parsed value is divided by 10^6 (presumably bytes -> MB, which
// matches the "MB" in the log message below).
485 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
486 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// Fallback of 500 per reader (presumably the else branch; the `else` line is
// not visible here).
488 config->maxMemory = readers * 500;
// Timeframe limit: the literal value "readers" means "one per reader".
490 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].as<std::string>() ==
"readers") {
491 config->maxTimeframes = readers;
// Otherwise the option is parsed as a number (presumably the else branch).
493 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Function-local flag; its use is on lines not visible here.
495 static bool once =
false;
// Arguments of a log call whose opening is not visible here.
500 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
501 config->maxMemory, readers);
// Make the configuration available as a RateLimitConfig service.
502 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// Work directly on the node's list of data processor specs (reference, no copy).
506 auto& workflow = node.
specs;
// Locate the internal AOD helper devices by exact name. Each iterator equals
// workflow.end() when the corresponding device is absent. These are five
// independent linear scans.
507 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
508 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
509 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
510 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
511 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
// Reset the previously collected DYN / TIM request and provider lists before
// they are rebuilt below.
514 ac.requestedDYNs.clear();
515 ac.providedDYNs.clear();
516 ac.providedTIMs.clear();
517 ac.requestedTIMs.clear();
// Only when an index-builder device is present in the workflow.
523 if (builder != workflow.end()) {
// Start from an empty list of requested indices.
525 ac.requestedIDXs.clear();
// Visit every device; the entry whose name equals the builder's is
// special-cased (the body of that branch is not visible here).
526 for (
auto& d : workflow) {
527 if (d.name == builder->name) {
// Inspect each input of the device (loop body not visible here).
530 for (
auto&
i : d.inputs) {
// Drop the builder's current inputs and outputs (presumably they are
// regenerated by code not visible here).
538 builder->inputs.clear();
539 builder->outputs.clear();
// Only when a spawner device is present in the workflow.
546 if (
spawner != workflow.end()) {
// Visit every device, looking at both its inputs and its outputs (the
// selection conditions are on lines not visible here).
548 for (
auto& d : workflow) {
552 for (
auto const&
i : d.inputs) {
558 for (
auto const& o : d.outputs) {
// Record this output as a provided DYN.
560 ac.providedDYNs.emplace_back(o);
// Sort both lists with the dedicated comparators.
564 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
565 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// The spawner inputs are the requested DYNs that no provided DYN matches.
566 ac.spawnerInputs.clear();
567 for (
auto& input : ac.requestedDYNs) {
568 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
569 ac.spawnerInputs.emplace_back(input);
// Only when the analysis CCDB device is present in the workflow.
581 if (analysisCCDB != workflow.end()) {
// Iterate over the workflow through the exclude_by_name view (judging by the
// helper's name, this skips the CCDB device itself - confirm). The loop body
// is not visible here.
582 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
// Sort both lists with the dedicated comparators.
586 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
587 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
// Rebuild the CCDB inputs from the requested TIMs, piped through
// filter_not_matching(providedTIMs) and appended to analysisCCDBInputs
// (presumably: requested TIMs that no provided TIM matches - confirm against
// the helper's definition).
589 ac.analysisCCDBInputs.clear();
590 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
// Drop the CCDB device's current outputs and inputs (presumably regenerated
// by code not visible here).
593 analysisCCDB->outputs.clear();
594 analysisCCDB->inputs.clear();
// Remove the AOD writer device from the workflow when it is present.
602 if (writer != workflow.end()) {
603 workflow.erase(writer);
// NOTE(review): `reader` was obtained before the erase above. If `workflow`
// is a std::vector, erase() invalidates every iterator at or after the erased
// position, including the old end(); a `reader` that pointed past the writer,
// or that was equal to the old end(), would then be invalid in the comparison
// and dereferences below. Confirm that the lines not visible here re-resolve
// `reader`, or look it up again after the erase.
606 if (reader != workflow.end()) {
// Visit the inputs of every remaining device (loop body not visible here).
609 for (
auto& d : workflow) {
610 for (
auto const&
i : d.inputs) {
// remove_if followed by erase: drop the reader outputs selected by the
// predicate (the predicate body is not visible here).
619 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const& o) {
622 reader->outputs.erase(o_end, reader->outputs.end());
// A reader left without outputs is removed from the workflow altogether.
623 if (reader->outputs.empty()) {
625 workflow.erase(reader);
// Rebuild the list of AOD outputs from scratch.
640 ac.outputsInputsAOD.clear();
// `ii` is an unsigned int (initialized from 0u) indexing outputsInputs and
// isDangling in parallel.
642 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
// Keep an entry when the output director returns at least one descriptor for
// it, or when it is flagged as dangling.
644 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
645 if (!
ds.empty() || isDangling[ii]) {
646 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
// When at least one entry was kept, also append the timeframe number and
// timeframe filename entries.
652 if (!ac.outputsInputsAOD.empty()) {
654 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
655 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Scan the workflow by index for the injected dummy sink.
659 for (
size_t i = 0;
i < workflow.size(); ++
i) {
660 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
// Move it to the back: append a copy first, then erase the original at
// index i. Index-based access is used, so no iterator is held across the
// push_back (which may reallocate).
661 workflow.push_back(workflow[
i]);
662 workflow.erase(workflow.begin() +
i);