Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
22
23#include <Monitoring/Monitoring.h>
24
25#include <TGrid.h>
26#include <TFile.h>
27#include <TTreeCache.h>
28
29#include <arrow/ipc/reader.h>
30#include <arrow/ipc/writer.h>
31#include <arrow/io/interfaces.h>
32#include <arrow/table.h>
33#include <arrow/util/key_value_metadata.h>
34
36{
37auto setEOSCallback(InitContext& ic)
38{
39 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
40 [](EndOfStreamContext& eosc) {
41 auto& control = eosc.services().get<ControlService>();
42 control.endOfStream();
43 control.readyToQuit(QuitRequest::Me);
44 });
45}
46
47template <typename... Ts>
48static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
49{
50 if constexpr (sizeof...(Ts) == 1) {
51 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
52 } else {
53 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
54 }
55}
56
57template <typename... Os>
58static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
59{
60 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
61}
62
63template <typename... Os>
64static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
65{
66 return std::vector{extractOriginal<Os>(pc)...};
67}
68
69template <size_t N, std::array<soa::TableRef, N> refs>
70static inline auto extractOriginals(ProcessingContext& pc)
71{
72 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
73 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
74 }(std::make_index_sequence<refs.size()>());
75}
76namespace
77{
78template <typename D>
79 requires(D::exclusive)
80auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
81{
82 using metadata_t = decltype(metadata);
83 using Key = typename metadata_t::Key;
84 using index_pack_t = typename metadata_t::index_pack_t;
85 constexpr auto sources = metadata_t::sources;
87 extractOriginals<sources.size(), sources>(pc),
88 index_pack_t{});
89}
90
91template <typename D>
92 requires(!D::exclusive)
93auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
94{
95 using metadata_t = decltype(metadata);
96 using Key = typename metadata_t::Key;
97 using index_pack_t = typename metadata_t::index_pack_t;
98 constexpr auto sources = metadata_t::sources;
100 extractOriginals<sources.size(), sources>(pc),
101 index_pack_t{});
102}
103} // namespace
104
106{
107 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
108 return [requested](ProcessingContext& pc) {
109 auto outputs = pc.outputs();
110 // spawn tables
111 for (auto& input : requested) {
112 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
113 if (description == header::DataDescription{"MA_RN2_EX"}) {
114 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
115 } else if (description == header::DataDescription{"MA_RN2_SP"}) {
116 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc));
117 } else if (description == header::DataDescription{"MA_RN3_EX"}) {
118 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
119 } else if (description == header::DataDescription{"MA_RN3_SP"}) {
120 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc));
121 } else if (description == header::DataDescription{"MA_BCCOL_EX"}) {
122 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
123 } else if (description == header::DataDescription{"MA_BCCOL_SP"}) {
124 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
125 } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) {
126 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
127 } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) {
128 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
129 } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) {
130 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
131 } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) {
132 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
133 } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) {
134 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
135 } else {
136 throw std::runtime_error("Not an index table");
137 }
138 }
139 };
140 }};
141}
142
143namespace
144{
145template <o2::aod::is_aod_hash D>
146auto make_spawn(InputSpec const& input, ProcessingContext& pc)
147{
148 using metadata_t = o2::aod::MetadataTrait<D>::metadata;
149 constexpr auto sources = metadata_t::sources;
150 static std::shared_ptr<gandiva::Projector> projector = nullptr;
151 static std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(typename metadata_t::expression_pack_t{}));
152 static auto projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
153 {
154 return {{std::move(C::Projector())...}};
155 }
156 (typename metadata_t::expression_pack_t{});
157 return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str(), projectors.data(), projector, schema);
158}
159} // namespace
160
161AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec>& requested)
162{
163 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
164 return [requested](ProcessingContext& pc) {
165 auto outputs = pc.outputs();
166 // spawn tables
167 for (auto& input : requested) {
168 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
169 if (description == header::DataDescription{"EXTRACK"}) {
170 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACK/0"_h>>(input, pc));
171 } else if (description == header::DataDescription{"EXTRACK_IU"}) {
172 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACK_IU/0"_h>>(input, pc));
173 } else if (description == header::DataDescription{"EXTRACKCOV"}) {
174 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKCOV/0"_h>>(input, pc));
175 } else if (description == header::DataDescription{"EXTRACKCOV_IU"}) {
176 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKCOV_IU/0"_h>>(input, pc));
177 } else if (description == header::DataDescription{"EXTRACKEXTRA"}) {
178 if (version == 0U) {
179 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/0"_h>>(input, pc));
180 } else if (version == 1U) {
181 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/1"_h>>(input, pc));
182 } else if (version == 2U) {
183 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXTRACKEXTRA/2"_h>>(input, pc));
184 }
185 } else if (description == header::DataDescription{"EXMFTTRACK"}) {
186 if (version == 0U) {
187 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMFTTRACK/0"_h>>(input, pc));
188 } else if (version == 1U) {
189 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMFTTRACK/1"_h>>(input, pc));
190 }
191 } else if (description == header::DataDescription{"EXFWDTRACK"}) {
192 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXFWDTRACK/0"_h>>(input, pc));
193 } else if (description == header::DataDescription{"EXFWDTRACKCOV"}) {
194 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXFWDTRACKCOV/0"_h>>(input, pc));
195 } else if (description == header::DataDescription{"EXMCPARTICLE"}) {
196 if (version == 0U) {
197 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMCPARTICLE/0"_h>>(input, pc));
198 } else if (version == 1U) {
199 outputs.adopt(Output{origin, description, version}, make_spawn<o2::aod::Hash<"EXMCPARTICLE/1"_h>>(input, pc));
200 }
201 } else {
202 throw runtime_error("Not an extended table");
203 }
204 }
205 };
206 }};
207}
208
209} // namespace o2::framework::readers
o2::monitoring::tags::Key Key
uint32_t version
Definition RawData.h:8
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
GLsizei GLenum * sources
Definition glcorearb.h:2516
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
Definition Pack.h:56
@ Me
Only quit this data processor.
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:407
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
A struct, containing the root of the expression tree.
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)