Project
Loading...
Searching...
No Matches
AnalysisManagers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef FRAMEWORK_ANALYSISMANAGERS_H
13#define FRAMEWORK_ANALYSISMANAGERS_H
14#include "DataAllocator.h"
18#include "Framework/ASoA.h"
26#include "Framework/Condition.h"
31
32namespace o2::framework
33{
34
35namespace
36{
37template <size_t N, std::array<soa::TableRef, N> refs>
38static inline auto extractOriginals(ProcessingContext& pc)
39{
40 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
41 return {pc.inputs().get<TableConsumer>(o2::aod::matcher<refs[Is]>())->asArrowTable()...};
42 }(std::make_index_sequence<refs.size()>());
43}
44
45template <std::ranges::input_range R>
46static auto extractTablesFromRecord(InputRecord& record, R matchers)
47{
48 std::vector<std::shared_ptr<arrow::Table>> tables;
49 std::ranges::transform(matchers, std::back_inserter(tables), [&record](auto const& m) {
50 return record.get<TableConsumer>(m)->asArrowTable();
51 });
52 return tables;
53}
54
55} // namespace
56
57namespace analysis_task_parsers
58{
59
61template <typename O>
62bool appendOption(std::vector<ConfigParamSpec>&, O&)
63{
64 return false;
65}
66
67template <is_configurable O>
68bool appendOption(std::vector<ConfigParamSpec>& options, O& option)
69{
70 return ConfigurableHelpers::appendOption(options, option);
71}
72
73template <is_configurable_group O>
74bool appendOption(std::vector<ConfigParamSpec>& options, O& optionGroup)
75{
76 if constexpr (requires { optionGroup.prefix; }) {
77 homogeneous_apply_refs<true>([prefix = optionGroup.prefix]<typename C>(C& option) { // apend group prefix if set
78 if constexpr (requires { option.name; }) {
79 option.name.insert(0, 1, '.');
80 option.name.insert(0, prefix);
81 }
82 return true;
83 },
84 optionGroup);
85 }
86 homogeneous_apply_refs<true>([&options](auto& option) { return appendOption(options, option); }, optionGroup);
87 return true;
88}
89
90template <typename O>
92{
93 return false;
94}
95
96template <is_configurable O>
97bool prepareOption(InitContext& context, O& configurable)
98{
99 if constexpr (variant_trait_v<typename O::type> != VariantType::Unknown) {
100 configurable.value = context.options().get<typename O::type>(configurable.name.c_str());
101 } else {
102 auto pt = context.options().get<boost::property_tree::ptree>(configurable.name.c_str());
103 configurable.value = RootConfigParamHelpers::as<typename O::type>(pt);
104 }
105 return true;
106}
107
108template <is_configurable_group O>
109bool prepareOption(InitContext& context, O& configurableGroup)
110{
111 homogeneous_apply_refs<true>([&context](auto&& configurable) { return prepareOption(context, configurable); }, configurableGroup);
112 return true;
113}
114
116template <typename C>
117bool appendCondition(std::vector<InputSpec>&, C&)
118{
119 return false;
120}
121
122template <is_condition C>
123bool appendCondition(std::vector<InputSpec>& inputs, C& condition)
124{
125 inputs.emplace_back(InputSpec{condition.path, "AODC", runtime_hash(condition.path.c_str()), Lifetime::Condition, ccdbParamSpec(condition.path)});
126 return true;
127}
128
129template <is_condition_group C>
130bool appendCondition(std::vector<InputSpec>& inputs, C& conditionGroup)
131{
132 homogeneous_apply_refs<true>([&inputs](auto& condition) { return appendCondition(inputs, condition); }, conditionGroup);
133 return true;
134}
135
137
138template <typename T>
139concept with_required_inputs = requires(T t) { t.getRequiredInputs(); };
140
141template <typename T>
142 requires(!with_required_inputs<T>)
143bool requestInputs(std::vector<InputSpec>&, T&, header::DataOrigin)
144{
145 return false;
146}
147
148template <typename T>
150{
151 return false;
152}
153
154template <is_spawns T>
155const char* controlOption()
156{
157 return "control:spawn";
158}
159
160template <is_builds T>
161const char* controlOption()
162{
163 return "control:build";
164}
165
166template <is_defines T>
167const char* controlOption()
168{
169 return "control:define";
170}
171
172template <with_required_inputs T>
173bool requestInputs(std::vector<InputSpec>& inputs, T& entity, header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
174{
175 entity.requiredInputs = entity.getRequiredInputs(newOrigin);
176 for (auto base_spec : entity.requiredInputs) {
177 base_spec.metadata.push_back(ConfigParamSpec{std::string{controlOption<T>()}, VariantType::Bool, true, {"\"\""}});
178 DataSpecUtils::updateInputList(inputs, std::forward<InputSpec>(base_spec));
179 }
180 return true;
181}
182
183template <typename T>
184concept with_updateable_output = requires(T t) { t.updateOutputSpec(); };
185
186template <with_updateable_output T>
187bool updateOutputSpec(T& entity, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
188{
189 entity.outputSpec = entity.updateOutputSpec(newOrigin);
190 return true;
191}
192
193template <is_produces_group T>
194bool updateOutputSpec(T& producesGroup, header::DataOrigin newOrigin = header::DataOrigin{"AOD"})
195{
196 homogeneous_apply_refs<true>([&newOrigin](auto& produces) { return updateOutputSpec(produces, newOrigin); }, producesGroup);
197 return true;
198}
199
200template <typename C>
202{
203 return false;
204}
205
206template <is_condition C>
207bool newDataframeCondition(InputRecord& record, C& condition)
208{
209 condition.instance = (typename C::type*)record.get<typename C::type*>(condition.path).release();
210 return true;
211}
212
213template <is_condition_group C>
214bool newDataframeCondition(InputRecord& record, C& conditionGroup)
215{
216 homogeneous_apply_refs<true>([&record](auto&& condition) { return newDataframeCondition(record, condition); }, conditionGroup);
217 return true;
218}
219
221template <typename T>
222constexpr bool appendOutput(std::vector<OutputSpec>&, T&, uint32_t)
223{
224 return false;
225}
226
227template <is_produces T>
228constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T& produces, uint32_t)
229{
230 outputs.emplace_back(produces.outputSpec);
231 return true;
232}
233
234template <is_produces_group T>
235constexpr bool appendOutput(std::vector<OutputSpec>& outputs, T& producesGroup, uint32_t hash)
236{
237 homogeneous_apply_refs<true>([&outputs, hash](auto& produces) { return appendOutput(outputs, produces, hash); }, producesGroup);
238 return true;
239}
240
241template <is_histogram_registry T>
242bool appendOutput(std::vector<OutputSpec>& outputs, T& hr, uint32_t hash)
243{
244 hr.setHash(hash);
245 outputs.emplace_back(hr.spec());
246 return true;
247}
248
249template <is_outputobj T>
250bool appendOutput(std::vector<OutputSpec>& outputs, T& obj, uint32_t hash)
251{
252 obj.setHash(hash);
253 outputs.emplace_back(obj.spec());
254 return true;
255}
256
257template <typename T>
258 requires(is_spawns<T> || is_builds<T> || is_defines<T>)
259bool appendOutput(std::vector<OutputSpec>& outputs, T& entity, uint32_t)
260{
261 outputs.emplace_back(entity.outputSpec);
262 return true;
263}
264
265template <typename T>
267{
268 return false;
269}
270
271template <is_histogram_registry T>
272bool postRunOutput(EndOfStreamContext& context, T& hr)
273{
274 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
275 auto sendHistos = [deviceSpec, &context](HistogramRegistry const& self, TNamed* obj) mutable {
276 context.outputs().snapshot(self.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
277 };
278 hr.apply(sendHistos);
279 hr.clean();
280 return true;
281}
282
283template <is_outputobj T>
284bool postRunOutput(EndOfStreamContext& context, T& obj)
285{
286 auto& deviceSpec = context.services().get<o2::framework::DeviceSpec const>();
287 context.outputs().snapshot(obj.ref(deviceSpec.inputTimesliceId, deviceSpec.maxInputTimeslices), *obj);
288 return true;
289}
290
291template <typename T>
293{
294 return false;
295}
296
297template <is_produces T>
298bool prepareOutput(ProcessingContext& context, T& produces)
299{
300 auto matcher = DataSpecUtils::asConcreteDataMatcher(produces.outputSpec);
301 produces.resetCursor(std::move(context.outputs().make<TableBuilder>(Output{matcher.origin, matcher.description, matcher.subSpec})));
302 return true;
303}
304
305template <is_produces_group T>
306bool prepareOutput(ProcessingContext& context, T& producesGroup)
307{
308 homogeneous_apply_refs<true>([&context](auto& produces) { return prepareOutput(context, produces); }, producesGroup);
309 return true;
310}
311
312template <is_spawns T>
313bool prepareOutput(ProcessingContext& context, T& spawns)
314{
315 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::originals[T::spawnable_t::originals.size() - 1].desc_hash>>::metadata;
316 auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), spawns.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
317 if (originalTable->num_rows() == 0) {
318 originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{});
319 }
320 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
321
322 spawns.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
323 o2::aod::label<metadata::extension_table_t::ref>(),
324 spawns.projectors.data(),
325 spawns.projector,
326 spawns.schema));
327 spawns.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({spawns.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
328 return true;
329}
330
331template <is_builds T>
332bool prepareOutput(ProcessingContext& context, T& builds)
333{
334 return builds.build(framework::extractTablesFromRecord(context.inputs(), builds.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
335}
336
337template <is_defines T>
338bool prepareOutput(ProcessingContext& context, T& defines)
339 requires(T::delayed == false)
340{
341 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::originals[T::spawnable_t::originals.size() - 1].desc_hash>>::metadata;
342 auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
343 if (originalTable->num_rows() == 0) {
344 originalTable = makeEmptyTable("EMPTY", typename metadata::base_table_t::persistent_columns_t{});
345 }
346 if (defines.inputSchema == nullptr) {
347 defines.inputSchema = originalTable->schema();
348 }
349 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
350
351 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
352 o2::aod::label<metadata::extension_table_t::ref>(),
353 defines.projectors.data(),
354 defines.projector,
355 defines.schema));
356 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
357 return true;
358}
359
360template <typename T>
362{
363 return false;
364}
365
366template <is_defines T>
367 requires(T::delayed == true)
368bool prepareDelayedOutput(ProcessingContext& context, T& defines)
369{
370 if (defines.needRecompilation) {
371 defines.recompile();
372 }
373 using metadata = o2::aod::MetadataTrait<o2::aod::Hash<T::spawnable_t::ref.desc_hash>>::metadata;
374 auto originalTable = soa::ArrowHelpers::joinTables(framework::extractTablesFromRecord(context.inputs(), defines.requiredInputs | std::views::transform([](auto const& input) { return DataSpecUtils::asConcreteDataMatcher(input); })));
375 if (originalTable->num_rows() == 0) {
376 originalTable = makeEmptyTable<metadata::base_table_t::ref>();
377 }
378 if (defines.inputSchema == nullptr) {
379 defines.inputSchema = originalTable->schema();
380 }
381 using D = o2::aod::Hash<metadata::extension_table_t::ref.desc_hash>;
382
383 defines.extension = std::make_shared<typename T::extension_t>(o2::framework::spawner<D>(originalTable,
384 o2::aod::label<metadata::extension_table_t::ref>(),
385 defines.projectors.data(),
386 defines.projector,
387 defines.schema));
388 defines.table = std::make_shared<typename T::spawnable_t::table_t>(soa::ArrowHelpers::joinTables({defines.extension->asArrowTable(), originalTable}, std::span{T::spawnable_t::table_t::originalLabels}));
389 return true;
390}
391
392template <typename T>
394{
395 return false;
396}
397
398template <is_produces T>
399bool finalizeOutput(ProcessingContext&, T& produces)
400{
401 produces.setLabel(o2::aod::label<T::persistent_table_t::ref>());
402 produces.release();
403 return true;
404}
405
406template <is_produces_group T>
407bool finalizeOutput(ProcessingContext& context, T& producesGroup)
408{
409 homogeneous_apply_refs<true>([&context](auto& produces) { return finalizeOutput(context, produces); }, producesGroup);
410 return true;
411}
412
413template <is_spawns T>
414bool finalizeOutput(ProcessingContext& context, T& spawns)
415{
416 auto matcher = DataSpecUtils::asConcreteDataMatcher(spawns.outputSpec);
417 context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, spawns.asArrowTable());
418 return true;
419}
420
421template <is_builds T>
422bool finalizeOutput(ProcessingContext& context, T& builds)
423{
424 auto matcher = DataSpecUtils::asConcreteDataMatcher(builds.outputSpec);
425 context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, builds.asArrowTable());
426 return true;
427}
428
429template <is_defines T>
430bool finalizeOutput(ProcessingContext& context, T& defines)
431{
432 auto matcher = DataSpecUtils::asConcreteDataMatcher(defines.outputSpec);
433 context.outputs().adopt(Output{matcher.origin, matcher.description, matcher.subSpec}, defines.asArrowTable());
434 return true;
435}
436
438template <typename T>
439bool addService(std::vector<ServiceSpec>&, T&)
440{
441 return false;
442}
443
444template <is_service T>
445bool addService(std::vector<ServiceSpec>& specs, T&)
446{
448 auto p = typename T::service_t{};
449 auto loadableServices = PluginManager::parsePluginSpecString(p.loadSpec.c_str());
450 PluginManager::loadFromPlugin<ServiceSpec, ServicePlugin>(loadableServices, specs);
451 }
452 return true;
453}
454
455template <typename T>
457{
458 return false;
459}
460
461template <is_service T>
462bool prepareService(InitContext& context, T& service)
463{
464 using S = typename T::service_t;
465 if constexpr (requires { &S::instance; }) {
466 service.service = &(S::instance()); // Sigh...
467 return true;
468 } else {
469 service.service = &(context.services().get<S>());
470 return true;
471 }
472 return false;
473}
474
475template <typename T>
477{
478 return false;
479}
480
481template <is_service T>
482bool postRunService(EndOfStreamContext&, T& service)
483{
484 // FIXME: for the moment we only need endOfStream to be
485 // stateless. In the future we might want to pass it EndOfStreamContext
486 if constexpr (requires { &T::service_t::endOfStream; }) {
487 service.service->endOfStream();
488 return true;
489 }
490 return false;
491}
492
494template <typename T>
496{
497 return false;
498}
499
500template <expressions::is_filter T>
501bool updatePlaceholders(InitContext& context, T& filter)
502{
504 return true;
505}
506
507template <is_partition T>
508bool updatePlaceholders(InitContext& context, T& partition)
509{
510 partition.updatePlaceholders(context);
511 return true;
512}
513
514template <typename T>
515bool createExpressionTrees(std::vector<ExpressionInfo>&, T&)
516{
517 return false;
518}
519
520template <expressions::is_filter T>
521bool createExpressionTrees(std::vector<ExpressionInfo>& expressionInfos, T& filter)
522{
524 return true;
525}
526
527template <typename T>
529{
530 return false;
531}
532
533template <is_partition T>
534bool newDataframePartition(T& partition)
535{
536 partition.dataframeChanged = true;
537 return true;
538}
539
/// Fallback overload: the first argument is not a partition, nothing is bound.
template <typename P, typename... T>
void setPartition(P&, T&...)
{
  // intentionally empty
}
544
545template <is_partition P, typename... T>
546void setPartition(P& partition, T&... tables)
547{
548 ([&]() { if constexpr (std::same_as<typename P::content_t, T>) {partition.bindTable(tables);} }(), ...);
549}
550
551template <typename P, typename T>
553{
554}
555
556template <is_partition P, typename T>
557void bindInternalIndicesPartition(P& partition, T* table)
558{
559 if constexpr (o2::soa::is_binding_compatible_v<typename P::content_t, std::decay_t<T>>()) {
560 partition.bindInternalIndicesTo(table);
561 }
562}
563
564template <typename P, typename... T>
566{
567}
568
569template <is_partition P, typename... T>
570void bindExternalIndicesPartition(P& partition, T*... tables)
571{
572 partition.bindExternalIndices(tables...);
573}
574
576template <typename T>
578{
579 return false;
580}
581
582template <is_slice_cache T>
583bool initializeCache(ProcessingContext& context, T& cache)
584{
585 if (cache.ptr == nullptr) {
586 cache.ptr = &context.services().get<ArrowTableSlicingCache>();
587 }
588 return true;
589}
590
/// Fallback overload: the first argument is not a combinations generator, nothing is set.
template <typename C, typename TG, typename... Ts>
void setGroupedCombination(C&, TG&, Ts&...)
{
  // intentionally empty
}
596
597template <is_combinations_generator C, typename TG, typename... Ts>
598static void setGroupedCombination(C& comb, TG& grouping, std::tuple<Ts...>& associated)
599{
600 if constexpr (std::same_as<typename C::g_t, std::decay_t<TG>>) {
601 comb.setTables(grouping, associated);
602 }
603}
604
606template <typename T>
607 requires(!is_preslice<T> && !is_preslice_group<T>)
609{
610 return false;
611}
612
613template <is_preslice T>
614bool replaceOrigin(T& preslice, header::DataOrigin const& newOrigin = header::DataOrigin{"AOD"})
615{
616 if ((T::target_t::binding_origin == "AOD"_h) && (newOrigin != header::DataOrigin{"AOD"})) {
617 preslice.bindingKey.matcher = framework::replaceOrigin(preslice.bindingKey.matcher, newOrigin);
618 return true;
619 }
620 return false;
621}
622
623template <is_preslice_group T>
624bool replaceOrigin(T& presliceGroup, header::DataOrigin const& newOrigin)
625{
626 homogeneous_apply_refs<true>([&newOrigin](auto& preslice) { return replaceOrigin(preslice, newOrigin); }, presliceGroup);
627 return true;
628}
629
630template <typename T>
631 requires(!is_preslice<T> && !is_preslice_group<T>)
633{
634 return false;
635}
636
637template <is_preslice T>
638 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
639bool registerCache(T& preslice, Cache& bsks, Cache&)
640{
641 if constexpr (T::optional) {
642 if (preslice.binding == "[MISSING]") {
643 return true;
644 }
645 }
646 auto locate = std::find(bsks.begin(), bsks.end(), preslice.getBindingKey());
647 if (locate == bsks.end()) {
648 bsks.emplace_back(preslice.getBindingKey());
649 } else if (locate->enabled == false) {
650 locate->enabled = true;
651 }
652 return true;
653}
654
655template <is_preslice T>
656 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
657bool registerCache(T& preslice, Cache&, Cache& bsksU)
658{
659 if constexpr (T::optional) {
660 if (preslice.binding == "[MISSING]") {
661 return true;
662 }
663 }
664 auto locate = std::find(bsksU.begin(), bsksU.end(), preslice.getBindingKey());
665 if (locate == bsksU.end()) {
666 bsksU.emplace_back(preslice.getBindingKey());
667 } else if (locate->enabled == false) {
668 locate->enabled = true;
669 }
670 return true;
671}
672
673template <is_preslice_group T>
674bool registerCache(T& presliceGroup, Cache& bsks, Cache& bsksU)
675{
676 homogeneous_apply_refs<true>([&bsks, &bsksU](auto& preslice) { return registerCache(preslice, bsks, bsksU); }, presliceGroup);
677 return true;
678}
679
680template <typename T>
683{
684 return false;
685}
686
687template <is_preslice T>
688static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
689 requires std::same_as<typename T::policy_t, framework::PreslicePolicySorted>
690{
691 if constexpr (T::optional) {
692 if (preslice.binding == "[MISSING]") {
693 return true;
694 }
695 }
696 preslice.updateSliceInfo(cache.getCacheFor(preslice.getBindingKey()));
697 return true;
698}
699
700template <is_preslice T>
701static bool updateSliceInfo(T& preslice, ArrowTableSlicingCache& cache)
702 requires std::same_as<typename T::policy_t, framework::PreslicePolicyGeneral>
703{
704 if constexpr (T::optional) {
705 if (preslice.binding == "[MISSING]") {
706 return true;
707 }
708 }
709 preslice.updateSliceInfo(cache.getCacheUnsortedFor(preslice.getBindingKey()));
710 return true;
711}
712
713template <is_preslice_group T>
714static bool updateSliceInfo(T& presliceGroup, ArrowTableSlicingCache& cache)
715{
716 homogeneous_apply_refs<true>([&cache](auto& preslice) { return updateSliceInfo(preslice, cache); }, presliceGroup);
717 return true;
718}
719
/// Fallback overload: the second argument is not a process configurable, so the
/// setting is ignored.
/// @return always false
template <typename T>
static bool setProcessSwitch(std::pair<std::string, bool>, T&)
{
  // nothing to switch for this type
  return false;
}
726
727template <is_process_configurable T>
728static bool setProcessSwitch(std::pair<std::string, bool> setting, T& pc)
729{
730 if (pc.name == setting.first) {
731 pc.value = setting.second;
732 return true;
733 }
734 return false;
735}
736
737} // namespace analysis_task_parsers
738} // namespace o2::framework
739
740#endif // FRAMEWORK_ANALYSISMANAGERS_H
std::vector< framework::ConcreteDataMatcher > matchers
uint32_t hash
constexpr uint32_t runtime_hash(char const *str)
void snapshot(const Output &spec, T const &object)
decltype(auto) make(const Output &spec, Args... args)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
const GLfloat * m
Definition glcorearb.h:4066
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
constexpr framework::ConcreteDataMatcher matcher()
Definition ASoA.h:380
bool prepareService(InitContext &, T &)
void setGroupedCombination(C &, TG &, Ts &...)
Combinations handling.
bool initializeCache(ProcessingContext &, T &)
Cache handling.
bool requestInputs(std::vector< InputSpec > &, T &, header::DataOrigin)
bool prepareDelayedOutput(ProcessingContext &, T &)
bool prepareOption(InitContext &, O &)
bool finalizeOutput(ProcessingContext &, T &)
bool registerCache(T &, Cache &, Cache &)
bool postRunOutput(EndOfStreamContext &, T &)
bool createExpressionTrees(std::vector< ExpressionInfo > &, T &)
bool prepareOutput(ProcessingContext &, T &)
bool appendOption(std::vector< ConfigParamSpec > &, O &)
Options handling.
bool updateSliceInfo(T &, ArrowTableSlicingCache &)
bool appendCondition(std::vector< InputSpec > &, C &)
Conditions handling.
bool postRunService(EndOfStreamContext &, T &)
bool addService(std::vector< ServiceSpec > &, T &)
Service handling.
bool updatePlaceholders(InitContext &, T &)
Filter handling.
constexpr bool appendOutput(std::vector< OutputSpec > &, T &, uint32_t)
Outputs handling.
bool newDataframeCondition(InputRecord &, C &)
bool updateOutputSpec(T &, header::DataOrigin)
void updateExpressionInfos(expressions::Filter const &filter, std::vector< ExpressionInfo > &eInfos)
Function for attaching gandiva filters to to compatible task inputs.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< Entry > Cache
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
ConfigParamSpec replaceOrigin(ConfigParamSpec &source, std::string const &originStr)
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
header::DataHeader::SubSpecificationType subSpec
static bool appendOption(std::vector< ConfigParamSpec > &options, Configurable< T, K, IP > &what)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static void updateInputList(std::vector< InputSpec > &list, InputSpec &&input)
Updates list of InputSpecs by merging metadata.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72