12#ifndef FRAMEWORK_ANALYSIS_TASK_H_
13#define FRAMEWORK_ANALYSIS_TASK_H_
30#include <arrow/compute/kernel.h>
31#include <arrow/table.h>
32#include <gandiva/node.h>
53template <
int64_t BEGIN,
int64_t END,
int64_t STEP = 1>
61static constexpr bool is_enumeration_v =
false;
63template <
int64_t BEGIN,
int64_t END,
int64_t STEP>
64static constexpr bool is_enumeration_v<Enumeration<BEGIN, END, STEP>> =
true;
76struct AnalysisDataProcessorBuilder {
83 if constexpr (soa::relatedByIndex<std::decay_t<G>, std::decay_t<As>>()) {
84 Entry e{soa::getLabelFromTypeForKey<std::decay_t<As>>(
key), soa::getMatcherFromTypeForKey<std::decay_t<As>>(
key),
key,
enabled};
93 }(framework::pack<Args...>{}, bk, bku,
enabled);
96 template <soa::TableRef R>
99 auto spec = soa::tableRef2InputSpec<R>(newOrigin);
100 if (
R.origin_hash !=
"AOD"_h) {
101 spec.metadata.emplace_back(ConfigParamSpec{
"aod-origin-replaced",
VariantType::Bool,
true, {
"\"\""}});
106 auto locate = std::ranges::find_if(iInfos, [&
hash](
auto const& info) {
return info.hash ==
hash; });
107 if (locate == iInfos.end()) {
108 iInfos.emplace_back(
hash, std::vector{std::pair{ai, matcher}});
110 if (std::ranges::none_of(locate->matchers, [&ai, &matcher](
auto const&
match) { return (match.first == ai) && (match.second == matcher); })) {
111 locate->matchers.emplace_back(std::pair{ai, matcher});
117 template <soa::is_table A>
119 static void addExpression(
int, uint32_t, std::vector<ExpressionInfo>&)
123 template <soa::is_filtered_table A>
124 static void addExpression(
int ai, uint32_t
hash, std::vector<ExpressionInfo>& eInfos)
127 eInfos.emplace_back(ai,
hash, std::decay_t<A>::hashes(), std::make_shared<arrow::Schema>(
fields));
130 template <soa::is_iterator A>
131 static void addExpression(
int ai, uint32_t
hash, std::vector<ExpressionInfo>& eInfos)
133 addExpression<typename std::decay_t<A>::parent_t>(ai,
hash, eInfos);
137 template <soa::is_table A>
140 [&
name, &
value, &inputs, &iInfos, &ai, &
hash, newOrigin = std::move(newOrigin)]<
size_t N, std::array<soa::TableRef, N> refs,
size_t... Is>(std::index_sequence<Is...>)
mutable {
141 (addOriginalRef<refs[Is]>(
name,
value, inputs, iInfos, ai,
hash, newOrigin), ...);
142 }.template operator()<A::originals.size(), std::decay_t<A>::originals>(std::make_index_sequence<std::decay_t<A>::originals.size()>());
147 static void addInputsAndExpressions(uint32_t
hash,
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos,
header::DataOrigin&& newOrigin =
header::DataOrigin{
"AOD"})
150 ([&ai, &
hash, &eInfos, &
name, &
value, &inputs, &iInfos, newOrigin]()
mutable {
152 using T = std::decay_t<As>;
153 addExpression<T>(ai,
hash, eInfos);
154 addInput<T>(
name,
value, inputs, iInfos, ai,
hash, std::move(newOrigin));
160 template <
typename T>
161 inline static bool requestInputsFromArgs(T&, std::string
const&, std::vector<InputSpec>&, std::vector<ExpressionInfo>&, std::vector<InputInfo>&,
header::DataOrigin)
165 template <is_process_configurable T>
166 inline static bool requestInputsFromArgs(T& pc, std::string
const&
name, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eis, std::vector<InputInfo>& iifs,
header::DataOrigin newOrigin =
header::DataOrigin{
"AOD"})
168 AnalysisDataProcessorBuilder::inputsFromArgs(pc.process, (
name +
"/" + pc.name).c_str(), pc.value, inputs, eis, iifs, newOrigin);
171 template <
typename T>
172 inline static bool requestCacheFromArgs(T&,
Cache&,
Cache&)
176 template <is_process_configurable T>
177 inline static bool requestCacheFromArgs(T& pc,
Cache& bk,
Cache& bku)
179 AnalysisDataProcessorBuilder::cacheFromArgs(pc.process, pc.value, bk, bku);
183 template <
typename C, is_enumeration A>
184 static void inputsFromArgs(
void (C::*)(
A),
const char* ,
bool , std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>&, std::vector<InputInfo>&,
header::DataOrigin)
186 std::vector<ConfigParamSpec> inputMetadata;
193 static void inputsFromArgs(
void (C::*)(
A, Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos,
header::DataOrigin newOrigin =
header::DataOrigin{
"AOD"})
194 requires(std::is_lvalue_reference_v<A> && (std::is_lvalue_reference_v<Args> && ...))
197 addInputsAndExpressions<typename std::decay_t<A>::parent_t, Args...>(
hash,
name,
value, inputs, eInfos, iInfos, std::move(newOrigin));
202 static void inputsFromArgs(
void (C::*)(Args...),
const char*
name,
bool value, std::vector<InputSpec>& inputs, std::vector<ExpressionInfo>& eInfos, std::vector<InputInfo>& iInfos,
header::DataOrigin newOrigin =
header::DataOrigin{
"AOD"})
203 requires(std::is_lvalue_reference_v<Args> && ...)
206 addInputsAndExpressions<Args...>(
hash,
name,
value, inputs, eInfos, iInfos, std::move(newOrigin));
210 template <
typename C, is_enumeration A>
211 static void cacheFromArgs(
void (C::*)(
A), bool,
Cache&,
Cache&)
216 static void cacheFromArgs(
void (C::*)(
A, Args...),
bool value,
Cache& bk,
Cache& bku)
218 addGroupingCandidates<
A, Args...>(bk, bku,
value);
222 static void cacheFromArgs(
void (C::*)(
A, Args...), bool,
Cache&,
Cache&)
226 template <std::ranges::input_range R>
227 static auto extractTablesFromRecord(InputRecord& record,
R matchers)
229 std::vector<std::shared_ptr<arrow::Table>> tables;
230 std::ranges::transform(
matchers, std::back_inserter(tables), [&record](
auto const&
m) {
231 return record.get<TableConsumer>(
m.second)->asArrowTable();
236 template <soa::is_table T, std::ranges::input_range R>
237 static auto extractFromRecord(InputRecord& record,
R matchers)
239 return T{extractTablesFromRecord(record,
matchers)};
242 template <soa::is_iterator T, std::ranges::input_range R>
243 static auto extractFromRecord(InputRecord& record,
R matchers)
245 return typename T::parent_t{extractTablesFromRecord(record,
matchers)};
248 template <soa::is_filtered T, std::ranges::input_range R>
254 if (info.selection ==
nullptr) {
259 return typename T::parent_t({table}, info.selection);
261 return T({table}, info.selection);
265 template <is_enumeration T,
int AI, std::ranges::input_range R>
266 static auto extract(InputRecord&,
R, std::vector<ExpressionInfo>&,
size_t)
271 template <soa::is_table_or_iterator T,
int AI, std::ranges::input_range R>
272 static auto extract(InputRecord& record,
R matchers, std::vector<ExpressionInfo>& infos,
size_t phash)
276 return extractFilteredFromRecord<T>(record,
matchers, *std::ranges::find_if(infos, [&phash](
ExpressionInfo const&
i) {
return (
i.processHash == phash &&
i.argumentIndex == AI); }));
278 return extractFromRecord<T>(record,
matchers);
283 static auto bindGroupingTable(InputRecord& record,
R matchers,
void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
284 requires(!std::same_as<Grouping, void>)
287 return extract<std::decay_t<Grouping>, 0>(record,
matchers | std::views::filter([](
auto const& pair) {
return pair.first == 0; }), infos,
hash);
291 static auto bindAssociatedTables(InputRecord& record,
R matchers,
void (C::*)(Grouping, Args...), std::vector<ExpressionInfo>& infos)
292 requires(!std::same_as<Grouping, void> &&
sizeof...(Args) > 0)
295 return std::make_tuple(extract<std::decay_t<Args>, has_type_at_v<Args>(pack<Args...>{}) + 1>(record,
matchers | std::views::filter([](
auto const& pair) {
return pair.first == has_type_at_v<Args>(pack<Args...>{}) + 1; }), infos,
hash)...);
299 static void overwriteInternalIndices(std::tuple<As...>& dest, std::tuple<As...>
const&
src)
301 (std::get<As>(dest).bindInternalIndicesTo(&std::get<As>(
src)), ...);
305 static void invokeProcess(Task& task, InputRecord& inputs,
R matchers,
void (Task::*processingFunction)(Grouping, Associated...), std::vector<ExpressionInfo>& infos, ArrowTableSlicingCache& slices,
header::DataOrigin newOrigin =
header::DataOrigin{
"AOD"})
307 using G = std::decay_t<Grouping>;
308 auto groupingTable = AnalysisDataProcessorBuilder::bindGroupingTable(inputs,
matchers, processingFunction, infos);
310 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<Task>>() / 10;
313 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
element) {
320 if constexpr (
sizeof...(Associated) == 0) {
322 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
element) {
329 for (
auto&
element : groupingTable) {
330 std::invoke(processingFunction, task, *
element);
333 std::invoke(processingFunction, task, groupingTable);
337 auto associatedTables = AnalysisDataProcessorBuilder::bindAssociatedTables(inputs,
matchers, processingFunction, infos);
340 [&task](
auto&... t)
mutable {
341 (homogeneous_apply_refs_sized<numElements>(
351 auto binder = [&task, &groupingTable, &associatedTables](
auto&
x)
mutable {
352 x.bindExternalIndices(&groupingTable, &std::get<std::decay_t<Associated>>(associatedTables)...);
353 homogeneous_apply_refs_sized<numElements>([&
x](
auto& t)
mutable {
360 groupingTable.bindExternalIndices(&std::get<std::decay_t<Associated>>(associatedTables)...);
364 [&binder](
auto&...
x)
mutable {
370 homogeneous_apply_refs_sized<numElements>([&groupingTable, &associatedTables](
auto& t) {
375 overwriteInternalIndices(associatedTables, associatedTables);
377 auto slicer = GroupSlicer(groupingTable, associatedTables, slices, newOrigin);
378 for (
auto& slice : slicer) {
379 auto associatedSlices = slice.associatedTables();
380 overwriteInternalIndices(associatedSlices, associatedTables);
382 [&binder](
auto&...
x)
mutable {
388 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
x) {
394 [](Task& task,
void (Task::*processingFunction)(Grouping, Associated...), Grouping
g, std::tuple<std::decay_t<Associated>...>& at) {
395 std::invoke(processingFunction, task,
g, std::get<std::decay_t<Associated>>(at)...);
396 }(task, processingFunction, slice.groupingElement(), associatedSlices);
400 homogeneous_apply_refs_sized<numElements>([&groupingTable](
auto&
x) {
406 [](Task& task,
void (Task::*processingFunction)(Grouping, Associated...), Grouping
g, std::tuple<std::decay_t<Associated>...>& at) {
407 std::invoke(processingFunction, task,
g, std::get<std::decay_t<Associated>>(at)...);
408 }(task, processingFunction, groupingTable, associatedTables);
416 std::vector<std::pair<std::string, bool>>
map;
427template <
typename T,
typename...
A>
430 auto task = std::make_shared<T>(std::forward<A>(args)...);
431 for (
auto& setting : second.
map) {
434 return analysis_task_parsers::setProcessSwitch(setting,
element);
438 outputName =
first.value;
442template <
typename T,
typename...
A>
445 auto task = std::make_shared<T>(std::forward<A>(args)...);
446 for (
auto& setting :
first.map) {
449 return analysis_task_parsers::setProcessSwitch(setting,
element);
453 outputName = second.
value;
457template <
typename T,
typename...
A>
458auto getTaskNameSetProcesses(std::string& outputName, SetDefaultProcesses
first,
A... args)
460 auto task = std::make_shared<T>(std::forward<A>(args)...);
461 for (
auto& setting :
first.map) {
464 return analysis_task_parsers::setProcessSwitch(setting,
element);
468 auto type_name_str = type_name<T>();
473template <
typename T,
typename...
A>
474auto getTaskNameSetProcesses(std::string& outputName,
TaskName first,
A... args)
476 auto task = std::make_shared<T>(std::forward<A>(args)...);
477 outputName =
first.value;
481template <
typename T,
typename...
A>
482auto getTaskNameSetProcesses(std::string& outputName,
A... args)
484 auto task = std::make_shared<T>(std::forward<A>(args)...);
485 auto type_name_str = type_name<T>();
493template <
typename T,
typename... Args>
496 TH1::AddDirectory(
false);
498 std::string name_str;
499 auto task = getTaskNameSetProcesses<T>(name_str, args...);
501 auto suffix = ctx.
options().
get<std::string>(
"workflow-suffix");
502 if (!suffix.empty()) {
505 const char*
name = name_str.c_str();
509 std::vector<OutputSpec> outputs;
510 std::vector<InputSpec> inputs;
511 std::vector<ConfigParamSpec> options;
512 std::vector<ExpressionInfo> expressionInfos;
513 std::vector<InputInfo> inputInfos;
515 std::string newOriginStr;
518 newOriginStr = ctx.
options().
get<std::string>(
"aod-origin-replace");
519 if (newOriginStr.size() > 4UL) {
523 if (!newOriginStr.empty()) {
524 newOrigin.runtimeInit(newOriginStr.c_str(), std::min(newOriginStr.size(), 4UL));
527 constexpr const int numElements = nested_brace_constructible_size<false, std::decay_t<T>>() / 10;
535 if constexpr (
requires { &T::process; }) {
536 AnalysisDataProcessorBuilder::inputsFromArgs(&T::process,
"default",
true, inputs, expressionInfos, inputInfos, newOrigin);
538 homogeneous_apply_refs_sized<numElements>(
539 [
name = name_str, &expressionInfos, &inputs, &inputInfos, &newOrigin](
auto&
x)
mutable {
541 return AnalysisDataProcessorBuilder::requestInputsFromArgs(
x,
name, inputs, expressionInfos, inputInfos, newOrigin);
547 homogeneous_apply_refs_sized<numElements>([&inputs, &newOrigin](
auto&
element) {
553 if (inputs.empty() ==
true) {
554 LOG(warn) <<
"Task " << name_str <<
" has no inputs";
563 for (
auto& input : inputs) {
564 for (
auto& meta : input.metadata) {
565 if (meta.name.starts_with(
"ccdb:") && meta.name !=
"ccdb:") {
577 requiredServices.insert(requiredServices.end(), arrowServices.begin(), arrowServices.end());
585 Cache bindingsKeysUnsorted;
594 homogeneous_apply_refs_sized<numElements>([&eosContext](
auto&
element) {
605 if constexpr (
requires { task->init(ic); }) {
610 homogeneous_apply_refs_sized<numElements>(
614 homogeneous_apply_refs_sized<numElements>([&expressionInfos](
auto&
element) {
620 if constexpr (
requires { &T::process; }) {
621 AnalysisDataProcessorBuilder::cacheFromArgs(&T::process,
true, bindingsKeys, bindingsKeysUnsorted);
623 homogeneous_apply_refs_sized<numElements>(
624 [&bindingsKeys, &bindingsKeysUnsorted](
auto&
x) {
625 return AnalysisDataProcessorBuilder::requestCacheFromArgs(
x, bindingsKeys, bindingsKeysUnsorted);
630 std::ranges::transform(bindingsKeys, bindingsKeys.begin(), [&newOrigin](
Entry&
entry) {
631 if ((entry.matcher.origin == header::DataOrigin{
"AOD"}) && (newOrigin !=
header::DataOrigin{
"AOD"})) {
636 std::ranges::transform(bindingsKeysUnsorted, bindingsKeysUnsorted.begin(), [&newOrigin](
Entry&
entry) {
637 if ((entry.matcher.origin == header::DataOrigin{
"AOD"}) && (newOrigin !=
header::DataOrigin{
"AOD"})) {
647 return [task, expressionInfos, inputInfos, newOrigin](
ProcessingContext& pc)
mutable {
653 std::ranges::for_each(expressionInfos, [](
auto& info) { info.resetSelection =
true; });
656 homogeneous_apply_refs_sized<numElements>([&slices](
auto&
element) {
665 if constexpr (
requires { task->run(pc); }) {
669 if constexpr (
requires { &T::process; }) {
670 auto loc = std::ranges::find_if(inputInfos, [](
auto const& info) {
return info.hash == o2::framework::TypeIdHelpers::uniqueId<decltype(&T::process)>(); });
671 auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
672 AnalysisDataProcessorBuilder::invokeProcess(*(task.get()), pc.inputs(),
matchers, &T::process, expressionInfos, slices, newOrigin);
675 homogeneous_apply_refs_sized<numElements>(
676 [&pc, &expressionInfos, &task, &slices, &inputInfos, &newOrigin](
auto&
x) {
678 if (
x.value ==
true) {
680 auto matchers = loc == inputInfos.end() ? std::vector<std::pair<int, ConcreteDataMatcher>>{} : loc->matchers;
681 AnalysisDataProcessorBuilder::invokeProcess(*task.get(), pc.inputs(),
matchers,
x.process, expressionInfos, slices, newOrigin);
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< std::shared_ptr< arrow::Field > > fields
constexpr uint32_t runtime_hash(char const *str)
ConfigParamRegistry & options() const
bool hasOption(const char *key) const
T get(const char *key) const
Helper to check if a type T is an iterator.
bool match(const std::vector< std::string > &queries, const char *pattern)
GLuint const GLchar * name
GLenum GLenum GLsizei const GLuint GLboolean enabled
GLsizei const GLfloat * value
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
bool prepareService(InitContext &, T &)
void bindExternalIndicesPartition(P &, T *...)
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
Cache handling.
bool replaceOrigin(T &, header::DataOrigin const &)
Preslice handling.
bool requestInputs(std::vector< InputSpec > &, T &, header::DataOrigin)
void setPartition(P &, T &...)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool newDataframePartition(T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
void bindInternalIndicesPartition(P &, T *)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
constexpr bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool newDataframeCondition(InputRecord &, C &)
bool updateOutputSpec(T &, header::DataOrigin)
void updateFilterInfo(ExpressionInfo &info, std::shared_ptr< arrow::Table > &table)
Defining ITS Vertex explicitly as messageable.
void updatePairList(Cache &list, Entry &entry)
DataProcessorSpec adaptAnalysisTask(ConfigContext const &ctx, Args &&... args)
std::vector< Entry > Cache
std::string type_to_task_name(std::string_view const &camelCase)
Convert a CamelCase task struct name to snake-case task name.
@ Me
Only quit this data processor.
constexpr auto homogeneous_apply_refs(L l, T &&object)
ConfigParamSpec replaceOrigin(ConfigParamSpec &source, std::string const &originStr)
void wrongOriginReplacement(std::string_view replacement)
std::string cutString(std::string &&str)
auto createFieldsFromColumns(framework::pack< C... >)
void missingFilterDeclaration(int hash, int ai)
std::function< ProcessCallback(InitContext &)> InitCallback
static std::vector< ServiceSpec > defaultServices(std::string extraPlugins="", int numWorkers=0)
Split a string into a vector of strings using : as a separator.
static std::vector< ServiceSpec > arrowServices()
static void addOptionIfMissing(std::vector< ConfigParamSpec > &specs, const ConfigParamSpec &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static constexpr int64_t step
static constexpr int64_t begin
std::vector< std::pair< std::string, bool > > map
Struct to differentiate task names from possible task string arguments.
TaskName(std::string name)
static constexpr uint32_t uniqueId()
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"