Project
Loading...
Searching...
No Matches
AnalysisHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include <regex>
15#include "IndexJSONHelpers.h"
16
17namespace o2::soa
18{
19std::vector<framework::IndexColumnBuilder> IndexBuilder::makeBuilders(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records)
20{
21 std::vector<framework::IndexColumnBuilder> builders;
22 builders.reserve(records.size());
23 auto pool = arrow::default_memory_pool();
24 builders.emplace_back(IndexKind::IdxSelf, records[0].pos, pool);
25 if (records[0].pos >= 0) {
26 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(records[0].pos));
27 }
28
29 for (auto i = 1U; i < records.size(); ++i) {
30 builders.emplace_back(records[i].kind, records[i].pos, pool, records[i].pos >= 0 ? tables[i]->column(records[i].pos) : nullptr);
31 }
32
33 return builders;
34}
35
36void IndexBuilder::resetBuilders(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables)
37{
38 for (auto i = 0U; i < builders.size(); ++i) {
39 builders[i].reset(builders[i].mColumnPos >= 0 ? tables[i]->column(builders[i].mColumnPos) : nullptr);
40 }
41
42 if (builders[0].mColumnPos >= 0) {
43 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
44 }
45}
46
47std::shared_ptr<arrow::Table> IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord> const& records, std::shared_ptr<arrow::Schema> const& schema, bool exclusive)
48{
49 auto size = tables[0]->num_rows();
50 if (O2_BUILTIN_UNLIKELY(builders.empty())) {
51 builders = makeBuilders(std::move(tables), records);
52 } else {
53 resetBuilders(builders, std::move(tables));
54 }
55
56 for (int64_t counter = 0; counter < size; ++counter) {
57 int64_t idx = -1;
58 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex == nullptr) {
59 idx = counter;
60 } else {
61 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(counter);
62 }
63
64 bool found = true;
65 std::ranges::for_each(builders, [&idx, &found](auto& builder) { found &= builder.find(idx); });
66
67 if (!exclusive || found) {
68 builders[0].fill(counter);
69 std::ranges::for_each(builders.begin() + 1, builders.end(), [&idx](auto& builder) { builder.fill(idx); });
70 }
71 }
72
73 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
74 arrays.reserve(builders.size());
75 std::ranges::transform(builders, std::back_inserter(arrays), [](auto& builder) { return builder.result(); });
76
77 return arrow::Table::Make(schema, arrays);
78}
79} // namespace o2::soa
80
81namespace o2::framework
82{
83void wrongOriginReplacement(std::string_view replacement)
84{
85 throw framework::runtime_error_f("Provided origin replacement string is longer than 4 symbols: %s", replacement.data());
86}
87
88ConfigParamSpec replaceOrigin(ConfigParamSpec& source, std::string const& originStr)
89{
90 if (!source.name.starts_with("input:")) {
91 return source;
92 }
93 source.defaultValue = std::regex_replace(source.defaultValue.get<std::string>(), std::regex{"/AOD/"}, "/" + originStr + "/");
94 return source;
95}
96
// NOTE(review): the line holding this function's signature (listing line 97)
// is missing from this view, so its name and the exact types of `matcher` and
// `newOrigin` cannot be confirmed here — restore it from the original file.
//
// Returns a ConcreteDataMatcher built from `newOrigin` together with the
// description and subSpec of `matcher` (presumably: the same matcher with its
// origin swapped — TODO confirm against the declaration).
98{
99 return ConcreteDataMatcher{newOrigin, matcher.description, matcher.subSpec};
100}
101
102std::shared_ptr<arrow::Table> makeEmptyTableImpl(const char* name, std::shared_ptr<arrow::Schema>& schema)
103{
104 schema = schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{name}}));
105 return arrow::Table::MakeEmpty(schema).ValueOrDie();
106}
107
108std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
110 std::shared_ptr<gandiva::Projector>& projector)
111{
112 if (projector == nullptr) {
113 projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields());
114 }
115
116 return spawnerHelper(fullTable, newSchema, name, nColumns, projector);
117}
118
119std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
120 const char* name, size_t nColumns,
121 std::shared_ptr<gandiva::Projector> const& projector)
122{
123 arrow::TableBatchReader reader(*fullTable);
124 std::shared_ptr<arrow::RecordBatch> batch;
125 arrow::ArrayVector v;
126 std::vector<arrow::ArrayVector> chunks;
127 chunks.resize(nColumns);
128 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
129
130 while (true) {
131 auto s = reader.ReadNext(&batch);
132 if (!s.ok()) {
133 throw runtime_error_f("Cannot read batches from the source table to spawn %s: %s", name, s.ToString().c_str());
134 }
135 if (batch == nullptr) {
136 break;
137 }
138 try {
139 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v);
140 if (!s.ok()) {
141 throw runtime_error_f("Cannot apply projector to the source table of %s: %s", name, s.ToString().c_str());
142 }
143 } catch (std::exception& e) {
144 throw runtime_error_f("Cannot apply projector to the source table of %s: exception caught: %s", name, e.what());
145 }
146
147 for (auto i = 0U; i < nColumns; ++i) {
148 chunks[i].emplace_back(v.at(i));
149 }
150 }
151
152 arrays.reserve(nColumns);
153 std::ranges::transform(chunks, std::back_inserter(arrays), [](auto&& chunk) { return std::make_shared<arrow::ChunkedArray>(chunk); });
154
155 return arrow::Table::Make(newSchema, arrays);
156}
157
// Fills the cached expression tree (and, presumably, the cached gandiva
// filter) of a partition on first use.
//
// When `tree` is still null, the operations derived from `filter` are checked
// against `hashes`; if they are compatible the expression tree is created for
// `schema`, otherwise std::runtime_error is thrown.
//
// NOTE(review): listing line 169 — the body of the `gfilter == nullptr` branch —
// is missing from this view, leaving the branch empty. It presumably creates
// the gandiva filter from `tree`; restore it from the original file.
158void initializePartitionCaches(std::set<uint32_t> const& hashes, std::shared_ptr<arrow::Schema> const& schema, expressions::Filter const& filter, gandiva::NodePtr& tree, gandiva::FilterPtr& gfilter)
159{
// Build the expression tree only once; a non-null tree is left as it is.
160 if (tree == nullptr) {
161 expressions::Operations ops = createOperations(filter);
162 if (isTableCompatible(hashes, ops)) {
163 tree = createExpressionTree(ops, schema);
164 } else {
165 throw std::runtime_error("Partition filter does not match declared table type");
166 }
167 }
168 if (gfilter == nullptr) {
170 }
171}
172
// Returns the contents of a string stream intended to hold the serialized
// form of `projectors`.
//
// NOTE(review): listing line 176, which presumably writes `projectors` into
// `osm`, is missing from this view; as shown, the stream is returned without
// anything having been written to it. Restore the line from the original file.
173std::string serializeProjectors(std::vector<framework::expressions::Projector>& projectors)
174{
175 std::stringstream osm;
177 return osm.str();
178}
179
// Returns the contents of a string stream intended to hold the serialized
// form of `schema`.
//
// NOTE(review): listing line 183, which presumably writes `schema` into `osm`,
// is missing from this view; as shown, the stream is returned without anything
// having been written to it. Restore the line from the original file.
180std::string serializeSchema(std::shared_ptr<arrow::Schema> schema)
181{
182 std::stringstream osm;
184 return osm.str();
185}
186
187std::string serializeIndexRecords(std::vector<o2::soa::IndexRecord>& irs)
188{
189 std::stringstream osm;
190 IndexJSONHelpers::write(osm, irs);
191 return osm.str();
192}
193
194std::vector<std::shared_ptr<arrow::Table>> extractSources(ProcessingContext& pc, std::vector<ConcreteDataMatcher> const& matchers)
195{
196 std::vector<std::shared_ptr<arrow::Table>> tables;
197 tables.reserve(matchers.size());
198 std::ranges::transform(matchers, std::back_inserter(tables), [&pc](auto const& matcher) { return pc.inputs().get<TableConsumer>(matcher)->asArrowTable(); });
199 return tables;
200}
201
202std::shared_ptr<arrow::Table> Spawner::materialize(ProcessingContext& pc) const
203{
204 auto tables = extractSources(pc, matchers);
205 auto fullTable = soa::ArrowHelpers::joinTables(std::move(tables), std::span{labels.begin(), labels.size()});
206 if (fullTable->num_rows() == 0) {
207 return arrow::Table::MakeEmpty(schema).ValueOrDie();
208 }
209
210 return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
211}
212
// Produces the index table for the current processing context.
//
// The shared vector of column builders is allocated on first use, with
// capacity reserved for one builder per index record, and the source tables
// selected by `matchers` are extracted from the context.
//
// NOTE(review): listing line 221, which presumably assigns `result` (likely
// from IndexBuilder::materialize — TODO confirm), is missing from this view;
// as shown, a null table pointer is returned. Restore it from the original
// file.
213std::shared_ptr<arrow::Table> Builder::materialize(ProcessingContext& pc)
214{
// Allocate the builder cache only once; later calls reuse it.
215 if (builders == nullptr) {
216 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
217 builders->reserve(records.size());
218 }
219 std::shared_ptr<arrow::Table> result;
220 auto tables = extractSources(pc, matchers);
222 return result;
223}
224
225} // namespace o2::framework
bool exclusive
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< expressions::Projector > projectors
#define O2_BUILTIN_UNLIKELY(x)
std::shared_ptr< arrow::Schema > schema
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t gfilter
Definition RawData.h:6
InputRecord & inputs()
The inputs associated with this processing context.
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
const GLuint * arrays
Definition glcorearb.h:1314
GLuint counter
Definition glcorearb.h:3987
std::shared_ptr< gandiva::Filter > FilterPtr
Definition Expressions.h:47
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
ConfigParamSpec replaceOrigin(ConfigParamSpec &source, std::string const &originStr)
RuntimeErrorRef runtime_error_f(const char *,...)
void wrongOriginReplacement(std::string_view replacement)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
header::DataHeader::SubSpecificationType subSpec
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))