21 std::vector<framework::IndexColumnBuilder> builders;
22 builders.reserve(
records.size());
23 auto pool = arrow::default_memory_pool();
26 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(
records[0].
pos));
38 for (
auto i = 0U;
i < builders.size(); ++
i) {
39 builders[
i].reset(builders[
i].mColumnPos >= 0 ? tables[
i]->column(builders[
i].mColumnPos) :
nullptr);
42 if (builders[0].mColumnPos >= 0) {
43 std::get<framework::SelfBuilder>(builders[0].builder).keyIndex = std::make_unique<framework::ChunkedArrayIterator>(tables[0]->column(builders[0].mColumnPos));
47std::shared_ptr<arrow::Table>
IndexBuilder::materialize(std::vector<framework::IndexColumnBuilder>& builders, std::vector<std::shared_ptr<arrow::Table>>&& tables, std::vector<soa::IndexRecord>
const&
records, std::shared_ptr<arrow::Schema>
const&
schema,
bool exclusive)
49 auto size = tables[0]->num_rows();
58 if (std::get<framework::SelfBuilder>(builders[0].builder).keyIndex ==
nullptr) {
61 idx = std::get<framework::SelfBuilder>(builders[0].builder).keyIndex->valueAt(
counter);
65 std::ranges::for_each(builders, [&idx, &found](
auto& builder) { found &= builder.find(idx); });
69 std::ranges::for_each(builders.begin() + 1, builders.end(), [&idx](
auto& builder) { builder.fill(idx); });
73 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
74 arrays.reserve(builders.size());
75 std::ranges::transform(builders, std::back_inserter(
arrays), [](
auto& builder) {
return builder.result(); });
90 if (!
source.name.starts_with(
"input:")) {
93 source.defaultValue = std::regex_replace(
source.defaultValue.get<std::string>(), std::regex{
"/AOD/"},
"/" + originStr +
"/");
104 schema =
schema->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{std::string{
name}}));
105 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
108std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
size_t nColumns,
110 std::shared_ptr<gandiva::Projector>& projector)
112 if (projector ==
nullptr) {
119std::shared_ptr<arrow::Table>
spawnerHelper(std::shared_ptr<arrow::Table>
const& fullTable, std::shared_ptr<arrow::Schema> newSchema,
120 const char*
name,
size_t nColumns,
121 std::shared_ptr<gandiva::Projector>
const& projector)
123 arrow::TableBatchReader reader(*fullTable);
124 std::shared_ptr<arrow::RecordBatch> batch;
125 arrow::ArrayVector
v;
126 std::vector<arrow::ArrayVector> chunks;
127 chunks.resize(nColumns);
128 std::vector<std::shared_ptr<arrow::ChunkedArray>>
arrays;
131 auto s = reader.ReadNext(&batch);
133 throw runtime_error_f(
"Cannot read batches from the source table to spawn %s: %s",
name, s.ToString().c_str());
135 if (batch ==
nullptr) {
139 s = projector->Evaluate(*batch, arrow::default_memory_pool(), &
v);
141 throw runtime_error_f(
"Cannot apply projector to the source table of %s: %s",
name, s.ToString().c_str());
143 }
catch (std::exception& e) {
144 throw runtime_error_f(
"Cannot apply projector to the source table of %s: exception caught: %s",
name, e.what());
147 for (
auto i = 0U;
i < nColumns; ++
i) {
148 chunks[
i].emplace_back(
v.at(
i));
153 std::ranges::transform(chunks, std::back_inserter(
arrays), [](
auto&& chunk) {
return std::make_shared<arrow::ChunkedArray>(chunk); });
155 return arrow::Table::Make(newSchema,
arrays);
160 if (
tree ==
nullptr) {
162 if (isTableCompatible(hashes, ops)) {
165 throw std::runtime_error(
"Partition filter does not match declared table type");
175 std::stringstream osm;
182 std::stringstream osm;
189 std::stringstream osm;
196 std::vector<std::shared_ptr<arrow::Table>> tables;
198 std::ranges::transform(
matchers, std::back_inserter(tables), [&pc](
auto const& matcher) {
return pc.
inputs().get<TableConsumer>(matcher)->asArrowTable(); });
206 if (fullTable->num_rows() == 0) {
207 return arrow::Table::MakeEmpty(
schema).ValueOrDie();
216 builders = std::make_shared<std::vector<framework::IndexColumnBuilder>>();
219 std::shared_ptr<arrow::Table>
result;
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::vector< expressions::Projector > projectors
#define O2_BUILTIN_UNLIKELY(x)
std::shared_ptr< arrow::Schema > schema
InputRecord & inputs()
The inputs associated with this processing context.
GLuint const GLchar * name
GLsizei GLsizei GLchar * source
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
std::shared_ptr< gandiva::Filter > FilterPtr
std::shared_ptr< gandiva::Filter > createFilter(gandiva::SchemaPtr const &Schema, gandiva::ConditionPtr condition)
Function to create gandiva filter from gandiva condition.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
std::vector< ColumnOperationSpec > Operations
gandiva::ConditionPtr makeCondition(gandiva::NodePtr node)
Function to create gandiva condition expression from generic gandiva expression tree.
Defining ITS Vertex explicitly as messageable.
std::string serializeSchema(std::shared_ptr< arrow::Schema > schema)
std::string serializeProjectors(std::vector< framework::expressions::Projector > &projectors)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
std::string serializeIndexRecords(std::vector< o2::soa::IndexRecord > &irs)
ConfigParamSpec replaceOrigin(ConfigParamSpec &source, std::string const &originStr)
RuntimeErrorRef runtime_error_f(const char *,...)
void wrongOriginReplacement(std::string_view replacement)
std::vector< std::shared_ptr< arrow::Table > > extractSources(ProcessingContext &pc, std::vector< std::string > const &labels)
void initializePartitionCaches(std::set< uint32_t > const &hashes, std::shared_ptr< arrow::Schema > const &schema, expressions::Filter const &filter, gandiva::NodePtr &tree, gandiva::FilterPtr &gfilter)
std::shared_ptr< arrow::Table > makeEmptyTableImpl(const char *name, std::shared_ptr< arrow::Schema > &schema)
static void write(std::ostream &o, std::shared_ptr< arrow::Schema > &schema)
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< std::vector< framework::IndexColumnBuilder > > builders
std::shared_ptr< arrow::Schema > outputSchema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc)
header::DataDescription description
header::DataHeader::SubSpecificationType subSpec
static void write(std::ostream &o, std::vector< expressions::Projector > &projectors)
static void write(std::ostream &o, std::vector< o2::soa::IndexRecord > &irs)
std::vector< std::string > labels
std::shared_ptr< gandiva::Projector > projector
std::vector< framework::ConcreteDataMatcher > matchers
std::shared_ptr< arrow::Schema > schema
std::shared_ptr< arrow::Table > materialize(ProcessingContext &pc) const
A struct, containing the root of the expression tree.
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
static void resetBuilders(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables)
static std::vector< framework::IndexColumnBuilder > makeBuilders(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records)
static std::shared_ptr< arrow::Table > materialize(std::vector< framework::IndexColumnBuilder > &builders, std::vector< std::shared_ptr< arrow::Table > > &&tables, std::vector< soa::IndexRecord > const &records, std::shared_ptr< arrow::Schema > const &schema, bool exclusive)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))