Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
25
26#include <Monitoring/Monitoring.h>
27
28#include <TGrid.h>
29#include <TFile.h>
30#include <TTreeCache.h>
31
32#include <arrow/ipc/reader.h>
33#include <arrow/ipc/writer.h>
34#include <arrow/io/interfaces.h>
35#include <arrow/table.h>
36#include <arrow/util/key_value_metadata.h>
37
39{
40auto setEOSCallback(InitContext& ic)
41{
42 ic.services().get<CallbackService>().set<CallbackService::Id::EndOfStream>(
43 [](EndOfStreamContext& eosc) {
44 auto& control = eosc.services().get<ControlService>();
45 control.endOfStream();
46 control.readyToQuit(QuitRequest::Me);
47 });
48}
49
50template <size_t N, std::array<soa::TableRef, N> refs>
51static inline auto extractOriginals(ProcessingContext& pc)
52{
53 return [&]<size_t... Is>(std::index_sequence<Is...>) -> std::vector<std::shared_ptr<arrow::Table>> {
54 return {pc.inputs().get<TableConsumer>(o2::aod::label<refs[Is]>())->asArrowTable()...};
55 }(std::make_index_sequence<refs.size()>());
56}
57namespace
58{
// Build the index table described by an *exclusive* index metadata type D.
// This overload is selected by the `requires(D::exclusive)` constraint.
// The source tables listed in D::sources are fetched from `pc`, in order, by
// extractOriginals. They are passed, together with a D::index_pack_t instance,
// as the trailing arguments of the builder call.
// `input` and `Key` are not referenced in the lines visible here.
// NOTE(review): the line that opens the builder call (original line 67) is
// missing from this listing; presumably it is an index-builder invocation
// parameterized on Key for the exclusive case — TODO confirm.
59template <typename D>
60 requires(D::exclusive)
61auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
62{
63 using metadata_t = decltype(metadata);
64 using Key = typename metadata_t::Key;
65 using index_pack_t = typename metadata_t::index_pack_t;
66 constexpr auto sources = metadata_t::sources;
68 extractOriginals<sources.size(), sources>(pc),
69 index_pack_t{});
70}
71
// Build the index table described by a *non-exclusive* (sparse) index
// metadata type D. This overload is selected by the `requires(!D::exclusive)`
// constraint.
// The source tables listed in D::sources are fetched from `pc`, in order, by
// extractOriginals. They are passed, together with a D::index_pack_t instance,
// as the trailing arguments of the builder call.
// `input` and `Key` are not referenced in the lines visible here.
// NOTE(review): the line that opens the builder call (original line 80) is
// missing from this listing; presumably it is an index-builder invocation
// parameterized on Key for the sparse case — TODO confirm.
72template <typename D>
73 requires(!D::exclusive)
74auto make_build(D metadata, InputSpec const& input, ProcessingContext& pc)
75{
76 using metadata_t = decltype(metadata);
77 using Key = typename metadata_t::Key;
78 using index_pack_t = typename metadata_t::index_pack_t;
79 constexpr auto sources = metadata_t::sources;
81 extractOriginals<sources.size(), sources>(pc),
82 index_pack_t{});
83}
84} // namespace
85
// Create the algorithm of the index-builder device.
// The init callback captures a copy of `requested` and returns a processing
// callback. For every requested input, that callback:
//  - selects the index metadata type from the input's data description
//    (MA_RN2_*, MA_RN3_*, MA_BCCOL*, MA_*_BC_*),
//  - builds the index table with make_build,
//  - adopts the table as an Output{origin, description, version}.
// Throws std::runtime_error("Not an index table") when the description
// matches none of the known index tables.
86AlgorithmSpec AODReaderHelpers::indexBuilderCallback(std::vector<InputSpec>& requested)
87{
88 return AlgorithmSpec::InitCallback{[requested](InitContext& /*ic*/) {
89 return [requested](ProcessingContext& pc) {
90 auto outputs = pc.outputs();
91 // build one index table per requested input
92 for (auto& input : requested) {
// NOTE(review): origin/description/version are declared on original line 93,
// which is missing from this listing; presumably they are unpacked from
// DataSpecUtils::asConcreteDataMatcher(input), as in Spawnable — TODO confirm.
94 if (description == header::DataDescription{"MA_RN2_EX"}) {
95 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedExclusiveMetadata{}, input, pc));
96 } else if (description == header::DataDescription{"MA_RN2_SP"}) {
97 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedSparseMetadata{}, input, pc));
98 } else if (description == header::DataDescription{"MA_RN3_EX"}) {
99 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedExclusiveMetadata{}, input, pc));
100 } else if (description == header::DataDescription{"MA_RN3_SP"}) {
101 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedSparseMetadata{}, input, pc));
102 } else if (description == header::DataDescription{"MA_BCCOL_EX"}) {
103 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMetadata{}, input, pc));
104 } else if (description == header::DataDescription{"MA_BCCOL_SP"}) {
105 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMetadata{}, input, pc));
106 } else if (description == header::DataDescription{"MA_BCCOLS_EX"}) {
107 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsExclusiveMultiMetadata{}, input, pc));
108 } else if (description == header::DataDescription{"MA_BCCOLS_SP"}) {
109 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::MatchedBCCollisionsSparseMultiMetadata{}, input, pc));
110 } else if (description == header::DataDescription{"MA_RN3_BC_SP"}) {
111 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCSparseMetadata{}, input, pc));
112 } else if (description == header::DataDescription{"MA_RN3_BC_EX"}) {
113 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run3MatchedToBCExclusiveMetadata{}, input, pc));
114 } else if (description == header::DataDescription{"MA_RN2_BC_SP"}) {
115 outputs.adopt(Output{origin, description, version}, make_build(o2::aod::Run2MatchedToBCSparseMetadata{}, input, pc));
116 } else {
// unknown description: this input is not an index table we can build
117 throw std::runtime_error("Not an index table");
118 }
119 }
120 };
121 }};
122}
123
124namespace
125{
126template <o2::aod::is_aod_hash D>
127auto make_spawn(InputSpec const& input, ProcessingContext& pc)
128{
129 using metadata_t = o2::aod::MetadataTrait<D>::metadata;
130 constexpr auto sources = metadata_t::sources;
131 static std::shared_ptr<gandiva::Projector> projector = nullptr;
132 static std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(o2::soa::createFieldsFromColumns(typename metadata_t::expression_pack_t{}));
133 static auto projectors = []<typename... C>(framework::pack<C...>) -> std::array<expressions::Projector, sizeof...(C)>
134 {
135 return {{std::move(C::Projector())...}};
136 }
137 (typename metadata_t::expression_pack_t{});
138 return o2::framework::spawner<D>(extractOriginals<sources.size(), sources>(pc), input.binding.c_str(), projectors.data(), projector, schema);
139}
140
// Produces the extended (spawned) table for one spawner output.
// Each call to make():
//  - reads the inputs named in `labels` from the processing context,
//  - joins them into one table,
//  - creates the gandiva projector on the first call only and caches it in
//    `projector`,
//  - builds the result with spawnerHelper, using `schema` and `binding`.
// NOTE(review): original lines 148-151 are missing from this listing;
// presumably they declare the origin/description/version members that
// aodSpawnerCallback reads — TODO confirm.
141struct Maker {
// name passed to spawnerHelper for the produced table
142 std::string binding;
// input labels; tables are fetched and joined in this order
143 std::vector<std::string> labels;
144 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
// lazily created in make() and then reused for subsequent calls
145 std::shared_ptr<gandiva::Projector> projector = nullptr;
// output schema; its field count is passed to spawnerHelper as the column count
146 std::shared_ptr<arrow::Schema> schema;
147
151
152 std::shared_ptr<arrow::Table> make(ProcessingContext& pc)
153 {
154 std::vector<std::shared_ptr<arrow::Table>> originals;
155 for (auto const& label : labels) {
156 originals.push_back(pc.inputs().get<TableConsumer>(label)->asArrowTable());
157 }
158 auto fullTable = soa::ArrowHelpers::joinTables(std::move(originals), std::span{labels.begin(), labels.size()});
// Build the projector once, against the schema of the joined table seen on the
// first call. Later calls reuse it.
// NOTE(review): the expressions argument (original line 162) is missing from
// this listing; presumably it passes `expressions` — TODO confirm.
159 if (projector == nullptr) {
160 auto s = gandiva::Projector::Make(
161 fullTable->schema(),
163 &projector);
164 if (!s.ok()) {
165 throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
166 }
167 }
168
169 return spawnerHelper(fullTable, schema, binding.c_str(), schema->num_fields(), projector);
170 }
171};
172
// Describes one table to be spawned, decoded from an InputSpec.
// The constructor:
//  - takes origin/description/version from the spec's concrete data matcher,
//  - reads the serialized "projectors" and "schema" metadata entries,
//  - collects a label from every metadata entry named "input:<label>",
//  - derives an input schema (one field per distinct binding used by the
//    projectors),
//  - derives one gandiva expression per projector.
// createMaker() packages this information into a Maker.
// NOTE(review): several source lines are missing from this listing (181-184,
// 194, 199, 224-226, 235, 243, 245, 247); the comments below describe only
// the visible code.
173struct Spawnable {
174 std::string binding;
175 std::vector<std::string> labels;
176 std::vector<expressions::Projector> projectors;
177 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
178 std::shared_ptr<arrow::Schema> outputSchema;
179 std::shared_ptr<arrow::Schema> inputSchema;
180
184
185 Spawnable(InputSpec const& spec)
186 : binding{spec.binding}
187 {
188 auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
189 origin = origin_;
190 description = description_;
191 version = version_;
// NOTE(review): `loc` is dereferenced without checking it against
// spec.metadata.end(). The spec is assumed to always carry the "projectors"
// and "schema" entries; otherwise this is undefined behaviour.
192 auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; });
193 std::stringstream iws(loc->defaultValue.get<std::string>());
// (original line 194 missing: presumably deserializes `projectors` from iws — TODO confirm)
195
196 loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; });
// reset the stream's state flags before reusing it for the schema string
197 iws.clear();
198 iws.str(loc->defaultValue.get<std::string>());
// (original line 199 missing: presumably deserializes `outputSchema` from iws — TODO confirm)
200
// every metadata entry named "input:<label>" contributes <label> (6-char prefix stripped)
201 for (auto& i : spec.metadata) {
202 if (i.name.starts_with("input:")) {
203 labels.emplace_back(i.name.substr(6));
204 }
205 }
206
// Collect one arrow field per distinct binding name referenced by any projector.
// Alternative index 1 of Node::self is the BindingNode, as the std::get below shows.
207 std::vector<std::shared_ptr<arrow::Field>> fields;
208 for (auto& p : projectors) {
209 expressions::walk(p.node.get(),
210 [&fields](expressions::Node* n) mutable {
211 if (n->self.index() == 1) {
212 auto& b = std::get<expressions::BindingNode>(n->self);
213 if (std::find_if(fields.begin(), fields.end(), [&b](std::shared_ptr<arrow::Field> const& field) { return field->name() == b.name; }) == fields.end()) {
214 fields.emplace_back(std::make_shared<arrow::Field>(b.name, expressions::concreteArrowType(b.type)));
215 }
216 }
217 });
218 }
219 inputSchema = std::make_shared<arrow::Schema>(fields);
220
// one expression per projector; projector i produces output field i
221 int i = 0;
222 for (auto& p : projectors) {
223 expressions.push_back(
227 inputSchema),
228 outputSchema->field(i)));
229 ++i;
230 }
231 }
232
233 std::shared_ptr<gandiva::Projector> makeProjector()
234 {
236 }
237
// Package this spawnable into a Maker; the projector slot starts as nullptr
// so that Maker::make creates it lazily.
238 Maker createMaker()
239 {
240 return {
241 binding,
242 labels,
244 nullptr,
246 origin,
248 version};
249 }
250};
251
252} // namespace
253
254AlgorithmSpec AODReaderHelpers::aodSpawnerCallback(/*std::vector<InputSpec>& requested*/ ConfigContext const& ctx)
255{
256 auto& ac = ctx.services().get<AnalysisContext>();
257 return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) {
258 std::vector<Spawnable> spawnables;
259 for (auto& i : requested) {
260 spawnables.emplace_back(i);
261 }
262 std::vector<Maker> makers;
263 for (auto& s : spawnables) {
264 makers.push_back(s.createMaker());
265 }
266
267 return [makers](ProcessingContext& pc) mutable {
268 auto outputs = pc.outputs();
269 for (auto& maker : makers) {
270 outputs.adopt(Output{maker.origin, maker.description, maker.version}, maker.make(pc));
271 }
272 };
273 }};
274}
275
276} // namespace o2::framework::readers
std::shared_ptr< arrow::Schema > outputSchema
header::DataOrigin origin
std::shared_ptr< arrow::Schema > schema
header::DataDescription description
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< arrow::Schema > inputSchema
std::shared_ptr< gandiva::Projector > projector
std::vector< std::string > labels
std::string binding
o2::monitoring::tags::Key Key
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint32_t version
Definition RawData.h:8
ServiceRegistryRef services() const
void adopt(const Output &spec, std::string *)
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
GLdouble n
Definition glcorearb.h:1982
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLsizei GLenum * sources
Definition glcorearb.h:2516
gandiva::ExpressionPtr makeExpression(gandiva::NodePtr node, gandiva::FieldPtr result)
Function to create gandiva projecting expression from generic gandiva expression tree.
std::shared_ptr< gandiva::Projector > createProjectorHelper(size_t nColumns, expressions::Projector *projectors, std::shared_ptr< arrow::Schema > schema, std::vector< std::shared_ptr< arrow::Field > > const &fields)
Operations createOperations(Filter const &expression)
Function to create an internal operation sequence from a filter tree.
void walk(Node *head, L &&pred)
Tree-walker helper.
gandiva::NodePtr createExpressionTree(Operations const &opSpecs, gandiva::SchemaPtr const &Schema)
Function to create gandiva expression tree from operation sequence.
auto setEOSCallback(InitContext &ic)
std::shared_ptr< arrow::Table > spawnerHelper(std::shared_ptr< arrow::Table > const &fullTable, std::shared_ptr< arrow::Schema > newSchema, size_t nColumns, expressions::Projector *projectors, const char *name, std::shared_ptr< gandiva::Projector > &projector)
@ Me
Only quit this data processor.
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
auto createFieldsFromColumns(framework::pack< C... >)
Definition ASoA.h:410
std::function< ProcessCallback(InitContext &)> InitCallback
static std::shared_ptr< arrow::Schema > read(std::istream &s)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::vector< expressions::Projector > read(std::istream &s)
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
std::vector< ConfigParamSpec > metadata
A set of configurables which can be used to customise the InputSpec.
Definition InputSpec.h:76
header::DataOrigin origin
Definition Output.h:28
A struct, containing the root of the expression tree.
static AlgorithmSpec indexBuilderCallback(std::vector< InputSpec > &requested)
uint32_t SubSpecificationType
Definition DataHeader.h:621
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables, std::span< const char *const > labels)
Definition ASoA.cxx:72