Project
Loading...
Searching...
No Matches
AnalysisHelpers.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef o2_framework_AnalysisHelpers_H_DEFINED
12#define o2_framework_AnalysisHelpers_H_DEFINED
13
14#include "ConfigParamSpec.h"
15#include "Framework/ASoA.h"
18#include "Framework/InputSpec.h"
19#include "Framework/Output.h"
21#include "Framework/OutputRef.h"
23#include "Framework/Plugins.h"
26#include "Framework/Traits.h"
27
28#include <string>
29namespace o2::soa
30{
32 std::string label;
34 std::string columnLabel;
36 int pos;
37 std::shared_ptr<arrow::DataType> type = [](IndexKind kind) -> std::shared_ptr<arrow::DataType> {
38 switch (kind) {
41 return arrow::int32();
43 return arrow::fixed_size_list(arrow::int32(), 2);
45 return arrow::list(arrow::int32());
46 default:
47 return {nullptr};
48 }
49 }(kind);
50
51 auto operator==(IndexRecord const& other) const
52 {
53 return (this->label == other.label) && (this->columnLabel == other.columnLabel) && (this->kind == other.kind) && (this->pos == other.pos);
54 }
55
56 std::shared_ptr<arrow::Field> field() const
57 {
58 return std::make_shared<arrow::Field>(columnLabel, type);
59 }
60};
61
63 static std::vector<framework::IndexColumnBuilder> makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records);
64 static void resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables);
65
66 static std::shared_ptr<arrow::Table> materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive);
67};
68} // namespace o2::soa
69
70namespace o2::framework
71{
72std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema);
73
74template <soa::is_table T>
75auto makeEmptyTable(const char* name)
76{
77 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename T::table_t::persistent_columns_t{}));
79}
80
81template <soa::TableRef R>
83{
84 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<aod::Hash<R.desc_hash>>::metadata::persistent_columns_t{}));
85 return makeEmptyTableImpl(o2::aod::label<R>(), schema);
86}
87
88template <typename... Cs>
90{
91 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(p));
93}
94
95template <aod::is_aod_hash D>
96auto makeEmptyTable(const char* name)
97{
98 auto schema = std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(typename aod::MetadataTrait<D>::metadata::persistent_columns_t{}));
100}
101
102std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
103 expressions::Projector* projectors, const char* name, std::shared_ptr<gandiva::Projector>& projector);
104
105std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
106 const char* name, size_t nColumns,
107 const std::shared_ptr<gandiva::Projector>& projector);
108
110template <aod::is_aod_hash D>
111 requires(soa::has_extension<typename o2::aod::MetadataTrait<D>::metadata>)
112auto spawner(std::shared_ptr<arrow::Table> const& fullTable, const char* name, o2::framework::expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
113{
114 if (fullTable->num_rows() == 0) {
115 return makeEmptyTable<D>(name);
116 }
117 constexpr auto Ncol = []<typename M>() {
119 return framework::pack_size(typename M::placeholders_pack_t{});
120 } else {
121 return framework::pack_size(typename M::expression_pack_t{});
122 }
123 }.template operator()<typename o2::aod::MetadataTrait<D>::metadata>();
124 return spawnerHelper(fullTable, schema, Ncol, projectors, name, projector);
125}
126
127template <typename... C>
128auto spawner(framework::pack<C...>, std::vector<std::shared_ptr<arrow::Table>>&& tables, const char* name, expressions::Projector* projectors, std::shared_ptr<gandiva::Projector>& projector, std::shared_ptr<arrow::Schema> const& schema)
129{
130 std::array<const char*, 1> labels{"original"};
131 auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span<const char* const>{labels});
132 if (fullTable->num_rows() == 0) {
134 }
135 return spawnerHelper(fullTable, schema, sizeof...(C), projectors, name, projector);
136}
137
138std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors);
139std::string serializeSchema(std::shared_ptr<arrow::Schema> schema);
140std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs);
141std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<std::string> const& labels);
142
143struct Spawner {
144 std::string binding;
145 std::vector<std::string> labels;
146 std::vector<framework::ConcreteDataMatcher> matchers;
147 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
148 std::shared_ptr<gandiva::Projector> projector = nullptr;
149 std::shared_ptr<arrow::Schema> schema = nullptr;
150 std::shared_ptr<arrow::Schema> inputSchema = nullptr;
151
155
156 std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc) const;
157};
158
159struct Builder {
161 std::vector<std::string> labels;
162 std::vector<framework::ConcreteDataMatcher> matchers;
163 std::vector<o2::soa::IndexRecord> records;
164 std::shared_ptr<arrow::Schema> outputSchema;
168
169 std::shared_ptr<std::vector<framework::IndexColumnBuilder>> builders = nullptr;
170
171 std::shared_ptr<arrow::Table> materialize(ProcessingContext& pc);
172};
173} // namespace o2::framework
174
175namespace o2::soa
176{
177template <TableRef R>
179{
181 std::string{"input:"} + o2::aod::label<R>(),
183 aod::sourceSpec<R>(),
184 {"\"\""}};
185}
186
187template <TableRef R>
188constexpr auto tableRef2Schema()
189{
191 std::string{"input-schema:"} + o2::aod::label<R>(),
194 {"\"\""}};
195}
196
197namespace
198{
/// Returns a std::vector holding the result of soa::tableRef2ConfigParamSpec
/// for every element of T::sources, in array order.
199template <soa::with_sources T>
200inline constexpr auto getSources()
201{
// T::sources is handed to the outer lambda as a non-type template parameter
// so that each element `refs[Is]` can itself be used as a template argument.
202 return []<size_t N, std::array<soa::TableRef, N> refs>() {
// The inner lambda expands the index pack 0..N-1 into one call per source.
203 return []<size_t... Is>(std::index_sequence<Is...>) {
204 return std::vector{soa::tableRef2ConfigParamSpec<refs[Is]>()...};
205 }(std::make_index_sequence<N>());
206 }.template operator()<T::sources.size(), T::sources>();
207}
208
/// Returns a std::vector holding the result of soa::tableRef2Schema for every
/// element of T::sources, in array order.
209template <soa::with_sources T>
210inline constexpr auto getSourceSchemas()
211{
// T::sources is handed to the outer lambda as a non-type template parameter
// so that each element `refs[Is]` can itself be used as a template argument.
212 return []<size_t N, std::array<soa::TableRef, N> refs>() {
// The inner lambda expands the index pack 0..N-1 into one call per source.
213 return []<size_t... Is>(std::index_sequence<Is...>) {
214 return std::vector{soa::tableRef2Schema<refs[Is]>()...};
215 }(std::make_index_sequence<N>());
216 }.template operator()<T::sources.size(), T::sources>();
217}
218
219template <soa::with_ccdb_urls T>
220inline constexpr auto getCCDBUrls()
221{
222 std::vector<framework::ConfigParamSpec> result;
223 for (size_t i = 0; i < T::ccdb_urls.size(); ++i) {
224 result.push_back({std::string{"ccdb:"} + std::string{T::ccdb_bindings[i]},
226 T::ccdb_urls[i],
227 {"\"\""}});
228 }
229 return result;
230}
231
232template <typename T>
233 requires(std::same_as<T, int>)
234consteval IndexKind getIndexKind()
235{
237}
238
239template <typename T>
240 requires(std::is_bounded_array_v<T>)
241consteval IndexKind getIndexKind()
242{
243 return IndexKind::IdxSlice;
244}
245
246template <typename T>
247 requires(framework::is_specialization_v<T, std::vector>)
248consteval IndexKind getIndexKind()
249{
250 return IndexKind::IdxArray;
251}
252
/// Builds the list of IndexRecord entries for an index table description T.
/// One record is appended per column of T::index_pack_t; the i-th column is
/// paired with the i-th element of T::sources.
253template <soa::with_index_pack T>
254inline constexpr auto getIndexMapping()
255{
256 std::vector<IndexRecord> idx;
257 using indices = T::index_pack_t;
258 using Key = T::Key;
// Outer lambda: expands the index pack 0..pack_size(indices)-1.
259 [&idx]<size_t... Is>(std::index_sequence<Is...>) mutable {
260 constexpr auto refs = T::sources;
// Inner lambda: invoked once per (source ref, index column) pair through the
// comma fold below.
261 ([&idx]<TableRef ref, typename C>() mutable {
// Position reported by the source table's metadata for an index pointing to Key.
262 constexpr auto pos = o2::aod::MetadataTrait<o2::aod::Hash<ref.desc_hash>>::metadata::template getIndexPosToKey<Key>();
// A position of -1 is recorded as IndexKind::IdxSelf; otherwise the kind is
// derived from the column's value type via getIndexKind.
263 if constexpr (pos == -1) {
264 idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), IndexKind::IdxSelf, pos);
265 } else {
266 idx.emplace_back(o2::aod::label<ref>(), o2::aod::matcher<ref>(), C::columnLabel(), getIndexKind<typename C::type>(), pos);
267 }
268 }.template operator()<refs[Is], typename framework::pack_element_t<Is, indices>>(),
269 ...);
270 }(std::make_index_sequence<framework::pack_size(indices{})>());
// NOTE(review): the lone `;` below is an empty statement with no effect.
271 ;
272 return idx;
273}
274
275template <soa::with_sources T>
276constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
277{
278 std::vector<framework::ConfigParamSpec> inputMetadata;
279
280 auto inputSources = getSources<T>();
281 std::sort(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
282 auto last = std::unique(inputSources.begin(), inputSources.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
283 inputSources.erase(last, inputSources.end());
284 inputMetadata.insert(inputMetadata.end(), inputSources.begin(), inputSources.end());
285
286 auto inputSchemas = getSourceSchemas<T>();
287 std::sort(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
288 last = std::unique(inputSchemas.begin(), inputSchemas.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
289 inputSchemas.erase(last, inputSchemas.end());
290 inputMetadata.insert(inputMetadata.end(), inputSchemas.begin(), inputSchemas.end());
291
292 return inputMetadata;
293}
294
295template <typename T>
296 requires(!soa::with_sources<T>)
297constexpr auto getInputMetadata() -> std::vector<framework::ConfigParamSpec>
298{
299 return {};
300}
301
302template <soa::with_ccdb_urls T>
303constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
304{
305 std::vector<framework::ConfigParamSpec> results = getCCDBUrls<T>();
306 std::sort(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name < b.name; });
307 auto last = std::unique(results.begin(), results.end(), [](framework::ConfigParamSpec const& a, framework::ConfigParamSpec const& b) { return a.name == b.name; });
308 results.erase(last, results.end());
309 return results;
310}
311
312template <typename T>
313constexpr auto getCCDBMetadata() -> std::vector<framework::ConfigParamSpec>
314{
315 return {};
316}
317
318template <soa::with_expression_pack T>
319constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
320{
321 using expression_pack_t = T::expression_pack_t;
322
323 auto projectors = []<typename... C>(framework::pack<C...>) -> std::vector<framework::expressions::Projector> {
324 std::vector<framework::expressions::Projector> result;
325 (result.emplace_back(std::move(C::Projector())), ...);
326 return result;
327 }(expression_pack_t{});
328
330 return {framework::ConfigParamSpec{"projectors", framework::VariantType::String, json, {"\"\""}}};
331}
332
333template <typename T>
334 requires(!soa::with_expression_pack<T>)
335constexpr auto getExpressionMetadata() -> std::vector<framework::ConfigParamSpec>
336{
337 return {};
338}
339
340template <soa::with_index_pack T>
341constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
342{
343 auto map = getIndexMapping<T>();
344 return {framework::ConfigParamSpec{"index-records", framework::VariantType::String, framework::serializeIndexRecords(map), {"\"\""}},
345 {framework::ConfigParamSpec{"index-exclusive", framework::VariantType::Bool, T::exclusive, {"\"\""}}}};
346}
347
348template <typename T>
349 requires(!soa::with_index_pack<T>)
350constexpr auto getIndexMetadata() -> std::vector<framework::ConfigParamSpec>
351{
352 return {};
353}
354
355} // namespace
356
357template <TableRef R>
358constexpr auto tableRef2InputSpec()
359{
360 std::vector<framework::ConfigParamSpec> metadata;
361 auto m = getInputMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
362 metadata.insert(metadata.end(), m.begin(), m.end());
363 auto ccdbMetadata = getCCDBMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
364 metadata.insert(metadata.end(), ccdbMetadata.begin(), ccdbMetadata.end());
365 auto p = getExpressionMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
366 metadata.insert(metadata.end(), p.begin(), p.end());
367 auto idx = getIndexMetadata<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>();
368 metadata.insert(metadata.end(), idx.begin(), idx.end());
369 if constexpr (!soa::with_ccdb_urls<typename o2::aod::MetadataTrait<o2::aod::Hash<R.desc_hash>>::metadata>) {
371 }
372
374 o2::aod::label<R>(),
375 o2::aod::origin<R>(),
376 o2::aod::description(o2::aod::signature<R>()),
377 R.version,
378 framework::Lifetime::Timeframe,
379 metadata};
380}
381
382template <TableRef R>
383constexpr auto tableRef2OutputSpec()
384{
386 framework::OutputLabel{o2::aod::label<R>()},
387 o2::aod::origin<R>(),
388 o2::aod::description(o2::aod::signature<R>()),
389 R.version};
390}
391
/// Builds a framework::Output for the table reference R from, in this order:
/// the origin of R, the description derived from R's signature, and R.version.
392template <TableRef R>
393constexpr auto tableRef2Output()
394{
395 return framework::Output{
396 o2::aod::origin<R>(),
397 o2::aod::description(o2::aod::signature<R>()),
398 R.version};
399}
400
401template <TableRef R>
402constexpr auto tableRef2OutputRef()
403{
405 o2::aod::label<R>(),
406 R.version};
407}
408} // namespace o2::soa
409
410namespace o2::framework
411{
412class TableConsumer;
413
417template <typename T>
419
420template <typename T>
421concept is_enumerated_iterator = requires(T t) { t.globalIndex(); };
422
423template <is_producable T>
425 public:
426 using persistent_table_t = decltype([]() { if constexpr (soa::is_iterator<T>) { return typename T::parent_t{nullptr}; } else { return T{nullptr}; } }());
427 using cursor_t = decltype(std::declval<TableBuilder>().cursor<persistent_table_t>());
428
// Increments the row counter and invokes the cursor with the given values.
// Each argument is first passed through extract(). The overload only takes
// part in resolution when the number of arguments equals the number of
// persistent columns of persistent_table_t.
429 template <typename... Ts>
430 void operator()(Ts&&... args)
431 requires(sizeof...(Ts) == framework::pack_size(typename persistent_table_t::persistent_columns_t{}))
432 {
433 ++mCount;
// NOTE(review): the meaning of the leading 0 passed to the cursor is not
// visible from here - confirm against TableBuilder::cursor.
434 cursor(0, extract(args)...);
435 }
436
438 int64_t lastIndex()
439 {
440 return mCount;
441 }
442
444 {
445 mBuilder = std::move(builder);
446 cursor = std::move(FFL(mBuilder->cursor<persistent_table_t>()));
447 mCount = -1;
448 return true;
449 }
450
451 void setLabel(const char* label)
452 {
453 mBuilder->setLabel(label);
454 }
455
458 void reserve(int64_t size)
459 {
460 mBuilder->reserve(typename persistent_table_t::column_types{}, size);
461 }
462
463 void release()
464 {
465 mBuilder.release();
466 }
467
468 decltype(FFL(std::declval<cursor_t>())) cursor;
469
470 private:
471 static decltype(auto) extract(is_enumerated_iterator auto const& arg)
472 {
473 return arg.globalIndex();
474 }
475
476 template <typename A>
478 static decltype(auto) extract(A&& arg)
479 {
480 return arg;
481 }
482
486 LifetimeHolder<TableBuilder> mBuilder = nullptr;
487 int64_t mCount = -1;
488};
489
491template <soa::is_table T>
492consteval auto typeWithRef() -> T
493{
494}
495
496template <soa::is_iterator T>
497consteval auto typeWithRef() -> typename T::parent_t
498{
499}
500
501template <typename T>
502 requires soa::is_table<T> || soa::is_iterator<T>
504 using table_t = decltype(typeWithRef<T>());
505 using metadata = aod::MetadataTrait<o2::aod::Hash<table_t::ref.desc_hash>>::metadata;
506
507 static OutputSpec const spec()
508 {
509 return OutputSpec{OutputLabel{aod::label<table_t::ref>()}, o2::aod::origin<table_t::ref>(), o2::aod::description(o2::aod::signature<table_t::ref>()), table_t::ref.version};
510 }
511
512 static OutputRef ref()
513 {
514 return OutputRef{aod::label<table_t::ref>(), table_t::ref.version};
515 }
516};
517
522template <is_producable T>
524};
525
526template <typename T>
527concept is_produces = requires(T t) { typename T::cursor_t; typename T::persistent_table_t; &T::cursor; };
528
538};
539
540template <typename T>
541concept is_produces_group = std::derived_from<T, ProducesGroup>;
542
544template <soa::is_metadata M, soa::TableRef Ref>
546 using metadata = M;
547 constexpr static auto sources = M::sources;
548
549 template <soa::TableRef R>
550 static auto base_spec()
551 {
552 return soa::tableRef2InputSpec<R>();
553 }
554
555 static auto base_specs()
556 {
557 return []<size_t... Is>(std::index_sequence<Is...>) {
558 return std::array{base_spec<sources[Is]>()...};
559 }(std::make_index_sequence<sources.size()>{});
560 }
561
562 static constexpr auto spec()
563 {
564 return soa::tableRef2OutputSpec<Ref>();
565 }
566
567 static constexpr auto output()
568 {
569 return soa::tableRef2Output<Ref>();
570 }
571
572 static constexpr auto ref()
573 {
574 return soa::tableRef2OutputRef<Ref>();
575 }
576};
577
580template <typename T>
581concept is_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
582
583template <typename T>
584concept is_dynamically_spawnable = soa::has_metadata<aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>> && soa::has_configurable_extension<typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata>;
585
586template <is_spawnable T>
587constexpr auto transformBase()
588{
589 using metadata = typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata;
591}
592
593template <is_spawnable T>
594struct Spawns : decltype(transformBase<T>()) {
595 using spawnable_t = T;
596 using metadata = decltype(transformBase<T>())::metadata;
597 using extension_t = typename metadata::extension_table_t;
598 using expression_pack_t = typename metadata::expression_pack_t;
599 static constexpr size_t N = framework::pack_size(expression_pack_t{});
600
601 typename T::table_t* operator->()
602 {
603 return table.get();
604 }
605 typename T::table_t const& operator*() const
606 {
607 return *table;
608 }
609
611 {
612 return extension->asArrowTable();
613 }
614
615 std::shared_ptr<typename T::table_t> table = nullptr;
616 std::shared_ptr<extension_t> extension = nullptr;
617 std::array<o2::framework::expressions::Projector, N> projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
618 {
619 return {{std::move(C::Projector())...}};
620 }
622 std::shared_ptr<gandiva::Projector> projector = nullptr;
623 std::shared_ptr<arrow::Schema> schema = []() {
624 auto s = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(expression_pack_t{}));
625 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
626 return s;
627 }();
628};
629
630template <typename T>
631concept is_spawns = requires(T t) {
632 typename T::metadata;
633 typename T::expression_pack_t;
634 requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
635};
636
641
642template <is_dynamically_spawnable T, bool DELAYED = false>
643struct Defines : decltype(transformBase<T>()) {
644 static constexpr bool delayed = DELAYED;
645 using spawnable_t = T;
646 using metadata = decltype(transformBase<T>())::metadata;
647 using extension_t = typename metadata::extension_table_t;
648 using placeholders_pack_t = typename metadata::placeholders_pack_t;
649 static constexpr size_t N = framework::pack_size(placeholders_pack_t{});
650
651 typename T::table_t* operator->()
652 {
653 return table.get();
654 }
655 typename T::table_t const& operator*() const
656 {
657 return *table;
658 }
659
661 {
662 return extension->asArrowTable();
663 }
664 std::shared_ptr<typename T::table_t> table = nullptr;
665 std::shared_ptr<extension_t> extension = nullptr;
666
667 std::array<o2::framework::expressions::Projector, N> projectors;
668 std::shared_ptr<gandiva::Projector> projector = nullptr;
669 std::shared_ptr<arrow::Schema> schema = []() {
670 auto s = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(placeholders_pack_t{}));
671 s->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}}));
672 return s;
673 }();
674 std::shared_ptr<arrow::Schema> inputSchema = nullptr;
675
676 bool needRecompilation = false;
677
682};
683
684template <is_dynamically_spawnable T>
686
687template <typename T>
688concept is_defines = requires(T t) {
689 typename T::metadata;
690 typename T::placeholders_pack_t;
691 requires std::same_as<decltype(t.projector), std::shared_ptr<gandiva::Projector>>;
692 requires std::same_as<decltype(t.needRecompilation), bool>;
693 &T::recompile;
694};
695
700struct Exclusive {
701};
702struct Sparse {
703};
704
706
707template <soa::is_index_table T>
708constexpr auto transformBase()
709{
710 using metadata = typename aod::MetadataTrait<o2::aod::Hash<T::ref.desc_hash>>::metadata;
712}
713
714template <soa::is_index_table T>
715struct Builds : decltype(transformBase<T>()) {
716 using buildable_t = T;
717 using metadata = decltype(transformBase<T>())::metadata;
718 using Key = metadata::Key;
719 using H = typename T::first_t;
720 using Ts = typename T::rest_t;
721 using index_pack_t = metadata::index_pack_t;
722
723 std::shared_ptr<arrow::Schema> outputSchema = []() { return std::make_shared<arrow::Schema>(soa::createFieldsFromColumns(index_pack_t{}))->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{o2::aod::label<T::ref>()}})); }();
724
725 std::vector<soa::IndexRecord> map = soa::getIndexMapping<metadata>();
726
727 std::vector<framework::IndexColumnBuilder> builders;
728
730 {
731 return table.get();
732 }
733 T const& operator*() const
734 {
735 return *table;
736 }
737
739 {
740 return table->asArrowTable();
741 }
742 std::shared_ptr<T> table = nullptr;
743
744 constexpr auto pack()
745 {
746 return index_pack_t{};
747 }
748
749 auto build(std::vector<std::shared_ptr<arrow::Table>>&& tables)
750 {
751 this->table = std::make_shared<T>(soa::IndexBuilder::materialize(builders, std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables), map, outputSchema, metadata::exclusive));
752 return (this->table != nullptr);
753 }
754};
755
756template <typename T>
757concept is_builds = requires(T t) {
758 typename T::metadata;
759 typename T::Key;
760 requires std::same_as<decltype(t.map), std::vector<soa::IndexRecord>>;
761};
762
770template <typename T>
771struct OutputObj {
772 using obj_t = T;
773
775 : object(std::make_shared<T>(t)),
776 label(t.GetName()),
777 policy{policy_},
778 sourceType{sourceType_},
779 mTaskHash{0}
780 {
781 }
782
784 : object(nullptr),
785 label(label_),
786 policy{policy_},
787 sourceType{sourceType_},
788 mTaskHash{0}
789 {
790 }
791
792 void setObject(T const& t)
793 {
794 object = std::make_shared<T>(t);
795 object->SetName(label.c_str());
796 }
797
798 void setObject(T&& t)
799 {
800 object = std::make_shared<T>(t);
801 object->SetName(label.c_str());
802 }
803
804 void setObject(T* t)
805 {
806 object.reset(t);
807 object->SetName(label.c_str());
808 }
809
810 void setObject(std::shared_ptr<T> t)
811 {
812 object = t;
813 object->SetName(label.c_str());
814 }
815
816 void setHash(uint32_t hash)
817 {
818 mTaskHash = hash;
819 }
820
823 {
825 auto lhash = runtime_hash(label.c_str());
826 std::memset(desc.str, '_', 16);
827 std::stringstream s;
828 s << std::hex << lhash;
829 s << std::hex << mTaskHash;
830 s << std::hex << reinterpret_cast<uint64_t>(this);
831 std::memcpy(desc.str, s.str().c_str(), 12);
832 return OutputSpec{OutputLabel{label}, "ATSK", desc, 0, Lifetime::QA};
833 }
834
836 {
837 return object.get();
838 }
839
841 {
842 return *object.get();
843 }
844
845 OutputRef ref(uint16_t index, uint16_t max)
846 {
847 return OutputRef{std::string{label}, 0,
849 }
850
851 std::shared_ptr<T> object;
852 std::string label;
855 uint32_t mTaskHash;
856};
857
858template <typename T>
859concept is_outputobj = requires(T t) {
860 &T::setHash;
861 &T::spec;
862 &T::ref;
863 requires std::same_as<decltype(t.operator->()), typename T::obj_t*>;
864 requires std::same_as<decltype(t.object), std::shared_ptr<typename T::obj_t>>;
865};
866
870template <typename T>
871struct Service {
872 using service_t = T;
874
875 decltype(auto) operator->() const
876 {
878 return service->get();
879 } else {
880 return service;
881 }
882 }
883};
884
885template <typename T>
886concept is_service = requires(T t) {
887 requires std::same_as<decltype(t.service), typename T::service_t*>;
888 &T::operator->;
889};
890
892{
893 return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table}, std::forward<soa::SelectionVector>(selection));
894}
895
897{
898 return std::make_unique<o2::soa::Filtered<std::decay_t<decltype(table)>>>(std::vector{table.asArrowTable()}, std::forward<soa::SelectionVector>(selection));
899}
900
901void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter);
902
903template <typename T>
904struct Partition {
905 using content_t = T;
906 Partition(expressions::Node&& filter_) : filter{std::forward<expressions::Node>(filter_)}
907 {
908 }
909
910 Partition(expressions::Node&& filter_, T const& table)
911 : filter{std::forward<expressions::Node>(filter_)}
912 {
913 setTable(table);
914 }
915
916 void intializeCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema)
917 {
919 }
920
921 void bindTable(T const& table)
922 {
923 intializeCaches(T::table_t::hashes(), table.asArrowTable()->schema());
924 if (dataframeChanged) {
926 dataframeChanged = false;
927 }
928 }
929
930 template <typename... Ts>
931 void bindExternalIndices(Ts*... tables)
932 {
933 if (mFiltered != nullptr) {
934 mFiltered->bindExternalIndices(tables...);
935 }
936 }
937
938 template <typename E>
940 {
941 if (mFiltered != nullptr) {
942 mFiltered->bindInternalIndicesTo(ptr);
943 }
944 }
945
947 {
949 }
950
951 [[nodiscard]] std::shared_ptr<arrow::Table> asArrowTable() const
952 {
953 return mFiltered->asArrowTable();
954 }
955
957 {
958 return mFiltered.get();
959 }
960
961 template <typename T1>
962 [[nodiscard]] auto rawSliceBy(o2::framework::Preslice<T1> const& container, int value) const
963 {
964 return mFiltered->rawSliceBy(container, value);
965 }
966
968 {
969 return mFiltered->sliceByCached(node, value, cache);
970 }
971
973 {
974 return mFiltered->sliceByCachedUnsorted(node, value, cache);
975 }
976
977 template <typename T1, typename Policy, bool OPT>
978 [[nodiscard]] auto sliceBy(o2::framework::PresliceBase<T1, Policy, OPT> const& container, int value) const
979 {
980 return mFiltered->sliceBy(container, value);
981 }
982
984 std::unique_ptr<o2::soa::Filtered<T>> mFiltered = nullptr;
985 gandiva::NodePtr tree = nullptr;
987 bool dataframeChanged = true;
988
994 {
995 return mFiltered->begin();
996 }
998 {
999 return mFiltered->end();
1000 }
1002 {
1003 return mFiltered->begin();
1004 }
1006 {
1007 return mFiltered->end();
1008 }
1009
1010 int64_t size() const
1011 {
1012 return mFiltered->size();
1013 }
1014};
1015
1016template <typename T>
1017concept is_partition = requires(T t) {
1018 &T::updatePlaceholders;
1019 requires std::same_as<decltype(t.filter), expressions::Filter>;
1020 requires std::same_as<decltype(t.mFiltered), std::unique_ptr<o2::soa::Filtered<typename T::content_t>>>;
1021};
1022} // namespace o2::framework
1023
1024namespace o2::soa
1025{
1027template <soa::is_table T, soa::is_spawnable_column... Cs>
1028auto Extend(T const& table)
1029{
1030 using output_t = Join<T, soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
1031 static std::array<framework::expressions::Projector, sizeof...(Cs)> projectors{{std::move(Cs::Projector())...}};
1032 static std::shared_ptr<gandiva::Projector> projector = nullptr;
1033 static auto schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(framework::pack<Cs...>{}));
1034 return output_t{{o2::framework::spawner(framework::pack<Cs...>{}, {table.asArrowTable()}, "dynamicExtension", projectors.data(), projector, schema), table.asArrowTable()}, 0};
1035}
1036
1039template <soa::is_table T, soa::is_dynamic_column... Cs>
1040auto Attach(T const& table)
1041{
1042 using output_t = Join<T, o2::soa::Table<o2::aod::Hash<"JOIN"_h>, o2::aod::Hash<"JOIN/0"_h>, o2::aod::Hash<"JOIN"_h>, Cs...>>;
1043 return output_t{{table.asArrowTable()}, table.offset()};
1044}
1045} // namespace o2::soa
1046
1047#endif // o2_framework_AnalysisHelpers_H_DEFINED
bool exclusive
std::vector< o2::soa::IndexRecord > records
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::vector< std::string > labels
o2::monitoring::tags::Key Key
uint32_t hash
std::shared_ptr< arrow::Schema > schema
std::unique_ptr< expressions::Node > node
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t gfilter
Definition RawData.h:6
constexpr uint32_t runtime_hash(char const *str)
nlohmann::json json
TBranch * ptr
Definition A.h:16
iterator const_iterator
Definition ASoA.h:3816
T::template iterator_template_o< FilteredIndexPolicy, self_t > iterator
Definition ASoA.h:3814
Helper to check if a type T is an iterator.
Definition ASoA.h:1293
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum const void * indices
Definition glcorearb.h:400
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLuint object
Definition glcorearb.h:4041
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
GLint ref
Definition glcorearb.h:291
GLsizei GLenum * sources
Definition glcorearb.h:2516
std::shared_ptr< gandiva::Filter > FilterPtr
Definition Expressions.h:47
std::variant< OriginValueMatcher, DescriptionValueMatcher, SubSpecificationTypeValueMatcher, std::unique_ptr< DataDescriptorMatcher >, ConstantValueMatcher, StartTimeValueMatcher > Node
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
gandiva::Selection createSelection(std::shared_ptr< arrow::Table > const &table, Filter const &expression)
Function for creating gandiva selection from our internal filter tree.
void updatePlaceholders(Filter &filter, InitContext &context)
Update placeholder nodes from context.
Defining PrimaryVertex explicitly as messageable.
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
constexpr std::size_t pack_size(pack< Ts... > const &)
template function to determine number of types in a pack
Definition Pack.h:28
constexpr auto transformBase()
This helper struct allows you to declare index tables to be created in a task.
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
memfun_type< decltype(&F::operator())>::type FFL(F const &func)
auto getTableFromFilter(soa::is_filtered_table auto const &table, soa::SelectionVector &&selection)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
consteval auto typeWithRef() -> T
Helper to define output for a Table.
OutputObjHandlingPolicy
Policy enum to determine OutputObj handling when writing.
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:58
SelectionVector selectionToVector(gandiva::Selection const &sel)
Definition ASoA.cxx:48
constexpr auto tableRef2InputSpec()
constexpr auto tableRef2Output()
std::vector< int64_t > SelectionVector
Definition ASoA.h:429
auto Attach(T const &table)
constexpr auto tableRef2Schema()
constexpr auto tableRef2ConfigParamSpec()
auto Extend(T const &table)
On-the-fly adding of expression columns.
constexpr auto tableRef2OutputSpec()
constexpr auto tableRef2OutputRef()
@ C
Definition Defs.h:36
Defining DataPointCompositeObject explicitly as copiable.
header::DataDescription description
std::vector< o2::soa::IndexRecord > records
std::vector< std::string > labels
std::vector< framework::ConcreteDataMatcher > matchers
header::DataOrigin origin
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
std::shared_ptr< T > table
constexpr auto pack()
auto build(std::vector< std::shared_ptr< arrow::Table > > &&tables)
std::vector< soa::IndexRecord > map
std::vector< framework::IndexColumnBuilder > builders
std::shared_ptr< arrow::Schema > outputSchema
typename T::first_t H
T const & operator*() const
metadata::index_pack_t index_pack_t
typename T::rest_t Ts
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
T::table_t const & operator*() const
decltype(transformBase< T >())::metadata metadata
typename metadata::extension_table_t extension_t
std::array< o2::framework::expressions::Projector, N > projectors
std::shared_ptr< extension_t > extension
std::shared_ptr< arrow::Schema > inputSchema
std::shared_ptr< arrow::Schema > schema
static constexpr bool delayed
static constexpr size_t N
typename metadata::placeholders_pack_t placeholders_pack_t
std::shared_ptr< gandiva::Projector > projector
static OutputSpec const spec()
aod::MetadataTrait< o2::aod::Hash< table_t::ref.desc_hash > >::metadata metadata
decltype(typeWithRef< T >()) table_t
O2 header for OutputObj metadata.
OutputObj(T &&t, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
void setObject(std::shared_ptr< T > t)
OutputObjSourceType sourceType
OutputRef ref(uint16_t index, uint16_t max)
std::shared_ptr< T > object
OutputObj(std::string const &label_, OutputObjHandlingPolicy policy_=OutputObjHandlingPolicy::AnalysisObject, OutputObjSourceType sourceType_=OutputObjSourceType::OutputObjSource)
OutputSpec const spec()
OutputObjHandlingPolicy policy
void setHash(uint32_t hash)
Partition(expressions::Node &&filter_, T const &table)
typename o2::soa::Filtered< T >::const_iterator filtered_const_iterator
void bindTable(T const &table)
filtered_const_iterator begin() const
auto rawSliceBy(o2::framework::Preslice< T1 > const &container, int value) const
auto sliceByCached(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
void intializeCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema)
Partition(expressions::Node &&filter_)
typename o2::soa::Filtered< T >::iterator filtered_iterator
o2::soa::Filtered< T > * operator->()
typename o2::soa::Filtered< T >::iterator iterator
auto sliceBy(o2::framework::PresliceBase< T1, Policy, OPT > const &container, int value) const
std::shared_ptr< arrow::Table > asArrowTable() const
o2::soa::RowViewSentinel end() const
void updatePlaceholders(InitContext &context)
std::unique_ptr< o2::soa::Filtered< T > > mFiltered
expressions::Filter filter
o2::soa::RowViewSentinel end()
auto sliceByCachedUnsorted(framework::expressions::BindingNode const &node, int value, o2::framework::SliceCache &cache) const
typename o2::soa::Filtered< T >::const_iterator const_iterator
gandiva::FilterPtr gfilter
void bindExternalIndices(Ts *... tables)
filtered_iterator begin()
header::DataDescription description
std::vector< std::string > labels
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< gandiva::Projector > projector
header::DataOrigin origin
std::vector< framework::ConcreteDataMatcher > matchers
header::DataHeader::SubSpecificationType version
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
std::shared_ptr< arrow::Schema > inputSchema
T::table_t * operator->()
std::shared_ptr< gandiva::Projector > projector
std::shared_ptr< extension_t > extension
T::table_t const & operator*() const
static constexpr size_t N
std::shared_ptr< arrow::Schema > schema
typename metadata::expression_pack_t expression_pack_t
std::array< o2::framework::expressions::Projector, N > projectors
decltype(transformBase< T >())::metadata metadata
std::shared_ptr< typename T::table_t > table
typename metadata::extension_table_t extension_t
Helper template for table transformations.
static constexpr auto spec()
static constexpr auto ref()
static constexpr auto output()
int64_t lastIndex()
Last index inserted in the table.
decltype(FFL(std::declval< cursor_t >())) cursor
bool resetCursor(LifetimeHolder< TableBuilder > builder)
void setLabel(const char *label)
void operator()(Ts &&... args)
decltype([]() { if constexpr(soa::is_iterator< T >) { return typename T::parent_t{nullptr} persistent_table_t
decltype(std::declval< TableBuilder >().cursor< persistent_table_t >()) cursor_t
An expression tree node corresponding to a column binding.
A struct, containing the root of the expression tree.
uint32_t SubSpecificationType
Definition DataHeader.h:621
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::shared_ptr< arrow::Field > field() const
auto operator==(IndexRecord const &other) const
framework::ConcreteDataMatcher matcher
constexpr size_t max
VectorOfTObjectPtrs other
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))