Project
Loading...
Searching...
No Matches
FairMQResizableBuffer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include <fairmq/Message.h>
14#include <arrow/status.h>
15#include <arrow/util/config.h>
16#include <cassert>
17#include <utility>
18
20{
21void CloseFromDestructor(FileInterface* file);
22}
23
24namespace o2::framework
25{
26static constexpr int64_t kBufferMinimumSize = 256;
27
28FairMQOutputStream::FairMQOutputStream()
29 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
30
31FairMQOutputStream::FairMQOutputStream(const std::shared_ptr<ResizableBuffer>& buffer)
32 : buffer_(buffer),
33 is_open_(true),
34 capacity_(buffer->size()),
35 position_(0),
36 mutable_data_(buffer->mutable_data()) {}
37
38Result<std::shared_ptr<FairMQOutputStream>> FairMQOutputStream::Create(
39 int64_t initial_capacity, MemoryPool* pool)
40{
41 // ctor is private, so cannot use make_shared
42 auto ptr = std::shared_ptr<FairMQOutputStream>(new FairMQOutputStream);
43 RETURN_NOT_OK(ptr->Reset(initial_capacity, pool));
44 return ptr;
45}
46
47Status FairMQOutputStream::Reset(int64_t initial_capacity, MemoryPool* pool)
48{
49 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
50 is_open_ = true;
51 capacity_ = initial_capacity;
52 position_ = 0;
53 mutable_data_ = buffer_->mutable_data();
54 return Status::OK();
55}
56
58{
59 if (is_open_) {
60 is_open_ = false;
61 if (position_ < capacity_) {
62 RETURN_NOT_OK(buffer_->Resize(position_, false));
63 }
64 }
65 return Status::OK();
66}
67
68bool FairMQOutputStream::closed() const { return !is_open_; }
69
70Result<std::shared_ptr<Buffer>> FairMQOutputStream::Finish()
71{
72 RETURN_NOT_OK(Close());
73 buffer_->ZeroPadding();
74 is_open_ = false;
75 return std::move(buffer_);
76}
77
78Result<int64_t> FairMQOutputStream::Tell() const { return position_; }
79
80Status FairMQOutputStream::Write(const void* data, int64_t nbytes)
81{
82 if (ARROW_PREDICT_FALSE(!is_open_)) {
83 return Status::IOError("OutputStream is closed");
84 }
85 if (ARROW_PREDICT_TRUE(nbytes > 0)) {
86 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
87 RETURN_NOT_OK(Reserve(nbytes));
88 }
89 memcpy(mutable_data_ + position_, data, nbytes);
90 position_ += nbytes;
91 }
92 return Status::OK();
93}
94
95Status FairMQOutputStream::Reserve(int64_t nbytes)
96{
97 // Always overallocate by doubling. It seems that it is a better growth
98 // strategy, at least for memory_benchmark.cc.
99 // This may be because it helps match the allocator's allocation buckets
100 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
101 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
102 new_capacity = position_ + nbytes;
103 if (new_capacity > capacity_) {
104 RETURN_NOT_OK(buffer_->Resize(new_capacity));
105 capacity_ = new_capacity;
106 mutable_data_ = buffer_->mutable_data();
107 }
108 return Status::OK();
109}
110
112
113// Creates an empty message
115 : ResizableBuffer(nullptr, 0),
116 mMessage{nullptr},
117 mCreator{std::move(creator)}
118{
119 this->data_ = nullptr;
120 this->capacity_ = 0;
121 this->size_ = 0;
122}
123
124arrow::Status FairMQResizableBuffer::Resize(const int64_t newSize, bool shrink_to_fit)
125{
126 // NOTE: we ignore "shrink_to_fit" because in any case we
127 // invoke SetUsedSize when we send the message. This
128 // way we avoid unneeded copies at the arrow level.
129 if (newSize > this->capacity_) {
130 auto status = this->Reserve(newSize);
131 if (status.ok() == false) {
132 return status;
133 }
134 }
135 assert(newSize <= this->capacity_);
136
137 this->size_ = newSize;
138 assert(this->size_ <= mMessage->GetSize());
139 return arrow::Status::OK();
140}
141
142arrow::Status FairMQResizableBuffer::Reserve(const int64_t capacity)
143{
144 assert(!mMessage || this->size_ <= mMessage->GetSize());
145 assert(!mMessage || this->capacity_ == mMessage->GetSize());
146 if (capacity <= this->capacity_) {
147 return arrow::Status::OK();
148 }
149 auto newMessage = mCreator(capacity);
150 assert(!mMessage || capacity > mMessage->GetSize());
151 if (mMessage) {
152 memcpy(newMessage->GetData(), mMessage->GetData(), mMessage->GetSize());
153 }
154 mMessage = std::move(newMessage);
155 assert(mMessage);
156 this->data_ = reinterpret_cast<uint8_t*>(mMessage->GetData());
157 this->capacity_ = static_cast<int64_t>(mMessage->GetSize());
158 assert(this->data_);
159 return arrow::Status::OK();
160}
161
162std::unique_ptr<fair::mq::Message> FairMQResizableBuffer::Finalise()
163{
164 this->data_ = nullptr;
165 this->capacity_ = 0;
166 this->size_ = 0;
167 return std::move(mMessage);
168}
169
170} // namespace o2::framework
TBranch * ptr
Status Reset(int64_t initial_capacity=1024, MemoryPool *pool=default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, MemoryPool *pool=default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
Result< std::shared_ptr< Buffer > > Finish()
Close the stream and return the buffer.
Status Write(const void *data, int64_t nbytes) override
Result< int64_t > Tell() const override
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
void CloseFromDestructor(FileInterface *file)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Defining DataPointCompositeObject explicitly as copiable.