12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
50template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
58static constexpr bool is_enumeration_v =
false;
60template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
61static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
70struct AnalysisDataProcessorBuilder {
74 [&bk, &bku,
enabled]<
typename... As>(framework::pack<As...>)
mutable {
77 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
78 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(
key);
87 }(framework::pack<Args...>{});
90 template <soa::TableRef R>
91 static void addOriginalRef(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
93 auto spec = soa::tableRef2InputSpec<R>();
99 template <soa::is_table A>
101 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
105 template <soa::is_filtered_table A>
106 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
109 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
112 template <soa::is_iterator A>
113 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
115 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
119 template <soa::is_table A>
120 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
122 [&
name, &
value, &inputs]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
123 (addOriginalRef<refs[Is]>(
name,
value, inputs), ...);
124 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
127 template <soa::is_iterator A>
128 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
130 addInput<typename std::decay_t<A>::parent_t>(
name,
value, inputs);
135 static void addInputsAndExpressions(uint32_t hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
138 ([&ai, &hash, &eInfos, &
name, &
value, &inputs]()
mutable {
140 using T = std::decay_t<As>;
141 addExpression<T>(ai, hash, eInfos);
148 template <
typename T>
149 inline static bool requestInputsFromArgs(T&, std::string
const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&)
153 template <is_process_configurable T>
154 inline static bool requestInputsFromArgs(T& pc, std::string
const&
name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis)
156 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (
name +
"/" + pc.name).c_str(), pc.value, inputs, eis);
159 template <
typename T>
160 inline static bool requestCacheFromArgs(T&,
Cache&,
Cache&)
164 template <is_process_configurable T>
165 inline static bool requestCacheFromArgs(T& pc,
Cache& bk,
Cache& bku)
167 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
171 template <
typename R,
typename C, is_enumeration A>
172 static void inputsFromArgs(
R (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&)
174 std::vector<ConfigParamSpec> inputMetadata;
181 static void inputsFromArgs(
R (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
182 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
185 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash,
name,
value, inputs, eInfos);
190 static void inputsFromArgs(
R (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
191 requires(std::is_lvalue_reference_v<Args> && ...)
194 addInputsAndExpressions<Args...>(hash,
name,
value, inputs, eInfos);
198 template <
typename R,
typename C, is_enumeration A>
199 static void cacheFromArgs(
R (C::*)(
A), bool,
Cache&,
Cache&)
204 static void cacheFromArgs(
R (C::*)(
A, Args...),
bool value,
Cache& bk,
Cache& bku)
206 addGroupingCandidates<
A, Args...>(bk, bku,
value);
210 static void cacheFromArgs(
R (C::*)(
A, Args...), bool,
Cache&,
Cache&)
214 template <soa::TableRef R>
215 static auto extractTableFromRecord(InputRecord& record)
217 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
218 if (table->num_rows() == 0) {
219 table = makeEmptyTable<R>();
224 template <soa::is_table T>
225 static auto extractFromRecord(InputRecord& record)
227 return T { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
230 template <soa::is_iterator T>
231 static auto extractFromRecord(InputRecord& record)
233 return typename T::parent_t { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
236 template <soa::is_filtered T>
237 static auto extractFilteredFromRecord(InputRecord& record,
ExpressionInfo& info)
239 std::shared_ptr<arrow::Table> table =
nullptr;
240 auto joiner = [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
242 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
244 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
253 return typename T::parent_t({table}, info.
selection);
259 template <is_enumeration T,
int AI>
260 static auto extract(InputRecord&, std::vector<ExpressionInfo>&,
size_t)
265 template <soa::is_iterator T,
int AI>
266 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
268 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
269 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
271 return extractFromRecord<T>(record);
275 template <soa::is_table T,
int AI>
276 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
279 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
281 return extractFromRecord<T>(record);
285 template <
typename R,
typename C,
typename Grouping,
typename... Args>
286 static auto bindGroupingTable(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
287 requires(!std::same_as<Grouping, void>)
290 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
293 template <
typename R,
typename C,
typename Grouping,
typename... Args>
294 static auto bindAssociatedTables(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
295 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
297 constexpr auto p = pack<Args...>{};
299 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
302 template <
typename... As>
303 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
305 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
308 template <
typename Task,
typename R,
typename C,
typename Grouping,
typename... Associated>
309 static void invokeProcess(Task& task, InputRecord& inputs,
R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
311 using G = std::decay_t<Grouping>;
312 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
322 if constexpr (
sizeof...(Associated) == 0) {
331 for (
auto& element : groupingTable) {
332 std::invoke(processingFunction, task, *element);
336 "Single argument of process() should be a table-like or an iterator");
337 std::invoke(processingFunction, task, groupingTable);
342 "Associated arguments of process() should not be iterators");
343 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
346 [&task](
auto&... t)
mutable {
357 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
358 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
366 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
370 [&binder](
auto&...
x)
mutable {
381 overwriteInternalIndices(associatedTables, associatedTables);
383 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
384 for (
auto& slice : slicer) {
385 auto associatedSlices = slice.associatedTables();
386 overwriteInternalIndices(associatedSlices, associatedTables);
388 [&binder](
auto&...
x)
mutable {
400 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
410 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
415 template <
typename C,
typename T,
typename G,
typename...
A>
416 static void invokeProcessWithArgs(C& task, T processingFunction, G
g, std::tuple<A...>& at)
418 std::invoke(processingFunction, task,
g, std::get<A>(at)...);
424 std::vector<std::pair<std::string, bool>>
map;
435template <
typename T,
typename...
A>
438 auto task = std::make_shared<T>(std::forward<A>(args)...);
439 for (
auto& setting : second.
map) {
442 return analysis_task_parsers::setProcessSwitch(setting, element);
446 outputName =
first.value;
450template <
typename T,
typename...
A>
453 auto task = std::make_shared<T>(std::forward<A>(args)...);
454 for (
auto& setting :
first.map) {
457 return analysis_task_parsers::setProcessSwitch(setting, element);
461 outputName = second.
value;
465template <
typename T,
typename...
A>
466auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
468 auto task = std::make_shared<T>(std::forward<A>(args)...);
469 for (
auto& setting :
first.map) {
472 return analysis_task_parsers::setProcessSwitch(setting, element);
476 auto type_name_str = type_name<T>();
477 outputName = type_to_task_name(type_name_str);
481template <
typename T,
typename...
A>
482auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
484 auto task = std::make_shared<T>(std::forward<A>(args)...);
485 outputName =
first.value;
489template <
typename T,
typename...
A>
490auto getTaskNameSetProcesses(std::string& outputName,
A... args)
492 auto task = std::make_shared<T>(std::forward<A>(args)...);
493 auto type_name_str = type_name<T>();
494 outputName = type_to_task_name(type_name_str);
502template <
typename T,
typename... Args>
505 TH1::AddDirectory(
false);
507 std::string name_str;
508 auto task = getTaskNameSetProcesses<T>(name_str, args...);
510 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
511 if (!suffix.empty()) {
514 const char*
name = name_str.c_str();
518 std::vector<OutputSpec> outputs;
519 std::vector<InputSpec> inputs;
520 std::vector<ConfigParamSpec> options;
521 std::vector<ExpressionInfo> expressionInfos;
529 if constexpr (
requires { &T::process; }) {
530 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos);
533 [
name = name_str, &expressionInfos, &inputs](
auto&
x)
mutable {
535 return AnalysisDataProcessorBuilder::requestInputsFromArgs(
x,
name, inputs, expressionInfos);
547 if (inputs.empty() ==
true) {
548 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
555 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
560 Cache bindingsKeysUnsorted;
580 if constexpr (
requires { task->init(ic); }) {
595 if constexpr (
requires { &T::process; }) {
596 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process,
true, bindingsKeys, bindingsKeysUnsorted);
599 [&bindingsKeys, &bindingsKeysUnsorted](
auto&
x)
mutable {
600 return AnalysisDataProcessorBuilder::requestCacheFromArgs(
x, bindingsKeys, bindingsKeysUnsorted);
618 for (
auto& info : expressionInfos) {
619 info.resetSelection =
true;
632 if constexpr (
requires { task->run(pc); }) {
636 if constexpr (
requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
637 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
641 [&pc, &expressionInfos, &task, &slices](
auto&
x)
mutable {
643 if (
x.value ==
true) {
644 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
x.process, expressionInfos, slices);
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
T get(const char *key) const
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLsizei const GLfloat * value
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
void setPartition(P &, T &...)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
void bindInternalIndicesPartition(P &, T *)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"