Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
36
38#include <Monitoring/Monitoring.h>
39#include "Headers/DataHeader.h"
40
41#include <RtypesCore.h>
42#include <fairmq/ProgOptions.h>
43
44#include <uv.h>
45#include <boost/program_options/variables_map.hpp>
46#include <csignal>
47
49
50namespace o2::framework
51{
52
53class EndOfStreamContext;
54class ProcessingContext;
55
/// States of the rate limiting logic.
/// NOTE(review): not referenced elsewhere in the visible part of this file.
enum class RateLimitingState : int {
  UNKNOWN = 0,                   ///< No information received yet.
  STARTED = 1,                   ///< Information received, new timeframe not requested.
  CHANGED = 2,                   ///< Information received, new timeframe requested but not yet accounted.
  BELOW_LIMIT = 3,               ///< New metric received, we are below limit.
  NEXT_ITERATION_FROM_BELOW = 4, ///< Iteration when previously in BELOW_LIMIT.
  ABOVE_LIMIT = 5,               ///< New metric received, we are above limit.
  EMPTY = 6,                     ///< NOTE(review): meaning not documented in this file — TODO describe.
};
65
67 int64_t maxMemory = 2000;
68 int64_t maxTimeframes = 0;
69};
70
72 size_t arrowBytesCreated = -1;
76 size_t arrowBytesExpired = -1;
78 size_t timeframesRead = -1;
79 size_t timeframesConsumed = -1;
80};
81
82std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
83{
84 std::vector<MetricIndices> results;
85
86 for (auto& info : allDevicesMetrics) {
88 indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
89 indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
90 indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
91 indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
92 indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
93 indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
94 indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
95 indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
96 results.push_back(indices);
97 }
98 return results;
99}
100
102{
103 return registry.get<RateLimitConfig>().maxMemory;
104}
105
107 int64_t available;
108 int64_t offered = 0;
109 int64_t lastDeviceOffered = 0;
110};
112 int64_t enoughCount;
113 int64_t lowCount;
114};
116 char const* name;
117 char const* unit;
118 char const* api;
119 int64_t maxAvailable;
120 int64_t maxQuantum;
121 int64_t minQuantum;
123};
124
125auto offerResources(ResourceState& resourceState,
126 ResourceSpec const& resourceSpec,
127 ResourceStats& resourceStats,
128 std::vector<DeviceSpec> const& specs,
129 std::vector<DeviceInfo> const& infos,
130 DevicesManager& manager,
131 int64_t offerConsumedCurrentValue,
132 int64_t offerExpiredCurrentValue,
133 int64_t acquiredResourceCurrentValue,
134 int64_t disposedResourceCurrentValue,
135 size_t timestamp,
136 DeviceMetricsInfo& driverMetrics,
137 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
138 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
139 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
140 void* signpostId) -> void
141{
142 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
145 int64_t lastCandidate = -1;
146 int64_t possibleOffer = resourceSpec.minQuantum;
147
148 for (size_t di = 0; di < specs.size(); di++) {
149 if (resourceState.available < possibleOffer) {
150 if (resourceStats.lowCount == 0) {
151 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
152 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
153 resourceSpec.name, resourceState.available, resourceSpec.unit,
154 possibleOffer, resourceSpec.unit,
155 resourceState.offered, resourceSpec.unit);
156 }
157 resourceStats.lowCount++;
158 resourceStats.enoughCount = 0;
159 break;
160 } else {
161 if (resourceStats.enoughCount == 0) {
162 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
163 "We are back in a state where we enough %{public}s: %llu %{public}s",
164 resourceSpec.name,
165 resourceState.available,
166 resourceSpec.unit);
167 }
168 resourceStats.lowCount = 0;
169 resourceStats.enoughCount++;
170 }
171 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
172
173 auto& info = infos[candidate];
174 // Do not bother for inactive devices
175 // FIXME: there is probably a race condition if the device died and we did not
176 // took notice yet...
177 if (info.active == false || info.readyToQuit) {
178 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
179 "Device %s is inactive not offering %{public}s to it.",
180 specs[candidate].name.c_str(), resourceSpec.name);
181 continue;
182 }
183 if (specs[candidate].name != "internal-dpl-aod-reader") {
184 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
185 "Device %s is not a reader. Not offering %{public}s to it.",
186 specs[candidate].name.c_str(),
187 resourceSpec.name);
188 continue;
189 }
190 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
191 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
192 "Offering %llu %{public}s out of %llu to %{public}s",
193 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
194 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
195 resourceState.available -= possibleOffer;
196 resourceState.offered += possibleOffer;
197 lastCandidate = candidate;
198 }
199 // We had at least a valid candidate, so
200 // next time we offer to the next device.
201 if (lastCandidate >= 0) {
202 resourceState.lastDeviceOffered = lastCandidate + 1;
203 }
204
205 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
206 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
207 // and the amount which we know was given back.
208 static int64_t lastShmOfferConsumed = 0;
209 static int64_t lastUnusedOfferedMemory = 0;
210 if (offerConsumedCurrentValue != lastShmOfferConsumed) {
211 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
212 "Offer consumed so far %llu", offerConsumedCurrentValue);
213 lastShmOfferConsumed = offerConsumedCurrentValue;
214 }
215 int unusedOfferedMemory = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
216 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
217 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
218 "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
219 unusedOfferedMemory, resourceState.offered,
220 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
221 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
222 resourceSpec.metricOfferScaleFactor);
223 lastUnusedOfferedMemory = unusedOfferedMemory;
224 }
225 // availableSharedMemory is the amount of memory which we know is available to be offered.
226 // We subtract the amount which we know was already offered but it's unused and we then balance how
227 // much was created with how much was destroyed.
228 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedMemory;
229 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
230 unusedOfferedResourceMetric(driverMetrics, unusedOfferedMemory, timestamp);
231
232 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
233};
234
236{
237 using o2::monitoring::Metric;
238 using o2::monitoring::Monitoring;
239 using o2::monitoring::tags::Key;
240 using o2::monitoring::tags::Value;
241
242 return ServiceSpec{
243 .name = "arrow-backend",
245 .configure = CommonServices::noConfiguration(),
250 .metricHandling = [](ServiceRegistryRef registry,
251 ServiceMetricsInfo const& sm,
252 size_t timestamp) {
253 int64_t totalBytesCreated = 0;
254 int64_t shmOfferBytesConsumed = 0;
255 int64_t totalBytesDestroyed = 0;
256 int64_t totalBytesExpired = 0;
257 int64_t totalMessagesCreated = 0;
258 int64_t totalMessagesDestroyed = 0;
259 int64_t totalTimeframesRead = 0;
260 int64_t totalTimeframesConsumed = 0;
261 auto &driverMetrics = sm.driverMetricsInfo;
262 auto &allDeviceMetrics = sm.deviceMetricsInfos;
263 auto &specs = sm.deviceSpecs;
264 auto &infos = sm.deviceInfos;
265
266 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
267 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
268 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
269 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
270 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
271 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
272 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
273 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
274 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
275 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
276 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
277 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
278 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
279 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
280 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
281 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
282 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
283 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
284 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
285 auto& manager = registry.get<DevicesManager>();
286
287 bool changed = false;
288
289 size_t lastTimestamp = 0;
290 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
291 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
292 auto& deviceMetrics = allDeviceMetrics[mi];
293 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
294 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
295 }
296 auto& indices = allIndices[mi];
297 {
298 size_t index = indices.arrowBytesCreated;
299 assert(index < deviceMetrics.metrics.size());
300 changed |= deviceMetrics.changed[index];
301 MetricInfo info = deviceMetrics.metrics[index];
302 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
303 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
304 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
305 auto value = (int64_t)data[(info.pos - 1) % data.size()];
306 totalBytesCreated += value;
307 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
308 }
309 {
310 size_t index = indices.shmOfferBytesConsumed;
311 assert(index < deviceMetrics.metrics.size());
312 changed |= deviceMetrics.changed[index];
313 MetricInfo info = deviceMetrics.metrics[index];
314 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
315 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
316 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
317 auto value = (int64_t)data[(info.pos - 1) % data.size()];
318 shmOfferBytesConsumed += value;
319 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
320 }
321 {
322 size_t index = indices.arrowBytesDestroyed;
323 assert(index < deviceMetrics.metrics.size());
324 changed |= deviceMetrics.changed[index];
325 MetricInfo info = deviceMetrics.metrics[index];
326 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
327 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
328 auto value = (int64_t)data[(info.pos - 1) % data.size()];
329 totalBytesDestroyed += value;
330 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
331 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
332 }
333 {
334 size_t index = indices.arrowBytesExpired;
335 assert(index < deviceMetrics.metrics.size());
336 changed |= deviceMetrics.changed[index];
337 MetricInfo info = deviceMetrics.metrics[index];
338 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
339 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
340 auto value = (int64_t)data[(info.pos - 1) % data.size()];
341 totalBytesExpired += value;
342 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
343 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
344 }
345 {
346 size_t index = indices.arrowMessagesCreated;
347 assert(index < deviceMetrics.metrics.size());
348 MetricInfo info = deviceMetrics.metrics[index];
349 changed |= deviceMetrics.changed[index];
350 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
351 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
352 auto value = (int64_t)data[(info.pos - 1) % data.size()];
353 totalMessagesCreated += value;
354 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
355 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
356 }
357 {
358 size_t index = indices.arrowMessagesDestroyed;
359 assert(index < deviceMetrics.metrics.size());
360 MetricInfo info = deviceMetrics.metrics[index];
361 changed |= deviceMetrics.changed[index];
362 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
363 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
364 auto value = (int64_t)data[(info.pos - 1) % data.size()];
365 totalMessagesDestroyed += value;
366 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
367 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
368 }
369 {
370 size_t index = indices.timeframesRead;
371 assert(index < deviceMetrics.metrics.size());
372 changed |= deviceMetrics.changed[index];
373 MetricInfo info = deviceMetrics.metrics[index];
374 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
375 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
376 auto value = (int64_t)data[(info.pos - 1) % data.size()];
377 totalTimeframesRead += value;
378 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
379 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
380 }
381 {
382 size_t index = indices.timeframesConsumed;
383 assert(index < deviceMetrics.metrics.size());
384 changed |= deviceMetrics.changed[index];
385 MetricInfo info = deviceMetrics.metrics[index];
386 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
387 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
388 auto value = (int64_t)data[(info.pos - 1) % data.size()];
389 totalTimeframesConsumed += value;
390 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
391 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
392 }
393 }
394 static uint64_t unchangedCount = 0;
395 if (changed) {
396 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
397 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
398 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
399 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
400 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
401 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
402 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
403 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
404 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
405 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
406 } else {
407 unchangedCount++;
408 }
409 changedCountMetric(driverMetrics, unchangedCount, timestamp);
410 auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
411 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
412 return;
413 }
414 static const ResourceSpec shmResourceSpec{
415 .name = "shared memory",
416 .unit = "MB",
417 .api = "/shm-offer {}",
418 .maxAvailable = (int64_t)calculateAvailableSharedMemory(registry),
419 .maxQuantum = 100,
420 .minQuantum = 50,
421 .metricOfferScaleFactor = 1000000,
422 };
423 static ResourceState shmResourceState{
424 .available = shmResourceSpec.maxAvailable,
425 };
426 static ResourceStats shmResourceStats{
427 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
428 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
429 };
430
431 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
432 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
433 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
434 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
435 (void*)&sm); },
436 .postDispatching = [](ProcessingContext& ctx, void* service) {
438 auto* arrow = reinterpret_cast<ArrowContext*>(service);
439 auto totalBytes = 0;
440 auto totalMessages = 0;
441 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
442 for (auto& input : ctx.inputs()) {
443 if (input.header == nullptr) {
444 continue;
445 }
446 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
447 auto payloadSize = DataRefUtils::getPayloadSize(input);
448 if (dh->serialization != o2::header::gSerializationMethodArrow) {
449 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
450 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
451 dh->dataOrigin.str, dh->dataDescription.str);
452 continue;
453 }
454 bool forwarded = false;
455 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
456 if (DataSpecUtils::match(forward.matcher, *dh)) {
457 forwarded = true;
458 break;
459 }
460 }
461 if (forwarded) {
462 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
463 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
464 dh->dataOrigin.str, dh->dataDescription.str);
465 continue;
466 }
467 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
468 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
469 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
470 totalBytes += payloadSize;
471 totalMessages += 1;
472 }
473 arrow->updateBytesDestroyed(totalBytes);
474 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
475 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
476 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
477 arrow->updateMessagesDestroyed(totalMessages);
478 auto& stats = ctx.services().get<DataProcessingStats>();
479 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
480 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
481 stats.processCommandQueue(); },
482 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
483 auto config = new RateLimitConfig{};
484 int readers = std::stoll(dc.options["readers"].as<std::string>());
485 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
486 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
487 } else {
488 config->maxMemory = readers * 500;
489 }
490 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
491 config->maxTimeframes = readers;
492 } else {
493 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
494 }
495 static bool once = false;
496 // Until we guarantee this is called only once...
497 if (!once) {
498 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
499 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
500 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
501 config->maxMemory, readers);
502 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
503 once = true;
504 } },
505 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
506 auto& workflow = node.specs;
507 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
508 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
509 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
510 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
511 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
512 auto &ac = ctx.services().get<AnalysisContext>();
513 ac.requestedAODs.clear();
514 ac.requestedDYNs.clear();
515 ac.providedDYNs.clear();
516 ac.providedTIMs.clear();
517 ac.requestedTIMs.clear();
518
519
520 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
521 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
522
523 if (builder != workflow.end()) {
524 // collect currently requested IDXs
525 ac.requestedIDXs.clear();
526 for (auto& d : workflow) {
527 if (d.name == builder->name) {
528 continue;
529 }
530 for (auto& i : d.inputs) {
532 auto copy = i;
533 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
534 }
535 }
536 }
537 // recreate inputs and outputs
538 builder->inputs.clear();
539 builder->outputs.clear();
540 // replace AlgorithmSpec
541 // FIXME: it should be made more generic, so it does not need replacement...
542 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
543 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
544 }
545
546 if (spawner != workflow.end()) {
547 // collect currently requested DYNs
548 for (auto& d : workflow) {
549 if (d.name == spawner->name) {
550 continue;
551 }
552 for (auto const& i : d.inputs) {
554 auto copy = i;
555 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
556 }
557 }
558 for (auto const& o : d.outputs) {
560 ac.providedDYNs.emplace_back(o);
561 }
562 }
563 }
564 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
565 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
566 ac.spawnerInputs.clear();
567 for (auto& input : ac.requestedDYNs) {
568 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
569 ac.spawnerInputs.emplace_back(input);
570 }
571 }
572 // recreate inputs and outputs
573 spawner->outputs.clear();
574 spawner->inputs.clear();
575 // replace AlgorithmSpec
576 // FIXME: it should be made more generic, so it does not need replacement...
577 spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs);
578 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
579 }
580
581 if (analysisCCDB != workflow.end()) {
582 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
583 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
584 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
585 }
586 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
587 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
588 // Use ranges::to<std::vector<>> in C++23...
589 ac.analysisCCDBInputs.clear();
590 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
591
592 // recreate inputs and outputs
593 analysisCCDB->outputs.clear();
594 analysisCCDB->inputs.clear();
595 // replace AlgorithmSpec
596 // FIXME: it should be made more generic, so it does not need replacement...
597 // FIXME how can I make the lookup depend on DYN tables as well??
598 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
599 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
600 }
601
602 if (writer != workflow.end()) {
603 workflow.erase(writer);
604 }
605
606 if (reader != workflow.end()) {
607 // If reader and/or builder were adjusted, remove unneeded outputs
608 // update currently requested AODs
609 for (auto& d : workflow) {
610 for (auto const& i : d.inputs) {
611 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
612 auto copy = i;
613 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
614 }
615 }
616 }
617
618 // remove unmatched outputs
619 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
620 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
621 });
622 reader->outputs.erase(o_end, reader->outputs.end());
623 if (reader->outputs.empty()) {
624 // nothing to read
625 workflow.erase(reader);
626 }
627 }
628
629
630
631 // replace writer as some outputs may have become dangling and some are now consumed
632 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
633
634 // create DataOutputDescriptor
635 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
636
637 // select outputs of type AOD which need to be saved
638 // ATTENTION: if there are dangling outputs the getGlobalAODSink
639 // has to be created in any case!
640 ac.outputsInputsAOD.clear();
641
642 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
643 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
644 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
645 if (!ds.empty() || isDangling[ii]) {
646 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
647 }
648 }
649 }
650
651 // file sink for any AOD output
652 if (!ac.outputsInputsAOD.empty()) {
653 // add TFNumber and TFFilename as input to the writer
654 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
655 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
656 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
657 }
658 // Move the dummy sink at the end, if needed
659 for (size_t i = 0; i < workflow.size(); ++i) {
660 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
661 workflow.push_back(workflow[i]);
662 workflow.erase(workflow.begin() + i);
663 break;
664 }
665 } },
666 .kind = ServiceKind::Global};
667}
668
670{
671 return ServiceSpec{
672 .name = "arrow-slicing-cache-def",
673 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
674 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
675 .kind = ServiceKind::Global};
676}
677
679{
680 return ServiceSpec{
681 .name = "arrow-slicing-cache",
682 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
683 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
684 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
685 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
687 .configure = CommonServices::noConfiguration(),
688 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
689 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
690 auto& caches = service->bindingsKeys;
691 for (auto i = 0u; i < caches.size(); ++i) {
692 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
693 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
694 if (!status.ok()) {
695 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
696 }
697 }
698 }
699 auto& unsortedCaches = service->bindingsKeysUnsorted;
700 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
701 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
702 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
703 if (!status.ok()) {
704 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
705 }
706 }
707 } },
708 .kind = ServiceKind::Stream};
709}
710
711} // namespace o2::framework
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
std::vector< DataProcessorSpec > & specs
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds