Project
Loading...
Searching...
No Matches
EmptyFragment.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DEFERREDFRAGMENT_H
12#define O2_FRAMEWORK_DEFERREDFRAGMENT_H
13
14#include <arrow/dataset/api.h>
15
16namespace o2::framework
17{
18
19// A Fragment which will create a preallocated batch in shared memory
20// and fill it directly in place.
22{
23 public:
24 // @a numRows is the number of rows in the final result.
25 // @a physical_schema the schema of the resulting batch
26 // @a fillers helper functions to fill the given buffer.
28 arrow::compute::Expression partition_expression,
29 std::shared_ptr<arrow::Schema> physical_schema)
30 : Fragment(std::move(partition_expression), physical_schema)
31 {
32 }
33
34 // Scanner function which returns a batch where the space is not actually used.
35 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
36 const std::shared_ptr<arrow::dataset::ScanOptions>& options) override;
37
38 private:
41 std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
42 {
43 mSizes.push_back(size);
44 return std::make_shared<arrow::Buffer>((uint8_t*)(mSizes.size() - 1), size);
45 }
46 std::vector<size_t> mSizes;
47 size_t mRows;
48};
49
55{
56 public:
57 explicit PreallocatedOutputStream(std::vector<size_t>& sizes,
58 const std::shared_ptr<arrow::ResizableBuffer>& buffer);
59
66 static arrow::Result<std::shared_ptr<PreallocatedOutputStream>> Create(
67 std::vector<size_t>& sizes,
68 int64_t initial_capacity = 4096,
69 arrow::MemoryPool* pool = arrow::default_memory_pool());
70
71 // By the time we call the destructor, the contents
72 // of the buffer are already moved to fairmq
73 // for being sent.
74 ~PreallocatedOutputStream() override = default;
75
76 // Implement the OutputStream interface
77
79 arrow::Status Close() override;
80 [[nodiscard]] bool closed() const override;
81 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
82 arrow::Status Write(const void* data, int64_t nbytes) override;
83
85 using OutputStream::Write;
87
89 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
90
96 arrow::Status Reset(std::vector<size_t> sizes,
97 int64_t initial_capacity, arrow::MemoryPool* pool);
98
99 [[nodiscard]] int64_t capacity() const { return capacity_; }
100
101 private:
102 std::vector<size_t> sizes_;
104
105 // Ensures there is sufficient space available to write nbytes
106 arrow::Status Reserve(int64_t nbytes);
107
108 std::shared_ptr<arrow::ResizableBuffer> buffer_;
109 bool is_open_;
110 int64_t capacity_;
111 int64_t position_;
112 uint8_t* mutable_data_;
113};
114} // namespace o2::framework
115
116#endif
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options) override
EmptyFragment(size_t rows, arrow::compute::Expression partition_expression, std::shared_ptr< arrow::Schema > physical_schema)
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
~PreallocatedOutputStream() override=default
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< PreallocatedOutputStream > > Create(std::vector< size_t > &sizes, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Reset(std::vector< size_t > sizes, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Result< int64_t > Tell() const override
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
Defining DataPointCompositeObject explicitly as copiable.
ctfTree Write()
std::vector< ReadoutWindowData > rows