Project
Loading...
Searching...
No Matches
TableBuilder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <memory>
14#if defined(__GNUC__)
15#pragma GCC diagnostic push
16#pragma GCC diagnostic ignored "-Wshadow"
17#endif
18#include <arrow/builder.h>
19#include <arrow/memory_pool.h>
20#include <arrow/record_batch.h>
21#include <arrow/status.h>
22#include <arrow/table.h>
23#include <arrow/type_traits.h>
24#include <arrow/util/key_value_metadata.h>
25#if defined(__GNUC__)
26#pragma GCC diagnostic pop
27#endif
28
29using namespace arrow;
30
31namespace
32{
33
34// FIXME: Dummy schema, to compile.
35template <typename TYPE, typename C_TYPE>
36void ArrayFromVector(const std::vector<C_TYPE>& values, std::shared_ptr<arrow::Array>* out)
37{
38 typename arrow::TypeTraits<TYPE>::BuilderType builder;
39 for (size_t i = 0; i < values.size(); ++i) {
40 auto status = builder.Append(values[i]);
41 assert(status.ok());
42 }
43 auto status = builder.Finish(out);
44 assert(status.ok());
45}
46
47} // namespace
48
49namespace o2::framework
50{
51void addLabelToSchema(std::shared_ptr<arrow::Schema>& schema, const char* label)
52{
53 schema = schema->WithMetadata(
54 std::make_shared<arrow::KeyValueMetadata>(
55 std::vector{std::string{"label"}},
56 std::vector{std::string{label}}));
57}
58
59std::shared_ptr<arrow::Table>
61{
62 bool status = mFinalizer(mArrays, mHolders);
63 if (!status) {
64 throwError(runtime_error("Unable to finalize"));
65 }
66 assert(mSchema->num_fields() > 0 && "Schema needs to be non-empty");
67 return arrow::Table::Make(mSchema, mArrays);
68}
69
70void TableBuilder::throwError(RuntimeErrorRef const& ref)
71{
72 throw ref;
73}
74
75void TableBuilder::validate() const
76{
77 if (mHolders != nullptr) {
78 throwError(runtime_error("TableBuilder::persist can only be invoked once per instance"));
79 }
80}
81
83{
84 mSchema = mSchema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{label}}));
85}
86
87std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
88 expressions::Projector* projectors, const char* name,
89 std::shared_ptr<gandiva::Projector>& projector)
90{
91 if (projector == nullptr) {
92 projector = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), newSchema->fields());
93 }
94
95 arrow::TableBatchReader reader(*fullTable);
96 std::shared_ptr<arrow::RecordBatch> batch;
97 arrow::ArrayVector v;
98 std::vector<arrow::ArrayVector> chunks;
99 chunks.resize(nColumns);
100 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
101
102 while (true) {
103 auto s = reader.ReadNext(&batch);
104 if (!s.ok()) {
105 throw runtime_error_f("Cannot read batches from source table to spawn %s: %s", name, s.ToString().c_str());
106 }
107 if (batch == nullptr) {
108 break;
109 }
110 try {
111 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &v);
112 if (!s.ok()) {
113 throw runtime_error_f("Cannot apply projector to source table of %s: %s", name, s.ToString().c_str());
114 }
115 } catch (std::exception& e) {
116 throw runtime_error_f("Cannot apply projector to source table of %s: exception caught: %s", name, e.what());
117 }
118
119 for (auto i = 0U; i < nColumns; ++i) {
120 chunks[i].emplace_back(v.at(i));
121 }
122 }
123
124 arrays.reserve(nColumns);
125 for (auto i = 0U; i < nColumns; ++i) {
126 arrays.push_back(std::make_shared<arrow::ChunkedArray>(chunks[i]));
127 }
128
129 addLabelToSchema(newSchema, name);
130 return arrow::Table::Make(newSchema, arrays);
131}
132
133} // namespace o2::framework
134
135template class arrow::NumericBuilder<arrow::UInt8Type>;
136template class arrow::NumericBuilder<arrow::UInt32Type>;
137template class arrow::NumericBuilder<arrow::FloatType>;
138template class arrow::NumericBuilder<arrow::Int32Type>;
139template class arrow::NumericBuilder<arrow::Int8Type>;
int32_t i
void setLabel(const char *label)
std::shared_ptr< arrow::Table > finalize()
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
const GLuint * arrays
Definition glcorearb.h:1314
GLint ref
Definition glcorearb.h:291
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void addLabelToSchema(std::shared_ptr< arrow::Schema > &schema, const char *label)
RuntimeErrorRef runtime_error(const char *)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
RuntimeErrorRef runtime_error_f(const char *,...)
A struct, containing the root of the expression tree.