Project
Loading...
Searching...
No Matches
IndexBuilderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include <arrow/compute/api_aggregate.h>
17#include <arrow/compute/kernel.h>
18#include <arrow/status.h>
19#include <arrow/table.h>
20#include <arrow/util/key_value_metadata.h>
21
22namespace o2::framework
23{
25{
26 throw framework::runtime_error("Cannot finish an array");
27}
28
30{
31 throw framework::runtime_error("Cannot create index column builder: invalid kind of index column");
32}
33
34ChunkedArrayIterator::ChunkedArrayIterator(std::shared_ptr<arrow::ChunkedArray> source)
35 : mSource{source},
36 mSourceSize{(size_t)source->length()}
37{
39 mCurrent = reinterpret_cast<int const*>(mCurrentArray->values()->data()) + mOffset;
40 mLast = mCurrent + mCurrentArray->length();
41}
42
43void ChunkedArrayIterator::reset(std::shared_ptr<arrow::ChunkedArray>& source)
44{
45 mPosition = 0;
46 mChunk = 0;
47 mOffset = 0;
48 mCurrentArray = nullptr;
49 mCurrent = nullptr;
50 mLast = nullptr;
51 mFirstIndex = 0;
52 mSourceSize = 0;
53
55 mSourceSize = (size_t)source->length();
57 mCurrent = reinterpret_cast<int const*>(mCurrentArray->values()->data()) + mOffset;
58 mLast = mCurrent + mCurrentArray->length();
59}
60
61SelfBuilder::SelfBuilder(arrow::MemoryPool* pool)
62{
63 auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
64 if (!status.ok()) {
65 throw framework::runtime_error("Cannot create array builder for the self-index!");
66 }
67}
68// static_cast<ChunkedArrayIterator*>(this)->reset(pool);
69void SelfBuilder::reset(std::shared_ptr<arrow::ChunkedArray>)
70{
71 mBuilder->Reset();
72 keyIndex = nullptr;
73}
74
75void SelfBuilder::fill(int idx)
76{
77 (void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
78}
79
80std::shared_ptr<arrow::ChunkedArray> SelfBuilder::result() const
81{
82 std::shared_ptr<arrow::Array> array;
83 auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
84 if (!status.ok()) {
86 }
87
88 return std::make_shared<arrow::ChunkedArray>(array);
89}
90
91SingleBuilder::SingleBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
93{
94 auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
95 if (!status.ok()) {
96 throw framework::runtime_error("Cannot create array builder for the single-valued index!");
97 }
98}
99
100void SingleBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
101{
102 static_cast<ChunkedArrayIterator*>(this)->reset(source);
103 mBuilder->Reset();
104}
105
107{
108 auto count = mSourceSize - mPosition;
109 while (count > 0) {
110 size_t step = count / 2;
111 mPosition += step;
112 if (valueAt(mPosition) <= idx) {
113 count -= step + 1;
114 } else {
115 mPosition -= step;
116 count = step;
117 }
118 }
119
120 if (mPosition < mSourceSize && valueAt(mPosition) < idx) {
121 ++mPosition;
122 }
123
124 return (mPosition < mSourceSize && valueAt(mPosition) == idx);
125}
126
128{
129 if (mPosition < mSourceSize && valueAt(mPosition) == idx) {
130 (void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append((int)mPosition);
131 } else {
132 (void)static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(-1);
133 }
134}
135
136std::shared_ptr<arrow::ChunkedArray> SingleBuilder::result() const
137{
138 std::shared_ptr<arrow::Array> array;
139 auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
140 if (!status.ok()) {
142 }
143 return std::make_shared<arrow::ChunkedArray>(array);
144}
145
146SliceBuilder::SliceBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
148{
149 if (!preSlice().ok()) {
150 throw framework::runtime_error("Cannot pre-slice the source for slice-index building");
151 }
152
153 std::unique_ptr<arrow::ArrayBuilder> builder;
154 auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
155 if (!status.ok()) {
156 throw framework::runtime_error("Cannot create array for the slice-index builder!");
157 }
158 mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), 2);
159 mValueBuilder = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->value_builder();
160}
161
162void SliceBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
163{
164 static_cast<ChunkedArrayIterator*>(this)->reset(source);
165 if (!preSlice().ok()) {
166 throw framework::runtime_error("Cannot pre-slice the source for slice-index building");
167 }
168 mListBuilder->Reset();
169 mValues = nullptr;
170 mCounts = nullptr;
171 mValuePos = 0;
172}
173
175{
176 auto count = mValues->length() - mValuePos;
177 while (count > 0) {
178 auto step = count / 2;
179 mValuePos += step;
180 if (mValues->Value(mValuePos) <= idx) {
181 count -= step + 1;
182 } else {
183 mValuePos -= step;
184 count = step;
185 }
186 }
187
188 if (mValuePos < mValues->length() && mValues->Value(mValuePos) <= idx) {
189 ++mPosition;
190 }
191
192 return (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx);
193}
194
196{
197 int data[2] = {-1, -1};
198 if (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx) {
199 for (auto i = 0; i < mValuePos; ++i) {
200 data[0] += mCounts->Value(i);
201 }
202 data[0] += 1;
203 data[1] = data[0] + mCounts->Value(mValuePos) - 1;
204 }
205 (void)static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->AppendValues(1);
206 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(data, 2);
207}
208
209std::shared_ptr<arrow::ChunkedArray> SliceBuilder::result() const
210{
211 std::shared_ptr<arrow::Array> array;
212 auto status = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->Finish(&array);
213 if (!status.ok()) {
215 }
216 return std::make_shared<arrow::ChunkedArray>(array);
217}
218
219arrow::Status SliceBuilder::SliceBuilder::preSlice()
220{
221 arrow::Datum value_counts;
222 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
223 ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction("value_counts", {mSource}, &options));
224 auto pair = static_cast<arrow::StructArray>(value_counts.array());
225 mValues = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
226 mCounts = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
227 return arrow::Status::OK();
228}
229
230ArrayBuilder::ArrayBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
232{
233 if (!preFind().ok()) {
234 throw framework::runtime_error("Cannot pre-find in a source for array-index building");
235 }
236
237 std::unique_ptr<arrow::ArrayBuilder> builder;
238 auto status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
239 if (!status.ok()) {
240 throw framework::runtime_error("Cannot create array for the array-index builder!");
241 }
242 mListBuilder = std::make_unique<arrow::ListBuilder>(pool, std::move(builder));
243 mValueBuilder = static_cast<arrow::ListBuilder*>(mListBuilder.get())->value_builder();
244}
245
246void ArrayBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
247{
248 static_cast<ChunkedArrayIterator*>(this)->reset(source);
249 if (!preFind().ok()) {
250 throw framework::runtime_error("Cannot pre-find in a source for array-index building");
251 }
252 mValues.clear();
253 mIndices.clear();
254 mListBuilder->Reset();
255}
256
258{
259 return (std::find(mValues.begin(), mValues.end(), idx) != mValues.end());
260}
261
263{
264 (void)static_cast<arrow::ListBuilder*>(mListBuilder.get())->Append();
265 if (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()) {
266 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(mIndices[idx].data(), mIndices[idx].size());
267 } else {
268 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(nullptr, 0);
269 }
270}
271
272std::shared_ptr<arrow::ChunkedArray> ArrayBuilder::result() const
273{
274 std::shared_ptr<arrow::Array> array;
275 auto status = static_cast<arrow::ListBuilder*>(mListBuilder.get())->Finish(&array);
276 if (!status.ok()) {
278 }
279 return std::make_shared<arrow::ChunkedArray>(array);
280}
281
283{
284 arrow::Datum max;
285 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
286 ARROW_ASSIGN_OR_RAISE(max, arrow::compute::CallFunction("max", {mSource}, &options));
287 auto maxValue = std::dynamic_pointer_cast<arrow::Int32Scalar>(max.scalar())->value;
288 mIndices.resize(maxValue + 1);
289
290 auto row = 0;
291 for (auto i = 0; i < mSource->length(); ++i) {
292 auto v = valueAt(i);
293 if (v >= 0) {
294 mValues.emplace_back(v);
295 mIndices[v].push_back(row);
296 }
297 ++row;
298 }
299 std::sort(mValues.begin(), mValues.end());
300
301 return arrow::Status::OK();
302}
303
304IndexColumnBuilder::IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool* pool, std::shared_ptr<arrow::ChunkedArray> source)
305 : mColumnPos{pos}
306{
307 switch (kind) {
309 builder = SelfBuilder{pool};
310 break;
313 break;
315 builder = SliceBuilder{source, pool};
316 break;
318 builder = ArrayBuilder{source, pool};
319 break;
320 default:
322 }
323}
324
325void IndexColumnBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
326{
327 std::visit(
329 [](std::monostate) {},
330 [&source](auto& b) { b.reset(source); }},
331 builder);
332}
333
335{
336 return std::visit(
338 [](std::monostate) { return false; },
339 [&idx](auto& b) { return b.find(idx); },
340 },
341 builder);
342}
343
345{
346 std::visit(
348 [](std::monostate) {},
349 [&idx](auto& b) { b.fill(idx); }},
350 builder);
351}
352
353std::shared_ptr<arrow::ChunkedArray> IndexColumnBuilder::result() const
354{
355 return std::visit(
357 [](std::monostate) -> std::shared_ptr<arrow::ChunkedArray> { return nullptr; },
358 [](auto& b) { return b.result(); }},
359 builder);
360}
361
362std::shared_ptr<arrow::Int32Array> ChunkedArrayIterator::getCurrentArray()
363{
364 auto chunk = mSource->chunk(mChunk);
365 mOffset = chunk->offset();
366 return std::static_pointer_cast<arrow::Int32Array>(chunk);
367}
368
370{
371 auto previousArray = getCurrentArray();
372 mFirstIndex += previousArray->length();
373
374 ++mChunk;
375 auto array = getCurrentArray();
376 mCurrent = reinterpret_cast<int const*>(array->values()->data()) + mOffset - mFirstIndex;
377 mLast = mCurrent + array->length() + mFirstIndex;
378}
379
381{
382 auto previousArray = getCurrentArray();
383 mFirstIndex -= previousArray->length();
384
385 --mChunk;
386 auto array = getCurrentArray();
387 mCurrent = reinterpret_cast<int const*>(array->values()->data()) + mOffset - mFirstIndex;
388 mLast = mCurrent + array->length() + mFirstIndex;
389}
390
392{
393 while (O2_BUILTIN_UNLIKELY(mCurrent + pos >= mLast)) {
394 nextChunk();
395 }
397 prevChunk();
398 }
399 return *(mCurrent + pos);
400}
401} // namespace o2::framework
#define O2_BUILTIN_UNLIKELY(x)
int32_t i
uint16_t pos
Definition RawData.h:3
GLint GLsizei count
Definition glcorearb.h:399
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
arrow::ArrayBuilder * mValueBuilder
std::shared_ptr< arrow::ChunkedArray > result() const
std::vector< std::vector< int > > mIndices
ArrayBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
std::unique_ptr< arrow::ArrayBuilder > mListBuilder
void reset(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::Int32Array > getCurrentArray()
ChunkedArrayIterator(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::Int32Array > mCurrentArray
void reset(std::shared_ptr< arrow::ChunkedArray > &source)
std::shared_ptr< arrow::ChunkedArray > mSource
IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool *pool, std::shared_ptr< arrow::ChunkedArray > source=nullptr)
std::shared_ptr< arrow::ChunkedArray > result() const
void reset(std::shared_ptr< arrow::ChunkedArray > source=nullptr)
std::variant< std::monostate, SelfBuilder, SingleBuilder, SliceBuilder, ArrayBuilder > builder
SelfBuilder(arrow::MemoryPool *pool)
std::shared_ptr< arrow::ChunkedArray > result() const
std::unique_ptr< arrow::ArrayBuilder > mBuilder
std::unique_ptr< framework::ChunkedArrayIterator > keyIndex
void reset(std::shared_ptr< arrow::ChunkedArray >)
std::unique_ptr< arrow::ArrayBuilder > mBuilder
SingleBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
std::shared_ptr< arrow::ChunkedArray > result() const
void reset(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::NumericArray< arrow::Int32Type > > mValues
std::unique_ptr< arrow::ArrayBuilder > mListBuilder
std::shared_ptr< arrow::ChunkedArray > result() const
SliceBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
void reset(std::shared_ptr< arrow::ChunkedArray > source)
arrow::ArrayBuilder * mValueBuilder
std::shared_ptr< arrow::NumericArray< arrow::Int64Type > > mCounts
From https://en.cppreference.com/w/cpp/utility/variant/visit.
constexpr size_t max
std::vector< int > row