Project
Loading...
Searching...
No Matches
EmptyFragment.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include <arrow/array/data.h>
13#include <arrow/type_fwd.h>
14#include <arrow/array/array_primitive.h>
15#include <arrow/array/array_nested.h>
16#include <arrow/record_batch.h>
17#include <arrow/type.h>
18#include <arrow/array/util.h>
19#include <memory>
20
// Minimum output-buffer capacity in bytes (meaning inferred from the name).
// NOTE(review): PreallocatedOutputStream::Reserve() grows the buffer to exactly
// the size required by the pending write, so this value does not currently
// bound the capacity — confirm whether it is still needed.
static constexpr int64_t kBufferMinimumSize{256};
22
23namespace o2::framework
24{
25
26// Scanner function which returns a batch where the space is not actually used.
27arrow::Result<arrow::RecordBatchGenerator> EmptyFragment::ScanBatchesAsync(
28 const std::shared_ptr<arrow::dataset::ScanOptions>& options)
29{
30 auto generator = [this]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
31 std::vector<std::shared_ptr<arrow::Array>> columns;
32 columns.reserve(this->physical_schema_->fields().size());
33
34 for (auto& field : this->physical_schema_->fields()) {
35 if (auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(field->type())) {
36 size_t size = mRows * listType->list_size();
37 if (field->type()->field(0)->type()->byte_width() == 0) {
38 size /= 8;
39 } else {
40 size *= field->type()->field(0)->type()->byte_width();
41 }
42 auto vdata = std::make_shared<arrow::ArrayData>(field->type()->field(0)->type(), mRows * listType->list_size(), std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, GetPlaceholderForOp(size)});
43 columns.push_back(std::make_shared<arrow::FixedSizeListArray>(field->type(), (int32_t)mRows, arrow::MakeArray(vdata)));
44 } else {
45 size_t size = mRows;
46 if (field->type()->byte_width() == 0) {
47 size /= 8;
48 } else {
49 size *= field->type()->byte_width();
50 }
51 auto data = std::make_shared<arrow::ArrayData>(field->type(), mRows, std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, GetPlaceholderForOp(size)});
52 columns.push_back(arrow::MakeArray(data));
53 }
54 }
55 return arrow::RecordBatch::Make(physical_schema_, mRows, columns);
56 };
57 return generator;
58}
59
60PreallocatedOutputStream::PreallocatedOutputStream()
61 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
62
63PreallocatedOutputStream::PreallocatedOutputStream(std::vector<size_t>& sizes,
64 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
65 : sizes_(sizes),
66 buffer_(buffer),
67 is_open_(true),
68 capacity_(buffer->size()),
69 position_(0),
70 mutable_data_(buffer->mutable_data()) {}
71
72arrow::Result<std::shared_ptr<PreallocatedOutputStream>> PreallocatedOutputStream::Create(
73 std::vector<size_t>& ops,
74 int64_t initial_capacity, arrow::MemoryPool* pool)
75{
76 // ctor is private, so cannot use make_shared
77 auto ptr = std::shared_ptr<PreallocatedOutputStream>(new PreallocatedOutputStream);
78 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
79 return ptr;
80}
81
82arrow::Status PreallocatedOutputStream::Reset(std::vector<size_t> sizes,
83 int64_t initial_capacity, arrow::MemoryPool* pool)
84{
85 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
86 sizes_ = sizes;
87 is_open_ = true;
88 capacity_ = initial_capacity;
89 position_ = 0;
90 mutable_data_ = buffer_->mutable_data();
91 return arrow::Status::OK();
92}
93
{
  // Close the stream while keeping the buffer (retrieve it with Finish()).
  // Closing an already-closed stream does nothing and returns OK.
  if (is_open_) {
    // Marked closed before the resize, so the stream stays closed even if the
    // resize below fails.
    is_open_ = false;
    // Trim the buffer's logical size down to the bytes accounted for by
    // Write(). The `false` argument is presumably Arrow's shrink_to_fit flag
    // of ResizableBuffer::Resize(new_size, shrink_to_fit), i.e. the underlying
    // allocation is not released — confirm against the Arrow version in use.
    if (position_ < capacity_) {
      RETURN_NOT_OK(buffer_->Resize(position_, false));
    }
  }
  return arrow::Status::OK();
}
104
105bool PreallocatedOutputStream::closed() const { return !is_open_; }
106
107arrow::Result<std::shared_ptr<arrow::Buffer>> PreallocatedOutputStream::Finish()
108{
109 RETURN_NOT_OK(Close());
110 buffer_->ZeroPadding();
111 is_open_ = false;
112 return std::move(buffer_);
113}
114
115arrow::Result<int64_t> PreallocatedOutputStream::Tell() const { return position_; }
116
117arrow::Status PreallocatedOutputStream::Write(const void* data, int64_t nbytes)
118{
119 if (ARROW_PREDICT_FALSE(!is_open_)) {
120 return arrow::Status::IOError("OutputStream is closed");
121 }
122 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
123 return arrow::Status::OK();
124 }
125 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
126 RETURN_NOT_OK(Reserve(nbytes));
127 }
128 // This is a real address which needs to be copied. Do it!
129 auto ref = (int64_t)data;
130 if (ref >= sizes_.size()) {
131 memcpy(mutable_data_ + position_, data, nbytes);
132 position_ += nbytes;
133 return arrow::Status::OK();
134 }
135
136 position_ += nbytes;
137 return arrow::Status::OK();
138}
139
140arrow::Status PreallocatedOutputStream::Reserve(int64_t nbytes)
141{
142 // Always overallocate by doubling. It seems that it is a better growth
143 // strategy, at least for memory_benchmark.cc.
144 // This may be because it helps match the allocator's allocation buckets
145 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
146 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
147 new_capacity = position_ + nbytes;
148 if (new_capacity > capacity_) {
149 RETURN_NOT_OK(buffer_->Resize(new_capacity));
150 capacity_ = new_capacity;
151 mutable_data_ = buffer_->mutable_data();
152 }
153 return arrow::Status::OK();
154}
155
156} // namespace o2::framework
TBranch * ptr
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options) override
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
static arrow::Result< std::shared_ptr< PreallocatedOutputStream > > Create(std::vector< size_t > &sizes, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Status Reset(std::vector< size_t > sizes, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Result< int64_t > Tell() const override
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr const GLsizeiptr * sizes
Definition glcorearb.h:2595
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20