Project
Loading...
Searching...
No Matches
AnalysisManagers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSISMANAGERS_H
13#define FRAMEWORK_ANALYSISMANAGERS_H
14#include "DataAllocator.h"
18#include "Framework/ASoA.h"
26#include "Framework/Condition.h"
31
32namespace o2::framework
33{
34
35namespace
36{
37template <size_t N, std::array<soa::TableRef, N> refs>
38static inline auto extractOriginals(ProcessingContext& pc)
39{
40 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
41 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
42 }(std::make_index_sequence<refs.size()>());
43}
44} // namespace
45
46namespace analysis_task_parsers
47{
48
50template <typename O>
51bool appendOption(std::vector<ConfigParamSpec>&, O&)
52{
53 return false;
54}
55
56template <is_configurable O>
57bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
58{
59 return ConfigurableHelpers::appendOption(options, option);
60}
61
62template <is_configurable_group O>
63bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
64{
65 if constexpr (requires { optionGroup.prefix; }) {
66 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<typename C>(C& option) { // apend group prefix if set
67 if constexpr (requires { option.name; }) {
68 option.name.insert(0, 1, '.');
69 option.name.insert(0, prefix);
70 }
71 return true;
72 },
73 optionGroup);
74 }
75 homogeneous_apply_refs<true>([&options](auto& option) { return appendOption(options, option); }, optionGroup);
76 return true;
77}
78
79template <typename O>
81{
82 return false;
83}
84
85template <is_configurable O>
86bool prepareOption(InitContext& context, O& configurable)
87{
88 if constexpr (variant_trait_v<typename O::type> != VariantType::Unknown) {
89 configurable.value = context.options().get<typename O::type>(configurable.name.c_str());
90 } else {
91 auto pt = context.options().get<boost::property_tree::ptree>(configurable.name.c_str());
92 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
93 }
94 return true;
95}
96
97template <is_configurable_group O>
98bool prepareOption(InitContext& context, O& configurableGroup)
99{
100 homogeneous_apply_refs<true>([&context](auto&& configurable) { return prepareOption(context, configurable); }, configurableGroup);
101 return true;
102}
103
105template <typename C>
106bool appendCondition(std::vector<InputSpec>&, C&)
107{
108 return false;
109}
110
111template <is_condition C>
112bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
113{
114 inputs.emplace_back(InputSpec{condition.path, "AODC", runtime_hash(condition.path.c_str()), Lifetime::Condition, ccdbParamSpec(condition.path)});
115 return true;
116}
117
118template <is_condition_group C>
119bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
120{
121 homogeneous_apply_refs<true>([&inputs](auto& condition) { return appendCondition(inputs, condition); }, conditionGroup);
122 return true;
123}
124
126template <typename T>
127bool requestInputs(std::vector<InputSpec>&, T const&)
128{
129 return false;
130}
131
132template <is_spawns T>
133const char* controlOption()
134{
135 return "control:spawn";
136}
137
138template <is_builds T>
139const char* controlOption()
140{
141 return "control:build";
142}
143
144template <is_defines T>
145const char* controlOption()
146{
147 return "control:define";
148}
149
150template <typename T>
151concept with_base_table = requires { T::base_specs(); };
152
153template <with_base_table T>
154bool requestInputs(std::vector<InputSpec>& inputs, T const& entity)
155{
156 auto base_specs = T::base_specs();
157 for (auto base_spec : base_specs) {
158 base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
159 DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
160 }
161 return true;
162}
163
164template <typename C>
166{
167 return false;
168}
169
170template <is_condition C>
171bool newDataframeCondition(InputRecord& record, C& condition)
172{
173 condition.instance = (typename C::type*)record.get<typename C::type*>(condition.path).get();
174 return true;
175}
176
177template <is_condition_group C>
178bool newDataframeCondition(InputRecord& record, C& conditionGroup)
179{
180 homogeneous_apply_refs<true>([&record](auto&& condition) { return newDataframeCondition(record, condition); }, conditionGroup);
181 return true;
182}
183
185template <typename T>
186bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
187{
188 return false;
189}
190
191template <is_produces T>
192bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
193{
195 return true;
196}
197
198template <is_produces_group T>
199bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
200{
201 homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
202 return true;
203}
204
205template <is_histogram_registry T>
206bool appendOutput(std::vector<OutputSpec>& outputs, T& hr, uint32_t hash)
207{
208 hr.setHash(hash);
209 outputs.emplace_back(hr.spec());
210 return true;
211}
212
213template <is_outputobj T>
214bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
215{
216 obj.setHash(hash);
217 outputs.emplace_back(obj.spec());
218 return true;
219}
220
221template <typename T>
222 requires(is_spawns<T> || is_builds<T> || is_defines<T>)
223bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
224{
225 outputs.emplace_back(entity.spec());
226 return true;
227}
228
229template <typename T>
231{
232 return false;
233}
234
235template <is_histogram_registry T>
236bool postRunOutput(EndOfStreamContext& context, T& hr)
237{
238 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
239 auto sendHistos = [deviceSpec, &context](HistogramRegistry const& self, TNamed* obj) mutable {
240 context.outputs().snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
241 };
242 hr.apply(sendHistos);
243 hr.clean();
244 return true;
245}
246
247template <is_outputobj T>
248bool postRunOutput(EndOfStreamContext& context, T& obj)
249{
250 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
251 context.outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
252 return true;
253}
254
255template <typename T>
257{
258 return false;
259}
260
261template <is_produces T>
262bool prepareOutput(ProcessingContext& context, T& produces)
263{
264 produces.resetCursor(std::move(context.outputs().make<TableBuilder>(OutputForTable<typename T::persistent_table_t>::ref())));
265 return true;
266}
267
268template <is_produces_group T>
269bool prepareOutput(ProcessingContext& context, T& producesGroup)
270{
271 homogeneous_apply_refs<true>([&context](auto& produces) { return prepareOutput(context, produces); }, producesGroup);
272 return true;
273}
274
275template <is_spawns T>
276bool prepareOutput(ProcessingContext& context, T& spawns)
277{
278 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
279 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
280 if (originalTable->num_rows() == 0) {
281 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
282 }
283 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
284
285 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
286 o2::aod::label<metadata::extension_table_t::ref>(),
287 spawns.projectors.data(),
288 spawns.projector,
289 spawns.schema));
290 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
291 return true;
292}
293
294template <is_builds T>
295bool prepareOutput(ProcessingContext& context, T& builds)
296{
297 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::buildable_t::ref.desc_hash>>::metadata;
298 return builds.build(extractOriginals<metadata::sources.size(), metadata::sources>(context));
299}
300
301template <is_defines T>
302bool prepareOutput(ProcessingContext& context, T& defines)
303 requires(T::delayed == false)
304{
305 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
306 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
307 if (originalTable->num_rows() == 0) {
308 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
309 }
310 if (defines.inputSchema == nullptr) {
311 defines.inputSchema = originalTable->schema();
312 }
313 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
314
315 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
316 o2::aod::label<metadata::extension_table_t::ref>(),
317 defines.projectors.data(),
318 defines.projector,
319 defines.schema));
320 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
321 return true;
322}
323
324template <typename T>
326{
327 return false;
328}
329
330template <is_defines T>
331 requires(T::delayed == true)
332bool prepareDelayedOutput(ProcessingContext& context, T& defines)
333{
334 if (defines.needRecompilation) {
335 defines.recompile();
336 }
337 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
338 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context), std::span{metadata::base_table_t::originalLabels});
339 if (originalTable->num_rows() == 0) {
340 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
341 }
342 if (defines.inputSchema == nullptr) {
343 defines.inputSchema = originalTable->schema();
344 }
345 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
346
347 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
348 o2::aod::label<metadata::extension_table_t::ref>(),
349 defines.projectors.data(),
350 defines.projector,
351 defines.schema));
352 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
353 return true;
354}
355
356template <typename T>
358{
359 return false;
360}
361
362template <is_produces T>
363bool finalizeOutput(ProcessingContext&, T& produces)
364{
365 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
366 produces.release();
367 return true;
368}
369
370template <is_produces_group T>
371bool finalizeOutput(ProcessingContext& context, T& producesGroup)
372{
373 homogeneous_apply_refs<true>([&context](auto& produces) { return finalizeOutput(context, produces); }, producesGroup);
374 return true;
375}
376
377template <is_spawns T>
378bool finalizeOutput(ProcessingContext& context, T& spawns)
379{
380 context.outputs().adopt(spawns.output(), spawns.asArrowTable());
381 return true;
382}
383
384template <is_builds T>
385bool finalizeOutput(ProcessingContext& context, T& builds)
386{
387 context.outputs().adopt(builds.output(), builds.asArrowTable());
388 return true;
389}
390
391template <is_defines T>
392bool finalizeOutput(ProcessingContext& context, T& defines)
393{
394 context.outputs().adopt(defines.output(), defines.asArrowTable());
395 return true;
396}
397
399template <typename T>
400bool addService(std::vector<ServiceSpec>&, T&)
401{
402 return false;
403}
404
405template <is_service T>
406bool addService(std::vector<ServiceSpec>& specs, T&)
407{
409 auto p = typename T::service_t{};
410 auto loadableServices = PluginManager::parsePluginSpecString(p.loadSpec.c_str());
411 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
412 }
413 return true;
414}
415
416template <typename T>
418{
419 return false;
420}
421
422template <is_service T>
423bool prepareService(InitContext& context, T& service)
424{
425 using S = typename T::service_t;
426 if constexpr (requires { &S::instance; }) {
427 service.service = &(S::instance()); // Sigh...
428 return true;
429 } else {
430 service.service = &(context.services().get<S>());
431 return true;
432 }
433 return false;
434}
435
436template <typename T>
438{
439 return false;
440}
441
442template <is_service T>
443bool postRunService(EndOfStreamContext&, T& service)
444{
445 // FIXME: for the moment we only need endOfStream to be
446 // stateless. In the future we might want to pass it EndOfStreamContext
447 if constexpr (requires { &T::service_t::endOfStream; }) {
448 service.service->endOfStream();
449 return true;
450 }
451 return false;
452}
453
455template <typename T>
457{
458 return false;
459}
460
461template <expressions::is_filter T>
462bool updatePlaceholders(InitContext& context, T& filter)
463{
465 return true;
466}
467
468template <is_partition T>
469bool updatePlaceholders(InitContext& context, T& partition)
470{
471 partition.updatePlaceholders(context);
472 return true;
473}
474
475template <typename T>
476bool createExpressionTrees(std::vector<ExpressionInfo>&, T&)
477{
478 return false;
479}
480
481template <expressions::is_filter T>
482bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T& filter)
483{
485 return true;
486}
487
488template <typename T>
490{
491 return false;
492}
493
494template <is_partition T>
495bool newDataframePartition(T& partition)
496{
497 partition.dataframeChanged = true;
498 return true;
499}
500
501template <typename P, typename... T>
502void setPartition(P&, T&...)
503{
504}
505
506template <is_partition P, typename... T>
507void setPartition(P& partition, T&... tables)
508{
509 ([&]() { if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
510}
511
512template <typename P, typename T>
514{
515}
516
517template <is_partition P, typename T>
518void bindInternalIndicesPartition(P& partition, T* table)
519{
520 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
521 partition.bindInternalIndicesTo(table);
522 }
523}
524
525template <typename P, typename... T>
527{
528}
529
530template <is_partition P, typename... T>
531void bindExternalIndicesPartition(P& partition, T*... tables)
532{
533 partition.bindExternalIndices(tables...);
534}
535
537template <typename T>
539{
540 return false;
541}
542
543template <typename T>
545{
546 return false;
547}
548
549template <is_slice_cache T>
550bool initializeCache(ProcessingContext& context, T& cache)
551{
552 if (cache.ptr == nullptr) {
553 cache.ptr = &context.services().get<ArrowTableSlicingCache>();
554 }
555 return true;
556}
557
559template <typename C, typename TG, typename... Ts>
560void setGroupedCombination(C&, TG&, Ts&...)
561{
562}
563
564template <is_combinations_generator C, typename TG, typename... Ts>
565static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
566{
567 if constexpr (std::same_as<typename C::g_t, std::decay_t<TG>>) {
568 comb.setTables(grouping, associated);
569 }
570}
571
573template <typename T>
574 requires(!is_preslice<T> && !is_preslice_group<T>)
576{
577 return false;
578}
579
580template <is_preslice T>
581 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
582bool registerCache(T& preslice, Cache& bsks, Cache&)
583{
584 if constexpr (T::optional) {
585 if (preslice.binding == "[MISSING]") {
586 return true;
587 }
588 }
589 auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
590 if (locate == bsks.end()) {
591 bsks.emplace_back(preslice.getBindingKey());
592 } else if (locate->enabled == false) {
593 locate->enabled = true;
594 }
595 return true;
596}
597
598template <is_preslice T>
599 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
600bool registerCache(T& preslice, Cache&, Cache& bsksU)
601{
602 if constexpr (T::optional) {
603 if (preslice.binding == "[MISSING]") {
604 return true;
605 }
606 }
607 auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.binding == preslice.bindingKey.binding) && (entry.key == preslice.bindingKey.key); });
608 if (locate == bsksU.end()) {
609 bsksU.emplace_back(preslice.getBindingKey());
610 } else if (locate->enabled == false) {
611 locate->enabled = true;
612 }
613 return true;
614}
615
616template <is_preslice_group T>
617bool registerCache(T& presliceGroup, Cache& bsks, Cache& bsksU)
618{
619 homogeneous_apply_refs<true>([&bsks, &bsksU](auto& preslice) { return registerCache(preslice, bsks, bsksU); }, presliceGroup);
620 return true;
621}
622
623template <typename T>
626{
627 return false;
628}
629
630template <is_preslice T>
631static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
632 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
633{
634 if constexpr (T::optional) {
635 if (preslice.binding == "[MISSING]") {
636 return true;
637 }
638 }
639 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
640 return true;
641}
642
643template <is_preslice T>
644static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
645 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
646{
647 if constexpr (T::optional) {
648 if (preslice.binding == "[MISSING]") {
649 return true;
650 }
651 }
652 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
653 return true;
654}
655
656template <is_preslice_group T>
657static bool updateSliceInfo(T& presliceGroup, ArrowTableSlicingCache& cache)
658{
659 homogeneous_apply_refs<true>([&cache](auto& preslice) { return updateSliceInfo(preslice, cache); }, presliceGroup);
660 return true;
661}
662
664template <typename T>
665static bool setProcessSwitch(std::pair<std::string, bool>, T&)
666{
667 return false;
668}
669
670template <is_process_configurable T>
671static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
672{
673 if (pc.name == setting.first) {
674 pc.value = setting.second;
675 return true;
676 }
677 return false;
678}
679
680} // namespace analysis_task_parsers
681} // namespace o2::framework
682
683#endif // ANALYSISMANAGERS_H
uint32_t hash
constexpr uint32_t runtime_hash(char const *str)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
Preslice handling.
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool newDataframeCondition(InputRecord &, C &)
void updateExpressionInfos(expressions::Filter const &filter, std::vector< ExpressionInfo > &eInfos)
Function for attaching gandiva filters to to compatible task inputs.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
std::vector< Entry > Cache
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72