110 using o2::monitoring::Metric;
111 using o2::monitoring::Monitoring;
112 using o2::monitoring::tags::Key;
113 using o2::monitoring::tags::Value;
// Service descriptor field: the name is set to "arrow-backend".
116 .
name =
"arrow-backend",
// Per-invocation accumulators, all starting at zero. The per-device loop
// later in this listing adds the individual device values into them.
126 int64_t totalBytesCreated = 0;
127 int64_t shmOfferBytesConsumed = 0;
128 int64_t totalBytesDestroyed = 0;
129 int64_t totalBytesExpired = 0;
130 int64_t totalMessagesCreated = 0;
131 int64_t totalMessagesDestroyed = 0;
132 int64_t totalTimeframesRead = 0;
133 int64_t totalTimeframesConsumed = 0;
// Shorthand references into `sm` (declared outside the visible lines):
// driver-level metrics, per-device metrics, device specs and device infos.
134 auto &driverMetrics = sm.driverMetricsInfo;
135 auto &allDeviceMetrics = sm.deviceMetricsInfos;
136 auto &specs = sm.deviceSpecs;
137 auto &infos = sm.deviceInfos;
// Handles for the driver-level metrics published by this code. They are
// function-local statics, so each createNumericMetric call runs only the
// first time execution reaches it. Most are created as uint64_t; the
// shared-memory and timeframes-in-fly ones are created as int.
140 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
141 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
142 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
143 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
144 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
145 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
146 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
147 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
148 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
149 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
150 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
151 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
152 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
153 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
154 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
155 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
156 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
157 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
158 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
// Set to true as soon as any inspected per-device metric is flagged as changed.
161 bool changed =
false;
// Largest timestamp seen among the inspected metric samples.
163 size_t lastTimestamp = 0;
// Walk every device's metrics; `mi` also indexes `allIndices` below.
165 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
166 auto& deviceMetrics = allDeviceMetrics[mi];
// Consistency check: there must be exactly one `changed` flag per metric.
// The message names the two sizes that are actually compared, so that a
// failure points at the right containers.
167 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
168 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
// Metric indices for this device (`allIndices` is defined outside the visible lines).
170 auto&
indices = allIndices[mi];
// Eight near-identical stanzas follow, one per accumulated quantity. Each one:
//   - asserts that `index` is a valid position in deviceMetrics.metrics,
//   - ORs the metric's `changed` flag into `changed`,
//   - asserts that info.storeIdx is valid for deviceMetrics.uint64Metrics,
//   - adds `value` to the matching accumulator,
//   - raises lastTimestamp to the timestamp at slot (info.pos - 1) % data.size().
// `index`, `info` and `value` are defined on lines that are not part of this listing.
// NOTE(review): if info.pos is unsigned and can be 0, (info.pos - 1) wraps around
// before the modulo is applied — confirm pos is always >= 1 at this point.

