Project
Loading...
Searching...
No Matches
FairMQResizableBuffer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <fairmq/Message.h>
14#include <arrow/status.h>
15#include <arrow/util/config.h>
16#include <cassert>
17#include <utility>
18
19using arrow::Status;
20
22{
23void CloseFromDestructor(FileInterface* file);
24}
25
26namespace o2::framework
27{
// Nominal minimum buffer size, in bytes, for FairMQOutputStream.
// (constexpr at namespace scope already has internal linkage.)
constexpr int64_t kBufferMinimumSize = 256;
29
30FairMQOutputStream::FairMQOutputStream()
31 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
32
33FairMQOutputStream::FairMQOutputStream(const std::shared_ptr<arrow::ResizableBuffer>& buffer)
34 : buffer_(buffer),
35 is_open_(true),
36 capacity_(buffer->size()),
37 position_(0),
38 mutable_data_(buffer->mutable_data()) {}
39
40arrow::Result<std::shared_ptr<FairMQOutputStream>> FairMQOutputStream::Create(
41 int64_t initial_capacity, arrow::MemoryPool* pool)
42{
43 // ctor is private, so cannot use make_shared
44 auto ptr = std::shared_ptr<FairMQOutputStream>(new FairMQOutputStream);
45 RETURN_NOT_OK(ptr->Reset(initial_capacity, pool));
46 return ptr;
47}
48
49Status FairMQOutputStream::Reset(int64_t initial_capacity, arrow::MemoryPool* pool)
50{
51 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
52 is_open_ = true;
53 capacity_ = initial_capacity;
54 position_ = 0;
55 mutable_data_ = buffer_->mutable_data();
56 return Status::OK();
57}
58
60{
61 if (is_open_) {
62 is_open_ = false;
63 if (position_ < capacity_) {
64 RETURN_NOT_OK(buffer_->Resize(position_, false));
65 }
66 }
67 return Status::OK();
68}
69
70bool FairMQOutputStream::closed() const { return !is_open_; }
71
72arrow::Result<std::shared_ptr<arrow::Buffer>> FairMQOutputStream::Finish()
73{
74 RETURN_NOT_OK(Close());
75 buffer_->ZeroPadding();
76 is_open_ = false;
77 return std::move(buffer_);
78}
79
80arrow::Result<int64_t> FairMQOutputStream::Tell() const { return position_; }
81
82Status FairMQOutputStream::Write(const void* data, int64_t nbytes)
83{
84 if (ARROW_PREDICT_FALSE(!is_open_)) {
85 return Status::IOError("OutputStream is closed");
86 }
87 if (ARROW_PREDICT_TRUE(nbytes > 0)) {
88 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
89 RETURN_NOT_OK(Reserve(nbytes));
90 }
91 memcpy(mutable_data_ + position_, data, nbytes);
92 position_ += nbytes;
93 }
94 return Status::OK();
95}
96
97Status FairMQOutputStream::Reserve(int64_t nbytes)
98{
99 // Always overallocate by doubling. It seems that it is a better growth
100 // strategy, at least for memory_benchmark.cc.
101 // This may be because it helps match the allocator's allocation buckets
102 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
103 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
104 new_capacity = position_ + nbytes;
105 if (new_capacity > capacity_) {
106 RETURN_NOT_OK(buffer_->Resize(new_capacity));
107 capacity_ = new_capacity;
108 mutable_data_ = buffer_->mutable_data();
109 }
110 return Status::OK();
111}
112
114
115// Creates an empty message
117 : ResizableBuffer(nullptr, 0),
118 mMessage{nullptr},
119 mCreator{std::move(creator)}
120{
121 this->data_ = nullptr;
122 this->capacity_ = 0;
123 this->size_ = 0;
124}
125
126arrow::Status FairMQResizableBuffer::Resize(const int64_t newSize, bool shrink_to_fit)
127{
128 // NOTE: we ignore "shrink_to_fit" because in any case we
129 // invoke SetUsedSize when we send the message. This
130 // way we avoid unneeded copies at the arrow level.
131 if (newSize > this->capacity_) {
132 auto status = this->Reserve(newSize);
133 if (status.ok() == false) {
134 return status;
135 }
136 }
137 assert(newSize <= this->capacity_);
138
139 this->size_ = newSize;
140 assert(this->size_ <= mMessage->GetSize());
141 return arrow::Status::OK();
142}
143
144arrow::Status FairMQResizableBuffer::Reserve(const int64_t capacity)
145{
146 assert(!mMessage || this->size_ <= mMessage->GetSize());
147 assert(!mMessage || this->capacity_ == mMessage->GetSize());
148 if (capacity <= this->capacity_) {
149 return arrow::Status::OK();
150 }
151 auto newMessage = mCreator(capacity);
152 assert(!mMessage || capacity > mMessage->GetSize());
153 if (mMessage) {
154 memcpy(newMessage->GetData(), mMessage->GetData(), mMessage->GetSize());
155 }
156 mMessage = std::move(newMessage);
157 assert(mMessage);
158 this->data_ = reinterpret_cast<uint8_t*>(mMessage->GetData());
159 this->capacity_ = static_cast<int64_t>(mMessage->GetSize());
160 assert(this->data_);
161 return arrow::Status::OK();
162}
163
164std::unique_ptr<fair::mq::Message> FairMQResizableBuffer::Finalise()
165{
166 this->data_ = nullptr;
167 this->capacity_ = 0;
168 this->size_ = 0;
169 return std::move(mMessage);
170}
171
172} // namespace o2::framework
TBranch * ptr
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Result< int64_t > Tell() const override
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(int64_t initial_capacity=1024, arrow::MemoryPool *pool=arrow::default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
void CloseFromDestructor(FileInterface *file)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Defining DataPointCompositeObject explicitly as copiable.