Project
Loading...
Searching...
No Matches
AnalysisHelpers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef o2_framework_AnalysisHelpers_H_DEFINED
12#define o2_framework_AnalysisHelpers_H_DEFINED
13
14#include "ConfigParamSpec.h"
15#include "Framework/ASoA.h"
18#include "Framework/InputSpec.h"
19#include "Framework/Output.h"
21#include "Framework/OutputRef.h"
23#include "Framework/Plugins.h"
26#include "Framework/Traits.h"
27
28#include <string>
29namespace o2::soa
30{
32 std::string label;
33 std::string columnLabel;
35 int pos;
36 std::shared_ptr<arrow::DataType> type = [](IndexKind kind) -> std::shared_ptr<arrow::DataType> {
37 switch (kind) {
40 return arrow::int32();
42 return arrow::fixed_size_list(arrow::int32(), 2);
44 return arrow::list(arrow::int32());
45 default:
46 return {nullptr};
47 }
48 }(kind);
49
50 auto operator==(IndexRecord const& other) const
51 {
52 return (this->label == other.label) && (this->columnLabel == other.columnLabel) && (this->kind == other.kind) && (this->pos == other.pos);
53 }
54
55 std::shared_ptr<arrow::Field> field() const
56 {
57 return std::make_shared<arrow::Field>(columnLabel, type);
58 }
59};
60
62 static std::vector<framework::IndexColumnBuilder> makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records);
63 static void resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables);
64
65 static std::shared_ptr<arrow::Table> materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
66};
67} // namespace o2::soa
68
69namespace o2::framework
70{
71std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema);
72
73template <soa::is_table T>
74auto makeEmptyTable(const char* name)
75{
76 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename T::table_t::persistent_columns_t{}));
78}
79
80template <soa::TableRef R>
82{
83 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<aod::Hash<R.desc_hash>>::metadata::persistent_columns_t{}));
84 return makeEmptyTableImpl(o2::aod::label<R>(), schema);
85}
86
87template <typename... Cs>
89{
90 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(p));
92}
93
94template <aod::is_aod_hash D>
95auto makeEmptyTable(const char* name)
96{
97 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<D>::metadata::persistent_columns_t{}));
99}
100
101std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
102 expressions::Projector* projectors, const char* name, std::shared_ptr<gandiva::Projector>& projector);
103
104std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
105 const char* name, size_t nColumns,
106 const std::shared_ptr<gandiva::Projector>& projector);
107
109template <aod::is_aod_hash D>
110 requires(soa::has_extension<typename o2::aod::MetadataTrait<D>::metadata>)
111auto spawner(std::shared_ptr<arrow::Table> const& fullTable, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
112{
113 if (fullTable->num_rows() == 0) {
114 return makeEmptyTable<D>(name);
115 }
116 constexpr auto Ncol = []<typename M>() {
118 return framework::pack_size(typename M::placeholders_pack_t{});
119 } else {
120 return framework::pack_size(typename M::expression_pack_t{});
121 }
122 }.template operator()<typename o2::aod::MetadataTrait<D>::metadata>();
123 return spawnerHelper(fullTable, schema, Ncol, projectors, name, projector);
124}
125
126template <typename... C>
127auto spawner(framework::pack<C...>, std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
128{
129 std::array<const char*, 1> labels{"original"};
130 auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span<const char* const>{labels});
131 if (fullTable->num_rows() == 0) {
133 }
134 return spawnerHelper(fullTable, schema, sizeof...(C), projectors, name, projector);
135}
136
137std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
138std::string serializeSchema(std::shared_ptr<arrow::Schema> schema);
139std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs);
140std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<std::string> const& labels);
141
142struct Spawner {
143 std::string binding;
144 std::vector<std::string> labels;
145 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
146 std::shared_ptr<gandiva::Projector> projector = nullptr;
147 std::shared_ptr<arrow::Schema> schema = nullptr;
148 std::shared_ptr<arrow::Schema> inputSchema = nullptr;
149
153
154 std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc) const;
155};
156
157struct Builder {
159 std::vector<std::string> labels;
160 std::vector<o2::soa::IndexRecord> records;
161 std::shared_ptr<arrow::Schema> outputSchema;
165
166 std::shared_ptr<std::vector<framework::IndexColumnBuilder>> builders = nullptr;
167
168 std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc);
169};
170} // namespace o2::framework
171
172namespace o2::soa
173{
174template <TableRef R>
176{
178 std::string{"input:"} + o2::aod::label<R>(),
180 aod::sourceSpec<R>(),
181 {"\"\""}};
182}
183
184template <TableRef R>
185constexpr auto tableRef2Schema()
186{
188 std::string{"input-schema:"} + o2::aod::label<R>(),
191 {"\"\""}};
192}
193
194namespace
195{
196template <soa::with_sources T>
197inline constexpr auto getSources()
198{
199 return []<size_t N, std::array<soa::TableRef, N> refs>() {
200 return []<size_t... Is>(std::index_sequence<Is...>) {
201 return std::vector{soa::tableRef2ConfigParamSpec<refs[Is]>()...};
202 }(std::make_index_sequence<N>());
203 }.template operator()<T::sources.size(), T::sources>();
204}
205
206template <soa::with_sources T>
207inline constexpr auto getSourceSchemas()
208{
209 return []<size_t N, std::array<soa::TableRef, N> refs>() {
210 return []<size_t... Is>(std::index_sequence<Is...>) {
211 return std::vector{soa::tableRef2Schema<refs[Is]>()...};
212 }(std::make_index_sequence<N>());
213 }.template operator()<T::sources.size(), T::sources>();
214}
215
216template <soa::with_ccdb_urls T>
217inline constexpr auto getCCDBUrls()
218{
219 std::vector<framework::ConfigParamSpec> result;
220 for (size_t i = 0; i < T::ccdb_urls.size(); ++i) {
221 result.push_back({std::string{"ccdb:"} + std::string{T::ccdb_bindings[i]},
223 T::ccdb_urls[i],
224 {"\"\""}});
225 }
226 return result;
227}
228
229template <typename T>
230 requires(std::same_as<T, int>)
231consteval IndexKind getIndexKind()
232{
234}
235
// Overload selected when T is a bounded array type: such an index column is
// reported as IndexKind::IdxSlice. Evaluated at compile time only (consteval).
236template <typename T>
237 requires(std::is_bounded_array_v<T>)
238consteval IndexKind getIndexKind()
239{
240 return IndexKind::IdxSlice;
241}
242
// Overload selected when T is a specialization of std::vector: such an index
// column is reported as IndexKind::IdxArray. Evaluated at compile time only (consteval).
243template <typename T>
244 requires(framework::is_specialization_v<T, std::vector>)
245consteval IndexKind getIndexKind()
246{
247 return IndexKind::IdxArray;
248}
249
250template <soa::with_index_pack T>
251inline constexpr auto getIndexMapping()
252{
253 std::vector<IndexRecord> idx;
254 using indices = T::index_pack_t;
255 using Key = T::Key;
256 [&idx]<size_t... Is>(std::index_sequence<Is...>) mutable {
257 constexpr auto refs = T::sources;
258 ([&idx]<TableRef ref, typename C>() mutable {
259 constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
260 if constexpr (pos == -1) {
261 idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
262 } else {
263 idx.emplace_back(o2::aod::label<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
264 }
265 }.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>>(),
266 ...);
267 }(std::make_index_sequence<framework::pack_size(indices{})>());
268 ;
269 return idx;
270}
271
272template <soa::with_sources T>
273constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
274{
275 std::vector<framework::ConfigParamSpec> inputMetadata;
276
277 auto inputSources = getSources<T>();
278 std::sort(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
279 auto last = std::unique(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
280 inputSources.erase(last, inputSources.end());
281 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
282
283 auto inputSchemas = getSourceSchemas<T>();
284 std::sort(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
285 last = std::unique(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
286 inputSchemas.erase(last, inputSchemas.end());
287 inputMetadata.insert(inputMetadata.end(), inputSchemas.begin(), inputSchemas.end());
288
289 return inputMetadata;
290}
291
292template <typename T>
293 requires(!soa::with_sources<T>)
294constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
295{
296 return {};
297}
298
299template <soa::with_ccdb_urls T>
300constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
301{
302 std::vector<framework::ConfigParamSpec> results = getCCDBUrls<T>();
303 std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
304 auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
305 results.erase(last, results.end());
306 return results;
307}
308
309template <typename T>
310constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
311{
312 return {};
313}
314
315template <soa::with_expression_pack T>
316constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
317{
318 using expression_pack_t = T::expression_pack_t;
319
320 auto projectors = []<typename... C>(framework::pack<C...>) -> std::vector<framework::expressions::Projector> {
321 std::vector<framework::expressions::Projector> result;
322 (result.emplace_back(std::move(C::Projector())), ...);
323 return result;
324 }(expression_pack_t{});
325
327 return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}};
328}
329
330template <typename T>
331 requires(!soa::with_expression_pack<T>)
332constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
333{
334 return {};
335}
336
337template <soa::with_index_pack T>
338constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
339{
340 auto map = getIndexMapping<T>();
341 return {framework::ConfigParamSpec{"index-records", framework::VariantType::String, framework::serializeIndexRecords(map), {"\"\""}},
342 {framework::ConfigParamSpec{"index-exclusive", framework::VariantType::Bool, T::exclusive, {"\"\""}}}};
343}
344
345template <typename T>
346 requires(!soa::with_index_pack<T>)
347constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
348{
349 return {};
350}
351
352} // namespace
353
354template <TableRef R>
355constexpr auto tableRef2InputSpec()
356{
357 std::vector<framework::ConfigParamSpec> metadata;
358 auto m = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
359 metadata.insert(metadata.end(), m.begin(), m.end());
360 auto ccdbMetadata = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
361 metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
362 auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
363 metadata.insert(metadata.end(), p.begin(), p.end());
364 auto idx = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
365 metadata.insert(metadata.end(), idx.begin(), idx.end());
366 if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
368 }
369
371 o2::aod::label<R>(),
372 o2::aod::origin<R>(),
373 o2::aod::description(o2::aod::signature<R>()),
374 R.version,
375 framework::Lifetime::Timeframe,
376 metadata};
377}
378
379template <TableRef R>
380constexpr auto tableRef2OutputSpec()
381{
383 framework::OutputLabel{o2::aod::label<R>()},
384 o2::aod::origin<R>(),
385 o2::aod::description(o2::aod::signature<R>()),
386 R.version};
387}
388
389template <TableRef R>
390constexpr auto tableRef2Output()
391{
392 return framework::Output{
393 o2::aod::origin<R>(),
394 o2::aod::description(o2::aod::signature<R>()),
395 R.version};
396}
397
398template <TableRef R>
399constexpr auto tableRef2OutputRef()
400{
402 o2::aod::label<R>(),
403 R.version};
404}
405} // namespace o2::soa
406
407namespace o2::framework
408{
409class TableConsumer;
410
414template <typename T>
416
417template <typename T>
418concept is_enumerated_iterator = requires(T t) { t.globalIndex(); };
419
420template <is_producable T>
422 public:
423 using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator<T>) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }());
424 using cursor_t = decltype(std::declval<TableBuilder>().cursor<persistent_table_t>());
425
// Call operator: only available when the number of arguments equals the number
// of persistent columns of persistent_table_t. Increments mCount first, then
// invokes the cursor with a leading 0 followed by each argument passed through
// extract().
// NOTE(review): presumably this appends one row to the table being built and the
// leading 0 is a slot argument of the cursor — confirm against TableBuilder.
426 template <typename... Ts>
427 void operator()(Ts&&... args)
428 requires(sizeof...(Ts) == framework::pack_size(typename persistent_table_t::persistent_columns_t{}))
429 {
430 ++mCount;
431 cursor(0, extract(args)...);
432 }
433
435 int64_t lastIndex()
436 {
437 return mCount;
438 }
439
441 {
442 mBuilder = std::move(builder);
443 cursor = std::move(FFL(mBuilder->cursor<persistent_table_t>()));
444 mCount = -1;
445 return true;
446 }
447
448 void setLabel(const char* label)
449 {
450 mBuilder->setLabel(label);
451 }
452
455 void reserve(int64_t size)
456 {
457 mBuilder->reserve(typename persistent_table_t::column_types{}, size);
458 }
459
460 void release()
461 {
462 mBuilder.release();
463 }
464
465 decltype(FFL(std::declval<cursor_t>())) cursor;
466
467 private:
468 static decltype(auto) extract(is_enumerated_iterator auto const& arg)
469 {
470 return arg.globalIndex();
471 }
472
473 template <typename A>
475 static decltype(auto) extract(A&& arg)
476 {
477 return arg;
478 }
479
483 LifetimeHolder<TableBuilder> mBuilder = nullptr;
484 int64_t mCount = -1;
485};
486
// Type-mapping helper for table types: the declared return type is T itself.
// The body is intentionally empty (no return statement); in this file the
// function only appears inside decltype(), i.e. it is never evaluated.
488template <soa::is_table T>
489consteval auto typeWithRef() -> T
490{
491}
492
// Type-mapping helper for iterator types: the declared return type is the
// iterator's parent table, T::parent_t. The body is intentionally empty (no
// return statement); in this file the function only appears inside decltype(),
// i.e. it is never evaluated.
493template <soa::is_iterator T>
494consteval auto typeWithRef() -> typename T::parent_t
495{
496}
497
498template <typename T>
499 requires soa::is_table<T> || soa::is_iterator<T>
501 using table_t = decltype(typeWithRef<T>());
502 using metadata = aod::MetadataTrait<o2::aod::Hash<table_t::ref.desc_hash>>::metadata;
503
504 static OutputSpec const spec()
505 {
506 return OutputSpec{OutputLabel{aod::label<table_t::ref>()}, o2::aod::origin<table_t::ref>(), o2::aod::description(o2::aod::signature<table_t::ref>()), table_t::ref.version};
507 }
508
509 static OutputRef ref()
510 {
511 return OutputRef{aod::label<table_t::ref>(), table_t::ref.version};
512 }
513};
514
519template <is_producable T>
521};
522
523template <typename T>
524concept is_produces = requires(T t) { typename T::cursor_t; typename T::persistent_table_t; &T::cursor; };
525
535};
536
537template <typename T>
538concept is_produces_group = std::derived_from<T, ProducesGroup>;
539
541template <soa::is_metadata M, soa::TableRef Ref>
543 using metadata = M;
544 constexpr static auto sources = M::sources;
545
546 template <soa::TableRef R>
547 static auto base_spec()
548 {
549 return soa::tableRef2InputSpec<R>();
550 }
551
552 static auto base_specs()
553 {
554 return []<size_t... Is>(std::index_sequence<Is...>) {
555 return std::array{base_spec<sources[Is]>()...};
556 }(std::make_index_sequence<sources.size()>{});
557 }
558
559 static constexpr auto spec()
560 {
561 return soa::tableRef2OutputSpec<Ref>();
562 }
563
564 static constexpr auto output()
565 {
566 return soa::tableRef2Output<Ref>();
567 }
568
569 static constexpr auto ref()
570 {
571 return soa::tableRef2OutputRef<Ref>();
572 }
573};
574
577template <typename T>
578concept is_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
579
580template <typename T>
581concept is_dynamically_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_configurable_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
582
583template <is_spawnable T>
584constexpr auto transformBase()
585{
586 using metadata = typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata;
588}
589
590template <is_spawnable T>
591struct Spawns : decltype(transformBase<T>()) {
592 using spawnable_t = T;
593 using metadata = decltype(transformBase<T>())::metadata;
594 using extension_t = typename metadata::extension_table_t;
595 using expression_pack_t = typename metadata::expression_pack_t;
596 static constexpr size_t N = framework::pack_size(expression_pack_t{});
597
598 typename T::table_t* operator->()
599 {
600 return table.get();
601 }
602 typename T::table_t const& operator*() const
603 {
604 return *table;
605 }
606
608 {
609 return extension->asArrowTable();
610 }
611
612 std::shared_ptr<typename T::table_t> table = nullptr;
613 std::shared_ptr<extension_t> extension = nullptr;
614 std::array<o2::framework::expressions::Projector, N> projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
615 {
616 return {{std::move(C::Projector())...}};
617 }
619 std::shared_ptr<gandiva::Projector> projector = nullptr;
620 std::shared_ptr<arrow::Schema> schema = []() {
621 auto s = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(expression_pack_t{}));
622 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
623 return s;
624 }();
625};
626
627template <typename T>
628concept is_spawns = requires(T t) {
629 typename T::metadata;
630 typename T::expression_pack_t;
631 requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
632};
633
638
639template <is_dynamically_spawnable T, bool DELAYED = false>
640struct Defines : decltype(transformBase<T>()) {
641 static constexpr bool delayed = DELAYED;
642 using spawnable_t = T;
643 using metadata = decltype(transformBase<T>())::metadata;
644 using extension_t = typename metadata::extension_table_t;
645 using placeholders_pack_t = typename metadata::placeholders_pack_t;
646 static constexpr size_t N = framework::pack_size(placeholders_pack_t{});
647
648 typename T::table_t* operator->()
649 {
650 return table.get();
651 }
652 typename T::table_t const& operator*() const
653 {
654 return *table;
655 }
656
658 {
659 return extension->asArrowTable();
660 }
661 std::shared_ptr<typename T::table_t> table = nullptr;
662 std::shared_ptr<extension_t> extension = nullptr;
663
664 std::array<o2::framework::expressions::Projector, N> projectors;
665 std::shared_ptr<gandiva::Projector> projector = nullptr;
666 std::shared_ptr<arrow::Schema> schema = []() {
667 auto s = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(placeholders_pack_t{}));
668 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
669 return s;
670 }();
671 std::shared_ptr<arrow::Schema> inputSchema = nullptr;
672
673 bool needRecompilation = false;
674
679};
680
681template <is_dynamically_spawnable T>
683
684template <typename T>
685concept is_defines = requires(T t) {
686 typename T::metadata;
687 typename T::placeholders_pack_t;
688 requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
689 requires std::same_as<decltype(t.needRecompilation), bool>;
690 &T::recompile;
691};
692
697struct Exclusive {
698};
699struct Sparse {
700};
701
703
704template <soa::is_index_table T>
705constexpr auto transformBase()
706{
707 using metadata = typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata;
709}
710
711template <soa::is_index_table T>
712struct Builds : decltype(transformBase<T>()) {
713 using buildable_t = T;
714 using metadata = decltype(transformBase<T>())::metadata;
715 using Key = metadata::Key;
716 using H = typename T::first_t;
717 using Ts = typename T::rest_t;
718 using index_pack_t = metadata::index_pack_t;
719
720 std::shared_ptr<arrow::Schema> outputSchema = []() { return std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(index_pack_t{}))->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}})); }();
721
722 std::vector<soa::IndexRecord> map = soa::getIndexMapping<metadata>();
723
724 std::vector<framework::IndexColumnBuilder> builders;
725
727 {
728 return table.get();
729 }
730 T const& operator*() const
731 {
732 return *table;
733 }
734
736 {
737 return table->asArrowTable();
738 }
739 std::shared_ptr<T> table = nullptr;
740
741 constexpr auto pack()
742 {
743 return index_pack_t{};
744 }
745
746 auto build(std::vector<std::shared_ptr<arrow::Table>>&& tables)
747 {
748 this->table = std::make_shared<T>(soa::IndexBuilder::materialize(builders, std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, outputSchema, metadata::exclusive));
749 return (this->table != nullptr);
750 }
751};
752
753template <typename T>
754concept is_builds = requires(T t) {
755 typename T::metadata;
756 typename T::Key;
757 requires std::same_as<decltype(t.map), std::vector<soa::IndexRecord>>;
758};
759
767template <typename T>
768struct OutputObj {
769 using obj_t = T;
770
772 : object(std::make_shared<T>(t)),
773 label(t.GetName()),
774 policy{policy_},
775 sourceType{sourceType_},
776 mTaskHash{0}
777 {
778 }
779
781 : object(nullptr),
782 label(label_),
783 policy{policy_},
784 sourceType{sourceType_},
785 mTaskHash{0}
786 {
787 }
788
// Replaces the held object with a newly allocated copy of `t`, then renames
// the copy to this output's label. The argument itself is left untouched.
789 void setObject(T const& t)
790 {
791 object = std::make_shared<T>(t);
792 object->SetName(label.c_str());
793 }
794
795 void setObject(T&& t)
796 {
797 object = std::make_shared<T>(t);
798 object->SetName(label.c_str());
799 }
800
801 void setObject(T* t)
802 {
803 object.reset(t);
804 object->SetName(label.c_str());
805 }
806
807 void setObject(std::shared_ptr<T> t)
808 {
809 object = t;
810 object->SetName(label.c_str());
811 }
812
813 void setHash(uint32_t hash)
814 {
815 mTaskHash = hash;
816 }
817
820 {
822 auto lhash = runtime_hash(label.c_str());
823 std::memset(desc.str, '_', 16);
824 std::stringstream s;
825 s << std::hex << lhash;
826 s << std::hex << mTaskHash;
827 s << std::hex << reinterpret_cast<uint64_t>(this);
828 std::memcpy(desc.str, s.str().c_str(), 12);
829 return OutputSpec{OutputLabel{label}, "ATSK", desc, 0, Lifetime::QA};
830 }
831
833 {
834 return object.get();
835 }
836
838 {
839 return *object.get();
840 }
841
842 OutputRef ref(uint16_t index, uint16_t max)
843 {
844 return OutputRef{std::string{label}, 0,
846 }
847
848 std::shared_ptr<T> object;
849 std::string label;
852 uint32_t mTaskHash;
853};
854
855template <typename T>
856concept is_outputobj = requires(T t) {
857 &T::setHash;
858 &T::spec;
859 &T::ref;
860 requires std::same_as<decltype(t.operator->()), typename T::obj_t*>;
861 requires std::same_as<decltype(t.object), std::shared_ptr<typename T::obj_t>>;
862};
863
867template <typename T>
868struct Service {
869 using service_t = T;
871
872 decltype(auto) operator->() const
873 {
875 return service->get();
876 } else {
877 return service;
878 }
879 }
880};
881
882template <typename T>
883concept is_service = requires(T t) {
884 requires std::same_as<decltype(t.service), typename T::service_t*>;
885 &T::operator->;
886};
887
889{
890 return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
891}
892
894{
895 return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
896}
897
898void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter);
899
900template <typename T>
901struct Partition {
902 using content_t = T;
903 Partition(expressions::Node&& filter_) : filter{std::forward<expressions::Node>(filter_)}
904 {
905 }
906
907 Partition(expressions::Node&& filter_, T const& table)
908 : filter{std::forward<expressions::Node>(filter_)}
909 {
910 setTable(table);
911 }
912
913 void intializeCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema)
914 {
916 }
917
918 void bindTable(T const& table)
919 {
920 intializeCaches(T::table_t::hashes(), table.asArrowTable()->schema());
921 if (dataframeChanged) {
923 dataframeChanged = false;
924 }
925 }
926
927 template <typename... Ts>
928 void bindExternalIndices(Ts*... tables)
929 {
930 if (mFiltered != nullptr) {
931 mFiltered->bindExternalIndices(tables...);
932 }
933 }
934
935 template <typename E>
937 {
938 if (mFiltered != nullptr) {
939 mFiltered->bindInternalIndicesTo(ptr);
940 }
941 }
942
944 {
946 }
947
948 [[nodiscard]] std::shared_ptr<arrow::Table> asArrowTable() const
949 {
950 return mFiltered->asArrowTable();
951 }
952
954 {
955 return mFiltered.get();
956 }
957
958 template <typename T1>
959 [[nodiscard]] auto rawSliceBy(o2::framework::Preslice<T1> const& container, int value) const
960 {
961 return mFiltered->rawSliceBy(container, value);
962 }
963
965 {
966 return mFiltered->sliceByCached(node, value, cache);
967 }
968
970 {
971 return mFiltered->sliceByCachedUnsorted(node, value, cache);
972 }
973
974 template <typename T1, typename Policy, bool OPT>
975 [[nodiscard]] auto sliceBy(o2::framework::PresliceBase<T1, Policy, OPT> const& container, int value) const
976 {
977 return mFiltered->sliceBy(container, value);
978 }
979
981 std::unique_ptr<o2::soa::Filtered<T>> mFiltered = nullptr;
982 gandiva::NodePtr tree = nullptr;
984 bool dataframeChanged = true;
985
991 {
992 return mFiltered->begin();
993 }
995 {
996 return mFiltered->end();
997 }
999 {
1000 return mFiltered->begin();
1001 }
1003 {
1004 return mFiltered->end();
1005 }
1006
1007 int64_t size() const
1008 {
1009 return mFiltered->size();
1010 }
1011};
1012
1013template <typename T>
1014concept is_partition = requires(T t) {
1015 &T::updatePlaceholders;
1016 requires std::same_as<decltype(t.filter), expressions::Filter>;
1017 requires std::same_as<decltype(t.mFiltered), std::unique_ptr<o2::soa::Filtered<typename T::content_t>>>;
1018};
1019} // namespace o2::framework
1020
1021namespace o2::soa
1022{
// Returns `table` joined with an extra table holding the spawnable columns Cs,
// which is produced by o2::framework::spawner from the table's Arrow data.
// The projectors, the gandiva projector and the schema are function-local
// statics, so each (T, Cs...) instantiation creates them once and reuses them
// on later calls. `projector` starts out null and is handed to spawner() by
// non-const reference.
// NOTE(review): presumably spawner() fills `projector` on first use — confirm in spawnerHelper.
// The resulting join is constructed with offset 0.
1024template <soa::is_table T, soa::is_spawnable_column... Cs>
1025auto Extend(T const& table)
1026{
1027 using output_t = Join<T, soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
1028 static std::array<framework::expressions::Projector, sizeof...(Cs)> projectors{{std::move(Cs::Projector())...}};
1029 static std::shared_ptr<gandiva::Projector> projector = nullptr;
1030 static auto schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(framework::pack<Cs...>{}));
1031 return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension", projectors.data(), projector, schema), table.asArrowTable()}, 0};
1032}
1033
// Returns a join type that combines T with a table declaring the dynamic
// columns Cs. Only the original table's Arrow data is passed on, together with
// the original table's offset; no additional Arrow table is created here.
1036template <soa::is_table T, soa::is_dynamic_column... Cs>
1037auto Attach(T const& table)
1038{
1039 using output_t = Join<T, o2::soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
1040 return output_t{{table.asArrowTable()}, table.offset()};
1041}
1042} // namespace o2::soa
1043
1044#endif // o2_framework_AnalysisHelpers_H_DEFINED
bool exclusive
std::vector< o2::soa::IndexRecord > records
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::vector< std::string > labels
o2::monitoring::tags::Key Key
uint32_t hash
std::shared_ptr< arrow::Schema > schema
std::unique_ptr< expressions::Node > node
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t gfilter
Definition RawData.h:6
constexpr uint32_t runtime_hash(char const *str)
nlohmann::json json
TBranch * ptr
Definition A.h:16
iterator const_iterator
Definition ASoA.h:3771
T::template iterator_template_o< FilteredIndexPolicy, self_t > iterator
Definition ASoA.h:3769
Helper to check if a type T is an iterator.
Definition ASoA.h:1286
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLuint object
Definition glcorearb.h:4041
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLint ref
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
std::shared_ptr< gandiva::Filter > FilterPtr
Definition Expressions.h:47
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
gandiva::Selection createSelection(std::shared_ptr< arrow::Table > const &table, Filter const &expression)
Function for creating gandiva selection from our internal filter tree.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
constexpr std::size_t pack_size(pack< Ts... > const &)
template function to determine number of types in a pack
Definition Pack.h:28
constexpr auto transformBase()
This helper struct allows you to declare index tables to be created in a task.
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
memfun_type< decltype(&F::operator())>::type FFL(F const &func)
auto getTableFromFilter(soa::is_filtered_table auto const &table, soa::SelectionVector &&selection)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
consteval auto typeWithRef() -> T
Helper to define output for a Table.
OutputObjHandlingPolicy
Policy enum to determine OutputObj handling when writing.
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:57
SelectionVector selectionToVector(gandiva::Selection const &sel)
Definition ASoA.cxx:48
constexpr auto tableRef2InputSpec()
constexpr auto tableRef2Output()
std::vector< int64_t > SelectionVector
Definition ASoA.h:422
auto Attach(T const &table)
constexpr auto tableRef2Schema()
constexpr auto tableRef2ConfigParamSpec()
auto Extend(T const &table)
On-the-fly adding of expression columns.
constexpr auto tableRef2OutputSpec()
constexpr auto tableRef2OutputRef()
@ C
Definition Defs.h:36
Defining DataPointCompositeObject explicitly as copiable.
header::DataDescription description
std::vector< o2::soa::IndexRecord > records
std::vector< std::string > labels
header::DataOrigin origin
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
std::shared_ptr< T > table
constexpr auto pack()
auto build(std::vector< std::shared_ptr< arrow::Table > > &&tables)
std::vector< soa::IndexRecord > map
std::vector< framework::IndexColumnBuilder > builders
std::shared_ptr< arrow::Schema > outputSchema
typename T::first_t H
T const & operator*() const
metadata::index_pack_t index_pack_t
typename T::rest_t Ts
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
T::table_t const & operator*() const
decltype(transformBase< T >())::metadata metadata
typename metadata::extension_table_t extension_t
std::array< o2::framework::expressions::Projector, N > projectors
std::shared_ptr< extension_t > extension
std::shared_ptr< arrow::Schema > inputSchema
std::shared_ptr< arrow::Schema > schema
static constexpr bool delayed
static constexpr size_t N
typename metadata::placeholders_pack_t placeholders_pack_t
std::shared_ptr< gandiva::Projector > projector
static OutputSpec const spec()
aod::MetadataTrait< o2::aod::Hash< table_t::ref.desc_hash > >::metadata metadata
decltype(typeWithRef< T >()) table_t
O2 header for OutputObj metadata.
OutputObj(T &&t, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
void setObject(std::shared_ptr< T > t)
OutputObjSourceType sourceType
OutputRef ref(uint16_t index, uint16_t max)
std::shared_ptr< T > object
OutputObj(std::string const &label_, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
OutputSpec const spec()
OutputObjHandlingPolicy policy
void setHash(uint32_t hash)
Partition(expressions::Node &&filter_, T const &table)
typename o2::soa::Filtered< T >::const_iterator filtered_const_iterator
void bindTable(T const &table)
filtered_const_iterator begin() const
auto rawSliceBy(o2::framework::Preslice< T1 > const &container, int value) const
auto sliceByCached(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
void intializeCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema)
Partition(expressions::Node &&filter_)
typename o2::soa::Filtered< T >::iterator filtered_iterator
o2::soa::Filtered< T > * operator->()
typename o2::soa::Filtered< T >::iterator iterator
auto sliceBy(o2::framework::PresliceBase< T1, Policy, OPT > const &container, int value) const
std::shared_ptr< arrow::Table > asArrowTable() const
o2::soa::RowViewSentinel end() const
void updatePlaceholders(InitContext &context)
std::unique_ptr< o2::soa::Filtered< T > > mFiltered
expressions::Filter filter
o2::soa::RowViewSentinel end()
auto sliceByCachedUnsorted(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
typename o2::soa::Filtered< T >::const_iterator const_iterator
gandiva::FilterPtr gfilter
void bindExternalIndices(Ts *... tables)
filtered_iterator begin()
header::DataDescription description
std::vector< std::string > labels
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< gandiva::Projector > projector
header::DataOrigin origin
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
std::shared_ptr< arrow::Schema > inputSchema
T::table_t * operator->()
std::shared_ptr< gandiva::Projector > projector
std::shared_ptr< extension_t > extension
T::table_t const & operator*() const
static constexpr size_t N
std::shared_ptr< arrow::Schema > schema
typename metadata::expression_pack_t expression_pack_t
std::array< o2::framework::expressions::Projector, N > projectors
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
typename metadata::extension_table_t extension_t
Helper template for table transformations.
static constexpr auto spec()
static constexpr auto ref()
static constexpr auto output()
int64_t lastIndex()
Last index inserted in the table.
decltype(FFL(std::declval< cursor_t >())) cursor
bool resetCursor(LifetimeHolder< TableBuilder > builder)
void setLabel(const char *label)
void operator()(Ts &&... args)
decltype([]() { if constexpr(soa::is_iterator< T >) { return typename T::parent_t{nullptr} persistent_table_t
decltype(std::declval< TableBuilder >().cursor< persistent_table_t >()) cursor_t
An expression tree node corresponding to a column binding.
A struct, containing the root of the expression tree.
uint32_t SubSpecificationType
Definition DataHeader.h:621
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::shared_ptr< arrow::Field > field() const
auto operator==(IndexRecord const &other) const
constexpr size_t max
VectorOfTObjectPtrs other
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))