113 using o2::monitoring::Metric;
114 using o2::monitoring::Monitoring;
115 using o2::monitoring::tags::Key;
116 using o2::monitoring::tags::Value;
119 .
name =
"arrow-backend",
129 int64_t totalBytesCreated = 0;
130 int64_t shmOfferBytesConsumed = 0;
131 int64_t totalBytesDestroyed = 0;
132 int64_t totalBytesExpired = 0;
133 int64_t totalMessagesCreated = 0;
134 int64_t totalMessagesDestroyed = 0;
135 int64_t totalTimeframesRead = 0;
136 int64_t totalTimeframesConsumed = 0;
137 auto &driverMetrics = sm.driverMetricsInfo;
138 auto &allDeviceMetrics = sm.deviceMetricsInfos;
139 auto &specs = sm.deviceSpecs;
140 auto &infos = sm.deviceInfos;
143 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
144 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
145 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
146 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
147 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
148 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
149 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
150 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
151 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
152 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
153 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
154 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
155 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
156 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
157 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
158 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
159 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
160 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
161 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
164 bool changed =
false;
166 size_t lastTimestamp = 0;
168 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
169 auto& deviceMetrics = allDeviceMetrics[mi];
170 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
// The guard above compares changed.size() with metrics.size() of one device;
// report exactly those two quantities so the failure is diagnosable.
171 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
173 auto&
indices = allIndices[mi];
176 assert(
index < deviceMetrics.metrics.size());
177 changed |= deviceMetrics.changed[
index];
179 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
180 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
181 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
183 totalBytesCreated +=
value;
184 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
188 assert(
index < deviceMetrics.metrics.size());
189 changed |= deviceMetrics.changed[
index];
191 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
192 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
193 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
195 shmOfferBytesConsumed +=
value;
196 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
200 assert(
index < deviceMetrics.metrics.size());
201 changed |= deviceMetrics.changed[
index];
203 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
204 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
206 totalBytesDestroyed +=
value;
207 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
208 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
212 assert(
index < deviceMetrics.metrics.size());
213 changed |= deviceMetrics.changed[
index];
215 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
216 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
218 totalBytesExpired +=
value;
219 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
220 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
224 assert(
index < deviceMetrics.metrics.size());
226 changed |= deviceMetrics.changed[
index];
227 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
228 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
230 totalMessagesCreated +=
value;
231 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
232 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
236 assert(
index < deviceMetrics.metrics.size());
238 changed |= deviceMetrics.changed[
index];
239 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
240 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
242 totalMessagesDestroyed +=
value;
243 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
244 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
248 assert(
index < deviceMetrics.metrics.size());
249 changed |= deviceMetrics.changed[
index];
251 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
252 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
254 totalTimeframesRead +=
value;
255 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
256 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
260 assert(
index < deviceMetrics.metrics.size());
261 changed |= deviceMetrics.changed[
index];
263 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
264 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
266 totalTimeframesConsumed +=
value;
267 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
268 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
271 static uint64_t unchangedCount = 0;
273 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
274 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
275 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
276 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
277 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
278 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
279 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
280 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
281 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
282 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
286 changedCountMetric(driverMetrics, unchangedCount, timestamp);
288 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
293 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
294 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
296 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
297 static int64_t offeredSharedMemory = 0;
298 static int64_t lastDeviceOffered = 0;
301 int64_t lastCandidate = -1;
302 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
303 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
304 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
305 for (
size_t di = 0;
di < specs.size();
di++) {
306 if (availableSharedMemory < possibleOffer) {
307 if (lowSharedMemoryCount == 0) {
309 "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
310 availableSharedMemory, possibleOffer, offeredSharedMemory);
312 lowSharedMemoryCount++;
313 enoughSharedMemoryCount = 0;
316 if (enoughSharedMemoryCount == 0) {
318 "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory);
320 enoughSharedMemoryCount++;
321 lowSharedMemoryCount = 0;
323 size_t candidate = (lastDeviceOffered +
di) % specs.size();
325 auto& info = infos[candidate];
329 if (info.active ==
false || info.readyToQuit) {
332 if (specs[candidate].
name !=
"internal-dpl-aod-reader") {
335 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
337 "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
338 possibleOffer, availableSharedMemory, specs[candidate].
id.c_str());
339 manager.queueMessage(specs[candidate].
id.c_str(), fmt::format(
"/shm-offer {}", possibleOffer).
data());
340 availableSharedMemory -= possibleOffer;
341 offeredSharedMemory += possibleOffer;
342 lastCandidate = candidate;
346 if (lastCandidate >= 0) {
347 lastDeviceOffered = lastCandidate + 1;
353 static int64_t lastShmOfferConsumed = 0;
354 static int64_t lastUnusedOfferedMemory = 0;
355 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
357 "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
358 lastShmOfferConsumed = shmOfferBytesConsumed;
360 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
361 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
363 "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
364 unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
365 lastUnusedOfferedMemory = unusedOfferedMemory;
370 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
371 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
372 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
374 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
379 auto totalMessages = 0;
381 for (
auto& input : ctx.
inputs()) {
382 if (input.header ==
nullptr) {
385 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
389 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
390 dh->dataOrigin.str, dh->dataDescription.str);
393 bool forwarded =
false;
// Use the same precision-limited `.16s` specifier as the sibling messages;
// under printf rules `16.s` means width 16 with zero precision.
402 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
403 dh->dataOrigin.str, dh->dataDescription.str);
407 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
408 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
409 totalBytes += payloadSize;
412 arrow->updateBytesDestroyed(totalBytes);
414 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
415 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
416 arrow->updateMessagesDestroyed(totalMessages);
420 stats.processCommandQueue(); },
423 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
424 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
425 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
427 config->maxMemory = readers * 500;
429 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].as<std::string>() ==
"readers") {
430 config->maxTimeframes = readers;
432 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
434 static bool once =
false;
439 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
440 config->maxMemory, readers);
441 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
445 auto& workflow = node.
specs;
446 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
447 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-ccdb"; });
448 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
449 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
450 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
453 ac.requestedDYNs.clear();
454 ac.providedDYNs.clear();
455 ac.providedTIMs.clear();
456 ac.requestedTIMs.clear();
462 if (builder != workflow.end()) {
464 ac.requestedIDXs.clear();
465 for (
auto& d : workflow) {
466 if (d.name == builder->name) {
469 for (
auto&
i : d.inputs) {
477 builder->inputs.clear();
478 builder->outputs.clear();
485 if (
spawner != workflow.end()) {
487 for (
auto& d : workflow) {
491 for (
auto const&
i : d.inputs) {
497 for (
auto const&
o : d.outputs) {
499 ac.providedDYNs.emplace_back(
o);
503 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
504 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
505 ac.spawnerInputs.clear();
506 for (
auto& input : ac.requestedDYNs) {
507 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
508 ac.spawnerInputs.emplace_back(input);
520 if (analysisCCDB != workflow.end()) {
521 for (
auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
525 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
526 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
528 ac.analysisCCDBInputs.clear();
529 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) |
sinks::append_to{ac.analysisCCDBInputs};
532 analysisCCDB->outputs.clear();
533 analysisCCDB->inputs.clear();
541 if (writer != workflow.end()) {
542 workflow.erase(writer);
// Erasing from a vector-like container invalidates every iterator at or after
// the erased element, so look the reader up again before it is compared with
// end() and dereferenced below.
reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
545 if (reader != workflow.end()) {
548 for (
auto& d : workflow) {
549 for (
auto const&
i : d.inputs) {
558 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const&
o) {
561 reader->outputs.erase(o_end, reader->outputs.end());
562 if (reader->outputs.empty()) {
564 workflow.erase(reader);
579 ac.outputsInputsAOD.clear();
581 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
583 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
584 if (!
ds.empty() || isDangling[ii]) {
585 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
591 if (!ac.outputsInputsAOD.empty()) {
593 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
594 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
598 for (
size_t i = 0;
i < workflow.size(); ++
i) {
599 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
600 workflow.push_back(workflow[
i]);
601 workflow.erase(workflow.begin() +
i);