Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
29#include "Framework/Logger.h"
30
31#include <Monitoring/Monitoring.h>
32
33#include <TGrid.h>
34#include <TFile.h>
35#include <TTreeCache.h>
36#include <TTreePerfStats.h>
37
38#include <arrow/ipc/reader.h>
39#include <arrow/ipc/writer.h>
40#include <arrow/io/interfaces.h>
41#include <arrow/table.h>
42#include <arrow/util/key_value_metadata.h>
43
44#include <thread>
45
47{
48auto setEOSCallback(InitContext& ic)
49{
50 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
51 [](EndOfStreamContext& eosc) {
52 auto& control = eosc.services().get<ControlService>();
53 control.endOfStream();
54 control.readyToQuit(QuitRequest::Me);
55 });
56}
57
58template <typename... Ts>
59static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
60{
61 if constexpr (sizeof...(Ts) == 1) {
62 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
63 } else {
64 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
65 }
66}
67
68template <typename... Os>
69static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
70{
71 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
72}
73
74template <typename... Os>
75static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
76{
77 return std::vector{extractOriginal<Os>(pc)...};
78}
79
80template <size_t N, std::array<soa::TableRef, N> refs>
81static inline auto extractOriginals(ProcessingContext& pc)
82{
83 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
84 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
85 }(std::make_index_sequence<refs.size()>());
86}
87
88AlgorithmSpec AODReaderHelpers::indexBuilderCallback(std::vector<InputSpec>& requested)
89{
90 return AlgorithmSpec::InitCallback{[requested](InitContext& ic) {
91 return [requested](ProcessingContext& pc) {
92 auto outputs = pc.outputs();
93 // spawn tables
94 for (auto& input : requested) {
95 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
96 auto maker = [&](auto metadata) {
97 using metadata_t = decltype(metadata);
98 using Key = typename metadata_t::Key;
99 using index_pack_t = typename metadata_t::index_pack_t;
100 constexpr auto sources = metadata_t::sources;
101 if constexpr (metadata_t::exclusive == true) {
103 extractOriginals<sources.size(), sources>(pc),
104 index_pack_t{});
105 } else {
107 extractOriginals<sources.size(), sources>(pc),
108 index_pack_t{});
109 }
110 };
111
112 if (description == header::DataDescription{"MA_RN2_EX"}) {
113 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedExclusiveMetadata{}));
114 } else if (description == header::DataDescription{"MA_RN2_SP"}) {
115 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedSparseMetadata{}));
116 } else if (description == header::DataDescription{"MA_RN3_EX"}) {
117 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedExclusiveMetadata{}));
118 } else if (description == header::DataDescription{"MA_RN3_SP"}) {
119 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedSparseMetadata{}));
120 } else if (description == header::DataDescription{"MA_BCCOL_EX"}) {
121 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsExclusiveMetadata{}));
122 } else if (description == header::DataDescription{"MA_BCCOL_SP"}) {
123 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsSparseMetadata{}));
124 } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) {
125 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}));
126 } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) {
127 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}));
128 } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) {
129 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedToBCSparseMetadata{}));
130 } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) {
131 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedToBCExclusiveMetadata{}));
132 } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) {
133 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedToBCSparseMetadata{}));
134 } else {
135 throw std::runtime_error("Not an index table");
136 }
137 }
138 };
139 }};
140}
141
142AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec>& requested)
143{
144 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
145 return [requested](ProcessingContext& pc) {
146 auto outputs = pc.outputs();
147 // spawn tables
148 for (auto& input : requested) {
149 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
150 auto maker = [&]<o2::aod::is_aod_hash D>() {
151 using metadata_t = o2::aod::MetadataTrait<D>::metadata;
152 constexpr auto sources = metadata_t::sources;
153 return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str());
154 };
155
156 if (description == header::DataDescription{"EXTRACK"}) {
157 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACK/0"_h>>());
158 } else if (description == header::DataDescription{"EXTRACK_IU"}) {
159 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACK_IU/0"_h>>());
160 } else if (description == header::DataDescription{"EXTRACKCOV"}) {
161 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKCOV/0"_h>>());
162 } else if (description == header::DataDescription{"EXTRACKCOV_IU"}) {
163 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKCOV_IU/0"_h>>());
164 } else if (description == header::DataDescription{"EXTRACKEXTRA"}) {
165 if (version == 0U) {
166 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/0"_h>>());
167 } else if (version == 1U) {
168 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/1"_h>>());
169 } else if (version == 2U) {
170 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/2"_h>>());
171 }
172 } else if (description == header::DataDescription{"EXMFTTRACK"}) {
173 if (version == 0U) {
174 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMFTTRACK/0"_h>>());
175 } else if (version == 1U) {
176 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMFTTRACK/1"_h>>());
177 }
178 } else if (description == header::DataDescription{"EXFWDTRACK"}) {
179 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXFWDTRACK/0"_h>>());
180 } else if (description == header::DataDescription{"EXFWDTRACKCOV"}) {
181 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXFWDTRACKCOV/0"_h>>());
182 } else if (description == header::DataDescription{"EXMCPARTICLE"}) {
183 if (version == 0U) {
184 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMCPARTICLE/0"_h>>());
185 } else if (version == 1U) {
186 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMCPARTICLE/1"_h>>());
187 }
188 } else {
189 throw runtime_error("Not an extended table");
190 }
191 }
192 };
193 }};
194}
195
196} // namespace o2::framework::readers
o2::monitoring::tags::Key Key
uint32_t version
Definition RawData.h:8
hash identification concepts
Definition ASoA.h:358
GLsizei GLenum * sources
Definition glcorearb.h:2516
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
Definition Pack.h:56
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)