Project
Loading...
Searching...
No Matches
AnalysisTask.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
14
28
29#include <arrow/compute/kernel.h>
30#include <arrow/table.h>
31#include <gandiva/node.h>
32#include <type_traits>
33#include <utility>
34#include <memory>
35#include <tuple> // IWYU pragma: export
36
37namespace o2::framework
38{
40std::string type_to_task_name(std::string_view const& camelCase);
41
50};
51
52template <int64_t BEGIN, int64_t END, int64_t STEP = 1>
54 static constexpr int64_t begin = BEGIN;
55 static constexpr int64_t end = END;
56 static constexpr int64_t step = STEP;
57};
58
59template <typename T>
60static constexpr bool is_enumeration_v = false;
61
62template <int64_t BEGIN, int64_t END, int64_t STEP>
63static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> = true;
64
65template <typename T>
66concept is_enumeration = is_enumeration_v<std::decay_t<T>>;
67
68template <typename T>
70
71// Helper struct which builds a DataProcessorSpec from
72// the contents of an AnalysisTask...
73namespace
74{
75struct AnalysisDataProcessorBuilder {
76 template <soa::is_iterator G, soa::is_table... Args>
77 static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled)
78 {
79 []<soa::is_table... As>(framework::pack<As...>, Cache& bk, Cache& bku, bool enabled) {
80 auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
81 ([](Cache& bk, Cache& bku, bool enabled, std::string const& key) {
82 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
83 Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(key), key, enabled};
86 } else {
88 }
89 }
90 }(bk, bku, enabled, key),
91 ...);
92 }(framework::pack<Args...>{}, bk, bku, enabled);
93 }
94
95 template <soa::TableRef R>
96 static void addOriginalRef(const char* name, bool value, std::vector<InputSpec>& inputs)
97 {
98 auto spec = soa::tableRef2InputSpec<R>();
99 spec.metadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}});
100 DataSpecUtils::updateInputList(inputs, std::move(spec));
101 }
102
104 template <soa::is_table A>
106 static void addExpression(int, uint32_t, std::vector<ExpressionInfo>&)
107 {
108 }
109
110 template <soa::is_filtered_table A>
111 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
112 {
113 auto fields = soa::createFieldsFromColumns(typename std::decay_t<A>::persistent_columns_t{});
114 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
115 }
116
117 template <soa::is_iterator A>
118 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
119 {
120 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
121 }
122
124 template <soa::is_table A>
125 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
126 {
127 [&name, &value, &inputs]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) mutable {
128 (addOriginalRef<refs[Is]>(name, value, inputs), ...);
129 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
130 }
131
132 template <soa::is_iterator A>
133 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
134 {
135 addInput<typename std::decay_t<A>::parent_t>(name, value, inputs);
136 }
137
139 template <soa::is_table... As>
140 static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
141 {
142 int ai = -1;
143 ([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable {
144 ++ai;
145 using T = std::decay_t<As>;
146 addExpression<T>(ai, hash, eInfos);
147 addInput<T>(name, value, inputs);
148 }(),
149 ...);
150 }
151
153 template <typename T>
154 inline static bool requestInputsFromArgs(T&, std::string const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&)
155 {
156 return false;
157 }
158 template <is_process_configurable T>
159 inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis)
160 {
161 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis);
162 return true;
163 }
164 template <typename T>
165 inline static bool requestCacheFromArgs(T&, Cache&, Cache&)
166 {
167 return false;
168 }
169 template <is_process_configurable T>
170 inline static bool requestCacheFromArgs(T& pc, Cache& bk, Cache& bku)
171 {
172 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
173 return true;
174 }
176 template <typename C, is_enumeration A>
177 static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&) //, Cache&, Cache&)
178 {
179 std::vector<ConfigParamSpec> inputMetadata;
180 // FIXME: for the moment we do not support begin, end and step.
181 DataSpecUtils::updateInputList(inputs, InputSpec{"enumeration", "DPL", "ENUM", 0, Lifetime::Enumeration, inputMetadata});
182 }
183
185 template <typename C, soa::is_iterator A, soa::is_table... Args>
186 static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos) //, Cache& bk, Cache& bku)
187 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
188 {
189 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(A, Args...)>();
190 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash, name, value, inputs, eInfos);
191 }
192
194 template <typename C, soa::is_table... Args>
195 static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos) //, Cache&, Cache&)
196 requires(std::is_lvalue_reference_v<Args> && ...)
197 {
198 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Args...)>();
199 addInputsAndExpressions<Args...>(hash, name, value, inputs, eInfos);
200 }
201
203 template <typename C, is_enumeration A>
204 static void cacheFromArgs(void (C::*)(A), bool, Cache&, Cache&)
205 {
206 }
208 template <typename C, soa::is_iterator A, soa::is_table... Args>
209 static void cacheFromArgs(void (C::*)(A, Args...), bool value, Cache& bk, Cache& bku)
210 {
211 addGroupingCandidates<A, Args...>(bk, bku, value);
212 }
214 template <typename C, soa::is_table A, soa::is_table... Args>
215 static void cacheFromArgs(void (C::*)(A, Args...), bool, Cache&, Cache&)
216 {
217 }
218
219 template <soa::TableRef R>
220 static auto extractTableFromRecord(InputRecord& record)
221 {
222 auto table = record.get<TableConsumer>(o2::aod::matcher<R>())->asArrowTable();
223 if (table->num_rows() == 0) {
224 table = makeEmptyTable<R>();
225 }
226 return table;
227 }
228
229 template <soa::is_table T>
230 static auto extractFromRecord(InputRecord& record)
231 {
232 return T { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
233 }
234
235 template <soa::is_iterator T>
236 static auto extractFromRecord(InputRecord& record)
237 {
238 return typename T::parent_t { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
239 }
240
241 template <soa::is_filtered T>
242 static auto extractFilteredFromRecord(InputRecord& record, ExpressionInfo& info)
243 {
244 std::shared_ptr<arrow::Table> table = nullptr;
245 auto joiner = [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
246 if constexpr (soa::is_iterator<T>) {
247 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
248 } else {
249 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
250 }
253 if (info.selection == nullptr) {
255 }
256 }
257 if constexpr (soa::is_iterator<T>) {
258 return typename T::parent_t({table}, info.selection);
259 } else {
260 return T({table}, info.selection);
261 }
262 }
263
264 template <is_enumeration T, int AI>
265 static auto extract(InputRecord&, std::vector<ExpressionInfo>&, size_t)
266 {
267 return T{};
268 }
269
270 template <soa::is_iterator T, int AI>
271 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
272 {
273 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
274 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
275 } else {
276 return extractFromRecord<T>(record);
277 }
278 }
279
280 template <soa::is_table T, int AI>
281 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
282 {
283 if constexpr (soa::is_filtered_table<T>) {
284 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
285 } else {
286 return extractFromRecord<T>(record);
287 }
288 }
289
290 template <typename C, is_table_iterator_or_enumeration Grouping, soa::is_table... Args>
291 static auto bindGroupingTable(InputRecord& record, void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
292 requires(!std::same_as<Grouping, void>)
293 {
294 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Grouping, Args...)>();
295 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
296 }
297
298 template <typename C, is_table_iterator_or_enumeration Grouping, soa::is_table... Args>
299 static auto bindAssociatedTables(InputRecord& record, void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
300 requires(!std::same_as<Grouping, void> && sizeof...(Args) > 0)
301 {
302 constexpr auto p = pack<Args...>{};
303 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Grouping, Args...)>();
304 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
305 }
306
307 template <soa::is_table... As>
308 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...> const& src)
309 {
310 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(src)), ...);
311 }
312
313 template <typename Task, is_table_iterator_or_enumeration Grouping, soa::is_table... Associated>
314 static void invokeProcess(Task& task, InputRecord& inputs, void (Task::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
315 {
316 using G = std::decay_t<Grouping>;
317 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
318
319 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<Task>>() / 10;
320
321 // set filtered tables for partitions with grouping
322 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& element) {
325 return true;
326 },
327 task);
328
329 if constexpr (sizeof...(Associated) == 0) {
330 // single argument to process
331 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& element) {
334 return true;
335 },
336 task);
337 if constexpr (soa::is_iterator<G>) {
338 for (auto& element : groupingTable) {
339 std::invoke(processingFunction, task, *element);
340 }
341 } else {
342 static_assert(soa::is_table<G> || is_enumeration<G>,
343 "Single argument of process() should be a table-like or an iterator");
344 std::invoke(processingFunction, task, groupingTable);
345 }
346 } else {
347 // multiple arguments to process
348 static_assert(((soa::is_iterator<std::decay_t<Associated>> == false) && ...),
349 "Associated arguments of process() should not be iterators");
350 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
351 // pre-bind self indices
352 std::apply(
353 [&task](auto&... t) mutable {
354 (homogeneous_apply_refs_sized<numElements>(
355 [&t](auto& p) {
357 return true;
358 },
359 task),
360 ...);
361 },
362 associatedTables);
363
364 auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable {
365 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
366 homogeneous_apply_refs_sized<numElements>([&x](auto& t) mutable {
369 return true;
370 },
371 task);
372 };
373 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
374
375 // always pre-bind full tables to support index hierarchy
376 std::apply(
377 [&binder](auto&... x) mutable {
378 (binder(x), ...);
379 },
380 associatedTables);
381
382 // GroupedCombinations bound separately, as they should be set once for all associated tables
383 homogeneous_apply_refs_sized<numElements>([&groupingTable, &associatedTables](auto& t) {
384 analysis_task_parsers::setGroupedCombination(t, groupingTable, associatedTables);
385 return true;
386 },
387 task);
388 overwriteInternalIndices(associatedTables, associatedTables);
389 if constexpr (soa::is_iterator<std::decay_t<G>>) {
390 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
391 for (auto& slice : slicer) {
392 auto associatedSlices = slice.associatedTables();
393 overwriteInternalIndices(associatedSlices, associatedTables);
394 std::apply(
395 [&binder](auto&... x) mutable {
396 (binder(x), ...);
397 },
398 associatedSlices);
399
400 // bind partitions and grouping table
401 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& x) {
403 return true;
404 },
405 task);
406
407 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
408 }
409 } else {
410 // bind partitions and grouping table
411 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& x) {
413 return true;
414 },
415 task);
416
417 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
418 }
419 }
420 }
421
422 template <typename C, typename T, is_table_iterator_or_enumeration G, soa::is_table... A>
423 static void invokeProcessWithArgs(C& task, T processingFunction, G g, std::tuple<A...>& at)
424 {
425 std::invoke(processingFunction, task, g, std::get<A>(at)...);
426 }
427};
428} // namespace
429
431 std::vector<std::pair<std::string, bool>> map;
432};
433
435struct TaskName {
436 TaskName(std::string name) : value{std::move(name)} {}
437 std::string value;
438};
439
440namespace
441{
442template <typename T, typename... A>
443auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args)
444{
445 auto task = std::make_shared<T>(std::forward<A>(args)...);
446 for (auto& setting : second.map) {
448 [&](auto& element) {
449 return analysis_task_parsers::setProcessSwitch(setting, element);
450 },
451 *task.get());
452 }
453 outputName = first.value;
454 return task;
455}
456
457template <typename T, typename... A>
458auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, TaskName second, A... args)
459{
460 auto task = std::make_shared<T>(std::forward<A>(args)...);
461 for (auto& setting : first.map) {
463 [&](auto& element) {
464 return analysis_task_parsers::setProcessSwitch(setting, element);
465 },
466 *task.get());
467 }
468 outputName = second.value;
469 return task;
470}
471
472template <typename T, typename... A>
473auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, A... args)
474{
475 auto task = std::make_shared<T>(std::forward<A>(args)...);
476 for (auto& setting : first.map) {
478 [&](auto& element) {
479 return analysis_task_parsers::setProcessSwitch(setting, element);
480 },
481 *task.get());
482 }
483 auto type_name_str = type_name<T>();
484 outputName = type_to_task_name(type_name_str);
485 return task;
486}
487
488template <typename T, typename... A>
489auto getTaskNameSetProcesses(std::string& outputName, TaskName first, A... args)
490{
491 auto task = std::make_shared<T>(std::forward<A>(args)...);
492 outputName = first.value;
493 return task;
494}
495
496template <typename T, typename... A>
497auto getTaskNameSetProcesses(std::string& outputName, A... args)
498{
499 auto task = std::make_shared<T>(std::forward<A>(args)...);
500 auto type_name_str = type_name<T>();
501 outputName = type_to_task_name(type_name_str);
502 return task;
503}
504
505} // namespace
506
509template <typename T, typename... Args>
511{
512 TH1::AddDirectory(false);
513
514 std::string name_str;
515 auto task = getTaskNameSetProcesses<T>(name_str, args...);
516
517 auto suffix = ctx.options().get<std::string>("workflow-suffix");
518 if (!suffix.empty()) {
519 name_str += suffix;
520 }
521 const char* name = name_str.c_str();
522
523 auto hash = runtime_hash(name);
524
525 std::vector<OutputSpec> outputs;
526 std::vector<InputSpec> inputs;
527 std::vector<ConfigParamSpec> options;
528 std::vector<ExpressionInfo> expressionInfos;
529
530 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<T>>() / 10;
531
533 homogeneous_apply_refs_sized<numElements>([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
535 homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
536
538 if constexpr (requires { &T::process; }) {
539 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos);
540 }
541 homogeneous_apply_refs_sized<numElements>(
542 [name = name_str, &expressionInfos, &inputs](auto& x) mutable {
543 // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators
544 return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos);
545 },
546 *task.get());
547
548 // request base tables for spawnable extended tables and indices to be built
549 // this checks for duplications
550 homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) {
552 },
553 *task.get());
554
555 // no static way to check if the task defines any processing, we can only make sure it subscribes to at least something
556 if (inputs.empty() == true) {
557 LOG(warn) << "Task " << name_str << " has no inputs";
558 }
559
560 homogeneous_apply_refs_sized<numElements>([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
561
562 auto requiredServices = CommonServices::defaultServices();
563 auto arrowServices = CommonServices::arrowServices();
564 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
565 homogeneous_apply_refs_sized<numElements>([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get());
566
567 auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos](InitContext& ic) mutable {
568 Cache bindingsKeys;
569 Cache bindingsKeysUnsorted;
570 // add preslice declarations to slicing cache definition
571 homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());
572
573 homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
574 homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());
575
576 auto& callbacks = ic.services().get<CallbackService>();
577 auto eoscb = [task](EndOfStreamContext& eosContext) {
578 homogeneous_apply_refs_sized<numElements>([&eosContext](auto& element) {
581 return true; },
582 *task.get());
583 eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
584 };
585
586 callbacks.set<CallbackService::Id::EndOfStream>(eoscb);
587
589 if constexpr (requires { task->init(ic); }) {
590 task->init(ic);
591 }
592
594 homogeneous_apply_refs_sized<numElements>(
595 [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
596 *task.get());
598 homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto& element) {
600 },
601 *task.get());
602
604 if constexpr (requires { &T::process; }) {
605 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted);
606 }
607 homogeneous_apply_refs_sized<numElements>(
608 [&bindingsKeys, &bindingsKeysUnsorted](auto& x) {
609 return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted);
610 },
611 *task.get());
612
613 ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
614 ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
615
616 return [task, expressionInfos](ProcessingContext& pc) mutable {
617 // load the ccdb object from their cache
618 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
619 // reset partitions once per dataframe
620 homogeneous_apply_refs_sized<numElements>([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
621 // reset selections for the next dataframe
622 std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
623 // reset pre-slice for the next dataframe
624 auto slices = pc.services().get<ArrowTableSlicingCache>();
625 homogeneous_apply_refs_sized<numElements>([&slices](auto& element) {
627 },
628 *(task.get()));
629 // initialize local caches
630 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
631 // prepare outputs
632 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
633 // execute run()
634 if constexpr (requires { task->run(pc); }) {
635 task->run(pc);
636 }
637 // execute process()
638 if constexpr (requires { &T::process; }) {
639 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
640 }
641 // execute optional process()
642 homogeneous_apply_refs_sized<numElements>(
643 [&pc, &expressionInfos, &task, &slices](auto& x) {
644 if constexpr (is_process_configurable<decltype(x)>) {
645 if (x.value == true) {
646 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), x.process, expressionInfos, slices);
647 return true;
648 }
649 return false;
650 }
651 return false;
652 },
653 *task.get());
654 // prepare delayed outputs
655 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
656 // finalize outputs
657 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
658 };
659 }};
660
661 return {
662 name,
663 inputs,
664 outputs,
665 algo,
666 options,
667 requiredServices};
668}
669
670} // namespace o2::framework
671#endif // FRAMEWORK_ANALYSISTASK_H_
uint32_t hash
atype::type element
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
constexpr uint32_t runtime_hash(char const *str)
StringRef key
Definition A.h:16
ConfigParamRegistry & options() const
Helper to check if a type T is an iterator.
Definition ASoA.h:1294
GLint GLenum GLint x
Definition glcorearb.h:403
GLenum src
Definition glcorearb.h:1767
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean GLboolean g
Definition glcorearb.h:1233
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
Cache handling.
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
void updatePairList(Cache &list, Entry &entry)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
std::string type_to_task_name(std::string_view const &camelCase)
Convert a CamelCase task struct name to snake-case task name.
@ Me
Only quit this data processor.
constexpr auto homogeneous_apply_refs(L l, T &&object)
std::string cutString(std::string &&str)
Definition ASoA.cxx:266
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:58
void missingFilterDeclaration(int hash, int ai)
Definition ASoA.cxx:33
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
Definition Expressions.h:65
size_t processHash
Definition Expressions.h:60
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"