12#include <arrow/array/data.h>
13#include <arrow/type_fwd.h>
14#include <arrow/array/array_primitive.h>
15#include <arrow/array/array_nested.h>
16#include <arrow/record_batch.h>
17#include <arrow/type.h>
18#include <arrow/array/util.h>
21static constexpr int64_t kBufferMinimumSize = 256;
28 const std::shared_ptr<arrow::dataset::ScanOptions>& options)
30 auto generator = [
this]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
31 std::vector<std::shared_ptr<arrow::Array>> columns;
32 columns.reserve(this->physical_schema_->fields().size());
34 for (
auto& field : this->physical_schema_->fields()) {
35 if (
auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
36 size_t size = mRows * listType->list_size();
37 if (field->type()->field(0)->type()->byte_width() == 0) {
40 size *= field->type()->field(0)->type()->byte_width();
42 auto vdata = std::make_shared<arrow::ArrayData>(field->type()->field(0)->type(), mRows * listType->list_size(), std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, GetPlaceholderForOp(size)});
43 columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), (int32_t)mRows, arrow::MakeArray(vdata)));
46 if (field->type()->byte_width() == 0) {
49 size *= field->type()->byte_width();
51 auto data = std::make_shared<arrow::ArrayData>(field->type(), mRows, std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, GetPlaceholderForOp(size)});
52 columns.push_back(arrow::MakeArray(
data));
55 return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
// Default constructor: builds a stream that is not open, with zero capacity,
// a write position of 0 and a null raw data pointer.
60PreallocatedOutputStream::PreallocatedOutputStream()
61 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
63PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>&
sizes,
64 const std::shared_ptr<arrow::ResizableBuffer>&
buffer)
70 mutable_data_(
buffer->mutable_data()) {}
73 std::vector<size_t>& ops,
74 int64_t initial_capacity, arrow::MemoryPool* pool)
78 RETURN_NOT_OK(
ptr->Reset(ops, initial_capacity, pool));
83 int64_t initial_capacity, arrow::MemoryPool* pool)
85 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
88 capacity_ = initial_capacity;
90 mutable_data_ = buffer_->mutable_data();
91 return arrow::Status::OK();
98 if (position_ < capacity_) {
99 RETURN_NOT_OK(buffer_->Resize(position_,
false));
102 return arrow::Status::OK();
109 RETURN_NOT_OK(
Close());
110 buffer_->ZeroPadding();
112 return std::move(buffer_);
119 if (ARROW_PREDICT_FALSE(!is_open_)) {
120 return arrow::Status::IOError(
"OutputStream is closed");
122 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
123 return arrow::Status::OK();
125 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
126 RETURN_NOT_OK(Reserve(nbytes));
130 if (
ref >= sizes_.size()) {
131 memcpy(mutable_data_ + position_,
data, nbytes);
133 return arrow::Status::OK();
137 return arrow::Status::OK();
// Makes sure the backing buffer can hold `nbytes` more bytes past the current
// write position. When position_ + nbytes exceeds capacity_, buffer_ is resized
// to exactly that size and the cached capacity / data pointer are updated.
// Returns the Resize() error if resizing fails, otherwise Status::OK().
140arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
// NOTE(review): the std::max(kBufferMinimumSize, capacity_) value computed here
// is overwritten by the very next statement before it is ever read, so
// kBufferMinimumSize currently has no effect on the resulting capacity.
// Confirm whether a growth policy (e.g. a minimum size or doubling) was intended.
146 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
147 new_capacity = position_ + nbytes;
// Only resize when the requested end offset does not fit in the current capacity.
148 if (new_capacity > capacity_) {
149 RETURN_NOT_OK(buffer_->Resize(new_capacity));
150 capacity_ = new_capacity;
// Re-read the raw pointer from the buffer after the resize so later writes
// through mutable_data_ target the buffer's current storage.
151 mutable_data_ = buffer_->mutable_data();
153 return arrow::Status::OK();
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options) override
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
bool closed() const override
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< PreallocatedOutputStream > > Create(std::vector< size_t > &sizes, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create an in-memory output stream with the indicated initial capacity, using the given memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(std::vector< size_t > sizes, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize the state of the OutputStream with newly allocated memory and set the position to 0.
arrow::Result< int64_t > Tell() const override
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Defining PrimaryVertex explicitly as messageable.