20 std::vector<framework::IndexColumnBuilder> builders;
21 builders.reserve(
records.size());
22 auto pool = arrow::default_memory_pool();
25 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(
records[0].
pos));
37 for (
auto i = 0U;
i < builders.size(); ++
i) {
38 builders[
i].reset(builders[
i].mColumnPos >= 0 ? tables[
i]->column(builders[
i].mColumnPos) :
nullptr);
41 if (builders[0].mColumnPos >= 0) {
42 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
46std::shared_ptr<arrow::Table>
IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord>
const&
records, std::shared_ptr<arrow::Schema>
const&
schema,
bool exclusive)
48 auto size = tables[0]->num_rows();
57 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex ==
nullptr) {
60 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(
counter);
64 std::ranges::for_each(builders, [&idx, &found](
auto& builder) { found &= builder.find(idx); });
68 std::ranges::for_each(builders.begin() + 1, builders.end(), [&idx](
auto& builder) { builder.fill(idx); });
72 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
73 arrays.reserve(builders.size());
74 std::ranges::transform(builders, std::back_inserter(
arrays), [](
auto& builder) {
return builder.result(); });
84 schema =
schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{
name}}));
85 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
88std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
size_t nColumns,
90 std::shared_ptr<gandiva::Projector>& projector)
92 if (projector ==
nullptr) {
99std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
100 const char*
name,
size_t nColumns,
101 std::shared_ptr<gandiva::Projector>
const& projector)
103 arrow::TableBatchReader reader(*fullTable);
104 std::shared_ptr<arrow::RecordBatch> batch;
105 arrow::ArrayVector
v;
106 std::vector<arrow::ArrayVector> chunks;
107 chunks.resize(nColumns);
108 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
111 auto s = reader.ReadNext(&batch);
113 throw runtime_error_f(
"Cannot read batches from the source table to spawn %s: %s",
name, s.ToString().c_str());
115 if (batch ==
nullptr) {
119 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &
v);
121 throw runtime_error_f(
"Cannot apply projector to the source table of %s: %s",
name, s.ToString().c_str());
123 }
catch (std::exception& e) {
124 throw runtime_error_f(
"Cannot apply projector to the source table of %s: exception caught: %s",
name, e.what());
127 for (
auto i = 0U;
i < nColumns; ++
i) {
128 chunks[
i].emplace_back(
v.at(
i));
133 std::ranges::transform(chunks, std::back_inserter(
arrays), [](
auto&& chunk) {
return std::make_shared<arrow::ChunkedArray>(chunk); });
135 return arrow::Table::Make(newSchema,
arrays);
140 if (
tree ==
nullptr) {
142 if (isTableCompatible(hashes, ops)) {
145 throw std::runtime_error(
"Partition filter does not match declared table type");
155 std::stringstream osm;
162 std::stringstream osm;
169 std::stringstream osm;
176 std::vector<std::shared_ptr<arrow::Table>> tables;
178 std::ranges::transform(
matchers, std::back_inserter(tables), [&pc](
auto const& matcher) {
return pc.
inputs().get<TableConsumer>(matcher)->asArrowTable(); });
186 if (fullTable->num_rows() == 0) {
187 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
196 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
199 std::shared_ptr<arrow::Table>
result;
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< expressions::Projector > projectors
#define O2_BUILTIN_UNLIKELY(x)
std::shared_ptr< arrow::Schema > schema
InputRecord & inputs()
The inputs associated with this processing context.
GLuint const GLchar * name
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
std::shared_ptr< gandiva::Filter > FilterPtr
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining PrimaryVertex explicitly as messageable.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))