Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
50
51namespace o2::framework
52{
53
54class EndOfStreamContext;
55class ProcessingContext;
56
// States of the rate limiting logic. The numeric value of each enumerator is
// spelled out explicitly.
enum class RateLimitingState {
  // Nothing has been received so far.
  UNKNOWN = 0,
  // Information was received, but no new timeframe has been requested.
  STARTED = 1,
  // Information was received and a new timeframe was requested, but it has
  // not been accounted for yet.
  CHANGED = 2,
  // A new metric was received and we are below the limit.
  BELOW_LIMIT = 3,
  // Iteration following one in which the state was BELOW_LIMIT.
  NEXT_ITERATION_FROM_BELOW = 4,
  // A new metric was received and we are above the limit.
  ABOVE_LIMIT = 5,
  // NOTE(review): undocumented in the original; meaning to be confirmed.
  EMPTY = 6,
};
66
68 int64_t maxMemory = 2000;
69 int64_t maxTimeframes = 1000;
70};
71
73 size_t arrowBytesCreated = -1;
77 size_t arrowBytesExpired = -1;
79 size_t timeframesRead = -1;
80 size_t timeframesConsumed = -1;
81 size_t timeframesExpired = -1;
82 // Timeslices counting
83 size_t timeslicesStarted = -1;
84 size_t timeslicesExpired = -1;
85 size_t timeslicesDone = -1;
86};
87
88std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
89{
90 std::vector<MetricIndices> results;
91
92 for (auto& info : allDevicesMetrics) {
93 results.emplace_back(MetricIndices{
94 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
95 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
96 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
97 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
98 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
99 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
100 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
101 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
102 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
103 .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
104 .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
105 .timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
106 });
107 }
108 return results;
109}
110
112 int64_t available;
113 int64_t offered = 0;
114 int64_t lastDeviceOffered = 0;
115};
117 int64_t enoughCount;
118 int64_t lowCount;
119};
121 char const* name;
122 char const* unit;
123 char const* api;
124 int64_t maxAvailable;
125 int64_t maxQuantum;
126 int64_t minQuantum;
128};
129
// Offers one kind of resource to the AOD reader devices and refreshes its accounting.
//
// The loop visits specs.size() devices, starting at index resourceState.lastDeviceOffered
// and wrapping around. Devices which are inactive, ready to quit, or not named
// "internal-dpl-aod-reader" are skipped. Every remaining device is sent, through
// manager.queueMessage, an offer of min(resourceSpec.maxQuantum, resourceState.available)
// formatted with resourceSpec.api. The loop stops as soon as resourceState.available is
// smaller than possibleOffer, which starts at resourceSpec.minQuantum and afterwards holds
// the amount offered last.
//
// After the loop the unused offered amount and resourceState.available are recomputed from
// the counters passed in, and the three metric callbacks are invoked.
//
// resourceState: available / offered amounts and next starting device; updated in place.
// resourceSpec:  name, unit, message format, limits and metric scale factor of the resource.
// resourceStats: counters of consecutive "low" / "enough" iterations; a signpost is emitted
//                only when the relevant counter is zero.
// specs, infos:  device specifications and device states, addressed with the same index.
// manager:       receives the offer messages.
// offerConsumedCurrentValue, offerExpiredCurrentValue:
//                current consumed / expired offer totals; their sum is divided by
//                resourceSpec.metricOfferScaleFactor before being used.
// acquiredResourceCurrentValue, disposedResourceCurrentValue:
//                current acquired / disposed totals; their difference is divided by
//                resourceSpec.metricOfferScaleFactor before being used.
// timestamp:     forwarded to the metric callbacks.
// driverMetrics: metrics store handed to the metric callbacks.
// availableResourceMetric, unusedOfferedResourceMetric, offeredResourceMetric:
//                callbacks receiving the recomputed values.
// signpostId:    pointer the signpost id is derived from.
130auto offerResources(ResourceState& resourceState,
131 ResourceSpec const& resourceSpec,
132 ResourceStats& resourceStats,
133 std::vector<DeviceSpec> const& specs,
134 std::vector<DeviceInfo> const& infos,
135 DevicesManager& manager,
136 int64_t offerConsumedCurrentValue,
137 int64_t offerExpiredCurrentValue,
138 int64_t acquiredResourceCurrentValue,
139 int64_t disposedResourceCurrentValue,
140 size_t timestamp,
141 DeviceMetricsInfo& driverMetrics,
142 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
143 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
144 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
145 void* signpostId) -> void
146{
147 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
// Index of the last device which actually received an offer; -1 means none did.
150 int64_t lastCandidate = -1;
// Amount required to be available before an offer is attempted.
151 int64_t possibleOffer = resourceSpec.minQuantum;
152
153 for (size_t di = 0; di < specs.size(); di++) {
154 if (resourceState.available < possibleOffer) {
// Only report the shortage the first time in a row it is observed.
155 if (resourceStats.lowCount == 0) {
156 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
157 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
158 resourceSpec.name, resourceState.available, resourceSpec.unit,
159 possibleOffer, resourceSpec.unit,
160 resourceState.offered, resourceSpec.unit);
161 }
162 resourceStats.lowCount++;
163 resourceStats.enoughCount = 0;
164 break;
165 } else {
// Only report the recovery the first time in a row it is observed.
166 if (resourceStats.enoughCount == 0) {
167 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
168 "We are back in a state where we enough %{public}s: %llu %{public}s",
169 resourceSpec.name,
170 resourceState.available,
171 resourceSpec.unit);
172 }
173 resourceStats.lowCount = 0;
174 resourceStats.enoughCount++;
175 }
// Start from the device following the one served last time and wrap around.
176 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
177
// NOTE(review): infos is indexed with an index derived from specs.size();
// this assumes infos has at least as many elements as specs - confirm.
178 auto& info = infos[candidate];
179 // Do not bother with inactive devices.
180 // FIXME: there is probably a race condition if the device died and we have not
181 // noticed yet...
182 if (info.active == false || info.readyToQuit) {
183 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
184 "Device %s is inactive not offering %{public}s to it.",
185 specs[candidate].name.c_str(), resourceSpec.name);
186 continue;
187 }
// Only the device named "internal-dpl-aod-reader" receives offers.
188 if (specs[candidate].name != "internal-dpl-aod-reader") {
189 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
190 "Device %s is not a reader. Not offering %{public}s to it.",
191 specs[candidate].name.c_str(),
192 resourceSpec.name);
193 continue;
194 }
// Offer at most maxQuantum, and never more than what is available.
195 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
196 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
197 "Offering %llu %{public}s out of %llu to %{public}s",
198 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
199 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
200 resourceState.available -= possibleOffer;
201 resourceState.offered += possibleOffer;
202 lastCandidate = candidate;
203 }
204 // We had at least one valid candidate, so
205 // next time we start offering from the device after it.
206 if (lastCandidate >= 0) {
207 resourceState.lastDeviceOffered = lastCandidate + 1;
208 }
209
210 // unusedOfferedResource is the amount which was offered and which we know was
211 // not used so far. So we need to account for the amount which was actually consumed
212 // (offerConsumedCurrentValue) and the amount which expired (offerExpiredCurrentValue).
// NOTE(review): the two statics below are shared by every call of this function,
// whichever resource is passed in. They only gate the signposts, but when the
// function is called for more than one ResourceSpec the "did it change" checks
// compare values belonging to different resources - confirm this is intended.
213 static int64_t lastResourceOfferConsumed = 0;
214 static int64_t lastUnusedOfferedResource = 0;
215 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
216 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
217 "Offer consumed so far %llu", offerConsumedCurrentValue);
218 lastResourceOfferConsumed = offerConsumedCurrentValue;
219 }
// NOTE(review): the right-hand side is an int64_t expression stored in an int.
220 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
221 if (lastUnusedOfferedResource != unusedOfferedResource) {
222 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
223 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
224 resourceSpec.name,
225 unusedOfferedResource, resourceState.offered,
226 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
227 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
228 resourceSpec.metricOfferScaleFactor);
229 lastUnusedOfferedResource = unusedOfferedResource;
230 }
231 // resourceState.available is the amount which we know is available to be offered.
232 // We start from the maximum, balance how much was acquired with how much was disposed,
233 // and subtract the amount which was already offered but is still unused.
234 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
235 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
236 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
237
238 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
239};
240
241auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
242 int64_t& totalMetricValue, size_t& lastTimestamp) {
243 assert(index < deviceMetrics.metrics.size());
244 changed |= deviceMetrics.changed[index];
245 MetricInfo info = deviceMetrics.metrics[index];
246 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
247 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
248 auto value = (int64_t)data[(info.pos - 1) % data.size()];
249 totalMetricValue += value;
250 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
251 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
252};
253
255{
256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
260
261 return ServiceSpec{
262 .name = "arrow-backend",
264 .configure = CommonServices::noConfiguration(),
269 .metricHandling = [](ServiceRegistryRef registry,
270 ServiceMetricsInfo const& sm,
271 size_t timestamp) {
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
288
289 // Aggregated driver metrics for timeslice rate limiting
290 auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
292 };
293 auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
295 };
296
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
300 // These are really to monitor the rate limiting
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
307
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
315
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
320
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
327 auto& manager = registry.get<DevicesManager>();
328
329 bool changed = false;
330
331 size_t lastTimestamp = 0;
332 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
333 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
337 }
338 auto& indices = allIndices[mi];
339 {
340 size_t index = indices.arrowBytesCreated;
341 assert(index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[index];
343 MetricInfo info = deviceMetrics.metrics[index];
344 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
345 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
347 auto value = (int64_t)data[(info.pos - 1) % data.size()];
348 totalBytesCreated += value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
350 }
351 {
352 size_t index = indices.shmOfferBytesConsumed;
353 assert(index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[index];
355 MetricInfo info = deviceMetrics.metrics[index];
356 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
357 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
359 auto value = (int64_t)data[(info.pos - 1) % data.size()];
360 shmOfferBytesConsumed += value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
362 }
363 {
364 size_t index = indices.arrowBytesDestroyed;
365 assert(index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[index];
367 MetricInfo info = deviceMetrics.metrics[index];
368 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
369 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
370 auto value = (int64_t)data[(info.pos - 1) % data.size()];
371 totalBytesDestroyed += value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
374 }
375 {
376 size_t index = indices.arrowBytesExpired;
377 assert(index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[index];
379 MetricInfo info = deviceMetrics.metrics[index];
380 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
381 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
382 auto value = (int64_t)data[(info.pos - 1) % data.size()];
383 totalBytesExpired += value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
386 }
387 {
388 size_t index = indices.arrowMessagesCreated;
389 assert(index < deviceMetrics.metrics.size());
390 MetricInfo info = deviceMetrics.metrics[index];
391 changed |= deviceMetrics.changed[index];
392 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
393 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
394 auto value = (int64_t)data[(info.pos - 1) % data.size()];
395 totalMessagesCreated += value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
398 }
399 {
400 size_t index = indices.arrowMessagesDestroyed;
401 assert(index < deviceMetrics.metrics.size());
402 MetricInfo info = deviceMetrics.metrics[index];
403 changed |= deviceMetrics.changed[index];
404 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
405 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
406 auto value = (int64_t)data[(info.pos - 1) % data.size()];
407 totalMessagesDestroyed += value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
410 }
411 {
412 size_t index = indices.timeframesRead;
413 assert(index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[index];
415 MetricInfo info = deviceMetrics.metrics[index];
416 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
417 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
418 auto value = (int64_t)data[(info.pos - 1) % data.size()];
419 totalTimeframesRead += value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
422 }
423 {
424 size_t index = indices.timeframesConsumed;
425 assert(index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[index];
427 MetricInfo info = deviceMetrics.metrics[index];
428 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
429 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
430 auto value = (int64_t)data[(info.pos - 1) % data.size()];
431 totalTimeframesConsumed += value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
434 }
435 {
436 size_t index = indices.timeframesExpired;
437 assert(index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[index];
439 MetricInfo info = deviceMetrics.metrics[index];
440 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
441 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
442 auto value = (int64_t)data[(info.pos - 1) % data.size()];
443 totalTimeframesExpired += value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
446 }
447 processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
448 processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
449 processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
450 }
451 static uint64_t unchangedCount = 0;
452 if (changed) {
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
461 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
465 totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
467 } else {
468 unchangedCount++;
469 }
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
471
472 static const ResourceSpec shmResourceSpec{
473 .name = "shared memory",
474 .unit = "MB",
475 .api = "/shm-offer {}",
476 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
477 .maxQuantum = 100,
478 .minQuantum = 50,
479 .metricOfferScaleFactor = 1000000,
480 };
481 static const ResourceSpec timesliceResourceSpec{
482 .name = "timeslice",
483 .unit = "timeslices",
484 .api = "/timeslice-offer {}",
485 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
486 .maxQuantum = 1,
487 .minQuantum = 1,
488 .metricOfferScaleFactor = 1,
489 };
490 static ResourceState shmResourceState{
491 .available = shmResourceSpec.maxAvailable,
492 };
493 static ResourceState timesliceResourceState{
494 .available = timesliceResourceSpec.maxAvailable,
495 };
496 static ResourceStats shmResourceStats{
497 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
499 };
500 static ResourceStats timesliceResourceStats{
501 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
503 };
504
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
509 (void*)&sm);
510
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
515 (void*)&sm); },
516 .postDispatching = [](ProcessingContext& ctx, void* service) {
518 auto* arrow = reinterpret_cast<ArrowContext*>(service);
519 auto totalBytes = 0;
520 auto totalMessages = 0;
521 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
522 for (auto& input : ctx.inputs()) {
523 if (input.header == nullptr) {
524 continue;
525 }
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
527 auto payloadSize = DataRefUtils::getPayloadSize(input);
528 if (dh->serialization != o2::header::gSerializationMethodArrow) {
529 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
532 continue;
533 }
534 bool forwarded = false;
535 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
536 if (DataSpecUtils::match(forward.matcher, *dh)) {
537 forwarded = true;
538 break;
539 }
540 }
541 if (forwarded) {
542 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
543 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
544 dh->dataOrigin.str, dh->dataDescription.str);
545 continue;
546 }
547 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
548 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
549 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
550 totalBytes += payloadSize;
551 totalMessages += 1;
552 }
553 arrow->updateBytesDestroyed(totalBytes);
554 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
555 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
556 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
557 arrow->updateMessagesDestroyed(totalMessages);
558 auto& stats = ctx.services().get<DataProcessingStats>();
559 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
560 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
561 stats.processCommandQueue(); },
562 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
563 auto config = new RateLimitConfig{};
564 int readers = std::stoll(dc.options["readers"].as<std::string>());
565 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
566 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
567 } else {
568 config->maxMemory = readers * 500;
569 }
570 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
571 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
572 } else {
573 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
574 }
575 static bool once = false;
576 // Until we guarantee this is called only once...
577 if (!once) {
578 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
579 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
580 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
581 config->maxMemory, config->maxTimeframes, readers);
582 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
583 once = true;
584 } },
585 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
586 auto& workflow = node.specs;
587 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
588 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
589 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
590 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
591 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
592 auto &ac = ctx.services().get<AnalysisContext>();
593 ac.requestedAODs.clear();
594 ac.requestedDYNs.clear();
595 ac.providedDYNs.clear();
596 ac.providedTIMs.clear();
597 ac.requestedTIMs.clear();
598
599
600 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
601 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
602
603 if (builder != workflow.end()) {
604 // collect currently requested IDXs
605 ac.requestedIDXs.clear();
606 for (auto& d : workflow) {
607 if (d.name == builder->name) {
608 continue;
609 }
610 for (auto& i : d.inputs) {
612 auto copy = i;
613 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
614 }
615 }
616 }
617 // recreate inputs and outputs
618 builder->inputs.clear();
619 builder->outputs.clear();
620 // replace AlgorithmSpec
621 // FIXME: it should be made more generic, so it does not need replacement...
622 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
623 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
624 }
625
626 if (spawner != workflow.end()) {
627 // collect currently requested DYNs
628 for (auto& d : workflow) {
629 if (d.name == spawner->name) {
630 continue;
631 }
632 for (auto const& i : d.inputs) {
634 auto copy = i;
635 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
636 }
637 }
638 for (auto const& o : d.outputs) {
640 ac.providedDYNs.emplace_back(o);
641 }
642 }
643 }
644 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
645 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
646 ac.spawnerInputs.clear();
647 for (auto& input : ac.requestedDYNs) {
648 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
649 ac.spawnerInputs.emplace_back(input);
650 }
651 }
652 // recreate inputs and outputs
653 spawner->outputs.clear();
654 spawner->inputs.clear();
655 // replace AlgorithmSpec
656 // FIXME: it should be made more generic, so it does not need replacement...
658 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
659 }
660
661 if (analysisCCDB != workflow.end()) {
662 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
663 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
664 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
665 }
666 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
667 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
668 // Use ranges::to<std::vector<>> in C++23...
669 ac.analysisCCDBInputs.clear();
670 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
671
672 // recreate inputs and outputs
673 analysisCCDB->outputs.clear();
674 analysisCCDB->inputs.clear();
675 // replace AlgorithmSpec
676 // FIXME: it should be made more generic, so it does not need replacement...
677 // FIXME how can I make the lookup depend on DYN tables as well??
678 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
679 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
680 }
681
682 if (writer != workflow.end()) {
683 workflow.erase(writer);
684 }
685
686 if (reader != workflow.end()) {
687 // If reader and/or builder were adjusted, remove unneeded outputs
688 // update currently requested AODs
689 for (auto& d : workflow) {
690 for (auto const& i : d.inputs) {
691 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
692 auto copy = i;
693 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
694 }
695 }
696 }
697
698 // remove unmatched outputs
699 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
700 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
701 });
702 reader->outputs.erase(o_end, reader->outputs.end());
703 if (reader->outputs.empty()) {
704 // nothing to read
705 workflow.erase(reader);
706 }
707 }
708
709
710
711 // replace writer as some outputs may have become dangling and some are now consumed
712 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
713
714 // create DataOutputDescriptor
715 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
716
717 // select outputs of type AOD which need to be saved
718 // ATTENTION: if there are dangling outputs the getGlobalAODSink
719 // has to be created in any case!
720 ac.outputsInputsAOD.clear();
721
722 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
723 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
724 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
725 if (!ds.empty() || isDangling[ii]) {
726 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
727 }
728 }
729 }
730
731 // file sink for any AOD output
732 if (!ac.outputsInputsAOD.empty()) {
733 // add TFNumber and TFFilename as input to the writer
734 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
735 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
736 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
737 }
738 // Move the dummy sink at the end, if needed
739 for (size_t i = 0; i < workflow.size(); ++i) {
740 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
741 workflow.push_back(workflow[i]);
742 workflow.erase(workflow.begin() + i);
743 break;
744 }
745 } },
746 .kind = ServiceKind::Global};
747}
748
// Body of a factory that returns the ServiceSpec describing the
// "arrow-slicing-cache-def" service.
// NOTE(review): the signature line (listing line 749) is missing from this
// extract; presumably this is ArrowSupport::arrowTableSlicingCacheDefSpec() --
// confirm against the real source file.
750{
751 return ServiceSpec{
752 .name = "arrow-slicing-cache-def",
753 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
// Construction is delegated to the generic CommonServices helper, instantiated
// for ArrowTableSlicingCacheDef with ServiceKind::Global.
754 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
// Same kind as the one passed to simpleServiceInit above.
755 .kind = ServiceKind::Global};
756}
757
// Body of a factory that returns the ServiceSpec describing the
// "arrow-slicing-cache" service (ServiceKind::Stream).
// NOTE(review): the signature line (listing line 758) is missing from this
// extract; presumably this is ArrowSupport::arrowTableSlicingCacheSpec() --
// confirm against the real source file.
759{
760 return ServiceSpec{
761 .name = "arrow-slicing-cache",
762 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
// The service instance is built from two Cache objects initialised from the
// bindingsKeys / bindingsKeysUnsorted members of the ArrowTableSlicingCacheDef
// service looked up in the registry.
763 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
764 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
765 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
// NOTE(review): listing line 766 (presumably the remaining ServiceHandle
// arguments and the end of the init lambda) is missing from this extract.
767 .configure = CommonServices::noConfiguration(),
// Pre-processing hook: refreshes the slicing cache entries from the tables
// available in the inputs of the given ProcessingContext.
768 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
// service_ptr is the type-erased service; it is cast back to the concrete type.
769 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
770 auto& caches = service->bindingsKeys;
771 for (auto i = 0u; i < caches.size(); ++i) {
// Only entries that are enabled and whose binding is found among the inputs
// (getPos() >= 0) are updated.
772 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
773 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
// A failed update is reported as an exception naming the binding/key pair.
774 if (!status.ok()) {
775 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
776 }
777 }
778 }
// Same procedure for the unsorted entries, using updateCacheEntryUnsorted().
// NOTE(review): the error message below starts lower-case ("failed") while the
// one above starts upper-case ("Failed").
779 auto& unsortedCaches = service->bindingsKeysUnsorted;
780 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
781 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
782 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
783 if (!status.ok()) {
784 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
785 }
786 }
787 } },
788 .kind = ServiceKind::Stream};
789}
790
791} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as a arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static unsigned int pipelineLength()
get max number of timeslices in the queue
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
static AlgorithmSpec aodSpawnerCallback(ConfigContext const &ctx)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds