Project
Loading...
Searching...
No Matches
ArrowSupport.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "ArrowSupport.h"
12
24#include "Framework/Tracing.h"
31#include "WorkflowHelpers.h"
36
38#include <Monitoring/Monitoring.h>
39#include "Headers/DataHeader.h"
41
42#include <RtypesCore.h>
43#include <fairmq/ProgOptions.h>
44
45#include <uv.h>
46#include <boost/program_options/variables_map.hpp>
47#include <csignal>
48
49namespace o2::framework
50{
51
52class EndOfStreamContext;
53class ProcessingContext;
54
// Enumerates rate-limiting states.
// NOTE(review): no use of this enum is visible in this listing — confirm where it is consumed.
55enum struct RateLimitingState {
56 UNKNOWN = 0, // No information received yet.
57 STARTED = 1, // Information received, new timeframe not requested.
58 CHANGED = 2, // Information received, new timeframe requested but not yet accounted.
59 BELOW_LIMIT = 3, // New metric received, we are below limit.
60 NEXT_ITERATION_FROM_BELOW = 4, // Iteration when previously in BELOW_LIMIT.
61 ABOVE_LIMIT = 5, // New metric received, we are above limit.
62 EMPTY = 6, // NOTE(review): meaning not documented in the original — TODO confirm.
63};
64
// Members of what is presumably RateLimitConfig (the struct header, original
// line 65, is missing from this listing); both fields are read elsewhere in
// this file through registry.get<RateLimitConfig>().
// maxMemory is handled in MB: driverInit divides the explicitly given option
// value by 1000000 and logs the result as "{}MB".
66 int64_t maxMemory = 2000;
// A value of 0 disables the in-flight timeframe check, because metricHandling
// tests `maxTimeframes && ...` before comparing.
67 int64_t maxTimeframes = 0;
68};
69
// Members of what is presumably MetricIndices (the struct header and original
// lines 72-74 and 76 are missing from this listing). createDefaultIndices()
// also assigns arrowBytesDestroyed, arrowMessagesCreated,
// arrowMessagesDestroyed and shmOfferBytesConsumed, so those members are
// expected to be declared on the missing lines.
// Each member is a size_t initialised from -1, i.e. the largest size_t value,
// which stays an out-of-range index until createDefaultIndices() overwrites it.
71 size_t arrowBytesCreated = -1;
75 size_t arrowBytesExpired = -1;
77 size_t timeframesRead = -1;
78 size_t timeframesConsumed = -1;
79};
80
// Books the numeric metrics tracked by the arrow backend for every entry of
// allDevicesMetrics and returns one MetricIndices per entry, in the same order.
// The values returned by bookNumericMetric are later used by metricHandling as
// indices into deviceMetrics.metrics.
// NOTE(review): the declaration of `indices` (original line 86) is missing
// from this listing.
81std::vector<MetricIndices> createDefaultIndices(std::vector<DeviceMetricsInfo>& allDevicesMetrics)
82{
83 std::vector<MetricIndices> results;
84
85 for (auto& info : allDevicesMetrics) {
87 indices.arrowBytesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-created");
88 indices.arrowBytesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-destroyed");
89 indices.arrowMessagesCreated = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-created");
90 indices.arrowMessagesDestroyed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-messages-destroyed");
91 indices.arrowBytesExpired = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "arrow-bytes-expired");
92 indices.shmOfferBytesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "shm-offer-bytes-consumed");
// Note the metric names: timeframesRead is bound to "df-sent" and
// timeframesConsumed to "consumed-timeframes".
93 indices.timeframesRead = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "df-sent");
94 indices.timeframesConsumed = DeviceMetricsHelper::bookNumericMetric<uint64_t>(info, "consumed-timeframes");
95 results.push_back(indices);
96 }
97 return results;
98}
99
// Body of calculateAvailableSharedMemory; its signature line (original line
// 100) is missing from this listing, the symbol index at the end of the page
// gives `uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)`.
// Returns the maxMemory field of the registered RateLimitConfig unchanged.
101{
102 return registry.get<RateLimitConfig>().maxMemory;
103}
104
// Start of the body that builds the "arrow-backend" ServiceSpec; per the
// symbol index at the end of the page this is presumably arrowBackendSpec()
// (the signature, original line 105, and lines 114 and 116-119 are missing
// from this listing).
106{
// NOTE(review): none of these four names is referenced in the visible
// remainder of the function.
107 using o2::monitoring::Metric;
108 using o2::monitoring::Monitoring;
109 using o2::monitoring::tags::Key;
110 using o2::monitoring::tags::Value;
111
112 return ServiceSpec{
113 .name = "arrow-backend",
115 .configure = CommonServices::noConfiguration(),
// metricHandling callback. For every device it reads one sample of each booked
// arrow / shm-offer / timeframe metric and sums them into totals, publishes
// the totals as driver metrics when any of the per-device metrics is flagged
// as changed, then offers shared memory to AOD reader devices and recomputes
// the amount still available.
// NOTE(review): original lines 292-293 are missing from this listing.
120 .metricHandling = [](ServiceRegistryRef registry,
121 ServiceMetricsInfo const& sm,
122 size_t timestamp) {
123 int64_t totalBytesCreated = 0;
124 int64_t shmOfferBytesConsumed = 0;
125 int64_t totalBytesDestroyed = 0;
126 int64_t totalBytesExpired = 0;
127 int64_t totalMessagesCreated = 0;
128 int64_t totalMessagesDestroyed = 0;
129 int64_t totalTimeframesRead = 0;
130 int64_t totalTimeframesConsumed = 0;
131 auto &driverMetrics = sm.driverMetricsInfo;
132 auto &allDeviceMetrics = sm.deviceMetricsInfos;
133 auto &specs = sm.deviceSpecs;
134 auto &infos = sm.deviceInfos;
135
// Driver-level metrics are function-local statics, so each one is created on
// the first invocation only.
// NOTE(review): stateMetric, totalSignalsMetric, signalLatencyMetric,
// skippedSignalsMetric and remainingBytes are created here but never invoked
// in the visible code.
136 static auto stateMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "rate-limit-state");
137 static auto totalBytesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-created");
138 static auto shmOfferConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-shm-offer-bytes-consumed");
139 static auto unusedOfferedSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-unused-offered-shared-memory");
140 static auto availableSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-available-shared-memory");
141 static auto offeredSharedMemoryMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-offered-shared-memory");
142 static auto totalBytesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-destroyed");
143 static auto totalBytesExpiredMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-bytes-expired");
144 static auto totalMessagesCreatedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-created");
145 static auto totalMessagesDestroyedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-arrow-messages-destroyed");
146 static auto totalTimeframesReadMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-read");
147 static auto totalTimeframesConsumedMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "total-timeframes-consumed");
148 static auto totalTimeframesInFlyMetric = DeviceMetricsHelper::createNumericMetric<int>(driverMetrics, "total-timeframes-in-fly");
149 static auto totalBytesDeltaMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "arrow-bytes-delta");
150 static auto changedCountMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "changed-metrics-count");
151 static auto totalSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-reader-signals");
152 static auto signalLatencyMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-signal-latency");
153 static auto skippedSignalsMetric = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-skipped-signals");
154 static auto remainingBytes = DeviceMetricsHelper::createNumericMetric<uint64_t>(driverMetrics, "aod-remaining-bytes");
155 auto& manager = registry.get<DevicesManager>();
156
157 bool changed = false;
158
// NOTE(review): lastTimestamp is only written in the visible code, never read.
159 size_t lastTimestamp = 0;
// Built once, on the first invocation, and then indexed by device position.
// NOTE(review): nothing visible here rebuilds it if allDeviceMetrics grows
// afterwards — confirm the device count is fixed by then.
160 static std::vector<MetricIndices> allIndices = createDefaultIndices(allDeviceMetrics);
161 for (size_t mi = 0; mi < allDeviceMetrics.size(); ++mi) {
162 auto& deviceMetrics = allDeviceMetrics[mi];
// NOTE(review): the message text does not describe the condition tested
// (changed.size() versus metrics.size() of the same device).
163 if (deviceMetrics.changed.size() != deviceMetrics.metrics.size()) {
164 throw std::runtime_error("deviceMetrics.size() != allDeviceMetrics.size()");
165 }
166 auto& indices = allIndices[mi];
// Each of the eight scoped blocks below does the same for one metric: OR its
// `changed` flag into `changed`, read data[(info.pos - 1) % data.size()] from
// the uint64 store (presumably the most recent sample — confirm), add it to
// the matching total and keep the largest timestamp seen.
167 {
168 size_t index = indices.arrowBytesCreated;
169 assert(index < deviceMetrics.metrics.size());
170 changed |= deviceMetrics.changed[index];
171 MetricInfo info = deviceMetrics.metrics[index];
172 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
173 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
174 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
175 auto value = (int64_t)data[(info.pos - 1) % data.size()];
176 totalBytesCreated += value;
177 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
178 }
179 {
180 size_t index = indices.shmOfferBytesConsumed;
181 assert(index < deviceMetrics.metrics.size());
182 changed |= deviceMetrics.changed[index];
183 MetricInfo info = deviceMetrics.metrics[index];
184 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
185 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
186 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
187 auto value = (int64_t)data[(info.pos - 1) % data.size()];
188 shmOfferBytesConsumed += value;
189 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
190 }
191 {
192 size_t index = indices.arrowBytesDestroyed;
193 assert(index < deviceMetrics.metrics.size());
194 changed |= deviceMetrics.changed[index];
195 MetricInfo info = deviceMetrics.metrics[index];
196 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
197 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
198 auto value = (int64_t)data[(info.pos - 1) % data.size()];
199 totalBytesDestroyed += value;
200 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
201 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
202 }
203 {
204 size_t index = indices.arrowBytesExpired;
205 assert(index < deviceMetrics.metrics.size());
206 changed |= deviceMetrics.changed[index];
207 MetricInfo info = deviceMetrics.metrics[index];
208 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
209 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
210 auto value = (int64_t)data[(info.pos - 1) % data.size()];
211 totalBytesExpired += value;
212 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
213 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
214 }
215 {
216 size_t index = indices.arrowMessagesCreated;
217 assert(index < deviceMetrics.metrics.size());
218 MetricInfo info = deviceMetrics.metrics[index];
219 changed |= deviceMetrics.changed[index];
220 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
221 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
222 auto value = (int64_t)data[(info.pos - 1) % data.size()];
223 totalMessagesCreated += value;
224 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
225 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
226 }
227 {
228 size_t index = indices.arrowMessagesDestroyed;
229 assert(index < deviceMetrics.metrics.size());
230 MetricInfo info = deviceMetrics.metrics[index];
231 changed |= deviceMetrics.changed[index];
232 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
233 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
234 auto value = (int64_t)data[(info.pos - 1) % data.size()];
235 totalMessagesDestroyed += value;
236 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
237 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
238 }
239 {
240 size_t index = indices.timeframesRead;
241 assert(index < deviceMetrics.metrics.size());
242 changed |= deviceMetrics.changed[index];
243 MetricInfo info = deviceMetrics.metrics[index];
244 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
245 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
246 auto value = (int64_t)data[(info.pos - 1) % data.size()];
247 totalTimeframesRead += value;
248 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
249 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
250 }
251 {
252 size_t index = indices.timeframesConsumed;
253 assert(index < deviceMetrics.metrics.size());
254 changed |= deviceMetrics.changed[index];
255 MetricInfo info = deviceMetrics.metrics[index];
256 assert(info.storeIdx < deviceMetrics.uint64Metrics.size());
257 auto& data = deviceMetrics.uint64Metrics[info.storeIdx];
258 auto value = (int64_t)data[(info.pos - 1) % data.size()];
259 totalTimeframesConsumed += value;
260 auto const& timestamps = DeviceMetricsHelper::getTimestampsStore<uint64_t>(deviceMetrics)[info.storeIdx];
261 lastTimestamp = std::max(lastTimestamp, timestamps[(info.pos - 1) % data.size()]);
262 }
263 }
// Counts the invocations in which no tracked metric was flagged as changed.
// NOTE(review): this unchanged-count is what gets published under the metric
// named "changed-metrics-count" below — confirm the naming is intended.
264 static uint64_t unchangedCount = 0;
265 if (changed) {
266 totalBytesCreatedMetric(driverMetrics, totalBytesCreated, timestamp);
267 totalBytesDestroyedMetric(driverMetrics, totalBytesDestroyed, timestamp);
268 totalBytesExpiredMetric(driverMetrics, totalBytesExpired, timestamp);
269 shmOfferConsumedMetric(driverMetrics, shmOfferBytesConsumed, timestamp);
270 totalMessagesCreatedMetric(driverMetrics, totalMessagesCreated, timestamp);
271 totalMessagesDestroyedMetric(driverMetrics, totalMessagesDestroyed, timestamp);
272 totalTimeframesReadMetric(driverMetrics, totalTimeframesRead, timestamp);
273 totalTimeframesConsumedMetric(driverMetrics, totalTimeframesConsumed, timestamp);
274 totalTimeframesInFlyMetric(driverMetrics, (int)(totalTimeframesRead - totalTimeframesConsumed), timestamp);
275 totalBytesDeltaMetric(driverMetrics, totalBytesCreated - totalBytesExpired - totalBytesDestroyed, timestamp);
276 } else {
277 unchangedCount++;
278 }
279 changedCountMetric(driverMetrics, unchangedCount, timestamp);
// When a timeframe limit is configured (non-zero) and read minus consumed
// exceeds it, stop here: no offer is made and the shared-memory metrics at the
// end of this callback are not updated in this invocation.
280 auto maxTimeframes = registry.get<RateLimitConfig>().maxTimeframes;
281 if (maxTimeframes && (totalTimeframesRead - totalTimeframesConsumed) > maxTimeframes) {
282 return;
283 }
284
// The shared-memory quantities below are handled in MB (see the "MB" log
// messages and the /1000000 conversions of the byte counters). The statics
// keep their value across invocations; MAX_SHARED_MEMORY is computed once.
285 static int64_t MAX_SHARED_MEMORY = calculateAvailableSharedMemory(registry);
286 constexpr int64_t MAX_QUANTUM_SHARED_MEMORY = 100;
287 constexpr int64_t MIN_QUANTUM_SHARED_MEMORY = 50;
288
289 static int64_t availableSharedMemory = MAX_SHARED_MEMORY;
290 static int64_t offeredSharedMemory = 0;
291 static int64_t lastDeviceOffered = 0;
294 int64_t lastCandidate = -1;
// These two counters only serve to log the low/enough transitions once: the
// message is emitted when the corresponding counter is still 0.
295 static int enoughSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 1 : 0;
296 static int lowSharedMemoryCount = availableSharedMemory - MIN_QUANTUM_SHARED_MEMORY > 0 ? 0 : 1;
297 int64_t possibleOffer = MIN_QUANTUM_SHARED_MEMORY;
// Visit the devices round-robin, starting from lastDeviceOffered, and stop as
// soon as the remaining budget is smaller than the next possible offer.
298 for (size_t di = 0; di < specs.size(); di++) {
299 if (availableSharedMemory < possibleOffer) {
300 if (lowSharedMemoryCount == 0) {
301 LOGP(detail, "We do not have enough shared memory ({}MB) to offer {}MB. Total offerings {}", availableSharedMemory, possibleOffer, offeredSharedMemory);
302 }
303 lowSharedMemoryCount++;
304 enoughSharedMemoryCount = 0;
305 break;
306 } else {
307 if (enoughSharedMemoryCount == 0) {
308 LOGP(detail, "We are back in a state where we enough shared memory: {}MB", availableSharedMemory);
309 }
310 enoughSharedMemoryCount++;
311 lowSharedMemoryCount = 0;
312 }
313 size_t candidate = (lastDeviceOffered + di) % specs.size();
314
315 auto& info = infos[candidate];
316 // Do not bother for inactive devices
317 // FIXME: there is probably a race condition if the device died and we did not
318 // take notice yet...
319 if (info.active == false || info.readyToQuit) {
320 continue;
321 }
// Only devices with this exact name receive offers.
322 if (specs[candidate].name != "internal-dpl-aod-reader") {
323 continue;
324 }
// Offer at most MAX_QUANTUM_SHARED_MEMORY, sent to the device as a
// "/shm-offer <amount>" message, and account for it immediately.
325 possibleOffer = std::min(MAX_QUANTUM_SHARED_MEMORY, availableSharedMemory);
326 LOGP(detail, "Offering {}MB out of {} to {}", possibleOffer, availableSharedMemory, specs[candidate].id);
327 manager.queueMessage(specs[candidate].id.c_str(), fmt::format("/shm-offer {}", possibleOffer).data());
328 availableSharedMemory -= possibleOffer;
329 offeredSharedMemory += possibleOffer;
330 lastCandidate = candidate;
331 }
332 // We had at least a valid candidate, so
333 // next time we offer to the next device.
// The value is not wrapped here; the modulo applied when computing `candidate`
// takes care of that.
334 if (lastCandidate >= 0) {
335 lastDeviceOffered = lastCandidate + 1;
336 }
337
338 // unusedOfferedSharedMemory is the amount of memory which was offered and which we know it was
339 // not used so far. So we need to account for the amount which got actually read (readerBytesCreated)
340 // and the amount which we know was given back.
// NOTE(review): no variable named readerBytesCreated exists in the visible
// code; the expression below uses totalBytesExpired + shmOfferBytesConsumed.
341 static int64_t lastShmOfferConsumed = 0;
342 static int64_t lastUnusedOfferedMemory = 0;
343 if (shmOfferBytesConsumed != lastShmOfferConsumed) {
344 LOGP(detail, "Offer consumed so far {}", shmOfferBytesConsumed);
345 lastShmOfferConsumed = shmOfferBytesConsumed;
346 }
// NOTE(review): the int64_t expression is stored into an int (narrowing) and
// then compared with the int64_t lastUnusedOfferedMemory.
347 int unusedOfferedMemory = (offeredSharedMemory - (totalBytesExpired + shmOfferBytesConsumed) / 1000000);
348 if (lastUnusedOfferedMemory != unusedOfferedMemory) {
349 LOGP(detail, "unusedOfferedMemory:{} = offered:{} - (expired:{} + consumed:{}) / 1000000", unusedOfferedMemory, offeredSharedMemory, totalBytesExpired / 1000000, shmOfferBytesConsumed / 1000000);
350 lastUnusedOfferedMemory = unusedOfferedMemory;
351 }
352 // availableSharedMemory is the amount of memory which we know is available to be offered.
353 // We subtract the amount which we know was already offered but it's unused and we then balance how
354 // much was created with how much was destroyed.
355 availableSharedMemory = MAX_SHARED_MEMORY + ((totalBytesDestroyed - totalBytesCreated) / 1000000) - unusedOfferedMemory;
356 availableSharedMemoryMetric(driverMetrics, availableSharedMemory, timestamp);
357 unusedOfferedSharedMemoryMetric(driverMetrics, unusedOfferedMemory, timestamp);
358
359 offeredSharedMemoryMetric(driverMetrics, offeredSharedMemory, timestamp); },
// postDispatching callback. Adds up the payload size of every input that has a
// header, is arrow-serialised and is not matched by one of the device's
// forwards, then records the sum (and the message count) as destroyed in the
// ArrowContext and mirrors the running totals into DataProcessingStats.
// NOTE(review): original line 361 is missing from this listing.
360 .postDispatching = [](ProcessingContext& ctx, void* service) {
362 auto* arrow = reinterpret_cast<ArrowContext*>(service);
// NOTE(review): `auto` deduces int for both counters while payloadSize values
// are accumulated into totalBytes below — possible narrowing for large
// payloads; confirm.
363 auto totalBytes = 0;
364 auto totalMessages = 0;
365 for (auto& input : ctx.inputs()) {
366 if (input.header == nullptr) {
367 continue;
368 }
// NOTE(review): dh is dereferenced below without a null check — confirm
// getHeader cannot return nullptr once input.header is non-null.
369 auto const* dh = DataRefUtils::getHeader<DataHeader*>(input);
370 auto payloadSize = DataRefUtils::getPayloadSize(input);
371 if (dh->serialization != o2::header::gSerializationMethodArrow) {
372 LOGP(debug, "Message {}/{} is not of kind arrow, therefore we are not accounting its shared memory", dh->dataOrigin, dh->dataDescription);
373 continue;
374 }
// Forwarded messages are skipped: their memory is not counted as returned here.
375 bool forwarded = false;
376 for (auto const& forward : ctx.services().get<DeviceSpec const>().forwards) {
377 if (DataSpecUtils::match(forward.matcher, *dh)) {
378 forwarded = true;
379 break;
380 }
381 }
382 if (forwarded) {
383 LOGP(debug, "Message {}/{} is forwarded so we are not returning its memory.", dh->dataOrigin, dh->dataDescription);
384 continue;
385 }
386 LOGP(debug, "Message {}/{} is being deleted. We will return {}MB.", dh->dataOrigin, dh->dataDescription, payloadSize / 1000000.);
387 totalBytes += payloadSize;
388 totalMessages += 1;
389 }
390 arrow->updateBytesDestroyed(totalBytes);
391 LOGP(debug, "{}MB bytes being given back to reader, totaling {}MB", totalBytes / 1000000., arrow->bytesDestroyed() / 1000000.);
392 arrow->updateMessagesDestroyed(totalMessages);
// The stats are set (Op::Set) to the ArrowContext running totals, not to the
// per-call increments computed above.
393 auto& stats = ctx.services().get<DataProcessingStats>();
394 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_BYTES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->bytesDestroyed())});
395 stats.updateStats({static_cast<short>(ProcessingStatsId::ARROW_MESSAGES_DESTROYED), DataProcessingStats::Op::Set, static_cast<int64_t>(arrow->messagesDestroyed())});
396 stats.processCommandQueue(); },
// driverInit callback. Builds a RateLimitConfig from the "readers",
// "aod-memory-rate-limit" and "timeframes-rate-limit" options and registers it
// as a service the first time the callback runs.
// NOTE(review): `config` is allocated on every call but only handed to the
// registry while `once` is false; on later calls nothing visible here stores
// or frees it.
397 .driverInit = [](ServiceRegistryRef registry, DeviceConfig const& dc) {
398 auto config = new RateLimitConfig{};
// NOTE(review): the long long returned by std::stoll is narrowed to int.
399 int readers = std::stoll(dc.options["readers"].as<std::string>());
// An explicitly given (non-defaulted) memory limit is divided by 1000000, so
// the option is presumably in bytes while maxMemory is in MB, matching the log
// message below; otherwise 500 is used per reader.
400 if (dc.options.count("aod-memory-rate-limit") && dc.options["aod-memory-rate-limit"].defaulted() == false) {
401 config->maxMemory = std::stoll(dc.options["aod-memory-rate-limit"].as<std::string>()) / 1000000;
402 } else {
403 config->maxMemory = readers * 500;
404 }
// The literal value "readers" means one timeframe per reader; anything else is
// parsed as a number.
// NOTE(review): when count() is 0 the condition is false and the else branch
// still reads the option — confirm the option is always registered.
405 if (dc.options.count("timeframes-rate-limit") && dc.options["timeframes-rate-limit"].as<std::string>() == "readers") {
406 config->maxTimeframes = readers;
407 } else {
408 config->maxTimeframes = std::stoll(dc.options["timeframes-rate-limit"].as<std::string>());
409 }
410 static bool once = false;
411 // Until we guarantee this is called only once...
412 if (!once) {
413 LOGP(info, "Rate limiting set up at {}MB distributed over {} readers", config->maxMemory, readers);
414 registry.registerService(ServiceRegistryHelpers::handleForService<RateLimitConfig>(config));
415 once = true;
416 } },
// adjustTopology callback. Looks up the internal AOD spawner, index builder,
// reader and writer in the workflow by name and re-derives their inputs and
// outputs from what the other data processors currently request:
//  1. index builder: recollect requested inputs, reset inputs/outputs/algorithm;
//  2. spawner: recollect requested and provided specs, reset inputs/outputs/algorithm;
//  3. writer: removed here, a new global AOD sink is appended at the end if needed;
//  4. reader: drop outputs nobody requests, remove the reader if none are left;
//  5. move the injected dummy sink, if present, to the end of the workflow.
// NOTE(review): original lines 440, 462 and 468 are missing from this listing;
// the closing braces that remain suggest each was an `if (...) {` guard.
417 .adjustTopology = [](WorkflowSpecNode& node, ConfigContext const& ctx) {
418 auto& workflow = node.specs;
419 auto spawner = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-spawner"; });
420 auto builder = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-index-builder"; });
421 auto reader = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-reader"; });
422 auto writer = std::find_if(workflow.begin(), workflow.end(), [](DataProcessorSpec const& spec) { return spec.name == "internal-dpl-aod-writer"; });
423 auto &ac = ctx.services().get<AnalysisContext>();
424 ac.requestedAODs.clear();
425 ac.requestedDYNs.clear();
426 ac.providedDYNs.clear();
427
428
// Both comparators order specs by their DataSpecUtils::describe() string.
429 auto inputSpecLessThan = [](InputSpec const& lhs, InputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
430 auto outputSpecLessThan = [](OutputSpec const& lhs, OutputSpec const& rhs) { return DataSpecUtils::describe(lhs) < DataSpecUtils::describe(rhs); };
431
432 if (builder != workflow.end()) {
433 // collect currently requested IDXs
434 ac.requestedIDXs.clear();
435 for (auto& d : workflow) {
436 if (d.name == builder->name) {
437 continue;
438 }
439 for (auto& i : d.inputs) {
441 auto copy = i;
442 DataSpecUtils::updateInputList(ac.requestedIDXs, std::move(copy));
443 }
444 }
445 }
446 // recreate inputs and outputs
447 builder->inputs.clear();
448 builder->outputs.clear();
449 // replace AlgorithmSpec
450 // FIXME: it should be made more generic, so it does not need replacement...
451 builder->algorithm = readers::AODReaderHelpers::indexBuilderCallback(ac.requestedIDXs);
452 AnalysisSupportHelpers::addMissingOutputsToBuilder(ac.requestedIDXs, ac.requestedAODs, ac.requestedDYNs, *builder);
453 }
454
455 if (spawner != workflow.end()) {
456 // collect currently requested DYNs
457 for (auto& d : workflow) {
458 if (d.name == spawner->name) {
459 continue;
460 }
461 for (auto const& i : d.inputs) {
463 auto copy = i;
464 DataSpecUtils::updateInputList(ac.requestedDYNs, std::move(copy));
465 }
466 }
467 for (auto const& o : d.outputs) {
469 ac.providedDYNs.emplace_back(o);
470 }
471 }
472 }
473 std::sort(ac.requestedDYNs.begin(), ac.requestedDYNs.end(), inputSpecLessThan);
474 std::sort(ac.providedDYNs.begin(), ac.providedDYNs.end(), outputSpecLessThan);
// The spawner only has to produce what is requested and not already provided
// by another data processor.
475 ac.spawnerInputs.clear();
476 for (auto& input : ac.requestedDYNs) {
477 if (std::none_of(ac.providedDYNs.begin(), ac.providedDYNs.end(), [&input](auto const& x) { return DataSpecUtils::match(input, x); })) {
478 ac.spawnerInputs.emplace_back(input);
479 }
480 }
481 // recreate inputs and outputs
482 spawner->outputs.clear();
483 spawner->inputs.clear();
484 // replace AlgorithmSpec
485 // FIXME: it should be made more generic, so it does not need replacement...
486 spawner->algorithm = readers::AODReaderHelpers::aodSpawnerCallback(ac.spawnerInputs);
487 AnalysisSupportHelpers::addMissingOutputsToSpawner({}, ac.spawnerInputs, ac.requestedAODs, *spawner);
488 }
489
// NOTE(review): the symbol index of this page lists the specs as a
// std::vector<DataProcessorSpec>; erasing from a vector invalidates iterators
// at or after the erased element, including end(). `reader` was obtained
// before this erase and is compared with the new end() and dereferenced
// below — confirm this is safe when the writer precedes the reader or when no
// reader exists.
490 if (writer != workflow.end()) {
491 workflow.erase(writer);
492 }
493
494 if (reader != workflow.end()) {
495 // If reader and/or builder were adjusted, remove unneeded outputs
496 // update currently requested AODs
497 for (auto& d : workflow) {
498 for (auto const& i : d.inputs) {
499 if (DataSpecUtils::partialMatch(i, AODOrigins)) {
500 auto copy = i;
501 DataSpecUtils::updateInputList(ac.requestedAODs, std::move(copy));
502 }
503 }
504 }
505
506 // remove unmatched outputs
// An output is kept when it is TFNumber, TFFilename, or matched by at least
// one requested AOD input.
507 auto o_end = std::remove_if(reader->outputs.begin(), reader->outputs.end(), [&](OutputSpec const& o) {
508 return !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFNumber"}) && !DataSpecUtils::partialMatch(o, o2::header::DataDescription{"TFFilename"}) && std::none_of(ac.requestedAODs.begin(), ac.requestedAODs.end(), [&](InputSpec const& i) { return DataSpecUtils::match(i, o); });
509 });
510 reader->outputs.erase(o_end, reader->outputs.end());
511 if (reader->outputs.empty()) {
512 // nothing to read
513 workflow.erase(reader);
514 }
515 }
516
517 // replace writer as some outputs may have become dangling and some are now consumed
518 auto [outputsInputs, isDangling] = WorkflowHelpers::analyzeOutputs(workflow);
519
520 // create DataOutputDescriptor
521 std::shared_ptr<DataOutputDirector> dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
522
523 // select outputs of type AOD which need to be saved
524 // ATTENTION: if there are dangling outputs the getGlobalAODSink
525 // has to be created in any case!
526 ac.outputsInputsAOD.clear();
527
528 for (auto ii = 0u; ii < outputsInputs.size(); ii++) {
529 if (DataSpecUtils::partialMatch(outputsInputs[ii], extendedAODOrigins)) {
530 auto ds = dod->getDataOutputDescriptors(outputsInputs[ii]);
531 if (!ds.empty() || isDangling[ii]) {
532 ac.outputsInputsAOD.emplace_back(outputsInputs[ii]);
533 }
534 }
535 }
536
537 // file sink for any AOD output
538 if (!ac.outputsInputsAOD.empty()) {
539 // add TFNumber and TFFilename as input to the writer
540 ac.outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
541 ac.outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
542 workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(ctx));
543 }
544 // Move the dummy sink at the end, if needed
// A copy of the sink is appended first and the original position is erased
// afterwards; only the first match is moved.
545 for (size_t i = 0; i < workflow.size(); ++i) {
546 if (workflow[i].name == "internal-dpl-injected-dummy-sink") {
547 workflow.push_back(workflow[i]);
548 workflow.erase(workflow.begin() + i);
549 break;
550 }
551 } },
552 .kind = ServiceKind::Global};
553}
554
// Body returning the ServiceSpec named "arrow-slicing-cache-def"; per the
// symbol index at the end of the page this is presumably
// arrowTableSlicingCacheDefSpec() (the signature, original line 555, is
// missing from this listing). The service is an ArrowTableSlicingCacheDef
// created through CommonServices::simpleServiceInit with kind Global.
556{
557 return ServiceSpec{
558 .name = "arrow-slicing-cache-def",
559 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCacheDef>(),
560 .init = CommonServices::simpleServiceInit<ArrowTableSlicingCacheDef, ArrowTableSlicingCacheDef, ServiceKind::Global>(),
561 .kind = ServiceKind::Global};
562}
563
// Body returning the ServiceSpec named "arrow-slicing-cache"; per the symbol
// index at the end of the page this is presumably arrowTableSlicingCacheSpec()
// (the signature, original line 564, and line 571, which closes the init
// lambda, are missing from this listing).
565{
566 return ServiceSpec{
567 .name = "arrow-slicing-cache",
568 .uniqueId = CommonServices::simpleServiceId<ArrowTableSlicingCache>(),
// init: creates an ArrowTableSlicingCache from copies of the bindingsKeys and
// bindingsKeysUnsorted held by the ArrowTableSlicingCacheDef service.
569 .init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions&) { return ServiceHandle{TypeIdHelpers::uniqueId<ArrowTableSlicingCache>(),
570 new ArrowTableSlicingCache(std::vector<std::pair<std::string, std::string>>{services.get<ArrowTableSlicingCacheDef>().bindingsKeys}, std::vector{services.get<ArrowTableSlicingCacheDef>().bindingsKeysUnsorted}),
572 .configure = CommonServices::noConfiguration(),
// preProcessing: for every (binding, key) pair whose binding is present among
// the inputs (getPos() >= 0), refreshes the corresponding cache entry from the
// input's arrow table; a non-ok status is turned into a runtime error.
573 .preProcessing = [](ProcessingContext& pc, void* service_ptr) {
574 auto* service = static_cast<ArrowTableSlicingCache*>(service_ptr);
575 auto& caches = service->bindingsKeys;
// NOTE(review): `auto i = 0` deduces int, which is then compared with
// caches.size() (signed/unsigned comparison); same in the second loop.
576 for (auto i = 0; i < caches.size(); ++i) {
577 if (pc.inputs().getPos(caches[i].first.c_str()) >= 0) {
578 auto status = service->updateCacheEntry(i, pc.inputs().get<TableConsumer>(caches[i].first.c_str())->asArrowTable());
579 if (!status.ok()) {
580 throw runtime_error_f("Failed to update slice cache for %s/%s", caches[i].first.c_str(), caches[i].second.c_str());
581 }
582 }
583 }
// Same procedure for the unsorted bindings, using updateCacheEntryUnsorted.
// NOTE(review): the two error messages differ in capitalisation
// ("Failed" above, "failed" here).
584 auto& unsortedCaches = service->bindingsKeysUnsorted;
585 for (auto i = 0; i < unsortedCaches.size(); ++i) {
586 if (pc.inputs().getPos(unsortedCaches[i].first.c_str()) >= 0) {
587 auto status = service->updateCacheEntryUnsorted(i, pc.inputs().get<TableConsumer>(unsortedCaches[i].first.c_str())->asArrowTable());
588 if (!status.ok()) {
589 throw runtime_error_f("failed to update slice cache (unsorted) for %s/%s", unsortedCaches[i].first.c_str(), unsortedCaches[i].second.c_str());
590 }
591 }
592 } },
593 .kind = ServiceKind::Stream};
594}
595
596} // namespace o2::framework
int32_t i
bool o
std::ostringstream debug
int getPos(const char *name) const
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, char const *name=nullptr) const
std::shared_ptr< arrow::Table > asArrowTable()
Return the table in the message as an arrow::Table instance.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto spawner(std::vector< std::shared_ptr< arrow::Table > > &&tables, const char *name)
Expression-based column generator to materialize columns.
std::vector< MetricIndices > createDefaultIndices(std::vector< DeviceMetricsInfo > &allDevicesMetrics)
uint64_t calculateAvailableSharedMemory(ServiceRegistryRef registry)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodArrow
Definition DataHeader.h:331
std::vector< InputSpec > requestedAODs
static void addMissingOutputsToBuilder(std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, std::vector< InputSpec > &requestedDYNs, DataProcessorSpec &publisher)
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static void addMissingOutputsToSpawner(std::vector< OutputSpec > const &providedSpecials, std::vector< InputSpec > const &requestedSpecials, std::vector< InputSpec > &requestedAODs, DataProcessorSpec &publisher)
static DataProcessorSpec getGlobalAODSink(ConfigContext const &)
writes inputs of kind AOD to file
static ServiceSpec arrowTableSlicingCacheSpec()
static ServiceSpec arrowBackendSpec()
static ServiceSpec arrowTableSlicingCacheDefSpec()
static ServiceConfigureCallback noConfiguration()
Helper struct to hold statistics about the data processing happening.
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
Running state information of a given device.
Definition DeviceState.h:34
std::string name
Name of the service.
static std::tuple< std::vector< InputSpec >, std::vector< bool > > analyzeOutputs(WorkflowSpec const &workflow)
std::vector< DataProcessorSpec > & specs
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
the main header struct
Definition DataHeader.h:618
o2::mch::DsIndex ds