Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
26#include "Framework/Tracing.h"
34#include "WorkflowHelpers.h"
39#include "Framework/Signpost.h"
40
42#include <Monitoring/Monitoring.h>
43#include "Headers/DataHeader.h"
45
46#include <RtypesCore.h>
47#include <fairmq/ProgOptions.h>
48
49#include <uv.h>
50#include <boost/program_options/variables_map.hpp>
51#include <csignal>
52
54
55namespace o2::framework
56{
57
58class EndOfStreamContext;
59class ProcessingContext;
60
/// States used by the rate-limiting logic. The per-value notes are carried
/// over from the original inline comments; the transitions between states are
/// not defined in this block.
enum struct RateLimitingState {
  UNKNOWN = 0,                   // No information received yet.
  STARTED = 1,                   // Information received, new timeframe not requested.
  CHANGED = 2,                   // Information received, new timeframe requested but not yet accounted.
  BELOW_LIMIT = 3,               // New metric received, we are below limit.
  NEXT_ITERATION_FROM_BELOW = 4, // Iteration when previously in BELOW_LIMIT.
  ABOVE_LIMIT = 5,               // New metric received, we are above limit.
  EMPTY = 6,                     // NOTE(review): no description in the original; confirm the intended meaning.
};
70
/// Limits read by the rate-limiting logic of the "arrow-backend" service.
/// The struct opener is restored here: the listing drops original line 71,
/// while the type name is fixed by its uses (`registry.get<RateLimitConfig>()`,
/// `new RateLimitConfig{}`).
struct RateLimitConfig {
  /// Shared-memory budget. Handled as MB by the code that fills it in
  /// (byte values are divided by 1000000 and the setup message prints "MB").
  int64_t maxMemory = 2000;
  /// Maximum number of timeframes read but not yet consumed; 0 disables the
  /// check (the value is tested for non-zero before it is compared).
  int64_t maxTimeframes = 0;
};
75
/// Per-device indices of the metrics booked by createDefaultIndices().
/// Every field defaults to size_t(-1), i.e. "not booked yet".
///
/// The struct opener and the fields arrowBytesDestroyed, arrowMessagesCreated,
/// arrowMessagesDestroyed and shmOfferBytesConsumed are restored here: the
/// listing drops original lines 76, 78-80 and 82, but createDefaultIndices()
/// assigns all eight members, and the gaps in the numbering match this order.
struct MetricIndices {
  size_t arrowBytesCreated = -1;
  size_t arrowBytesDestroyed = -1;
  size_t arrowMessagesCreated = -1;
  size_t arrowMessagesDestroyed = -1;
  size_t arrowBytesExpired = -1;
  size_t shmOfferBytesConsumed = -1;
  size_t timeframesRead = -1;
  size_t timeframesConsumed = -1;
};
86
87std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
88{
89 std::vector<MetricIndices> results;
90
91 for (auto& info : allDevicesMetrics) {
93 indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
94 indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
95 indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
96 indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
97 indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
98 indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
99 indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
100 indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
101 results.push_back(indices);
102 }
103 return results;
104}
105
107{
108 return registry.get<RateLimitConfig>().maxMemory;
109}
110
112{
113 using o2::monitoring::Metric;
114 using o2::monitoring::Monitoring;
115 using o2::monitoring::tags::Key;
116 using o2::monitoring::tags::Value;
117
118 return ServiceSpec{
119 .name = "arrow-backend",
121 .configure = CommonServices::noConfiguration(),
126 .metricHandling = [](ServiceRegistryRef registry,
127 ServiceMetricsInfo const& sm,
128 size_t timestamp) {
129 int64_t totalBytesCreated = 0;
130 int64_t shmOfferBytesConsumed = 0;
131 int64_t totalBytesDestroyed = 0;
132 int64_t totalBytesExpired = 0;
133 int64_t totalMessagesCreated = 0;
134 int64_t totalMessagesDestroyed = 0;
135 int64_t totalTimeframesRead = 0;
136 int64_t totalTimeframesConsumed = 0;
137 auto &driverMetrics = sm.driverMetricsInfo;
138 auto &allDeviceMetrics = sm.deviceMetricsInfos;
139 auto &specs = sm.deviceSpecs;
140 auto &infos = sm.deviceInfos;
141 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm);
142
143 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
144 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
145 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
146 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
147 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
148 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
149 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
150 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
151 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
152 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
153 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
154 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
155 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
156 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
157 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
158 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
159 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
160 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
161 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
162 auto& manager = registry.get<DevicesManager>();
163
164 bool changed = false;
165
166 size_t lastTimestamp = 0;
167 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
168 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
169 auto& deviceMetrics = allDeviceMetrics[mi];
170 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
171 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
172 }
173 auto& indices = allIndices[mi];
174 {
175 size_t index = indices.arrowBytesCreated;
176 assert(index < deviceMetrics.metrics.size());
177 changed |= deviceMetrics.changed[index];
178 MetricInfo info = deviceMetrics.metrics[index];
179 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
180 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
181 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
182 auto value = (int64_t)data[(info.pos - 1) % data.size()];
183 totalBytesCreated += value;
184 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
185 }
186 {
187 size_t index = indices.shmOfferBytesConsumed;
188 assert(index < deviceMetrics.metrics.size());
189 changed |= deviceMetrics.changed[index];
190 MetricInfo info = deviceMetrics.metrics[index];
191 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
192 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
193 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
194 auto value = (int64_t)data[(info.pos - 1) % data.size()];
195 shmOfferBytesConsumed += value;
196 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
197 }
198 {
199 size_t index = indices.arrowBytesDestroyed;
200 assert(index < deviceMetrics.metrics.size());
201 changed |= deviceMetrics.changed[index];
202 MetricInfo info = deviceMetrics.metrics[index];
203 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
204 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
205 auto value = (int64_t)data[(info.pos - 1) % data.size()];
206 totalBytesDestroyed += value;
207 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
208 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
209 }
210 {
211 size_t index = indices.arrowBytesExpired;
212 assert(index < deviceMetrics.metrics.size());
213 changed |= deviceMetrics.changed[index];
214 MetricInfo info = deviceMetrics.metrics[index];
215 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
216 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
217 auto value = (int64_t)data[(info.pos - 1) % data.size()];
218 totalBytesExpired += value;
219 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
220 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
221 }
222 {
223 size_t index = indices.arrowMessagesCreated;
224 assert(index < deviceMetrics.metrics.size());
225 MetricInfo info = deviceMetrics.metrics[index];
226 changed |= deviceMetrics.changed[index];
227 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
228 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
229 auto value = (int64_t)data[(info.pos - 1) % data.size()];
230 totalMessagesCreated += value;
231 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
232 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
233 }
234 {
235 size_t index = indices.arrowMessagesDestroyed;
236 assert(index < deviceMetrics.metrics.size());
237 MetricInfo info = deviceMetrics.metrics[index];
238 changed |= deviceMetrics.changed[index];
239 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
240 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
241 auto value = (int64_t)data[(info.pos - 1) % data.size()];
242 totalMessagesDestroyed += value;
243 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
244 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
245 }
246 {
247 size_t index = indices.timeframesRead;
248 assert(index < deviceMetrics.metrics.size());
249 changed |= deviceMetrics.changed[index];
250 MetricInfo info = deviceMetrics.metrics[index];
251 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
252 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
253 auto value = (int64_t)data[(info.pos - 1) % data.size()];
254 totalTimeframesRead += value;
255 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
256 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
257 }
258 {
259 size_t index = indices.timeframesConsumed;
260 assert(index < deviceMetrics.metrics.size());
261 changed |= deviceMetrics.changed[index];
262 MetricInfo info = deviceMetrics.metrics[index];
263 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
264 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
265 auto value = (int64_t)data[(info.pos - 1) % data.size()];
266 totalTimeframesConsumed += value;
267 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
268 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
269 }
270 }
271 static uint64_t unchangedCount = 0;
272 if (changed) {
273 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
274 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
275 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
276 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
277 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
278 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
279 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
280 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
281 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
282 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
283 } else {
284 unchangedCount++;
285 }
286 changedCountMetric(driverMetrics, unchangedCount, timestamp);
287 auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
288 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
289 return;
290 }
291
292 static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry);
293 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
294 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
295
296 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
297 static int64_t offeredSharedMemory = 0;
298 static int64_t lastDeviceOffered = 0;
301 int64_t lastCandidate = -1;
302 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
303 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
304 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
305 for (size_t di = 0; di < specs.size(); di++) {
306 if (availableSharedMemory < possibleOffer) {
307 if (lowSharedMemoryCount == 0) {
308 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
309 "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
310 availableSharedMemory, possibleOffer, offeredSharedMemory);
311 }
312 lowSharedMemoryCount++;
313 enoughSharedMemoryCount = 0;
314 break;
315 } else {
316 if (enoughSharedMemoryCount == 0) {
317 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
318 "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory);
319 }
320 enoughSharedMemoryCount++;
321 lowSharedMemoryCount = 0;
322 }
323 size_t candidate = (lastDeviceOffered + di) % specs.size();
324
325 auto& info = infos[candidate];
326 // Do not bother for inactive devices
327 // FIXME: there is probably a race condition if the device died and we did not
328 // took notice yet...
329 if (info.active == false || info.readyToQuit) {
330 continue;
331 }
332 if (specs[candidate].name != "internal-dpl-aod-reader") {
333 continue;
334 }
335 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
336 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
337 "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
338 possibleOffer, availableSharedMemory, specs[candidate].id.c_str());
339 manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
340 availableSharedMemory -= possibleOffer;
341 offeredSharedMemory += possibleOffer;
342 lastCandidate = candidate;
343 }
344 // We had at least a valid candidate, so
345 // next time we offer to the next device.
346 if (lastCandidate >= 0) {
347 lastDeviceOffered = lastCandidate + 1;
348 }
349
350 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
351 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
352 // and the amount which we know was given back.
353 static int64_t lastShmOfferConsumed = 0;
354 static int64_t lastUnusedOfferedMemory = 0;
355 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
356 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
357 "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
358 lastShmOfferConsumed = shmOfferBytesConsumed;
359 }
360 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
361 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
362 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
363 "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
364 unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
365 lastUnusedOfferedMemory = unusedOfferedMemory;
366 }
367 // availableSharedMemory is the amount of memory which we know is available to be offered.
368 // We subtract the amount which we know was already offered but it's unused and we then balance how
369 // much was created with how much was destroyed.
370 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
371 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
372 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
373
374 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
375 .postDispatching = [](ProcessingContext& ctx, void* service) {
377 auto* arrow = reinterpret_cast<ArrowContext*>(service);
378 auto totalBytes = 0;
379 auto totalMessages = 0;
380 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
381 for (auto& input : ctx.inputs()) {
382 if (input.header == nullptr) {
383 continue;
384 }
385 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
386 auto payloadSize = DataRefUtils::getPayloadSize(input);
387 if (dh->serialization != o2::header::gSerializationMethodArrow) {
388 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
389 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
390 dh->dataOrigin.str, dh->dataDescription.str);
391 continue;
392 }
393 bool forwarded = false;
394 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
395 if (DataSpecUtils::match(forward.matcher, *dh)) {
396 forwarded = true;
397 break;
398 }
399 }
400 if (forwarded) {
401 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
402 "Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory.",
403 dh->dataOrigin.str, dh->dataDescription.str);
404 continue;
405 }
406 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
407 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
408 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
409 totalBytes += payloadSize;
410 totalMessages += 1;
411 }
412 arrow->updateBytesDestroyed(totalBytes);
413 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
414 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
415 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
416 arrow->updateMessagesDestroyed(totalMessages);
417 auto& stats = ctx.services().get<DataProcessingStats>();
418 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
419 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
420 stats.processCommandQueue(); },
421 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
422 auto config = new RateLimitConfig{};
423 int readers = std::stoll(dc.options["readers"].as<std::string>());
424 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
425 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
426 } else {
427 config->maxMemory = readers * 500;
428 }
429 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
430 config->maxTimeframes = readers;
431 } else {
432 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
433 }
434 static bool once = false;
435 // Until we guarantee this is called only once...
436 if (!once) {
437 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
438 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
439 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
440 config->maxMemory, readers);
441 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
442 once = true;
443 } },
// Topology adjustment callback of the "arrow-backend" service.
// Looks up the internal AOD helper processors by name (spawner, CCDB fetcher,
// index builder, reader, writer) in node.specs, clears the request/provide
// lists held by AnalysisContext, and recomputes the inputs and outputs of the
// helpers it finds from what the other processors in the workflow declare.
// It then erases the writer, prunes (or erases) the reader, re-adds a global
// AOD sink when AOD outputs have to be saved, and moves the dummy sink to the
// end of the workflow.
// NOTE(review): this listing skips original lines 470, 492 and 498; see the
// notes at those places.
444 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
445 auto& workflow = node.specs;
446 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
447 auto analysisCCDB = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-ccdb"; });
448 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
449 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
450 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
451 auto &ac = ctx.services().get<AnalysisContext>();
// Start from empty request/provide lists; they are rebuilt below.
452 ac.requestedAODs.clear();
453 ac.requestedDYNs.clear();
454 ac.providedDYNs.clear();
455 ac.providedTIMs.clear();
456 ac.requestedTIMs.clear();
457
458
// Orderings used to sort the spec lists by their textual description.
459 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
460 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
461
462 if (builder != workflow.end()) {
463 // collect currently requested IDXs
464 ac.requestedIDXs.clear();
465 for (auto& d : workflow) {
466 if (d.name == builder->name) {
467 continue;
468 }
469 for (auto& i : d.inputs) {
// NOTE(review): original line 470 is missing from this listing; it presumably
// opens the block closed at line 473 (a filter on the input) - confirm
// against the real source file.
471 auto copy = i;
472 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
473 }
474 }
475 }
476 // recreate inputs and outputs
477 builder->inputs.clear();
478 builder->outputs.clear();
479 // replace AlgorithmSpec
480 // FIXME: it should be made more generic, so it does not need replacement...
481 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
482 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
483 }
484
485 if (spawner != workflow.end()) {
486 // collect currently requested DYNs
487 for (auto& d : workflow) {
488 if (d.name == spawner->name) {
489 continue;
490 }
491 for (auto const& i : d.inputs) {
// NOTE(review): original line 492 is missing from this listing; it presumably
// opens the block closed at line 495 - confirm against the real source file.
493 auto copy = i;
494 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
495 }
496 }
497 for (auto const& o : d.outputs) {
// NOTE(review): original line 498 is missing from this listing; it presumably
// opens the block closed at line 500 - confirm against the real source file.
499 ac.providedDYNs.emplace_back(o);
500 }
501 }
502 }
503 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
504 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
505 ac.spawnerInputs.clear();
// The spawner only needs the requested DYNs that nobody else provides.
506 for (auto& input : ac.requestedDYNs) {
507 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
508 ac.spawnerInputs.emplace_back(input);
509 }
510 }
511 // recreate inputs and outputs
512 spawner->outputs.clear();
513 spawner->inputs.clear();
514 // replace AlgorithmSpec
515 // FIXME: it should be made more generic, so it does not need replacement...
516 spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs);
517 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
518 }
519
520 if (analysisCCDB != workflow.end()) {
// Same collection as above, for specs whose origin partially matches "ATIM",
// written with the views/sinks helpers.
521 for (auto& d : workflow | views::exclude_by_name(analysisCCDB->name)) {
522 d.inputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::update_input_list{ac.requestedTIMs};
523 d.outputs | views::partial_match_filter(header::DataOrigin{"ATIM"}) | sinks::append_to{ac.providedTIMs};
524 }
525 std::sort(ac.requestedTIMs.begin(), ac.requestedTIMs.end(), inputSpecLessThan);
526 std::sort(ac.providedTIMs.begin(), ac.providedTIMs.end(), outputSpecLessThan);
527 // Use ranges::to<std::vector<>> in C++23...
528 ac.analysisCCDBInputs.clear();
529 ac.requestedTIMs | views::filter_not_matching(ac.providedTIMs) | sinks::append_to{ac.analysisCCDBInputs};
530
531 // recreate inputs and outputs
532 analysisCCDB->outputs.clear();
533 analysisCCDB->inputs.clear();
534 // replace AlgorithmSpec
535 // FIXME: it should be made more generic, so it does not need replacement...
536 // FIXME how can I make the lookup depend on DYN tables as well??
537 analysisCCDB->algorithm = PluginManager::loadAlgorithmFromPlugin("O2FrameworkCCDBSupport", "AnalysisCCDBFetcherPlugin", ctx);
538 AnalysisSupportHelpers::addMissingOutputsToAnalysisCCDBFetcher({}, ac.analysisCCDBInputs, ac.requestedAODs, ac.requestedDYNs, *analysisCCDB);
539 }
540
// NOTE(review): `workflow` is a std::vector and `reader` was obtained before
// this erase. std::vector::erase invalidates iterators at and after the erased
// element, and also changes end(), so the `reader != workflow.end()` test and
// the `reader->` accesses below may operate on an invalidated iterator
// whenever a writer was found. Consider looking the reader up again after the
// erase.
541 if (writer != workflow.end()) {
542 workflow.erase(writer);
543 }
544
545 if (reader != workflow.end()) {
546 // If reader and/or builder were adjusted, remove unneeded outputs
547 // update currently requested AODs
548 for (auto& d : workflow) {
549 for (auto const& i : d.inputs) {
550 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
551 auto copy = i;
552 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
553 }
554 }
555 }
556
557 // remove unmatched outputs
// An output is kept when its description partially matches "TFNumber" or
// "TFFilename", or when some requested AOD input matches it.
558 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
559 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
560 });
561 reader->outputs.erase(o_end, reader->outputs.end());
562 if (reader->outputs.empty()) {
563 // nothing to read
564 workflow.erase(reader);
565 }
566 }
567
568
569
570 // replace writer as some outputs may have become dangling and some are now consumed
571 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
572
573 // create DataOutputDescriptor
574 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
575
576 // select outputs of type AOD which need to be saved
577 // ATTENTION: if there are dangling outputs the getGlobalAODSink
578 // has to be created in any case!
579 ac.outputsInputsAOD.clear();
580
581 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
582 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
583 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
584 if (!ds.empty() || isDangling[ii]) {
585 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
586 }
587 }
588 }
589
590 // file sink for any AOD output
591 if (!ac.outputsInputsAOD.empty()) {
592 // add TFNumber and TFFilename as input to the writer
593 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
594 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
595 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
596 }
597 // Move the dummy sink at the end, if needed
// Only the first processor with that name is moved; the loop stops right
// after the erase, so the shifted indices are never revisited.
598 for (size_t i = 0; i < workflow.size(); ++i) {
599 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
600 workflow.push_back(workflow[i]);
601 workflow.erase(workflow.begin() + i);
602 break;
603 }
604 } },
605 .kind = ServiceKind::Global};
606}
607
609{
// Builds the ServiceSpec named "arrow-slicing-cache-def": a Global-kind
// service whose id and init callback come from the CommonServices helpers
// instantiated for ArrowTableSlicingCacheDef.
// NOTE(review): the signature line of this function (original line 608) is
// missing from this listing; the cross-reference list further down the page
// names it `static ServiceSpec arrowTableSlicingCacheDefSpec()`.
610 return ServiceSpec{
611 .name = "arrow-slicing-cache-def",
612 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
613 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
614 .kind = ServiceKind::Global};
615}
616
618{
619 return ServiceSpec{
620 .name = "arrow-slicing-cache",
621 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
622 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
623 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
624 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
626 .configure = CommonServices::noConfiguration(),
627 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
628 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
629 auto& caches = service->bindingsKeys;
630 for (auto i = 0u; i < caches.size(); ++i) {
631 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
632 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
633 if (!status.ok()) {
634 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
635 }
636 }
637 }
638 auto& unsortedCaches = service->bindingsKeysUnsorted;
639 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
640 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
641 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
642 if (!status.ok()) {
643 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
644 }
645 }
646 } },
647 .kind = ServiceKind::Stream};
648}
649
650} // namespace o2::framework
int32_t i
bool o
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:531
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static void addMissingOutputsToAnalysisCCDBFetcher(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Running state information of a given device.
Definition DeviceState.h:34
static auto loadAlgorithmFromPlugin(std::string library, std::string plugin, ConfigContext const &context) -> AlgorithmSpec
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
std::vector< DataProcessorSpec > & specs
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:618
o2::mch::DsIndex ds