20 std::vector<framework::IndexColumnBuilder> builders;
21 builders.reserve(
records.size());
22 auto pool = arrow::default_memory_pool();
25 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(
records[0].
pos));
37 for (
auto i = 0U;
i < builders.size(); ++
i) {
38 builders[
i].reset(builders[
i].mColumnPos >= 0 ? tables[
i]->column(builders[
i].mColumnPos) :
nullptr);
41 if (builders[0].mColumnPos >= 0) {
42 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
46std::shared_ptr<arrow::Table>
IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord>
const&
records, std::shared_ptr<arrow::Schema>
const&
schema,
bool exclusive)
48 auto size = tables[0]->num_rows();
49 if (builders.empty()) {
55 std::vector<bool> finds;
56 finds.resize(builders.size());
59 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex ==
nullptr) {
62 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(
counter);
64 for (
auto i = 0U;
i < builders.size(); ++
i) {
65 finds[
i] = builders[
i].find(idx);
68 if (std::none_of(finds.begin(), finds.end(), [](
bool const x) { return x == false; })) {
70 for (
auto i = 1U;
i < builders.size(); ++
i) {
71 builders[
i].fill(idx);
76 for (
auto i = 1U;
i < builders.size(); ++
i) {
77 builders[
i].fill(idx);
82 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
83 arrays.reserve(builders.size());
84 for (
auto& builder : builders) {
85 arrays.push_back(builder.result());
96 schema =
schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{
name}}));
97 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
100std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
size_t nColumns,
102 std::shared_ptr<gandiva::Projector>& projector)
104 if (projector ==
nullptr) {
111std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
112 const char*
name,
size_t nColumns,
113 std::shared_ptr<gandiva::Projector>
const& projector)
115 arrow::TableBatchReader reader(*fullTable);
116 std::shared_ptr<arrow::RecordBatch> batch;
117 arrow::ArrayVector
v;
118 std::vector<arrow::ArrayVector> chunks;
119 chunks.resize(nColumns);
120 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
123 auto s = reader.ReadNext(&batch);
125 throw runtime_error_f(
"Cannot read batches from the source table to spawn %s: %s",
name, s.ToString().c_str());
127 if (batch ==
nullptr) {
131 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &
v);
133 throw runtime_error_f(
"Cannot apply projector to the source table of %s: %s",
name, s.ToString().c_str());
135 }
catch (std::exception& e) {
136 throw runtime_error_f(
"Cannot apply projector to the source table of %s: exception caught: %s",
name, e.what());
139 for (
auto i = 0U;
i < nColumns; ++
i) {
140 chunks[
i].emplace_back(
v.at(
i));
145 for (
auto i = 0U;
i < nColumns; ++
i) {
146 arrays.push_back(std::make_shared<arrow::ChunkedArray>(chunks[
i]));
149 return arrow::Table::Make(newSchema,
arrays);
154 if (
tree ==
nullptr) {
156 if (isTableCompatible(hashes, ops)) {
159 throw std::runtime_error(
"Partition filter does not match declared table type");
169 std::stringstream osm;
176 std::stringstream osm;
183 std::stringstream osm;
190 std::vector<std::shared_ptr<arrow::Table>> tables;
201 if (fullTable->num_rows() == 0) {
202 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
211 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
214 std::shared_ptr<arrow::Table>
result;
std::vector< o2::soa::IndexRecord > records
std::vector< expressions::Projector > projectors
std::vector< std::string > labels
std::shared_ptr< arrow::Schema > schema
InputRecord & inputs()
The inputs associated with this processing context.
GLuint const GLchar * name
GLuint GLsizei const GLchar * label
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
std::shared_ptr< gandiva::Filter > FilterPtr
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining PrimaryVertex explicitly as messageable.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< std::string > labels
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))