Project
Loading...
Searching...
No Matches
AnalysisManagers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSISMANAGERS_H
13#define FRAMEWORK_ANALYSISMANAGERS_H
17#include "Framework/ASoA.h"
25#include "Framework/Condition.h"
30
31namespace o2::framework
32{
33
34namespace
35{
36template <typename O>
37static inline auto extractOriginal(ProcessingContext& pc)
38{
39 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<O>::metadata::tableLabel())->asArrowTable();
40}
41
42template <typename... Os>
43static inline std::vector<std::shared_ptr<arrow::Table>> extractOriginals(framework::pack<Os...>, ProcessingContext& pc)
44{
45 return {extractOriginal<Os>(pc)...};
46}
47
48template <size_t N, std::array<soa::TableRef, N> refs>
49static inline auto extractOriginals(ProcessingContext& pc)
50{
51 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
52 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
53 }(std::make_index_sequence<refs.size()>());
54}
55} // namespace
56
57namespace analysis_task_parsers
58{
59
61template <typename O>
62bool appendOption(std::vector<ConfigParamSpec>&, O&)
63{
64 return false;
65}
66
67template <is_configurable O>
68bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
69{
70 return ConfigurableHelpers::appendOption(options, option);
71}
72
73template <is_configurable_group O>
74bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
75{
76 if constexpr (requires { optionGroup.prefix; }) {
77 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<typename C>(C& option) { // apend group prefix if set
78 if constexpr (requires { option.name; }) {
79 option.name.insert(0, 1, '.');
80 option.name.insert(0, prefix);
81 }
82 return true;
83 },
84 optionGroup);
85 }
86 homogeneous_apply_refs<true>([&options](auto& option) { return appendOption(options, option); }, optionGroup);
87 return true;
88}
89
90template <typename O>
92{
93 return false;
94}
95
96template <is_configurable O>
97bool prepareOption(InitContext& context, O& configurable)
98{
99 if constexpr (variant_trait_v<typename O::type> != VariantType::Unknown) {
100 configurable.value = context.options().get<typename O::type>(configurable.name.c_str());
101 } else {
102 auto pt = context.options().get<boost::property_tree::ptree>(configurable.name.c_str());
103 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
104 }
105 return true;
106}
107
108template <is_configurable_group O>
109bool prepareOption(InitContext& context, O& configurableGroup)
110{
111 homogeneous_apply_refs<true>([&context](auto&& configurable) { return prepareOption(context, configurable); }, configurableGroup);
112 return true;
113}
114
116template <typename C>
117bool appendCondition(std::vector<InputSpec>&, C&)
118{
119 return false;
120}
121
122template <is_condition C>
123bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
124{
125 inputs.emplace_back(InputSpec{condition.path, "AODC", runtime_hash(condition.path.c_str()), Lifetime::Condition, ccdbParamSpec(condition.path)});
126 return true;
127}
128
129template <is_condition_group C>
130bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
131{
132 homogeneous_apply_refs<true>([&inputs](auto& condition) { return appendCondition(inputs, condition); }, conditionGroup);
133 return true;
134}
135
137template <typename T>
138bool requestInputs(std::vector<InputSpec>&, T const&)
139{
140 return false;
141}
142
143template <is_spawns T>
144bool requestInputs(std::vector<InputSpec>& inputs, T const& spawns)
145{
146 auto base_specs = spawns.base_specs();
147 for (auto base_spec : base_specs) {
148 base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:spawn"}, VariantType::Bool, true, {"\"\""}});
149 DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
150 }
151 return true;
152}
153
154template <is_builds T>
155bool requestInputs(std::vector<InputSpec>& inputs, T const& builds)
156{
157 auto base_specs = builds.base_specs();
158 for (auto base_spec : base_specs) {
159 base_spec.metadata.push_back(ConfigParamSpec{std::string{"control:build"}, VariantType::Bool, true, {"\"\""}});
160 DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
161 }
162 return true;
163}
164
165template <typename C>
167{
168 return false;
169}
170
171template <is_condition C>
172bool newDataframeCondition(InputRecord& record, C& condition)
173{
174 condition.instance = (typename C::type*)record.get<typename C::type*>(condition.path).get();
175 return true;
176}
177
178template <is_condition_group C>
179bool newDataframeCondition(InputRecord& record, C& conditionGroup)
180{
181 homogeneous_apply_refs<true>([&record](auto&& condition) { return newDataframeCondition(record, condition); }, conditionGroup);
182 return true;
183}
184
186template <typename T>
187bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
188{
189 return false;
190}
191
192template <is_produces T>
193bool appendOutput(std::vector<OutputSpec>& outputs, T&, uint32_t)
194{
196 return true;
197}
198
199template <is_produces_group T>
200bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
201{
202 homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
203 return true;
204}
205
206template <is_histogram_registry T>
207bool appendOutput(std::vector<OutputSpec>& outputs, T& hr, uint32_t hash)
208{
209 hr.setHash(hash);
210 outputs.emplace_back(hr.spec());
211 return true;
212}
213
214template <is_outputobj T>
215bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
216{
217 obj.setHash(hash);
218 outputs.emplace_back(obj.spec());
219 return true;
220}
221
222template <is_spawns T>
223bool appendOutput(std::vector<OutputSpec>& outputs, T& spawns, uint32_t)
224{
225 outputs.emplace_back(spawns.spec());
226 return true;
227}
228
229template <is_builds T>
230bool appendOutput(std::vector<OutputSpec>& outputs, T& builds, uint32_t)
231{
232 outputs.emplace_back(builds.spec());
233 return true;
234}
235
236template <typename T>
238{
239 return false;
240}
241
242template <is_histogram_registry T>
243bool postRunOutput(EndOfStreamContext& context, T& hr)
244{
245 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
246 context.outputs().snapshot(hr.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *(hr.getListOfHistograms()));
247 hr.clean();
248 return true;
249}
250
251template <is_outputobj T>
252bool postRunOutput(EndOfStreamContext& context, T& obj)
253{
254 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
255 context.outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
256 return true;
257}
258
259template <typename T>
261{
262 return false;
263}
264
265template <is_produces T>
266bool prepareOutput(ProcessingContext& context, T& produces)
267{
268 produces.resetCursor(std::move(context.outputs().make<TableBuilder>(OutputForTable<typename T::persistent_table_t>::ref())));
269 return true;
270}
271
272template <is_produces_group T>
273bool prepareOutput(ProcessingContext& context, T& producesGroup)
274{
275 homogeneous_apply_refs<true>([&context](auto& produces) { return prepareOutput(context, produces); }, producesGroup);
276 return true;
277}
278
279template <is_spawns T>
280bool prepareOutput(ProcessingContext& context, T& spawns)
281{
282 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
283 auto originalTable = soa::ArrowHelpers::joinTables(extractOriginals<metadata::sources.size(), metadata::sources>(context));
284 if (originalTable->schema()->fields().empty() == true) {
285 using base_table_t = typename T::base_table_t::table_t;
286 originalTable = makeEmptyTable<base_table_t>(o2::aod::label<metadata::extension_table_t::ref>());
287 }
288
289 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>>(originalTable, o2::aod::label<metadata::extension_table_t::ref>()));
290 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}));
291 return true;
292}
293
294template <is_builds T>
295bool prepareOuput(ProcessingContext& context, T& builds)
296{
297 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::buildable_t::ref.desc_hash>>::metadata;
298 return builds.template build<typename T::buildable_t::indexing_t>(builds.pack(), extractOriginals<metadata::sources.size(), metadata::sources>(context));
299}
300
301template <typename T>
303{
304 return false;
305}
306
307template <is_produces T>
308bool finalizeOutput(ProcessingContext&, T& produces)
309{
310 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
311 produces.release();
312 return true;
313}
314
315template <is_produces_group T>
316bool finalizeOutput(ProcessingContext& context, T& producesGroup)
317{
318 homogeneous_apply_refs<true>([&context](auto& produces) { return finalizeOutput(context, produces); }, producesGroup);
319 return true;
320}
321
322template <is_spawns T>
323bool finalizeOutput(ProcessingContext& context, T& spawns)
324{
325 context.outputs().adopt(spawns.output(), spawns.asArrowTable());
326 return true;
327}
328
329template <is_builds T>
330bool finalizeOutput(ProcessingContext& context, T& builds)
331{
332 context.outputs().adopt(builds.output(), builds.asArrowTable());
333 return true;
334}
335
337template <typename T>
338bool addService(std::vector<ServiceSpec>&, T&)
339{
340 return false;
341}
342
343template <is_service T>
344bool addService(std::vector<ServiceSpec>& specs, T&)
345{
347 auto p = typename T::service_t{};
348 auto loadableServices = PluginManager::parsePluginSpecString(p.loadSpec.c_str());
349 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
350 }
351 return true;
352}
353
354template <typename T>
356{
357 return false;
358}
359
360template <is_service T>
361bool prepareService(InitContext& context, T& service)
362{
363 using S = typename T::service_t;
364 if constexpr (requires { &S::instance; }) {
365 service.service = &(S::instance()); // Sigh...
366 return true;
367 } else {
368 service.service = &(context.services().get<S>());
369 return true;
370 }
371 return false;
372}
373
374template <typename T>
376{
377 return false;
378}
379
380template <is_service T>
381bool postRunService(EndOfStreamContext&, T& service)
382{
383 // FIXME: for the moment we only need endOfStream to be
384 // stateless. In the future we might want to pass it EndOfStreamContext
385 if constexpr (requires { &T::service_t::endOfStream; }) {
386 service.service->endOfStream();
387 return true;
388 }
389 return false;
390}
391
393template <typename T>
395{
396 return false;
397}
398
399template <expressions::is_filter T>
400bool updatePlaceholders(InitContext& context, T& filter)
401{
403 return true;
404}
405
406template <is_partition T>
407bool updatePlaceholders(InitContext& context, T& partition)
408{
409 partition.updatePlaceholders(context);
410 return true;
411}
412
413template <typename T>
414bool createExpressionTrees(std::vector<ExpressionInfo>&, T&)
415{
416 return false;
417}
418
419template <expressions::is_filter T>
420bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T& filter)
421{
423 return true;
424}
425
426template <typename T>
428{
429 return false;
430}
431
432template <is_partition T>
433bool newDataframePartition(T& partition)
434{
435 partition.dataframeChanged = true;
436 return true;
437}
438
439template <typename P, typename... T>
440void setPartition(P&, T&...)
441{
442}
443
444template <is_partition P, typename... T>
445void setPartition(P& partition, T&... tables)
446{
447 ([&]() { if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
448}
449
450template <typename P, typename T>
452{
453}
454
455template <is_partition P, typename T>
456void bindInternalIndicesPartition(P& partition, T* table)
457{
458 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
459 partition.bindInternalIndicesTo(table);
460 }
461}
462
463template <typename P, typename... T>
465{
466}
467
468template <is_partition P, typename... T>
469void bindExternalIndicesPartition(P& partition, T*... tables)
470{
471 partition.bindExternalIndices(tables...);
472}
473
475template <typename T>
477{
478 return false;
479}
480
481template <typename T>
483{
484 return false;
485}
486
487template <is_slice_cache T>
488bool initializeCache(ProcessingContext& context, T& cache)
489{
490 if (cache.ptr == nullptr) {
491 cache.ptr = &context.services().get<ArrowTableSlicingCache>();
492 }
493 return true;
494}
495
497template <typename C, typename TG, typename... Ts>
498 requires(!is_combinations_generator<C>)
499void setGroupedCombination(C&, TG&, Ts&...)
500{
501}
502
503template <is_combinations_generator C, typename TG, typename... Ts>
504 requires((sizeof...(Ts) > 0) && (C::compatible(framework::pack<Ts...>{})))
505static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
506{
507 if constexpr (std::same_as<typename C::g_t, TG>) {
508 comb.setTables(grouping, associated);
509 }
510}
511
513template <typename T>
514 requires(!is_preslice<T>)
515bool registerCache(T&, std::vector<StringPair>&, std::vector<StringPair>&)
516{
517 return false;
518}
519
520template <is_preslice T>
521 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
522bool registerCache(T& preslice, std::vector<StringPair>& bsks, std::vector<StringPair>&)
523{
524 if constexpr (T::optional) {
525 if (preslice.binding == "[MISSING]") {
526 return true;
527 }
528 }
529 auto locate = std::find_if(bsks.begin(), bsks.end(), [&](auto const& entry) { return (entry.first == preslice.bindingKey.first) && (entry.second == preslice.bindingKey.second); });
530 if (locate == bsks.end()) {
531 bsks.emplace_back(preslice.getBindingKey());
532 }
533 return true;
534}
535
536template <is_preslice T>
537 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
538bool registerCache(T& preslice, std::vector<StringPair>&, std::vector<StringPair>& bsksU)
539{
540 if constexpr (T::optional) {
541 if (preslice.binding == "[MISSING]") {
542 return true;
543 }
544 }
545 auto locate = std::find_if(bsksU.begin(), bsksU.end(), [&](auto const& entry) { return (entry.first == preslice.bindingKey.first) && (entry.second == preslice.bindingKey.second); });
546 if (locate == bsksU.end()) {
547 bsksU.emplace_back(preslice.getBindingKey());
548 }
549 return true;
550}
551
552template <typename T>
553 requires(!is_preslice<T>)
555{
556 return false;
557}
558
559template <is_preslice T>
560static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
561 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
562{
563 if constexpr (T::optional) {
564 if (preslice.binding == "[MISSING]") {
565 return true;
566 }
567 }
568 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
569 return true;
570}
571
572template <is_preslice T>
573static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
574 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
575{
576 if constexpr (T::optional) {
577 if (preslice.binding == "[MISSING]") {
578 return true;
579 }
580 }
581 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
582 return true;
583}
584
586template <typename T>
587static bool setProcessSwitch(std::pair<std::string, bool>, T&)
588{
589 return false;
590}
591
592template <is_process_configurable T>
593static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
594{
595 if (pc.name == setting.first) {
596 pc.value = setting.second;
597 return true;
598 }
599 return false;
600}
601
602} // namespace analysis_task_parsers
603} // namespace o2::framework
604
605#endif // ANALYSISMANAGERS_H
constexpr uint32_t runtime_hash(char const *str)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
bool prepareService(InitContext &, T &)
bool requestInputs(std::vector< InputSpec > &, T const &)
Table auto-creation handling.
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
bool preInitializeCache(InitContext &, T &)
Cache handling.
bool prepareOption(InitContext &, O &)
bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool finalizeOutput(ProcessingContext &, T &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool prepareOuput(ProcessingContext &context, T &builds)
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
bool registerCache(T &, std::vector< StringPair > &, std::vector< StringPair > &)
Preslice handling.
bool newDataframeCondition(InputRecord &, C &)
void updateExpressionInfos(expressions::Filter const &filter, std::vector< ExpressionInfo > &eInfos)
Function for attaching gandiva filters to to compatible task inputs.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
auto spawner(std::vector< std::shared_ptr< arrow::Table > > &&tables, const char *name)
Expression-based column generator to materialize columns.
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:67