Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AODReaderHelpers.h"
13#include "../src/ExpressionJSONHelpers.h"
14#include "../src/IndexJSONHelpers.h"
15
23
25{
26namespace
27{
// Gathers, from a single InputSpec, the pieces that createBuilder() hands to
// a framework::Builder: the binding name, the index records with their
// labels, the "exclusive" flag and the identifiers taken from the spec's
// concrete data matcher.
// NOTE(review): this listing skips some original lines (the numbering jumps
// 31 -> 35, 47 -> 49 and inside createBuilder()). The declarations of
// `origin`, `description` and `version`, and the statement that fills
// `records`, are therefore not visible here - confirm against the full file.
28struct Buildable {
29 bool exclusive = false;
30 std::string binding;
31 std::vector<std::string> labels;
35 std::vector<o2::soa::IndexRecord> records;
36 std::shared_ptr<arrow::Schema> outputSchema;
37
 // Populates the members from `spec`: its binding, its concrete data
 // matcher, and its "index-records" / "index-exclusive" metadata entries.
38 Buildable(InputSpec const& spec)
39 : binding{spec.binding}
40 {
41 auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
42 origin = origin_;
43 description = description_;
44 version = version_;
45
 // Locate the metadata entry whose name is exactly "index-records" and wrap
 // its string value in a stream.
 // NOTE(review): `loc` is dereferenced without being compared against
 // spec.metadata.end(); a spec lacking this entry would dereference the end
 // iterator. The same applies to the "index-exclusive" lookup below.
46 auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-records") == 0; });
47 std::stringstream iws(loc->defaultValue.get<std::string>());
 // NOTE(review): original line 48 is absent from this listing; presumably
 // `iws` is parsed into `records` there - confirm.
49
50 loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-exclusive") == 0; });
51 exclusive = loc->defaultValue.get<bool>();
52
 // One label per index record, in record order.
53 for (auto const& r : records) {
54 labels.emplace_back(r.label);
55 }
 // The output schema holds one field per record (r.field()) and carries a
 // single metadata pair: key "label" -> this spec's binding.
56 outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
57 std::vector<std::shared_ptr<arrow::Field>> fields;
58 for (auto& r : recs) {
59 fields.push_back(r.field());
60 }
61 return fields;
62 }(records))
63 ->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{binding}}));
64 }
65
 // Returns a framework::Builder brace-initialised from the collected members.
 // NOTE(review): original lines 69, 72 and 74 are absent from this listing,
 // so the initialiser list shown below is incomplete.
66 framework::Builder createBuilder() const
67 {
68 return {
70 labels,
71 records,
73 origin,
75 version,
76 nullptr};
77 }
78};
79
80} // namespace
81
83{
 // NOTE(review): the signature line of this function (original line 82) is
 // not part of this listing; only the body is visible.
 //
 // Produces an AlgorithmSpec::InitCallback. The callback captures a copy of
 // the AnalysisContext's `requestedIDXs`, so it does not refer back to `ctx`
 // or `ac` once this function has returned.
84 auto& ac = ctx.services().get<AnalysisContext>();
85 return AlgorithmSpec::InitCallback{[requested = ac.requestedIDXs](InitContext& /*ic*/) {
 // Init step: one Buildable per requested entry, then one Builder per
 // Buildable.
86 std::vector<Buildable> buildables;
87 for (auto& i : requested) {
88 buildables.emplace_back(i);
89 }
90 std::vector<Builder> builders;
91 for (auto& b : buildables) {
92 builders.push_back(b.createBuilder());
93 }
 // Processing step: the returned lambda holds its own copy of `builders`
 // and is `mutable`, so materialize() may be called on them. For every
 // builder, the result of materialize(pc) is adopted into the outputs under
 // that builder's origin / description / version.
94 return [builders](ProcessingContext& pc) mutable {
95 auto outputs = pc.outputs();
96 for (auto& builder : builders) {
97 outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc));
98 }
99 };
100 }};
101}
102
103namespace
104{
// Gathers, from a single InputSpec, the pieces that createMaker() hands to a
// framework::Spawner: the binding name, the labels of the input tables, the
// combined input schema and a gandiva projector built from it.
// NOTE(review): this listing skips some original lines (113-115, 126,
// 131-132, 151 and several inside createMaker()). The declarations of
// `origin`, `description` and `version`, and the statements that fill
// `projectors`, `expressions` and `outputSchema`, are therefore not visible
// here - confirm against the full file.
105struct Spawnable {
106 std::string binding;
107 std::vector<std::string> labels;
108 std::vector<expressions::Projector> projectors;
109 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
110 std::shared_ptr<arrow::Schema> outputSchema;
111 std::shared_ptr<arrow::Schema> inputSchema;
112
116
 // Populates the members from `spec`: its binding, its concrete data
 // matcher, and its "projectors", "schema" and "input-schema:*" metadata
 // entries.
117 Spawnable(InputSpec const& spec)
118 : binding{spec.binding}
119 {
120 auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
121 origin = origin_;
122 description = description_;
123 version = version_;
 // Locate the metadata entry whose name is exactly "projectors" and wrap its
 // string value in a stream.
 // NOTE(review): `loc` is dereferenced without being compared against
 // spec.metadata.end(); a spec lacking this entry would dereference the end
 // iterator. The same applies to the "schema" lookup below.
124 auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; });
125 std::stringstream iws(loc->defaultValue.get<std::string>());
 // NOTE(review): original line 126 is absent from this listing; presumably
 // `iws` is parsed into `projectors` there - confirm.
127
128 loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; });
 // clear() resets the stream's state flags before the same stream object is
 // given new contents with str().
