Project
Loading...
Searching...
No Matches
AnalysisHelpers.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
14#include "IndexJSONHelpers.h"
15
16namespace o2::soa
17{
18std::vector<framework::IndexColumnBuilder> IndexBuilder::makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records)
19{
20 std::vector<framework::IndexColumnBuilder> builders;
21 builders.reserve(records.size());
22 auto pool = arrow::default_memory_pool();
23 builders.emplace_back(IndexKind::IdxSelf, records[0].pos, pool);
24 if (records[0].pos >= 0) {
25 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(records[0].pos));
26 }
27
28 for (auto i = 1U; i < records.size(); ++i) {
29 builders.emplace_back(records[i].kind, records[i].pos, pool, records[i].pos >= 0 ? tables[i]->column(records[i].pos) : nullptr);
30 }
31
32 return builders;
33}
34
35void IndexBuilder::resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables)
36{
37 for (auto i = 0U; i < builders.size(); ++i) {
38 builders[i].reset(builders[i].mColumnPos >= 0 ? tables[i]->column(builders[i].mColumnPos) : nullptr);
39 }
40
41 if (builders[0].mColumnPos >= 0) {
42 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
43 }
44}
45
46std::shared_ptr<arrow::Table> IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive)
47{
48 auto size = tables[0]->num_rows();
49 if (builders.empty()) {
50 builders = makeBuilders(std::move(tables), records);
51 } else {
52 resetBuilders(builders, std::move(tables));
53 }
54
55 std::vector<bool> finds;
56 finds.resize(builders.size());
57 for (int64_t counter = 0; counter < size; ++counter) {
58 int64_t idx = -1;
59 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex == nullptr) {
60 idx = counter;
61 } else {
62 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(counter);
63 }
64 for (auto i = 0U; i < builders.size(); ++i) {
65 finds[i] = builders[i].find(idx);
66 }
67 if (exclusive) {
68 if (std::none_of(finds.begin(), finds.end(), [](bool const x) { return x == false; })) {
69 builders[0].fill(counter);
70 for (auto i = 1U; i < builders.size(); ++i) {
71 builders[i].fill(idx);
72 }
73 }
74 } else {
75 builders[0].fill(counter);
76 for (auto i = 1U; i < builders.size(); ++i) {
77 builders[i].fill(idx);
78 }
79 }
80 }
81
82 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
83 arrays.reserve(builders.size());
84 for (auto& builder : builders) {
85 arrays.push_back(builder.result());
86 }
87
88 return arrow::Table::Make(schema, arrays);
89}
90} // namespace o2::soa
91
92namespace o2::framework
93{
94std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema)
95{
96 schema = schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{name}}));
97 return arrow::Table::MakeEmpty(schema).ValueOrDie();
98}
99
100std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
102 std::shared_ptr<gandiva::Projector>& projector)
103{
104 if (projector == nullptr) {
105 projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields());
106 }
107
108 return spawnerHelper(fullTable, newSchema, name, nColumns, projector);
109}
110
111std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
112 const char* name, size_t nColumns,
113 std::shared_ptr<gandiva::Projector> const& projector)
114{
115 arrow::TableBatchReader reader(*fullTable);
116 std::shared_ptr<arrow::RecordBatch> batch;
117 arrow::ArrayVector v;
118 std::vector<arrow::ArrayVector> chunks;
119 chunks.resize(nColumns);
120 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
121
122 while (true) {
123 auto s = reader.ReadNext(&batch);
124 if (!s.ok()) {
125 throw runtime_error_f("Cannot read batches from the source table to spawn %s: %s", name, s.ToString().c_str());
126 }
127 if (batch == nullptr) {
128 break;
129 }
130 try {
131 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v);
132 if (!s.ok()) {
133 throw runtime_error_f("Cannot apply projector to the source table of %s: %s", name, s.ToString().c_str());
134 }
135 } catch (std::exception& e) {
136 throw runtime_error_f("Cannot apply projector to the source table of %s: exception caught: %s", name, e.what());
137 }
138
139 for (auto i = 0U; i < nColumns; ++i) {
140 chunks[i].emplace_back(v.at(i));
141 }
142 }
143
144 arrays.reserve(nColumns);
145 for (auto i = 0U; i < nColumns; ++i) {
146 arrays.push_back(std::make_shared<arrow::ChunkedArray>(chunks[i]));
147 }
148
149 return arrow::Table::Make(newSchema, arrays);
150}
151
152void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter)
153{
154 if (tree == nullptr) {
155 expressions::Operations ops = createOperations(filter);
156 if (isTableCompatible(hashes, ops)) {
157 tree = createExpressionTree(ops, schema);
158 } else {
159 throw std::runtime_error("Partition filter does not match declared table type");
160 }
161 }
162 if (gfilter == nullptr) {
164 }
165}
166
/// Returns the contents of a local string stream as a std::string.
// NOTE(review): as shown here nothing is ever written to `osm`, so the result is
// always empty and `projectors` is unused. The listing skips a line between the
// declaration of `osm` and the return, so the statement that writes
// `projectors` into the stream is presumably missing from this view (compare
// serializeIndexRecords, which calls a write(osm, ...) helper at the same
// spot) - confirm against the original source file.
167std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors)
168{
169 std::stringstream osm;
171 return osm.str();
172}
173
/// Returns the contents of a local string stream as a std::string.
// NOTE(review): as shown here nothing is ever written to `osm`, so the result is
// always empty and `schema` is unused. The listing skips a line between the
// declaration of `osm` and the return, so the statement that writes `schema`
// into the stream is presumably missing from this view (compare
// serializeIndexRecords, which calls a write(osm, ...) helper at the same
// spot) - confirm against the original source file.
174std::string serializeSchema(std::shared_ptr<arrow::Schema> schema)
175{
176 std::stringstream osm;
178 return osm.str();
179}
180
181std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs)
182{
183 std::stringstream osm;
184 IndexJSONHelpers::write(osm, irs);
185 return osm.str();
186}
187
188std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<std::string> const& labels)
189{
190 std::vector<std::shared_ptr<arrow::Table>> tables;
191 for (auto const& label : labels) {
192 tables.emplace_back(pc.inputs().get<TableConsumer>(label.c_str())->asArrowTable());
193 }
194 return tables;
195}
196
197std::shared_ptr<arrow::Table> Spawner::materialize(ProcessingContext& pc) const
198{
199 auto tables = extractSources(pc, labels);
200 auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels.begin(), labels.size()});
201 if (fullTable->num_rows() == 0) {
202 return arrow::Table::MakeEmpty(schema).ValueOrDie();
203 }
204
205 return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
206}
207
208std::shared_ptr<arrow::Table> Builder::materialize(ProcessingContext& pc)
209{
210 if (builders == nullptr) {
211 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
212 builders->reserve(records.size());
213 }
214 std::shared_ptr<arrow::Table> result;
215 auto tables = extractSources(pc, labels);
217 return result;
218}
219} // namespace o2::framework
bool exclusive
std::vector< o2::soa::IndexRecord > records
std::vector< expressions::Projector > projectors
std::vector< std::string > labels
std::shared_ptr< arrow::Schema > schema
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t gfilter
Definition RawData.h:6
decltype(auto) get(R binding, int part=0) const
InputRecord & inputs()
The inputs associated with this processing context.
GLint GLenum GLint x
Definition glcorearb.h:403
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
const GLuint * arrays
Definition glcorearb.h:1314
GLuint counter
Definition glcorearb.h:3987
std::shared_ptr< gandiva::Filter > FilterPtr
Definition Expressions.h:47
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining PrimaryVertex explicitly as messageable.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< std::string > labels
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))