Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
29#include "WorkflowHelpers.h"
34#include "Framework/Signpost.h"
36
38#include <Monitoring/Monitoring.h>
39#include "Headers/DataHeader.h"
40
41#include <RtypesCore.h>
42#include <fairmq/ProgOptions.h>
43
44#include <uv.h>
45#include <boost/program_options/variables_map.hpp>
46#include <csignal>
47
49
50namespace o2::framework
51{
52
53class EndOfStreamContext;
54class ProcessingContext;
55
/// States of the rate-limiting state machine.
/// The numeric values are spelled out explicitly and must not be renumbered.
enum class RateLimitingState {
  UNKNOWN = 0,                   ///< No information received yet.
  STARTED = 1,                   ///< Information received, new timeframe not requested.
  CHANGED = 2,                   ///< Information received, new timeframe requested but not yet accounted.
  BELOW_LIMIT = 3,               ///< New metric received, we are below limit.
  NEXT_ITERATION_FROM_BELOW = 4, ///< Iteration when previously in BELOW_LIMIT.
  ABOVE_LIMIT = 5,               ///< New metric received, we are above limit.
  EMPTY = 6,                     ///< NOTE(review): meaning not documented in the original.
};
65
/// Upper bounds used by the rate limiter. The values below are only the
/// defaults; the driver initialisation overwrites both of them.
struct RateLimitConfig {
  /// Maximum amount of shared memory which can be offered, in MB.
  int64_t maxMemory = 2000;
  /// Maximum number of timeframes which can be offered.
  int64_t maxTimeframes = 1000;
};
70
/// Per-device indices of the metrics which are aggregated by the driver.
/// Every index defaults to -1 (i.e. the largest size_t) until it is booked.
/// The member order matters: the struct is filled with designated
/// initializers, which must follow the declaration order.
struct MetricIndices {
  size_t arrowBytesCreated = -1;
  size_t arrowBytesDestroyed = -1;
  size_t arrowMessagesCreated = -1;
  size_t arrowMessagesDestroyed = -1;
  size_t arrowBytesExpired = -1;
  size_t shmOfferBytesConsumed = -1;
  size_t timeframesRead = -1;
  size_t timeframesConsumed = -1;
  size_t timeframesExpired = -1;
  // Timeslices counting
  size_t timeslicesStarted = -1;
  size_t timeslicesExpired = -1;
  size_t timeslicesDone = -1;
};
86
87std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
88{
89 std::vector<MetricIndices> results;
90
91 for (auto& info : allDevicesMetrics) {
92 results.emplace_back(MetricIndices{
93 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
94 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
95 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
96 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
97 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
98 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
99 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
100 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
101 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
102 .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
103 .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
104 .timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
105 });
106 }
107 return results;
108}
109
/// Mutable bookkeeping for one rate-limited resource.
struct ResourceState {
  /// Amount of the resource which can still be offered.
  int64_t available;
  /// Running total of what has been offered so far.
  int64_t offered = 0;
  /// Position from which the next round of offers starts.
  int64_t lastDeviceOffered = 0;
};
/// Counters used to log only the transitions between having enough of a
/// resource and running low on it.
struct ResourceStats {
  /// Consecutive checks in which enough of the resource was available.
  int64_t enoughCount;
  /// Consecutive checks in which the resource was too low to be offered.
  int64_t lowCount;
};
/// Static description of a rate-limited resource.
struct ResourceSpec {
  /// Human readable name, used in log messages.
  char const* name;
  /// Unit in which the resource is expressed, used in log messages.
  char const* unit;
  /// Format string of the command sent to a device to make an offer.
  /// It must contain one `{}` placeholder for the offered amount.
  char const* api;
  /// Total amount of the resource which can be made available.
  int64_t maxAvailable;
  /// Largest amount offered in one go.
  int64_t maxQuantum;
  /// Smallest amount worth offering.
  int64_t minQuantum;
  /// Divisor which converts the values reported by the metrics to the
  /// unit of this resource. Must not be zero.
  int64_t metricOfferScaleFactor;
};
128
129auto offerResources(ResourceState& resourceState,
130 ResourceSpec const& resourceSpec,
131 ResourceStats& resourceStats,
132 std::vector<DeviceSpec> const& specs,
133 std::vector<DeviceInfo> const& infos,
134 DevicesManager& manager,
135 int64_t offerConsumedCurrentValue,
136 int64_t offerExpiredCurrentValue,
137 int64_t acquiredResourceCurrentValue,
138 int64_t disposedResourceCurrentValue,
139 size_t timestamp,
140 DeviceMetricsInfo& driverMetrics,
141 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
142 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
143 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
144 void* signpostId) -> void
145{
146 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
149 int64_t lastCandidate = -1;
150 int64_t possibleOffer = resourceSpec.minQuantum;
151
152 for (size_t di = 0; di < specs.size(); di++) {
153 if (resourceState.available < possibleOffer) {
154 if (resourceStats.lowCount == 0) {
155 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
156 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
157 resourceSpec.name, resourceState.available, resourceSpec.unit,
158 possibleOffer, resourceSpec.unit,
159 resourceState.offered, resourceSpec.unit);
160 }
161 resourceStats.lowCount++;
162 resourceStats.enoughCount = 0;
163 break;
164 } else {
165 if (resourceStats.enoughCount == 0) {
166 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
167 "We are back in a state where we enough %{public}s: %llu %{public}s",
168 resourceSpec.name,
169 resourceState.available,
170 resourceSpec.unit);
171 }
172 resourceStats.lowCount = 0;
173 resourceStats.enoughCount++;
174 }
175 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
176
177 auto& info = infos[candidate];
178 // Do not bother for inactive devices
179 // FIXME: there is probably a race condition if the device died and we did not
180 // took notice yet...
181 if (info.active == false || info.readyToQuit) {
182 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
183 "Device %s is inactive not offering %{public}s to it.",
184 specs[candidate].name.c_str(), resourceSpec.name);
185 continue;
186 }
187 if (specs[candidate].name != "internal-dpl-aod-reader") {
188 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
189 "Device %s is not a reader. Not offering %{public}s to it.",
190 specs[candidate].name.c_str(),
191 resourceSpec.name);
192 continue;
193 }
194 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
195 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
196 "Offering %llu %{public}s out of %llu to %{public}s",
197 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
198 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
199 resourceState.available -= possibleOffer;
200 resourceState.offered += possibleOffer;
201 lastCandidate = candidate;
202 }
203 // We had at least a valid candidate, so
204 // next time we offer to the next device.
205 if (lastCandidate >= 0) {
206 resourceState.lastDeviceOffered = lastCandidate + 1;
207 }
208
209 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
210 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
211 // and the amount which we know was given back.
212 static int64_t lastResourceOfferConsumed = 0;
213 static int64_t lastUnusedOfferedResource = 0;
214 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
215 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
216 "Offer consumed so far %llu", offerConsumedCurrentValue);
217 lastResourceOfferConsumed = offerConsumedCurrentValue;
218 }
219 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
220 if (lastUnusedOfferedResource != unusedOfferedResource) {
221 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
222 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
223 resourceSpec.name,
224 unusedOfferedResource, resourceState.offered,
225 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
226 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
227 resourceSpec.metricOfferScaleFactor);
228 lastUnusedOfferedResource = unusedOfferedResource;
229 }
230 // availableSharedMemory is the amount of memory which we know is available to be offered.
231 // We subtract the amount which we know was already offered but it's unused and we then balance how
232 // much was created with how much was destroyed.
233 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
234 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
235 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
236
237 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
238};
239
240auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
241 int64_t& totalMetricValue, size_t& lastTimestamp) {
242 assert(index < deviceMetrics.metrics.size());
243 changed |= deviceMetrics.changed[index];
244 MetricInfo info = deviceMetrics.metrics[index];
245 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
246 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
247 auto value = (int64_t)data[(info.pos - 1) % data.size()];
248 totalMetricValue += value;
249 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
250 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
251};
252
254{
255 using o2::monitoring::Metric;
256 using o2::monitoring::Monitoring;
257 using o2::monitoring::tags::Key;
258 using o2::monitoring::tags::Value;
259
260 return ServiceSpec{
261 .name = "arrow-backend",
263 .configure = CommonServices::noConfiguration(),
268 .metricHandling = [](ServiceRegistryRef registry,
269 ServiceMetricsInfo const& sm,
270 size_t timestamp) {
271 int64_t totalBytesCreated = 0;
272 int64_t shmOfferBytesConsumed = 0;
273 int64_t totalBytesDestroyed = 0;
274 int64_t totalBytesExpired = 0;
275 int64_t totalMessagesCreated = 0;
276 int64_t totalMessagesDestroyed = 0;
277 int64_t totalTimeframesRead = 0;
278 int64_t totalTimeframesConsumed = 0;
279 int64_t totalTimeframesExpired = 0;
280 int64_t totalTimeslicesStarted = 0;
281 int64_t totalTimeslicesDone = 0;
282 int64_t totalTimeslicesExpired = 0;
283 auto &driverMetrics = sm.driverMetricsInfo;
284 auto &allDeviceMetrics = sm.deviceMetricsInfos;
285 auto &specs = sm.deviceSpecs;
286 auto &infos = sm.deviceInfos;
287
288 // Aggregated driver metrics for timeslice rate limiting
289 auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
290 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
291 };
292 auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
293 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
294 };
295
296 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
297 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
298 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
299 // These are really to monitor the rate limiting
300 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
301 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
302 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
303 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
304 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
305 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
306
307 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
308 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
309 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
310 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
311 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
312 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
313 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
314
315 static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
316 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
317 static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
318 static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
319
320 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
321 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
322 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
323 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
324 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
325 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
326 auto& manager = registry.get<DevicesManager>();
327
328 bool changed = false;
329
330 size_t lastTimestamp = 0;
331 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
332 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
333 auto& deviceMetrics = allDeviceMetrics[mi];
334 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
335 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
336 }
337 auto& indices = allIndices[mi];
338 {
339 size_t index = indices.arrowBytesCreated;
340 assert(index < deviceMetrics.metrics.size());
341 changed |= deviceMetrics.changed[index];
342 MetricInfo info = deviceMetrics.metrics[index];
343 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
344 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
345 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
346 auto value = (int64_t)data[(info.pos - 1) % data.size()];
347 totalBytesCreated += value;
348 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
349 }
350 {
351 size_t index = indices.shmOfferBytesConsumed;
352 assert(index < deviceMetrics.metrics.size());
353 changed |= deviceMetrics.changed[index];
354 MetricInfo info = deviceMetrics.metrics[index];
355 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
356 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
357 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
358 auto value = (int64_t)data[(info.pos - 1) % data.size()];
359 shmOfferBytesConsumed += value;
360 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
361 }
362 {
363 size_t index = indices.arrowBytesDestroyed;
364 assert(index < deviceMetrics.metrics.size());
365 changed |= deviceMetrics.changed[index];
366 MetricInfo info = deviceMetrics.metrics[index];
367 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
368 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
369 auto value = (int64_t)data[(info.pos - 1) % data.size()];
370 totalBytesDestroyed += value;
371 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
372 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
373 }
374 {
375 size_t index = indices.arrowBytesExpired;
376 assert(index < deviceMetrics.metrics.size());
377 changed |= deviceMetrics.changed[index];
378 MetricInfo info = deviceMetrics.metrics[index];
379 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
380 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
381 auto value = (int64_t)data[(info.pos - 1) % data.size()];
382 totalBytesExpired += value;
383 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
384 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
385 }
386 {
387 size_t index = indices.arrowMessagesCreated;
388 assert(index < deviceMetrics.metrics.size());
389 MetricInfo info = deviceMetrics.metrics[index];
390 changed |= deviceMetrics.changed[index];
391 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
392 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
393 auto value = (int64_t)data[(info.pos - 1) % data.size()];
394 totalMessagesCreated += value;
395 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
396 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
397 }
398 {
399 size_t index = indices.arrowMessagesDestroyed;
400 assert(index < deviceMetrics.metrics.size());
401 MetricInfo info = deviceMetrics.metrics[index];
402 changed |= deviceMetrics.changed[index];
403 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
404 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
405 auto value = (int64_t)data[(info.pos - 1) % data.size()];
406 totalMessagesDestroyed += value;
407 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
408 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
409 }
410 {
411 size_t index = indices.timeframesRead;
412 assert(index < deviceMetrics.metrics.size());
413 changed |= deviceMetrics.changed[index];
414 MetricInfo info = deviceMetrics.metrics[index];
415 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
416 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
417 auto value = (int64_t)data[(info.pos - 1) % data.size()];
418 totalTimeframesRead += value;
419 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
420 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
421 }
422 {
423 size_t index = indices.timeframesConsumed;
424 assert(index < deviceMetrics.metrics.size());
425 changed |= deviceMetrics.changed[index];
426 MetricInfo info = deviceMetrics.metrics[index];
427 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
428 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
429 auto value = (int64_t)data[(info.pos - 1) % data.size()];
430 totalTimeframesConsumed += value;
431 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
432 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
433 }
434 {
435 size_t index = indices.timeframesExpired;
436 assert(index < deviceMetrics.metrics.size());
437 changed |= deviceMetrics.changed[index];
438 MetricInfo info = deviceMetrics.metrics[index];
439 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
440 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
441 auto value = (int64_t)data[(info.pos - 1) % data.size()];
442 totalTimeframesExpired += value;
443 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
444 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
445 }
446 processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
447 processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
448 processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
449 }
450 static uint64_t unchangedCount = 0;
451 if (changed) {
452 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
453 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
454 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
455 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
456 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
457 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
458 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
459 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
460 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
461 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
462 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
463 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
464 totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
465 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
466 } else {
467 unchangedCount++;
468 }
469 changedCountMetric(driverMetrics, unchangedCount, timestamp);
470
471 static const ResourceSpec shmResourceSpec{
472 .name = "shared memory",
473 .unit = "MB",
474 .api = "/shm-offer {}",
475 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
476 .maxQuantum = 100,
477 .minQuantum = 50,
478 .metricOfferScaleFactor = 1000000,
479 };
480 static const ResourceSpec timesliceResourceSpec{
481 .name = "timeslice",
482 .unit = "timeslices",
483 .api = "/timeslice-offer {}",
484 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
485 .maxQuantum = 1,
486 .minQuantum = 1,
487 .metricOfferScaleFactor = 1,
488 };
489 static ResourceState shmResourceState{
490 .available = shmResourceSpec.maxAvailable,
491 };
492 static ResourceState timesliceResourceState{
493 .available = timesliceResourceSpec.maxAvailable,
494 };
495 static ResourceStats shmResourceStats{
496 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
497 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
498 };
499 static ResourceStats timesliceResourceStats{
500 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
501 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
502 };
503
504 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
505 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
506 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
507 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
508 (void*)&sm);
509
510 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
511 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
512 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
513 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
514 (void*)&sm); },
515 .postDispatching = [](ProcessingContext& ctx, void* service) {
517 auto* arrow = reinterpret_cast<ArrowContext*>(service);
518 auto totalBytes = 0;
519 auto totalMessages = 0;
520 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
521 for (auto& input : ctx.inputs()) {
522 if (input.header == nullptr) {
523 continue;
524 }
525 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
526 auto payloadSize = DataRefUtils::getPayloadSize(input);
527 if (dh->serialization != o2::header::gSerializationMethodArrow) {
528 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
529 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
530 dh->dataOrigin.str, dh->dataDescription.str);
531 continue;
532 }
533 bool forwarded = false;
534 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
535 if (DataSpecUtils::match(forward.matcher, *dh)) {
536 forwarded = true;
537 break;
538 }
539 }
540 if (forwarded) {
541 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
542 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
543 dh->dataOrigin.str, dh->dataDescription.str);
544 continue;
545 }
546 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
547 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
548 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
549 totalBytes += payloadSize;
550 totalMessages += 1;
551 }
552 arrow->updateBytesDestroyed(totalBytes);
553 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
554 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
555 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
556 arrow->updateMessagesDestroyed(totalMessages);
557 auto& stats = ctx.services().get<DataProcessingStats>();
558 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
559 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
560 stats.processCommandQueue(); },
561 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
562 auto config = new RateLimitConfig{};
563 int readers = std::stoll(dc.options["readers"].as<std::string>());
564 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
565 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
566 } else {
567 config->maxMemory = readers * 2000;
568 }
569 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
570 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
571 } else {
572 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
573 }
574 static bool once = false;
575 // Until we guarantee this is called only once...
576 if (!once) {
577 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
578 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
579 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
580 config->maxMemory, config->maxTimeframes, readers);
581 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
582 once = true;
583 } },
584 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
585 auto& workflow = node.specs;
586 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
587 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
588 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
589 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
590 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
591 auto &ac = ctx.services().get<AnalysisContext>();
592 ac.requestedAODs.clear();
593 ac.requestedDYNs.clear();
594 ac.providedDYNs.clear();
595 ac.providedTIMs.clear();
596 ac.requestedTIMs.clear();
597
598
599 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
600 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
601
602 if (builder != workflow.end()) {
603 // collect currently requested IDXs
604 ac.requestedIDXs.clear();
605 for (auto& d : workflow) {
606 if (d.name == builder->name) {
607 continue;
608 }
609 for (auto& i : d.inputs) {
611 auto copy = i;
612 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
613 }
614 }
615 }
616 // recreate inputs and outputs
617 builder->inputs.clear();
618 builder->outputs.clear();
619 // replace AlgorithmSpec
620 // FIXME: it should be made more generic, so it does not need replacement...
621 builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx); // readers::AODReaderHelpers::indexBuilderCallback(ctx);
622 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
623 }
624
625 if (spawner != workflow.end()) {
626 // collect currently requested DYNs
627 for (auto& d : workflow) {
628 if (d.name == spawner->name) {
629 continue;
630 }
631 for (auto const& i : d.inputs) {
633 auto copy = i;
634 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
635 }
636 }
637 for (auto const& o : d.outputs) {
639 ac.providedDYNs.emplace_back(o);
640 }
641 }
642 }
643 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
644 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
645 ac.spawnerInputs.clear();
646 for (auto& input : ac.requestedDYNs) {
647 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
648 ac.spawnerInputs.emplace_back(input);
649 }
650 }
651 // recreate inputs and outputs
652 spawner->outputs.clear();
653 spawner->inputs.clear();
654 // replace AlgorithmSpec
655 // FIXME: it should be made more generic, so it does not need replacement...
656 spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
657 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
658 }
659
660 if (analysisCCDB != workflow.end()) {
661 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
662 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
663 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
664 }
665 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
666 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
667 // Use ranges::to<std::vector<>> in C++23...
668 ac.analysisCCDBInputs.clear();
669 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
670
671 // recreate inputs and outputs
672 analysisCCDB->outputs.clear();
673 analysisCCDB->inputs.clear();
674 // replace AlgorithmSpec
675 // FIXME: it should be made more generic, so it does not need replacement...
676 // FIXME how can I make the lookup depend on DYN tables as well??
677 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
678 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
679 }
680
681 if (writer != workflow.end()) {
682 workflow.erase(writer);
683 }
684
685 if (reader != workflow.end()) {
686 // If reader and/or builder were adjusted, remove unneeded outputs
687 // update currently requested AODs
688 for (auto& d : workflow) {
689 for (auto const& i : d.inputs) {
690 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
691 auto copy = i;
692 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
693 }
694 }
695 }
696
697 // remove unmatched outputs
698 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
699 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
700 });
701 reader->outputs.erase(o_end, reader->outputs.end());
702 if (reader->outputs.empty()) {
703 // nothing to read
704 workflow.erase(reader);
705 }
706 }
707
708
709
710 // replace writer as some outputs may have become dangling and some are now consumed
711 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
712
713 // create DataOutputDescriptor
714 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
715
716 // select outputs of type AOD which need to be saved
717 // ATTENTION: if there are dangling outputs the getGlobalAODSink
718 // has to be created in any case!
719 ac.outputsInputsAOD.clear();
720
721 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
722 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
723 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
724 if (!ds.empty() || isDangling[ii]) {
725 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
726 }
727 }
728 }
729
730 // file sink for any AOD output
731 if (!ac.outputsInputsAOD.empty()) {
732 // add TFNumber and TFFilename as input to the writer
733 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
734 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
735 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
736 }
737 // Move the dummy sink at the end, if needed
738 for (size_t i = 0; i < workflow.size(); ++i) {
739 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
740 workflow.push_back(workflow[i]);
741 workflow.erase(workflow.begin() + i);
742 break;
743 }
744 } },
745 .kind = ServiceKind::Global};
746}
747
749{
// Factory body for the "arrow-slicing-cache-def" service description.
// NOTE(review): the signature line is not part of this listing; presumably this is
// ArrowSupport::arrowTableSlicingCacheDefSpec() -- confirm against the real file.
// Both the unique id and the init callback are produced by the CommonServices
// simple* helpers instantiated for ArrowTableSlicingCacheDef, and the service
// is declared with ServiceKind::Global.
750 return ServiceSpec{
751 .name = "arrow-slicing-cache-def",
752 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
753 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
754 .kind = ServiceKind::Global};
755}
756
758{
// Factory body for the "arrow-slicing-cache" service description (ServiceKind::Stream).
// NOTE(review): the signature line is not part of this listing; presumably this is
// ArrowSupport::arrowTableSlicingCacheSpec() -- confirm against the real file.
759 return ServiceSpec{
760 .name = "arrow-slicing-cache",
761 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
// init: allocates a new ArrowTableSlicingCache from two Cache objects constructed
// from the bindingsKeys and bindingsKeysUnsorted members of the
// ArrowTableSlicingCacheDef service, and wraps it in a ServiceHandle.
// NOTE(review): listing line 765, which closes the ServiceHandle initializer and
// this lambda, is missing from this listing -- consult the real file.
762 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
763 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
764 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
766 .configure = CommonServices::noConfiguration(),
// preProcessing: refreshes the slicing cache entries from the inputs of the
// current ProcessingContext.
767 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
// service_ptr is the type-erased service instance; it is cast back to ArrowTableSlicingCache.
768 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
769 auto& caches = service->bindingsKeys;
// First pass: entries in bindingsKeys. An entry is updated only when it is
// enabled and getPos() for its binding is non-negative (presumably meaning the
// binding is among the inputs -- confirm against InputRecord::getPos).
770 for (auto i = 0u; i < caches.size(); ++i) {
771 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
772 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
// A non-ok status is turned into a runtime error naming the binding and key.
773 if (!status.ok()) {
774 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
775 }
776 }
777 }
// Second pass: same procedure for bindingsKeysUnsorted, using updateCacheEntryUnsorted.
778 auto& unsortedCaches = service->bindingsKeysUnsorted;
779 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
780 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
781 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
// NOTE(review): this message starts lowercase ("failed") while the one in the
// first pass starts with "Failed"; kept as is because it is runtime text.
782 if (!status.ok()) {
783 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
784 }
785 }
786 } },
787 .kind = ServiceKind::Stream};
788}
789
790} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static unsigned int pipelineLength()
get max number of timeslices in the queue
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds