Project
Loading...
Searching...
No Matches
AnalysisTask.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
14
25#include "Framework/Traits.h"
29
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
33#include <type_traits>
34#include <utility>
35#include <memory>
36#include <tuple> // IWYU pragma: export
37
38namespace o2::framework
39{
48};
49
50template <int64_t BEGIN, int64_t END, int64_t STEP = 1>
52 static constexpr int64_t begin = BEGIN;
53 static constexpr int64_t end = END;
54 static constexpr int64_t step = STEP;
55};
56
57template <typename T>
58static constexpr bool is_enumeration_v = false;
59
60template <int64_t BEGIN, int64_t END, int64_t STEP>
61static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> = true;
62
63template <typename T>
64concept is_enumeration = is_enumeration_v<std::decay_t<T>>;
65
66// Helper struct which builds a DataProcessorSpec from
67// the contents of an AnalysisTask...
68namespace {
69struct AnalysisDataProcessorBuilder {
70 template <soa::is_iterator G, typename... Args>
71 static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled)
72 {
73 [&bk, &bku, enabled]<typename... As>(framework::pack<As...>) mutable {
74 auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
75 ([&bk, &bku, &key, enabled]() mutable {
76 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
77 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(key);
80 } else {
82 }
83 }
84 }(),
85 ...);
86 }(framework::pack<Args...>{});
87 }
88
89 template <soa::TableRef R>
90 static void addOriginalRef(const char* name, bool value, std::vector<InputSpec>& inputs)
91 {
92 auto spec = soa::tableRef2InputSpec<R>();
93 spec.metadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}});
94 DataSpecUtils::updateInputList(inputs, std::move(spec));
95 }
96
98 template <soa::is_table A>
100 static void addExpression(int, uint32_t, std::vector<ExpressionInfo>&)
101 {
102 }
103
104 template <soa::is_filtered_table A>
105 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
106 {
107 auto fields = soa::createFieldsFromColumns(typename std::decay_t<A>::persistent_columns_t{});
108 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
109 }
110
111 template <soa::is_iterator A>
112 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
113 {
114 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
115 }
116
118 template <soa::is_table A>
119 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
120 {
121 [&name, &value, &inputs]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) mutable {
122 (addOriginalRef<refs[Is]>(name, value, inputs), ...);
123 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
124 }
125
126 template <soa::is_iterator A>
127 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
128 {
129 addInput<typename std::decay_t<A>::parent_t>(name, value, inputs);
130 }
131
133 template <soa::is_table... As>
134 static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
135 {
136 int ai = -1;
137 ([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable {
138 ++ai;
139 using T = std::decay_t<As>;
140 addExpression<T>(ai, hash, eInfos);
141 addInput<T>(name, value, inputs);
142 }(),
143 ...);
144 }
145
147 template <typename T>
148 inline static bool requestInputsFromArgs(T&, std::string const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&)
149 {
150 return false;
151 }
152 template <is_process_configurable T>
153 inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis)
154 {
155 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis);
156 return true;
157 }
158 template <typename T>
159 inline static bool requestCacheFromArgs(T&, Cache&, Cache&)
160 {
161 return false;
162 }
163 template <is_process_configurable T>
164 inline static bool requestCacheFromArgs(T& pc, Cache& bk, Cache& bku)
165 {
166 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
167 return true;
168 }
170 template <typename R, typename C, is_enumeration A>
171 static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&) //, Cache&, Cache&)
172 {
173 std::vector<ConfigParamSpec> inputMetadata;
174 // FIXME: for the moment we do not support begin, end and step.
175 DataSpecUtils::updateInputList(inputs, InputSpec{"enumeration", "DPL", "ENUM", 0, Lifetime::Enumeration, inputMetadata});
176 }
177
179 template <typename R, typename C, soa::is_iterator A, soa::is_table... Args>
180 static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos) //, Cache& bk, Cache& bku)
181 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
182 {
183 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(A, Args...)>();
184 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash, name, value, inputs, eInfos);
185 }
186
188 template <typename R, typename C, soa::is_table... Args>
189 static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos) //, Cache&, Cache&)
190 requires(std::is_lvalue_reference_v<Args> && ...)
191 {
192 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Args...)>();
193 addInputsAndExpressions<Args...>(hash, name, value, inputs, eInfos);
194 }
195
197 template <typename R, typename C, is_enumeration A>
198 static void cacheFromArgs(R (C::*)(A), bool, Cache&, Cache&)
199 {
200 }
202 template <typename R, typename C, soa::is_iterator A, soa::is_table... Args>
203 static void cacheFromArgs(R (C::*)(A, Args...), bool value, Cache& bk, Cache& bku)
204 {
205 addGroupingCandidates<A, Args...>(bk, bku, value);
206 }
208 template <typename R, typename C, soa::is_table A, soa::is_table... Args>
209 static void cacheFromArgs(R (C::*)(A, Args...), bool, Cache&, Cache&)
210 {
211 }
212
213 template <soa::TableRef R>
214 static auto extractTableFromRecord(InputRecord& record)
215 {
216 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
217 if (table->num_rows() == 0) {
218 table = makeEmptyTable<R>();
219 }
220 return table;
221 }
222
223 template <soa::is_table T>
224 static auto extractFromRecord(InputRecord& record)
225 {
226 return T { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
227 }
228
229 template <soa::is_iterator T>
230 static auto extractFromRecord(InputRecord& record)
231 {
232 return typename T::parent_t { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
233 }
234
235 template <soa::is_filtered T>
236 static auto extractFilteredFromRecord(InputRecord& record, ExpressionInfo& info)
237 {
238 std::shared_ptr<arrow::Table> table = nullptr;
239 auto joiner = [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
240 if constexpr (soa::is_iterator<T>) {
241 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
242 } else {
243 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
244 }
247 if (info.selection == nullptr) {
249 }
250 }
251 if constexpr (soa::is_iterator<T>) {
252 return typename T::parent_t({table}, info.selection);
253 } else {
254 return T({table}, info.selection);
255 }
256 }
257
258 template <is_enumeration T, int AI>
259 static auto extract(InputRecord&, std::vector<ExpressionInfo>&, size_t)
260 {
261 return T{};
262 }
263
264 template <soa::is_iterator T, int AI>
265 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
266 {
267 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
268 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
269 } else {
270 return extractFromRecord<T>(record);
271 }
272 }
273
274 template <soa::is_table T, int AI>
275 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
276 {
277 if constexpr (soa::is_filtered_table<T>) {
278 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
279 } else {
280 return extractFromRecord<T>(record);
281 }
282 }
283
284 template <typename R, typename C, typename Grouping, typename... Args>
285 static auto bindGroupingTable(InputRecord& record, R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
286 requires(!std::same_as<Grouping, void>)
287 {
288 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Grouping, Args...)>();
289 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
290 }
291
292 template <typename R, typename C, typename Grouping, typename... Args>
293 static auto bindAssociatedTables(InputRecord& record, R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
294 requires(!std::same_as<Grouping, void> && sizeof...(Args) > 0)
295 {
296 constexpr auto p = pack<Args...>{};
297 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Grouping, Args...)>();
298 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
299 }
300
/// For every element type of the tuple, call bindInternalIndicesTo on the
/// element of `dest`, passing the address of the same-typed element of `src`.
/// Elements are selected by type, so the tuple's types must be distinct.
template <typename... As>
static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...> const& src)
{
  (
    std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(src)),
    ...);
}
306
307 template <typename Task, typename R, typename C, typename Grouping, typename... Associated>
308 static void invokeProcess(Task& task, InputRecord& inputs, R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
309 {
310 using G = std::decay_t<Grouping>;
311 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
312
313 // set filtered tables for partitions with grouping
314 homogeneous_apply_refs([&groupingTable](auto& element) {
315 analysis_task_parsers::setPartition(element, groupingTable);
317 return true;
318 },
319 task);
320
321 if constexpr (sizeof...(Associated) == 0) {
322 // single argument to process
323 homogeneous_apply_refs([&groupingTable](auto& element) {
325 analysis_task_parsers::setGroupedCombination(element, groupingTable);
326 return true;
327 },
328 task);
329 if constexpr (soa::is_iterator<G>) {
330 for (auto& element : groupingTable) {
331 std::invoke(processingFunction, task, *element);
332 }
333 } else {
334 static_assert(soa::is_table<G> || is_enumeration<G>,
335 "Single argument of process() should be a table-like or an iterator");
336 std::invoke(processingFunction, task, groupingTable);
337 }
338 } else {
339 // multiple arguments to process
340 static_assert(((soa::is_iterator<std::decay_t<Associated>> == false) && ...),
341 "Associated arguments of process() should not be iterators");
342 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
343 // pre-bind self indices
344 std::apply(
345 [&task](auto&... t) mutable {
347 [&t](auto& p) {
349 return true;
350 },
351 task),
352 ...);
353 },
354 associatedTables);
355
356 auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable {
357 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
358 homogeneous_apply_refs([&x](auto& t) mutable {
361 return true;
362 },
363 task);
364 };
365 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
366
367 // always pre-bind full tables to support index hierarchy
368 std::apply(
369 [&binder](auto&... x) mutable {
370 (binder(x), ...);
371 },
372 associatedTables);
373
374 // GroupedCombinations bound separately, as they should be set once for all associated tables
375 homogeneous_apply_refs([&groupingTable, &associatedTables](auto& t) {
376 analysis_task_parsers::setGroupedCombination(t, groupingTable, associatedTables);
377 return true;
378 },
379 task);
380 overwriteInternalIndices(associatedTables, associatedTables);
381 if constexpr (soa::is_iterator<std::decay_t<G>>) {
382 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
383 for (auto& slice : slicer) {
384 auto associatedSlices = slice.associatedTables();
385 overwriteInternalIndices(associatedSlices, associatedTables);
386 std::apply(
387 [&binder](auto&... x) mutable {
388 (binder(x), ...);
389 },
390 associatedSlices);
391
392 // bind partitions and grouping table
393 homogeneous_apply_refs([&groupingTable](auto& x) {
395 return true;
396 },
397 task);
398
399 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
400 }
401 } else {
402 // bind partitions and grouping table
403 homogeneous_apply_refs([&groupingTable](auto& x) {
405 return true;
406 },
407 task);
408
409 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
410 }
411 }
412 }
413
/// Call the process function on `task` with the grouping argument `g`
/// followed by every element of the tuple `at`, in tuple order.
template <typename C, typename T, typename G, typename... A>
static void invokeProcessWithArgs(C& task, T processingFunction, G g, std::tuple<A...>& at)
{
  std::apply(
    [&task, &processingFunction, &g](A&... associated) {
      std::invoke(processingFunction, task, g, associated...);
    },
    at);
}
419};
420}
421
423 std::vector<std::pair<std::string, bool>> map;
424};
425
/// Struct to differentiate task names from possible task string arguments.
struct TaskName {
  /// Takes ownership of the given name.
  TaskName(std::string name)
    : value{std::move(name)}
  {
  }

  std::string value; ///< the task name
};
431
432namespace {
433template <typename T, typename... A>
434auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args)
435{
436 auto task = std::make_shared<T>(std::forward<A>(args)...);
437 for (auto& setting : second.map) {
439 [&](auto& element) {
440 return analysis_task_parsers::setProcessSwitch(setting, element);
441 },
442 *task.get());
443 }
444 outputName = first.value;
445 return task;
446}
447
448template <typename T, typename... A>
449auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, TaskName second, A... args)
450{
451 auto task = std::make_shared<T>(std::forward<A>(args)...);
452 for (auto& setting : first.map) {
454 [&](auto& element) {
455 return analysis_task_parsers::setProcessSwitch(setting, element);
456 },
457 *task.get());
458 }
459 outputName = second.value;
460 return task;
461}
462
463template <typename T, typename... A>
464auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, A... args)
465{
466 auto task = std::make_shared<T>(std::forward<A>(args)...);
467 for (auto& setting : first.map) {
469 [&](auto& element) {
470 return analysis_task_parsers::setProcessSwitch(setting, element);
471 },
472 *task.get());
473 }
474 auto type_name_str = type_name<T>();
475 outputName = type_to_task_name(type_name_str);
476 return task;
477}
478
479template <typename T, typename... A>
480auto getTaskNameSetProcesses(std::string& outputName, TaskName first, A... args)
481{
482 auto task = std::make_shared<T>(std::forward<A>(args)...);
483 outputName = first.value;
484 return task;
485}
486
487template <typename T, typename... A>
488auto getTaskNameSetProcesses(std::string& outputName, A... args)
489{
490 auto task = std::make_shared<T>(std::forward<A>(args)...);
491 auto type_name_str = type_name<T>();
492 outputName = type_to_task_name(type_name_str);
493 return task;
494}
495
496}
497
500template <typename T, typename... Args>
502{
503 TH1::AddDirectory(false);
504
505 std::string name_str;
506 auto task = getTaskNameSetProcesses<T>(name_str, args...);
507
508 auto suffix = ctx.options().get<std::string>("workflow-suffix");
509 if (!suffix.empty()) {
510 name_str += suffix;
511 }
512 const char* name = name_str.c_str();
513
514 auto hash = runtime_hash(name);
515
516 std::vector<OutputSpec> outputs;
517 std::vector<InputSpec> inputs;
518 std::vector<ConfigParamSpec> options;
519 std::vector<ExpressionInfo> expressionInfos;
520
522 homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
524 homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
525
527 if constexpr (requires { &T::process; }) {
528 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos);
529 }
531 [name = name_str, &expressionInfos, &inputs](auto& x) mutable {
532 // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators
533 return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos);
534 },
535 *task.get());
536
537 // request base tables for spawnable extended tables and indices to be built
538 // this checks for duplications
539 homogeneous_apply_refs([&inputs](auto& element) {
540 return analysis_task_parsers::requestInputs(inputs, element);
541 },
542 *task.get());
543
544 // no static way to check if the task defines any processing, we can only make sure it subscribes to at least something
545 if (inputs.empty() == true) {
546 LOG(warn) << "Task " << name_str << " has no inputs";
547 }
548
549 homogeneous_apply_refs([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
550
551 auto requiredServices = CommonServices::defaultServices();
552 auto arrowServices = CommonServices::arrowServices();
553 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
554 homogeneous_apply_refs([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get());
555
556 auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos](InitContext& ic) mutable {
557 Cache bindingsKeys;
558 Cache bindingsKeysUnsorted;
559 // add preslice declarations to slicing cache definition
560 homogeneous_apply_refs([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());
561
562 homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
563 homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());
564
565 auto& callbacks = ic.services().get<CallbackService>();
566 auto eoscb = [task](EndOfStreamContext& eosContext) {
567 homogeneous_apply_refs([&eosContext](auto& element) {
568 analysis_task_parsers::postRunService(eosContext, element);
569 analysis_task_parsers::postRunOutput(eosContext, element);
570 return true; },
571 *task.get());
572 eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
573 };
574
575 callbacks.set<CallbackService::Id::EndOfStream>(eoscb);
576
579 [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
580 *task.get());
582 homogeneous_apply_refs([&expressionInfos](auto& element) {
583 return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
584 },
585 *task.get());
586
587 if constexpr (requires { task->init(ic); }) {
588 task->init(ic);
589 }
590
592 if constexpr (requires { &T::process; }) {
593 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted);
594 }
596 [&bindingsKeys, &bindingsKeysUnsorted](auto& x) mutable {
597 return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted);
598 },
599 *task.get());
600
601 ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
602 ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
603 // initialize global caches
604 homogeneous_apply_refs([&ic](auto& element) {
606 },
607 *(task.get()));
608
609 return [task, expressionInfos](ProcessingContext& pc) mutable {
610 // load the ccdb object from their cache
611 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
612 // reset partitions once per dataframe
613 homogeneous_apply_refs([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
614 // reset selections for the next dataframe
615 for (auto& info : expressionInfos) {
616 info.resetSelection = true;
617 }
618 // reset pre-slice for the next dataframe
619 auto slices = pc.services().get<ArrowTableSlicingCache>();
620 homogeneous_apply_refs([&pc, &slices](auto& element) {
621 return analysis_task_parsers::updateSliceInfo(element, slices);
622 },
623 *(task.get()));
624 // initialize local caches
625 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
626 // prepare outputs
627 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
628 // execute run()
629 if constexpr (requires { task->run(pc); }) {
630 task->run(pc);
631 }
632 // execute process()
633 if constexpr (requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
634 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
635 }
636 // execute optional process()
638 [&pc, &expressionInfos, &task, &slices](auto& x) mutable {
639 if constexpr (base_of_template<ProcessConfigurable, std::decay_t<decltype(x)>>) {
640 if (x.value == true) {
641 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), x.process, expressionInfos, slices);
642 return true;
643 }
644 }
645 return false;
646 },
647 *task.get());
648 // prepare delayed outputs
649 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
650 // finalize outputs
651 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
652 };
653 }};
654
655 return {
656 name,
657 // FIXME: For the moment we hardcode this. We could build
658 // this list from the list of methods actually implemented in the
659 // task itself.
660 inputs,
661 outputs,
662 algo,
663 options,
664 requiredServices};
665}
666
667} // namespace o2::framework
668#endif // FRAMEWORK_ANALYSIS_TASK_H_
int32_t i
constexpr uint32_t runtime_hash(char const *str)
StringRef key
Definition A.h:16
ConfigParamRegistry & options() const
Helper to check if a type T is an iterator.
Definition ASoA.h:1257
GLint GLenum GLint x
Definition glcorearb.h:403
GLenum src
Definition glcorearb.h:1767
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean GLboolean g
Definition glcorearb.h:1233
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
Definition ASoA.cxx:177
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:407
void missingFilterDeclaration(int hash, int ai)
Definition ASoA.cxx:28
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
Definition Expressions.h:65
size_t processHash
Definition Expressions.h:60
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:67
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"