15#include <arrow/compute/api_aggregate.h>
16#include <arrow/compute/kernel.h>
17#include <arrow/status.h>
18#include <arrow/table.h>
19#include <arrow/util/key_value_metadata.h>
38 mArrowType{
arrow::int32()}
40 auto status = arrow::MakeBuilder(pool, arrow::int32(), &
mBuilder);
63 if (preSlice().ok()) {
64 mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(
mBuilder), mListSize);
65 mValueBuilder =
static_cast<arrow::FixedSizeListBuilder*
>(mListBuilder.get())->value_builder();
66 mArrowType = arrow::fixed_size_list(arrow::int32(), 2);
73 mListBuilder = std::make_unique<arrow::ListBuilder>(pool, std::move(
mBuilder));
74 mValueBuilder =
static_cast<arrow::ListBuilder*
>(mListBuilder.get())->value_builder();
81 throw runtime_error_f(
"Invalid list size for index column: %d", mListSize);
85arrow::Status IndexColumnBuilder::preSlice()
87 arrow::Datum value_counts;
88 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
89 ARROW_ASSIGN_OR_RAISE(value_counts, arrow::compute::CallFunction(
"value_counts", {
mSource}, &options));
90 auto pair =
static_cast<arrow::StructArray
>(value_counts.array());
91 mValuesArrow = std::make_shared<arrow::NumericArray<arrow::Int32Type>>(pair.field(0)->data());
92 mCounts = std::make_shared<arrow::NumericArray<arrow::Int64Type>>(pair.field(1)->data());
93 return arrow::Status::OK();
96arrow::Status IndexColumnBuilder::preFind()
99 auto options = arrow::compute::ScalarAggregateOptions::Defaults();
100 ARROW_ASSIGN_OR_RAISE(
max, arrow::compute::CallFunction(
"max", {
mSource}, &options));
101 auto maxValue = std::dynamic_pointer_cast<arrow::Int32Scalar>(
max.scalar())->value;
102 mIndices.resize(maxValue + 1);
105 for (
auto i = 0;
i <
mSource->length(); ++
i) {
108 mValues.emplace_back(
v);
109 mIndices[
v].push_back(
row);
113 std::sort(mValues.begin(), mValues.end());
115 return arrow::Status::OK();
118std::shared_ptr<arrow::ChunkedArray> IndexColumnBuilder::resultSingle()
const
120 std::shared_ptr<arrow::Array>
array;
121 auto status =
static_cast<arrow::Int32Builder*
>(mValueBuilder)->Finish(&
array);
125 return std::make_shared<arrow::ChunkedArray>(
array);
128std::shared_ptr<arrow::ChunkedArray> IndexColumnBuilder::resultSlice()
const
130 std::shared_ptr<arrow::Array>
array;
131 auto status =
static_cast<arrow::FixedSizeListBuilder*
>(mListBuilder.get())->Finish(&
array);
135 return std::make_shared<arrow::ChunkedArray>(
array);
138std::shared_ptr<arrow::ChunkedArray> IndexColumnBuilder::resultMulti()
const
140 std::shared_ptr<arrow::Array>
array;
141 auto status =
static_cast<arrow::ListBuilder*
>(mListBuilder.get())->Finish(&
array);
145 return std::make_shared<arrow::ChunkedArray>(
array);
148bool IndexColumnBuilder::findSingle(
int idx)
169bool IndexColumnBuilder::findSlice(
int idx)
171 auto count = mValuesArrow->length() - mValuePos;
175 if (mValuesArrow->Value(mValuePos) <= idx) {
183 if (mValuePos < mValuesArrow->
length() && mValuesArrow->Value(mValuePos) <= idx) {
187 return (mValuePos < mValuesArrow->
length() && mValuesArrow->Value(mValuePos) == idx);
190bool IndexColumnBuilder::findMulti(
int idx)
192 return (std::find(mValues.begin(), mValues.end(), idx) != mValues.end());
195void IndexColumnBuilder::fillSingle(
int idx)
199 (
void)
static_cast<arrow::Int32Builder*
>(mValueBuilder)->Append((
int)
mPosition);
201 (
void)
static_cast<arrow::Int32Builder*
>(mValueBuilder)->Append(-1);
205void IndexColumnBuilder::fillSlice(
int idx)
207 int data[2] = {-1, -1};
208 if (mValuePos < mValuesArrow->
length() && mValuesArrow->Value(mValuePos) == idx) {
209 for (
auto i = 0;
i < mValuePos; ++
i) {
210 data[0] += mCounts->Value(
i);
213 data[1] =
data[0] + mCounts->Value(mValuePos) - 1;
215 (
void)
static_cast<arrow::FixedSizeListBuilder*
>(mListBuilder.get())->AppendValues(1);
216 (
void)
static_cast<arrow::Int32Builder*
>(mValueBuilder)->AppendValues(
data, 2);
219void IndexColumnBuilder::fillMulti(
int idx)
221 (
void)
static_cast<arrow::ListBuilder*
>(mListBuilder.get())->Append();
222 if (std::find(mValues.begin(), mValues.end(), idx) != mValues.end()) {
223 (
void)
static_cast<arrow::Int32Builder*
>(mValueBuilder)->AppendValues(mIndices[idx].
data(), mIndices[
idx].size());
225 (
void)
static_cast<arrow::Int32Builder*
>(mValueBuilder)->AppendValues(
nullptr, 0);
233 return std::static_pointer_cast<arrow::Int32Array>(chunk);
269std::shared_ptr<arrow::Table>
makeArrowTable(
const char*
label, std::vector<std::shared_ptr<arrow::ChunkedArray>>&& columns, std::vector<std::shared_ptr<arrow::Field>>&& fields)
271 auto schema = std::make_shared<arrow::Schema>(fields);
272 schema->WithMetadata(
273 std::make_shared<arrow::KeyValueMetadata>(
274 std::vector{std::string{
"label"}},
275 std::vector{std::string{
label}}));
276 return arrow::Table::Make(schema, columns);
#define O2_BUILTIN_UNLIKELY(x)
IndexColumnBuilder(std::shared_ptr< arrow::ChunkedArray > source, const char *name, int listSize, arrow::MemoryPool *pool)
GLuint const GLchar * name
GLsizei GLsizei GLchar * source
GLuint GLsizei GLsizei * length
GLuint GLsizei const GLchar * label
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
void cannotBuildAnArray()
RuntimeErrorRef runtime_error_f(const char *,...)
std::shared_ptr< arrow::Table > makeArrowTable(const char *label, std::vector< std::shared_ptr< arrow::ChunkedArray > > &&columns, std::vector< std::shared_ptr< arrow::Field > > &&fields)
std::shared_ptr< arrow::Int32Array > getCurrentArray()
ChunkedArrayIterator(std::shared_ptr< arrow::ChunkedArray > source)
std::shared_ptr< arrow::Int32Array > mCurrentArray
std::shared_ptr< arrow::ChunkedArray > mSource
SelfIndexColumnBuilder(const char *name, arrow::MemoryPool *pool)
std::unique_ptr< arrow::ArrayBuilder > mBuilder
std::shared_ptr< arrow::DataType > mArrowType
std::shared_ptr< arrow::Field > field() const