Project
Loading...
Searching...
No Matches
AnalysisTask.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
14
29
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
33#include <type_traits>
34#include <utility>
35#include <memory>
36#include <tuple> // IWYU pragma: export
37
38namespace o2::framework
39{
41std::string type_to_task_name(std::string_view const& camelCase);
42
51};
52
53template <int64_t BEGIN, int64_t END, int64_t STEP = 1>
55 static constexpr int64_t begin = BEGIN;
56 static constexpr int64_t end = END;
57 static constexpr int64_t step = STEP;
58};
59
60template <typename T>
61static constexpr bool is_enumeration_v = false;
62
63template <int64_t BEGIN, int64_t END, int64_t STEP>
64static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> = true;
65
66template <typename T>
67concept is_enumeration = is_enumeration_v<std::decay_t<T>>;
68
69template <typename T>
71
72// Helper struct which builds a DataProcessorSpec from
73// the contents of an AnalysisTask...
74namespace
75{
76struct AnalysisDataProcessorBuilder {
77 template <soa::is_iterator G, soa::is_table... Args>
78 static void addGroupingCandidates(Cache& bk, Cache& bku, bool enabled)
79 {
80 []<soa::is_table... As>(framework::pack<As...>, Cache& bk, Cache& bku, bool enabled) {
81 auto key = std::string{"fIndex"} + o2::framework::cutString(soa::getLabelFromType<std::decay_t<G>>());
82 ([](Cache& bk, Cache& bku, bool enabled, std::string const& key) {
83 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
84 Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(key), key, enabled};
87 } else {
89 }
90 }
91 }(bk, bku, enabled, key),
92 ...);
93 }(framework::pack<Args...>{}, bk, bku, enabled);
94 }
95
96 template <soa::TableRef R>
97 static void addOriginalRef(const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<InputInfo>& iInfos, int ai, uint32_t hash, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
98 {
99 auto spec = soa::tableRef2InputSpec<R>(newOrigin);
100 if (R.origin_hash != "AOD"_h) {
101 spec.metadata.emplace_back(ConfigParamSpec{"aod-origin-replaced", VariantType::Bool, true, {"\"\""}});
102 }
103 spec.metadata.emplace_back(ConfigParamSpec{std::string{"control:"} + name, VariantType::Bool, value, {"\"\""}});
104 auto matcher = DataSpecUtils::asConcreteDataMatcher(spec);
105 DataSpecUtils::updateInputList(inputs, std::move(spec));
106 auto locate = std::ranges::find_if(iInfos, [&hash](auto const& info) { return info.hash == hash; });
107 if (locate == iInfos.end()) {
108 iInfos.emplace_back(hash, std::vector{std::pair{ai, matcher}});
109 } else {
110 if (std::ranges::none_of(locate->matchers, [&ai, &matcher](auto const& match) { return (match.first == ai) && (match.second == matcher); })) {
111 locate->matchers.emplace_back(std::pair{ai, matcher});
112 }
113 }
114 }
115
117 template <soa::is_table A>
119 static void addExpression(int, uint32_t, std::vector<ExpressionInfo>&)
120 {
121 }
122
123 template <soa::is_filtered_table A>
124 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
125 {
126 auto fields = soa::createFieldsFromColumns(typename std::decay_t<A>::persistent_columns_t{});
127 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
128 }
129
130 template <soa::is_iterator A>
131 static void addExpression(int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
132 {
133 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
134 }
135
137 template <soa::is_table A>
138 static void addInput(const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<InputInfo>& iInfos, int ai, uint32_t hash, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"})
139 {
140 [&name, &value, &inputs, &iInfos, &ai, &hash, newOrigin = std::move(newOrigin)]<size_t N, std::array<soa::TableRef, N> refs, size_t... Is>(std::index_sequence<Is...>) mutable {
141 (addOriginalRef<refs[Is]>(name, value, inputs, iInfos, ai, hash, newOrigin), ...);
142 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
143 }
144
146 template <soa::is_table... As>
147 static void addInputsAndExpressions(uint32_t hash, const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos, header::DataOrigin&& newOrigin = header::DataOrigin{"AOD"})
148 {
149 int ai = -1;
150 ([&ai, &hash, &eInfos, &name, &value, &inputs, &iInfos, newOrigin]() mutable {
151 ++ai;
152 using T = std::decay_t<As>;
153 addExpression<T>(ai, hash, eInfos);
154 addInput<T>(name, value, inputs, iInfos, ai, hash, std::move(newOrigin));
155 }(),
156 ...);
157 }
158
160 template <typename T>
161 inline static bool requestInputsFromArgs(T&, std::string const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&, std::vector<InputInfo>&, header::DataOrigin)
162 {
163 return false;
164 }
165 template <is_process_configurable T>
166 inline static bool requestInputsFromArgs(T& pc, std::string const& name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis, std::vector<InputInfo>& iifs, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
167 {
168 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (name + "/" + pc.name).c_str(), pc.value, inputs, eis, iifs, newOrigin);
169 return true;
170 }
171 template <typename T>
172 inline static bool requestCacheFromArgs(T&, Cache&, Cache&)
173 {
174 return false;
175 }
176 template <is_process_configurable T>
177 inline static bool requestCacheFromArgs(T& pc, Cache& bk, Cache& bku)
178 {
179 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
180 return true;
181 }
183 template <typename C, is_enumeration A>
184 static void inputsFromArgs(void (C::*)(A), const char* /*name*/, bool /*value*/, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&, std::vector<InputInfo>&, header::DataOrigin)
185 {
186 std::vector<ConfigParamSpec> inputMetadata;
187 // FIXME: for the moment we do not support begin, end and step.
188 DataSpecUtils::updateInputList(inputs, InputSpec{"enumeration", "DPL", "ENUM", 0, Lifetime::Enumeration, inputMetadata});
189 }
190
192 template <typename C, soa::is_iterator A, soa::is_table... Args>
193 static void inputsFromArgs(void (C::*)(A, Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
194 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
195 {
196 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(A, Args...)>();
197 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash, name, value, inputs, eInfos, iInfos, std::move(newOrigin));
198 }
199
201 template <typename C, soa::is_table... Args>
202 static void inputsFromArgs(void (C::*)(Args...), const char* name, bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
203 requires(std::is_lvalue_reference_v<Args> && ...)
204 {
205 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Args...)>();
206 addInputsAndExpressions<Args...>(hash, name, value, inputs, eInfos, iInfos, std::move(newOrigin));
207 }
208
210 template <typename C, is_enumeration A>
211 static void cacheFromArgs(void (C::*)(A), bool, Cache&, Cache&)
212 {
213 }
215 template <typename C, soa::is_iterator A, soa::is_table... Args>
216 static void cacheFromArgs(void (C::*)(A, Args...), bool value, Cache& bk, Cache& bku)
217 {
218 addGroupingCandidates<A, Args...>(bk, bku, value);
219 }
221 template <typename C, soa::is_table A, soa::is_table... Args>
222 static void cacheFromArgs(void (C::*)(A, Args...), bool, Cache&, Cache&)
223 {
224 }
225
226 template <std::ranges::input_range R>
227 static auto extractTablesFromRecord(InputRecord& record, R matchers)
228 {
229 std::vector<std::shared_ptr<arrow::Table>> tables;
230 std::ranges::transform(matchers, std::back_inserter(tables), [&record](auto const& m) {
231 return record.get<TableConsumer>(m.second)->asArrowTable();
232 });
233 return tables;
234 }
235
236 template <soa::is_table T, std::ranges::input_range R>
237 static auto extractFromRecord(InputRecord& record, R matchers)
238 {
239 return T{extractTablesFromRecord(record, matchers)};
240 }
241
242 template <soa::is_iterator T, std::ranges::input_range R>
243 static auto extractFromRecord(InputRecord& record, R matchers)
244 {
245 return typename T::parent_t{extractTablesFromRecord(record, matchers)};
246 }
247
248 template <soa::is_filtered T, std::ranges::input_range R>
249 static auto extractFilteredFromRecord(InputRecord& record, R matchers, ExpressionInfo& info)
250 {
251 std::shared_ptr<arrow::Table> table = soa::ArrowHelpers::joinTables(extractTablesFromRecord(record, matchers));
254 if (info.selection == nullptr) {
255 soa::missingFilterDeclaration(info.processHash, info.argumentIndex);
256 }
257 }
258 if constexpr (soa::is_iterator<T>) {
259 return typename T::parent_t({table}, info.selection);
260 } else {
261 return T({table}, info.selection);
262 }
263 }
264
265 template <is_enumeration T, int AI, std::ranges::input_range R>
266 static auto extract(InputRecord&, R, std::vector<ExpressionInfo>&, size_t)
267 {
268 return T{};
269 }
270
271 template <soa::is_table_or_iterator T, int AI, std::ranges::input_range R>
272 static auto extract(InputRecord& record, R matchers, std::vector<ExpressionInfo>& infos, size_t phash)
273 {
274 // auto matchers = std::ranges::find_if(iInfos, [&phash](auto const& info) { return info.hash == phash; })->matchers | std::views::filter([](auto const& pair) { return pair.first == AI; });
275 if constexpr (soa::is_filtered<T>) {
276 return extractFilteredFromRecord<T>(record, matchers, *std::ranges::find_if(infos, [&phash](ExpressionInfo const& i) { return (i.processHash == phash && i.argumentIndex == AI); }));
277 } else {
278 return extractFromRecord<T>(record, matchers);
279 }
280 }
281
282 template <std::ranges::input_range R, typename C, is_table_iterator_or_enumeration Grouping, soa::is_table... Args>
283 static auto bindGroupingTable(InputRecord& record, R matchers, void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
284 requires(!std::same_as<Grouping, void>)
285 {
286 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Grouping, Args...)>();
287 return extract<std::decay_t<Grouping>, 0>(record, matchers | std::views::filter([](auto const& pair) { return pair.first == 0; }), infos, hash);
288 }
289
290 template <std::ranges::input_range R, typename C, is_table_iterator_or_enumeration Grouping, soa::is_table... Args>
291 static auto bindAssociatedTables(InputRecord& record, R matchers, void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
292 requires(!std::same_as<Grouping, void> && sizeof...(Args) > 0)
293 {
294 constexpr auto hash = o2::framework::TypeIdHelpers::uniqueId<void (C::*)(Grouping, Args...)>();
295 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(pack<Args...>{}) + 1>(record, matchers | std::views::filter([](auto const& pair) { return pair.first == has_type_at_v<Args>(pack<Args...>{}) + 1; }), infos, hash)...);
296 }
297
298 template <soa::is_table... As>
299 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...> const& src)
300 {
301 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(src)), ...);
302 }
303
304 template <typename Task, is_table_iterator_or_enumeration Grouping, std::ranges::input_range R, soa::is_table... Associated>
305 static void invokeProcess(Task& task, InputRecord& inputs, R matchers, void (Task::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
306 {
307 using G = std::decay_t<Grouping>;
308 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, matchers, processingFunction, infos);
309
310 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<Task>>() / 10;
311
312 // set filtered tables for partitions with grouping
313 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& element) {
316 return true;
317 },
318 task);
319
320 if constexpr (sizeof...(Associated) == 0) {
321 // single argument to process
322 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& element) {
325 return true;
326 },
327 task);
328 if constexpr (soa::is_iterator<G>) {
329 for (auto& element : groupingTable) {
330 std::invoke(processingFunction, task, *element);
331 }
332 } else {
333 std::invoke(processingFunction, task, groupingTable);
334 }
335 } else {
336 // multiple arguments to process
337 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, matchers, processingFunction, infos);
338 // pre-bind self indices
339 std::apply(
340 [&task](auto&... t) mutable {
341 (homogeneous_apply_refs_sized<numElements>(
342 [&t](auto& p) {
344 return true;
345 },
346 task),
347 ...);
348 },
349 associatedTables);
350
351 auto binder = [&task, &groupingTable, &associatedTables](auto& x) mutable {
352 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
353 homogeneous_apply_refs_sized<numElements>([&x](auto& t) mutable {
356 return true;
357 },
358 task);
359 };
360 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
361
362 // always pre-bind full tables to support index hierarchy
363 std::apply(
364 [&binder](auto&... x) mutable {
365 (binder(x), ...);
366 },
367 associatedTables);
368
369 // GroupedCombinations bound separately, as they should be set once for all associated tables
370 homogeneous_apply_refs_sized<numElements>([&groupingTable, &associatedTables](auto& t) {
371 analysis_task_parsers::setGroupedCombination(t, groupingTable, associatedTables);
372 return true;
373 },
374 task);
375 overwriteInternalIndices(associatedTables, associatedTables);
376 if constexpr (soa::is_iterator<G>) {
377 auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOrigin);
378 for (auto& slice : slicer) {
379 auto associatedSlices = slice.associatedTables();
380 overwriteInternalIndices(associatedSlices, associatedTables);
381 std::apply(
382 [&binder](auto&... x) mutable {
383 (binder(x), ...);
384 },
385 associatedSlices);
386
387 // bind partitions and grouping table
388 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& x) {
390 return true;
391 },
392 task);
393
394 [](Task& task, void (Task::*processingFunction)(Grouping, Associated...), Grouping g, std::tuple<std::decay_t<Associated>...>& at) {
395 std::invoke(processingFunction, task, g, std::get<std::decay_t<Associated>>(at)...);
396 }(task, processingFunction, slice.groupingElement(), associatedSlices);
397 }
398 } else {
399 // bind partitions and grouping table
400 homogeneous_apply_refs_sized<numElements>([&groupingTable](auto& x) {
402 return true;
403 },
404 task);
405
406 [](Task& task, void (Task::*processingFunction)(Grouping, Associated...), Grouping g, std::tuple<std::decay_t<Associated>...>& at) {
407 std::invoke(processingFunction, task, g, std::get<std::decay_t<Associated>>(at)...);
408 }(task, processingFunction, groupingTable, associatedTables);
409 }
410 }
411 }
412};
413} // namespace
414
416 std::vector<std::pair<std::string, bool>> map;
417};
418
420struct TaskName {
421 TaskName(std::string name) : value{std::move(name)} {}
422 std::string value;
423};
424
425namespace
426{
427template <typename T, typename... A>
428auto getTaskNameSetProcesses(std::string& outputName, TaskName first, SetDefaultProcesses second, A... args)
429{
430 auto task = std::make_shared<T>(std::forward<A>(args)...);
431 for (auto& setting : second.map) {
433 [&](auto& element) {
434 return analysis_task_parsers::setProcessSwitch(setting, element);
435 },
436 *task.get());
437 }
438 outputName = first.value;
439 return task;
440}
441
442template <typename T, typename... A>
443auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, TaskName second, A... args)
444{
445 auto task = std::make_shared<T>(std::forward<A>(args)...);
446 for (auto& setting : first.map) {
448 [&](auto& element) {
449 return analysis_task_parsers::setProcessSwitch(setting, element);
450 },
451 *task.get());
452 }
453 outputName = second.value;
454 return task;
455}
456
457template <typename T, typename... A>
458auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses first, A... args)
459{
460 auto task = std::make_shared<T>(std::forward<A>(args)...);
461 for (auto& setting : first.map) {
463 [&](auto& element) {
464 return analysis_task_parsers::setProcessSwitch(setting, element);
465 },
466 *task.get());
467 }
468 auto type_name_str = type_name<T>();
469 outputName = type_to_task_name(type_name_str);
470 return task;
471}
472
473template <typename T, typename... A>
474auto getTaskNameSetProcesses(std::string& outputName, TaskName first, A... args)
475{
476 auto task = std::make_shared<T>(std::forward<A>(args)...);
477 outputName = first.value;
478 return task;
479}
480
481template <typename T, typename... A>
482auto getTaskNameSetProcesses(std::string& outputName, A... args)
483{
484 auto task = std::make_shared<T>(std::forward<A>(args)...);
485 auto type_name_str = type_name<T>();
486 outputName = type_to_task_name(type_name_str);
487 return task;
488}
489} // namespace
490
493template <typename T, typename... Args>
495{
496 TH1::AddDirectory(false);
497
498 std::string name_str;
499 auto task = getTaskNameSetProcesses<T>(name_str, args...);
500
501 auto suffix = ctx.options().get<std::string>("workflow-suffix");
502 if (!suffix.empty()) {
503 name_str += suffix;
504 }
505 const char* name = name_str.c_str();
506
507 auto hash = runtime_hash(name);
508
509 std::vector<OutputSpec> outputs;
510 std::vector<InputSpec> inputs;
511 std::vector<ConfigParamSpec> options;
512 std::vector<ExpressionInfo> expressionInfos;
513 std::vector<InputInfo> inputInfos;
514
515 std::string newOriginStr;
516 header::DataOrigin newOrigin{"AOD"};
517 if (ctx.options().hasOption("aod-origin-replace")) {
518 newOriginStr = ctx.options().get<std::string>("aod-origin-replace");
519 if (newOriginStr.size() > 4UL) {
520 wrongOriginReplacement(newOriginStr);
521 }
522 }
523 if (!newOriginStr.empty()) {
524 newOrigin.runtimeInit(newOriginStr.c_str(), std::min(newOriginStr.size(), 4UL));
525 }
526
527 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<T>>() / 10;
528
530 homogeneous_apply_refs_sized<numElements>([&options](auto& element) { return analysis_task_parsers::appendOption(options, element); }, *task.get());
532 homogeneous_apply_refs_sized<numElements>([&inputs](auto& element) { return analysis_task_parsers::appendCondition(inputs, element); }, *task.get());
533
535 if constexpr (requires { &T::process; }) {
536 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process, "default", true, inputs, expressionInfos, inputInfos, newOrigin);
537 }
538 homogeneous_apply_refs_sized<numElements>(
539 [name = name_str, &expressionInfos, &inputs, &inputInfos, &newOrigin](auto& x) mutable {
540 // this pushes (argumentIndex, processHash, schemaPtr, nullptr) into expressionInfos for arguments that are Filtered/filtered_iterators
541 return AnalysisDataProcessorBuilder::requestInputsFromArgs(x, name, inputs, expressionInfos, inputInfos, newOrigin);
542 },
543 *task.get());
544
545 // request base tables for spawnable extended tables and indices to be built
546 // this checks for duplications
547 homogeneous_apply_refs_sized<numElements>([&inputs, &newOrigin](auto& element) {
548 return analysis_task_parsers::requestInputs(inputs, element, newOrigin);
549 },
550 *task.get());
551
552 // no static way to check if the task defines any processing, we can only make sure it subscribes to at least something
553 if (inputs.empty() == true) {
554 LOG(warn) << "Task " << name_str << " has no inputs";
555 }
556
557 // update OutputSpecs in output declarations
558 homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element) { return analysis_task_parsers::updateOutputSpec(element, newOrigin); }, *task.get());
559
560 // Auto-register default ccdb: path options from subscribed timestamped-table inputs.
561 // This allows tasks to accept --ccdb:fXxx overrides without requiring an explicit
562 // ConfigurableCCDBPath<> member for every column in the subscribed table.
563 for (auto& input : inputs) {
564 for (auto& meta : input.metadata) {
565 if (meta.name.starts_with("ccdb:") && meta.name != "ccdb:") {
567 }
568 }
569 }
570
571 // append outputs
572 homogeneous_apply_refs_sized<numElements>([&outputs, &hash](auto& element) { return analysis_task_parsers::appendOutput(outputs, element, hash); }, *task.get());
573
574 // request services
575 auto requiredServices = CommonServices::defaultServices();
576 auto arrowServices = CommonServices::arrowServices();
577 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
578 homogeneous_apply_refs_sized<numElements>([&requiredServices](auto& element) { return analysis_task_parsers::addService(requiredServices, element); }, *task.get());
579
580 // replace origins in Preslice declarations
581 homogeneous_apply_refs_sized<numElements>([&newOrigin](auto& element) { return analysis_task_parsers::replaceOrigin(element, newOrigin); }, *task.get());
582
583 auto algo = AlgorithmSpec::InitCallback{[task = task, expressionInfos, inputInfos, newOrigin, newOriginStr](InitContext& ic) mutable {
584 Cache bindingsKeys;
585 Cache bindingsKeysUnsorted;
586 // add preslice declarations to slicing cache definition
587 homogeneous_apply_refs_sized<numElements>([&bindingsKeys, &bindingsKeysUnsorted](auto& element) { return analysis_task_parsers::registerCache(element, bindingsKeys, bindingsKeysUnsorted); }, *task.get());
588
589 homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareOption(ic, element); }, *task.get());
590 homogeneous_apply_refs_sized<numElements>([&ic](auto&& element) { return analysis_task_parsers::prepareService(ic, element); }, *task.get());
591
592 auto& callbacks = ic.services().get<CallbackService>();
593 auto eoscb = [task](EndOfStreamContext& eosContext) {
594 homogeneous_apply_refs_sized<numElements>([&eosContext](auto& element) {
597 return true; },
598 *task.get());
599 eosContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
600 };
601
602 callbacks.set<CallbackService::Id::EndOfStream>(eoscb);
603
605 if constexpr (requires { task->init(ic); }) {
606 task->init(ic);
607 }
608
610 homogeneous_apply_refs_sized<numElements>(
611 [&ic](auto& element) -> bool { return analysis_task_parsers::updatePlaceholders(ic, element); },
612 *task.get());
614 homogeneous_apply_refs_sized<numElements>([&expressionInfos](auto& element) {
616 },
617 *task.get());
618
620 if constexpr (requires { &T::process; }) {
621 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process, true, bindingsKeys, bindingsKeysUnsorted);
622 }
623 homogeneous_apply_refs_sized<numElements>(
624 [&bindingsKeys, &bindingsKeysUnsorted](auto& x) {
625 return AnalysisDataProcessorBuilder::requestCacheFromArgs(x, bindingsKeys, bindingsKeysUnsorted);
626 },
627 *task.get());
628
630 std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](Entry& entry) {
631 if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
632 entry.matcher = replaceOrigin(entry.matcher, newOrigin);
633 }
634 return entry;
635 });
636 std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](Entry& entry) {
637 if ((entry.matcher.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
638 entry.matcher = replaceOrigin(entry.matcher, newOrigin);
639 }
640 return entry;
641 });
642
643 ic.services().get<ArrowTableSlicingCacheDef>().setCaches(std::move(bindingsKeys));
644 ic.services().get<ArrowTableSlicingCacheDef>().setCachesUnsorted(std::move(bindingsKeysUnsorted));
645 ic.services().get<ArrowTableSlicingCacheDef>().setOrigin(newOrigin);
646
647 return [task, expressionInfos, inputInfos, newOrigin](ProcessingContext& pc) mutable {
648 // load the ccdb object from their cache
649 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::newDataframeCondition(pc.inputs(), element); }, *task.get());
650 // reset partitions once per dataframe
651 homogeneous_apply_refs_sized<numElements>([](auto& element) { return analysis_task_parsers::newDataframePartition(element); }, *task.get());
652 // reset selections for the next dataframe
653 std::ranges::for_each(expressionInfos, [](auto& info) { info.resetSelection = true; });
654 // reset pre-slice for the next dataframe
655 auto& slices = pc.services().get<ArrowTableSlicingCache>();
656 homogeneous_apply_refs_sized<numElements>([&slices](auto& element) {
658 },
659 *(task.get()));
660 // initialize local caches
661 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::initializeCache(pc, element); }, *(task.get()));
662 // prepare outputs
663 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareOutput(pc, element); }, *task.get());
664 // execute run()
665 if constexpr (requires { task->run(pc); }) {
666 task->run(pc);
667 }
668 // execute process()
669 if constexpr (requires { &T::process; }) {
670 auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>(); });
671 auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
672 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), matchers, &T::process, expressionInfos, slices, newOrigin);
673 }
674 // execute optional process()
675 homogeneous_apply_refs_sized<numElements>(
676 [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](auto& x) {
677 if constexpr (is_process_configurable<decltype(x)>) {
678 if (x.value == true) {
679 auto loc = std::ranges::find_if(inputInfos, [](auto const& info) { return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(x.process)>(); });
680 auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
681 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(), matchers, x.process, expressionInfos, slices, newOrigin);
682 return true;
683 }
684 return false;
685 }
686 return false;
687 },
688 *task.get());
689 // prepare delayed outputs
690 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::prepareDelayedOutput(pc, element); }, *task.get());
691 // finalize outputs
692 homogeneous_apply_refs_sized<numElements>([&pc](auto& element) { return analysis_task_parsers::finalizeOutput(pc, element); }, *task.get());
693 };
694 }};
695
696 return {
697 name,
698 inputs,
699 outputs,
700 algo,
701 options,
702 requiredServices};
703}
704
705} // namespace o2::framework
706#endif // FRAMEWORK_ANALYSISTASK_H_
std::vector< framework::ConcreteDataMatcher > matchers
uint32_t hash
atype::type element
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
constexpr uint32_t runtime_hash(char const *str)
StringRef key
Definition A.h:16
ConfigParamRegistry & options() const
bool hasOption(const char *key) const
Helper to check if a type T is an iterator.
Definition ASoA.h:1313
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLint GLenum GLint x
Definition glcorearb.h:403
const GLfloat * m
Definition glcorearb.h:4066
GLenum src
Definition glcorearb.h:1767
GLuint entry
Definition glcorearb.h:5735
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLenum GLsizei const GLuint GLboolean enabled
Definition glcorearb.h:2513
GLsizei const GLfloat * value
Definition glcorearb.h:819
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLboolean GLboolean g
Definition glcorearb.h:1233
bool prepareService(InitContext &, T &)
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
Cache handling.
bool replaceOrigin(T &, header::DataOrigin const &)
Preslice handling.
bool requestInputs(std::vector< InputSpec > &, T &, header::DataOrigin)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
constexpr bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool newDataframeCondition(InputRecord &, C &)
bool updateOutputSpec(T &, header::DataOrigin)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
void updatePairList(Cache &list, Entry &entry)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
std::string type_to_task_name(std::string_view const &camelCase)
Convert a CamelCase task struct name to snake-case task name.
@ Me
Only quit this data processor.
constexpr auto homogeneous_apply_refs(L l, T &&object)
ConfigParamSpec replaceOrigin(ConfigParamSpec &source, std::string const &originStr)
void wrongOriginReplacement(std::string_view replacement)
std::string cutString(std::string &&str)
Definition ASoA.cxx:276
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:58
void missingFilterDeclaration(int hash, int ai)
Definition ASoA.cxx:33
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"