Project
Loading...
Searching...
No Matches
FairMQResizableBuffer.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
13#define O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
14
15#include <memory>
16#include <functional>
17#include <arrow/buffer.h>
18#include "arrow/io/interfaces.h"
19#include "arrow/status.h"
20#include "arrow/util/future.h"
21
22#include <fairmq/FwdDecls.h>
23
24namespace o2::framework
25{
26
// FairMQOutputStream: an output stream declared over an arrow::ResizableBuffer,
// exposing the OutputStream operations Close/closed/Tell/Write plus
// Create/Finish/Reset helpers.
// NOTE(review): the class-head line (listing line 27) is missing from this
// extract; presumably `class FairMQOutputStream : public arrow::io::OutputStream`
// -- confirm against the original header.
28{
29 public:
// Builds a stream from an already existing resizable buffer.
// NOTE(review): presumably `buffer` becomes the write target -- confirm in the .cxx.
30 explicit FairMQOutputStream(const std::shared_ptr<arrow::ResizableBuffer>& buffer);
31
// Create in-memory output stream with indicated capacity using a memory pool.
// Defaults: 4096 for `initial_capacity`, arrow::default_memory_pool() for `pool`.
// Returns the new stream wrapped in an arrow::Result.
38 static arrow::Result<std::shared_ptr<FairMQOutputStream>> Create(
39 int64_t initial_capacity = 4096, arrow::MemoryPool* pool = arrow::default_memory_pool());
40
41 // By the time we call the destructor, the contents
42 // of the buffer are already moved to fairmq
43 // for being sent.
44 ~FairMQOutputStream() override = default;
45
46 // Implement the OutputStream interface
47
// Close the stream, preserving the buffer (retrieve it with Finish()).
49 arrow::Status Close() override;
// Reports whether the stream is closed.
// NOTE(review): presumably derived from is_open_ -- confirm in the .cxx.
50 [[nodiscard]] bool closed() const override;
// Reports the current stream position as an arrow::Result<int64_t>.
// NOTE(review): presumably position_ -- confirm in the .cxx.
51 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
// Writes `nbytes` bytes starting at `data` to the stream.
52 arrow::Status Write(const void* data, int64_t nbytes) override;
53
// Re-exposes the remaining OutputStream::Write overloads, which the
// Write declaration above would otherwise hide.
55 using OutputStream::Write;
57
// Close the stream and return the buffer.
59 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
60
// Initialize state of OutputStream with newly allocated memory and set
// position to 0. Note the default `initial_capacity` here is 1024, whereas
// Create() defaults to 4096.
66 arrow::Status Reset(int64_t initial_capacity = 1024, arrow::MemoryPool* pool = arrow::default_memory_pool());
67
// Returns the stored capacity_ value.
68 [[nodiscard]] int64_t capacity() const { return capacity_; }
69
70 private:
72
73 // Ensures there is sufficient space available to write nbytes
74 arrow::Status Reserve(int64_t nbytes);
75
// State of the stream. The meanings below are inferred from the member names
// only -- TODO confirm against the implementation file.
76 std::shared_ptr<arrow::ResizableBuffer> buffer_; // buffer the stream was built from
77 bool is_open_; // presumably false once Close()/Finish() has run
78 int64_t capacity_; // value reported by capacity()
79 int64_t position_; // presumably the current write offset
80 uint8_t* mutable_data_; // presumably a cached writable pointer into buffer_
81};
82
// FairMQResizableBuffer: an arrow::ResizableBuffer subclass that holds a
// fair::mq::Message and a factory (Creator) for producing such messages.
// NOTE(review): listing lines 91-93 are absent from this extract; they
// presumably hold the constructor/destructor declarations -- confirm against
// the original header before relying on this declaration list.
87class FairMQResizableBuffer : public ::arrow::ResizableBuffer
88{
89 public:
// Factory callable: takes a size_t and returns a newly owned fair::mq::Message.
// NOTE(review): presumably the argument is the requested size in bytes -- confirm.
90 using Creator = std::function<std::unique_ptr<fair::mq::Message>(size_t)>;
91
94
// Overrides arrow::ResizableBuffer::Resize for `new_size`, with the
// `shrink_to_fit` flag forwarded from the caller.
101 arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override;
// Overrides arrow::ResizableBuffer::Reserve for the requested `capacity`.
106 arrow::Status Reserve(const int64_t capacity) override;
107
// Returns a uniquely owned fair::mq::Message.
// NOTE(review): presumably this hands over mMessage, leaving the buffer
// without backing storage -- confirm in the .cxx.
111 std::unique_ptr<fair::mq::Message> Finalise();
112
113 private:
114 std::unique_ptr<fair::mq::Message> mMessage; // message owned by this buffer
115 int64_t mSize; // NOTE(review): presumably the size of mMessage's payload -- confirm
116 Creator mCreator; // factory used to obtain fair::mq::Message instances
117};
118
119} // namespace o2::framework
120
121#endif // O2_FRAMEWORK_FAIRMQRESIZABLEBUFFER_H_
~FairMQOutputStream() override=default
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Result< int64_t > Tell() const override
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< FairMQOutputStream > > Create(int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Reset(int64_t initial_capacity=1024, arrow::MemoryPool *pool=arrow::default_memory_pool())
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Status Reserve(const int64_t capacity) override
arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override
std::unique_ptr< fair::mq::Message > Finalise()
std::function< std::unique_ptr< fair::mq::Message >(size_t)> Creator
GLuint buffer
Definition glcorearb.h:655
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
ctfTree Write()