12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
48template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
56static constexpr bool is_enumeration_v =
false;
58template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
59static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
67struct AnalysisDataProcessorBuilder {
69 static ConfigParamSpec getSpec()
72 return ConfigParamSpec{std::string{
"input:"} + aod::MetadataTrait<T>::metadata::tableLabel(),
VariantType::String, aod::MetadataTrait<T>::metadata::sourceSpec(), {
"\"\""}};
74 using O1 = framework::pack_element_t<0, typename T::originals>;
75 return ConfigParamSpec{std::string{
"input:"} + aod::MetadataTrait<T>::metadata::tableLabel(),
VariantType::String, aod::MetadataTrait<O1>::metadata::sourceSpec(), {
"\"\""}};
79 template <soa::TableRef R>
80 static ConfigParamSpec getSpec()
82 return soa::tableRef2ConfigParamSpec<R>();
85 template <soa::with_sources T>
86 static inline auto getSources()
88 return []<
size_t N, std::array<soa::TableRef, N> refs>() {
89 return []<
size_t... Is>(std::index_sequence<Is...>) {
90 return std::vector{soa::tableRef2ConfigParamSpec<refs[Is]>()...};
91 }(std::make_index_sequence<N>());
92 }.template operator()<T::sources.size(), T::sources>();
95 template <soa::with_sources T>
97 static auto getInputMetadata()
99 std::vector<ConfigParamSpec> inputMetadata;
100 auto inputSources = getSources<T>();
101 std::sort(inputSources.begin(), inputSources.end(), [](ConfigParamSpec
const&
a, ConfigParamSpec
const&
b) { return a.name < b.name; });
102 auto last = std::unique(inputSources.begin(), inputSources.end(), [](ConfigParamSpec
const&
a, ConfigParamSpec
const&
b) { return a.name == b.name; });
103 inputSources.erase(last, inputSources.end());
104 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
105 return inputMetadata;
108 template <
typename G,
typename... Args>
109 static void addGroupingCandidates(std::vector<StringPair>& bk, std::vector<StringPair>& bku)
111 [&bk, &bku]<
typename... As>(framework::pack<As...>)
mutable {
116 ([&bk, &bku, &
key]()
mutable {
117 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
118 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(
key);
127 }(framework::pack<Args...>{});
130 template <soa::TableRef R>
131 static void addOriginalRef(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
133 using metadata =
typename aod::MetadataTrait<
o2::aod::Hash<
R.desc_hash>>::metadata;
134 std::vector<ConfigParamSpec> inputMetadata;
137 auto inputSources = getInputMetadata<metadata>();
138 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
144 template <soa::is_table A>
146 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
150 template <soa::is_filtered_table A>
151 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
154 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
157 template <soa::is_iterator A>
158 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
160 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
164 template <soa::is_table A>
165 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
167 [&
name, &
value, &inputs]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
168 (addOriginalRef<refs[Is]>(
name,
value, inputs), ...);
169 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
172 template <soa::is_iterator A>
173 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
175 addInput<typename std::decay_t<A>::parent_t>(
name,
value, inputs);
180 static void addInputsAndExpressions(uint32_t hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
183 ([&ai, &hash, &eInfos, &
name, &
value, &inputs]()
mutable {
185 using T = std::decay_t<As>;
186 addExpression<T>(ai, hash, eInfos);
194 template <
typename R,
typename C, is_enumeration A>
195 static void inputsFromArgs(
R (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&, std::vector<StringPair>&, std::vector<StringPair>&)
197 std::vector<ConfigParamSpec> inputMetadata;
204 static void inputsFromArgs(
R (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>& bk, std::vector<StringPair>& bku)
205 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
207 addGroupingCandidates<
A, Args...>(bk, bku);
209 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash,
name,
value, inputs, eInfos);
214 static void inputsFromArgs(
R (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<StringPair>&, std::vector<StringPair>&)
215 requires(std::is_lvalue_reference_v<Args> && ...)
218 addInputsAndExpressions<Args...>(hash,
name,
value, inputs, eInfos);
221 template <soa::TableRef R>
222 static auto extractTableFromRecord(InputRecord& record)
224 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
225 if (table->num_rows() == 0) {
226 table = makeEmptyTable<R>();
231 template <soa::is_table T>
232 static auto extractFromRecord(InputRecord& record)
234 return T { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
237 template <soa::is_iterator T>
238 static auto extractFromRecord(InputRecord& record)
240 return typename T::parent_t { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
243 template <soa::is_filtered T>
244 static auto extractFilteredFromRecord(InputRecord& record,
ExpressionInfo& info)
246 std::shared_ptr<arrow::Table> table =
nullptr;
247 auto joiner = [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
249 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()));
260 return typename T::parent_t({table}, info.
selection);
266 template <is_enumeration T,
int AI>
267 static auto extract(InputRecord&, std::vector<ExpressionInfo>&,
size_t)
272 template <soa::is_iterator T,
int AI>
273 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
275 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
276 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
278 return extractFromRecord<T>(record);
282 template <soa::is_table T,
int AI>
283 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
286 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
288 return extractFromRecord<T>(record);
292 template <
typename R,
typename C,
typename Grouping,
typename... Args>
293 static auto bindGroupingTable(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
294 requires(!std::same_as<Grouping, void>)
297 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
300 template <
typename R,
typename C,
typename Grouping,
typename... Args>
301 static auto bindAssociatedTables(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
302 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
304 constexpr auto p = pack<Args...>{};
306 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
309 template <
typename... As>
310 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
312 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
315 template <
typename Task,
typename R,
typename C,
typename Grouping,
typename... Associated>
316 static void invokeProcess(Task& task, InputRecord& inputs,
R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
318 using G = std::decay_t<Grouping>;
319 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
329 if constexpr (
sizeof...(Associated) == 0) {
338 for (
auto& element : groupingTable) {
339 std::invoke(processingFunction, task, *element);
343 "Single argument of process() should be a table-like or an iterator");
344 std::invoke(processingFunction, task, groupingTable);
349 "Associated arguments of process() should not be iterators");
350 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
353 [&task](
auto&... t)
mutable {
364 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
365 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
373 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
377 [&binder](
auto&...
x)
mutable {
388 overwriteInternalIndices(associatedTables, associatedTables);
390 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
391 for (
auto& slice : slicer) {
392 auto associatedSlices = slice.associatedTables();
393 overwriteInternalIndices(associatedSlices, associatedTables);
395 [&binder](
auto&...
x)
mutable {
407 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
417 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
422 template <
typename C,
typename T,
typename G,
typename...
A>
423 static void invokeProcessWithArgs(C& task, T processingFunction, G
g, std::tuple<A...>& at)
425 std::invoke(processingFunction, task,
g, std::get<A>(at)...);
431 std::vector<std::pair<std::string, bool>>
map;
441template <
typename T,
typename...
A>
444 auto task = std::make_shared<T>(std::forward<A>(args)...);
445 for (
auto& setting : second.
map) {
448 return analysis_task_parsers::setProcessSwitch(setting, element);
452 outputName =
first.value;
456template <
typename T,
typename...
A>
459 auto task = std::make_shared<T>(std::forward<A>(args)...);
460 for (
auto& setting :
first.map) {
463 return analysis_task_parsers::setProcessSwitch(setting, element);
467 outputName = second.
value;
471template <
typename T,
typename...
A>
472auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
474 auto task = std::make_shared<T>(std::forward<A>(args)...);
475 for (
auto& setting :
first.map) {
478 return analysis_task_parsers::setProcessSwitch(setting, element);
482 auto type_name_str = type_name<T>();
483 outputName = type_to_task_name(type_name_str);
487template <
typename T,
typename...
A>
488auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
490 auto task = std::make_shared<T>(std::forward<A>(args)...);
491 outputName =
first.value;
495template <
typename T,
typename...
A>
496auto getTaskNameSetProcesses(std::string& outputName,
A... args)
498 auto task = std::make_shared<T>(std::forward<A>(args)...);
499 auto type_name_str = type_name<T>();
500 outputName = type_to_task_name(type_name_str);
508template <
typename T,
typename... Args>
511 TH1::AddDirectory(
false);
513 std::string name_str;
514 auto task = getTaskNameSetProcesses<T>(name_str, args...);
516 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
517 if (!suffix.empty()) {
520 const char*
name = name_str.c_str();
524 std::vector<OutputSpec> outputs;
525 std::vector<InputSpec> inputs;
526 std::vector<ConfigParamSpec> options;
527 std::vector<ExpressionInfo> expressionInfos;
528 std::vector<StringPair> bindingsKeys;
529 std::vector<StringPair> bindingsKeysUnsorted;
537 if constexpr (
requires { &T::process; }) {
538 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
544 AnalysisDataProcessorBuilder::inputsFromArgs(
x.process, (
name +
"/" +
x.name).c_str(),
x.value, inputs, expressionInfos, bindingsKeys, bindingsKeysUnsorted);
563 if (inputs.empty() ==
true) {
564 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
571 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
600 if constexpr (
requires { task->init(ic); }) {
618 for (
auto& info : expressionInfos) {
619 info.resetSelection =
true;
632 if constexpr (
requires { task->run(pc); }) {
636 if constexpr (
requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
637 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
641 [&pc, &expressionInfos, &task, &slices](
auto&
x)
mutable {
643 if (
x.value ==
true) {
644 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
x.process, expressionInfos, slices);
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
T get(const char *key) const
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLboolean GLboolean GLboolean b
GLsizei const GLfloat * value
GLboolean GLboolean GLboolean GLboolean a
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
void setPartition(P &, T &...)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
void bindInternalIndicesPartition(P &, T *)
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool registerCache(T &, std::vector< StringPair > &, std::vector< StringPair > &)
Preslice handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
void updatePairList(std::vector< StringPair > &list, std::string const &binding, std::string const &key)
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
From https://en.cppreference.com/w/cpp/utility/variant/visit.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"