12#include <arrow/type_fwd.h>
13#include <arrow/array/array_primitive.h>
14#include <arrow/array/array_nested.h>
// Lower bound passed to std::max together with capacity_ when
// PreallocatedOutputStream::Reserve() starts computing a new capacity.
// NOTE(review): presumably a size in bytes - confirm against the buffer API.
17static constexpr int64_t kBufferMinimumSize = 256;
24 const std::shared_ptr<arrow::dataset::ScanOptions>& options)
26 auto generator = [
this]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
27 std::vector<std::shared_ptr<arrow::Array>> columns;
28 columns.reserve(this->physical_schema_->fields().size());
30 for (
auto& field : this->physical_schema_->fields()) {
31 if (
auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
32 size_t size = mRows * listType->list_size();
33 if (field->type()->field(0)->type()->byte_width() == 0) {
36 size *= field->type()->field(0)->type()->byte_width();
38 auto varray = std::make_shared<arrow::PrimitiveArray>(field->type()->field(0)->type(), mRows * listType->list_size(), GetPlaceholderForOp(
size));
39 columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), (int32_t)mRows, varray));
42 if (field->type()->byte_width() == 0) {
45 size *= field->type()->byte_width();
47 columns.push_back(std::make_shared<arrow::PrimitiveArray>(field->type(), mRows, GetPlaceholderForOp(
size)));
50 return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
55PreallocatedOutputStream::PreallocatedOutputStream()
56 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
58PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>&
sizes,
59 const std::shared_ptr<arrow::ResizableBuffer>&
buffer)
65 mutable_data_(
buffer->mutable_data()) {}
68 std::vector<size_t>& ops,
69 int64_t initial_capacity, arrow::MemoryPool* pool)
73 RETURN_NOT_OK(
ptr->Reset(ops, initial_capacity, pool));
78 int64_t initial_capacity, arrow::MemoryPool* pool)
80 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
83 capacity_ = initial_capacity;
85 mutable_data_ = buffer_->mutable_data();
86 return arrow::Status::OK();
93 if (position_ < capacity_) {
94 RETURN_NOT_OK(buffer_->Resize(position_,
false));
97 return arrow::Status::OK();
104 RETURN_NOT_OK(
Close());
105 buffer_->ZeroPadding();
107 return std::move(buffer_);
114 if (ARROW_PREDICT_FALSE(!is_open_)) {
115 return arrow::Status::IOError(
"OutputStream is closed");
117 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
118 return arrow::Status::OK();
120 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
121 RETURN_NOT_OK(Reserve(nbytes));
125 if (
ref >= sizes_.size()) {
126 memcpy(mutable_data_ + position_,
data, nbytes);
128 return arrow::Status::OK();
132 return arrow::Status::OK();
// Makes sure the backing buffer can hold `nbytes` more bytes beyond the current
// write position (position_).
//
// nbytes: number of additional bytes the caller is about to write.
// Returns arrow::Status::OK() on success; a failing status from
// buffer_->Resize() is propagated through RETURN_NOT_OK.
135arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
// NOTE(review): the value computed from kBufferMinimumSize and capacity_ on the
// next line is overwritten unconditionally by the statement that follows it, so
// the minimum-size floor currently has no effect (dead store). Confirm whether
// exact-size growth is intended before removing or restoring it.
141 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
// The requested capacity is exactly what is needed: current position plus nbytes.
142 new_capacity = position_ + nbytes;
// Only grow; when the buffer is already large enough nothing is touched.
143 if (new_capacity > capacity_) {
144 RETURN_NOT_OK(buffer_->Resize(new_capacity));
145 capacity_ = new_capacity;
// Re-read the data pointer after the resize - presumably because Resize() may
// move the underlying storage.
146 mutable_data_ = buffer_->mutable_data();
148 return arrow::Status::OK();
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options) override
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
bool closed() const override
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< PreallocatedOutputStream > > Create(std::vector< size_t > &sizes, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(std::vector< size_t > sizes, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Result< int64_t > Tell() const override
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Defining PrimaryVertex explicitly as messageable.