30#include <Monitoring/Monitoring.h>
34#include <TTreeCache.h>
35#include <TTreePerfStats.h>
37#include <arrow/ipc/reader.h>
38#include <arrow/ipc/writer.h>
39#include <arrow/io/interfaces.h>
40#include <arrow/table.h>
41#include <arrow/util/key_value_metadata.h>
49 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
50 [](EndOfStreamContext& eosc) {
51 auto& control = eosc.services().get<ControlService>();
52 control.endOfStream();
57template <
typename... Ts>
58static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
60 if constexpr (
sizeof...(Ts) == 1) {
61 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<
framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
63 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
67template <
typename... Os>
68static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
70 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
73template <
typename... Os>
74static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
76 return std::vector{extractOriginal<Os>(pc)...};
79template <
size_t N, std::array<soa::TableRef, N> refs>
80static inline auto extractOriginals(ProcessingContext& pc)
82 return [&]<
size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
83 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
84 }(std::make_index_sequence<refs.size()>());
89 requires(D::exclusive)
90auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
92 using metadata_t =
decltype(metadata);
93 using Key =
typename metadata_t::Key;
94 using index_pack_t =
typename metadata_t::index_pack_t;
95 constexpr auto sources = metadata_t::sources;
102 requires(!D::exclusive)
103auto make_build(D metadata,
InputSpec const& input, ProcessingContext& pc)
105 using metadata_t =
decltype(metadata);
106 using Key =
typename metadata_t::Key;
107 using index_pack_t =
typename metadata_t::index_pack_t;
108 constexpr auto sources = metadata_t::sources;
119 auto outputs = pc.outputs();
121 for (
auto& input : requested) {
124 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
126 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc));
128 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
130 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc));
132 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
134 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
136 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
138 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
140 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
142 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
144 outputs.adopt(
Output{origin, description,
version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
146 throw std::runtime_error(
"Not an index table");
155template <o2::aod::is_aod_hash D>
159 constexpr auto sources = metadata_t::sources;
160 static std::shared_ptr<gandiva::Projector> projector =
nullptr;
161 return o2::framework::spawner<D>(extractOriginals<
sources.size(),
sources>(pc), input.
binding.c_str(), projector);
169 auto outputs = pc.outputs();
171 for (
auto& input : requested) {
o2::monitoring::tags::Key Key
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)