129 iws.clear();
130 iws.str(loc->defaultValue.get<std::string>());
 // NOTE(review): original lines 131-132 are absent from this listing;
 // presumably the "schema" text is read into `outputSchema` there - confirm.
133
 // Every metadata entry whose name begins with "input-schema:" contributes
 // one label (the name with that 13-character prefix removed) and one schema
 // read from the entry's string value.
134 std::vector<std::shared_ptr<arrow::Schema>> schemas;
135 for (auto& i : spec.metadata) {
136 if (i.name.starts_with("input-schema:")) {
137 labels.emplace_back(i.name.substr(13));
138 iws.clear();
139 auto json = i.defaultValue.get<std::string>();
140 iws.str(json);
141 schemas.emplace_back(ArrowJSONHelpers::read(iws));
142 }
143 }
144
 // The input schema is the concatenation of the fields of all input
 // schemas, in the order the metadata entries were visited.
145 std::vector<std::shared_ptr<arrow::Field>> fields;
146 for (auto& s : schemas) {
147 std::copy(s->fields().begin(), s->fields().end(), std::back_inserter(fields));
148 }
149
150 inputSchema = std::make_shared<arrow::Schema>(fields);
 // NOTE(review): original line 151 is absent from this listing; presumably
 // `expressions` is filled there - confirm.
152 }
153
 // Builds a gandiva projector from `inputSchema` and `expressions`.
 // Throws the framework runtime error (with gandiva's status text) when
 // gandiva::Projector::Make reports a non-ok status.
154 std::shared_ptr<gandiva::Projector> makeProjector() const
155 {
156 std::shared_ptr<gandiva::Projector> p = nullptr;
157 auto s = gandiva::Projector::Make(
158 inputSchema,
159 expressions,
160 &p);
161 if (!s.ok()) {
162 throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
163 }
164 return p;
165 }
166
 // Returns a framework::Spawner brace-initialised from the collected
 // members; a new projector is created on every call via makeProjector().
 // NOTE(review): original lines 172, 174-175 and 177 are absent from this
 // listing, so the initialiser list shown below is incomplete.
167 framework::Spawner createMaker() const
168 {
169 return {
170 binding,
171 labels,
173 makeProjector(),
176 origin,
178 version};
179 }
180};
181
182} // namespace
183
185{
 // NOTE(review): the signature line of this function (original line 184) is
 // not part of this listing; only the body is visible.
 //
 // Produces an AlgorithmSpec::InitCallback. The callback captures a copy of
 // the AnalysisContext's `spawnerInputs`, so it does not refer back to `ctx`
 // or `ac` once this function has returned.
186 auto& ac = ctx.services().get<AnalysisContext>();
187 return AlgorithmSpec::InitCallback{[requested = ac.spawnerInputs](InitContext& /*ic*/) {
 // Init step: one Spawnable per requested input, then one Spawner per
 // Spawnable. createMaker() may throw if the projector cannot be created.
188 std::vector<Spawnable> spawnables;
189 for (auto& i : requested) {
190 spawnables.emplace_back(i);
191 }
192 std::vector<Spawner> spawners;
193 for (auto& s : spawnables) {
194 spawners.push_back(s.createMaker());
195 }
196
 // Processing step: the returned lambda holds its own copy of `spawners`
 // and is `mutable`, so materialize() may be called on them. For every
 // spawner, the result of materialize(pc) is adopted into the outputs under
 // that spawner's origin / description / version.
197 return [spawners](ProcessingContext& pc) mutable {
198 auto outputs = pc.outputs();
199 for (auto& spawner : spawners) {
200 outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc));
201 }
202 };
203 }};
204}
205
206} // namespace o2::framework::readers
bool exclusive
std::shared_ptr< arrow::Schema > outputSchema
header::DataOrigin origin
std::vector< o2::soa::IndexRecord > records
header::DataDescription description
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< arrow::Schema > inputSchema
std::vector< std::string > labels
std::string binding
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint32_t version
Definition RawData.h:8
nlohmann::json json
ServiceRegistryRef services() const
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean r
Definition glcorearb.h:1233
std::vector< std::shared_ptr< gandiva::Expression > > materializeProjectors(std::vector< expressions::Projector > const &projectors, std::shared_ptr< arrow::Schema > const &inputSchema, std::vector< std::shared_ptr< arrow::Field > > outputFields)
void addLabelToSchema(std::shared_ptr< arrow::Schema > &schema, const char *label)
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
std::function< ProcessCallback(InitContext &)> InitCallback
static std::shared_ptr< arrow::Schema > read(std::istream &s)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::vector< expressions::Projector > read(std::istream &s)
static std::vector< o2::soa::IndexRecord > read(std::istream &s)
std::vector< ConfigParamSpec > metadata
A set of configurables which can be used to customise the InputSpec.
Definition InputSpec.h:76
header::DataOrigin origin
Definition Output.h:28
static AlgorithmSpec indexBuilderCallback(ConfigContext const &ctx)
static AlgorithmSpec aodSpawnerCallback(ConfigContext const &ctx)
uint32_t SubSpecificationType
Definition DataHeader.h:621