31#include <Monitoring/Monitoring.h>
35#include <TTreeCache.h>
36#include <TTreePerfStats.h>
38#include <arrow/ipc/reader.h>
39#include <arrow/ipc/writer.h>
40#include <arrow/io/interfaces.h>
41#include <arrow/table.h>
42#include <arrow/util/key_value_metadata.h>
50 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
51 [](EndOfStreamContext& eosc) {
52 auto& control = eosc.services().get<ControlService>();
53 control.endOfStream();
58template <
typename... Ts>
59static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
61 if constexpr (
sizeof...(Ts) == 1) {
62 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<
framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
64 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
68template <
typename... Os>
69static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
71 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
74template <
typename... Os>
75static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
77 return std::vector{extractOriginal<Os>(pc)...};
80template <
size_t N, std::array<soa::TableRef, N> refs>
81static inline auto extractOriginals(ProcessingContext& pc)
83 return [&]<
size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
84 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
85 }(std::make_index_sequence<refs.size()>());
92 auto outputs = pc.outputs();
94 for (
auto& input : requested) {
96 auto maker = [&](
auto metadata) {
97 using metadata_t =
decltype(metadata);
98 using Key =
typename metadata_t::Key;
99 using index_pack_t =
typename metadata_t::index_pack_t;
100 constexpr auto sources = metadata_t::sources;
101 if constexpr (metadata_t::exclusive ==
true) {
113 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedExclusiveMetadata{}));
115 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedSparseMetadata{}));
117 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedExclusiveMetadata{}));
119 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedSparseMetadata{}));
121 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsExclusiveMetadata{}));
123 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsSparseMetadata{}));
125 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}));
127 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}));
129 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedToBCSparseMetadata{}));
131 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedToBCExclusiveMetadata{}));
133 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedToBCSparseMetadata{}));
135 throw std::runtime_error(
"Not an index table");
146 auto outputs = pc.outputs();
148 for (
auto& input : requested) {
152 constexpr auto sources = metadata_t::sources;
153 return o2::framework::spawner<D>(extractOriginals<
sources.size(),
sources>(pc), input.binding.c_str());
o2::monitoring::tags::Key Key
hash identification concepts
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)