Project
Loading...
Searching...
No Matches
EmptyFragment.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include <arrow/type_fwd.h>
13#include <arrow/array/array_primitive.h>
14#include <arrow/array/array_nested.h>
15#include <memory>
16
// Named as a minimum buffer size; the value is 256, presumably bytes — TODO confirm the unit.
17static constexpr int64_t kBufferMinimumSize = 256;
18
19namespace o2::framework
20{
21
22// Scanner function which returns a batch where the space is not actually used.
23arrow::Result<arrow::RecordBatchGenerator> EmptyFragment::ScanBatchesAsync(
24 const std::shared_ptr<arrow::dataset::ScanOptions>& options)
25{
26 auto generator = [this]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
27 std::vector<std::shared_ptr<arrow::Array>> columns;
28 columns.reserve(this->physical_schema_->fields().size());
29
30 for (auto& field : this->physical_schema_->fields()) {
31 if (auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
32 size_t size = mRows * listType->list_size();
33 if (field->type()->field(0)->type()->byte_width() == 0) {
34 size /= 8;
35 } else {
36 size *= field->type()->field(0)->type()->byte_width();
37 }
38 auto varray = std::make_shared<arrow::PrimitiveArray>(field->type()->field(0)->type(), mRows * listType->list_size(), GetPlaceholderForOp(size));
39 columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), (int32_t)mRows, varray));
40 } else {
41 size_t size = mRows;
42 if (field->type()->byte_width() == 0) {
43 size /= 8;
44 } else {
45 size *= field->type()->byte_width();
46 }
47 columns.push_back(std::make_shared<arrow::PrimitiveArray>(field->type(), mRows, GetPlaceholderForOp(size)));
48 }
49 }
50 return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
51 };
52 return generator;
53}
54
55PreallocatedOutputStream::PreallocatedOutputStream()
56 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
57
58PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>& sizes,
59 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
60 : sizes_(sizes),
61 buffer_(buffer),
62 is_open_(true),
63 capacity_(buffer->size()),
64 position_(0),
65 mutable_data_(buffer->mutable_data()) {}
66
67arrow::Result<std::shared_ptr<PreallocatedOutputStream>> PreallocatedOutputStream::Create(
68 std::vector<size_t>& ops,
69 int64_t initial_capacity, arrow::MemoryPool* pool)
70{
71 // ctor is private, so cannot use make_shared
72 auto ptr = std::shared_ptr<PreallocatedOutputStream>(new PreallocatedOutputStream);
73 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
74 return ptr;
75}
76
77arrow::Status PreallocatedOutputStream::Reset(std::vector<size_t> sizes,
78 int64_t initial_capacity, arrow::MemoryPool* pool)
79{
80 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
81 sizes_ = sizes;
82 is_open_ = true;
83 capacity_ = initial_capacity;
84 position_ = 0;
85 mutable_data_ = buffer_->mutable_data();
86 return arrow::Status::OK();
87}
88
90{
91 if (is_open_) {
92 is_open_ = false;
93 if (position_ < capacity_) {
94 RETURN_NOT_OK(buffer_->Resize(position_, false));
95 }
96 }
97 return arrow::Status::OK();
98}
99
100bool PreallocatedOutputStream::closed() const { return !is_open_; }
101
102arrow::Result<std::shared_ptr<arrow::Buffer>> PreallocatedOutputStream::Finish()
103{
104 RETURN_NOT_OK(Close());
105 buffer_->ZeroPadding();
106 is_open_ = false;
107 return std::move(buffer_);
108}
109
110arrow::Result<int64_t> PreallocatedOutputStream::Tell() const { return position_; }
111
112arrow::Status PreallocatedOutputStream::Write(const void* data, int64_t nbytes)
113{
114 if (ARROW_PREDICT_FALSE(!is_open_)) {
115 return arrow::Status::IOError("OutputStream is closed");
116 }
117 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
118 return arrow::Status::OK();
119 }
120 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
121 RETURN_NOT_OK(Reserve(nbytes));
122 }
123 // This is a real address which needs to be copied. Do it!
124 auto ref = (int64_t)data;
125 if (ref >= sizes_.size()) {
126 memcpy(mutable_data_ + position_, data, nbytes);
127 position_ += nbytes;
128 return arrow::Status::OK();
129 }
130
131 position_ += nbytes;
132 return arrow::Status::OK();
133}
134
135arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
136{
137 // Always overallocate by doubling. It seems that it is a better growth
138 // strategy, at least for memory_benchmark.cc.
139 // This may be because it helps match the allocator's allocation buckets
140 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
141 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
142 new_capacity = position_ + nbytes;
143 if (new_capacity > capacity_) {
144 RETURN_NOT_OK(buffer_->Resize(new_capacity));
145 capacity_ = new_capacity;
146 mutable_data_ = buffer_->mutable_data();
147 }
148 return arrow::Status::OK();
149}
150
151} // namespace o2::framework
TBranch * ptr
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options) override
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< PreallocatedOutputStream > > Create(std::vector< size_t > &sizes, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(std::vector< size_t > sizes, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Result< int64_t > Tell() const override
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20