Project
Loading...
Searching...
No Matches
AnalysisManagers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSISMANAGERS_H
13#define FRAMEWORK_ANALYSISMANAGERS_H
17#include "Framework/ASoA.h"
25#include "Framework/Condition.h"
30
31namespace o2::framework
32{
33
34namespace
35{
36template <typename O>
37static inline auto extractOriginal(ProcessingContext& pc)
38{
39 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable();
40}
41
42template <typename... Os>
43static inline std::vector<std::shared_ptr<arrow::Table>> extractOriginals(framework::pack<Os...>, ProcessingContext& pc)
44{
45 return {extractOriginal<Os>(pc)...};
46}
47
48template <size_t N, std::array<soa::TableRef, N> refs>
49static inline auto extractOriginals(ProcessingContext& pc)
50{
51 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
52 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
53 }(std::make_index_sequence<refs.size()>());
54}
55} // namespace
56
57namespace analysis_task_parsers
58{
59
61template <typename O>
62bool appendOption(std::vector<ConfigParamSpec>&, O&)
63{
64 return false;
65}
66
67template <is_configurable O>
68bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
69{
70 return ConfigurableHelpers::appendOption(options, option);
71}
72
73template <is_configurable_group O>
74bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
75{
76 if constexpr (requires { optionGroup.prefix; }) {
77 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<typename C>(C& option) { // apend group prefix if set
78 if constexpr (requires { option.name; }) {
79 option.name.insert(0, 1, '.');
80 option.name.insert(0, prefix);
81 }
82 return true;
83 },
84 optionGroup);
85 }
86 homogeneous_apply_refs<true>([&options](auto& option) { return appendOption(options, option); }, optionGroup);
87 return true;
88}
89
90template <typename O>
92{
93 return false;
94}
95
96template <is_configurable O>
97bool prepareOption(InitContext& context, O& configurable)
98{
99 if constexpr (variant_trait_v<typename O::type> != VariantType::Unknown) {
100 configurable.value = context.options().get<typename O::type>(configurable.name.c_str());
101 } else {
102 auto pt = context.options().get<boost::property_tree::ptree>(configurable.name.c_str());
103 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
104 }
105 return true;
106}
107
108template <is_configurable_group O>
109bool prepareOption(InitContext& context, O& configurableGroup)
110{
111 homogeneous_apply_refs<true>([&context](auto&& configurable) { return prepareOption(context, configurable); }, configurableGroup);
112 return true;
113}
114
116template <typename C>
117bool appendCondition(std::vector<InputSpec>&, C&)
118{
119 return false;
120}
121
122template <is_condition C>
123bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
124{
125 inputs.emplace_back(InputSpec{condition.path, "AODC", runtime_hash(condition.path.c_str()), Lifetime::Condition, ccdbParamSpec(condition.path)});
126 return true;
127}
128
129template <is_condition_group C>
130bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
131{
132 homogeneous_apply_refs<true>([&inputs](auto& condition) { return appendCondition(inputs, condition); }, conditionGroup);
133 return true;
134}
135
137template <typename T>
138bool requestInputs(std::vector<InputSpec>&, T const&)
139{
140 return false;
141}
142
143template <is_spawns T>
144const char* controlOption()
145{
146 return "control:spawn";
147}
148
149template <is_builds T>
150const char* controlOption()
151{
152 return "control:build";
153}
154
155template <is_defines T>
156const char* controlOption()
157{
158 return "control:define";
159}
160
161template <typename T>
162concept with_base_table = requires(T const& t) { t.base_specs(); };
163
164template <with_base_table T>
165bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
166{
167 auto base_specs = entity.base_specs();
168 for (auto base_spec : base_specs) {
169 base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
170 DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
171 }
172 return true;
173}
174
175template <typename C>
177{
178 return false;
179}
180
181template <is_condition C>
182bool newDataframeCondition(InputRecord& record, C& condition)
183{
184 condition.instance = (typename C::type*)record.get<typename C::type*>(condition.path).get();
185 return true;
186}
187
188template <is_condition_group C>
189bool newDataframeCondition(InputRecord& record, C& conditionGroup)
190{
191 homogeneous_apply_refs<true>([&record](auto&& condition) { return newDataframeCondition(record, condition); }, conditionGroup);
192 return true;
193}
194
196template <typename T>
197bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
198{
199 return false;
200}
201
202template <is_produces T>
203bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
204{
206 return true;
207}
208
209template <is_produces_group T>
210bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
211{
212 homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
213 return true;
214}
215
216template <is_histogram_registry T>
217bool appendOutput(std::vector<OutputSpec>& outputs, T& hr, uint32_t hash)
218{
219 hr.setHash(hash);
220 outputs.emplace_back(hr.spec());
221 return true;
222}
223
224template <is_outputobj T>
225bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
226{
227 obj.setHash(hash);
228 outputs.emplace_back(obj.spec());
229 return true;
230}
231
232template <typename T>
233 requires(is_spawns<T> || is_builds<T> || is_defines<T>)
234bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
235{
236 outputs.emplace_back(entity.spec());
237 return true;
238}
239
240template <typename T>
242{
243 return false;
244}
245
246template <is_histogram_registry T>
247bool postRunOutput(EndOfStreamContext& context, T& hr)
248{
249 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
250 context.outputs().snapshot(hr.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *(hr.getListOfHistograms()));
251 hr.clean();
252 return true;
253}
254
255template <is_outputobj T>
256bool postRunOutput(EndOfStreamContext& context, T& obj)
257{
258 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
259 context.outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
260 return true;
261}
262
263template <typename T>
265{
266 return false;
267}
268
269template <is_produces T>
270bool prepareOutput(ProcessingContext& context, T& produces)
271{
272 produces.resetCursor(std::move(context.outputs().make<TableBuilder>(OutputForTable<typename T::persistent_table_t>::ref())));
273 return true;
274}
275
276template <is_produces_group T>
277bool prepareOutput(ProcessingContext& context, T& producesGroup)
278{
279 homogeneous_apply_refs<true>([&context](auto& produces) { return prepareOutput(context, produces); }, producesGroup);
280 return true;
281}
282
283template <is_spawns T>
284bool prepareOutput(ProcessingContext& context, T& spawns)
285{
286 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
287 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
288 if (originalTable->schema()->fields().empty() == true) {
289 using base_table_t = typename T::base_table_t::table_t;
290 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
291 }
292 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
293
294 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
295 o2::aod::label<metadata::extension_table_t::ref>(),
296 spawns.projectors.data(),
297 spawns.projector,
298 spawns.schema));
299 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
300 return true;
301}
302
303template <is_builds T>
304bool prepareOutput(ProcessingContext& context, T& builds)
305{
306 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::buildable_t::ref.desc_hash>>::metadata;
307 return builds.template build<typename T::buildable_t::indexing_t>(builds.pack(), extractOriginals<metadata::sources.size(), metadata::sources>(context));
308}
309
310template <is_defines T>
311bool prepareOutput(ProcessingContext& context, T& defines)
312 requires(T::delayed == false)
313{
314 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
315 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
316 if (originalTable->schema()->fields().empty() == true) {
317 using base_table_t = typename T::base_table_t::table_t;
318 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
319 }
320 if (defines.inputSchema == nullptr) {
321 defines.inputSchema = originalTable->schema();
322 }
323 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
324
325 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
326 o2::aod::label<metadata::extension_table_t::ref>(),
327 defines.projectors.data(),
328 defines.projector,
329 defines.schema));
330 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
331 return true;
332}
333
334template <typename T>
336{
337 return false;
338}
339
340template <is_defines T>
341 requires(T::delayed == true)
342bool prepareDelayedOutput(ProcessingContext& context, T& defines)
343{
344 if (defines.needRecompilation) {
345 defines.recompile();
346 }
347 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
348 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
349 if (originalTable->schema()->fields().empty() == true) {
350 using base_table_t = typename T::base_table_t::table_t;
351 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
352 }
353 if (defines.inputSchema == nullptr) {
354 defines.inputSchema = originalTable->schema();
355 }
356 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
357
358 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
359 o2::aod::label<metadata::extension_table_t::ref>(),
360 defines.projectors.data(),
361 defines.projector,
362 defines.schema));
363 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
364 return true;
365}
366
367template <typename T>
369{
370 return false;
371}
372
373template <is_produces T>
374bool finalizeOutput(ProcessingContext&, T& produces)
375{
376 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
377 produces.release();
378 return true;
379}
380
381template <is_produces_group T>
382bool finalizeOutput(ProcessingContext& context, T& producesGroup)
383{
384 homogeneous_apply_refs<true>([&context](auto& produces) { return finalizeOutput(context, produces); }, producesGroup);
385 return true;
386}
387
388template <is_spawns T>
389bool finalizeOutput(ProcessingContext& context, T& spawns)
390{
391 context.outputs().adopt(spawns.output(), spawns.asArrowTable());
392 return true;
393}
394
395template <is_builds T>
396bool finalizeOutput(ProcessingContext& context, T& builds)
397{
398 context.outputs().adopt(builds.output(), builds.asArrowTable());
399 return true;
400}
401
402template <is_defines T>
403bool finalizeOutput(ProcessingContext& context, T& defines)
404{
405 context.outputs().adopt(defines.output(), defines.asArrowTable());
406 return true;
407}
408
410template <typename T>
411bool addService(std::vector<ServiceSpec>&, T&)
412{
413 return false;
414}
415
416template <is_service T>
417bool addService(std::vector<ServiceSpec>& specs, T&)
418{
420 auto p = typename T::service_t{};
421 auto loadableServices = PluginManager::parsePluginSpecString(p.loadSpec.c_str());
422 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
423 }
424 return true;
425}
426
427template <typename T>
429{
430 return false;
431}
432
433template <is_service T>
434bool prepareService(InitContext& context, T& service)
435{
436 using S = typename T::service_t;
437 if constexpr (requires { &S::instance; }) {
438 service.service = &(S::instance()); // Sigh...
439 return true;
440 } else {
441 service.service = &(context.services().get<S>());
442 return true;
443 }
444 return false;
445}
446
447template <typename T>
449{
450 return false;
451}
452
453template <is_service T>
454bool postRunService(EndOfStreamContext&, T& service)
455{
456 // FIXME: for the moment we only need endOfStream to be
457 // stateless. In the future we might want to pass it EndOfStreamContext
458 if constexpr (requires { &T::service_t::endOfStream; }) {
459 service.service->endOfStream();
460 return true;
461 }
462 return false;
463}
464
466template <typename T>
468{
469 return false;
470}
471
472template <expressions::is_filter T>
473bool updatePlaceholders(InitContext& context, T& filter)
474{
476 return true;
477}
478
479template <is_partition T>
480bool updatePlaceholders(InitContext& context, T& partition)
481{
482 partition.updatePlaceholders(context);
483 return true;
484}
485
486template <typename T>
487bool createExpressionTrees(std::vector<ExpressionInfo>&, T&)
488{
489 return false;
490}
491
492template <expressions::is_filter T>
493bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T& filter)
494{
496 return true;
497}
498
499template <typename T>
501{
502 return false;
503}
504
505template <is_partition T>
506bool newDataframePartition(T& partition)
507{
508 partition.dataframeChanged = true;
509 return true;
510}
511
512template <typename P, typename... T>
513void setPartition(P&, T&...)
514{
515}
516
517template <is_partition P, typename... T>
518void setPartition(P& partition, T&... tables)
519{
520 ([&]() { if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
521}
522
523template <typename P, typename T>
525{
526}
527
528template <is_partition P, typename T>
529void bindInternalIndicesPartition(P& partition, T* table)
530{
531 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
532 partition.bindInternalIndicesTo(table);
533 }
534}
535
536template <typename P, typename... T>
538{
539}
540
541template <is_partition P, typename... T>
542void bindExternalIndicesPartition(P& partition, T*... tables)
543{
544 partition.bindExternalIndices(tables...);
545}
546
548template <typename T>
550{
551 return false;
552}
553
554template <typename T>
556{
557 return false;
558}
559
560template <is_slice_cache T>
561bool initializeCache(ProcessingContext& context, T& cache)
562{
563 if (cache.ptr == nullptr) {
564 cache.ptr = &context.services().get<ArrowTableSlicingCache>();
565 }
566 return true;
567}
568
570template <typename C, typename TG, typename... Ts>
571void setGroupedCombination(C&, TG&, Ts&...)
572{
573}
574
575template <is_combinations_generator C, typename TG, typename... Ts>
576static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
577{
578 if constexpr (std::same_as<typename C::g_t, std::decay_t<TG>>) {
579 comb.setTables(grouping, associated);
580 }
581}
582
584template <typename T>
585 requires(!is_preslice<T> && !is_preslice_group<T>)
587{
588 return false;
589}
590
591template <is_preslice T>
592 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
593bool registerCache(T& preslice, Cache& bsks, Cache&)
594{
595 if constexpr (T::optional) {
596 if (preslice.binding == "[MISSING]") {
597 return true;
598 }
599 }
600 auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
601 if (locate == bsks.end()) {
602 bsks.emplace_back(preslice.getBindingKey());
603 } else if (locate->enabled == false) {
604 locate->enabled = true;
605 }
606 return true;
607}
608
609template <is_preslice T>
610 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
611bool registerCache(T& preslice, Cache&, Cache& bsksU)
612{
613 if constexpr (T::optional) {
614 if (preslice.binding == "[MISSING]") {
615 return true;
616 }
617 }
618 auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
619 if (locate == bsksU.end()) {
620 bsksU.emplace_back(preslice.getBindingKey());
621 } else if (locate->enabled == false) {
622 locate->enabled = true;
623 }
624 return true;
625}
626
627template <is_preslice_group T>
628bool registerCache(T& presliceGroup, Cache& bsks, Cache& bsksU)
629{
630 homogeneous_apply_refs<true>([&bsks, &bsksU](auto& preslice) { return registerCache(preslice, bsks, bsksU); }, presliceGroup);
631 return true;
632}
633
634template <typename T>
637{
638 return false;
639}
640
641template <is_preslice T>
642static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
643 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
644{
645 if constexpr (T::optional) {
646 if (preslice.binding == "[MISSING]") {
647 return true;
648 }
649 }
650 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
651 return true;
652}
653
654template <is_preslice T>
655static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
656 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
657{
658 if constexpr (T::optional) {
659 if (preslice.binding == "[MISSING]") {
660 return true;
661 }
662 }
663 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
664 return true;
665}
666
667template <is_preslice_group T>
668static bool updateSliceInfo(T& presliceGroup, ArrowTableSlicingCache& cache)
669{
670 homogeneous_apply_refs<true>([&cache](auto& preslice) { return updateSliceInfo(preslice, cache); }, presliceGroup);
671 return true;
672}
673
675template <typename T>
676static bool setProcessSwitch(std::pair<std::string, bool>, T&)
677{
678 return false;
679}
680
681template <is_process_configurable T>
682static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
683{
684 if (pc.name == setting.first) {
685 pc.value = setting.second;
686 return true;
687 }
688 return false;
689}
690
691} // namespace analysis_task_parsers
692} // namespace o2::framework
693
694#endif // ANALYSISMANAGERS_H
constexpr uint32_t runtime_hash(char const *str)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateExpressionInfos(expressions::Filter const &filter, std::vector< ExpressionInfo > &eInfos)
Function for attaching gandiva filters to to compatible task inputs.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< Entry > Cache
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72