Project
Loading...
Searching...
No Matches
FairMQResizableBuffer.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
13#define O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
14
15#include <memory>
16#include <functional>
17#include <arrow/buffer.h>
18#include "arrow/io/interfaces.h"
19#include "arrow/status.h"
20#include "arrow/util/future.h"
21
22#include <fairmq/FwdDecls.h>
23
24namespace o2::framework
25{
26
27using namespace arrow;
28using namespace arrow::io;
29
31{
32 public:
33 explicit FairMQOutputStream(const std::shared_ptr<ResizableBuffer>& buffer);
34
41 static Result<std::shared_ptr<FairMQOutputStream>> Create(
42 int64_t initial_capacity = 4096, MemoryPool* pool = default_memory_pool());
43
44 // By the time we call the destructor, the contents
45 // of the buffer are already moved to fairmq
46 // for being sent.
47 ~FairMQOutputStream() override = default;
48
49 // Implement the OutputStream interface
50
52 Status Close() override;
53 [[nodiscard]] bool closed() const override;
54 [[nodiscard]] Result<int64_t> Tell() const override;
55 Status Write(const void* data, int64_t nbytes) override;
56
58 using OutputStream::Write;
60
62 Result<std::shared_ptr<Buffer>> Finish();
63
69 Status Reset(int64_t initial_capacity = 1024, MemoryPool* pool = default_memory_pool());
70
71 [[nodiscard]] int64_t capacity() const { return capacity_; }
72
73 private:
75
76 // Ensures there is sufficient space available to write nbytes
77 Status Reserve(int64_t nbytes);
78
79 std::shared_ptr<ResizableBuffer> buffer_;
80 bool is_open_;
81 int64_t capacity_;
82 int64_t position_;
83 uint8_t* mutable_data_;
84};
85
90class FairMQResizableBuffer : public ::arrow::ResizableBuffer
91{
92 public:
93 using Creator = std::function<std::unique_ptr<fair::mq::Message>(size_t)>;
94
97
104 arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override;
109 arrow::Status Reserve(const int64_t capacity) override;
110
114 std::unique_ptr<fair::mq::Message> Finalise();
115
116 private:
117 std::unique_ptr<fair::mq::Message> mMessage;
118 int64_t mSize;
119 Creator mCreator;
120};
121
122} // namespace o2::framework
123
124#endif // O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
~FairMQOutputStream() override=default
Status Reset(int64_t initial_capacity=1024, MemoryPool *pool=default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, MemoryPool *pool=default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
Result< std::shared_ptr< Buffer > > Finish()
Close the stream and return the buffer.
Result< int64_t > Tell() const override
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
GLuint buffer
Definition glcorearb.h:655
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ctfTree Write()