Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
30#include "WorkflowHelpers.h"
35#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
50
51namespace o2::framework
52{
53
54class EndOfStreamContext;
55class ProcessingContext;
56
/// States of the rate-limiting state machine.
/// NOTE(review): no enumerator is referenced in the visible part of this file;
/// the descriptions below are carried over from the original inline comments.
enum class RateLimitingState : int {
  UNKNOWN = 0,                   ///< No information received yet.
  STARTED = 1,                   ///< Information received, new timeframe not requested.
  CHANGED = 2,                   ///< Information received, new timeframe requested but not yet accounted.
  BELOW_LIMIT = 3,               ///< New metric received, we are below limit.
  NEXT_ITERATION_FROM_BELOW = 4, ///< Iteration when previously in BELOW_LIMIT.
  ABOVE_LIMIT = 5,               ///< New metric received, we are above limit.
  EMPTY = 6,                     ///< NOTE(review): undocumented originally; meaning not clear from this file.
};
66
68 int64_t maxMemory = 2000;
69 int64_t maxTimeframes = 1000;
70};
71
73 size_t arrowBytesCreated = -1;
77 size_t arrowBytesExpired = -1;
79 size_t timeframesRead = -1;
80 size_t timeframesConsumed = -1;
81 size_t timeframesExpired = -1;
82 // Timeslices counting
83 size_t timeslicesStarted = -1;
84 size_t timeslicesExpired = -1;
85 size_t timeslicesDone = -1;
86};
87
88std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
89{
90 std::vector<MetricIndices> results;
91
92 for (auto& info : allDevicesMetrics) {
93 results.emplace_back(MetricIndices{
94 .arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created"),
95 .arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed"),
96 .arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created"),
97 .arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed"),
98 .arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired"),
99 .shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed"),
100 .timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent"),
101 .timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes"),
102 .timeframesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "expired-timeframes"),
103 .timeslicesStarted = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-started"),
104 .timeslicesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-expired"),
105 .timeslicesDone = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "timeslices-done"),
106 });
107 }
108 return results;
109}
110
112 int64_t available;
113 int64_t offered = 0;
114 int64_t lastDeviceOffered = 0;
115};
117 int64_t enoughCount;
118 int64_t lowCount;
119};
121 char const* name;
122 char const* unit;
123 char const* api;
124 int64_t maxAvailable;
125 int64_t maxQuantum;
126 int64_t minQuantum;
128};
129
130auto offerResources(ResourceState& resourceState,
131 ResourceSpec const& resourceSpec,
132 ResourceStats& resourceStats,
133 std::vector<DeviceSpec> const& specs,
134 std::vector<DeviceInfo> const& infos,
135 DevicesManager& manager,
136 int64_t offerConsumedCurrentValue,
137 int64_t offerExpiredCurrentValue,
138 int64_t acquiredResourceCurrentValue,
139 int64_t disposedResourceCurrentValue,
140 size_t timestamp,
141 DeviceMetricsInfo& driverMetrics,
142 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& availableResourceMetric,
143 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& unusedOfferedResourceMetric,
144 std::function<void(DeviceMetricsInfo&, int value, size_t timestamp)>& offeredResourceMetric,
145 void* signpostId) -> void
146{
147 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, signpostId);
150 int64_t lastCandidate = -1;
151 int64_t possibleOffer = resourceSpec.minQuantum;
152
153 for (size_t di = 0; di < specs.size(); di++) {
154 if (resourceState.available < possibleOffer) {
155 if (resourceStats.lowCount == 0) {
156 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
157 "We do not have enough %{public}s (%llu %{public}s) to offer %llu %{public}s. Total offerings %{bytes}llu %{string}s.",
158 resourceSpec.name, resourceState.available, resourceSpec.unit,
159 possibleOffer, resourceSpec.unit,
160 resourceState.offered, resourceSpec.unit);
161 }
162 resourceStats.lowCount++;
163 resourceStats.enoughCount = 0;
164 break;
165 } else {
166 if (resourceStats.enoughCount == 0) {
167 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
168 "We are back in a state where we enough %{public}s: %llu %{public}s",
169 resourceSpec.name,
170 resourceState.available,
171 resourceSpec.unit);
172 }
173 resourceStats.lowCount = 0;
174 resourceStats.enoughCount++;
175 }
176 size_t candidate = (resourceState.lastDeviceOffered + di) % specs.size();
177
178 auto& info = infos[candidate];
179 // Do not bother for inactive devices
180 // FIXME: there is probably a race condition if the device died and we did not
181 // took notice yet...
182 if (info.active == false || info.readyToQuit) {
183 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
184 "Device %s is inactive not offering %{public}s to it.",
185 specs[candidate].name.c_str(), resourceSpec.name);
186 continue;
187 }
188 if (specs[candidate].name != "internal-dpl-aod-reader") {
189 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
190 "Device %s is not a reader. Not offering %{public}s to it.",
191 specs[candidate].name.c_str(),
192 resourceSpec.name);
193 continue;
194 }
195 possibleOffer = std::min(resourceSpec.maxQuantum, resourceState.available);
196 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
197 "Offering %llu %{public}s out of %llu to %{public}s",
198 possibleOffer, resourceSpec.unit, resourceState.available, specs[candidate].id.c_str());
199 manager.queueMessage(specs[candidate].id.c_str(), fmt::format(fmt::runtime(resourceSpec.api), possibleOffer).data());
200 resourceState.available -= possibleOffer;
201 resourceState.offered += possibleOffer;
202 lastCandidate = candidate;
203 }
204 // We had at least a valid candidate, so
205 // next time we offer to the next device.
206 if (lastCandidate >= 0) {
207 resourceState.lastDeviceOffered = lastCandidate + 1;
208 }
209
210 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
211 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
212 // and the amount which we know was given back.
213 static int64_t lastResourceOfferConsumed = 0;
214 static int64_t lastUnusedOfferedResource = 0;
215 if (offerConsumedCurrentValue != lastResourceOfferConsumed) {
216 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
217 "Offer consumed so far %llu", offerConsumedCurrentValue);
218 lastResourceOfferConsumed = offerConsumedCurrentValue;
219 }
220 int unusedOfferedResource = (resourceState.offered - (offerExpiredCurrentValue + offerConsumedCurrentValue) / resourceSpec.metricOfferScaleFactor);
221 if (lastUnusedOfferedResource != unusedOfferedResource) {
222 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
223 "unusedOfferedResource(%{public}s):%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / %lli",
224 resourceSpec.name,
225 unusedOfferedResource, resourceState.offered,
226 offerExpiredCurrentValue / resourceSpec.metricOfferScaleFactor,
227 offerConsumedCurrentValue / resourceSpec.metricOfferScaleFactor,
228 resourceSpec.metricOfferScaleFactor);
229 lastUnusedOfferedResource = unusedOfferedResource;
230 }
231 // availableSharedMemory is the amount of memory which we know is available to be offered.
232 // We subtract the amount which we know was already offered but it's unused and we then balance how
233 // much was created with how much was destroyed.
234 resourceState.available = resourceSpec.maxAvailable + ((disposedResourceCurrentValue - acquiredResourceCurrentValue) / resourceSpec.metricOfferScaleFactor) - unusedOfferedResource;
235 availableResourceMetric(driverMetrics, resourceState.available, timestamp);
236 unusedOfferedResourceMetric(driverMetrics, unusedOfferedResource, timestamp);
237
238 offeredResourceMetric(driverMetrics, resourceState.offered, timestamp);
239};
240
241auto processTimeslices = [](size_t index, DeviceMetricsInfo& deviceMetrics, bool& changed,
242 int64_t& totalMetricValue, size_t& lastTimestamp) {
243 assert(index < deviceMetrics.metrics.size());
244 changed |= deviceMetrics.changed[index];
245 MetricInfo info = deviceMetrics.metrics[index];
246 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
247 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
248 auto value = (int64_t)data[(info.pos - 1) % data.size()];
249 totalMetricValue += value;
250 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
251 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
252};
253
255{
256 using o2::monitoring::Metric;
257 using o2::monitoring::Monitoring;
258 using o2::monitoring::tags::Key;
259 using o2::monitoring::tags::Value;
260
261 return ServiceSpec{
262 .name = "arrow-backend",
264 .configure = CommonServices::noConfiguration(),
269 .metricHandling = [](ServiceRegistryRef registry,
270 ServiceMetricsInfo const& sm,
271 size_t timestamp) {
272 int64_t totalBytesCreated = 0;
273 int64_t shmOfferBytesConsumed = 0;
274 int64_t totalBytesDestroyed = 0;
275 int64_t totalBytesExpired = 0;
276 int64_t totalMessagesCreated = 0;
277 int64_t totalMessagesDestroyed = 0;
278 int64_t totalTimeframesRead = 0;
279 int64_t totalTimeframesConsumed = 0;
280 int64_t totalTimeframesExpired = 0;
281 int64_t totalTimeslicesStarted = 0;
282 int64_t totalTimeslicesDone = 0;
283 int64_t totalTimeslicesExpired = 0;
284 auto &driverMetrics = sm.driverMetricsInfo;
285 auto &allDeviceMetrics = sm.deviceMetricsInfos;
286 auto &specs = sm.deviceSpecs;
287 auto &infos = sm.deviceInfos;
288
289 // Aggregated driver metrics for timeslice rate limiting
290 auto createUint64DriverMetric = [&driverMetrics](char const*name) -> auto {
291 return DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, name);
292 };
293 auto createIntDriverMetric = [&driverMetrics](char const*name) -> auto {
294 return DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, name);
295 };
296
297 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
298 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
299 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
300 // These are really to monitor the rate limiting
301 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
302 static auto unusedOfferedTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-timeslices");
303 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
304 static auto availableTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-timeslices");
305 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
306 static auto offeredTimeslicesMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-timeslices");
307
308 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
309 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
310 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
311 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
312 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
313 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
314 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
315
316 static auto totalTimeslicesStartedMetric = createUint64DriverMetric("total-timeslices-started");
317 static auto totalTimeslicesExpiredMetric = createUint64DriverMetric("total-timeslices-expired");
318 static auto totalTimeslicesDoneMetric = createUint64DriverMetric("total-timeslices-done");
319 static auto totalTimeslicesInFlyMetric = createIntDriverMetric("total-timeslices-in-fly");
320
321 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
322 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
323 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
324 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
325 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
326 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
327 auto& manager = registry.get<DevicesManager>();
328
329 bool changed = false;
330
331 size_t lastTimestamp = 0;
332 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
333 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
334 auto& deviceMetrics = allDeviceMetrics[mi];
335 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
336 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
337 }
338 auto& indices = allIndices[mi];
339 {
340 size_t index = indices.arrowBytesCreated;
341 assert(index < deviceMetrics.metrics.size());
342 changed |= deviceMetrics.changed[index];
343 MetricInfo info = deviceMetrics.metrics[index];
344 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
345 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
346 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
347 auto value = (int64_t)data[(info.pos - 1) % data.size()];
348 totalBytesCreated += value;
349 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
350 }
351 {
352 size_t index = indices.shmOfferBytesConsumed;
353 assert(index < deviceMetrics.metrics.size());
354 changed |= deviceMetrics.changed[index];
355 MetricInfo info = deviceMetrics.metrics[index];
356 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
357 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
358 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
359 auto value = (int64_t)data[(info.pos - 1) % data.size()];
360 shmOfferBytesConsumed += value;
361 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
362 }
363 {
364 size_t index = indices.arrowBytesDestroyed;
365 assert(index < deviceMetrics.metrics.size());
366 changed |= deviceMetrics.changed[index];
367 MetricInfo info = deviceMetrics.metrics[index];
368 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
369 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
370 auto value = (int64_t)data[(info.pos - 1) % data.size()];
371 totalBytesDestroyed += value;
372 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
373 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
374 }
375 {
376 size_t index = indices.arrowBytesExpired;
377 assert(index < deviceMetrics.metrics.size());
378 changed |= deviceMetrics.changed[index];
379 MetricInfo info = deviceMetrics.metrics[index];
380 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
381 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
382 auto value = (int64_t)data[(info.pos - 1) % data.size()];
383 totalBytesExpired += value;
384 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
385 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
386 }
387 {
388 size_t index = indices.arrowMessagesCreated;
389 assert(index < deviceMetrics.metrics.size());
390 MetricInfo info = deviceMetrics.metrics[index];
391 changed |= deviceMetrics.changed[index];
392 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
393 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
394 auto value = (int64_t)data[(info.pos - 1) % data.size()];
395 totalMessagesCreated += value;
396 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
397 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
398 }
399 {
400 size_t index = indices.arrowMessagesDestroyed;
401 assert(index < deviceMetrics.metrics.size());
402 MetricInfo info = deviceMetrics.metrics[index];
403 changed |= deviceMetrics.changed[index];
404 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
405 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
406 auto value = (int64_t)data[(info.pos - 1) % data.size()];
407 totalMessagesDestroyed += value;
408 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
409 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
410 }
411 {
412 size_t index = indices.timeframesRead;
413 assert(index < deviceMetrics.metrics.size());
414 changed |= deviceMetrics.changed[index];
415 MetricInfo info = deviceMetrics.metrics[index];
416 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
417 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
418 auto value = (int64_t)data[(info.pos - 1) % data.size()];
419 totalTimeframesRead += value;
420 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
421 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
422 }
423 {
424 size_t index = indices.timeframesConsumed;
425 assert(index < deviceMetrics.metrics.size());
426 changed |= deviceMetrics.changed[index];
427 MetricInfo info = deviceMetrics.metrics[index];
428 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
429 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
430 auto value = (int64_t)data[(info.pos - 1) % data.size()];
431 totalTimeframesConsumed += value;
432 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
433 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
434 }
435 {
436 size_t index = indices.timeframesExpired;
437 assert(index < deviceMetrics.metrics.size());
438 changed |= deviceMetrics.changed[index];
439 MetricInfo info = deviceMetrics.metrics[index];
440 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
441 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
442 auto value = (int64_t)data[(info.pos - 1) % data.size()];
443 totalTimeframesExpired += value;
444 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
445 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
446 }
447 processTimeslices(indices.timeslicesStarted, deviceMetrics, changed, totalTimeslicesStarted, lastTimestamp);
448 processTimeslices(indices.timeslicesExpired, deviceMetrics, changed, totalTimeslicesExpired, lastTimestamp);
449 processTimeslices(indices.timeslicesDone, deviceMetrics, changed, totalTimeslicesDone, lastTimestamp);
450 }
451 static uint64_t unchangedCount = 0;
452 if (changed) {
453 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
454 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
455 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
456 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
457 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
458 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
459 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
460 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
461 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
462 totalTimeslicesStartedMetric(driverMetrics, totalTimeslicesStarted, timestamp);
463 totalTimeslicesExpiredMetric(driverMetrics, totalTimeslicesExpired, timestamp);
464 totalTimeslicesDoneMetric(driverMetrics, totalTimeslicesDone, timestamp);
465 totalTimeslicesInFlyMetric(driverMetrics, (int)(totalTimeslicesStarted - totalTimeslicesDone), timestamp);
466 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
467 } else {
468 unchangedCount++;
469 }
470 changedCountMetric(driverMetrics, unchangedCount, timestamp);
471
472 static const ResourceSpec shmResourceSpec{
473 .name = "shared memory",
474 .unit = "MB",
475 .api = "/shm-offer {}",
476 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxMemory,
477 .maxQuantum = 100,
478 .minQuantum = 50,
479 .metricOfferScaleFactor = 1000000,
480 };
481 static const ResourceSpec timesliceResourceSpec{
482 .name = "timeslice",
483 .unit = "timeslices",
484 .api = "/timeslice-offer {}",
485 .maxAvailable = (int64_t)registry.get<RateLimitConfig>().maxTimeframes,
486 .maxQuantum = 1,
487 .minQuantum = 1,
488 .metricOfferScaleFactor = 1,
489 };
490 static ResourceState shmResourceState{
491 .available = shmResourceSpec.maxAvailable,
492 };
493 static ResourceState timesliceResourceState{
494 .available = timesliceResourceSpec.maxAvailable,
495 };
496 static ResourceStats shmResourceStats{
497 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
498 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
499 };
500 static ResourceStats timesliceResourceStats{
501 .enoughCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 1 : 0,
502 .lowCount = shmResourceState.available - shmResourceSpec.minQuantum > 0 ? 0 : 1
503 };
504
505 offerResources(timesliceResourceState, timesliceResourceSpec, timesliceResourceStats,
506 specs, infos, manager, totalTimeframesConsumed, totalTimeslicesExpired,
507 totalTimeslicesStarted, totalTimeslicesDone, timestamp, driverMetrics,
508 availableTimeslicesMetric, unusedOfferedTimeslicesMetric, offeredTimeslicesMetric,
509 (void*)&sm);
510
511 offerResources(shmResourceState, shmResourceSpec, shmResourceStats,
512 specs, infos, manager, shmOfferBytesConsumed, totalBytesExpired,
513 totalBytesCreated, totalBytesDestroyed, timestamp, driverMetrics,
514 availableSharedMemoryMetric, unusedOfferedSharedMemoryMetric, offeredSharedMemoryMetric,
515 (void*)&sm); },
516 .postDispatching = [](ProcessingContext& ctx, void* service) {
518 auto* arrow = reinterpret_cast<ArrowContext*>(service);
519 auto totalBytes = 0;
520 auto totalMessages = 0;
521 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
522 for (auto& input : ctx.inputs()) {
523 if (input.header == nullptr) {
524 continue;
525 }
526 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
527 auto payloadSize = DataRefUtils::getPayloadSize(input);
528 if (dh->serialization != o2::header::gSerializationMethodArrow) {
529 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
530 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
531 dh->dataOrigin.str, dh->dataDescription.str);
532 continue;
533 }
534 bool forwarded = false;
535 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
536 if (DataSpecUtils::match(forward.matcher, *dh)) {
537 forwarded = true;
538 break;
539 }
540 }
541 if (forwarded) {
542 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
543 "Message %{public}.4s/%{public}.16s is forwarded so we are not returning its memory.",
544 dh->dataOrigin.str, dh->dataDescription.str);
545 continue;
546 }
547 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
548 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
549 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
550 totalBytes += payloadSize;
551 totalMessages += 1;
552 }
553 arrow->updateBytesDestroyed(totalBytes);
554 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
555 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
556 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
557 arrow->updateMessagesDestroyed(totalMessages);
558 auto& stats = ctx.services().get<DataProcessingStats>();
559 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
560 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
561 stats.processCommandQueue(); },
562 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
563 auto config = new RateLimitConfig{};
564 int readers = std::stoll(dc.options["readers"].as<std::string>());
565 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
566 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
567 } else {
568 config->maxMemory = readers * 2000;
569 }
570 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].defaulted() == false) {
571 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
572 } else {
573 config->maxTimeframes = readers * DefaultsHelpers::pipelineLength();
574 }
575 static bool once = false;
576 // Until we guarantee this is called only once...
577 if (!once) {
578 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
579 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
580 "Rate limiting set up at %{bytes}llu MB and %llu timeframes distributed over %d readers",
581 config->maxMemory, config->maxTimeframes, readers);
582 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
583 once = true;
584 } },
585 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
586 auto& workflow = node.specs;
587 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
588 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
589 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
590 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
591 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
592 auto& dec = ctx.services().get<DanglingEdgesContext>();
593 dec.requestedAODs.clear();
594 dec.requestedDYNs.clear();
595 dec.providedDYNs.clear();
596 dec.providedTIMs.clear();
597 dec.requestedTIMs.clear();
598
599 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
600 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
601
602 if (builder != workflow.end()) {
603 // collect currently requested IDXs
604 dec.requestedIDXs.clear();
605 for (auto& d : workflow | views::exclude_by_name(builder->name)) {
606 d.inputs |
607 views::partial_match_filter(header::DataOrigin{"IDX"}) |
608 sinks::update_input_list{dec.requestedIDXs};
609 }
610 // recreate inputs and outputs
611 builder->inputs.clear();
612 builder->outputs.clear();
613
614 // load real AlgorithmSpec before deployment
615 builder->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "IndexTableBuilder", ctx);
616 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.requestedIDXs, dec.requestedAODs, dec.requestedDYNs, *builder);
617 }
618
619 if (spawner != workflow.end()) {
620 // collect currently requested DYNs
621 for (auto& d : workflow | views::exclude_by_name(spawner->name)) {
622 d.inputs |
623 views::partial_match_filter(header::DataOrigin{"DYN"}) |
624 sinks::update_input_list{dec.requestedDYNs};
625 d.outputs |
626 views::partial_match_filter(header::DataOrigin{"DYN"}) |
627 sinks::append_to{dec.providedDYNs};
628 }
629 std::sort(dec.requestedDYNs.begin(), dec.requestedDYNs.end(), inputSpecLessThan);
630 std::sort(dec.providedDYNs.begin(), dec.providedDYNs.end(), outputSpecLessThan);
631 dec.spawnerInputs.clear();
632 dec.requestedDYNs |
633 views::filter_not_matching(dec.providedDYNs) |
634 sinks::append_to{dec.spawnerInputs};
635 // recreate inputs and outputs
636 spawner->outputs.clear();
637 spawner->inputs.clear();
638
639 // load real AlgorithmSpec before deployment
640 spawner->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkOnDemandTablesSupport", "ExtendedTableSpawner", ctx);
641 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, dec.spawnerInputs, dec.requestedAODs, *spawner);
642 }
643
644 if (analysisCCDB != workflow.end()) {
645 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
646 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{dec.requestedTIMs};
647 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{dec.providedTIMs};
648 }
649 std::sort(dec.requestedTIMs.begin(), dec.requestedTIMs.end(), inputSpecLessThan);
650 std::sort(dec.providedTIMs.begin(), dec.providedTIMs.end(), outputSpecLessThan);
651 // Use ranges::to<std::vector<>> in C++23...
652 dec.analysisCCDBInputs.clear();
653 dec.requestedTIMs | views::filter_not_matching(dec.providedTIMs) | sinks::append_to{dec.analysisCCDBInputs};
654
655 // recreate inputs and outputs
656 analysisCCDB->outputs.clear();
657 analysisCCDB->inputs.clear();
658 // load real AlgorithmSpec before deployment
659 // FIXME how can I make the lookup depend on DYN tables as well??
660 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
661 AnalysisSupportHelpers::addMissingOutputsToBuilder(dec.analysisCCDBInputs, dec.requestedAODs, dec.requestedDYNs, *analysisCCDB);
662 }
663
664 if (writer != workflow.end()) {
665 workflow.erase(writer);
666 }
667
668 if (reader != workflow.end()) {
669 // If reader and/or builder were adjusted, remove unneeded outputs
670 // update currently requested AODs
671 for (auto& d : workflow) {
672 d.inputs |
673 views::partial_match_filter(AODOrigins) |
674 sinks::update_input_list{dec.requestedAODs};
675 }
676
677 // remove unmatched outputs
678 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
679 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(dec.requestedAODs.begin(), dec.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
680 });
681 reader->outputs.erase(o_end, reader->outputs.end());
682 if (reader->outputs.empty()) {
683 // nothing to read
684 workflow.erase(reader);
685 } else {
686 // load reader algorithm before deployment
687 auto&& algo = PluginManager::loadAlgorithmFromPlugin("O2FrameworkAnalysisSupport", "ROOTFileReader", ctx);
689 }
690 }
691
692 // replace writer as some outputs may have become dangling and some are now consumed
693 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
694
695 // create DataOutputDescriptor
696 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
697
698 // select outputs of type AOD which need to be saved
699 // ATTENTION: if there are dangling outputs the getGlobalAODSink
700 // has to be created in any case!
701 dec.outputsInputsAOD.clear();
702
703 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
704 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
705 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
706 if (!ds.empty() || isDangling[ii]) {
707 dec.outputsInputsAOD.emplace_back(outputsInputs[ii]);
708 }
709 }
710 }
711
712 // file sink for any AOD output
713 if (!dec.outputsInputsAOD.empty()) {
714 // add TFNumber and TFFilename as input to the writer
715 dec.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
716 dec.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
717 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
718 }
719 // Move the dummy sink at the end, if needed
720 for (size_t i = 0; i < workflow.size(); ++i) {
721 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
722 workflow.push_back(workflow[i]);
723 workflow.erase(workflow.begin() + i);
724 break;
725 }
726 } },
727 .kind = ServiceKind::Global};
728}
729
731{
732 return ServiceSpec{
733 .name = "arrow-slicing-cache-def",
734 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
735 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
736 .kind = ServiceKind::Global};
737}
738
740{
741 return ServiceSpec{
742 .name = "arrow-slicing-cache",
743 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
744 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
745 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
746 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
748 .configure = CommonServices::noConfiguration(),
749 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
750 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
751 auto& caches = service->bindingsKeys;
752 for (auto i = 0u; i < caches.size(); ++i) {
753 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
754 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
755 if (!status.ok()) {
756 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
757 }
758 }
759 }
760 auto& unsortedCaches = service->bindingsKeysUnsorted;
761 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
762 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
763 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
764 if (!status.ok()) {
765 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
766 }
767 }
768 } },
769 .kind = ServiceKind::Stream};
770}
771
772} // namespace o2::framework
std::string binding
std::unique_ptr< expressions::Node > node
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
auto offerResources(ResourceState &resourceState, ResourceSpec const &resourceSpec, ResourceStats &resourceStats, std::vector< DeviceSpec > const &specs, std::vector< DeviceInfo > const &infos, DevicesManager &manager, int64_t offerConsumedCurrentValue, int64_t offerExpiredCurrentValue, int64_t acquiredResourceCurrentValue, int64_t disposedResourceCurrentValue, size_t timestamp, DeviceMetricsInfo &driverMetrics, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &availableResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &unusedOfferedResourceMetric, std::function< void(DeviceMetricsInfo &, int value, size_t timestamp)> &offeredResourceMetric, void *signpostId) -> void
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static AlgorithmSpec wrapWithTimesliceConsumption(AlgorithmSpec spec)
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
static unsigned int pipelineLength()
get max number of timeslices in the queue
Running state information of a given device.
Definition DeviceState.h:34
void queueMessage(char const *receiver, char const *msg)
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
int64_t maxAvailable
The callback to give resources to a device.
int64_t minQuantum
Largest offer which can be given.
int64_t maxQuantum
Maximum available quantity for a resource.
int64_t metricOfferScaleFactor
Smallest offer which can be given.
int64_t lowCount
How many times the resources were enough.
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
the main header struct
Definition DataHeader.h:619
o2::mch::DsIndex ds