Project
Loading...
Searching...
No Matches
ASoA.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/ASoA.h"
13#include "ArrowDebugHelpers.h"
15#include <arrow/util/key_value_metadata.h>
16#include <arrow/util/config.h>
17#include <TMemFile.h>
18#include <TClass.h>
19#include <TTree.h>
20#include <TH1.h>
21#include <TError.h>
22
23namespace o2::soa
24{
25void accessingInvalidIndexFor(const char* getter)
26{
27 throw o2::framework::runtime_error_f("Accessing invalid index for %s", getter);
28}
29void dereferenceWithWrongType(const char* getter, const char* target)
30{
31 throw o2::framework::runtime_error_f("Trying to dereference index with a wrong type in %s_as<T> for base target \"%s\". Note that if you have several compatible index targets in your process() signature, the last one will be the one actually bound.", getter, target);
32}
34{
35 throw o2::framework::runtime_error_f("Null selection for %d (arg %d), missing Filter declaration?", hash, ai);
36}
37
38void getterNotFound(const char* targetColumnLabel)
39{
40 throw o2::framework::runtime_error_f("Getter for \"%s\" not found", targetColumnLabel);
41}
42
44{
45 throw framework::runtime_error("columnLabel: must not be empty");
46}
47
49{
51 rows.resize(sel->GetNumSlots());
52 for (auto i = 0; i < sel->GetNumSlots(); ++i) {
53 rows[i] = sel->GetIndex(i);
54 }
55 return rows;
56}
57
58SelectionVector sliceSelection(std::span<int64_t const> const& mSelectedRows, int64_t nrows, uint64_t offset)
59{
60 auto start = offset;
61 auto end = start + nrows;
62 auto start_iterator = std::lower_bound(mSelectedRows.begin(), mSelectedRows.end(), start);
63 auto stop_iterator = std::lower_bound(start_iterator, mSelectedRows.end(), end);
64 SelectionVector slicedSelection{start_iterator, stop_iterator};
65 std::ranges::transform(slicedSelection.begin(), slicedSelection.end(), slicedSelection.begin(),
66 [&start](int64_t idx) {
67 return idx - static_cast<int64_t>(start);
68 });
69 return slicedSelection;
70}
71
72std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables)
73{
74 std::vector<std::shared_ptr<arrow::Field>> fields;
75 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
76 bool notEmpty = (tables[0]->num_rows() != 0);
77 std::ranges::for_each(tables, [&fields, &columns, notEmpty](auto const& t) {
78 std::ranges::copy(t->fields(), std::back_inserter(fields));
79 if (notEmpty) {
80 std::ranges::copy(t->columns(), std::back_inserter(columns));
81 }
82 });
83 auto schema = std::make_shared<arrow::Schema>(fields);
84 return arrow::Table::Make(schema, columns);
85}
86
87namespace
88{
89template <typename T>
90 requires(std::same_as<T, std::string>)
91auto makeString(T const& str)
92{
93 return str.c_str();
94}
95template <typename T>
96 requires(std::same_as<T, const char*>)
97auto makeString(T const& str)
98{
99 return str;
100}
101
102template <typename T>
103void canNotJoin(std::vector<std::shared_ptr<arrow::Table>> const& tables, std::span<T> labels)
104{
105 for (auto i = 0U; i < tables.size() - 1; ++i) {
106 if (tables[i]->num_rows() != tables[i + 1]->num_rows()) {
107 throw o2::framework::runtime_error_f("Tables %s and %s have different sizes (%d vs %d) and cannot be joined!",
108 makeString(labels[i]), makeString(labels[i + 1]), tables[i]->num_rows(), tables[i + 1]->num_rows());
109 }
110 }
111}
112} // namespace
113
114std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const char* const> labels)
115{
116 if (tables.size() == 1) {
117 return tables[0];
118 }
119 canNotJoin(tables, labels);
120 return joinTables(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables));
121}
122
123std::shared_ptr<arrow::Table> ArrowHelpers::joinTables(std::vector<std::shared_ptr<arrow::Table>>&& tables, std::span<const std::string> labels)
124{
125 if (tables.size() == 1) {
126 return tables[0];
127 }
128 canNotJoin(tables, labels);
129 return joinTables(std::forward<std::vector<std::shared_ptr<arrow::Table>>>(tables));
130}
131
132std::shared_ptr<arrow::Table> ArrowHelpers::concatTables(std::vector<std::shared_ptr<arrow::Table>>&& tables)
133{
134 if (tables.size() == 1) {
135 return tables[0];
136 }
137 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
138 std::vector<std::shared_ptr<arrow::Field>> resultFields = tables[0]->schema()->fields();
139 auto compareFields = [](std::shared_ptr<arrow::Field> const& f1, std::shared_ptr<arrow::Field> const& f2) {
140 // Let's do this with stable sorting.
141 return (!f1->Equals(f2)) && (f1->name() < f2->name());
142 };
143 for (size_t i = 1; i < tables.size(); ++i) {
144 auto& fields = tables[i]->schema()->fields();
145 std::vector<std::shared_ptr<arrow::Field>> intersection;
146
147 std::set_intersection(resultFields.begin(), resultFields.end(),
148 fields.begin(), fields.end(),
149 std::back_inserter(intersection), compareFields);
150 resultFields.swap(intersection);
151 }
152
153 for (auto& field : resultFields) {
154 arrow::ArrayVector chunks;
155 for (auto& table : tables) {
156 auto ci = table->schema()->GetFieldIndex(field->name());
157 if (ci == -1) {
158 throw std::runtime_error("Unable to find field " + field->name());
159 }
160 auto column = table->column(ci);
161 auto otherChunks = column->chunks();
162 chunks.insert(chunks.end(), otherChunks.begin(), otherChunks.end());
163 }
164 columns.push_back(std::make_shared<arrow::ChunkedArray>(chunks));
165 }
166
167 return arrow::Table::Make(std::make_shared<arrow::Schema>(resultFields), columns);
168}
169
// Lowercase a single character, ASCII only. Column labels are plain
// identifiers, so the locale-aware std::tolower is deliberately not used: it
// consults the C locale facet for every character and dominated
// getIndexFromLabel in profiles.
static constexpr char asciiToLower(char c)
{
  if (c < 'A' || c > 'Z') {
    return c; // not an upper-case ASCII letter: leave untouched
  }
  return static_cast<char>(c + 32); // 'a' - 'A' == 32 in ASCII
}
177
178arrow::ChunkedArray* getIndexFromLabel(arrow::Table* table, std::string_view label)
179{
180 // Take the exact-match common case first (string_view comparison checks length
181 // then memcmp), and fall back to a case-insensitive scan only when the labels
182 // differ in case.
183 auto field = std::ranges::find_if(table->schema()->fields(), [label](std::shared_ptr<arrow::Field> const& f) {
184 std::string_view name = f->name();
185 return label == name ||
186 std::ranges::equal(label, name, [](char c1, char c2) {
187 return asciiToLower(c1) == asciiToLower(c2);
188 });
189 });
190 if (field == table->schema()->fields().end()) {
191 o2::framework::throw_error(o2::framework::runtime_error_f("Unable to find column with label %s.", label));
192 }
193 auto index = std::distance(table->schema()->fields().begin(), field);
194 return table->column(index).get();
195}
196
197void notBoundTable(const char* tableName)
198{
199 throw o2::framework::runtime_error_f("Index pointing to %s is not bound! Did you subscribe to the table?", tableName);
200}
201
202void notFoundColumn(const char* label, const char* key)
203{
204 throw o2::framework::runtime_error_f(R"(Preslice not valid: table "%s" (or join based on it) does not have column "%s")", label, key);
205}
206
207void missingOptionalPreslice(const char* label, const char* key)
208{
209 throw o2::framework::runtime_error_f(R"(Optional Preslice with missing binding used: table "%s" (or join based on it) does not have column "%s")", label, key);
210}
211
212void* extractCCDBPayload(char* payload, size_t size, TClass const* cl, const char* what)
213{
214 Int_t previousErrorLevel = gErrorIgnoreLevel;
215 gErrorIgnoreLevel = kFatal;
216 // does it have a flattened headers map attached in the end?
217 TMemFile file("name", (char*)payload, size, "READ");
218 gErrorIgnoreLevel = previousErrorLevel;
219 if (file.IsZombie()) {
220 return nullptr;
221 }
222
223 if (!cl) {
224 return nullptr;
225 }
226 auto object = file.GetObjectChecked(what, cl);
227 if (!object) {
228 // it could be that object was stored with previous convention
229 // where the classname was taken as key
230 std::string objectName(cl->GetName());
231 objectName.erase(std::find_if(objectName.rbegin(), objectName.rend(), [](unsigned char ch) {
232 return !std::isspace(ch);
233 }).base(),
234 objectName.end());
235 objectName.erase(objectName.begin(), std::find_if(objectName.begin(), objectName.end(), [](unsigned char ch) {
236 return !std::isspace(ch);
237 }));
238
239 object = file.GetObjectChecked(objectName.c_str(), cl);
240 LOG(warn) << "Did not find object under expected name " << what;
241 if (!object) {
242 return nullptr;
243 }
244 LOG(warn) << "Found object under deprecated name " << cl->GetName();
245 }
246 auto result = object;
247 // We need to handle some specific cases as ROOT ties them deeply
248 // to the file they are contained in
249 if (cl->InheritsFrom("TObject")) {
250 // make a clone
251 // detach from the file
252 auto tree = dynamic_cast<TTree*>((TObject*)object);
253 if (tree) {
254 tree->LoadBaskets(0x1L << 32); // make tree memory based
255 tree->SetDirectory(nullptr);
256 result = tree;
257 } else {
258 auto h = dynamic_cast<TH1*>((TObject*)object);
259 if (h) {
260 h->SetDirectory(nullptr);
261 result = h;
262 }
263 }
264 }
265 return result;
266}
267
269{
270 return [newOrigin](framework::ConcreteDataMatcher&& m) {
271 if ((m.origin == header::DataOrigin{"AOD"}) && (newOrigin != header::DataOrigin{"AOD"})) {
272 m.origin = newOrigin;
273 }
274 return m;
275 };
276}
277
278} // namespace o2::soa
279
280namespace o2::framework
281{
/// Truncate a string at its first underscore.
///
/// @param str string to cut; it is modified in place
/// @return the part of `str` before the first '_', or all of `str` if it has none
std::string cutString(std::string&& str)
{
  if (auto const underscore = str.find('_'); underscore != std::string::npos) {
    str.resize(underscore); // drop the underscore and everything after it
  }
  return str;
}
290
/// Convert every character of a string to upper case with std::toupper.
///
/// @param str string to convert; it is modified in place
/// @return the upper-cased string
std::string strToUpper(std::string&& str)
{
  for (char& ch : str) {
    // go through unsigned char, as std::toupper requires
    ch = static_cast<char>(std::toupper(static_cast<unsigned char>(ch)));
  }
  return str;
}
296
298{
299 return binding == "[MISSING]";
300}
301
303{
304 return bindingKey;
305}
306
311
316
317std::shared_ptr<arrow::Table> PreslicePolicySorted::getSliceFor(int value, std::shared_ptr<arrow::Table> const& input, uint64_t& offset) const
318{
319 auto [offset_, count] = this->sliceInfo.getSliceFor(value);
320 offset = static_cast<int64_t>(offset_);
321 if (count == 0) {
322 // Empty group: avoid slicing every column only to discard it. Cache one
323 // empty (0-row) table per input table and reuse it (see GroupSlicer).
324 if (emptySlice.first != input.get()) {
325 emptySlice = {input.get(), input->Slice(0, 0)};
326 }
327 return emptySlice.second;
328 }
329 return input->Slice(offset_, count);
330}
331
332std::span<const int64_t> PreslicePolicyGeneral::getSliceFor(int value) const
333{
334 return this->sliceInfo.getSliceFor(value);
335}
336} // namespace o2::framework
std::vector< std::string > labels
uint32_t hash
std::shared_ptr< arrow::Schema > schema
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
StringRef key
Class for time synchronization of RawReader instances.
const GLfloat * m
Definition glcorearb.h:4066
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint index
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLuint object
Definition glcorearb.h:4041
GLuint start
Definition glcorearb.h:469
std::shared_ptr< gandiva::SelectionVector > Selection
Definition Expressions.h:46
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
void throw_error(RuntimeErrorRef)
std::string strToUpper(std::string &&str)
Definition ASoA.cxx:291
RuntimeErrorRef runtime_error_f(const char *,...)
std::string cutString(std::string &&str)
Definition ASoA.cxx:282
void * extractCCDBPayload(char *payload, size_t size, TClass const *cl, const char *what)
Definition ASoA.cxx:212
SelectionVector selectionToVector(gandiva::Selection const &sel)
Definition ASoA.cxx:48
void notBoundTable(const char *tableName)
Definition ASoA.cxx:197
SelectionVector sliceSelection(std::span< int64_t const > const &mSelectedRows, int64_t nrows, uint64_t offset)
Definition ASoA.cxx:58
std::vector< int64_t > SelectionVector
Definition ASoA.h:443
void missingFilterDeclaration(int hash, int ai)
Definition ASoA.cxx:33
void accessingInvalidIndexFor(const char *getter)
Definition ASoA.cxx:25
void dereferenceWithWrongType(const char *getter, const char *target)
Definition ASoA.cxx:29
void emptyColumnLabel()
Definition ASoA.cxx:43
void getterNotFound(const char *targetColumnLabel)
Definition ASoA.cxx:38
void missingOptionalPreslice(const char *label, const char *key)
Definition ASoA.cxx:207
std::function< framework::ConcreteDataMatcher(framework::ConcreteDataMatcher &&)> originReplacement(header::DataOrigin newOrigin)
Definition ASoA.cxx:268
arrow::ChunkedArray * getIndexFromLabel(arrow::Table *table, std::string_view label)
Definition ASoA.cxx:178
void notFoundColumn(const char *label, const char *key)
Definition ASoA.cxx:202
const std::string binding
Definition ASoA.h:1508
Entry const & getBindingKey() const
Definition ASoA.cxx:302
SliceInfoUnsortedPtr sliceInfo
Definition ASoA.h:1530
std::span< const int64_t > getSliceFor(int value) const
Definition ASoA.cxx:332
void updateSliceInfo(SliceInfoUnsortedPtr &&si)
Definition ASoA.cxx:312
void updateSliceInfo(SliceInfoPtr &&si)
Definition ASoA.cxx:307
std::pair< arrow::Table const *, std::shared_ptr< arrow::Table > > emptySlice
Definition ASoA.h:1524
std::shared_ptr< arrow::Table > getSliceFor(int value, std::shared_ptr< arrow::Table > const &input, uint64_t &offset) const
Definition ASoA.cxx:317
std::pair< int64_t, int64_t > getSliceFor(int value) const
std::span< int64_t const > getSliceFor(int value) const
static std::shared_ptr< arrow::Table > joinTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:72
static std::shared_ptr< arrow::Table > concatTables(std::vector< std::shared_ptr< arrow::Table > > &&tables)
Definition ASoA.cxx:132
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::vector< ReadoutWindowData > rows
const std::string str