58namespace analysis_task_parsers
68template <is_configurable O>
69bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
74template <is_configurable_group O>
75bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
77 if constexpr (
requires { optionGroup.prefix; }) {
78 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<
typename C>(C& option) {
79 if constexpr (requires { option.name; }) {
80 option.name.insert(0, 1,
'.');
81 option.name.insert(0, prefix);
87 homogeneous_apply_refs<true>([&options](
auto& option) {
return appendOption(options, option); }, optionGroup);
97template <is_configurable O>
98bool prepareOption(
InitContext& context, O& configurable)
101 configurable.value = context.
options().
get<
typename O::type>(configurable.name.c_str());
103 auto pt = context.
options().
get<boost::property_tree::ptree>(configurable.name.c_str());
104 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
109template <is_configurable_group O>
110bool prepareOption(InitContext& context, O& configurableGroup)
112 homogeneous_apply_refs<true>([&context](
auto&& configurable) {
return prepareOption(context, configurable); }, configurableGroup);
123template <is_condition C>
124bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
130template <is_condition_group C>
131bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
133 homogeneous_apply_refs<true>([&inputs](
auto& condition) {
return appendCondition(inputs, condition); }, conditionGroup);
144template <is_spawns T>
147 return "control:spawn";
150template <is_builds T>
151const char* controlOption()
153 return "control:build";
156template <is_defines T>
157const char* controlOption()
159 return "control:define";
165template <with_base_table T>
166bool requestInputs(std::vector<InputSpec>& inputs, T
const& entity)
168 auto base_specs = entity.base_specs();
169 for (
auto base_spec : base_specs) {
182template <is_condition C>
183bool newDataframeCondition(
InputRecord& record, C& condition)
185 condition.instance = (
typename C::type*)record.
get<
typename C::type*>(condition.path).get();
189template <is_condition_group C>
190bool newDataframeCondition(
InputRecord& record, C& conditionGroup)
192 homogeneous_apply_refs<true>([&record](
auto&& condition) {
return newDataframeCondition(record, condition); }, conditionGroup);
203template <is_produces T>
204bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
210template <is_produces_group T>
211bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
213 homogeneous_apply_refs<true>([&outputs, hash](
auto& produces) {
return appendOutput(outputs, produces, hash); }, producesGroup);
217template <is_histogram_registry T>
218bool appendOutput(std::vector<OutputSpec>& outputs, T& hr, uint32_t hash)
221 outputs.emplace_back(hr.spec());
225template <is_outputobj T>
226bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
229 outputs.emplace_back(obj.spec());
234 requires(is_spawns<T> || is_builds<T> || is_defines<T>)
235bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
237 outputs.emplace_back(entity.spec());
247template <is_histogram_registry T>
251 auto sendHistos = [deviceSpec, &context](
HistogramRegistry const& self, TNamed* obj)
mutable {
252 context.
outputs().
snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
254 hr.apply(sendHistos);
259template <is_outputobj T>
260bool postRunOutput(EndOfStreamContext& context, T& obj)
263 context.
outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
273template <is_produces T>
280template <is_produces_group T>
283 homogeneous_apply_refs<true>([&context](
auto& produces) {
return prepareOutput(context, produces); }, producesGroup);
287template <is_spawns T>
291 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
292 if (originalTable->schema()->fields().empty() ==
true) {
293 using base_table_t =
typename T::base_table_t::table_t;
294 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
296 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
298 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
299 o2::aod::label<metadata::extension_table_t::ref>(),
300 spawns.projectors.data(),
303 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
307template <is_builds T>
311 return builds.template build<typename T::buildable_t::indexing_t>(builds.pack(), extractOriginals<metadata::sources.size(), metadata::sources>(context));
314template <is_defines T>
316 requires(T::delayed ==
false)
319 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
320 if (originalTable->schema()->fields().empty() ==
true) {
321 using base_table_t =
typename T::base_table_t::table_t;
322 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
324 if (defines.inputSchema ==
nullptr) {
325 defines.inputSchema = originalTable->schema();
327 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
329 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
330 o2::aod::label<metadata::extension_table_t::ref>(),
331 defines.projectors.data(),
334 defines.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
344template <is_defines T>
345 requires(T::delayed ==
true)
348 if (defines.needRecompilation) {
352 auto originalTable =
soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.
size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
353 if (originalTable->schema()->fields().empty() ==
true) {
354 using base_table_t =
typename T::base_table_t::table_t;
355 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
357 if (defines.inputSchema ==
nullptr) {
358 defines.inputSchema = originalTable->schema();
360 using D =
o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
362 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
363 o2::aod::label<metadata::extension_table_t::ref>(),
364 defines.projectors.data(),
367 defines.table = std::make_shared<typename T::spawnable_t::table_t>(
soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
377template <is_produces T>
380 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
385template <is_produces_group T>
388 homogeneous_apply_refs<true>([&context](
auto& produces) {
return finalizeOutput(context, produces); }, producesGroup);
392template <is_spawns T>
395 context.outputs().adopt(spawns.output(), spawns.asArrowTable());
399template <is_builds T>
402 context.outputs().adopt(builds.output(), builds.asArrowTable());
406template <is_defines T>
409 context.outputs().adopt(defines.output(), defines.asArrowTable());
420template <is_service T>
421bool addService(std::vector<ServiceSpec>& specs, T&)
424 auto p =
typename T::service_t{};
426 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
437template <is_service T>
438bool prepareService(
InitContext& context, T& service)
440 using S =
typename T::service_t;
441 if constexpr (
requires { &S::instance; }) {
442 service.service = &(S::instance());
457template <is_service T>
462 if constexpr (
requires { &T::service_t::endOfStream; }) {
463 service.service->endOfStream();
476template <expressions::is_filter T>
483template <is_partition T>
484bool updatePlaceholders(
InitContext& context, T& partition)
486 partition.updatePlaceholders(context);
496template <expressions::is_filter T>
497bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T&
filter)
509template <is_partition T>
510bool newDataframePartition(T& partition)
512 partition.dataframeChanged =
true;
516template <
typename P,
typename... T>
524 ([&]() {
if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
527template <
typename P,
typename T>
532template <is_partition P,
typename T>
533void bindInternalIndicesPartition(
P& partition, T* table)
535 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
536 partition.bindInternalIndicesTo(table);
540template <
typename P,
typename... T>
548 partition.bindExternalIndices(tables...);
564template <is_slice_cache T>
567 if (cache.ptr ==
nullptr) {
574template <
typename C,
typename TG,
typename... Ts>
580static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
582 if constexpr (std::same_as<typename C::g_t, std::decay_t<TG>>) {
583 comb.setTables(grouping, associated);
589 requires(!is_preslice<T> && !is_preslice_group<T>)
595template <is_preslice T>
596 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
599 if constexpr (T::optional) {
600 if (preslice.binding ==
"[MISSING]") {
604 auto locate = std::find_if(bsks.begin(), bsks.end(), [&](
auto const&
entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
605 if (locate == bsks.end()) {
606 bsks.emplace_back(preslice.getBindingKey());
607 }
else if (locate->enabled ==
false) {
608 locate->enabled =
true;
613template <is_preslice T>
614 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
617 if constexpr (T::optional) {
618 if (preslice.binding ==
"[MISSING]") {
622 auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](
auto const&
entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
623 if (locate == bsksU.end()) {
624 bsksU.emplace_back(preslice.getBindingKey());
625 }
else if (locate->enabled ==
false) {
626 locate->enabled =
true;
631template <is_preslice_group T>
634 homogeneous_apply_refs<true>([&bsks, &bsksU](
auto& preslice) {
return registerCache(preslice, bsks, bsksU); }, presliceGroup);
645template <is_preslice T>
647 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
649 if constexpr (T::optional) {
650 if (preslice.binding ==
"[MISSING]") {
654 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
658template <is_preslice T>
660 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
662 if constexpr (T::optional) {
663 if (preslice.binding ==
"[MISSING]") {
667 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
671template <is_preslice_group T>
672static bool updateSliceInfo(T& presliceGroup, ArrowTableSlicingCache& cache)
674 homogeneous_apply_refs<true>([&cache](
auto& preslice) {
return updateSliceInfo(preslice, cache); }, presliceGroup);
680static bool setProcessSwitch(std::pair<std::string, bool>, T&)
685template <is_process_configurable T>
686static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
688 if (pc.name == setting.first) {
689 pc.value = setting.second;