12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
50template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
58static constexpr bool is_enumeration_v =
false;
60template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
61static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
69struct AnalysisDataProcessorBuilder {
73 [&bk, &bku,
enabled]<
typename... As>(framework::pack<As...>)
mutable {
76 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
77 auto binding = soa::getLabelFromTypeForKey<std::decay_t<As>>(
key);
86 }(framework::pack<Args...>{});
89 template <soa::TableRef R>
90 static void addOriginalRef(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
92 auto spec = soa::tableRef2InputSpec<R>();
98 template <soa::is_table A>
100 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
104 template <soa::is_filtered_table A>
105 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
108 eInfos.emplace_back(ai, hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(fields));
111 template <soa::is_iterator A>
112 static void addExpression(
int ai, uint32_t hash, std::vector<ExpressionInfo>& eInfos)
114 addExpression<typename std::decay_t<A>::parent_t>(ai, hash, eInfos);
118 template <soa::is_table A>
119 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
121 [&
name, &
value, &inputs]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
122 (addOriginalRef<refs[Is]>(
name,
value, inputs), ...);
123 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
126 template <soa::is_iterator A>
127 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
129 addInput<typename std::decay_t<A>::parent_t>(
name,
value, inputs);
134 static void addInputsAndExpressions(uint32_t hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
137 ([&ai, &hash, &eInfos, &
name, &
value, &inputs]()
mutable {
139 using T = std::decay_t<As>;
140 addExpression<T>(ai, hash, eInfos);
147 template <
typename T>
148 inline static bool requestInputsFromArgs(T&, std::string
const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&)
152 template <is_process_configurable T>
153 inline static bool requestInputsFromArgs(T& pc, std::string
const&
name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis)
155 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (
name +
"/" + pc.name).c_str(), pc.value, inputs, eis);
158 template <
typename T>
159 inline static bool requestCacheFromArgs(T&,
Cache&,
Cache&)
163 template <is_process_configurable T>
164 inline static bool requestCacheFromArgs(T& pc,
Cache& bk,
Cache& bku)
166 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
170 template <
typename R,
typename C, is_enumeration A>
171 static void inputsFromArgs(
R (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&)
173 std::vector<ConfigParamSpec> inputMetadata;
180 static void inputsFromArgs(
R (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
181 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
184 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(hash,
name,
value, inputs, eInfos);
189 static void inputsFromArgs(
R (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
190 requires(std::is_lvalue_reference_v<Args> && ...)
193 addInputsAndExpressions<Args...>(hash,
name,
value, inputs, eInfos);
197 template <
typename R,
typename C, is_enumeration A>
198 static void cacheFromArgs(
R (C::*)(
A), bool,
Cache&,
Cache&)
203 static void cacheFromArgs(
R (C::*)(
A, Args...),
bool value,
Cache& bk,
Cache& bku)
205 addGroupingCandidates<
A, Args...>(bk, bku,
value);
209 static void cacheFromArgs(
R (C::*)(
A, Args...), bool,
Cache&,
Cache&)
213 template <soa::TableRef R>
214 static auto extractTableFromRecord(InputRecord& record)
216 auto table = record.get<TableConsumer>(o2::aod::label<R>())->asArrowTable();
217 if (table->num_rows() == 0) {
218 table = makeEmptyTable<R>();
223 template <soa::is_table T>
224 static auto extractFromRecord(InputRecord& record)
226 return T { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
229 template <soa::is_iterator T>
230 static auto extractFromRecord(InputRecord& record)
232 return typename T::parent_t { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
235 template <soa::is_filtered T>
236 static auto extractFilteredFromRecord(InputRecord& record,
ExpressionInfo& info)
238 std::shared_ptr<arrow::Table> table =
nullptr;
239 auto joiner = [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
241 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
243 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
252 return typename T::parent_t({table}, info.
selection);
258 template <is_enumeration T,
int AI>
259 static auto extract(InputRecord&, std::vector<ExpressionInfo>&,
size_t)
264 template <soa::is_iterator T,
int AI>
265 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
267 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
268 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
270 return extractFromRecord<T>(record);
274 template <soa::is_table T,
int AI>
275 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
278 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
280 return extractFromRecord<T>(record);
284 template <
typename R,
typename C,
typename Grouping,
typename... Args>
285 static auto bindGroupingTable(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
286 requires(!std::same_as<Grouping, void>)
289 return extract<std::decay_t<Grouping>, 0>(record, infos, hash);
292 template <
typename R,
typename C,
typename Grouping,
typename... Args>
293 static auto bindAssociatedTables(InputRecord& record,
R (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
294 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
296 constexpr auto p = pack<Args...>{};
298 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos, hash)...);
301 template <
typename... As>
302 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
304 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
307 template <
typename Task,
typename R,
typename C,
typename Grouping,
typename... Associated>
308 static void invokeProcess(Task& task, InputRecord& inputs,
R (C::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
310 using G = std::decay_t<Grouping>;
311 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
321 if constexpr (
sizeof...(Associated) == 0) {
330 for (
auto& element : groupingTable) {
331 std::invoke(processingFunction, task, *element);
335 "Single argument of process() should be a table-like or an iterator");
336 std::invoke(processingFunction, task, groupingTable);
341 "Associated arguments of process() should not be iterators");
342 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
345 [&task](
auto&... t)
mutable {
356 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
357 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
365 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
369 [&binder](
auto&...
x)
mutable {
380 overwriteInternalIndices(associatedTables, associatedTables);
382 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
383 for (
auto& slice : slicer) {
384 auto associatedSlices = slice.associatedTables();
385 overwriteInternalIndices(associatedSlices, associatedTables);
387 [&binder](
auto&...
x)
mutable {
399 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
409 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
414 template <
typename C,
typename T,
typename G,
typename...
A>
415 static void invokeProcessWithArgs(C& task, T processingFunction, G
g, std::tuple<A...>& at)
417 std::invoke(processingFunction, task,
g, std::get<A>(at)...);
423 std::vector<std::pair<std::string, bool>>
map;
433template <
typename T,
typename...
A>
436 auto task = std::make_shared<T>(std::forward<A>(args)...);
437 for (
auto& setting : second.
map) {
440 return analysis_task_parsers::setProcessSwitch(setting, element);
444 outputName =
first.value;
448template <
typename T,
typename...
A>
451 auto task = std::make_shared<T>(std::forward<A>(args)...);
452 for (
auto& setting :
first.map) {
455 return analysis_task_parsers::setProcessSwitch(setting, element);
459 outputName = second.
value;
463template <
typename T,
typename...
A>
464auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
466 auto task = std::make_shared<T>(std::forward<A>(args)...);
467 for (
auto& setting :
first.map) {
470 return analysis_task_parsers::setProcessSwitch(setting, element);
474 auto type_name_str = type_name<T>();
475 outputName = type_to_task_name(type_name_str);
479template <
typename T,
typename...
A>
480auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
482 auto task = std::make_shared<T>(std::forward<A>(args)...);
483 outputName =
first.value;
487template <
typename T,
typename...
A>
488auto getTaskNameSetProcesses(std::string& outputName,
A... args)
490 auto task = std::make_shared<T>(std::forward<A>(args)...);
491 auto type_name_str = type_name<T>();
492 outputName = type_to_task_name(type_name_str);
500template <
typename T,
typename... Args>
503 TH1::AddDirectory(
false);
505 std::string name_str;
506 auto task = getTaskNameSetProcesses<T>(name_str, args...);
508 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
509 if (!suffix.empty()) {
512 const char*
name = name_str.c_str();
516 std::vector<OutputSpec> outputs;
517 std::vector<InputSpec> inputs;
518 std::vector<ConfigParamSpec> options;
519 std::vector<ExpressionInfo> expressionInfos;
527 if constexpr (
requires { &T::process; }) {
528 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos);
531 [
name = name_str, &expressionInfos, &inputs](
auto&
x)
mutable {
533 return AnalysisDataProcessorBuilder::requestInputsFromArgs(
x,
name, inputs, expressionInfos);
545 if (inputs.empty() ==
true) {
546 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
553 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
558 Cache bindingsKeysUnsorted;
587 if constexpr (
requires { task->init(ic); }) {
592 if constexpr (
requires { &T::process; }) {
593 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process,
true, bindingsKeys, bindingsKeysUnsorted);
596 [&bindingsKeys, &bindingsKeysUnsorted](
auto&
x)
mutable {
597 return AnalysisDataProcessorBuilder::requestCacheFromArgs(
x, bindingsKeys, bindingsKeysUnsorted);
615 for (
auto& info : expressionInfos) {
616 info.resetSelection =
true;
629 if constexpr (
requires { task->run(pc); }) {
633 if constexpr (
requires { AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices); }) {
634 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
638 [&pc, &expressionInfos, &task, &slices](
auto&
x)
mutable {
640 if (
x.value ==
true) {
641 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
x.process, expressionInfos, slices);
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
T get(const char *key) const
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLsizei const GLfloat * value
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
void setPartition(P &, T &...)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
void bindInternalIndicesPartition(P &, T *)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
auto homogeneous_apply_refs(L l, T &&object)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
void updatePairList(Cache &list, std::string const &binding, std::string const &key, bool enabled)
@ Me
Only quit this data processor.
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"