13#include <fairmq/Message.h>
14#include <arrow/status.h>
15#include <arrow/util/config.h>
// Lower bound fed into std::max(kBufferMinimumSize, capacity_) at the start of
// FairMQOutputStream::Reserve.
// NOTE(review): the value computed from this constant in Reserve is
// overwritten by the very next statement, so the floor currently has no
// effect on the capacity that is actually requested — confirm intent.
26static constexpr int64_t kBufferMinimumSize = 256;
// Default constructor: the stream starts out not open, with zero capacity,
// write position 0 and no data pointer. No buffer is attached here.
28FairMQOutputStream::FairMQOutputStream()
29 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
// Constructor taking an existing ResizableBuffer by shared_ptr.
// The visible initializer caches buffer->mutable_data() in mutable_data_, so
// `buffer` must be non-null when this constructor is called.
// NOTE(review): the initializers between the signature and mutable_data_ are
// not shown in this listing — check them against the real source.
31FairMQOutputStream::FairMQOutputStream(
const std::shared_ptr<ResizableBuffer>&
buffer)
36 mutable_data_(
buffer->mutable_data()) {}
// Fragment, presumably of FairMQOutputStream::Create (it takes the same
// initial_capacity / pool pair and calls Reset on a `ptr`) — TODO confirm.
// The visible statement forwards both arguments to ptr->Reset() and, through
// RETURN_NOT_OK, returns early to the caller if Reset reports a failure.
39 int64_t initial_capacity, MemoryPool* pool)
43 RETURN_NOT_OK(
ptr->Reset(initial_capacity, pool));
// Fragment, presumably of FairMQOutputStream::Reset — TODO confirm.
// Allocates a resizable buffer of initial_capacity bytes from `pool` and stores
// it in buffer_; ARROW_ASSIGN_OR_RAISE returns the error to the caller if the
// allocation fails.
49 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
// Track the capacity just requested.
51 capacity_ = initial_capacity;
// Cache the raw write pointer of the freshly allocated buffer.
53 mutable_data_ = buffer_->mutable_data();
// Fragment, presumably of FairMQOutputStream::Close — TODO confirm.
// When fewer bytes were written than the tracked capacity, resize the buffer
// to exactly position_ bytes. The second argument is shrink_to_fit, passed as
// false. A failing Resize is returned to the caller via RETURN_NOT_OK.
61 if (position_ < capacity_) {
62 RETURN_NOT_OK(buffer_->Resize(position_,
false));
// Fragment, presumably of FairMQOutputStream::Finish — TODO confirm.
// Close the stream first; any error from Close() is returned as-is.
72 RETURN_NOT_OK(
Close());
// Zero the padding bytes of the buffer (Arrow Buffer::ZeroPadding).
73 buffer_->ZeroPadding();
// Hand the buffer to the caller; buffer_ is left moved-from afterwards.
75 return std::move(buffer_);
// Fragment, presumably of FairMQOutputStream::Write(data, nbytes) — TODO confirm.
// Writing to a stream that is not open is rejected with an IOError.
82 if (ARROW_PREDICT_FALSE(!is_open_)) {
83 return Status::IOError(
"OutputStream is closed");
// Zero-length (or negative) writes skip the copy entirely.
85 if (ARROW_PREDICT_TRUE(nbytes > 0)) {
// Grow when the write would reach or exceed the current capacity. Note the
// `>=`: a write that would exactly fill the buffer also triggers Reserve.
86 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
87 RETURN_NOT_OK(Reserve(nbytes));
// Copy the payload at the current write position.
// NOTE(review): the statement advancing position_ is not visible in this
// listing — confirm it follows the copy.
89 memcpy(mutable_data_ + position_,
data, nbytes);
// Ensure the underlying buffer can hold `nbytes` more bytes past position_.
// On growth, the buffer is resized and both capacity_ and the cached
// mutable_data_ pointer are refreshed, since Resize may change the data address.
95Status FairMQOutputStream::Reserve(int64_t nbytes)
// NOTE(review): this initial value is a dead store — it is unconditionally
// overwritten by the next statement, so neither kBufferMinimumSize nor the
// current capacity influences the requested size. The buffer therefore grows
// to exactly position_ + nbytes on every call that needs more room. Confirm
// whether exact-fit growth is intended; if so this line can be removed.
101 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
102 new_capacity = position_ + nbytes;
// Only touch the buffer when more room is actually needed.
103 if (new_capacity > capacity_) {
104 RETURN_NOT_OK(buffer_->Resize(new_capacity));
105 capacity_ = new_capacity;
106 mutable_data_ = buffer_->mutable_data();
// Fragment of the FairMQResizableBuffer(Creator) constructor.
// The base ResizableBuffer is initialised with a null data pointer and size 0.
// The creator callback is moved into mCreator; it is later invoked by Reserve
// with the requested capacity. The body then sets data_ to nullptr explicitly.
115 : ResizableBuffer(nullptr, 0),
117 mCreator{
std::move(creator)}
119 this->data_ =
nullptr;
// Fragment, presumably of FairMQResizableBuffer::Resize(newSize, shrink_to_fit)
// — TODO confirm. No use of shrink_to_fit is visible in this fragment.
// Grow first when the requested size does not fit the current capacity.
129 if (newSize > this->capacity_) {
130 auto status = this->
Reserve(newSize);
// Failure branch of Reserve; its body is not shown in this listing.
131 if (status.ok() ==
false) {
// After a successful Reserve the new size must fit.
135 assert(newSize <= this->capacity_);
137 this->size_ = newSize;
// NOTE(review): unlike the asserts in Reserve(), this one dereferences
// mMessage without a `!mMessage ||` guard. If Resize can be reached before any
// message was created (e.g. newSize == 0 on a fresh buffer) this dereferences
// a null pointer in assert-enabled builds — confirm.
138 assert(this->size_ <= mMessage->GetSize());
139 return arrow::Status::OK();
// Fragment, presumably of FairMQResizableBuffer::Reserve(capacity) — TODO confirm.
// Invariants when a message is already held: size_ fits inside it and
// capacity_ mirrors the message size exactly.
144 assert(!mMessage || this->size_ <= mMessage->GetSize());
145 assert(!mMessage || this->capacity_ == mMessage->GetSize());
// Nothing to do when the current capacity already satisfies the request; this
// function never shrinks.
146 if (capacity <= this->capacity_) {
147 return arrow::Status::OK();
// Ask the creator callback for a new message of the requested capacity.
149 auto newMessage = mCreator(capacity);
150 assert(!mMessage || capacity > mMessage->GetSize());
// Carry the full contents of the old message over to the new one.
// NOTE(review): mMessage is dereferenced here although the asserts above allow
// it to be null; a null guard is presumably on the line not shown in this
// listing — confirm.
152 memcpy(newMessage->GetData(), mMessage->GetData(), mMessage->GetSize());
// Replace the old message; the previous one is released by the move-assignment.
154 mMessage = std::move(newMessage);
// Re-point the Arrow buffer at the new message's storage and take the
// capacity from the size the message reports.
156 this->data_ =
reinterpret_cast<uint8_t*
>(mMessage->GetData());
157 this->capacity_ =
static_cast<int64_t
>(mMessage->GetSize());
159 return arrow::Status::OK();
// Fragment, presumably of FairMQResizableBuffer::Finalise — TODO confirm.
// Drops the buffer's pointer into the message storage and transfers ownership
// of the message to the caller; mMessage is empty afterwards.
// NOTE(review): any reset of size_/capacity_ would be on lines not shown here.
164 this->data_ =
nullptr;
167 return std::move(mMessage);
Status Reset(int64_t initial_capacity=1024, MemoryPool *pool=default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, MemoryPool *pool=default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
Result< std::shared_ptr< Buffer > > Finish()
Close the stream and return the buffer.
Status Write(const void *data, int64_t nbytes) override
Result< int64_t > Tell() const override
bool closed() const override
FairMQResizableBuffer(Creator)
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
~FairMQResizableBuffer() override
void CloseFromDestructor(FileInterface *file)
Defining PrimaryVertex explicitly as messageable.
Defining DataPointCompositeObject explicitly as copiable.