13#include <fairmq/Message.h>
14#include <arrow/status.h>
15#include <arrow/util/config.h>
// Floor that Reserve() feeds into std::max() together with the current
// capacity when it seeds the new capacity value.
// NOTE(review): in the visible Reserve() that seeded value is overwritten on
// the very next statement by `position_ + nbytes`, so this minimum does not
// appear to influence the final capacity -- confirm whether that is intended.
28static constexpr int64_t kBufferMinimumSize = 256;
// Default constructor: leaves the stream marked as not open, with zero
// capacity, a write position of zero and a null data pointer. No buffer is
// attached by this constructor's initializer list.
30FairMQOutputStream::FairMQOutputStream()
31 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
33FairMQOutputStream::FairMQOutputStream(
const std::shared_ptr<arrow::ResizableBuffer>&
buffer)
38 mutable_data_(
buffer->mutable_data()) {}
41 int64_t initial_capacity, arrow::MemoryPool* pool)
45 RETURN_NOT_OK(
ptr->Reset(initial_capacity, pool));
51 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
53 capacity_ = initial_capacity;
55 mutable_data_ = buffer_->mutable_data();
63 if (position_ < capacity_) {
64 RETURN_NOT_OK(buffer_->Resize(position_,
false));
74 RETURN_NOT_OK(
Close());
75 buffer_->ZeroPadding();
77 return std::move(buffer_);
84 if (ARROW_PREDICT_FALSE(!is_open_)) {
85 return Status::IOError(
"OutputStream is closed");
87 if (ARROW_PREDICT_TRUE(nbytes > 0)) {
88 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
89 RETURN_NOT_OK(Reserve(nbytes));
91 memcpy(mutable_data_ + position_,
data, nbytes);
97Status FairMQOutputStream::Reserve(int64_t nbytes)
103 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
104 new_capacity = position_ + nbytes;
105 if (new_capacity > capacity_) {
106 RETURN_NOT_OK(buffer_->Resize(new_capacity));
107 capacity_ = new_capacity;
108 mutable_data_ = buffer_->mutable_data();
117 : ResizableBuffer(nullptr, 0),
119 mCreator{
std::move(creator)}
121 this->data_ =
nullptr;
131 if (newSize > this->capacity_) {
132 auto status = this->
Reserve(newSize);
133 if (status.ok() ==
false) {
137 assert(newSize <= this->capacity_);
139 this->size_ = newSize;
140 assert(this->size_ <= mMessage->GetSize());
141 return arrow::Status::OK();
146 assert(!mMessage || this->size_ <= mMessage->GetSize());
147 assert(!mMessage || this->capacity_ == mMessage->GetSize());
148 if (capacity <= this->capacity_) {
149 return arrow::Status::OK();
151 auto newMessage = mCreator(capacity);
152 assert(!mMessage || capacity > mMessage->GetSize());
154 memcpy(newMessage->GetData(), mMessage->GetData(), mMessage->GetSize());
156 mMessage = std::move(newMessage);
158 this->data_ =
reinterpret_cast<uint8_t*
>(mMessage->GetData());
159 this->capacity_ =
static_cast<int64_t
>(mMessage->GetSize());
161 return arrow::Status::OK();
166 this->data_ =
nullptr;
169 return std::move(mMessage);
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Result< int64_t > Tell() const override
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(int64_t initial_capacity=1024, arrow::MemoryPool *pool=arrow::default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
bool closed() const override
FairMQResizableBuffer(Creator)
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
~FairMQResizableBuffer() override
void CloseFromDestructor(FileInterface *file)
Defining PrimaryVertex explicitly as messageable.
Defining DataPointCompositeObject explicitly as copiable.