// Stanza 1: accumulates into totalBytesCreated.
173 assert(
index < deviceMetrics.metrics.size());
174 changed |= deviceMetrics.changed[
index];
176 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
177 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
178 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
180 totalBytesCreated +=
value;
181 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 2: accumulates into shmOfferBytesConsumed.
185 assert(
index < deviceMetrics.metrics.size());
186 changed |= deviceMetrics.changed[
index];
188 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
189 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
190 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
192 shmOfferBytesConsumed +=
value;
193 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 3: accumulates into totalBytesDestroyed.
197 assert(
index < deviceMetrics.metrics.size());
198 changed |= deviceMetrics.changed[
index];
200 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
201 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
203 totalBytesDestroyed +=
value;
204 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
205 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 4: accumulates into totalBytesExpired.
209 assert(
index < deviceMetrics.metrics.size());
210 changed |= deviceMetrics.changed[
index];
212 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
213 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
215 totalBytesExpired +=
value;
216 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
217 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 5: accumulates into totalMessagesCreated.
221 assert(
index < deviceMetrics.metrics.size());
223 changed |= deviceMetrics.changed[
index];
224 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
225 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
227 totalMessagesCreated +=
value;
228 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
229 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 6: accumulates into totalMessagesDestroyed.
233 assert(
index < deviceMetrics.metrics.size());
235 changed |= deviceMetrics.changed[
index];
236 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
237 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
239 totalMessagesDestroyed +=
value;
240 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
241 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 7: accumulates into totalTimeframesRead.
245 assert(
index < deviceMetrics.metrics.size());
246 changed |= deviceMetrics.changed[
index];
248 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
249 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
251 totalTimeframesRead +=
value;
252 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
253 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Stanza 8: accumulates into totalTimeframesConsumed.
257 assert(
index < deviceMetrics.metrics.size());
258 changed |= deviceMetrics.changed[
index];
260 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
261 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
263 totalTimeframesConsumed +=
value;
264 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
265 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Counter kept across invocations (function-local static). The code that
// updates it is not part of the visible lines.
268 static uint64_t unchangedCount = 0;
// Publish the aggregated totals as driver-level metrics. `timestamp` is
// defined on a line that is not part of this listing.
270 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
271 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
272 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
273 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
274 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
275 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
276 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
277 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in flight = read - consumed, narrowed to int for the int-typed metric.
278 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
// Bytes still outstanding = created - expired - destroyed.
279 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the metric registered as "changed-metrics-count" is fed
// `unchangedCount` — confirm the naming is intentional.
283 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// Timeframe limit check: true when a non-zero maxTimeframes is configured and
// the number of timeframes in flight (read - consumed) exceeds it. The body of
// this branch is not part of the visible lines.
285 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
// Upper and lower bound for a single shared-memory offer. The log messages
// below report these quantities as MB.
290 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
291 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
// Offer bookkeeping kept across invocations (function-local statics).
// availableSharedMemory starts from MAX_SHARED_MEMORY, which is defined
// outside the visible lines.
293 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
294 static int64_t offeredSharedMemory = 0;
295 static int64_t lastDeviceOffered = 0;
// Index of the last device that received an offer in this pass; -1 means none.
298 int64_t lastCandidate = -1;
// Counters used so that each low/enough transition is logged only once.
// Being static, their initialisers are evaluated only on the first pass.
299 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
300 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
301 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
// Consider every device spec once per pass.
302 for (
size_t di = 0;
di < specs.size();
di++) {
// Not enough memory for the current offer size: log only on the first
// occurrence, then count it and reset the "enough" counter.
303 if (availableSharedMemory < possibleOffer) {
304 if (lowSharedMemoryCount == 0) {
306 "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
307 availableSharedMemory, possibleOffer, offeredSharedMemory);
309 lowSharedMemoryCount++;
310 enoughSharedMemoryCount = 0;
// Enough memory again: log only on the first occurrence, then count it and
// reset the "low" counter.
313 if (enoughSharedMemoryCount == 0) {
315 "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory);
317 enoughSharedMemoryCount++;
318 lowSharedMemoryCount = 0;
// Rotate the starting point so the scan begins at lastDeviceOffered and wraps
// around the list of specs.
320 size_t candidate = (lastDeviceOffered +
di) % specs.size();
322 auto& info = infos[candidate];
// Test for devices that are inactive or ready to quit (branch body not visible).
326 if (info.active ==
false || info.readyToQuit) {
// Test for devices other than "internal-dpl-aod-reader" (branch body not visible).
329 if (specs[candidate].
name !=
"internal-dpl-aod-reader") {
// Offer at most MAX_QUANTUM_SHARED_MEMORY, capped by what is still available.
332 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
334 "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
335 possibleOffer, availableSharedMemory, specs[candidate].
id.c_str());
// Queue a "/shm-offer <amount>" message addressed to the candidate's id.
336 manager.queueMessage(specs[candidate].
id.c_str(), fmt::format(
"/shm-offer {}", possibleOffer).
data());
// Account for the offer and remember who received it.
337 availableSharedMemory -= possibleOffer;
338 offeredSharedMemory += possibleOffer;
339 lastCandidate = candidate;
// If an offer was made, the next pass starts from the device after the last one served.
343 if (lastCandidate >= 0) {
344 lastDeviceOffered = lastCandidate + 1;
// Values from the previous invocation, used to log only when something changed.
350 static int64_t lastShmOfferConsumed = 0;
351 static int64_t lastUnusedOfferedMemory = 0;
352 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
354 "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
355 lastShmOfferConsumed = shmOfferBytesConsumed;
// Offered memory not yet accounted for: offered - (expired + consumed) / 1000000.
// The result is stored in an int although the operands are int64_t.
357 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
358 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
360 "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
361 unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
362 lastUnusedOfferedMemory = unusedOfferedMemory;
// Recompute the available amount from the totals:
// MAX_SHARED_MEMORY + (destroyed - created) / 1000000 - unusedOfferedMemory.
367 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
// Publish the shared-memory bookkeeping as driver-level metrics.
368 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
369 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
371 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
// Accounts for the bytes handed back by the inputs of the current context and
// reports them through `arrow` (declared outside the visible lines).
// Number of messages accounted for; the lines that increment it are not visible here.
376 auto totalMessages = 0;
// Inspect every input of the context.
378 for (
auto& input : ctx.
inputs()) {
// Test for inputs without a header (branch body not visible).
379 if (input.header ==
nullptr) {
382 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
// Log for messages that are not accounted because they are not of kind arrow.
386 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
387 dh->dataOrigin.str, dh->dataDescription.str);
390 bool forwarded =
false;
// Log for forwarded messages. The description uses precision 16 (".16s") like
// the sibling messages; the previous "16.s" meant width 16 with precision 0,
// which prints padding only and drops the description.
399 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
400 dh->dataOrigin.str, dh->dataDescription.str);
// Log for messages being deleted; their payload size is added to the running total.
404 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
405 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
406 totalBytes += payloadSize;
// Report the bytes and messages given back in this pass.
409 arrow->updateBytesDestroyed(totalBytes);
411 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
412 totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
413 arrow->updateMessagesDestroyed(totalMessages);
417 stats.processCommandQueue(); },
// Builds the rate-limit configuration from the driver options in `dc` and
// registers it as a RateLimitConfig service.
// The "readers" option is read as a string and parsed with std::stoll; the
// result is stored in an int.
420 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// When "aod-memory-rate-limit" is present and was set explicitly (not
// defaulted), use its value divided by 1000000.
421 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
422 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// Presumably the else branch (the `else` line is not visible): 500 per reader.
424 config->maxMemory = readers * 500;
// When "timeframes-rate-limit" is the literal "readers", allow one timeframe per reader.
426 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].as<std::string>() ==
"readers") {
427 config->maxTimeframes = readers;
// Presumably the else branch (the `else` line is not visible): parse the option as a number.
429 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Static flag kept across invocations; the code that tests it is not visible here.
431 static bool once =
false;
436 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
437 config->maxMemory, readers);
// Make the configuration available through the service registry.
438 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// Adjusts the workflow in `node.specs`: looks up the internal AOD devices by
// name, refreshes the bookkeeping kept in `ac` (declared outside the visible
// lines) and adds or removes devices accordingly.
442 auto& workflow = node.
specs;
// Locate the internal AOD devices by name; each iterator equals
// workflow.end() when the device is absent.
443 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
444 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
445 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
446 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
// Start from empty lists of requested / provided DYN specs.
449 ac.requestedDYNs.clear();
450 ac.providedDYNs.clear();
// Index builder present: reset the requested IDX list, scan the inputs of the
// workflow devices (loop bodies are not visible here), then clear the
// builder's own inputs and outputs.
456 if (builder != workflow.end()) {
458 ac.requestedIDXs.clear();
459 for (
auto& d : workflow) {
460 if (d.name == builder->name) {
463 for (
auto&
i : d.inputs) {
471 builder->inputs.clear();
472 builder->outputs.clear();
// Spawner present: scan the inputs and outputs of the workflow devices
// (selection criteria are not visible here); outputs are appended to
// ac.providedDYNs.
479 if (
spawner != workflow.end()) {
481 for (
auto& d : workflow) {
485 for (
auto const&
i : d.inputs) {
491 for (
auto const&
o : d.outputs) {
493 ac.providedDYNs.emplace_back(
o);
// Sort both lists with the comparators defined elsewhere in the project.
497 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
498 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// Spawner inputs = requested DYNs that no provided DYN matches.
499 ac.spawnerInputs.clear();
500 for (
auto& input : ac.requestedDYNs) {
501 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
502 ac.spawnerInputs.emplace_back(input);
// Remove the AOD writer from the workflow when present.
// NOTE(review): erasing from the vector invalidates iterators at or after the
// erased element; `reader` (obtained above) is still used below — confirm the
// writer can never precede the reader in `workflow`.
514 if (writer != workflow.end()) {
515 workflow.erase(writer);
// Reader present: scan the inputs of the workflow devices, drop reader outputs
// selected by the predicate (its body is not visible here), and remove the
// reader itself if no outputs remain.
518 if (reader != workflow.end()) {
521 for (
auto& d : workflow) {
522 for (
auto const&
i : d.inputs) {
531 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const&
o) {
534 reader->outputs.erase(o_end, reader->outputs.end());
535 if (reader->outputs.empty()) {
537 workflow.erase(reader);
// Rebuild the list of outputs kept in ac.outputsInputsAOD: keep an entry when
// it has data output descriptors or is flagged as dangling. `outputsInputs`,
// `isDangling` and `dod` are defined on lines that are not part of this listing.
550 ac.outputsInputsAOD.clear();
552 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
554 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
555 if (!
ds.empty() || isDangling[ii]) {
556 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
// When anything is kept, also append the "tfn" and "tff" entries.
562 if (!ac.outputsInputsAOD.empty()) {
564 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
565 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Move the "internal-dpl-injected-dummy-sink" device to the end of the
// workflow by appending a copy and erasing the original position.
// NOTE(review): whether the loop stops after the first match is not visible here.
569 for (
size_t i = 0;
i < workflow.size(); ++
i) {
570 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
571 workflow.push_back(workflow[
i]);
572 workflow.erase(workflow.begin() +
i);