Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
50
51namespace o2::framework
52{
53
54class EndOfStreamContext;
55class ProcessingContext;
56
/// Phases of the rate limiting state machine.
/// The numeric values are spelled out explicitly and must stay as they are.
enum class RateLimitingState {
  // Nothing has been received so far.
  UNKNOWN = 0,
  // Information arrived, but no new timeframe was requested.
  STARTED = 1,
  // Information arrived and a new timeframe was requested, not yet accounted for.
  CHANGED = 2,
  // A new metric arrived and we are below the limit.
  BELOW_LIMIT = 3,
  // The iteration which follows a BELOW_LIMIT one.
  NEXT_ITERATION_FROM_BELOW = 4,
  // A new metric arrived and we are above the limit.
  ABOVE_LIMIT = 5,
  // NOTE(review): meaning not documented in the original source.
  EMPTY = 6,
};
66
68 int64_t maxMemory = 2000;
69 int64_t maxTimeframes = 1000;
70};
71
73 size_t arrowBytesCreated = -1;
77 size_t arrowBytesExpired = -1;
79 size_t timeframesRead = -1;
80 size_t timeframesConsumed = -1;
81 size_t timeframesExpired = -1;
82};
83
84std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
85{
86 std::vector<MetricIndices> results;
87
88 for (auto& info : allDevicesMetrics) {
89 results.emplace_back(MetricIndices{
90 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
91 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
92 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
93 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
94 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
95 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
96 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
97 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
98 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes")});
99 }
100 return results;
101}
102
104 int64_t available;
105 int64_t offered = 0;
106 int64_t lastDeviceOffered = 0;
107};
109 int64_t enoughCount;
110 int64_t lowCount;
111};
113 char const* name;
114 char const* unit;
115 char const* api;
116 int64_t maxAvailable;
117 int64_t maxQuantum;
118 int64_t minQuantum;
120};
121
122auto offerResources(ResourceState& resourceState,
123 ResourceSpec const& resourceSpec,
124 ResourceStats& resourceStats,
125 std::vector<DeviceSpec> const& specs,
126 std::vector<DeviceInfo> const& infos,
127 DevicesManager& manager,
128 int64_t offerConsumedCurrentValue,
129 int64_t offerExpiredCurrentValue,
130 int64_t acquiredResourceCurrentValue,
131 int64_t disposedResourceCurrentValue,
132 size_t timestamp,
133 DeviceMetricsInfo& driverMetrics,
134 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
135 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
136 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
137 void* signpostId) -> void
138{
139 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
142 int64_t lastCandidate = -1;
143 int64_t possibleOffer = resourceSpec.minQuantum;
144
145 for (size_t di = 0; di < specs.size(); di++) {
146 if (resourceState.available < possibleOffer) {
147 if (resourceStats.lowCount == 0) {
148 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
149 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
150 resourceSpec.name, resourceState.available, resourceSpec.unit,
151 possibleOffer, resourceSpec.unit,
152 resourceState.offered, resourceSpec.unit);
153 }
154 resourceStats.lowCount++;
155 resourceStats.enoughCount = 0;
156 break;
157 } else {
158 if (resourceStats.enoughCount == 0) {
159 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
160 "We are back in a state where we enough %{public}s: %llu %{public}s",
161 resourceSpec.name,
162 resourceState.available,
163 resourceSpec.unit);
164 }
165 resourceStats.lowCount = 0;
166 resourceStats.enoughCount++;
167 }
168 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
169
170 auto& info = infos[candidate];
171 // Do not bother for inactive devices
172 // FIXME: there is probably a race condition if the device died and we did not
173 // took notice yet...
174 if (info.active == false || info.readyToQuit) {
175 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
176 "Device %s is inactive not offering %{public}s to it.",
177 specs[candidate].name.c_str(), resourceSpec.name);
178 continue;
179 }
180 if (specs[candidate].name != "internal-dpl-aod-reader") {
181 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
182 "Device %s is not a reader. Not offering %{public}s to it.",
183 specs[candidate].name.c_str(),
184 resourceSpec.name);
185 continue;
186 }
187 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
188 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
189 "Offering %llu %{public}s out of %llu to %{public}s",
190 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
191 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
192 resourceState.available -= possibleOffer;
193 resourceState.offered += possibleOffer;
194 lastCandidate = candidate;
195 }
196 // We had at least a valid candidate, so
197 // next time we offer to the next device.
198 if (lastCandidate >= 0) {
199 resourceState.lastDeviceOffered = lastCandidate + 1;
200 }
201
202 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
203 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
204 // and the amount which we know was given back.
205 static int64_t lastResourceOfferConsumed = 0;
206 static int64_t lastUnusedOfferedResource = 0;
207 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
208 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
209 "Offer consumed so far %llu", offerConsumedCurrentValue);
210 lastResourceOfferConsumed = offerConsumedCurrentValue;
211 }
212 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
213 if (lastUnusedOfferedResource != unusedOfferedResource) {
214 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
215 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
216 resourceSpec.name,
217 unusedOfferedResource, resourceState.offered,
218 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
219 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
220 resourceSpec.metricOfferScaleFactor);
221 lastUnusedOfferedResource = unusedOfferedResource;
222 }
223 // availableSharedMemory is the amount of memory which we know is available to be offered.
224 // We subtract the amount which we know was already offered but it's unused and we then balance how
225 // much was created with how much was destroyed.
226 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
227 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
228 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
229
230 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
231};
232
234{
235 using o2::monitoring::Metric;
236 using o2::monitoring::Monitoring;
237 using o2::monitoring::tags::Key;
238 using o2::monitoring::tags::Value;
239
240 return ServiceSpec{
241 .name = "arrow-backend",
243 .configure = CommonServices::noConfiguration(),
248 .metricHandling = [](ServiceRegistryRef registry,
249 ServiceMetricsInfo const& sm,
250 size_t timestamp) {
251 int64_t totalBytesCreated = 0;
252 int64_t shmOfferBytesConsumed = 0;
253 int64_t totalBytesDestroyed = 0;
254 int64_t totalBytesExpired = 0;
255 int64_t totalMessagesCreated = 0;
256 int64_t totalMessagesDestroyed = 0;
257 int64_t totalTimeframesRead = 0;
258 int64_t totalTimeframesConsumed = 0;
259 int64_t totalTimeframesExpired = 0;
260 auto &driverMetrics = sm.driverMetricsInfo;
261 auto &allDeviceMetrics = sm.deviceMetricsInfos;
262 auto &specs = sm.deviceSpecs;
263 auto &infos = sm.deviceInfos;
264
265 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
266 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
267 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
268 // These are really to monitor the rate limiting
269 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
270 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
271 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
272 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
273 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
274 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
275
276 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
277 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
278 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
279 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
280 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
281 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
282 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
283 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
284 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
285 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
286 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
287 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
288 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
289 auto& manager = registry.get<DevicesManager>();
290
291 bool changed = false;
292
293 size_t lastTimestamp = 0;
294 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
295 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
296 auto& deviceMetrics = allDeviceMetrics[mi];
297 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
298 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
299 }
300 auto& indices = allIndices[mi];
301 {
302 size_t index = indices.arrowBytesCreated;
303 assert(index < deviceMetrics.metrics.size());
304 changed |= deviceMetrics.changed[index];
305 MetricInfo info = deviceMetrics.metrics[index];
306 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
307 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
308 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
309 auto value = (int64_t)data[(info.pos - 1) % data.size()];
310 totalBytesCreated += value;
311 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
312 }
313 {
314 size_t index = indices.shmOfferBytesConsumed;
315 assert(index < deviceMetrics.metrics.size());
316 changed |= deviceMetrics.changed[index];
317 MetricInfo info = deviceMetrics.metrics[index];
318 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
319 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
320 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
321 auto value = (int64_t)data[(info.pos - 1) % data.size()];
322 shmOfferBytesConsumed += value;
323 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
324 }
325 {
326 size_t index = indices.arrowBytesDestroyed;
327 assert(index < deviceMetrics.metrics.size());
328 changed |= deviceMetrics.changed[index];
329 MetricInfo info = deviceMetrics.metrics[index];
330 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
331 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
332 auto value = (int64_t)data[(info.pos - 1) % data.size()];
333 totalBytesDestroyed += value;
334 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
335 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
336 }
337 {
338 size_t index = indices.arrowBytesExpired;
339 assert(index < deviceMetrics.metrics.size());
340 changed |= deviceMetrics.changed[index];
341 MetricInfo info = deviceMetrics.metrics[index];
342 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
343 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
344 auto value = (int64_t)data[(info.pos - 1) % data.size()];
345 totalBytesExpired += value;
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
347 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
348 }
349 {
350 size_t index = indices.arrowMessagesCreated;
351 assert(index < deviceMetrics.metrics.size());
352 MetricInfo info = deviceMetrics.metrics[index];
353 changed |= deviceMetrics.changed[index];
354 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
355 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
356 auto value = (int64_t)data[(info.pos - 1) % data.size()];
357 totalMessagesCreated += value;
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
359 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
360 }
361 {
362 size_t index = indices.arrowMessagesDestroyed;
363 assert(index < deviceMetrics.metrics.size());
364 MetricInfo info = deviceMetrics.metrics[index];
365 changed |= deviceMetrics.changed[index];
366 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
367 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
368 auto value = (int64_t)data[(info.pos - 1) % data.size()];
369 totalMessagesDestroyed += value;
370 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
371 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
372 }
373 {
374 size_t index = indices.timeframesRead;
375 assert(index < deviceMetrics.metrics.size());
376 changed |= deviceMetrics.changed[index];
377 MetricInfo info = deviceMetrics.metrics[index];
378 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
379 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
380 auto value = (int64_t)data[(info.pos - 1) % data.size()];
381 totalTimeframesRead += value;
382 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
383 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
384 }
385 {
386 size_t index = indices.timeframesConsumed;
387 assert(index < deviceMetrics.metrics.size());
388 changed |= deviceMetrics.changed[index];
389 MetricInfo info = deviceMetrics.metrics[index];
390 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
391 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
392 auto value = (int64_t)data[(info.pos - 1) % data.size()];
393 totalTimeframesConsumed += value;
394 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
395 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
396 }
397 {
398 size_t index = indices.timeframesExpired;
399 assert(index < deviceMetrics.metrics.size());
400 changed |= deviceMetrics.changed[index];
401 MetricInfo info = deviceMetrics.metrics[index];
402 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
403 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
404 auto value = (int64_t)data[(info.pos - 1) % data.size()];
405 totalTimeframesExpired += value;
406 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
407 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
408 }
409 }
410 static uint64_t unchangedCount = 0;
411 if (changed) {
412 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
413 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
414 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
415 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
416 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
417 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
418 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
419 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
420 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
421 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
422 } else {
423 unchangedCount++;
424 }
425 changedCountMetric(driverMetrics, unchangedCount, timestamp);
426
427 static const ResourceSpec shmResourceSpec{
428 .name = "shared memory",
429 .unit = "MB",
430 .api = "/shm-offer {}",
431 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
432 .maxQuantum = 100,
433 .minQuantum = 50,
434 .metricOfferScaleFactor = 1000000,
435 };
436 static const ResourceSpec timesliceResourceSpec{
437 .name = "timeslice",
438 .unit = "timeslices",
439 .api = "/timeslice-offer {}",
440 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
441 .maxQuantum = 1,
442 .minQuantum = 1,
443 .metricOfferScaleFactor = 1,
444 };
445 static ResourceState shmResourceState{
446 .available = shmResourceSpec.maxAvailable,
447 };
448 static ResourceState timesliceResourceState{
449 .available = timesliceResourceSpec.maxAvailable,
450 };
451 static ResourceStats shmResourceStats{
452 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
453 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
454 };
455 static ResourceStats timesliceResourceStats{
456 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
457 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
458 };
459
460 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
461 specs, infos, manager, totalTimeframesConsumed, totalTimeframesExpired,
462 totalTimeframesRead, totalTimeframesConsumed, timestamp, driverMetrics,
463 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
464 (void*)&sm);
465
466 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
467 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
468 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
469 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
470 (void*)&sm); },
471 .postDispatching = [](ProcessingContext& ctx, void* service) {
473 auto* arrow = reinterpret_cast<ArrowContext*>(service);
474 auto totalBytes = 0;
475 auto totalMessages = 0;
476 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
477 for (auto& input : ctx.inputs()) {
478 if (input.header == nullptr) {
479 continue;
480 }
481 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
482 auto payloadSize = DataRefUtils::getPayloadSize(input);
483 if (dh->serialization != o2::header::gSerializationMethodArrow) {
484 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
485 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
486 dh->dataOrigin.str, dh->dataDescription.str);
487 continue;
488 }
489 bool forwarded = false;
490 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
491 if (DataSpecUtils::match(forward.matcher, *dh)) {
492 forwarded = true;
493 break;
494 }
495 }
496 if (forwarded) {
497 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
498 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
499 dh->dataOrigin.str, dh->dataDescription.str);
500 continue;
501 }
502 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
503 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
504 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
505 totalBytes += payloadSize;
506 totalMessages += 1;
507 }
508 arrow->updateBytesDestroyed(totalBytes);
509 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
510 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
511 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
512 arrow->updateMessagesDestroyed(totalMessages);
513 auto& stats = ctx.services().get<DataProcessingStats>();
514 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
515 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
516 stats.processCommandQueue(); },
517 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
518 auto config = new RateLimitConfig{};
519 int readers = std::stoll(dc.options["readers"].as<std::string>());
520 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
521 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
522 } else {
523 config->maxMemory = readers * 500;
524 }
525 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
526 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
527 } else {
528 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
529 }
530 static bool once = false;
531 // Until we guarantee this is called only once...
532 if (!once) {
533 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
534 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
535 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
536 config->maxMemory, config->maxTimeframes, readers);
537 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
538 once = true;
539 } },
540 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
541 auto& workflow = node.specs;
542 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
543 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
544 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
545 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
546 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
547 auto &ac = ctx.services().get<AnalysisContext>();
548 ac.requestedAODs.clear();
549 ac.requestedDYNs.clear();
550 ac.providedDYNs.clear();
551 ac.providedTIMs.clear();
552 ac.requestedTIMs.clear();
553
554
555 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
556 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
557
558 if (builder != workflow.end()) {
559 // collect currently requested IDXs
560 ac.requestedIDXs.clear();
561 for (auto& d : workflow) {
562 if (d.name == builder->name) {
563 continue;
564 }
565 for (auto& i : d.inputs) {
567 auto copy = i;
568 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
569 }
570 }
571 }
572 // recreate inputs and outputs
573 builder->inputs.clear();
574 builder->outputs.clear();
575 // replace AlgorithmSpec
576 // FIXME: it should be made more generic, so it does not need replacement...
577 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
578 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
579 }
580
581 if (spawner != workflow.end()) {
582 // collect currently requested DYNs
583 for (auto& d : workflow) {
584 if (d.name == spawner->name) {
585 continue;
586 }
587 for (auto const& i : d.inputs) {
589 auto copy = i;
590 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
591 }
592 }
593 for (auto const& o : d.outputs) {
595 ac.providedDYNs.emplace_back(o);
596 }
597 }
598 }
599 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
600 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
601 ac.spawnerInputs.clear();
602 for (auto& input : ac.requestedDYNs) {
603 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
604 ac.spawnerInputs.emplace_back(input);
605 }
606 }
607 // recreate inputs and outputs
608 spawner->outputs.clear();
609 spawner->inputs.clear();
610 // replace AlgorithmSpec
611 // FIXME: it should be made more generic, so it does not need replacement...
613 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
614 }
615
616 if (analysisCCDB != workflow.end()) {
617 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
618 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
619 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
620 }
621 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
622 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
623 // Use ranges::to<std::vector<>> in C++23...
624 ac.analysisCCDBInputs.clear();
625 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
626
627 // recreate inputs and outputs
628 analysisCCDB->outputs.clear();
629 analysisCCDB->inputs.clear();
630 // replace AlgorithmSpec
631 // FIXME: it should be made more generic, so it does not need replacement...
632 // FIXME how can I make the lookup depend on DYN tables as well??
633 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
634 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
635 }
636
637 if (writer != workflow.end()) {
638 workflow.erase(writer);
639 }
640
641 if (reader != workflow.end()) {
642 // If reader and/or builder were adjusted, remove unneeded outputs
643 // update currently requested AODs
644 for (auto& d : workflow) {
645 for (auto const& i : d.inputs) {
646 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
647 auto copy = i;
648 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
649 }
650 }
651 }
652
653 // remove unmatched outputs
654 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
655 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
656 });
657 reader->outputs.erase(o_end, reader->outputs.end());
658 if (reader->outputs.empty()) {
659 // nothing to read
660 workflow.erase(reader);
661 }
662 }
663
664
665
666 // replace writer as some outputs may have become dangling and some are now consumed
667 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
668
669 // create DataOutputDescriptor
670 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
671
672 // select outputs of type AOD which need to be saved
673 // ATTENTION: if there are dangling outputs the getGlobalAODSink
674 // has to be created in any case!
675 ac.outputsInputsAOD.clear();
676
677 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
678 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
679 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
680 if (!ds.empty() || isDangling[ii]) {
681 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
682 }
683 }
684 }
685
686 // file sink for any AOD output
687 if (!ac.outputsInputsAOD.empty()) {
688 // add TFNumber and TFFilename as input to the writer
689 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
690 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
691 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
692 }
693 // Move the dummy sink at the end, if needed
694 for (size_t i = 0; i < workflow.size(); ++i) {
695 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
696 workflow.push_back(workflow[i]);
697 workflow.erase(workflow.begin() + i);
698 break;
699 }
700 } },
701 .kind = ServiceKind::Global};
702}
703
705{
706 return ServiceSpec{
707 .name = "arrow-slicing-cache-def",
708 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
709 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
710 .kind = ServiceKind::Global};
711}
712
714{
715 return ServiceSpec{
716 .name = "arrow-slicing-cache",
717 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
718 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
719 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
720 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
722 .configure = CommonServices::noConfiguration(),
723 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
724 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
725 auto& caches = service->bindingsKeys;
726 for (auto i = 0u; i < caches.size(); ++i) {
727 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
728 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
729 if (!status.ok()) {
730 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
731 }
732 }
733 }
734 auto& unsortedCaches = service->bindingsKeysUnsorted;
735 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
736 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
737 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
738 if (!status.ok()) {
739 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
740 }
741 }
742 } },
743 .kind = ServiceKind::Stream};
744}
745
746} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static unsigned int pipelineLength()
get max number of timeslices in the queue
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static AlgorithmSpec aodSpawnerCallback(ConfigContext const &ctx)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds