Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
36
38#include <Monitoring/Monitoring.h>
39#include "Headers/DataHeader.h"
40
41#include <RtypesCore.h>
42#include <fairmq/ProgOptions.h>
43
44#include <uv.h>
45#include <boost/program_options/variables_map.hpp>
46#include <csignal>
47
49
50namespace o2::framework
51{
52
53class EndOfStreamContext;
54class ProcessingContext;
55
/// States of the rate-limiting logic; the meaning of each value is given on the enumerator.
enum struct RateLimitingState {
  /// No information received yet.
  UNKNOWN = 0,
  /// Information received, new timeframe not requested.
  STARTED = 1,
  /// Information received, new timeframe requested but not yet accounted.
  CHANGED = 2,
  /// New metric received, we are below limit.
  BELOW_LIMIT = 3,
  /// Iteration when previously in BELOW_LIMIT.
  NEXT_ITERATION_FROM_BELOW = 4,
  /// New metric received, we are above limit.
  ABOVE_LIMIT = 5,
  /// NOTE(review): left undocumented in the original; its meaning is not evident from this file.
  EMPTY = 6,
};
65
67 int64_t maxMemory = 2000;
68 int64_t maxTimeframes = 1;
69};
70
72 size_t arrowBytesCreated = -1;
76 size_t arrowBytesExpired = -1;
78 size_t timeframesRead = -1;
79 size_t timeframesConsumed = -1;
80 size_t timeframesExpired = -1;
81};
82
83std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
84{
85 std::vector<MetricIndices> results;
86
87 for (auto& info : allDevicesMetrics) {
88 results.emplace_back(MetricIndices{
89 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
90 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
91 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
92 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
93 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
94 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
95 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
96 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
97 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
98 }
99 return results;
100}
101
103 int64_t available;
104 int64_t offered = 0;
105 int64_t lastDeviceOffered = 0;
106};
108 int64_t enoughCount;
109 int64_t lowCount;
110};
112 char const* name;
113 char const* unit;
114 char const* api;
115 int64_t maxAvailable;
116 int64_t maxQuantum;
117 int64_t minQuantum;
119};
120
121auto offerResources(ResourceState& resourceState,
122 ResourceSpec const& resourceSpec,
123 ResourceStats& resourceStats,
124 std::vector<DeviceSpec> const& specs,
125 std::vector<DeviceInfo> const& infos,
126 DevicesManager& manager,
127 int64_t offerConsumedCurrentValue,
128 int64_t offerExpiredCurrentValue,
129 int64_t acquiredResourceCurrentValue,
130 int64_t disposedResourceCurrentValue,
131 size_t timestamp,
132 DeviceMetricsInfo& driverMetrics,
133 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
134 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
135 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
136 void* signpostId) -> void
137{
138 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
141 int64_t lastCandidate = -1;
142 int64_t possibleOffer = resourceSpec.minQuantum;
143
144 for (size_t di = 0; di < specs.size(); di++) {
145 if (resourceState.available < possibleOffer) {
146 if (resourceStats.lowCount == 0) {
147 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
148 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
149 resourceSpec.name, resourceState.available, resourceSpec.unit,
150 possibleOffer, resourceSpec.unit,
151 resourceState.offered, resourceSpec.unit);
152 }
153 resourceStats.lowCount++;
154 resourceStats.enoughCount = 0;
155 break;
156 } else {
157 if (resourceStats.enoughCount == 0) {
158 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
159 "We are back in a state where we enough %{public}s: %llu %{public}s",
160 resourceSpec.name,
161 resourceState.available,
162 resourceSpec.unit);
163 }
164 resourceStats.lowCount = 0;
165 resourceStats.enoughCount++;
166 }
167 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
168
169 auto& info = infos[candidate];
170 // Do not bother for inactive devices
171 // FIXME: there is probably a race condition if the device died and we did not
172 // took notice yet...
173 if (info.active == false || info.readyToQuit) {
174 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
175 "Device %s is inactive not offering %{public}s to it.",
176 specs[candidate].name.c_str(), resourceSpec.name);
177 continue;
178 }
179 if (specs[candidate].name != "internal-dpl-aod-reader") {
180 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
181 "Device %s is not a reader. Not offering %{public}s to it.",
182 specs[candidate].name.c_str(),
183 resourceSpec.name);
184 continue;
185 }
186 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
187 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
188 "Offering %llu %{public}s out of %llu to %{public}s",
189 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
190 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
191 resourceState.available -= possibleOffer;
192 resourceState.offered += possibleOffer;
193 lastCandidate = candidate;
194 }
195 // We had at least a valid candidate, so
196 // next time we offer to the next device.
197 if (lastCandidate >= 0) {
198 resourceState.lastDeviceOffered = lastCandidate + 1;
199 }
200
201 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
202 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
203 // and the amount which we know was given back.
204 static int64_t lastResourceOfferConsumed = 0;
205 static int64_t lastUnusedOfferedResource = 0;
206 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
207 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
208 "Offer consumed so far %llu", offerConsumedCurrentValue);
209 lastResourceOfferConsumed = offerConsumedCurrentValue;
210 }
211 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
212 if (lastUnusedOfferedResource != unusedOfferedResource) {
213 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
214 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
215 resourceSpec.name,
216 unusedOfferedResource, resourceState.offered,
217 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
218 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
219 resourceSpec.metricOfferScaleFactor);
220 lastUnusedOfferedResource = unusedOfferedResource;
221 }
222 // availableSharedMemory is the amount of memory which we know is available to be offered.
223 // We subtract the amount which we know was already offered but it's unused and we then balance how
224 // much was created with how much was destroyed.
225 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
226 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
227 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
228
229 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
230};
231
233{
234 using o2::monitoring::Metric;
235 using o2::monitoring::Monitoring;
236 using o2::monitoring::tags::Key;
237 using o2::monitoring::tags::Value;
238
239 return ServiceSpec{
240 .name = "arrow-backend",
242 .configure = CommonServices::noConfiguration(),
247 .metricHandling = [](ServiceRegistryRef registry,
248 ServiceMetricsInfo const& sm,
249 size_t timestamp) {
250 int64_t totalBytesCreated = 0;
251 int64_t shmOfferBytesConsumed = 0;
252 int64_t totalBytesDestroyed = 0;
253 int64_t totalBytesExpired = 0;
254 int64_t totalMessagesCreated = 0;
255 int64_t totalMessagesDestroyed = 0;
256 int64_t totalTimeframesRead = 0;
257 int64_t totalTimeframesConsumed = 0;
258 int64_t totalTimeframesExpired = 0;
259 auto &driverMetrics = sm.driverMetricsInfo;
260 auto &allDeviceMetrics = sm.deviceMetricsInfos;
261 auto &specs = sm.deviceSpecs;
262 auto &infos = sm.deviceInfos;
263
264 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
265 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
266 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
267 // These are really to monitor the rate limiting
268 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
269 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
270 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
271 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
272 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
273 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
274
275 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
276 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
277 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
278 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
279 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
280 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
281 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
282 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
283 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
284 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
285 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
286 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
287 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
288 auto& manager = registry.get<DevicesManager>();
289
290 bool changed = false;
291
292 size_t lastTimestamp = 0;
293 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
294 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
295 auto& deviceMetrics = allDeviceMetrics[mi];
296 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
297 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
298 }
299 auto& indices = allIndices[mi];
300 {
301 size_t index = indices.arrowBytesCreated;
302 assert(index < deviceMetrics.metrics.size());
303 changed |= deviceMetrics.changed[index];
304 MetricInfo info = deviceMetrics.metrics[index];
305 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
306 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
307 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
308 auto value = (int64_t)data[(info.pos - 1) % data.size()];
309 totalBytesCreated += value;
310 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
311 }
312 {
313 size_t index = indices.shmOfferBytesConsumed;
314 assert(index < deviceMetrics.metrics.size());
315 changed |= deviceMetrics.changed[index];
316 MetricInfo info = deviceMetrics.metrics[index];
317 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
318 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
319 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
320 auto value = (int64_t)data[(info.pos - 1) % data.size()];
321 shmOfferBytesConsumed += value;
322 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
323 }
324 {
325 size_t index = indices.arrowBytesDestroyed;
326 assert(index < deviceMetrics.metrics.size());
327 changed |= deviceMetrics.changed[index];
328 MetricInfo info = deviceMetrics.metrics[index];
329 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
330 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
331 auto value = (int64_t)data[(info.pos - 1) % data.size()];
332 totalBytesDestroyed += value;
333 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
334 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
335 }
336 {
337 size_t index = indices.arrowBytesExpired;
338 assert(index < deviceMetrics.metrics.size());
339 changed |= deviceMetrics.changed[index];
340 MetricInfo info = deviceMetrics.metrics[index];
341 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
342 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
343 auto value = (int64_t)data[(info.pos - 1) % data.size()];
344 totalBytesExpired += value;
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
346 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
347 }
348 {
349 size_t index = indices.arrowMessagesCreated;
350 assert(index < deviceMetrics.metrics.size());
351 MetricInfo info = deviceMetrics.metrics[index];
352 changed |= deviceMetrics.changed[index];
353 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
354 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
355 auto value = (int64_t)data[(info.pos - 1) % data.size()];
356 totalMessagesCreated += value;
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
358 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
359 }
360 {
361 size_t index = indices.arrowMessagesDestroyed;
362 assert(index < deviceMetrics.metrics.size());
363 MetricInfo info = deviceMetrics.metrics[index];
364 changed |= deviceMetrics.changed[index];
365 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
366 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
367 auto value = (int64_t)data[(info.pos - 1) % data.size()];
368 totalMessagesDestroyed += value;
369 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
370 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
371 }
372 {
373 size_t index = indices.timeframesRead;
374 assert(index < deviceMetrics.metrics.size());
375 changed |= deviceMetrics.changed[index];
376 MetricInfo info = deviceMetrics.metrics[index];
377 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
378 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
379 auto value = (int64_t)data[(info.pos - 1) % data.size()];
380 totalTimeframesRead += value;
381 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
382 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
383 }
384 {
385 size_t index = indices.timeframesConsumed;
386 assert(index < deviceMetrics.metrics.size());
387 changed |= deviceMetrics.changed[index];
388 MetricInfo info = deviceMetrics.metrics[index];
389 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
390 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
391 auto value = (int64_t)data[(info.pos - 1) % data.size()];
392 totalTimeframesConsumed += value;
393 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
394 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
395 }
396 {
397 size_t index = indices.timeframesExpired;
398 assert(index < deviceMetrics.metrics.size());
399 changed |= deviceMetrics.changed[index];
400 MetricInfo info = deviceMetrics.metrics[index];
401 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
402 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
403 auto value = (int64_t)data[(info.pos - 1) % data.size()];
404 totalTimeframesExpired += value;
405 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
406 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
407 }
408 }
409 static uint64_t unchangedCount = 0;
410 if (changed) {
411 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
412 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
413 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
414 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
415 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
416 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
417 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
418 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
419 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
420 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
421 } else {
422 unchangedCount++;
423 }
424 changedCountMetric(driverMetrics, unchangedCount, timestamp);
425
426 static const ResourceSpec shmResourceSpec{
427 .name = "shared memory",
428 .unit = "MB",
429 .api = "/shm-offer {}",
430 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
431 .maxQuantum = 100,
432 .minQuantum = 50,
433 .metricOfferScaleFactor = 1000000,
434 };
435 static const ResourceSpec timesliceResourceSpec{
436 .name = "timeslice",
437 .unit = "timeslices",
438 .api = "/timeslice-offer {}",
439 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
440 .maxQuantum = 1,
441 .minQuantum = 1,
442 .metricOfferScaleFactor = 1,
443 };
444 static ResourceState shmResourceState{
445 .available = shmResourceSpec.maxAvailable,
446 };
447 static ResourceState timesliceResourceState{
448 .available = timesliceResourceSpec.maxAvailable,
449 };
450 static ResourceStats shmResourceStats{
451 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
452 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
453 };
454 static ResourceStats timesliceResourceStats{
455 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
456 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
457 };
458
459 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
460 specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
461 totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
462 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
463 (void*)&sm);
464
465 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
466 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
467 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
468 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
469 (void*)&sm); },
470 .postDispatching = [](ProcessingContext& ctx, void* service) {
472 auto* arrow = reinterpret_cast<ArrowContext*>(service);
473 auto totalBytes = 0;
474 auto totalMessages = 0;
475 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
476 for (auto& input : ctx.inputs()) {
477 if (input.header == nullptr) {
478 continue;
479 }
480 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
481 auto payloadSize = DataRefUtils::getPayloadSize(input);
482 if (dh->serialization != o2::header::gSerializationMethodArrow) {
483 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
484 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
485 dh->dataOrigin.str, dh->dataDescription.str);
486 continue;
487 }
488 bool forwarded = false;
489 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
490 if (DataSpecUtils::match(forward.matcher, *dh)) {
491 forwarded = true;
492 break;
493 }
494 }
495 if (forwarded) {
496 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
497 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
498 dh->dataOrigin.str, dh->dataDescription.str);
499 continue;
500 }
501 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
502 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
503 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
504 totalBytes += payloadSize;
505 totalMessages += 1;
506 }
507 arrow->updateBytesDestroyed(totalBytes);
508 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
509 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
510 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
511 arrow->updateMessagesDestroyed(totalMessages);
512 auto& stats = ctx.services().get<DataProcessingStats>();
513 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
514 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
515 stats.processCommandQueue(); },
516 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
517 auto config = new RateLimitConfig{};
518 int readers = std::stoll(dc.options["readers"].as<std::string>());
519 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
520 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
521 } else {
522 config->maxMemory = readers * 500;
523 }
524 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
525 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
526 } else {
527 config->maxTimeframes = readers;
528 }
529 static bool once = false;
530 // Until we guarantee this is called only once...
531 if (!once) {
532 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
533 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
534 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
535 config->maxMemory, config->maxTimeframes, readers);
536 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
537 once = true;
538 } },
539 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
540 auto& workflow = node.specs;
541 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
542 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
543 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
544 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
545 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
546 auto &ac = ctx.services().get<AnalysisContext>();
547 ac.requestedAODs.clear();
548 ac.requestedDYNs.clear();
549 ac.providedDYNs.clear();
550 ac.providedTIMs.clear();
551 ac.requestedTIMs.clear();
552
553
554 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
555 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
556
557 if (builder != workflow.end()) {
558 // collect currently requested IDXs
559 ac.requestedIDXs.clear();
560 for (auto& d : workflow) {
561 if (d.name == builder->name) {
562 continue;
563 }
564 for (auto& i : d.inputs) {
566 auto copy = i;
567 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
568 }
569 }
570 }
571 // recreate inputs and outputs
572 builder->inputs.clear();
573 builder->outputs.clear();
574 // replace AlgorithmSpec
575 // FIXME: it should be made more generic, so it does not need replacement...
576 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
577 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
578 }
579
580 if (spawner != workflow.end()) {
581 // collect currently requested DYNs
582 for (auto& d : workflow) {
583 if (d.name == spawner->name) {
584 continue;
585 }
586 for (auto const& i : d.inputs) {
588 auto copy = i;
589 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
590 }
591 }
592 for (auto const& o : d.outputs) {
594 ac.providedDYNs.emplace_back(o);
595 }
596 }
597 }
598 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
599 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
600 ac.spawnerInputs.clear();
601 for (auto& input : ac.requestedDYNs) {
602 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
603 ac.spawnerInputs.emplace_back(input);
604 }
605 }
606 // recreate inputs and outputs
607 spawner->outputs.clear();
608 spawner->inputs.clear();
609 // replace AlgorithmSpec
610 // FIXME: it should be made more generic, so it does not need replacement...
611 spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs);
612 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
613 }
614
615 if (analysisCCDB != workflow.end()) {
616 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
617 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
618 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
619 }
620 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
621 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
622 // Use ranges::to<std::vector<>> in C++23...
623 ac.analysisCCDBInputs.clear();
624 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
625
626 // recreate inputs and outputs
627 analysisCCDB->outputs.clear();
628 analysisCCDB->inputs.clear();
629 // replace AlgorithmSpec
630 // FIXME: it should be made more generic, so it does not need replacement...
631 // FIXME how can I make the lookup depend on DYN tables as well??
632 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
633 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
634 }
635
636 if (writer != workflow.end()) {
637 workflow.erase(writer);
638 }
639
640 if (reader != workflow.end()) {
641 // If reader and/or builder were adjusted, remove unneeded outputs
642 // update currently requested AODs
643 for (auto& d : workflow) {
644 for (auto const& i : d.inputs) {
645 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
646 auto copy = i;
647 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
648 }
649 }
650 }
651
652 // remove unmatched outputs
653 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
654 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
655 });
656 reader->outputs.erase(o_end, reader->outputs.end());
657 if (reader->outputs.empty()) {
658 // nothing to read
659 workflow.erase(reader);
660 }
661 }
662
663
664
665 // replace writer as some outputs may have become dangling and some are now consumed
666 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
667
668 // create DataOutputDescriptor
669 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
670
671 // select outputs of type AOD which need to be saved
672 // ATTENTION: if there are dangling outputs the getGlobalAODSink
673 // has to be created in any case!
674 ac.outputsInputsAOD.clear();
675
676 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
677 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
678 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
679 if (!ds.empty() || isDangling[ii]) {
680 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
681 }
682 }
683 }
684
685 // file sink for any AOD output
686 if (!ac.outputsInputsAOD.empty()) {
687 // add TFNumber and TFFilename as input to the writer
688 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
689 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
690 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
691 }
692 // Move the dummy sink at the end, if needed
693 for (size_t i = 0; i < workflow.size(); ++i) {
694 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
695 workflow.push_back(workflow[i]);
696 workflow.erase(workflow.begin() + i);
697 break;
698 }
699 } },
700 .kind = ServiceKind::Global};
701}
702
704{
705 return ServiceSpec{
706 .name = "arrow-slicing-cache-def",
707 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
708 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
709 .kind = ServiceKind::Global};
710}
711
713{
714 return ServiceSpec{
715 .name = "arrow-slicing-cache",
716 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
717 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
718 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
719 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
721 .configure = CommonServices::noConfiguration(),
722 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
723 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
724 auto& caches = service->bindingsKeys;
725 for (auto i = 0u; i < caches.size(); ++i) {
726 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
727 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
728 if (!status.ok()) {
729 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
730 }
731 }
732 }
733 auto& unsortedCaches = service->bindingsKeysUnsorted;
734 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
735 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
736 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
737 if (!status.ok()) {
738 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
739 }
740 }
741 } },
742 .kind = ServiceKind::Stream};
743}
744
745} // namespace o2::framework
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as a arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
std::vector< DataProcessorSpec > & specs
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds