Project
Loading...
Searching...
No Matches
AnalysisTask.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
14
25#include "Framework/Traits.h"
29
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
33#include <type_traits>
34#include <utility>
35#include <memory>
36namespace o2::framework
37{
46};
47
48template <int64_t BEGIN, int64_t END, int64_t STEP = 1>
50 static constexpr int64_t begin = BEGIN;
51 static constexpr int64_t end = END;
52 static constexpr int64_t step = STEP;
53};
54
55template <typename T>
56static constexpr bool is_enumeration_v = false;
57
58template <int64_t BEGIN, int64_t END, int64_t STEP>
59static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> = true;
60
61template <typename T>
62concept is_enumeration = is_enumeration_v<std::decay_t<T>>;
63
64// Helper struct which builds a DataProcessorSpec from
65// the contents of an AnalysisTask...
66namespace {
67struct AnalysisDataProcessorBuilder {
68 template <typename T>
69 static ConfigParamSpec getSpec()
70 {
72 return ConfigParamSpec{std::string{"input:"} + aod::MetadataTrait<T>::metadata::tableLabel(), VariantType::String, aod::MetadataTrait<T>::metadata::sourceSpec(), {"\"\""}};
73 } else {
74 using O1 = framework::pack_element_t<0, typename T::originals>;
75 return ConfigParamSpec{std::string{"input:"} + aod::MetadataTrait<T>::metadata::tableLabel(), VariantType::String, aod::MetadataTrait<O1>::metadata::sourceSpec(), {"\"\""}};
76 }
77 }
78
79 template <soa::TableRef R>
80 static ConfigParamSpec getSpec()
81 {
82 return soa::tableRef2ConfigParamSpec<R>();
83 }
84
85 template <soa::with_sources T>
86 static inline auto getSources()
87 {
88 return []<size_t N, std::array<soa::TableRef, N> refs>() {
89 return []<size_t... Is>(std::index_sequence<Is...>) {
90 return std::vector{soa::tableRef2ConfigParamSpec<refs[Is]>()...};
91 }(std::make_index_sequence<N>());
92 }.template operator()<T::sources.size(), T::sources>();
93 }
94
95 template <soa::with_sources T>
96
97 static auto getInputMetadata()
98 {
99 std::vector<ConfigParamSpec> inputMetadata;
100 auto inputSources = getSources<T>();
101 std::sort(inputSources.begin(), inputSources.end(), [](ConfigParamSpec const& a, ConfigParamSpec const& b) { return a.name < b.name; });
102 auto last = std::unique(inputSources.begin(), inputSources.end(), [](ConfigParamSpec const& a, ConfigParamSpec const& b) { return a.name == b.name; });
103 inputSources.erase(last, inputSources.end());
104 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
105 return inputMetadata;
106 }
107
108 template <typename G, typename... Args>
109 static void addGroupingCandidates(std::vector<StringPair>& bk, std::vector<StringPair>& bku)
110 {
111 [&bk, &bku]<typename... As>(framework::pack<As...>) mutable {
112 std::string key;
113 if constexpr (soa::is_iterator<std::decay_t<G>>) {
114 key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
115 }
116 ([&bk, &bku, &key]() mutable {
117 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
118 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(key);
120 framework::updatePairList(bku, binding, key);
121 } else {
122 framework::updatePairList(bk, binding, key);
123 }
124 }
125 }(),
126 ...);
127 }(framework::pack<Args...>{});
128 }
129
130 template <soa::TableRef R>
131 static void addOriginalRef(const char* name, bool value, std::vector<InputSpec>& inputs)
132 {
133 using metadata = typename aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata;
134 std::vector<ConfigParamSpec> inputMetadata;
135 inputMetadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}});
136 if constexpr (soa::with_sources<metadata>) {
137 auto inputSources = getInputMetadata<metadata>();
138 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
139 }
140 DataSpecUtils::updateInputList(inputs, InputSpec{o2::aod::label<R>(), o2::aod::origin<R>(), aod::description(o2::aod::signature<R>()), R.version, Lifetime::Timeframe, inputMetadata});
141 }
142
144 template <soa::is_table A>
146 static void addExpression(int, uint32_t, std::vector<ExpressionInfo>&)
147 {
148 }
149
150 template <soa::is_filtered_table A>
151 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
152 {
153 auto fields = soa::createFieldsFromColumns(typename std::decay_t<A>::persistent_columns_t{});
154 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
155 }
156
157 template <soa::is_iterator A>
158 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
159 {
160 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
161 }
162
164 template <soa::is_table A>
165 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
166 {
167 [&name, &value, &inputs]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) mutable {
168 (addOriginalRef<refs[Is]>(name, value, inputs), ...);
169 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
170 }
171
172 template <soa::is_iterator A>
173 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs)
174 {
175 addInput<typename std::decay_t<A>::parent_t>(name, value, inputs);
176 }
177
179 template <soa::is_table... As>
180 static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
181 {
182 int ai = -1;
183 ([&ai, &hash, &eInfos, &name, &value, &inputs]() mutable {
184 ++ai;
185 using T = std::decay_t<As>;
186 addExpression<T>(ai, hash, eInfos);
187 addInput<T>(name, value, inputs);
188 }(),
189 ...);
190 }
191
194 template <typename R, typename C, is_enumeration A>
195 static void inputsFromArgs(R (C::*)(A), const char* /*name*/, bool /*value*/, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&, std::vector<StringPair>&, std::vector<StringPair>&)
196 {
197 std::vector<ConfigParamSpec> inputMetadata;
198 // FIXME: for the moment we do not support begin, end and step.
199 DataSpecUtils::updateInputList(inputs, InputSpec{"enumeration", "DPL", "ENUM", 0, Lifetime::Enumeration, inputMetadata});
200 }
201
203 template <typename R, typename C, soa::is_iterator A, soa::is_table... Args>
204 static void inputsFromArgs(R (C::*)(A, Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>& bk, std::vector<StringPair>& bku)
205 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
206 {
207 addGroupingCandidates<A, Args...>(bk, bku);
208 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(A, Args...)>();
209 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash, name, value, inputs, eInfos);
210 }
211
213 template <typename R, typename C, soa::is_table... Args>
214 static void inputsFromArgs(R (C::*)(Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>&, std::vector<StringPair>&)
215 requires(std::is_lvalue_reference_v<Args> && ...)
216 {
217 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Args...)>();
218 addInputsAndExpressions<Args...>(hash, name, value, inputs, eInfos);
219 }
220
221 template <soa::TableRef R>
222 static auto extractTableFromRecord(InputRecord& record)
223 {
224 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
225 if (table->num_rows() == 0) {
226 table = makeEmptyTable<R>();
227 }
228 return table;
229 }
230
231 template <soa::is_table T>
232 static auto extractFromRecord(InputRecord& record)
233 {
234 return T { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
235 }
236
237 template <soa::is_iterator T>
238 static auto extractFromRecord(InputRecord& record)
239 {
240 return typename T::parent_t { [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
241 }
242
243 template <soa::is_filtered T>
244 static auto extractFilteredFromRecord(InputRecord& record, ExpressionInfo& info)
245 {
246 std::shared_ptr<arrow::Table> table = nullptr;
247 auto joiner = [&record]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) { return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
248 if constexpr (soa::is_iterator<T>) {
249 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()));
250 } else {
251 table = o2::soa::ArrowHelpers::joinTables(joiner.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()));
252 }
255 if (info.selection == nullptr) {
257 }
258 }
259 if constexpr (soa::is_iterator<T>) {
260 return typename T::parent_t({table}, info.selection);
261 } else {
262 return T({table}, info.selection);
263 }
264 }
265
266 template <is_enumeration T, int AI>
267 static auto extract(InputRecord&, std::vector<ExpressionInfo>&, size_t)
268 {
269 return T{};
270 }
271
272 template <soa::is_iterator T, int AI>
273 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
274 {
275 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
276 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
277 } else {
278 return extractFromRecord<T>(record);
279 }
280 }
281
282 template <soa::is_table T, int AI>
283 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos, size_t phash)
284 {
285 if constexpr (soa::is_filtered_table<T>) {
286 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
287 } else {
288 return extractFromRecord<T>(record);
289 }
290 }
291
292 template <typename R, typename C, typename Grouping, typename... Args>
293 static auto bindGroupingTable(InputRecord& record, R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
294 requires(!std::same_as<Grouping, void>)
295 {
296 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Grouping, Args...)>();
297 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
298 }
299
300 template <typename R, typename C, typename Grouping, typename... Args>
301 static auto bindAssociatedTables(InputRecord& record, R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
302 requires(!std::same_as<Grouping, void> && sizeof...(Args) > 0)
303 {
304 constexpr auto p = pack<Args...>{};
305 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<R (C::*)(Grouping, Args...)>();
306 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
307 }
308
309 template <typename... As>
310 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...> const& src)
311 {
312 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(src)), ...);
313 }
314
315 template <typename Task, typename R, typename C, typename Grouping, typename... Associated>
316 static void invokeProcess(Task& task, InputRecord& inputs, R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
317 {
318 using G = std::decay_t<Grouping>;
319 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
320
321 // set filtered tables for partitions with grouping
322 homogeneous_apply_refs([&groupingTable](auto& element) {
323 analysis_task_parsers::setPartition(element, groupingTable);
325 return true;
326 },
327 task);
328
329 if constexpr (sizeof...(Associated) == 0) {
330 // single argument to process
331 homogeneous_apply_refs([&groupingTable](auto& element) {
333 analysis_task_parsers::setGroupedCombination(element, groupingTable);
334 return true;
335 },
336 task);
337 if constexpr (soa::is_iterator<G>) {
338 for (auto& element : groupingTable) {
339 std::invoke(processingFunction, task, *element);
340 }
341 } else {
342 static_assert(soa::is_table<G> || is_enumeration<G>,
343 "Single argument of process() should be a table-like or an iterator");
344 std::invoke(processingFunction, task, groupingTable);
345 }
346 } else {
347 // multiple arguments to process
348 static_assert(((soa::is_iterator<std::decay_t<Associated>> == false) && ...),
349 "Associated arguments of process() should not be iterators");
350 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
351 // pre-bind self indices
352 std::apply(
353 [&task](auto&... t) mutable {
355 [&t](auto& p) {
357 return true;
358 },
359 task),
360 ...);
361 },
362 associatedTables);
363
364 auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable {
365 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
366 homogeneous_apply_refs([&x](auto& t) mutable {
369 return true;
370 },
371 task);
372 };
373 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
374
375 // always pre-bind full tables to support index hierarchy
376 std::apply(
377 [&binder](auto&... x) mutable {
378 (binder(x), ...);
379 },
380 associatedTables);
381
382 // GroupedCombinations bound separately, as they should be set once for all associated tables
383 homogeneous_apply_refs([&groupingTable, &associatedTables](auto& t) {
384 analysis_task_parsers::setGroupedCombination(t, groupingTable, associatedTables);
385 return true;
386 },
387 task);
388 overwriteInternalIndices(associatedTables, associatedTables);
389 if constexpr (soa::is_iterator<std::decay_t<G>>) {
390 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
391 for (auto& slice : slicer) {
392 auto associatedSlices = slice.associatedTables();
393 overwriteInternalIndices(associatedSlices, associatedTables);
394 std::apply(
395 [&binder](auto&... x) mutable {
396 (binder(x), ...);
397 },
398 associatedSlices);
399
400 // bind partitions and grouping table
401 homogeneous_apply_refs([&groupingTable](auto& x) {
403 return true;
404 },
405 task);
406
407 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
408 }
409 } else {
410 // bind partitions and grouping table
411 homogeneous_apply_refs([&groupingTable](auto& x) {
413 return true;
414 },
415 task);
416
417 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
418 }
419 }
420 }
421
422 template <typename C, typename T, typename G, typename... A>
423 static void invokeProcessWithArgs(C& task, T processingFunction, G g, std::tuple<A...>& at)
424 {
425 std::invoke(processingFunction, task, g, std::get<A>(at)...);
426 }
427};
428}
429
431 std::vector<std::pair<std::string, bool>> map;
432};
433
435struct TaskName {
436 TaskName(std::string name) : value{std::move(name)} {}
437 std::string value;
438};
439
440namespace {
441template <typename T, typename... A>
442auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args)
443{
444 auto task = std::make_shared<T>(std::forward<A>(args)...);
445 for (auto& setting : second.map) {
447 [&](auto& element) {
448 return analysis_task_parsers::setProcessSwitch(setting, element);
449 },
450 *task.get());
451 }
452 outputName = first.value;
453 return task;
454}
455
456template <typename T, typename... A>
457auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, TaskName second, A... args)
458{
459 auto task = std::make_shared<T>(std::forward<A>(args)...);
460 for (auto& setting : first.map) {
462 [&](auto& element) {
463 return analysis_task_parsers::setProcessSwitch(setting, element);
464 },
465 *task.get());
466 }
467 outputName = second.value;
468 return task;
469}
470
471template <typename T, typename... A>
472auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, A... args)
473{
474 auto task = std::make_shared<T>(std::forward<A>(args)...);
475 for (auto& setting : first.map) {
477 [&](auto& element) {
478 return analysis_task_parsers::setProcessSwitch(setting, element);
479 },
480 *task.get());
481 }
482 auto type_name_str = type_name<T>();
483 outputName = type_to_task_name(type_name_str);
484 return task;
485}
486
487template <typename T, typename... A>
488auto getTaskNameSetProcesses(std::string& outputName, TaskName first, A... args)
489{
490 auto task = std::make_shared<T>(std::forward<A>(args)...);
491 outputName = first.value;
492 return task;
493}
494
495template <typename T, typename... A>
496auto getTaskNameSetProcesses(std::string& outputName, A... args)
497{
498 auto task = std::make_shared<T>(std::forward<A>(args)...);
499 auto type_name_str = type_name<T>();
500 outputName = type_to_task_name(type_name_str);
501 return task;
502}
503
504}
505
508template <typename T, typename... Args>
510{
511 TH1::AddDirectory(false);
512
513 std::string name_str;
514 auto task = getTaskNameSetProcesses<T>(name_str, args...);
515
516 auto suffix = ctx.options().get<std::string>("workflow-suffix");
517 if (!suffix.empty()) {
518 name_str += suffix;
519 }
520 const char* name = name_str.c_str();
521
522 auto hash = runtime_hash(name);
523
524 std::vector<OutputSpec> outputs;
525 std::vector<InputSpec> inputs;
526 std::vector<ConfigParamSpec> options;
527 std::vector<ExpressionInfo> expressionInfos;
528 std::vector<StringPair> bindingsKeys;
529 std::vector<StringPair> bindingsKeysUnsorted;
530
532 homogeneous_apply_refs([&options, &hash](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
534 homogeneous_apply_refs([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
535
537 if constexpr (requires { &T::process; }) {
538 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
539 }
542 [name = name_str, &expressionInfos, &inputs, &bindingsKeys, &bindingsKeysUnsorted](framework::is_process_configurable auto& x) mutable {
543 // this pushes (argumentIndex,processHash,schemaPtr,nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators
544 AnalysisDataProcessorBuilder::inputsFromArgs(x.process, (name + "/" + x.name).c_str(), x.value, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
545 return true;
546 },
547 [](auto&) {
548 return false;
549 }},
550 *task.get());
551
552 // add preslice declarations to slicing cache definition
553 homogeneous_apply_refs([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());
554
555 // request base tables for spawnable extended tables and indices to be built
556 // this checks for duplications
557 homogeneous_apply_refs([&inputs](auto& element) {
558 return analysis_task_parsers::requestInputs(inputs, element);
559 },
560 *task.get());
561
562 // no static way to check if the task defines any processing, we can only make sure it subscribes to at least something
563 if (inputs.empty() == true) {
564 LOG(warn) << "Task " << name_str << " has no inputs";
565 }
566
567 homogeneous_apply_refs([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
568
569 auto requiredServices = CommonServices::defaultServices();
570 auto arrowServices = CommonServices::arrowServices();
571 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
572 homogeneous_apply_refs([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get());
573
574 auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, bindingsKeys, bindingsKeysUnsorted](InitContext& ic) mutable {
575 homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
576 homogeneous_apply_refs([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());
577
578 auto& callbacks = ic.services().get<CallbackService>();
579 auto endofdatacb = [task](EndOfStreamContext& eosContext) {
580 homogeneous_apply_refs([&eosContext](auto& element) {
581 analysis_task_parsers::postRunService(eosContext, element);
582 analysis_task_parsers::postRunOutput(eosContext, element);
583 return true; },
584 *task.get());
585 eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
586 };
587
588 callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
589
592 [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
593 *task.get());
595 homogeneous_apply_refs([&expressionInfos](auto& element) {
596 return analysis_task_parsers::createExpressionTrees(expressionInfos, element);
597 },
598 *task.get());
599
600 if constexpr (requires { task->init(ic); }) {
601 task->init(ic);
602 }
603
604 ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
605 ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
606 // initialize global caches
607 homogeneous_apply_refs([&ic](auto& element) {
609 },
610 *(task.get()));
611
612 return [task, expressionInfos](ProcessingContext& pc) mutable {
613 // load the ccdb object from their cache
614 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
615 // reset partitions once per dataframe
616 homogeneous_apply_refs([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
617 // reset selections for the next dataframe
618 for (auto& info : expressionInfos) {
619 info.resetSelection = true;
620 }
621 // reset pre-slice for the next dataframe
622 auto slices = pc.services().get<ArrowTableSlicingCache>();
623 homogeneous_apply_refs([&pc, &slices](auto& element) {
624 return analysis_task_parsers::updateSliceInfo(element, slices);
625 },
626 *(task.get()));
627 // initialize local caches
628 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
629 // prepare outputs
630 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
631 // execute run()
632 if constexpr (requires { task->run(pc); }) {
633 task->run(pc);
634 }
635 // execute process()
636 if constexpr (requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
637 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
638 }
639 // execute optional process()
641 [&pc, &expressionInfos, &task, &slices](auto& x) mutable {
642 if constexpr (base_of_template<ProcessConfigurable, std::decay_t<decltype(x)>>) {
643 if (x.value == true) {
644 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), x.process, expressionInfos, slices);
645 return true;
646 }
647 }
648 return false;
649 },
650 *task.get());
651 // finalize outputs
652 homogeneous_apply_refs([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
653 };
654 }};
655
656 return {
657 name,
658 // FIXME: For the moment we hardcode this. We could build
659 // this list from the list of methods actually implemented in the
660 // task itself.
661 inputs,
662 outputs,
663 algo,
664 options,
665 requiredServices};
666}
667
668} // namespace o2::framework
669#endif // FRAMEWORK_ANALYSISTASK_H_
int32_t i
constexpr uint32_t runtime_hash(char const *str)
StringRef key
Definition A.h:16
ConfigParamRegistry & options() const
Helper to check if a type T is an iterator.
Definition ASoA.h:1246
GLint GLenum GLint x
Definition glcorearb.h:403
GLenum src
Definition glcorearb.h:1767
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean GLboolean g
Definition glcorearb.h:1233
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool registerCache(T &, std::vector< StringPair > &, std::vector< StringPair > &)
Preslice handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void updatePairList(std::vector< StringPair > &list, std::string const &binding, std::string const &key)
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
Definition ASoA.cxx:180
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:401
void missingFilterDeclaration(int hash, int ai)
Definition ASoA.cxx:28
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
Definition Expressions.h:65
size_t processHash
Definition Expressions.h:60
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
From https://en.cppreference.com/w/cpp/utility/variant/visit.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:67
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"