Project
Loading...
Searching...
No Matches
TableTreeHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/Logger.h"
13#include "Framework/Endian.h"
14#include "Framework/Signpost.h"
15
16#include <arrow/dataset/file_base.h>
17#include <arrow/record_batch.h>
18#include <arrow/type.h>
19#include <arrow/util/key_value_metadata.h>
20#include <TBufferFile.h>
21
22#include <memory>
23#include <utility>
24
25O2_DECLARE_DYNAMIC_LOG(tabletree_helpers);
26
28{
29static constexpr char const* sizeBranchSuffix = "_size";
30} // namespace TableTreeHelpers
31
32namespace o2::framework
33{
34auto arrowTypeFromROOT(EDataType type, int size)
35{
36 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
37 switch (size) {
38 case -1:
39 return arrow::list(type);
40 case 1:
41 return std::move(type);
42 default:
43 return arrow::fixed_size_list(type, size);
44 }
45 };
46
47 switch (type) {
48 case EDataType::kBool_t:
49 return typeGenerator(arrow::boolean(), size);
50 case EDataType::kUChar_t:
51 return typeGenerator(arrow::uint8(), size);
52 case EDataType::kUShort_t:
53 return typeGenerator(arrow::uint16(), size);
54 case EDataType::kUInt_t:
55 return typeGenerator(arrow::uint32(), size);
56 case EDataType::kULong64_t:
57 return typeGenerator(arrow::uint64(), size);
58 case EDataType::kChar_t:
59 return typeGenerator(arrow::int8(), size);
60 case EDataType::kShort_t:
61 return typeGenerator(arrow::int16(), size);
62 case EDataType::kInt_t:
63 return typeGenerator(arrow::int32(), size);
64 case EDataType::kLong64_t:
65 return typeGenerator(arrow::int64(), size);
66 case EDataType::kFloat_t:
67 return typeGenerator(arrow::float32(), size);
68 case EDataType::kDouble_t:
69 return typeGenerator(arrow::float64(), size);
70 default:
71 throw runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
72 }
73}
74
75auto basicROOTTypeFromArrow(arrow::Type::type id)
76{
77 switch (id) {
78 case arrow::Type::BOOL:
79 return ROOTTypeInfo{EDataType::kBool_t, "/O", TDataType::GetDataType(EDataType::kBool_t)->Size()};
80 case arrow::Type::UINT8:
81 return ROOTTypeInfo{EDataType::kUChar_t, "/b", TDataType::GetDataType(EDataType::kUChar_t)->Size()};
82 case arrow::Type::UINT16:
83 return ROOTTypeInfo{EDataType::kUShort_t, "/s", TDataType::GetDataType(EDataType::kUShort_t)->Size()};
84 case arrow::Type::UINT32:
85 return ROOTTypeInfo{EDataType::kUInt_t, "/i", TDataType::GetDataType(EDataType::kUInt_t)->Size()};
86 case arrow::Type::UINT64:
87 return ROOTTypeInfo{EDataType::kULong64_t, "/l", TDataType::GetDataType(EDataType::kULong64_t)->Size()};
88 case arrow::Type::INT8:
89 return ROOTTypeInfo{EDataType::kChar_t, "/B", TDataType::GetDataType(EDataType::kChar_t)->Size()};
90 case arrow::Type::INT16:
91 return ROOTTypeInfo{EDataType::kShort_t, "/S", TDataType::GetDataType(EDataType::kShort_t)->Size()};
92 case arrow::Type::INT32:
93 return ROOTTypeInfo{EDataType::kInt_t, "/I", TDataType::GetDataType(EDataType::kInt_t)->Size()};
94 case arrow::Type::INT64:
95 return ROOTTypeInfo{EDataType::kLong64_t, "/L", TDataType::GetDataType(EDataType::kLong64_t)->Size()};
96 case arrow::Type::FLOAT:
97 return ROOTTypeInfo{EDataType::kFloat_t, "/F", TDataType::GetDataType(EDataType::kFloat_t)->Size()};
98 case arrow::Type::DOUBLE:
99 return ROOTTypeInfo{EDataType::kDouble_t, "/D", TDataType::GetDataType(EDataType::kDouble_t)->Size()};
100 default:
101 throw runtime_error("Unsupported arrow column type");
102 }
103}
104
105ColumnToBranch::ColumnToBranch(TTree* tree, std::shared_ptr<arrow::ChunkedArray> const& column, std::shared_ptr<arrow::Field> const& field)
106 : mBranchName{field->name()},
107 mColumn{column.get()},
108 mFieldSize{field->type()->byte_width()}
109{
110 std::string leafList;
111 std::string sizeLeafList;
112 auto arrowType = field->type();
113 mFieldType = arrowType->id();
114 switch (mFieldType) {
115 case arrow::Type::FIXED_SIZE_LIST:
116 mListSize = std::static_pointer_cast<arrow::FixedSizeListType>(arrowType)->list_size();
117 arrowType = arrowType->field(0)->type();
118 mElementType = basicROOTTypeFromArrow(arrowType->id());
119 leafList = mBranchName + "[" + std::to_string(mListSize) + "]" + mElementType.suffix;
120 mFieldSize = arrowType->byte_width() * mListSize;
121 break;
122 case arrow::Type::LIST:
123 arrowType = arrowType->field(0)->type();
124 mElementType = basicROOTTypeFromArrow(arrowType->id());
125 leafList = mBranchName + "[" + mBranchName + TableTreeHelpers::sizeBranchSuffix + "]" + mElementType.suffix;
126 sizeLeafList = mBranchName + TableTreeHelpers::sizeBranchSuffix + "/I";
127 // Notice that this could be replaced by a better guess of the
128 // average size of the list elements, but this is not trivial.
129 mFieldSize = arrowType->byte_width();
130 break;
131 default:
132 mElementType = basicROOTTypeFromArrow(arrowType->id());
133 leafList = mBranchName + mElementType.suffix;
134 break;
135 }
136 if (!sizeLeafList.empty()) {
137 mSizeBranch = tree->GetBranch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str());
138 if (mSizeBranch == nullptr) {
139 mSizeBranch = tree->Branch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str(), (char*)nullptr, sizeLeafList.c_str());
140 }
141 }
142 mBranch = tree->GetBranch(mBranchName.c_str());
143 if (mBranch == nullptr) {
144 mBranch = tree->Branch(mBranchName.c_str(), (char*)nullptr, leafList.c_str());
145 }
146 if (mElementType.type == EDataType::kBool_t) {
147 cache.resize(mListSize);
148 }
149 accessChunk();
150}
151
152void ColumnToBranch::at(const int64_t* pos)
153{
154 if (O2_BUILTIN_UNLIKELY(*pos - mFirstIndex >= mChunkLength)) {
155 nextChunk();
156 }
157 if (mElementType.type == EDataType::kBool_t) {
158 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(mCurrentArray);
159 for (auto i = 0; i < mListSize; ++i) {
160 cache[i] = boolArray->Value((*pos - mFirstIndex) * mListSize + i);
161 }
162 mBranch->SetAddress((void*)(cache.data()));
163 return;
164 }
165 uint8_t const* buffer;
166 switch (mFieldType) {
167 case arrow::Type::LIST: {
168 auto list = std::static_pointer_cast<arrow::ListArray>(mCurrentArray);
169 mListSize = list->value_length((*pos - mFirstIndex));
170 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(list->values())->values()->data() + mCurrentArray->offset() + list->value_offset((*pos - mFirstIndex)) * mElementType.size;
171 mBranch->SetAddress((void*)buffer);
172 mSizeBranch->SetAddress(&mListSize);
173 };
174 break;
175 case arrow::Type::FIXED_SIZE_LIST:
176 default: {
177 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(mCurrentArray)->values()->data() + mCurrentArray->offset() + (*pos - mFirstIndex) * mListSize * mElementType.size;
178 mBranch->SetAddress((void*)buffer);
179 };
180 }
181}
182
183void ColumnToBranch::accessChunk()
184{
185 auto array = mColumn->chunk(mCurrentChunk);
186 switch (mFieldType) {
187 case arrow::Type::FIXED_SIZE_LIST: {
188 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(array);
189 mChunkLength = list->length();
190 mCurrentArray = list->values();
191 };
192 break;
193 case arrow::Type::LIST: {
194 auto list = std::static_pointer_cast<arrow::ListArray>(array);
195 mChunkLength = list->length();
196 mCurrentArray = list;
197 };
198 break;
199 default:
200 mCurrentArray = array;
201 mChunkLength = mCurrentArray->length();
202 }
203}
204
205void ColumnToBranch::nextChunk()
206{
207 mFirstIndex += mChunkLength;
208 ++mCurrentChunk;
209 accessChunk();
210}
211
212TableToTree::TableToTree(std::shared_ptr<arrow::Table> const& table, TFile* file, const char* treename)
213{
214 mTable = table.get();
215 mTree.reset(static_cast<TTree*>(file->Get(treename)));
216 if (mTree) {
217 return;
218 }
219 std::string treeName(treename);
220 auto pos = treeName.find_first_of('/');
221 if (pos != std::string::npos) {
222 file->cd(treeName.substr(0, pos).c_str());
223 treeName = treeName.substr(pos + 1, std::string::npos);
224 }
225 mTree = std::make_shared<TTree>(treeName.c_str(), treeName.c_str());
226}
227
229{
230 mRows = mTable->num_rows();
231 auto columns = mTable->columns();
232 auto fields = mTable->schema()->fields();
233 assert(columns.size() == fields.size());
234 for (auto i = 0u; i < columns.size(); ++i) {
235 addBranch(columns[i], fields[i]);
236 }
237}
238
239void TableToTree::addBranch(std::shared_ptr<arrow::ChunkedArray> const& column, std::shared_ptr<arrow::Field> const& field)
240{
241 if (mRows == 0) {
242 mRows = column->length();
243 } else if (mRows != column->length()) {
244 throw runtime_error_f("Adding incompatible column with size %d (num rows = %d)", column->length(), mRows);
245 }
246 mColumnReaders.emplace_back(new ColumnToBranch{mTree.get(), column, field});
247}
248
249std::shared_ptr<TTree> TableToTree::process()
250{
251 int64_t row = 0;
252 if (mTree->GetNbranches() == 0 || mRows == 0) {
253 mTree->Write("", TObject::kOverwrite);
254 mTree->SetDirectory(nullptr);
255 return mTree;
256 }
257
258 for (auto& reader : mColumnReaders) {
259 int idealBasketSize = 1024 + reader->fieldSize() * reader->columnEntries(); // minimal additional size needed, otherwise we get 2 baskets
260 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
261 // std::cout << "Setting baskets size for " << reader->branchName() << " to " << basketSize << " = 1024 + "
262 // << reader->fieldSize() << " * " << reader->columnEntries() << ". mRows was " << mRows << std::endl;
263 mTree->SetBasketSize(reader->branchName(), basketSize);
264 // If it starts with fIndexArray, also set the size branch basket size
265 if (strncmp(reader->branchName(), "fIndexArray", strlen("fIndexArray")) == 0) {
266 std::string sizeBranch = reader->branchName();
267 sizeBranch += "_size";
268 // std::cout << "Setting baskets size for " << sizeBranch << " to " << basketSize << " = 1024 + "
269 // << reader->fieldSize() << " * " << reader->columnEntries() << ". mRows was " << mRows << std::endl;
270 // One int per array to keep track of the size
271 int idealBasketSize = 4 * mRows + 1024 + reader->fieldSize() * reader->columnEntries(); // minimal additional size needed, otherwise we get 2 baskets
272 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
273 mTree->SetBasketSize(sizeBranch.c_str(), basketSize);
274 mTree->SetBasketSize(reader->branchName(), basketSize);
275 }
276 }
277
278 while (row < mRows) {
279 for (auto& reader : mColumnReaders) {
280 reader->at(&row);
281 }
282 mTree->Fill();
283 ++row;
284 }
285 mTree->Write("", TObject::kOverwrite);
286 mTree->SetDirectory(nullptr);
287 return mTree;
288}
289
290namespace
291{
292struct BranchInfo {
293 std::string name;
294 TBranch* ptr;
295 bool mVLA;
296};
297} // namespace
298
299FragmentToBatch::FragmentToBatch(StreamerCreator creator, std::shared_ptr<arrow::dataset::FileFragment> fragment, arrow::MemoryPool* pool)
300 : mFragment{std::move(fragment)},
301 mArrowMemoryPool{pool},
302 mCreator{std::move(creator)}
303{
304}
305
307{
308 mTableLabel = label;
309}
310
311void FragmentToBatch::fill(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
312{
313 auto options = std::make_shared<arrow::dataset::ScanOptions>();
314 options->dataset_schema = schema;
315 auto scanner = format->ScanBatchesAsync(options, mFragment);
316 auto batch = (*scanner)();
317 mRecordBatch = *batch.result();
318 // Notice that up to here the buffer was not yet filled.
319}
320
321std::shared_ptr<arrow::RecordBatch> FragmentToBatch::finalize()
322{
323 return mRecordBatch;
324}
325
326} // namespace o2::framework
#define O2_BUILTIN_UNLIKELY(x)
int32_t i
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
TBranch * ptr
bool mVLA
ColumnToBranch(TTree *tree, std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
void at(const int64_t *pos)
FragmentToBatch(StreamerCreator, std::shared_ptr< arrow::dataset::FileFragment >, arrow::MemoryPool *pool=arrow::default_memory_pool())
std::shared_ptr< arrow::RecordBatch > finalize()
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
void fill(std::shared_ptr< arrow::Schema > dataSetSchema, std::shared_ptr< arrow::dataset::FileFormat >)
void addBranch(std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
TableToTree(std::shared_ptr< arrow::Table > const &table, TFile *file, const char *treename)
std::shared_ptr< TTree > process()
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLenum array
Definition glcorearb.h:4274
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
auto basicROOTTypeFromArrow(arrow::Type::type id)
auto arrowTypeFromROOT(EDataType type, int size)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
Definition list.h:40
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::vector< int > row