12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
48template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
56static constexpr bool is_enumeration_v =
false;
58template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
59static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
67struct AnalysisDataProcessorBuilder {
68 template <
typename G,
typename... Args>
69 static void addGroupingCandidates(std::vector<StringPair>& bk, std::vector<StringPair>& bku)
71 [&bk, &bku]<
typename... As>(framework::pack<As...>)
mutable {
76 ([&bk, &bku, &
key]()
mutable {
77 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
78 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(
key);
87 }(framework::pack<Args...>{});
90 template <soa::TableRef R>
91 static void addOriginalRef(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
93 auto spec = soa::tableRef2InputSpec<R>();
99 template <soa::is_table A>
101 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
105 template <soa::is_filtered_table A>
106 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
109 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
112 template <soa::is_iterator A>
113 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
115 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
119 template <soa::is_table A>
120 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
122 [&
name, &
value, &inputs]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
123 (addOriginalRef<refs[Is]>(
name,
value, inputs), ...);
124 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
127 template <soa::is_iterator A>
128 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
130 addInput<typename std::decay_t<A>::parent_t>(
name,
value, inputs);
135 static void addInputsAndExpressions(uint32_t hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
138 ([&ai, &hash, &eInfos, &
name, &
value, &inputs]()
mutable {
140 using T = std::decay_t<As>;
141 addExpression<T>(ai, hash, eInfos);
149 template <
typename R,
typename C, is_enumeration A>
150 static void inputsFromArgs(
R (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&, std::vector<StringPair>&, std::vector<StringPair>&)
152 std::vector<ConfigParamSpec> inputMetadata;
159 static void inputsFromArgs(
R (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>& bk, std::vector<StringPair>& bku)
160 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
162 addGroupingCandidates<
A, Args...>(bk, bku);
164 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash,
name,
value, inputs, eInfos);
169 static void inputsFromArgs(
R (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>&, std::vector<StringPair>&)
170 requires(std::is_lvalue_reference_v<Args> && ...)
173 addInputsAndExpressions<Args...>(hash,
name,
value, inputs, eInfos);
176 template <soa::TableRef R>
177 static auto extractTableFromRecord(InputRecord& record)
179 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
180 if (table->num_rows() == 0) {
181 table = makeEmptyTable<R>();
186 template <soa::is_table T>
187 static auto extractFromRecord(InputRecord& record)
189 return T { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
192 template <soa::is_iterator T>
193 static auto extractFromRecord(InputRecord& record)
195 return typename T::parent_t { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
198 template <soa::is_filtered T>
199 static auto extractFilteredFromRecord(InputRecord& record,
ExpressionInfo& info)
201 std::shared_ptr<arrow::Table> table =
nullptr;
202 auto joiner = [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
204 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()));
215 return typename T::parent_t({table}, info.
selection);
221 template <is_enumeration T,
int AI>
222 static auto extract(InputRecord&, std::vector<ExpressionInfo>&,
size_t)
227 template <soa::is_iterator T,
int AI>
228 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
230 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
231 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
233 return extractFromRecord<T>(record);
237 template <soa::is_table T,
int AI>
238 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
241 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
243 return extractFromRecord<T>(record);
247 template <
typename R,
typename C,
typename Grouping,
typename... Args>
248 static auto bindGroupingTable(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
249 requires(!std::same_as<Grouping, void>)
252 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
255 template <
typename R,
typename C,
typename Grouping,
typename... Args>
256 static auto bindAssociatedTables(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
257 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
259 constexpr auto p = pack<Args...>{};
261 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
264 template <
typename... As>
265 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
267 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
270 template <
typename Task,
typename R,
typename C,
typename Grouping,
typename... Associated>
271 static void invokeProcess(Task& task, InputRecord& inputs,
R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
273 using G = std::decay_t<Grouping>;
274 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
284 if constexpr (
sizeof...(Associated) == 0) {
293 for (
auto& element : groupingTable) {
294 std::invoke(processingFunction, task, *element);
298 "Single argument of process() should be a table-like or an iterator");
299 std::invoke(processingFunction, task, groupingTable);
304 "Associated arguments of process() should not be iterators");
305 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
308 [&task](
auto&... t)
mutable {
319 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
320 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
328 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
332 [&binder](
auto&...
x)
mutable {
343 overwriteInternalIndices(associatedTables, associatedTables);
345 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
346 for (
auto& slice : slicer) {
347 auto associatedSlices = slice.associatedTables();
348 overwriteInternalIndices(associatedSlices, associatedTables);
350 [&binder](
auto&...
x)
mutable {
362 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
372 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
377 template <
typename C,
typename T,
typename G,
typename...
A>
378 static void invokeProcessWithArgs(C& task, T processingFunction, G
g, std::tuple<A...>& at)
380 std::invoke(processingFunction, task,
g, std::get<A>(at)...);
386 std::vector<std::pair<std::string, bool>>
map;
396template <
typename T,
typename...
A>
399 auto task = std::make_shared<T>(std::forward<A>(args)...);
400 for (
auto& setting : second.
map) {
403 return analysis_task_parsers::setProcessSwitch(setting, element);
407 outputName =
first.value;
411template <
typename T,
typename...
A>
414 auto task = std::make_shared<T>(std::forward<A>(args)...);
415 for (
auto& setting :
first.map) {
418 return analysis_task_parsers::setProcessSwitch(setting, element);
422 outputName = second.
value;
426template <
typename T,
typename...
A>
427auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
429 auto task = std::make_shared<T>(std::forward<A>(args)...);
430 for (
auto& setting :
first.map) {
433 return analysis_task_parsers::setProcessSwitch(setting, element);
437 auto type_name_str = type_name<T>();
438 outputName = type_to_task_name(type_name_str);
442template <
typename T,
typename...
A>
443auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
445 auto task = std::make_shared<T>(std::forward<A>(args)...);
446 outputName =
first.value;
450template <
typename T,
typename...
A>
451auto getTaskNameSetProcesses(std::string& outputName,
A... args)
453 auto task = std::make_shared<T>(std::forward<A>(args)...);
454 auto type_name_str = type_name<T>();
455 outputName = type_to_task_name(type_name_str);
463template <
typename T,
typename... Args>
466 TH1::AddDirectory(
false);
468 std::string name_str;
469 auto task = getTaskNameSetProcesses<T>(name_str, args...);
471 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
472 if (!suffix.empty()) {
475 const char*
name = name_str.c_str();
479 std::vector<OutputSpec> outputs;
480 std::vector<InputSpec> inputs;
481 std::vector<ConfigParamSpec> options;
482 std::vector<ExpressionInfo> expressionInfos;
483 std::vector<StringPair> bindingsKeys;
484 std::vector<StringPair> bindingsKeysUnsorted;
492 if constexpr (
requires { &T::process; }) {
493 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
499 AnalysisDataProcessorBuilder::inputsFromArgs(
x.process, (
name +
"/" +
x.name).c_str(),
x.value, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
518 if (inputs.empty() ==
true) {
519 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
526 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
555 if constexpr (
requires { task->init(ic); }) {
573 for (
auto& info : expressionInfos) {
574 info.resetSelection =
true;
587 if constexpr (
requires { task->run(pc); }) {
591 if constexpr (
requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
592 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
596 [&pc, &expressionInfos, &task, &slices](
auto&
x)
mutable {
598 if (
x.value ==
true) {
599 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
x.process, expressionInfos, slices);
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
T get(const char *key) const
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLsizei const GLfloat * value
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
void setPartition(P &, T &...)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
void bindInternalIndicesPartition(P &, T *)
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool registerCache(T &, std::vector< StringPair > &, std::vector< StringPair > &)
Preslice handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
void updatePairList(std::vector< StringPair > &list, std::string const &binding, std::string const &key)
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
From https://en.cppreference.com/w/cpp/utility/variant/visit.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"