30#include <Monitoring/Monitoring.h>
34#include <TTreeCache.h>
35#include <TTreePerfStats.h>
37#include <arrow/ipc/reader.h>
38#include <arrow/ipc/writer.h>
39#include <arrow/io/interfaces.h>
40#include <arrow/table.h>
41#include <arrow/util/key_value_metadata.h>
// Fragment: the enclosing function header and the closing braces/parentheses are not visible in this view.
// Registers a callback for CallbackService::Id::EndOfStream on the CallbackService obtained from `ic`.
49 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
50 [](EndOfStreamContext& eosc) {
// Fetch the ControlService through the end-of-stream context and call endOfStream() on it.
51 auto& control = eosc.services().get<ControlService>();
52 control.endOfStream();
57template <
typename... Ts>
58static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
60 if constexpr (
sizeof...(Ts) == 1) {
61 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<
framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
63 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
67template <
typename... Os>
68static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
70 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
73template <
typename... Os>
74static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
76 return std::vector{extractOriginal<Os>(pc)...};
79template <
size_t N, std::array<soa::TableRef, N> refs>
80static inline auto extractOriginals(ProcessingContext& pc)
82 return [&]<
size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
83 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
84 }(std::make_index_sequence<refs.size()>());
// Fragment of a loop body: the enclosing function header, several body lines and the
// closing braces are not visible in this view.
91 auto outputs = pc.outputs();
// Visit every requested input.
93 for (
auto& input : requested) {
// Generic helper invoked below with a metadata object; the aliases and `sources`
// are all taken from the type of that object.
95 auto maker = [&](
auto metadata) {
96 using metadata_t =
decltype(metadata);
97 using Key =
typename metadata_t::Key;
98 using index_pack_t =
typename metadata_t::index_pack_t;
99 constexpr auto sources = metadata_t::sources;
// Compile-time branch on the metadata's `exclusive` flag.
// NOTE(review): the branch bodies and the end of `maker` are missing from this view.
100 if constexpr (metadata_t::exclusive ==
true) {
// Each call below adopts the result of maker(...) for one metadata type under
// Output{origin, description, version}.
// NOTE(review): the conditions selecting between these calls, and the definitions of
// origin/description/version, are not visible here — confirm against the full file.
112 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedExclusiveMetadata{}));
114 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedSparseMetadata{}));
116 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedExclusiveMetadata{}));
118 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedSparseMetadata{}));
120 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsExclusiveMetadata{}));
122 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsSparseMetadata{}));
124 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}));
126 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}));
128 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedToBCSparseMetadata{}));
130 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run3MatchedToBCExclusiveMetadata{}));
132 outputs.adopt(
Output{origin, description,
version}, maker(o2::aod::Run2MatchedToBCSparseMetadata{}));
// Presumably the fallback when none of the cases above applies — TODO confirm against the missing conditions.
134 throw std::runtime_error(
"Not an index table");
// Fragment of a loop body: the enclosing function header, the definitions of
// `metadata_t` and `D`, and the closing braces are not visible in this view.
145 auto outputs = pc.outputs();
// Visit every requested input.
147 for (
auto& input : requested) {
151 constexpr auto sources = metadata_t::sources;
// Fetch the source tables listed in `sources` from pc and pass them, together with
// the input's binding name, to o2::framework::spawner<D>.
152 return o2::framework::spawner<D>(extractOriginals<
sources.size(),
sources>(pc), input.binding.c_str());
o2::monitoring::tags::Key Key
hash identification concepts
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)