Project
Loading...
Searching...
No Matches
IndexBuilderHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16#include <arrow/util/config.h>
17#if (ARROW_VERSION_MAJOR > 20)
18#include <arrow/compute/initialize.h>
19#endif
20#include <arrow/compute/kernel.h>
21#include <arrow/compute/api_aggregate.h>
22#include <arrow/status.h>
23#include <arrow/table.h>
24#include <arrow/util/key_value_metadata.h>
25
26namespace o2::framework
27{
28void cannotBuildAnArray(const char* reason)
29{
30 throw framework::runtime_error_f("Cannot finish an array: %s", reason);
31}
32
34{
35 throw framework::runtime_error("Cannot create index column builder: invalid kind of index column");
36}
37
38ChunkedArrayIterator::ChunkedArrayIterator(std::shared_ptr<arrow::ChunkedArray> source)
39 : mSource{source},
40 mSourceSize{(size_t)source->length()}
41{
43 mCurrent = reinterpret_cast<int const*>(mCurrentArray->values()->data()) + mOffset;
44 mLast = mCurrent + mCurrentArray->length();
45}
46
47void ChunkedArrayIterator::reset(std::shared_ptr<arrow::ChunkedArray>& source)
48{
49 mPosition = 0;
50 mChunk = 0;
51 mOffset = 0;
52 mCurrentArray = nullptr;
53 mCurrent = nullptr;
54 mLast = nullptr;
55 mFirstIndex = 0;
56 mSourceSize = 0;
57
59 mSourceSize = (size_t)source->length();
61 mCurrent = reinterpret_cast<int const*>(mCurrentArray->values()->data()) + mOffset;
62 mLast = mCurrent + mCurrentArray->length();
63}
64
65SelfBuilder::SelfBuilder(arrow::MemoryPool* pool)
66{
67 auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
68 if (!status.ok()) {
69 throw framework::runtime_error_f("Cannot create array builder for the self-index: %s", status.ToString().c_str());
70 }
71}
72
73void SelfBuilder::reset(std::shared_ptr<arrow::ChunkedArray>)
74{
75 mBuilder->Reset();
76 keyIndex = nullptr;
77}
78
79void SelfBuilder::fill(int idx)
80{
81 auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(idx);
82 if (!status.ok()) {
83 throw framework::runtime_error_f("Cannot append to self-index array: %s", status.ToString().c_str());
84 }
85}
86
87std::shared_ptr<arrow::ChunkedArray> SelfBuilder::result() const
88{
89 std::shared_ptr<arrow::Array> array;
90 auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
91 if (!status.ok()) {
92 cannotBuildAnArray(status.ToString().c_str());
93 }
94
95 return std::make_shared<arrow::ChunkedArray>(array);
96}
97
98SingleBuilder::SingleBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
100{
101 auto status = arrow::MakeBuilder(pool, arrow::int32(), &mBuilder);
102 if (!status.ok()) {
103 throw framework::runtime_error_f("Cannot create array builder for the single-valued index: %s", status.ToString().c_str());
104 }
105}
106
107void SingleBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
108{
109 static_cast<ChunkedArrayIterator*>(this)->reset(source);
110 mBuilder->Reset();
111}
112
114{
115 auto count = mSourceSize - mPosition;
116 while (count > 0) {
117 size_t step = count / 2;
118 mPosition += step;
119 if (valueAt(mPosition) <= idx) {
120 count -= step + 1;
121 } else {
122 mPosition -= step;
123 count = step;
124 }
125 }
126
127 if (mPosition < mSourceSize && valueAt(mPosition) < idx) {
128 ++mPosition;
129 }
130
131 return (mPosition < mSourceSize && valueAt(mPosition) == idx);
132}
133
135{
136 arrow::Status status;
137 if (mPosition < mSourceSize && valueAt(mPosition) == idx) {
138 status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append((int)mPosition);
139 } else {
140 status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Append(-1);
141 }
142 if (!status.ok()) {
143 throw framework::runtime_error_f("Cannot append to array: %s", status.ToString().c_str());
144 }
145}
146
147std::shared_ptr<arrow::ChunkedArray> SingleBuilder::result() const
148{
149 std::shared_ptr<arrow::Array> array;
150 auto status = static_cast<arrow::Int32Builder*>(mBuilder.get())->Finish(&array);
151 if (!status.ok()) {
152 cannotBuildAnArray(status.ToString().c_str());
153 }
154 return std::make_shared<arrow::ChunkedArray>(array);
155}
156
157SliceBuilder::SliceBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
159{
160 auto status = preSlice();
161 if (!status.ok()) {
162 throw framework::runtime_error_f("Cannot pre-slice the source for slice-index building: %s", status.ToString().c_str());
163 }
164
165 std::unique_ptr<arrow::ArrayBuilder> builder;
166 status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
167 if (!status.ok()) {
168 throw framework::runtime_error_f("Cannot create array for the slice-index builder: %s", status.ToString().c_str());
169 }
170 mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), 2);
171 mValueBuilder = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->value_builder();
172}
173
174void SliceBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
175{
176 mValues = nullptr;
177 mCounts = nullptr;
178 mListBuilder->Reset();
179 mValuePos = 0;
180 static_cast<ChunkedArrayIterator*>(this)->reset(source);
181 auto status = preSlice();
182 if (!status.ok()) {
183 throw framework::runtime_error_f("Cannot pre-slice the source for slice-index building: %s", status.ToString().c_str());
184 }
185}
186
188{
189 auto count = mValues->length() - mValuePos;
190 while (count > 0) {
191 auto step = count / 2;
192 mValuePos += step;
193 if (mValues->Value(mValuePos) <= idx) {
194 count -= step + 1;
195 } else {
196 mValuePos -= step;
197 count = step;
198 }
199 }
200
201 if (mValuePos < mValues->length() && mValues->Value(mValuePos) <= idx) {
202 ++mPosition;
203 }
204
205 return (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx);
206}
207
209{
210 int data[2] = {-1, -1};
211 if (mValuePos < mValues->length() && mValues->Value(mValuePos) == idx) {
212 for (auto i = 0; i < mValuePos; ++i) {
213 data[0] += mCounts->Value(i);
214 }
215 data[0] += 1;
216 data[1] = data[0] + mCounts->Value(mValuePos) - 1;
217 }
218 (void)static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->AppendValues(1);
219 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(data, 2);
220}
221
222std::shared_ptr<arrow::ChunkedArray> SliceBuilder::result() const
223{
224 std::shared_ptr<arrow::Array> array;
225 auto status = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->Finish(&array);
226 if (!status.ok()) {
227 cannotBuildAnArray(status.ToString().c_str());
228 }
229 return std::make_shared<arrow::ChunkedArray>(array);
230}
231
232arrow::Status SliceBuilder::SliceBuilder::preSlice()
233{
234#if (ARROW_VERSION_MAJOR > 20)
235 auto status = arrow::compute::Initialize();
236 if (!status.ok()) {
237 throw framework::runtime_error_f("Cannot initialize arrow compute: %s", status.ToString().c_str());
238 }
239#else
240 arrow::Status status;
241#endif
242 arrow::Datum value_counts;
243 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
244 ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction("value_counts", {mSource}, &options));
245 auto pair = static_cast<arrow::StructArray>(value_counts.array());
246 mValues = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
247 mCounts = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
248 return arrow::Status::OK();
249}
250
251ArrayBuilder::ArrayBuilder(std::shared_ptr<arrow::ChunkedArray> source, arrow::MemoryPool* pool)
253{
254 auto&& status = preFind();
255 if (!status.ok()) {
256 throw framework::runtime_error_f("Cannot pre-find in a source for array-index building: %s", status.ToString().c_str());
257 }
258
259 std::unique_ptr<arrow::ArrayBuilder> builder;
260 status = arrow::MakeBuilder(pool, arrow::int32(), &builder);
261 if (!status.ok()) {
262 throw framework::runtime_error_f("Cannot create array for the array-index builder: %s", status.ToString().c_str());
263 }
264 mListBuilder = std::make_unique<arrow::ListBuilder>(pool, std::move(builder));
265 mValueBuilder = static_cast<arrow::ListBuilder*>(mListBuilder.get())->value_builder();
266}
267
268void ArrayBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
269{
270 static_cast<ChunkedArrayIterator*>(this)->reset(source);
271 auto status = preFind();
272 if (!status.ok()) {
273 throw framework::runtime_error_f("Cannot pre-find in a source for array-index building: %s", status.ToString().c_str());
274 }
275 mValues.clear();
276 mIndices.clear();
277 mListBuilder->Reset();
278}
279
281{
282 return (std::find(mValues.begin(), mValues.end(), idx) != mValues.end());
283}
284
286{
287 (void)static_cast<arrow::ListBuilder*>(mListBuilder.get())->Append();
288 if (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()) {
289 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(mIndices[idx].data(), mIndices[idx].size());
290 } else {
291 (void)static_cast<arrow::Int32Builder*>(mValueBuilder)->AppendValues(nullptr, 0);
292 }
293}
294
295std::shared_ptr<arrow::ChunkedArray> ArrayBuilder::result() const
296{
297 std::shared_ptr<arrow::Array> array;
298 auto status = static_cast<arrow::ListBuilder*>(mListBuilder.get())->Finish(&array);
299 if (!status.ok()) {
300 cannotBuildAnArray(status.ToString().c_str());
301 }
302 return std::make_shared<arrow::ChunkedArray>(array);
303}
304
306{
307#if (ARROW_VERSION_MAJOR > 20)
308 auto status = arrow::compute::Initialize();
309 if (!status.ok()) {
310 throw framework::runtime_error_f("Cannot initialize arrow compute: %s", status.ToString().c_str());
311 }
312#else
313 arrow::Status status;
314#endif
315 arrow::Datum max;
316 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
317 ARROW_ASSIGN_OR_RAISE(max, arrow::compute::CallFunction("max", {mSource}, &options));
318 auto maxValue = std::dynamic_pointer_cast<arrow::Int32Scalar>(max.scalar())->value;
319 mIndices.resize(maxValue + 1);
320
321 auto row = 0;
322 for (auto i = 0; i < mSource->length(); ++i) {
323 auto v = valueAt(i);
324 if (v >= 0) {
325 mValues.emplace_back(v);
326 mIndices[v].push_back(row);
327 }
328 ++row;
329 }
330 std::sort(mValues.begin(), mValues.end());
331
332 return arrow::Status::OK();
333}
334
335IndexColumnBuilder::IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool* pool, std::shared_ptr<arrow::ChunkedArray> source)
336 : mColumnPos{pos}
337{
338 switch (kind) {
340 builder = SelfBuilder{pool};
341 break;
344 break;
346 builder = SliceBuilder{source, pool};
347 break;
349 builder = ArrayBuilder{source, pool};
350 break;
351 default:
353 }
354}
355
356void IndexColumnBuilder::reset(std::shared_ptr<arrow::ChunkedArray> source)
357{
358 std::visit(
360 [](std::monostate) {},
361 [&source](auto& b) { b.reset(source); }},
362 builder);
363}
364
366{
367 return std::visit(
369 [](std::monostate) { return false; },
370 [&idx](auto& b) { return b.find(idx); },
371 },
372 builder);
373}
374
376{
377 std::visit(
379 [](std::monostate) {},
380 [&idx](auto& b) { b.fill(idx); }},
381 builder);
382}
383
384std::shared_ptr<arrow::ChunkedArray> IndexColumnBuilder::result() const
385{
386 return std::visit(
388 [](std::monostate) -> std::shared_ptr<arrow::ChunkedArray> { return nullptr; },
389 [](auto& b) { return b.result(); }},
390 builder);
391}
392
393std::shared_ptr<arrow::Int32Array> ChunkedArrayIterator::getCurrentArray()
394{
395 auto chunk = mSource->chunk(mChunk);
396 mOffset = chunk->offset();
397 return std::static_pointer_cast<arrow::Int32Array>(chunk);
398}
399
401{
402 auto previousArray = getCurrentArray();
403 mFirstIndex += previousArray->length();
404
405 ++mChunk;
406 auto array = getCurrentArray();
407 mCurrent = reinterpret_cast<int const*>(array->values()->data()) + mOffset - mFirstIndex;
408 mLast = mCurrent + array->length() + mFirstIndex;
409}
410
412{
413 auto previousArray = getCurrentArray();
414 mFirstIndex -= previousArray->length();
415
416 --mChunk;
417 auto array = getCurrentArray();
418 mCurrent = reinterpret_cast<int const*>(array->values()->data()) + mOffset - mFirstIndex;
419 mLast = mCurrent + array->length() + mFirstIndex;
420}
421
423{
424 while (O2_BUILTIN_UNLIKELY(mCurrent + pos >= mLast)) {
425 nextChunk();
426 }
428 prevChunk();
429 }
430 return *(mCurrent + pos);
431}
432} // namespace o2::framework
#define O2_BUILTIN_UNLIKELY(x)
int32_t i
uint16_t pos
Definition RawData.h:3
GLint GLsizei count
Definition glcorearb.h:399
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
void cannotBuildAnArray(const char *reason)
RuntimeErrorRef runtime_error_f(const char *,...)
arrow::ArrayBuilder * mValueBuilder
std::shared_ptr< arrow::ChunkedArray > result() const
std::vector< std::vector< int > > mIndices
ArrayBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
std::unique_ptr< arrow::ArrayBuilder > mListBuilder
void reset(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::Int32Array > getCurrentArray()
ChunkedArrayIterator(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::Int32Array > mCurrentArray
void reset(std::shared_ptr< arrow::ChunkedArray > &source)
std::shared_ptr< arrow::ChunkedArray > mSource
IndexColumnBuilder(soa::IndexKind kind, int pos, arrow::MemoryPool *pool, std::shared_ptr< arrow::ChunkedArray > source=nullptr)
std::shared_ptr< arrow::ChunkedArray > result() const
void reset(std::shared_ptr< arrow::ChunkedArray > source=nullptr)
std::variant< std::monostate, SelfBuilder, SingleBuilder, SliceBuilder, ArrayBuilder > builder
SelfBuilder(arrow::MemoryPool *pool)
std::shared_ptr< arrow::ChunkedArray > result() const
std::unique_ptr< arrow::ArrayBuilder > mBuilder
std::unique_ptr< framework::ChunkedArrayIterator > keyIndex
void reset(std::shared_ptr< arrow::ChunkedArray >)
std::unique_ptr< arrow::ArrayBuilder > mBuilder
SingleBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
std::shared_ptr< arrow::ChunkedArray > result() const
void reset(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::NumericArray< arrow::Int32Type > > mValues
std::unique_ptr< arrow::ArrayBuilder > mListBuilder
std::shared_ptr< arrow::ChunkedArray > result() const
SliceBuilder(std::shared_ptr< arrow::ChunkedArray > source, arrow::MemoryPool *pool)
void reset(std::shared_ptr< arrow::ChunkedArray > source)
arrow::ArrayBuilder * mValueBuilder
std::shared_ptr< arrow::NumericArray< arrow::Int64Type > > mCounts
From https://en.cppreference.com/w/cpp/utility/variant/visit.
constexpr size_t max
std::vector< int > row