Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
24#include "Framework/Tracing.h"
31#include "WorkflowHelpers.h"
36#include "Framework/Signpost.h"
37
39#include <Monitoring/Monitoring.h>
40#include "Headers/DataHeader.h"
42
43#include <RtypesCore.h>
44#include <fairmq/ProgOptions.h>
45
46#include <uv.h>
47#include <boost/program_options/variables_map.hpp>
48#include <csignal>
49
51
52namespace o2::framework
53{
54
55class EndOfStreamContext;
56class ProcessingContext;
57
/// States of the rate limiting logic.
/// The numeric values are spelled out explicitly so that they stay stable
/// if enumerators are ever reordered.
enum class RateLimitingState {
  /// No information received yet.
  UNKNOWN = 0,
  /// Information received, new timeframe not requested.
  STARTED = 1,
  /// Information received, new timeframe requested but not yet accounted.
  CHANGED = 2,
  /// New metric received, we are below limit.
  BELOW_LIMIT = 3,
  /// Iteration when previously in BELOW_LIMIT.
  NEXT_ITERATION_FROM_BELOW = 4,
  /// New metric received, we are above limit.
  ABOVE_LIMIT = 5,
  /// NOTE(review): the original carries no description for this state.
  EMPTY = 6,
};
67
69 int64_t maxMemory = 2000;
70 int64_t maxTimeframes = 0;
71};
72
74 size_t arrowBytesCreated = -1;
78 size_t arrowBytesExpired = -1;
80 size_t timeframesRead = -1;
81 size_t timeframesConsumed = -1;
82};
83
84std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
85{
86 std::vector<MetricIndices> results;
87
88 for (auto& info : allDevicesMetrics) {
90 indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
91 indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
92 indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
93 indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
94 indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
95 indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
96 indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
97 indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
98 results.push_back(indices);
99 }
100 return results;
101}
102
104{
105 return registry.get<RateLimitConfig>().maxMemory;
106}
107
109{
110 using o2::monitoring::Metric;
111 using o2::monitoring::Monitoring;
112 using o2::monitoring::tags::Key;
113 using o2::monitoring::tags::Value;
114
115 return ServiceSpec{
116 .name = "arrow-backend",
118 .configure = CommonServices::noConfiguration(),
// Driver-side metric handler of the "arrow-backend" service. On every invocation it:
//  1. sums, over all devices, the latest value of the arrow / shared memory offer /
//     timeframe counters booked by createDefaultIndices();
//  2. publishes the totals as driver metrics if at least one of them changed;
//  3. returns early when maxTimeframes is non-zero and the number of timeframes
//     read but not yet consumed exceeds it;
//  4. otherwise offers shared memory to the active "internal-dpl-aod-reader"
//     devices via "/shm-offer" messages and recomputes how much is still available.
// The shared memory quantities (available / offered / unused) are combined with
// byte counters only after dividing the latter by 1000000; the log messages call
// the resulting unit MB.
//
// registry:  used to retrieve DevicesManager and RateLimitConfig.
// sm:        gives access to the driver metrics, the per device metrics, the
//            device specs and the device infos.
// timestamp: passed as is to every driver metric which gets updated.
123 .metricHandling = [](ServiceRegistryRef registry,
124 ServiceMetricsInfo const& sm,
125 size_t timestamp) {
// Accumulators for the current invocation, summed over all the devices.
126 int64_t totalBytesCreated = 0;
127 int64_t shmOfferBytesConsumed = 0;
128 int64_t totalBytesDestroyed = 0;
129 int64_t totalBytesExpired = 0;
130 int64_t totalMessagesCreated = 0;
131 int64_t totalMessagesDestroyed = 0;
132 int64_t totalTimeframesRead = 0;
133 int64_t totalTimeframesConsumed = 0;
134 auto &driverMetrics = sm.driverMetricsInfo;
135 auto &allDeviceMetrics = sm.deviceMetricsInfos;
136 auto &specs = sm.deviceSpecs;
137 auto &infos = sm.deviceInfos;
138 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &sm);
139
// The driver metric setters are function-local statics: they are created on the
// first invocation only, using the driverMetrics seen at that time.
// NOTE(review): stateMetric, totalSignalsMetric, signalLatencyMetric,
// skippedSignalsMetric and remainingBytes are created but never invoked in the
// visible part of this lambda.
140 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
141 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
142 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
143 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
144 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
145 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
146 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
147 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
148 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
149 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
150 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
151 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
152 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
153 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
154 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
155 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
156 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
157 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
158 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
159 auto& manager = registry.get<DevicesManager>();
160
// Becomes true as soon as any of the metrics read below is flagged as changed.
161 bool changed = false;
162
// NOTE(review): lastTimestamp is updated in the loop below but not read afterwards
// in the visible code.
163 size_t lastTimestamp = 0;
// The per device indices are computed on the first invocation only and then
// looked up by device position.
// NOTE(review): this assumes the number of devices never grows after the first
// invocation, otherwise allIndices[mi] is out of range - TODO confirm.
164 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
165 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
166 auto& deviceMetrics = allDeviceMetrics[mi];
// NOTE(review): the exception text does not describe the condition being checked,
// which compares deviceMetrics.changed.size() with deviceMetrics.metrics.size().
167 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
168 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
169 }
170 auto& indices = allIndices[mi];
// Each of the following blocks handles one metric the same way: record whether it
// changed, read the entry at (pos - 1) modulo the store size and add it to the
// matching total.
// NOTE(review): presumably info.pos is the next write position of a circular
// buffer, so that (pos - 1) is the most recent value - TODO confirm.
// Bytes created.
171 {
172 size_t index = indices.arrowBytesCreated;
173 assert(index < deviceMetrics.metrics.size());
174 changed |= deviceMetrics.changed[index];
175 MetricInfo info = deviceMetrics.metrics[index];
176 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
177 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
178 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
179 auto value = (int64_t)data[(info.pos - 1) % data.size()];
180 totalBytesCreated += value;
181 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
182 }
// Bytes of the shared memory offers which were consumed.
183 {
184 size_t index = indices.shmOfferBytesConsumed;
185 assert(index < deviceMetrics.metrics.size());
186 changed |= deviceMetrics.changed[index];
187 MetricInfo info = deviceMetrics.metrics[index];
188 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
189 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
190 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
191 auto value = (int64_t)data[(info.pos - 1) % data.size()];
192 shmOfferBytesConsumed += value;
193 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
194 }
// Bytes destroyed.
195 {
196 size_t index = indices.arrowBytesDestroyed;
197 assert(index < deviceMetrics.metrics.size());
198 changed |= deviceMetrics.changed[index];
199 MetricInfo info = deviceMetrics.metrics[index];
200 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
201 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
202 auto value = (int64_t)data[(info.pos - 1) % data.size()];
203 totalBytesDestroyed += value;
204 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
205 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
206 }
// Bytes expired.
207 {
208 size_t index = indices.arrowBytesExpired;
209 assert(index < deviceMetrics.metrics.size());
210 changed |= deviceMetrics.changed[index];
211 MetricInfo info = deviceMetrics.metrics[index];
212 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
213 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
214 auto value = (int64_t)data[(info.pos - 1) % data.size()];
215 totalBytesExpired += value;
216 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
217 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
218 }
// Messages created.
219 {
220 size_t index = indices.arrowMessagesCreated;
221 assert(index < deviceMetrics.metrics.size());
222 MetricInfo info = deviceMetrics.metrics[index];
223 changed |= deviceMetrics.changed[index];
224 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
225 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
226 auto value = (int64_t)data[(info.pos - 1) % data.size()];
227 totalMessagesCreated += value;
228 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
229 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
230 }
// Messages destroyed.
231 {
232 size_t index = indices.arrowMessagesDestroyed;
233 assert(index < deviceMetrics.metrics.size());
234 MetricInfo info = deviceMetrics.metrics[index];
235 changed |= deviceMetrics.changed[index];
236 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
237 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
238 auto value = (int64_t)data[(info.pos - 1) % data.size()];
239 totalMessagesDestroyed += value;
240 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
241 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
242 }
// Timeframes read.
243 {
244 size_t index = indices.timeframesRead;
245 assert(index < deviceMetrics.metrics.size());
246 changed |= deviceMetrics.changed[index];
247 MetricInfo info = deviceMetrics.metrics[index];
248 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
249 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
250 auto value = (int64_t)data[(info.pos - 1) % data.size()];
251 totalTimeframesRead += value;
252 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
253 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
254 }
// Timeframes consumed.
255 {
256 size_t index = indices.timeframesConsumed;
257 assert(index < deviceMetrics.metrics.size());
258 changed |= deviceMetrics.changed[index];
259 MetricInfo info = deviceMetrics.metrics[index];
260 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
261 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
262 auto value = (int64_t)data[(info.pos - 1) % data.size()];
263 totalTimeframesConsumed += value;
264 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
265 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
266 }
267 }
// Publish the totals only when something changed; otherwise count the invocation
// as unchanged. Note that the value published as "changed-metrics-count" is the
// number of invocations in which nothing changed.
268 static uint64_t unchangedCount = 0;
269 if (changed) {
270 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
271 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
272 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
273 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
274 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
275 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
276 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
277 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
278 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
279 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
280 } else {
281 unchangedCount++;
282 }
283 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// A maxTimeframes of 0 disables the limit on the timeframes in flight. When the
// limit is exceeded no shared memory is offered and none of the shared memory
// metrics below is updated in this invocation.
284 auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
285 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
286 return;
287 }
288
// The total budget is read once, on the first invocation which gets here.
289 static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry);
// Largest and smallest amount which is offered with a single message.
290 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
291 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
292
293 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
294 static int64_t offeredSharedMemory = 0;
295 static int64_t lastDeviceOffered = 0;
// Index of the last device which received an offer in this invocation, -1 if none.
298 int64_t lastCandidate = -1;
// These two counters are used to emit the "enough" / "not enough" signposts only
// on the transition between the two conditions, not on every iteration.
299 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
300 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
// The first check is done against the minimum quantum; after an offer is made the
// next check is done against the amount which was just offered.
301 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
302 for (size_t di = 0; di < specs.size(); di++) {
303 if (availableSharedMemory < possibleOffer) {
304 if (lowSharedMemoryCount == 0) {
305 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "not enough",
306 "We do not have enough shared memory (%{bytes}llu MB) to offer %{bytes}llu MB. Total offerings %{bytes}llu",
307 availableSharedMemory, possibleOffer, offeredSharedMemory);
308 }
309 lowSharedMemoryCount++;
310 enoughSharedMemoryCount = 0;
311 break;
312 } else {
313 if (enoughSharedMemoryCount == 0) {
314 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "enough",
315 "We are back in a state where we enough shared memory: %{bytes}llu MB", availableSharedMemory);
316 }
317 enoughSharedMemoryCount++;
318 lowSharedMemoryCount = 0;
319 }
// Start from the device following the last one served and wrap around.
320 size_t candidate = (lastDeviceOffered + di) % specs.size();
321
322 auto& info = infos[candidate];
323 // Do not bother for inactive devices
324 // FIXME: there is probably a race condition if the device died and we did not
325 // take notice yet...
326 if (info.active == false || info.readyToQuit) {
327 continue;
328 }
// Only the AOD readers receive offers.
329 if (specs[candidate].name != "internal-dpl-aod-reader") {
330 continue;
331 }
332 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
333 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
334 "Offering %{bytes}llu MB out of %{bytes}llu to %{public}s",
335 possibleOffer, availableSharedMemory, specs[candidate].id.c_str());
336 manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
337 availableSharedMemory -= possibleOffer;
338 offeredSharedMemory += possibleOffer;
339 lastCandidate = candidate;
340 }
341 // We had at least a valid candidate, so
342 // next time we offer to the next device.
343 if (lastCandidate >= 0) {
344 lastDeviceOffered = lastCandidate + 1;
345 }
346
347 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
348 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
349 // and the amount which we know was given back.
350 static int64_t lastShmOfferConsumed = 0;
351 static int64_t lastUnusedOfferedMemory = 0;
352 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
353 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
354 "Offer consumed so far %{bytes}llu", shmOfferBytesConsumed);
355 lastShmOfferConsumed = shmOfferBytesConsumed;
356 }
// NOTE(review): the result of this int64_t expression is stored in an int.
357 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
358 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
359 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
360 "unusedOfferedMemory:%{bytes}d = offered:%{bytes}llu - (expired:%{bytes}llu + consumed:%{bytes}llu) / 1000000",
361 unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
362 lastUnusedOfferedMemory = unusedOfferedMemory;
363 }
364 // availableSharedMemory is the amount of memory which we know is available to be offered.
365 // We subtract the amount which we know was already offered but it's unused and we then balance how
366 // much was created with how much was destroyed.
367 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
368 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
369 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
370
371 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
372 .postDispatching = [](ProcessingContext& ctx, void* service) {
374 auto* arrow = reinterpret_cast<ArrowContext*>(service);
375 auto totalBytes = 0;
376 auto totalMessages = 0;
377 O2_SIGNPOST_ID_FROM_POINTER(sid, rate_limiting, &arrow);
378 for (auto& input : ctx.inputs()) {
379 if (input.header == nullptr) {
380 continue;
381 }
382 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
383 auto payloadSize = DataRefUtils::getPayloadSize(input);
384 if (dh->serialization != o2::header::gSerializationMethodArrow) {
385 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
386 "Message %{public}.4s/%{public}.16s is not of kind arrow, therefore we are not accounting its shared memory.",
387 dh->dataOrigin.str, dh->dataDescription.str);
388 continue;
389 }
390 bool forwarded = false;
391 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
392 if (DataSpecUtils::match(forward.matcher, *dh)) {
393 forwarded = true;
394 break;
395 }
396 }
397 if (forwarded) {
398 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
399 "Message %{public}.4s/%{public}16.s is forwarded so we are not returning its memory.",
400 dh->dataOrigin.str, dh->dataDescription.str);
401 continue;
402 }
403 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "offer",
404 "Message %{public}.4s/%{public}.16s is being deleted. We will return %{bytes}f MB.",
405 dh->dataOrigin.str, dh->dataDescription.str, payloadSize / 1000000.);
406 totalBytes += payloadSize;
407 totalMessages += 1;
408 }
409 arrow->updateBytesDestroyed(totalBytes);
410 O2_SIGNPOST_EVENT_EMIT(rate_limiting, sid, "give back",
411 "%{bytes}f MB bytes being given back to reader, totaling %{bytes}f MB",
412 totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
413 arrow->updateMessagesDestroyed(totalMessages);
414 auto& stats = ctx.services().get<DataProcessingStats>();
415 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
416 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
417 stats.processCommandQueue(); },
418 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
419 auto config = new RateLimitConfig{};
420 int readers = std::stoll(dc.options["readers"].as<std::string>());
421 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
422 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
423 } else {
424 config->maxMemory = readers * 500;
425 }
426 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
427 config->maxTimeframes = readers;
428 } else {
429 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
430 }
431 static bool once = false;
432 // Until we guarantee this is called only once...
433 if (!once) {
434 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
435 O2_SIGNPOST_EVENT_EMIT_INFO(rate_limiting, sid, "setup",
436 "Rate limiting set up at %{bytes}llu MB distributed over %d readers",
437 config->maxMemory, readers);
438 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
439 once = true;
440 } },
// Topology adjustment hook. It looks up by name the internal AOD devices
// (spawner, index builder, reader, writer) in the workflow and then:
//  - recomputes what the index builder and the spawner have to provide, resetting
//    their inputs, outputs and algorithm;
//  - removes the writer;
//  - removes from the reader the outputs nobody asks for, and the reader itself
//    if it is left without outputs;
//  - adds the global AOD sink if there is any AOD output to be saved;
//  - moves the injected dummy sink, if any, to the end of the workflow.
//
// node: holder of the workflow being adjusted (node.specs).
// ctx:  configuration context, used to retrieve the AnalysisContext and to create
//       the data output director and the global AOD sink.
441 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
442 auto& workflow = node.specs;
// All four iterators are obtained before the workflow is modified.
443 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
444 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
445 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
446 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
// Start from scratch: the requests are recomputed below from the current workflow.
447 auto &ac = ctx.services().get<AnalysisContext>();
448 ac.requestedAODs.clear();
449 ac.requestedDYNs.clear();
450 ac.providedDYNs.clear();
451
452
// Specs are ordered by the string returned by DataSpecUtils::describe.
453 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
454 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
455
456 if (builder != workflow.end()) {
457 // collect currently requested IDXs
458 ac.requestedIDXs.clear();
459 for (auto& d : workflow) {
// The builder's own inputs are not considered.
460 if (d.name == builder->name) {
461 continue;
462 }
463 for (auto& i : d.inputs) {
// NOTE(review): the condition selecting which inputs are added (its closing brace
// follows the update below) is not visible in this listing - confirm against the
// repository.
465 auto copy = i;
466 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
467 }
468 }
469 }
470 // recreate inputs and outputs
471 builder->inputs.clear();
472 builder->outputs.clear();
473 // replace AlgorithmSpec
474 // FIXME: it should be made more generic, so it does not need replacement...
475 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
476 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
477 }
478
479 if (spawner != workflow.end()) {
480 // collect currently requested DYNs
481 for (auto& d : workflow) {
// The spawner's own inputs and outputs are not considered.
482 if (d.name == spawner->name) {
483 continue;
484 }
485 for (auto const& i : d.inputs) {
// NOTE(review): as above, the condition guarding this update is not visible here.
487 auto copy = i;
488 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
489 }
490 }
491 for (auto const& o : d.outputs) {
// NOTE(review): as above, the condition guarding this insertion is not visible here.
493 ac.providedDYNs.emplace_back(o);
494 }
495 }
496 }
497 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
498 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// The spawner only has to produce what is requested and not provided by any
// other device.
499 ac.spawnerInputs.clear();
500 for (auto& input : ac.requestedDYNs) {
501 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
502 ac.spawnerInputs.emplace_back(input);
503 }
504 }
505 // recreate inputs and outputs
506 spawner->outputs.clear();
507 spawner->inputs.clear();
508 // replace AlgorithmSpec
509 // FIXME: it should be made more generic, so it does not need replacement...
510 spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs);
511 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
512 }
513
// NOTE(review): workflow is a std::vector, so erasing the writer invalidates the
// iterators at or after its position. `reader` was obtained before this erase and
// is used right after it, which is only safe if the reader precedes the writer in
// the workflow - TODO confirm.
514 if (writer != workflow.end()) {
515 workflow.erase(writer);
516 }
517
518 if (reader != workflow.end()) {
519 // If reader and/or builder were adjusted, remove unneeded outputs
520 // update currently requested AODs
521 for (auto& d : workflow) {
522 for (auto const& i : d.inputs) {
523 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
524 auto copy = i;
525 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
526 }
527 }
528 }
529
530 // remove unmatched outputs
// An output is dropped when it is neither TFNumber nor TFFilename and no requested
// AOD input matches it.
531 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
532 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
533 });
534 reader->outputs.erase(o_end, reader->outputs.end());
535 if (reader->outputs.empty()) {
536 // nothing to read
537 workflow.erase(reader);
538 }
539 }
540
541 // replace writer as some outputs may have become dangling and some are now consumed
// outputsInputs and isDangling are indexed in parallel in the loop below.
542 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
543
544 // create DataOutputDescriptor
545 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
546
547 // select outputs of type AOD which need to be saved
548 // ATTENTION: if there are dangling outputs the getGlobalAODSink
549 // has to be created in any case!
550 ac.outputsInputsAOD.clear();
551
552 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
553 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
// Keep the output if the director has a descriptor for it or if it is dangling.
554 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
555 if (!ds.empty() || isDangling[ii]) {
556 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
557 }
558 }
559 }
560
561 // file sink for any AOD output
562 if (!ac.outputsInputsAOD.empty()) {
563 // add TFNumber and TFFilename as input to the writer
564 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
565 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
566 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
567 }
568 // Move the dummy sink at the end, if needed
// Only the first device with that name is moved: a copy is appended and the
// original entry is then erased.
569 for (size_t i = 0; i < workflow.size(); ++i) {
570 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
571 workflow.push_back(workflow[i]);
572 workflow.erase(workflow.begin() + i);
573 break;
574 }
575 } },
576 .kind = ServiceKind::Global};
577}
578
580{
581 return ServiceSpec{
582 .name = "arrow-slicing-cache-def",
583 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
584 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
585 .kind = ServiceKind::Global};
586}
587
589{
590 return ServiceSpec{
591 .name = "arrow-slicing-cache",
592 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
593 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
594 new ArrowTableSlicingCache(Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeys},
595 Cache{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
597 .configure = CommonServices::noConfiguration(),
598 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
599 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
600 auto& caches = service->bindingsKeys;
601 for (auto i = 0u; i < caches.size(); ++i) {
602 if (caches[i].enabled && pc.inputs().getPos(caches[i].binding.c_str()) >= 0) {
603 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].binding.c_str())->asArrowTable());
604 if (!status.ok()) {
605 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].binding.c_str(), caches[i].key.c_str());
606 }
607 }
608 }
609 auto& unsortedCaches = service->bindingsKeysUnsorted;
610 for (auto i = 0u; i < unsortedCaches.size(); ++i) {
611 if (unsortedCaches[i].enabled && pc.inputs().getPos(unsortedCaches[i].binding.c_str()) >= 0) {
612 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].binding.c_str())->asArrowTable());
613 if (!status.ok()) {
614 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].binding.c_str(), unsortedCaches[i].key.c_str());
615 }
616 }
617 } },
618 .kind = ServiceKind::Stream};
619}
620
621} // namespace o2::framework
int32_t i
bool o
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:499
#define O2_SIGNPOST_EVENT_EMIT_INFO(log, id, name, format,...)
Definition Signpost.h:525
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:516
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as a arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
std::vector< Entry > Cache
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Running state information of a given device.
Definition DeviceState.h:34
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
std::vector< DataProcessorSpec > & specs
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:618
o2::mch::DsIndex ds