26#include <Monitoring/Monitoring.h>
30#include <TTreeCache.h>
32#include <arrow/ipc/reader.h>
33#include <arrow/ipc/writer.h>
34#include <arrow/io/interfaces.h>
35#include <arrow/table.h>
36#include <arrow/util/key_value_metadata.h>
// Installs a handler for CallbackService::Id::EndOfStream on the CallbackService
// obtained from the init context `ic`.
// NOTE(review): this listing has fused line numbers and skips lines; the
// enclosing function header and the end of the lambda are not visible here.
42 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
43 [](EndOfStreamContext& eosc) {
// When invoked: fetch the ControlService from the end-of-stream context and
// call endOfStream() on it.
44 auto& control = eosc.services().get<ControlService>();
45 control.endOfStream();
/// Collects the Arrow tables behind a compile-time list of table references.
///
/// For each element `refs[I]`, the label returned by `o2::aod::label<refs[I]>()`
/// is used to request a TableConsumer from `pc.inputs()`, and the result of its
/// `asArrowTable()` call is stored.
///
/// @tparam N    number of entries in `refs`
/// @tparam refs table references to resolve
/// @param pc    processing context whose inputs are queried
/// @return std::vector<std::shared_ptr<arrow::Table>> holding one table per
///         entry of `refs`, in the order of `refs`
///
/// NOTE(review): the lines carrying the function's opening and closing braces
/// are absent from this listing.
50template <
size_t N, std::array<soa::TableRef, N> refs>
51static inline auto extractOriginals(ProcessingContext& pc)
// An immediately-invoked templated lambda receives the index sequence
// 0..refs.size()-1 and expands it inside the braced return list.
53 return [&]<
size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
54 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
55 }(std::make_index_sequence<refs.size()>());
// Overload of make_build that participates only when `D::exclusive` is true.
// Takes the metadata object, the input spec and the processing context.
// NOTE(review): the template header and everything after the declarations
// below are missing from this listing, so the return value is not visible.
60 requires(D::exclusive)
61auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
// Local shorthands derived from the metadata type.
63 using metadata_t =
decltype(metadata);
64 using Key =
typename metadata_t::Key;
65 using index_pack_t =
typename metadata_t::index_pack_t;
// Compile-time copy of the metadata's `sources` member.
66 constexpr auto sources = metadata_t::sources;
// Overload of make_build that participates only when `D::exclusive` is false.
// Takes the metadata object, the input spec and the processing context.
// NOTE(review): the template header and everything after the declarations
// below are missing from this listing, so the return value is not visible.
73 requires(!D::exclusive)
74auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
// Local shorthands derived from the metadata type.
76 using metadata_t =
decltype(metadata);
77 using Key =
typename metadata_t::Key;
78 using index_pack_t =
typename metadata_t::index_pack_t;
// Compile-time copy of the metadata's `sources` member.
79 constexpr auto sources = metadata_t::sources;
// Fragment of a processing callback (presumably indexBuilderCallback — the
// enclosing header is not visible in this listing; TODO confirm).
// Obtain the output allocator from the processing context.
90 auto outputs = pc.outputs();
// Visit every requested input; the loop body is largely missing here.
92 for (
auto& input : requested) {
// Raised with the message "Not an index table"; the condition guarding this
// throw is not visible in this listing.
117 throw std::runtime_error(
"Not an index table");
// Template constrained to types satisfying o2::aod::is_aod_hash.
// NOTE(review): the function name, signature and several body lines are
// missing from this listing; only the fragments below are visible.
126template <o2::aod::is_aod_hash D>
// Compile-time copy of the metadata's `sources` member.
130 constexpr auto sources = metadata_t::sources;
// Function-local static projector handle, initialised to null. Presumably
// filled in on first use and reused afterwards — the assigning code is not
// visible here; TODO confirm.
131 static std::shared_ptr<gandiva::Projector>
projector =
nullptr;
// Builds a brace-initialised result from `C::Projector()` for every `C` in a
// parameter pack.
135 return {{std::move(C::Projector())...}};
// Trailing call argument: a default-constructed expression_pack_t instance
// (looks like the argument of an immediately-invoked lambda — verify).
137 (
typename metadata_t::expression_pack_t{});
// Projector handle held as a data member, null until assigned.
145 std::shared_ptr<gandiva::Projector>
projector =
nullptr;
// Produces an Arrow table using the inputs of the given processing context.
// NOTE(review): braces, the loop header around the push_back and the rest of
// the body are missing from this listing.
152 std::shared_ptr<arrow::Table> make(ProcessingContext& pc)
// Arrow tables fetched from the inputs.
154 std::vector<std::shared_ptr<arrow::Table>> originals;
// Fetch the TableConsumer registered under `label` and keep its Arrow table.
156 originals.push_back(pc.inputs().get<TableConsumer>(
label)->asArrowTable());
// Start of a gandiva::Projector::Make call; its arguments and the handling of
// the result `s` are not visible here.
160 auto s = gandiva::Projector::Make(
// Fragment of a type that holds labels and expressions and is initialised from
// an input spec's metadata (presumably `Spawnable`; the type header is not
// visible in this listing — TODO confirm).
// Input labels, filled below from metadata entries named "input:<label>".
175 std::vector<std::string>
labels;
// Gandiva expressions; the code that fills this vector is not visible here.
177 std::vector<std::shared_ptr<gandiva::Expression>>
expressions;
// Locate the metadata entry whose name is exactly "projectors".
// NOTE(review): `loc` is dereferenced on the very next listing line without
// being compared to spec.metadata.end() — confirm the entry is always present.
192 auto loc = std::find_if(spec.
metadata.begin(), spec.
metadata.end(), [](ConfigParamSpec
const& cps) { return cps.name.compare(
"projectors") == 0; });
// Wrap the entry's default string value in a stream for reading.
193 std::stringstream iws(loc->defaultValue.get<std::string>());
// Same lookup for the entry named "schema".
// NOTE(review): no end() check is visible before the dereference below either.
196 loc = std::find_if(spec.
metadata.begin(), spec.
metadata.end(), [](ConfigParamSpec
const& cps) { return cps.name.compare(
"schema") == 0; });
// Reuse the stream with the schema entry's string value.
198 iws.str(loc->defaultValue.get<std::string>());
// Entries whose name begins with "input:" contribute a label.
202 if (
i.name.starts_with(
"input:")) {
// 6 is the length of the "input:" prefix being stripped.
203 labels.emplace_back(
i.name.substr(6));
// Fields gathered by the visitor below.
207 std::vector<std::shared_ptr<arrow::Field>>
fields;
// Visitor lambda over expression nodes, capturing `fields` by reference.
210 [&
fields](expressions::Node*
n)
mutable {
// Variant alternative 1 is read as a BindingNode on the following line.
211 if (n->self.index() == 1) {
212 auto& b = std::get<expressions::BindingNode>(n->self);
// Add a field for this binding only when no field of the same name exists.
213 if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr<arrow::Field> const& field) { return field->name() == b.name; }) == fields.end()) {
214 fields.emplace_back(std::make_shared<arrow::Field>(b.name, expressions::concreteArrowType(b.type)));
// Returns a shared gandiva projector; the body is not visible in this listing.
233 std::shared_ptr<gandiva::Projector> makeProjector()
// Fragment of a callback body; the enclosing function and the closing braces
// of the loops are missing from this listing.
// Step 1: build one Spawnable per element of `requested`.
258 std::vector<Spawnable> spawnables;
259 for (
auto&
i : requested) {
260 spawnables.emplace_back(
i);
// Step 2: ask every Spawnable for its Maker.
262 std::vector<Maker> makers;
263 for (
auto& s : spawnables) {
264 makers.push_back(s.createMaker());
// Step 3: for every Maker, hand the table returned by make(pc) to the output
// allocator under the Maker's origin / description / version.
269 for (
auto& maker : makers) {
270 outputs.
adopt(
Output{maker.
origin, maker.description, maker.version}, maker.make(pc));
std::shared_ptr< arrow::Schema > outputSchema
header::DataOrigin origin
std::shared_ptr< arrow::Schema > schema
header::DataDescription description
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< arrow::Schema > inputSchema
std::shared_ptr< gandiva::Projector > projector
std::vector< std::string > labels
o2::monitoring::tags::Key Key
std::vector< std::shared_ptr< arrow::Field > > fields
ServiceRegistryRef services() const
void adopt(const Output &spec, std::string *)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
GLuint GLsizei const GLchar * label
gandiva::ExpressionPtr makeExpression(gandiva::NodePtr node, gandiva::FieldPtr result)
Function to create gandiva projecting expression from generic gandiva expression tree.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
Operations createOperations(Filter const &expression)
Function to create an internal operation sequence from a filter tree.
void walk(Node *head, L &&pred)
Tree-walker helper.
gandiva::NodePtr createExpressionTree(Operations const &opSpecs, gandiva::SchemaPtr const &Schema)
Function to create gandiva expression tree from operation sequence.
auto setEOSCallback(InitContext &ic)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
Me
Only quit this data processor.
RuntimeErrorRef runtime_error_f(const char *,...)
auto createFieldsFromColumns(framework::pack< C... >)
std::function< ProcessCallback(InitContext &)> InitCallback
static std::shared_ptr< arrow::Schema > read(std::istream &s)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::vector< expressions::Projector > read(std::istream &s)
header::DataOrigin origin
A struct, containing the root of the expression tree.
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)