12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
29#include <arrow/compute/kernel.h>
30#include <arrow/table.h>
31#include <gandiva/node.h>
52template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
60static constexpr bool is_enumeration_v =
false;
62template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
63static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
75struct AnalysisDataProcessorBuilder {
82 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
83 Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(
key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(
key),
key,
enabled};
92 }(framework::pack<Args...>{}, bk, bku,
enabled);
95 template <soa::TableRef R>
96 static void addOriginalRef(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
98 auto spec = soa::tableRef2InputSpec<R>();
104 template <soa::is_table A>
106 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
110 template <soa::is_filtered_table A>
111 static void addExpression(
int ai, uint32_t
hash, std::vector<ExpressionInfo>& eInfos)
114 eInfos.emplace_back(ai,
hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(
fields));
117 template <soa::is_iterator A>
118 static void addExpression(
int ai, uint32_t
hash, std::vector<ExpressionInfo>& eInfos)
120 addExpression<typename std::decay_t<A>::parent_t>(ai,
hash, eInfos);
124 template <soa::is_table A>
125 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
127 [&
name, &
value, &inputs]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
128 (addOriginalRef<refs[Is]>(
name,
value, inputs), ...);
129 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
132 template <soa::is_iterator A>
133 static void addInput(
const char*
name,
bool value, std::vector<InputSpec>& inputs)
135 addInput<typename std::decay_t<A>::parent_t>(
name,
value, inputs);
140 static void addInputsAndExpressions(uint32_t
hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
145 using T = std::decay_t<As>;
146 addExpression<T>(ai,
hash, eInfos);
153 template <
typename T>
154 inline static bool requestInputsFromArgs(T&, std::string
const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&)
158 template <is_process_configurable T>
159 inline static bool requestInputsFromArgs(T& pc, std::string
const&
name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis)
161 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (
name +
"/" + pc.name).c_str(), pc.value, inputs, eis);
164 template <
typename T>
165 inline static bool requestCacheFromArgs(T&,
Cache&,
Cache&)
169 template <is_process_configurable T>
170 inline static bool requestCacheFromArgs(T& pc,
Cache& bk,
Cache& bku)
172 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
176 template <
typename C, is_enumeration A>
177 static void inputsFromArgs(
void (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&)
179 std::vector<ConfigParamSpec> inputMetadata;
186 static void inputsFromArgs(
void (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
187 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
190 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(
hash,
name,
value, inputs, eInfos);
195 static void inputsFromArgs(
void (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos)
196 requires(std::is_lvalue_reference_v<Args> && ...)
199 addInputsAndExpressions<Args...>(
hash,
name,
value, inputs, eInfos);
203 template <
typename C, is_enumeration A>
204 static void cacheFromArgs(
void (C::*)(
A), bool,
Cache&,
Cache&)
209 static void cacheFromArgs(
void (C::*)(
A, Args...),
bool value,
Cache& bk,
Cache& bku)
211 addGroupingCandidates<
A, Args...>(bk, bku,
value);
215 static void cacheFromArgs(
void (C::*)(
A, Args...), bool,
Cache&,
Cache&)
219 template <soa::TableRef R>
220 static auto extractTableFromRecord(InputRecord& record)
222 auto table = record.get<TableConsumer>(o2::aod::matcher<R>())->asArrowTable();
223 if (table->num_rows() == 0) {
224 table = makeEmptyTable<R>();
229 template <soa::is_table T>
230 static auto extractFromRecord(InputRecord& record)
232 return T { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()) };
235 template <soa::is_iterator T>
236 static auto extractFromRecord(InputRecord& record)
238 return typename T::parent_t { [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; }.template operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()) };
241 template <soa::is_filtered T>
242 static auto extractFilteredFromRecord(InputRecord& record,
ExpressionInfo& info)
244 std::shared_ptr<arrow::Table> table =
nullptr;
245 auto joiner = [&record]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>) {
return std::vector{extractTableFromRecord<refs[Is]>(record)...}; };
247 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::parent_t::originals.size(), T::parent_t::originals>(std::make_index_sequence<T::parent_t::originals.size()>()), std::span{T::parent_t::originalLabels});
249 table =
o2::soa::ArrowHelpers::joinTables(joiner.template
operator()<T::originals.size(), T::originals>(std::make_index_sequence<T::originals.size()>()), std::span{T::originalLabels});
258 return typename T::parent_t({table}, info.
selection);
264 template <is_enumeration T,
int AI>
265 static auto extract(InputRecord&, std::vector<ExpressionInfo>&,
size_t)
270 template <soa::is_iterator T,
int AI>
271 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
273 if constexpr (std::same_as<typename T::policy_t, soa::FilteredIndexPolicy>) {
274 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
276 return extractFromRecord<T>(record);
280 template <soa::is_table T,
int AI>
281 static auto extract(InputRecord& record, std::vector<ExpressionInfo>& infos,
size_t phash)
284 return extractFilteredFromRecord<T>(record, *std::find_if(infos.begin(), infos.end(), [&phash](
ExpressionInfo const&
i) { return (i.processHash == phash && i.argumentIndex == AI); }));
286 return extractFromRecord<T>(record);
291 static auto bindGroupingTable(InputRecord& record,
void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
292 requires(!std::same_as<Grouping, void>)
295 return extract<std::decay_t<Grouping>, 0>(record, infos,
hash);
299 static auto bindAssociatedTables(InputRecord& record,
void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
300 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
302 constexpr auto p = pack<Args...>{};
304 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(p) + 1>(record, infos,
hash)...);
308 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
310 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
314 static void invokeProcess(Task& task, InputRecord& inputs,
void (Task::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices)
316 using G = std::decay_t<Grouping>;
317 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs, processingFunction, infos);
319 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<Task>>() / 10;
322 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
element) {
329 if constexpr (
sizeof...(Associated) == 0) {
331 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
element) {
338 for (
auto&
element : groupingTable) {
339 std::invoke(processingFunction, task, *
element);
343 "Single argument of process() should be a table-like or an iterator");
344 std::invoke(processingFunction, task, groupingTable);
349 "Associated arguments of process() should not be iterators");
350 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs, processingFunction, infos);
353 [&task](
auto&... t)
mutable {
354 (homogeneous_apply_refs_sized<numElements>(
364 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
365 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
366 homogeneous_apply_refs_sized<numElements>([&
x](
auto& t)
mutable {
373 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
377 [&binder](
auto&...
x)
mutable {
383 homogeneous_apply_refs_sized<numElements>([&groupingTable, &associatedTables](
auto& t) {
388 overwriteInternalIndices(associatedTables, associatedTables);
390 auto slicer = GroupSlicer(groupingTable, associatedTables, slices);
391 for (
auto& slice : slicer) {
392 auto associatedSlices = slice.associatedTables();
393 overwriteInternalIndices(associatedSlices, associatedTables);
395 [&binder](
auto&...
x)
mutable {
401 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
x) {
407 invokeProcessWithArgs(task, processingFunction, slice.groupingElement(), associatedSlices);
411 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
x) {
417 invokeProcessWithArgs(task, processingFunction, groupingTable, associatedTables);
423 static void invokeProcessWithArgs(C& task, T processingFunction, G
g, std::tuple<A...>& at)
425 std::invoke(processingFunction, task,
g, std::get<A>(at)...);
431 std::vector<std::pair<std::string, bool>>
map;
442template <
typename T,
typename...
A>
445 auto task = std::make_shared<T>(std::forward<A>(args)...);
446 for (
auto& setting : second.
map) {
449 return analysis_task_parsers::setProcessSwitch(setting,
element);
453 outputName =
first.value;
457template <
typename T,
typename...
A>
460 auto task = std::make_shared<T>(std::forward<A>(args)...);
461 for (
auto& setting :
first.map) {
464 return analysis_task_parsers::setProcessSwitch(setting,
element);
468 outputName = second.
value;
472template <
typename T,
typename...
A>
473auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
475 auto task = std::make_shared<T>(std::forward<A>(args)...);
476 for (
auto& setting :
first.map) {
479 return analysis_task_parsers::setProcessSwitch(setting,
element);
483 auto type_name_str = type_name<T>();
488template <
typename T,
typename...
A>
489auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
491 auto task = std::make_shared<T>(std::forward<A>(args)...);
492 outputName =
first.value;
496template <
typename T,
typename...
A>
497auto getTaskNameSetProcesses(std::string& outputName,
A... args)
499 auto task = std::make_shared<T>(std::forward<A>(args)...);
500 auto type_name_str = type_name<T>();
509template <
typename T,
typename... Args>
512 TH1::AddDirectory(
false);
514 std::string name_str;
515 auto task = getTaskNameSetProcesses<T>(name_str, args...);
517 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
518 if (!suffix.empty()) {
521 const char*
name = name_str.c_str();
525 std::vector<OutputSpec> outputs;
526 std::vector<InputSpec> inputs;
527 std::vector<ConfigParamSpec> options;
528 std::vector<ExpressionInfo> expressionInfos;
530 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<T>>() / 10;
538 if constexpr (
requires { &T::process; }) {
539 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos);
541 homogeneous_apply_refs_sized<numElements>(
542 [
name = name_str, &expressionInfos, &inputs](
auto&
x)
mutable {
544 return AnalysisDataProcessorBuilder::requestInputsFromArgs(
x,
name, inputs, expressionInfos);
550 homogeneous_apply_refs_sized<numElements>([&inputs](
auto&
element) {
556 if (inputs.empty() ==
true) {
557 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
564 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
569 Cache bindingsKeysUnsorted;
578 homogeneous_apply_refs_sized<numElements>([&eosContext](
auto&
element) {
589 if constexpr (
requires { task->init(ic); }) {
594 homogeneous_apply_refs_sized<numElements>(
598 homogeneous_apply_refs_sized<numElements>([&expressionInfos](
auto&
element) {
604 if constexpr (
requires { &T::process; }) {
605 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process,
true, bindingsKeys, bindingsKeysUnsorted);
607 homogeneous_apply_refs_sized<numElements>(
608 [&bindingsKeys, &bindingsKeysUnsorted](
auto&
x) {
609 return AnalysisDataProcessorBuilder::requestCacheFromArgs(
x, bindingsKeys, bindingsKeysUnsorted);
622 std::ranges::for_each(expressionInfos, [](
auto& info) { info.resetSelection =
true; });
625 homogeneous_apply_refs_sized<numElements>([&slices](
auto&
element) {
634 if constexpr (
requires { task->run(pc); }) {
638 if constexpr (
requires { &T::process; }) {
639 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(), &T::process, expressionInfos, slices);
642 homogeneous_apply_refs_sized<numElements>(
643 [&pc, &expressionInfos, &task, &slices](
auto&
x) {
645 if (
x.value ==
true) {
646 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
x.process, expressionInfos, slices);
std::vector< std::shared_ptr< arrow::Field > > fields
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
T get(const char *key) const
Helper to check if a type T is an iterator.
GLuint const GLchar * name
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLsizei const GLfloat * value
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
Cache handling.
void setPartition(P &, T &...)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
void bindInternalIndicesPartition(P &, T *)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining PrimaryVertex explicitly as messageable.
void updatePairList(Cache &list, Entry &entry)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
std::string type_to_task_name(std::string_view const &camelCase)
Convert a CamelCase task struct name to snake-case task name.
@ Me
Only quit this data processor.
constexpr auto homogeneous_apply_refs(L l, T &&object)
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
Defining DataPointCompositeObject explicitly as copiable.
gandiva::Selection selection
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"