Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
28#include "Framework/Logger.h"
29
30#include <Monitoring/Monitoring.h>
31
32#include <TGrid.h>
33#include <TFile.h>
34#include <TTreeCache.h>
35#include <TTreePerfStats.h>
36
37#include <arrow/ipc/reader.h>
38#include <arrow/ipc/writer.h>
39#include <arrow/io/interfaces.h>
40#include <arrow/table.h>
41#include <arrow/util/key_value_metadata.h>
42
43#include <thread>
44
46{
47auto setEOSCallback(InitContext& ic)
48{
49 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
50 [](EndOfStreamContext& eosc) {
51 auto& control = eosc.services().get<ControlService>();
52 control.endOfStream();
53 control.readyToQuit(QuitRequest::Me);
54 });
55}
56
57template <typename... Ts>
58static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
59{
60 if constexpr (sizeof...(Ts) == 1) {
61 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
62 } else {
63 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
64 }
65}
66
67template <typename... Os>
68static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
69{
70 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
71}
72
73template <typename... Os>
74static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
75{
76 return std::vector{extractOriginal<Os>(pc)...};
77}
78
79template <size_t N, std::array<soa::TableRef, N> refs>
80static inline auto extractOriginals(ProcessingContext& pc)
81{
82 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
83 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
84 }(std::make_index_sequence<refs.size()>());
85}
86namespace
87{
// make_build overload selected when the metadata type reports D::exclusive as true.
//
// Collects the source tables listed in metadata_t::sources from the inputs of `pc`
// (via extractOriginals) and passes them, together with a default-constructed
// index_pack_t, to the call that forms the return value.
//
// NOTE(review): the line that opens that call (source line 96, between the `sources`
// declaration and the `extractOriginals` argument) is absent from this listing, so the
// callee cannot be named from here. `Key` and `input` are not used on any visible line;
// presumably the missing line consumes them - restore it from the upstream file and confirm.
88template <typename D>
89 requires(D::exclusive)
90auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
91{
92 using metadata_t = decltype(metadata);
93 using Key = typename metadata_t::Key;
94 using index_pack_t = typename metadata_t::index_pack_t;
95 constexpr auto sources = metadata_t::sources;
97 extractOriginals<sources.size(), sources>(pc),
98 index_pack_t{});
99}
100
// make_build overload selected when the metadata type reports D::exclusive as false.
//
// The visible lines are identical to the overload above: the source tables come from
// extractOriginals and are followed by a default-constructed index_pack_t.
//
// NOTE(review): the line that opens the call (source line 109) is absent from this
// listing as well; whatever distinguishes the two overloads presumably lives on that
// line - confirm against the upstream file.
101template <typename D>
102 requires(!D::exclusive)
103auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
104{
105 using metadata_t = decltype(metadata);
106 using Key = typename metadata_t::Key;
107 using index_pack_t = typename metadata_t::index_pack_t;
108 constexpr auto sources = metadata_t::sources;
110 extractOriginals<sources.size(), sources>(pc),
111 index_pack_t{});
112}
113} // namespace
114
116{
// NOTE(review): the signature line that belongs in front of the opening brace (source
// line 115) is absent from this listing; the body reads a vector-like `requested` of
// input specs and its shape mirrors aodSpawnerCallback - confirm against the upstream file.
//
// Returns an InitCallback that ignores its InitContext and yields the processing
// callback. `requested` is captured by value in both lambdas.
117 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
118 return [requested](ProcessingContext& pc) {
119 auto outputs = pc.outputs();
120 // build one index table per requested input spec
121 for (auto& input : requested) {
// The description of the concrete matcher selects the metadata type handed to
// make_build; the result is adopted as the output carrying the same
// origin/description/version as the request.
122 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
123 if (description == header::DataDescription{"MA_RN2_EX"}) {
124 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
125 } else if (description == header::DataDescription{"MA_RN2_SP"}) {
126 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc));
127 } else if (description == header::DataDescription{"MA_RN3_EX"}) {
128 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
129 } else if (description == header::DataDescription{"MA_RN3_SP"}) {
130 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc));
131 } else if (description == header::DataDescription{"MA_BCCOL_EX"}) {
132 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
133 } else if (description == header::DataDescription{"MA_BCCOL_SP"}) {
134 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
135 } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) {
136 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
137 } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) {
138 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
139 } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) {
140 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
141 } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) {
142 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
// NOTE(review): "MA_RN2_BC_SP" has no "MA_RN2_BC_EX" counterpart here, unlike the
// Run 3 pair above - confirm that this is intentional.
143 } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) {
144 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
145 } else {
// Any description not listed above is rejected.
146 throw std::runtime_error("Not an index table");
147 }
148 }
149 };
150 }};
151}
152
153namespace
154{
155template <o2::aod::is_aod_hash D>
156auto make_spawn(InputSpec const& input, ProcessingContext& pc)
157{
158 using metadata_t = o2::aod::MetadataTrait<D>::metadata;
159 constexpr auto sources = metadata_t::sources;
160 static std::shared_ptr<gandiva::Projector> projector = nullptr;
161 return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str(), projector);
162}
163} // namespace
164
165AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec>& requested)
166{
167 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
168 return [requested](ProcessingContext& pc) {
169 auto outputs = pc.outputs();
170 // spawn tables
171 for (auto& input : requested) {
172 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
173 if (description == header::DataDescription{"EXTRACK"}) {
174 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACK/0"_h>>(input, pc));
175 } else if (description == header::DataDescription{"EXTRACK_IU"}) {
176 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACK_IU/0"_h>>(input, pc));
177 } else if (description == header::DataDescription{"EXTRACKCOV"}) {
178 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKCOV/0"_h>>(input, pc));
179 } else if (description == header::DataDescription{"EXTRACKCOV_IU"}) {
180 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKCOV_IU/0"_h>>(input, pc));
181 } else if (description == header::DataDescription{"EXTRACKEXTRA"}) {
182 if (version == 0U) {
183 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/0"_h>>(input, pc));
184 } else if (version == 1U) {
185 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/1"_h>>(input, pc));
186 } else if (version == 2U) {
187 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/2"_h>>(input, pc));
188 }
189 } else if (description == header::DataDescription{"EXMFTTRACK"}) {
190 if (version == 0U) {
191 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMFTTRACK/0"_h>>(input, pc));
192 } else if (version == 1U) {
193 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMFTTRACK/1"_h>>(input, pc));
194 }
195 } else if (description == header::DataDescription{"EXFWDTRACK"}) {
196 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXFWDTRACK/0"_h>>(input, pc));
197 } else if (description == header::DataDescription{"EXFWDTRACKCOV"}) {
198 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXFWDTRACKCOV/0"_h>>(input, pc));
199 } else if (description == header::DataDescription{"EXMCPARTICLE"}) {
200 if (version == 0U) {
201 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMCPARTICLE/0"_h>>(input, pc));
202 } else if (version == 1U) {
203 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMCPARTICLE/1"_h>>(input, pc));
204 }
205 } else {
206 throw runtime_error("Not an extended table");
207 }
208 }
209 };
210 }};
211}
212
213} // namespace o2::framework::readers
o2::monitoring::tags::Key Key
uint32_t version
Definition RawData.h:8
GLsizei GLenum * sources
Definition glcorearb.h:2516
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
Definition Pack.h:56
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)