Project
Loading...
Searching...
No Matches
AnalysisHelpers.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
14#include "IndexJSONHelpers.h"
15
16namespace o2::soa
17{
18std::vector<framework::IndexColumnBuilder> IndexBuilder::makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records)
19{
20 std::vector<framework::IndexColumnBuilder> builders;
21 builders.reserve(records.size());
22 auto pool = arrow::default_memory_pool();
23 builders.emplace_back(IndexKind::IdxSelf, records[0].pos, pool);
24 if (records[0].pos >= 0) {
25 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(records[0].pos));
26 }
27
28 for (auto i = 1U; i < records.size(); ++i) {
29 builders.emplace_back(records[i].kind, records[i].pos, pool, records[i].pos >= 0 ? tables[i]->column(records[i].pos) : nullptr);
30 }
31
32 return builders;
33}
34
35void IndexBuilder::resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables)
36{
37 for (auto i = 0U; i < builders.size(); ++i) {
38 builders[i].reset(builders[i].mColumnPos >= 0 ? tables[i]->column(builders[i].mColumnPos) : nullptr);
39 }
40
41 if (builders[0].mColumnPos >= 0) {
42 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
43 }
44}
45
46std::shared_ptr<arrow::Table> IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive)
47{
48 auto size = tables[0]->num_rows();
49 if (O2_BUILTIN_UNLIKELY(builders.empty())) {
50 builders = makeBuilders(std::move(tables), records);
51 } else {
52 resetBuilders(builders, std::move(tables));
53 }
54
55 for (int64_t counter = 0; counter < size; ++counter) {
56 int64_t idx = -1;
57 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex == nullptr) {
58 idx = counter;
59 } else {
60 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(counter);
61 }
62
63 bool found = true;
64 std::ranges::for_each(builders, [&idx, &found](auto& builder) { found &= builder.find(idx); });
65
66 if (!exclusive || found) {
67 builders[0].fill(counter);
68 std::ranges::for_each(builders.begin() + 1, builders.end(), [&idx](auto& builder) { builder.fill(idx); });
69 }
70 }
71
72 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
73 arrays.reserve(builders.size());
74 std::ranges::transform(builders, std::back_inserter(arrays), [](auto& builder) { return builder.result(); });
75
76 return arrow::Table::Make(schema, arrays);
77}
78} // namespace o2::soa
79
80namespace o2::framework
81{
82std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema)
83{
84 schema = schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{name}}));
85 return arrow::Table::MakeEmpty(schema).ValueOrDie();
86}
87
88std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
90 std::shared_ptr<gandiva::Projector>& projector)
91{
92 if (projector == nullptr) {
93 projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields());
94 }
95
96 return spawnerHelper(fullTable, newSchema, name, nColumns, projector);
97}
98
99std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
100 const char* name, size_t nColumns,
101 std::shared_ptr<gandiva::Projector> const& projector)
102{
103 arrow::TableBatchReader reader(*fullTable);
104 std::shared_ptr<arrow::RecordBatch> batch;
105 arrow::ArrayVector v;
106 std::vector<arrow::ArrayVector> chunks;
107 chunks.resize(nColumns);
108 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
109
110 while (true) {
111 auto s = reader.ReadNext(&batch);
112 if (!s.ok()) {
113 throw runtime_error_f("Cannot read batches from the source table to spawn %s: %s", name, s.ToString().c_str());
114 }
115 if (batch == nullptr) {
116 break;
117 }
118 try {
119 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v);
120 if (!s.ok()) {
121 throw runtime_error_f("Cannot apply projector to the source table of %s: %s", name, s.ToString().c_str());
122 }
123 } catch (std::exception& e) {
124 throw runtime_error_f("Cannot apply projector to the source table of %s: exception caught: %s", name, e.what());
125 }
126
127 for (auto i = 0U; i < nColumns; ++i) {
128 chunks[i].emplace_back(v.at(i));
129 }
130 }
131
132 arrays.reserve(nColumns);
133 std::ranges::transform(chunks, std::back_inserter(arrays), [](auto&& chunk) { return std::make_shared<arrow::ChunkedArray>(chunk); });
134
135 return arrow::Table::Make(newSchema, arrays);
136}
137
138void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter)
139{
140 if (tree == nullptr) {
141 expressions::Operations ops = createOperations(filter);
142 if (isTableCompatible(hashes, ops)) {
143 tree = createExpressionTree(ops, schema);
144 } else {
145 throw std::runtime_error("Partition filter does not match declared table type");
146 }
147 }
148 if (gfilter == nullptr) {
150 }
151}
152
153std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors)
154{
155 std::stringstream osm;
157 return osm.str();
158}
159
160std::string serializeSchema(std::shared_ptr<arrow::Schema> schema)
161{
162 std::stringstream osm;
164 return osm.str();
165}
166
167std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs)
168{
169 std::stringstream osm;
170 IndexJSONHelpers::write(osm, irs);
171 return osm.str();
172}
173
174std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<ConcreteDataMatcher> const& matchers)
175{
176 std::vector<std::shared_ptr<arrow::Table>> tables;
177 tables.reserve(matchers.size());
178 std::ranges::transform(matchers, std::back_inserter(tables), [&pc](auto const& matcher) { return pc.inputs().get<TableConsumer>(matcher)->asArrowTable(); });
179 return tables;
180}
181
182std::shared_ptr<arrow::Table> Spawner::materialize(ProcessingContext& pc) const
183{
184 auto tables = extractSources(pc, matchers);
185 auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels.begin(), labels.size()});
186 if (fullTable->num_rows() == 0) {
187 return arrow::Table::MakeEmpty(schema).ValueOrDie();
188 }
189
190 return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
191}
192
193std::shared_ptr<arrow::Table> Builder::materialize(ProcessingContext& pc)
194{
195 if (builders == nullptr) {
196 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
197 builders->reserve(records.size());
198 }
199 std::shared_ptr<arrow::Table> result;
200 auto tables = extractSources(pc, matchers);
202 return result;
203}
204} // namespace o2::framework
bool exclusive
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< expressions::Projector > projectors
#define O2_BUILTIN_UNLIKELY(x)
std::shared_ptr< arrow::Schema > schema
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t gfilter
Definition RawData.h:6
InputRecord & inputs()
The inputs associated with this processing context.
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
const GLuint * arrays
Definition glcorearb.h:1314
GLuint counter
Definition glcorearb.h:3987
std::shared_ptr< gandiva::Filter > FilterPtr
Definition Expressions.h:47
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining PrimaryVertex explicitly as messageable.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))