Project
Loading...
Searching...
No Matches
TableBuilder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <memory>
14#if defined(__GNUC__)
15#pragma GCC diagnostic push
16#pragma GCC diagnostic ignored "-Wshadow"
17#endif
18#include <arrow/builder.h>
19#include <arrow/memory_pool.h>
20#include <arrow/record_batch.h>
21#include <arrow/status.h>
22#include <arrow/table.h>
23#include <arrow/type_traits.h>
24#include <arrow/util/key_value_metadata.h>
25#if defined(__GNUC__)
26#pragma GCC diagnostic pop
27#endif
28
29using namespace arrow;
30
31namespace
32{
33
34// FIXME: Dummy schema, to compile.
35template <typename TYPE, typename C_TYPE>
36void ArrayFromVector(const std::vector<C_TYPE>& values, std::shared_ptr<arrow::Array>* out)
37{
38 typename arrow::TypeTraits<TYPE>::BuilderType builder;
39 for (size_t i = 0; i < values.size(); ++i) {
40 auto status = builder.Append(values[i]);
41 assert(status.ok());
42 }
43 auto status = builder.Finish(out);
44 assert(status.ok());
45}
46
47} // namespace
48
49namespace o2::framework
50{
51void addLabelToSchema(std::shared_ptr<arrow::Schema>& schema, const char* label)
52{
53 schema = schema->WithMetadata(
54 std::make_shared<arrow::KeyValueMetadata>(
55 std::vector{std::string{"label"}},
56 std::vector{std::string{label}}));
57}
58
59std::shared_ptr<arrow::Table>
61{
62 bool status = mFinalizer(mArrays, mHolders);
63 if (!status) {
64 throwError(runtime_error("Unable to finalize"));
65 }
66 assert(mSchema->num_fields() > 0 && "Schema needs to be non-empty");
67 return arrow::Table::Make(mSchema, mArrays);
68}
69
70void TableBuilder::throwError(RuntimeErrorRef const& ref)
71{
72 throw ref;
73}
74
75void TableBuilder::validate() const
76{
77 if (mHolders != nullptr) {
78 throwError(runtime_error("TableBuilder::persist can only be invoked once per instance"));
79 }
80}
81
83{
84 mSchema = mSchema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{label}}));
85}
86
87std::shared_ptr<arrow::Table> spawnerHelper(std::shared_ptr<arrow::Table> const& fullTable, std::shared_ptr<arrow::Schema> newSchema, size_t nColumns,
88 expressions::Projector* projectors, std::vector<std::shared_ptr<arrow::Field>> const& fields, const char* name)
89{
90 auto mergedProjectors = framework::expressions::createProjectorHelper(nColumns, projectors, fullTable->schema(), fields);
91
92 arrow::TableBatchReader reader(*fullTable);
93 std::shared_ptr<arrow::RecordBatch> batch;
94 arrow::ArrayVector v;
95 std::vector<arrow::ArrayVector> chunks;
96 chunks.resize(nColumns);
97 std::vector<std::shared_ptr<arrow::ChunkedArray>> arrays;
98
99 while (true) {
100 auto s = reader.ReadNext(&batch);
101 if (!s.ok()) {
102 throw runtime_error_f("Cannot read batches from source table to spawn %s: %s", name, s.ToString().c_str());
103 }
104 if (batch == nullptr) {
105 break;
106 }
107 try {
108 s = mergedProjectors->Evaluate(*batch, arrow::default_memory_pool(), &v);
109 if (!s.ok()) {
110 throw runtime_error_f("Cannot apply projector to source table of %s: %s", name, s.ToString().c_str());
111 }
112 } catch (std::exception& e) {
113 throw runtime_error_f("Cannot apply projector to source table of %s: exception caught: %s", name, e.what());
114 }
115
116 for (auto i = 0U; i < nColumns; ++i) {
117 chunks[i].emplace_back(v.at(i));
118 }
119 }
120
121 arrays.reserve(nColumns);
122 for (auto i = 0U; i < nColumns; ++i) {
123 arrays.push_back(std::make_shared<arrow::ChunkedArray>(chunks[i]));
124 }
125
126 addLabelToSchema(newSchema, name);
127 return arrow::Table::Make(newSchema, arrays);
128}
129
130} // namespace o2::framework
int32_t i
void setLabel(const char *label)
std::shared_ptr< arrow::Table > finalize()
const GLdouble * v
Definition glcorearb.h:832
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum GLsizei GLsizei GLint * values
Definition glcorearb.h:1576
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
const GLuint * arrays
Definition glcorearb.h:1314
GLint ref
Definition glcorearb.h:291
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void addLabelToSchema(std::shared_ptr< arrow::Schema > &schema, const char *label)
RuntimeErrorRef runtime_error(const char *)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, std::vector< std::shared_ptr< arrow::Field > > const &fields, const char *name)
RuntimeErrorRef runtime_error_f(const char *,...)
A struct, containing the root of the expression tree.