Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
28#include "Framework/Logger.h"
29
30#include <Monitoring/Monitoring.h>
31
32#include <TGrid.h>
33#include <TFile.h>
34#include <TTreeCache.h>
35#include <TTreePerfStats.h>
36
37#include <arrow/ipc/reader.h>
38#include <arrow/ipc/writer.h>
39#include <arrow/io/interfaces.h>
40#include <arrow/table.h>
41#include <arrow/util/key_value_metadata.h>
42
43#include <thread>
44
46{
47auto setEOSCallback(InitContext& ic)
48{
49 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
50 [](EndOfStreamContext& eosc) {
51 auto& control = eosc.services().get<ControlService>();
52 control.endOfStream();
53 control.readyToQuit(QuitRequest::Me);
54 });
55}
56
57template <typename... Ts>
58static inline auto doExtractOriginal(framework::pack<Ts...>, ProcessingContext& pc)
59{
60 if constexpr (sizeof...(Ts) == 1) {
61 return pc.inputs().get<TableConsumer>(aod::MetadataTrait<framework::pack_element_t<0, framework::pack<Ts...>>>::metadata::tableLabel())->asArrowTable();
62 } else {
63 return std::vector{pc.inputs().get<TableConsumer>(aod::MetadataTrait<Ts>::metadata::tableLabel())->asArrowTable()...};
64 }
65}
66
67template <typename... Os>
68static inline auto extractOriginalsTuple(framework::pack<Os...>, ProcessingContext& pc)
69{
70 return std::make_tuple(extractTypedOriginal<Os>(pc)...);
71}
72
73template <typename... Os>
74static inline auto extractOriginalsVector(framework::pack<Os...>, ProcessingContext& pc)
75{
76 return std::vector{extractOriginal<Os>(pc)...};
77}
78
79template <size_t N, std::array<soa::TableRef, N> refs>
80static inline auto extractOriginals(ProcessingContext& pc)
81{
82 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
83 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
84 }(std::make_index_sequence<refs.size()>());
85}
86
87AlgorithmSpec AODReaderHelpers::indexBuilderCallback(std::vector<InputSpec>& requested)
88{
89 return AlgorithmSpec::InitCallback{[requested](InitContext& ic) {
90 return [requested](ProcessingContext& pc) {
91 auto outputs = pc.outputs();
92 // spawn tables
93 for (auto& input : requested) {
94 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
95 auto maker = [&](auto metadata) {
96 using metadata_t = decltype(metadata);
97 using Key = typename metadata_t::Key;
98 using index_pack_t = typename metadata_t::index_pack_t;
99 constexpr auto sources = metadata_t::sources;
100 if constexpr (metadata_t::exclusive == true) {
102 extractOriginals<sources.size(), sources>(pc),
103 index_pack_t{});
104 } else {
106 extractOriginals<sources.size(), sources>(pc),
107 index_pack_t{});
108 }
109 };
110
111 if (description == header::DataDescription{"MA_RN2_EX"}) {
112 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedExclusiveMetadata{}));
113 } else if (description == header::DataDescription{"MA_RN2_SP"}) {
114 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedSparseMetadata{}));
115 } else if (description == header::DataDescription{"MA_RN3_EX"}) {
116 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedExclusiveMetadata{}));
117 } else if (description == header::DataDescription{"MA_RN3_SP"}) {
118 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedSparseMetadata{}));
119 } else if (description == header::DataDescription{"MA_BCCOL_EX"}) {
120 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsExclusiveMetadata{}));
121 } else if (description == header::DataDescription{"MA_BCCOL_SP"}) {
122 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsSparseMetadata{}));
123 } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) {
124 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}));
125 } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) {
126 outputs.adopt(Output{origin, description, version}, maker(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}));
127 } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) {
128 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedToBCSparseMetadata{}));
129 } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) {
130 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run3MatchedToBCExclusiveMetadata{}));
131 } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) {
132 outputs.adopt(Output{origin, description, version}, maker(o2::aod::Run2MatchedToBCSparseMetadata{}));
133 } else {
134 throw std::runtime_error("Not an index table");
135 }
136 }
137 };
138 }};
139}
140
141AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(std::vector<InputSpec>& requested)
142{
143 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
144 return [requested](ProcessingContext& pc) {
145 auto outputs = pc.outputs();
146 // spawn tables
147 for (auto& input : requested) {
148 auto&& [origin, description, version] = DataSpecUtils::asConcreteDataMatcher(input);
149 auto maker = [&]<o2::aod::is_aod_hash D>() {
150 using metadata_t = o2::aod::MetadataTrait<D>::metadata;
151 constexpr auto sources = metadata_t::sources;
152 return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str());
153 };
154
155 if (description == header::DataDescription{"EXTRACK"}) {
156 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACK/0"_h>>());
157 } else if (description == header::DataDescription{"EXTRACK_IU"}) {
158 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACK_IU/0"_h>>());
159 } else if (description == header::DataDescription{"EXTRACKCOV"}) {
160 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKCOV/0"_h>>());
161 } else if (description == header::DataDescription{"EXTRACKCOV_IU"}) {
162 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKCOV_IU/0"_h>>());
163 } else if (description == header::DataDescription{"EXTRACKEXTRA"}) {
164 if (version == 0U) {
165 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/0"_h>>());
166 } else if (version == 1U) {
167 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/1"_h>>());
168 } else if (version == 2U) {
169 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXTRACKEXTRA/2"_h>>());
170 }
171 } else if (description == header::DataDescription{"EXMFTTRACK"}) {
172 if (version == 0U) {
173 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMFTTRACK/0"_h>>());
174 } else if (version == 1U) {
175 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMFTTRACK/1"_h>>());
176 }
177 } else if (description == header::DataDescription{"EXFWDTRACK"}) {
178 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXFWDTRACK/0"_h>>());
179 } else if (description == header::DataDescription{"EXFWDTRACKCOV"}) {
180 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXFWDTRACKCOV/0"_h>>());
181 } else if (description == header::DataDescription{"EXMCPARTICLE"}) {
182 if (version == 0U) {
183 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMCPARTICLE/0"_h>>());
184 } else if (version == 1U) {
185 outputs.adopt(Output{origin, description, version}, maker.template operator()<o2::aod::Hash<"EXMCPARTICLE/1"_h>>());
186 }
187 } else {
188 throw runtime_error("Not an extended table");
189 }
190 }
191 };
192 }};
193}
194
195} // namespace o2::framework::readers
o2::monitoring::tags::Key Key
uint32_t version
Definition RawData.h:8
hash identification concepts
Definition ASoA.h:363
GLsizei GLenum * sources
Definition glcorearb.h:2516
auto setEOSCallback(InitContext &ic)
RuntimeErrorRef runtime_error(const char *)
typename pack_element< I, T >::type pack_element_t
Definition Pack.h:56
@ Me
Only quit this data processor.
std::function< ProcessCallback(InitContext &)> InitCallback
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static AlgorithmSpec aodSpawnerCallback(std::vector< InputSpec > &requested)
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)