Project
Loading...
Searching...
No Matches
AODReaderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "AODReaderHelpers.h"
13#include "../src/ExpressionJSONHelpers.h"
14#include "../src/IndexJSONHelpers.h"
15
24
26{
27namespace
28{
// Gathers, from a single InputSpec, the data that createBuilder() hands to a
// framework::Builder: the binding name, the exclusive flag, the index records,
// the labels and matchers taken from those records, and an Arrow output schema.
//
// NOTE(review): this listing skips source lines (the embedded line numbers
// jump 33 -> 37, 49 -> 51, 70 -> 72, ...). The members `origin`,
// `description` and `version` are assigned in the constructor but their
// declarations are not visible here, and no visible statement fills
// `records` -- confirm against the real file.
29struct Buildable {
30 bool exclusive = false;
31 std::string binding;
32 std::vector<std::string> labels;
33 std::vector<framework::ConcreteDataMatcher> matchers;
37 std::vector<o2::soa::IndexRecord> records;
38 std::shared_ptr<arrow::Schema> outputSchema;
39
// Builds the description from `spec`: its binding, its concrete matcher
// triple and the metadata entries named "index-records" and "index-exclusive".
40 explicit Buildable(InputSpec const& spec)
41 : binding{spec.binding}
42 {
// Decompose the concrete matcher of the spec into its three components.
43 auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
44 origin = origin_;
45 description = description_;
46 version = version_;
47
// Look up the "index-records" metadata entry and expose its string value as a stream.
// NOTE(review): `loc` is dereferenced without comparing it to
// spec.metadata.end(); a spec lacking this entry would dereference the end
// iterator.
48 auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-records") == 0; });
49 std::stringstream iws(loc->defaultValue.get<std::string>());
// NOTE(review): listing line 50 is missing; presumably it parses `iws` into
// `records` -- TODO confirm.
51
// Same lookup pattern (and same unchecked dereference) for the boolean
// "index-exclusive" entry.
52 loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("index-exclusive") == 0; });
53 exclusive = loc->defaultValue.get<bool>();
54
// Copy the label and matcher of every record, keeping the records' order.
55 for (auto const& r : records) {
56 labels.emplace_back(r.label);
57 matchers.emplace_back(r.matcher);
58 }
// The schema's fields come from an immediately-invoked lambda that calls
// field() on each record; the schema then gets a single metadata pair
// "label" -> binding.
59 outputSchema = std::make_shared<arrow::Schema>([](std::vector<o2::soa::IndexRecord> const& recs) {
60 std::vector<std::shared_ptr<arrow::Field>> fields;
61 fields.reserve(recs.size());
62 std::ranges::transform(recs, std::back_inserter(fields), [](auto& r) { return r.field(); });
63 return fields;
64 }(records))
65 ->WithMetadata(std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{std::string{binding}}));
66 }
67
// Packs the collected members into a brace-initialized framework::Builder.
// NOTE(review): several initializers are absent from this listing (embedded
// numbers 71, 73, 75 and 77 are missing), so the list shown here is
// incomplete; the meaning of the trailing nullptr is not visible from here.
68 framework::Builder createBuilder() const
69 {
70 return {
72 labels,
74 records,
76 origin,
78 version,
79 nullptr};
80 }
81};
82
83} // namespace
84
// NOTE(review): the function signature (listing line 85) and the line that
// opens the returned init callback (listing line 87) are missing from this
// listing. This is presumably AODReaderHelpers::indexBuilderCallback, and `ic`
// below is presumably the InitContext argument of that callback -- TODO
// confirm against the real file.
86{
// Index specs recorded in the DanglingEdgesContext service.
88 auto const& requested = ic.services().get<DanglingEdgesContext>().requestedIDXs;
// Create one Builder per requested spec, via a temporary Buildable.
89 std::vector<Builder> builders;
90 builders.reserve(requested.size());
91 std::ranges::transform(requested, std::back_inserter(builders), [](auto const& i) { return Buildable{i}.createBuilder(); });
// The returned callback captures `builders` by copy; `mutable` makes that copy
// non-const, so each builder is visited through a non-const reference.
92 return [builders](ProcessingContext& pc) mutable {
93 auto outputs = pc.outputs();
// For every builder: materialize it for this ProcessingContext and let the
// outputs adopt the result under Output{origin, description, version}.
94 std::ranges::for_each(builders, [&pc, &outputs](auto& builder) { outputs.adopt(Output{builder.origin, builder.description, builder.version}, builder.materialize(pc)); });
95 };
96 }};
97}
98
99namespace
100{
// Gathers, from a single InputSpec, the data that createMaker() hands to a
// framework::Spawner: binding name, input labels and matchers, the combined
// input schema and a gandiva projector built from `expressions`.
//
// NOTE(review): this listing skips source lines (the embedded line numbers
// jump 108 -> 114, 122 -> 124, 127 -> 130, 153 -> 155, 175 -> 177, ...). The
// members `origin`, `description` and `version` are assigned in the
// constructor but their declarations are not visible here, and no visible
// statement fills `projectors`, `outputSchema` or `expressions` -- confirm
// against the real file.
101struct Spawnable {
102 std::string binding;
103 std::vector<std::string> labels;
104 std::vector<framework::ConcreteDataMatcher> matchers;
105 std::vector<expressions::Projector> projectors;
106 std::vector<std::shared_ptr<gandiva::Expression>> expressions;
107 std::shared_ptr<arrow::Schema> outputSchema;
108 std::shared_ptr<arrow::Schema> inputSchema;
109
113
// Builds the description from `spec`: its binding, its concrete matcher
// triple and the metadata entries "projectors", "schema", "input-schema:*"
// and "input:*".
114 explicit Spawnable(InputSpec const& spec)
115 : binding{spec.binding}
116 {
// Decompose the concrete matcher of the spec into its three components.
117 auto&& [origin_, description_, version_] = DataSpecUtils::asConcreteDataMatcher(spec);
118 origin = origin_;
119 description = description_;
120 version = version_;
// Look up the "projectors" metadata entry and expose its string value as a stream.
// NOTE(review): `loc` is dereferenced without comparing it to
// spec.metadata.end(); a spec lacking this entry would dereference the end
// iterator.
121 auto loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("projectors") == 0; });
122 std::stringstream iws(loc->defaultValue.get<std::string>());
// NOTE(review): listing line 123 is missing; presumably it parses `iws` into
// `projectors` -- TODO confirm.
124
// Same lookup pattern (and same unchecked dereference) for the "schema"
// entry; the stream is reset and reloaded with that entry's string.
125 loc = std::find_if(spec.metadata.begin(), spec.metadata.end(), [](ConfigParamSpec const& cps) { return cps.name.compare("schema") == 0; });
126 iws.clear();
127 iws.str(loc->defaultValue.get<std::string>());
// NOTE(review): listing lines 128-129 are missing; presumably they read
// `outputSchema` from `iws` -- TODO confirm.
130
// One schema per "input-schema:<label>" metadata entry. substr(13) drops the
// 13-character "input-schema:" prefix so that only the label remains.
// NOTE(review): filter_string_params_starts_with is a project helper;
// presumably it yields the entries whose name starts with the given prefix.
131 std::vector<std::shared_ptr<arrow::Schema>> schemas;
132 for (auto const& i : spec.metadata | views::filter_string_params_starts_with("input-schema:")) {
133 labels.emplace_back(i.name.substr(13));
134 iws.clear();
135 auto json = i.defaultValue.get<std::string>();
136 iws.str(json);
137 schemas.emplace_back(ArrowJSONHelpers::read(iws));
138 }
// Each "input:" entry is converted with DataSpecUtils::fromMetadataString and
// the ConcreteDataMatcher alternative of its `matcher` is appended to
// `matchers`.
139 std::ranges::transform(spec.metadata |
140 views::filter_string_params_starts_with("input:") |
141 std::ranges::views::transform(
142 [](auto const& param) {
143 return DataSpecUtils::fromMetadataString(param.defaultValue.template get<std::string>());
144 }),
145 std::back_inserter(matchers), [](auto const& i) { return std::get<ConcreteDataMatcher>(i.matcher); });
146
// The input schema is the concatenation of the fields of all input schemas,
// in the order they were read above.
147 std::vector<std::shared_ptr<arrow::Field>> fields;
148 std::ranges::for_each(schemas,
149 [&fields](auto const& s) {
150 std::ranges::copy(s->fields(), std::back_inserter(fields));
151 });
152
153 inputSchema = std::make_shared<arrow::Schema>(fields);
// NOTE(review): listing line 154 is missing; presumably it fills
// `expressions` -- TODO confirm.
155 }
156
// Creates a gandiva projector from `inputSchema` and `expressions`.
// Returns the projector written by gandiva::Projector::Make.
// Raises the error produced by o2::framework::runtime_error_f, carrying the
// status text, when Make reports a non-ok status.
157 std::shared_ptr<gandiva::Projector> makeProjector() const
158 {
159 std::shared_ptr<gandiva::Projector> p = nullptr;
160 auto s = gandiva::Projector::Make(
161 inputSchema,
162 expressions,
163 &p);
164 if (!s.ok()) {
165 throw o2::framework::runtime_error_f("Failed to create projector: %s", s.ToString().c_str());
166 }
167 return p;
168 }
169
// Packs the collected members and a freshly made projector into a
// brace-initialized framework::Spawner.
// NOTE(review): several initializers are absent from this listing (embedded
// numbers 176, 178, 179 and 181 are missing), so the list shown here is
// incomplete.
170 framework::Spawner createMaker() const
171 {
172 return {
173 binding,
174 labels,
175 matchers,
177 makeProjector(),
180 origin,
182 version};
183 }
184};
185
186} // namespace
187
// NOTE(review): the function signature (listing line 188) and the line that
// opens the returned init callback (listing line 190) are missing from this
// listing. This is presumably AODReaderHelpers::aodSpawnerCallback, and `ic`
// below is presumably the InitContext argument of that callback -- TODO
// confirm against the real file.
189{
// Spawner input specs recorded in the DanglingEdgesContext service.
191 auto const& requested = ic.services().get<DanglingEdgesContext>().spawnerInputs;
// Create one Spawner per requested spec, via a temporary Spawnable.
192 std::vector<Spawner> spawners;
193 spawners.reserve(requested.size());
194 std::ranges::transform(requested, std::back_inserter(spawners), [](auto const& i) { return Spawnable{i}.createMaker(); });
// The returned callback captures `spawners` by copy; `mutable` makes that copy
// non-const, so each spawner is visited through a non-const reference.
195 return [spawners](ProcessingContext& pc) mutable {
196 auto outputs = pc.outputs();
// For every spawner: materialize it for this ProcessingContext and let the
// outputs adopt the result under Output{origin, description, version}.
197 std::ranges::for_each(spawners, [&pc, &outputs](auto& spawner) { outputs.adopt(Output{spawner.origin, spawner.description, spawner.version}, spawner.materialize(pc)); });
198 };
199 }};
200}
201
202} // namespace o2::framework::readers
bool exclusive
std::shared_ptr< arrow::Schema > outputSchema
header::DataOrigin origin
std::vector< o2::soa::IndexRecord > records
std::vector< framework::ConcreteDataMatcher > matchers
header::DataDescription description
std::vector< expressions::Projector > projectors
std::vector< std::shared_ptr< gandiva::Expression > > expressions
std::shared_ptr< arrow::Schema > inputSchema
std::vector< std::string > labels
std::string binding
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint32_t version
Definition RawData.h:8
nlohmann::json json
GLboolean r
Definition glcorearb.h:1233
GLenum GLfloat param
Definition glcorearb.h:271
std::vector< std::shared_ptr< gandiva::Expression > > materializeProjectors(std::vector< expressions::Projector > const &projectors, std::shared_ptr< arrow::Schema > const &inputSchema, std::vector< std::shared_ptr< arrow::Field > > const &outputFields)
void addLabelToSchema(std::shared_ptr< arrow::Schema > &schema, const char *label)
auto spawner(std::shared_ptr< arrow::Table > const &fullTable, const char *name, o2::framework::expressions::Projector *projectors, std::shared_ptr< gandiva::Projector > &projector, std::shared_ptr< arrow::Schema > const &schema)
Expression-based column generator to materialize columns.
RuntimeErrorRef runtime_error_f(const char *,...)
Descriptor< gSizeDataDescriptionString > DataDescription
Definition DataHeader.h:551
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
std::function< ProcessCallback(InitContext &)> InitCallback
static std::shared_ptr< arrow::Schema > read(std::istream &s)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static std::vector< expressions::Projector > read(std::istream &s)
static std::vector< o2::soa::IndexRecord > read(std::istream &s)
std::vector< ConfigParamSpec > metadata
A set of configurables which can be used to customise the InputSpec.
Definition InputSpec.h:76
header::DataOrigin origin
Definition Output.h:28
static AlgorithmSpec indexBuilderCallback(ConfigContext const &)
static AlgorithmSpec aodSpawnerCallback(ConfigContext const &)
uint32_t SubSpecificationType
Definition DataHeader.h:621