107 using o2::monitoring::Metric;
108 using o2::monitoring::Monitoring;
109 using o2::monitoring::tags::Key;
110 using o2::monitoring::tags::Value;
113 .
name =
"arrow-backend",
// Running totals for this invocation, all starting from zero. The `+=`
// statements further down add per-device values into them.
123 int64_t totalBytesCreated = 0;
124 int64_t shmOfferBytesConsumed = 0;
125 int64_t totalBytesDestroyed = 0;
126 int64_t totalBytesExpired = 0;
127 int64_t totalMessagesCreated = 0;
128 int64_t totalMessagesDestroyed = 0;
129 int64_t totalTimeframesRead = 0;
130 int64_t totalTimeframesConsumed = 0;
// Short local aliases for members of `sm` used throughout the code below.
131 auto &driverMetrics = sm.driverMetricsInfo;
132 auto &allDeviceMetrics = sm.deviceMetricsInfos;
133 auto &specs = sm.deviceSpecs;
134 auto &infos = sm.deviceInfos;
// Handles for the driver-level metrics. Each one is a function-local static,
// so DeviceMetricsHelper::createNumericMetric is called only the first time
// execution reaches the declaration; the string literal is the metric name.
// Most handles are instantiated for uint64_t; the shared-memory and
// timeframes-in-fly handles are instantiated for int.
// NOTE(review): the totals later passed to the uint64_t handles are declared
// as int64_t above -- confirm the implicit conversion is intended.
136 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"rate-limit-state");
137 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-created");
138 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-shm-offer-bytes-consumed");
139 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-unused-offered-shared-memory");
140 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-available-shared-memory");
141 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-offered-shared-memory");
142 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-destroyed");
143 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-bytes-expired");
144 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-created");
145 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-arrow-messages-destroyed");
146 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-read");
147 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"total-timeframes-consumed");
148 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics,
"total-timeframes-in-fly");
149 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"arrow-bytes-delta");
150 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"changed-metrics-count");
// The handles below are created here but not used in the lines visible in
// this excerpt.
151 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-reader-signals");
152 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-signal-latency");
153 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-skipped-signals");
154 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics,
"aod-remaining-bytes");
157 bool changed =
false;
159 size_t lastTimestamp = 0;
161 for (
size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
162 auto& deviceMetrics = allDeviceMetrics[mi];
// Sanity check: `changed` and `metrics` are indexed with the same index in
// the statements below, so they must have the same number of entries.
163 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
// The message names the two sizes that are actually compared, so the
// failure can be diagnosed from the exception text alone.
164 throw std::runtime_error(
"deviceMetrics.changed.size() != deviceMetrics.metrics.size()");
166 auto&
indices = allIndices[mi];
169 assert(
index < deviceMetrics.metrics.size());
170 changed |= deviceMetrics.changed[
index];
172 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
173 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
174 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
176 totalBytesCreated +=
value;
177 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
181 assert(
index < deviceMetrics.metrics.size());
182 changed |= deviceMetrics.changed[
index];
184 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
185 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
186 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
188 shmOfferBytesConsumed +=
value;
189 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
193 assert(
index < deviceMetrics.metrics.size());
194 changed |= deviceMetrics.changed[
index];
196 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
197 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
199 totalBytesDestroyed +=
value;
200 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
201 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
205 assert(
index < deviceMetrics.metrics.size());
206 changed |= deviceMetrics.changed[
index];
208 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
209 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
211 totalBytesExpired +=
value;
212 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
213 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
217 assert(
index < deviceMetrics.metrics.size());
219 changed |= deviceMetrics.changed[
index];
220 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
221 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
223 totalMessagesCreated +=
value;
224 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
225 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
229 assert(
index < deviceMetrics.metrics.size());
231 changed |= deviceMetrics.changed[
index];
232 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
233 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
235 totalMessagesDestroyed +=
value;
236 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
237 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
241 assert(
index < deviceMetrics.metrics.size());
242 changed |= deviceMetrics.changed[
index];
244 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
245 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
247 totalTimeframesRead +=
value;
248 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
249 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
253 assert(
index < deviceMetrics.metrics.size());
254 changed |= deviceMetrics.changed[
index];
256 assert(info.
storeIdx < deviceMetrics.uint64Metrics.size());
257 auto&
data = deviceMetrics.uint64Metrics[info.
storeIdx];
259 totalTimeframesConsumed +=
value;
260 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.
storeIdx];
261 lastTimestamp = std::max(lastTimestamp, timestamps[(info.
pos - 1) %
data.size()]);
// Function-local static counter; the statements that update it are not part
// of the lines visible here.
264 static uint64_t unchangedCount = 0;
// Publish the aggregated totals as driver metrics, all stamped with
// `timestamp` (declared outside the lines visible here).
266 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
267 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
268 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
269 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
270 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
271 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
272 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
273 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
// Timeframes in fly = read minus consumed, cast to int because the handle
// was created for int values.
274 totalTimeframesInFlyMetric(driverMetrics, (
int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
// Bytes delta = created - expired - destroyed.
275 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
// NOTE(review): the "changed-metrics-count" handle is fed `unchangedCount`;
// confirm that the naming mismatch is intentional.
279 changedCountMetric(driverMetrics, unchangedCount, timestamp);
281 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
// Upper and lower bound for the size of a single offer.
// NOTE(review): the unit looks like MB, judging by the "{}MB" log messages
// that print these quantities below -- confirm.
286 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
287 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
// State that persists across invocations (function-local statics).
// MAX_SHARED_MEMORY is defined outside the lines visible here.
289 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
290 static int64_t offeredSharedMemory = 0;
291 static int64_t lastDeviceOffered = 0;
// Index of the device that most recently received an offer in this pass;
// -1 means no offer was made (tested with `lastCandidate >= 0` below).
294 int64_t lastCandidate = -1;
// Initialised once to complementary values (1/0 or 0/1), depending on
// whether more than MIN_QUANTUM_SHARED_MEMORY is available at that moment.
// The loop below uses them so each state-transition message is logged only
// when the corresponding counter is zero.
295 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
296 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
// Offer size that the loop below first tests against the available memory.
297 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
298 for (
size_t di = 0;
di < specs.size();
di++) {
299 if (availableSharedMemory < possibleOffer) {
300 if (lowSharedMemoryCount == 0) {
301 LOGP(detail,
"We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}", availableSharedMemory, possibleOffer, offeredSharedMemory);
303 lowSharedMemoryCount++;
304 enoughSharedMemoryCount = 0;
307 if (enoughSharedMemoryCount == 0) {
308 LOGP(detail,
"We are back in a state where we enough shared memory: {}MB", availableSharedMemory);
310 enoughSharedMemoryCount++;
311 lowSharedMemoryCount = 0;
313 size_t candidate = (lastDeviceOffered +
di) % specs.size();
315 auto& info = infos[candidate];
319 if (info.active ==
false || info.readyToQuit) {
322 if (specs[candidate].
name !=
"internal-dpl-aod-reader") {
325 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
326 LOGP(detail,
"Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].
id);
327 manager.queueMessage(specs[candidate].
id.c_str(), fmt::format(
"/shm-offer {}", possibleOffer).
data());
328 availableSharedMemory -= possibleOffer;
329 offeredSharedMemory += possibleOffer;
330 lastCandidate = candidate;
334 if (lastCandidate >= 0) {
335 lastDeviceOffered = lastCandidate + 1;
// Values remembered from the previous invocation, so the log messages below
// are emitted only when something changed.
341 static int64_t lastShmOfferConsumed = 0;
342 static int64_t lastUnusedOfferedMemory = 0;
343 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
344 LOGP(detail,
"Offer consumed so far {}", shmOfferBytesConsumed);
345 lastShmOfferConsumed = shmOfferBytesConsumed;
// Unused offer = offered - (expired + consumed) / 1000000. Only the byte
// counters are divided; offeredSharedMemory is used as is.
// NOTE(review): the int64_t expression is narrowed to int here and then
// compared against the int64_t lastUnusedOfferedMemory -- confirm the range
// is safe.
347 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
348 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
349 LOGP(detail,
"unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000", unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
350 lastUnusedOfferedMemory = unusedOfferedMemory;
// Recompute the available amount from scratch:
// MAX_SHARED_MEMORY + (destroyed - created) / 1000000 - unused offer.
355 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
// Publish the three shared-memory figures as driver metrics.
356 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
357 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
359 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
364 auto totalMessages = 0;
365 for (
auto& input : ctx.
inputs()) {
366 if (input.header ==
nullptr) {
369 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
372 LOGP(
debug,
"Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription);
375 bool forwarded =
false;
383 LOGP(
debug,
"Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription);
386 LOGP(
debug,
"Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, payloadSize / 1000000.);
387 totalBytes += payloadSize;
390 arrow->updateBytesDestroyed(totalBytes);
391 LOGP(
debug,
"{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000.,
arrow->bytesDestroyed() / 1000000.);
392 arrow->updateMessagesDestroyed(totalMessages);
396 stats.processCommandQueue(); },
// The "readers" option is read as a string and parsed with std::stoll.
// NOTE(review): the long long result is narrowed to int.
399 int readers = std::stoll(dc.options[
"readers"].as<std::string>());
// When "aod-memory-rate-limit" is present and not defaulted, its value is
// divided by 1000000 before being stored in config->maxMemory.
400 if (dc.options.count(
"aod-memory-rate-limit") && dc.options[
"aod-memory-rate-limit"].defaulted() ==
false) {
401 config->maxMemory = std::stoll(dc.options[
"aod-memory-rate-limit"].as<std::string>()) / 1000000;
// NOTE(review): presumably the fallback branch (the `else` is not among the
// lines visible here): 500 per reader.
403 config->maxMemory = readers * 500;
// The literal value "readers" for "timeframes-rate-limit" selects one
// timeframe per reader.
405 if (dc.options.count(
"timeframes-rate-limit") && dc.options[
"timeframes-rate-limit"].as<std::string>() ==
"readers") {
406 config->maxTimeframes = readers;
// NOTE(review): presumably the alternative branch: the option value is
// parsed as a number.
408 config->maxTimeframes = std::stoll(dc.options[
"timeframes-rate-limit"].as<std::string>());
// Function-local static flag; the code that tests and sets it is not among
// the lines visible here.
410 static bool once =
false;
413 LOGP(info,
"Rate limiting set up at {}MB distributed over {} readers", config->maxMemory, readers);
// Register `config` in the service registry as the RateLimitConfig service.
414 registry.
registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
// Alias for the list of data processor specs held by `node`.
418 auto& workflow = node.
specs;
// Locate the four internal AOD data processors by name. Each result is an
// iterator into `workflow`; it equals workflow.end() when no spec carries
// that name, which is what the `!= workflow.end()` tests below check.
419 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-spawner"; });
420 auto builder = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-index-builder"; });
421 auto reader = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-reader"; });
422 auto writer = std::find_if(workflow.begin(), workflow.end(), [](
DataProcessorSpec const& spec) { return spec.name ==
"internal-dpl-aod-writer"; });
425 ac.requestedDYNs.clear();
426 ac.providedDYNs.clear();
432 if (builder != workflow.end()) {
434 ac.requestedIDXs.clear();
435 for (
auto& d : workflow) {
436 if (d.name == builder->name) {
439 for (
auto&
i : d.inputs) {
447 builder->inputs.clear();
448 builder->outputs.clear();
455 if (
spawner != workflow.end()) {
457 for (
auto& d : workflow) {
461 for (
auto const&
i : d.inputs) {
467 for (
auto const&
o : d.outputs) {
469 ac.providedDYNs.emplace_back(
o);
// Sort the requested and provided DYN specs with their respective
// comparators (defined outside the lines visible here).
473 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
474 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// Rebuild the spawner inputs from scratch: keep every requested DYN spec
// that no provided DYN spec matches according to DataSpecUtils::match.
475 ac.spawnerInputs.clear();
476 for (
auto& input : ac.requestedDYNs) {
477 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](
auto const&
x) { return DataSpecUtils::match(input, x); })) {
478 ac.spawnerInputs.emplace_back(input);
// Remove the AOD writer spec from the workflow when one is present.
490 if (writer != workflow.end()) {
491 workflow.erase(writer);
// Erasing from a std::vector invalidates every iterator at or after the
// erased element, including the previous end(). `reader` is compared with
// workflow.end() and dereferenced right after this block, so look it up again
// rather than rely on a possibly invalidated iterator.
reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
494 if (reader != workflow.end()) {
497 for (
auto& d : workflow) {
498 for (
auto const&
i : d.inputs) {
507 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](
OutputSpec const&
o) {
510 reader->outputs.erase(o_end, reader->outputs.end());
511 if (reader->outputs.empty()) {
513 workflow.erase(reader);
526 ac.outputsInputsAOD.clear();
528 for (
auto ii = 0u; ii < outputsInputs.size(); ii++) {
530 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
531 if (!
ds.empty() || isDangling[ii]) {
532 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
538 if (!ac.outputsInputsAOD.empty()) {
540 ac.outputsInputsAOD.emplace_back(
"tfn",
"TFN",
"TFNumber");
541 ac.outputsInputsAOD.emplace_back(
"tff",
"TFF",
"TFFilename");
// Move the spec named "internal-dpl-injected-dummy-sink" to the end of the
// workflow: a copy is appended first, then the original at index i is erased.
// NOTE(review): whether the loop stops after the first match depends on lines
// not visible here (e.g. a `break`); without one the scan continues over the
// shifted elements -- confirm.
545 for (
size_t i = 0;
i < workflow.size(); ++
i) {
546 if (workflow[
i].
name ==
"internal-dpl-injected-dummy-sink") {
547 workflow.push_back(workflow[
i]);
548 workflow.erase(workflow.begin() +
i);