46namespace analysis_task_parsers
56template <is_configurable O>
57bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
62template <is_configurable_group O>
63bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
65 if constexpr (
requires { optionGroup.prefix; }) {
66 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<
typename C>(C& option) {
67 if constexpr (requires { option.name; }) {
68 option.name.insert(0, 1,
'.');
69 option.name.insert(0, prefix);
75 homogeneous_apply_refs<true>([&options](
auto& option) {
return appendOption(options, option); }, optionGroup);
85template <is_configurable O>
86bool prepareOption(
InitContext& context, O& configurable)
89 configurable.value = context.
options().
get<
typename O::type>(configurable.name.c_str());
91 auto pt = context.
options().
get<boost::property_tree::ptree>(configurable.name.c_str());
92 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
97template <is_configurable_group O>
100 homogeneous_apply_refs<true>([&context](
auto&& configurable) {
return prepareOption(context, configurable); }, configurableGroup);
111template <is_condition C>
112bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
118template <is_condition_group C>
119bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
121 homogeneous_apply_refs<true>([&inputs](
auto& condition) {
return appendCondition(inputs, condition); }, conditionGroup);
132template <is_spawns T>
135 return "control:spawn";
138template <is_builds T>
139const char* controlOption()
141 return "control:build";
144template <is_defines T>
145const char* controlOption()
147 return "control:define";
153template <with_base_table T>
154bool requestInputs(std::vector<InputSpec>& inputs, T
const& entity)
156 auto base_specs = T::base_specs();
157 for (
auto base_spec : base_specs) {
170template <is_condition C>
171bool newDataframeCondition(
InputRecord& record, C& condition)
173 condition.instance = (
typename C::type*)record.
get<
typename C::type*>(condition.path).get();
177template <is_condition_group C>
178bool newDataframeCondition(
InputRecord& record, C& conditionGroup)
180 homogeneous_apply_refs<true>([&record](
auto&& condition) {
return newDataframeCondition(record, condition); }, conditionGroup);
191template <is_produces T>
192bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
198template <is_produces_group T>
199bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t
hash)
201 homogeneous_apply_refs<true>([&outputs,
hash](
auto& produces) {
return appendOutput(outputs, produces,
hash); }, producesGroup);
205template <is_histogram_registry T>
209 outputs.emplace_back(hr.spec());
213template <is_outputobj T>
217 outputs.emplace_back(obj.spec());
222 requires(is_spawns<T> || is_builds<T> || is_defines<T>)
223bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
225 outputs.emplace_back(entity.spec());
235template <is_histogram_registry T>
239 auto sendHistos = [deviceSpec, &context](
HistogramRegistry const& self, TNamed* obj)
mutable {
240 context.
outputs().
snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
242 hr.apply(sendHistos);
247template <is_outputobj T>
248bool postRunOutput(EndOfStreamContext& context, T& obj)
251 context.
outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
261template <is_produces T>
268template <is_produces_group T>
271 homogeneous_apply_refs<true>([&context](
auto& produces) {
return prepareOutput(context, produces); }, producesGroup);
275template <is_spawns T>
279 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
280 if (originalTable->num_rows() == 0) {
281 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
283 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
285 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
286 o2::aod::label<metadata::extension_table_t::ref>(),
287 spawns.projectors.data(),
290 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
294template <is_builds T>
298 return builds.build(extractOriginals<metadata::sources.
size(), metadata::sources>(context));
301template <is_defines T>
303 requires(T::delayed ==
false)
306 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
307 if (originalTable->num_rows() == 0) {
308 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
310 if (defines.inputSchema ==
nullptr) {
311 defines.inputSchema = originalTable->schema();
313 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
315 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
316 o2::aod::label<metadata::extension_table_t::ref>(),
317 defines.projectors.data(),
320 defines.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
330template <is_defines T>
331 requires(T::delayed ==
true)
334 if (defines.needRecompilation) {
338 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
339 if (originalTable->num_rows() == 0) {
340 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
342 if (defines.inputSchema ==
nullptr) {
343 defines.inputSchema = originalTable->schema();
345 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
347 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
348 o2::aod::label<metadata::extension_table_t::ref>(),
349 defines.projectors.data(),
352 defines.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
362template <is_produces T>
365 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
370template <is_produces_group T>
373 homogeneous_apply_refs<true>([&context](
auto& produces) {
return finalizeOutput(context, produces); }, producesGroup);
377template <is_spawns T>
380 context.outputs().adopt(spawns.output(), spawns.asArrowTable());
384template <is_builds T>
387 context.outputs().adopt(builds.output(), builds.asArrowTable());
391template <is_defines T>
394 context.outputs().adopt(defines.output(), defines.asArrowTable());
405template <is_service T>
406bool addService(std::vector<ServiceSpec>& specs, T&)
409 auto p =
typename T::service_t{};
411 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
422template <is_service T>
423bool prepareService(
InitContext& context, T& service)
425 using S =
typename T::service_t;
426 if constexpr (
requires { &S::instance; }) {
427 service.service = &(S::instance());
442template <is_service T>
447 if constexpr (
requires { &T::service_t::endOfStream; }) {
448 service.service->endOfStream();
461template <expressions::is_filter T>
468template <is_partition T>
469bool updatePlaceholders(
InitContext& context, T& partition)
471 partition.updatePlaceholders(context);
481template <expressions::is_filter T>
482bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T&
filter)
494template <is_partition T>
495bool newDataframePartition(T& partition)
497 partition.dataframeChanged =
true;
501template <
typename P,
typename... T>
509 ([&]() {
if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
512template <
typename P,
typename T>
517template <is_partition P,
typename T>
518void bindInternalIndicesPartition(
P& partition, T* table)
520 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
521 partition.bindInternalIndicesTo(table);
525template <
typename P,
typename... T>
533 partition.bindExternalIndices(tables...);
549template <is_slice_cache T>
552 if (cache.ptr ==
nullptr) {
559template <
typename C,
typename TG,
typename... Ts>
565static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
567 if constexpr (std::same_as<typename C::g_t, std::decay_t<TG>>) {
568 comb.setTables(grouping, associated);
574 requires(!is_preslice<T> && !is_preslice_group<T>)
580template <is_preslice T>
581 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
584 if constexpr (T::optional) {
585 if (preslice.binding ==
"[MISSING]") {
589 auto locate = std::find_if(bsks.begin(), bsks.end(), [&](
auto const&
entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
590 if (locate == bsks.end()) {
591 bsks.emplace_back(preslice.getBindingKey());
592 }
else if (locate->enabled ==
false) {
593 locate->enabled =
true;
598template <is_preslice T>
599 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
602 if constexpr (T::optional) {
603 if (preslice.binding ==
"[MISSING]") {
607 auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](
auto const&
entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
608 if (locate == bsksU.end()) {
609 bsksU.emplace_back(preslice.getBindingKey());
610 }
else if (locate->enabled ==
false) {
611 locate->enabled =
true;
616template <is_preslice_group T>
619 homogeneous_apply_refs<true>([&bsks, &bsksU](
auto& preslice) {
return registerCache(preslice, bsks, bsksU); }, presliceGroup);
630template <is_preslice T>
632 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
634 if constexpr (T::optional) {
635 if (preslice.binding ==
"[MISSING]") {
639 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
643template <is_preslice T>
645 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
647 if constexpr (T::optional) {
648 if (preslice.binding ==
"[MISSING]") {
652 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
656template <is_preslice_group T>
657static bool updateSliceInfo(T& presliceGroup, ArrowTableSlicingCache& cache)
659 homogeneous_apply_refs<true>([&cache](
auto& preslice) {
return updateSliceInfo(preslice, cache); }, presliceGroup);
665static bool setProcessSwitch(std::pair<std::string, bool>, T&)
670template <is_process_configurable T>
671static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
673 if (pc.name == setting.first) {
674 pc.value = setting.second;