23#include <Monitoring/Monitoring.h>
27#include <TTreeCache.h>
29#include <arrow/ipc/reader.h>
30#include <arrow/ipc/writer.h>
31#include <arrow/io/interfaces.h>
32#include <arrow/table.h>
33#include <arrow/util/key_value_metadata.h>
39 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
40 [](EndOfStreamContext& eosc) {
41 auto& control = eosc.services().get<ControlService>();
42 control.endOfStream();
47template <
typename... Ts>
48static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
50 if constexpr (
sizeof...(Ts) == 1) {
51 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<
framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
53 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
57template <
typename... Os>
58static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
60 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
63template <
typename... Os>
64static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
66 return std::vector{extractOriginal<Os>(pc)...};
69template <
size_t N, std::array<soa::TableRef, N> refs>
70static inline auto extractOriginals(ProcessingContext& pc)
72 return [&]<
size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
73 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
74 }(std::make_index_sequence<refs.size()>());
79 requires(D::exclusive)
80auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
82 using metadata_t =
decltype(metadata);
83 using Key =
typename metadata_t::Key;
84 using index_pack_t =
typename metadata_t::index_pack_t;
85 constexpr auto sources = metadata_t::sources;
92 requires(!D::exclusive)
93auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
95 using metadata_t =
decltype(metadata);
96 using Key =
typename metadata_t::Key;
97 using index_pack_t =
typename metadata_t::index_pack_t;
98 constexpr auto sources = metadata_t::sources;
109 auto outputs = pc.outputs();
111 for (
auto& input : requested) {
114 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
116 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc));
118 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
120 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc));
122 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
124 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
126 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
128 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
130 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
132 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
134 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
136 throw std::runtime_error(
"Not an index table");
145template <o2::aod::is_aod_hash D>
149 constexpr auto sources = metadata_t::sources;
150 static std::shared_ptr<gandiva::Projector> projector =
nullptr;
154 return {{std::move(C::Projector())...}};
156 (
typename metadata_t::expression_pack_t{});
157 return o2::framework::spawner<D>(extractOriginals<
sources.size(),
sources>(pc), input.
binding.c_str(), projectors.data(), projector, schema);
167 for (
auto& input : requested) {
o2::monitoring::tags::Key Key
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
@ Me
Only quit this data processor.
auto createFieldsFromColumns(framework::pack< C... >)
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
A struct, containing the root of the expression tree.
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)