Project
Loading...
Searching...
No Matches
TableTreeHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "Framework/Logger.h"
13#include "Framework/Endian.h"
14#include "Framework/Signpost.h"
15
16#include "arrow/type_traits.h"
17#include <arrow/dataset/file_base.h>
18#include <arrow/record_batch.h>
19#include <arrow/type.h>
20#include <arrow/util/key_value_metadata.h>
21#include <TBufferFile.h>
22
23#include <memory>
24#include <utility>
25
26O2_DECLARE_DYNAMIC_LOG(tabletree_helpers);
27
namespace TableTreeHelpers
{
/// Suffix appended to a branch name to form the name of the companion
/// branch which stores the per-entry element count of a variable-length array.
static constexpr char const* sizeBranchSuffix = "_size";
} // namespace TableTreeHelpers
32
33namespace o2::framework
34{
35auto arrowTypeFromROOT(EDataType type, int size)
36{
37 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
38 switch (size) {
39 case -1:
40 return arrow::list(type);
41 case 1:
42 return std::move(type);
43 default:
44 return arrow::fixed_size_list(type, size);
45 }
46 };
47
48 switch (type) {
49 case EDataType::kBool_t:
50 return typeGenerator(arrow::boolean(), size);
51 case EDataType::kUChar_t:
52 return typeGenerator(arrow::uint8(), size);
53 case EDataType::kUShort_t:
54 return typeGenerator(arrow::uint16(), size);
55 case EDataType::kUInt_t:
56 return typeGenerator(arrow::uint32(), size);
57 case EDataType::kULong64_t:
58 return typeGenerator(arrow::uint64(), size);
59 case EDataType::kChar_t:
60 return typeGenerator(arrow::int8(), size);
61 case EDataType::kShort_t:
62 return typeGenerator(arrow::int16(), size);
63 case EDataType::kInt_t:
64 return typeGenerator(arrow::int32(), size);
65 case EDataType::kLong64_t:
66 return typeGenerator(arrow::int64(), size);
67 case EDataType::kFloat_t:
68 return typeGenerator(arrow::float32(), size);
69 case EDataType::kDouble_t:
70 return typeGenerator(arrow::float64(), size);
71 default:
72 throw runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
73 }
74}
75
76auto basicROOTTypeFromArrow(arrow::Type::type id)
77{
78 switch (id) {
79 case arrow::Type::BOOL:
80 return ROOTTypeInfo{EDataType::kBool_t, "/O", TDataType::GetDataType(EDataType::kBool_t)->Size()};
81 case arrow::Type::UINT8:
82 return ROOTTypeInfo{EDataType::kUChar_t, "/b", TDataType::GetDataType(EDataType::kUChar_t)->Size()};
83 case arrow::Type::UINT16:
84 return ROOTTypeInfo{EDataType::kUShort_t, "/s", TDataType::GetDataType(EDataType::kUShort_t)->Size()};
85 case arrow::Type::UINT32:
86 return ROOTTypeInfo{EDataType::kUInt_t, "/i", TDataType::GetDataType(EDataType::kUInt_t)->Size()};
87 case arrow::Type::UINT64:
88 return ROOTTypeInfo{EDataType::kULong64_t, "/l", TDataType::GetDataType(EDataType::kULong64_t)->Size()};
89 case arrow::Type::INT8:
90 return ROOTTypeInfo{EDataType::kChar_t, "/B", TDataType::GetDataType(EDataType::kChar_t)->Size()};
91 case arrow::Type::INT16:
92 return ROOTTypeInfo{EDataType::kShort_t, "/S", TDataType::GetDataType(EDataType::kShort_t)->Size()};
93 case arrow::Type::INT32:
94 return ROOTTypeInfo{EDataType::kInt_t, "/I", TDataType::GetDataType(EDataType::kInt_t)->Size()};
95 case arrow::Type::INT64:
96 return ROOTTypeInfo{EDataType::kLong64_t, "/L", TDataType::GetDataType(EDataType::kLong64_t)->Size()};
97 case arrow::Type::FLOAT:
98 return ROOTTypeInfo{EDataType::kFloat_t, "/F", TDataType::GetDataType(EDataType::kFloat_t)->Size()};
99 case arrow::Type::DOUBLE:
100 return ROOTTypeInfo{EDataType::kDouble_t, "/D", TDataType::GetDataType(EDataType::kDouble_t)->Size()};
101 default:
102 throw runtime_error("Unsupported arrow column type");
103 }
104}
105
107{
108 return mBranch;
109}
110
111BranchToColumn::BranchToColumn(TBranch* branch, bool VLA, std::string name, EDataType type, int listSize, arrow::MemoryPool* pool)
112 : mBranch{branch},
113 mVLA{VLA},
114 mColumnName{std::move(name)},
115 mType{type},
116 mArrowType{arrowTypeFromROOT(type, listSize)},
117 mListSize{listSize},
118 mPool{pool}
119
120{
121 if (mType == EDataType::kBool_t) {
122 if (mListSize > 1) {
123 auto status = arrow::MakeBuilder(mPool, mArrowType->field(0)->type(), &mBuilder);
124 if (!status.ok()) {
125 throw runtime_error("Cannot create value builder");
126 }
127 mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(mPool, std::move(mBuilder), mListSize);
128 mValueBuilder = static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->value_builder();
129 } else {
130 auto status = arrow::MakeBuilder(mPool, mArrowType, &mBuilder);
131 if (!status.ok()) {
132 throw runtime_error("Cannot create builder");
133 }
134 mValueBuilder = mBuilder.get();
135 }
136 }
137}
138
139std::pair<std::shared_ptr<arrow::ChunkedArray>, std::shared_ptr<arrow::Field>> BranchToColumn::read(TBuffer* buffer)
140{
141 O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, buffer);
142 auto totalEntries = mBranch->GetEntries();
143 arrow::Status status;
144 int readEntries = 0;
145 buffer->Reset();
146 std::shared_ptr<arrow::Array> array;
147
148 if (mType == EDataType::kBool_t) {
149 // boolean array special case: we need to use builder to create the bitmap
150 status = mValueBuilder->Reserve(totalEntries * mListSize);
151 if (mListSize > 1) {
152 status &= mListBuilder->Reserve(totalEntries);
153 }
154 if (!status.ok()) {
155 throw runtime_error("Failed to reserve memory for array builder");
156 }
157 while (readEntries < totalEntries) {
158 auto readLast = mBranch->GetBulkRead().GetBulkEntries(readEntries, *buffer);
159 readEntries += readLast;
160 status &= static_cast<arrow::BooleanBuilder*>(mValueBuilder)->AppendValues(reinterpret_cast<uint8_t const*>(buffer->GetCurrent()), readLast * mListSize);
161 }
162 if (mListSize > 1) {
163 status &= static_cast<arrow::FixedSizeListBuilder*>(mListBuilder.get())->AppendValues(readEntries);
164 }
165 if (!status.ok()) {
166 throw runtime_error("Failed to append values to array");
167 }
168 if (mListSize > 1) {
169 status &= mListBuilder->Finish(&array);
170 } else {
171 status &= mValueBuilder->Finish(&array);
172 }
173 if (!status.ok()) {
174 throw runtime_error("Failed to create array");
175 }
176 } else {
177 // other types: use serialized read to build arrays directly
178 size_t branchSize = mBranch->GetTotBytes();
179 auto&& result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool);
180 O2_SIGNPOST_EVENT_EMIT(tabletree_helpers, sid, "BranchToColumn", "Allocating %ld bytes for %{public}s", branchSize, mBranch->GetName());
181 if (!result.ok()) {
182 throw runtime_error("Cannot allocate values buffer");
183 }
184 std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(result).ValueUnsafe();
185 auto ptr = arrowValuesBuffer->mutable_data();
186 if (ptr == nullptr) {
187 throw runtime_error("Invalid buffer");
188 }
189
190 auto typeSize = TDataType::GetDataType(mType)->Size();
191 std::unique_ptr<TBufferFile> offsetBuffer = nullptr;
192
193 uint32_t offset = 0;
194 int count = 0;
195 std::shared_ptr<arrow::Buffer> arrowOffsetBuffer;
196 gsl::span<int> offsets;
197 int size = 0;
198 uint32_t totalSize = 0;
199 TBranch* mSizeBranch = nullptr;
200 if (mVLA) {
201 mSizeBranch = mBranch->GetTree()->GetBranch((std::string{mBranch->GetName()} + TableTreeHelpers::sizeBranchSuffix).c_str());
202 offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
203 result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), mPool);
204 if (!result.ok()) {
205 throw runtime_error("Cannot allocate offset buffer");
206 }
207 arrowOffsetBuffer = std::move(result).ValueUnsafe();
208 unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data();
209 auto* tPtrOffset = reinterpret_cast<int*>(ptrOffset);
210 offsets = gsl::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
211
212 // read sizes first
213 while (readEntries < totalEntries) {
214 auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer);
215 readEntries += readLast;
216 for (auto i = 0; i < readLast; ++i) {
217 offsets[count++] = (int)offset;
218 offset += swap32_(reinterpret_cast<uint32_t*>(offsetBuffer->GetCurrent())[i]);
219 }
220 }
222 totalSize = offset;
223 readEntries = 0;
224 }
225
226 while (readEntries < totalEntries) {
227 auto readLast = mBranch->GetBulkRead().GetEntriesSerialized(readEntries, *buffer);
228 if (mVLA) {
229 size = offsets[readEntries + readLast] - offsets[readEntries];
230 } else {
231 size = readLast * mListSize;
232 }
233 readEntries += readLast;
234 swapCopy(ptr, buffer->GetCurrent(), size, typeSize);
235 ptr += (ptrdiff_t)(size * typeSize);
236 }
237 if (!mVLA) {
238 totalSize = readEntries * mListSize;
239 }
240 std::shared_ptr<arrow::PrimitiveArray> varray;
241 switch (mListSize) {
242 case -1:
243 varray = std::make_shared<arrow::PrimitiveArray>(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer);
244 array = std::make_shared<arrow::ListArray>(mArrowType, readEntries, arrowOffsetBuffer, varray);
245 break;
246 case 1:
247 array = std::make_shared<arrow::PrimitiveArray>(mArrowType, readEntries, arrowValuesBuffer);
248 break;
249 default:
250 varray = std::make_shared<arrow::PrimitiveArray>(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer);
251 array = std::make_shared<arrow::FixedSizeListArray>(mArrowType, readEntries, varray);
252 }
253 }
254
255 auto fullArray = std::make_shared<arrow::ChunkedArray>(array);
256 auto field = std::make_shared<arrow::Field>(mBranch->GetName(), mArrowType);
257
258 mBranch->SetStatus(false);
259 mBranch->DropBaskets("all");
260 mBranch->Reset();
261 mBranch->GetTransientBuffer(0)->Expand(0);
262
263 return std::make_pair(fullArray, field);
264}
265
266ColumnToBranch::ColumnToBranch(TTree* tree, std::shared_ptr<arrow::ChunkedArray> const& column, std::shared_ptr<arrow::Field> const& field)
267 : mBranchName{field->name()},
268 mColumn{column.get()},
269 mFieldSize{field->type()->byte_width()}
270{
271 std::string leafList;
272 std::string sizeLeafList;
273 auto arrowType = field->type();
274 mFieldType = arrowType->id();
275 switch (mFieldType) {
276 case arrow::Type::FIXED_SIZE_LIST:
277 mListSize = std::static_pointer_cast<arrow::FixedSizeListType>(arrowType)->list_size();
278 arrowType = arrowType->field(0)->type();
279 mElementType = basicROOTTypeFromArrow(arrowType->id());
280 leafList = mBranchName + "[" + std::to_string(mListSize) + "]" + mElementType.suffix;
281 mFieldSize = arrowType->byte_width() * mListSize;
282 break;
283 case arrow::Type::LIST:
284 arrowType = arrowType->field(0)->type();
285 mElementType = basicROOTTypeFromArrow(arrowType->id());
286 leafList = mBranchName + "[" + mBranchName + TableTreeHelpers::sizeBranchSuffix + "]" + mElementType.suffix;
287 sizeLeafList = mBranchName + TableTreeHelpers::sizeBranchSuffix + "/I";
288 // Notice that this could be replaced by a better guess of the
289 // average size of the list elements, but this is not trivial.
290 mFieldSize = arrowType->byte_width();
291 break;
292 default:
293 mElementType = basicROOTTypeFromArrow(arrowType->id());
294 leafList = mBranchName + mElementType.suffix;
295 break;
296 }
297 if (!sizeLeafList.empty()) {
298 mSizeBranch = tree->GetBranch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str());
299 if (mSizeBranch == nullptr) {
300 mSizeBranch = tree->Branch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str(), (char*)nullptr, sizeLeafList.c_str());
301 }
302 }
303 mBranch = tree->GetBranch(mBranchName.c_str());
304 if (mBranch == nullptr) {
305 mBranch = tree->Branch(mBranchName.c_str(), (char*)nullptr, leafList.c_str());
306 }
307 if (mElementType.type == EDataType::kBool_t) {
308 cache.resize(mListSize);
309 }
310 accessChunk();
311}
312
313void ColumnToBranch::at(const int64_t* pos)
314{
315 if (O2_BUILTIN_UNLIKELY(*pos - mFirstIndex >= mChunkLength)) {
316 nextChunk();
317 }
318 if (mElementType.type == EDataType::kBool_t) {
319 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(mCurrentArray);
320 for (auto i = 0; i < mListSize; ++i) {
321 cache[i] = boolArray->Value((*pos - mFirstIndex) * mListSize + i);
322 }
323 mBranch->SetAddress((void*)(cache.data()));
324 return;
325 }
326 uint8_t const* buffer;
327 switch (mFieldType) {
328 case arrow::Type::LIST: {
329 auto list = std::static_pointer_cast<arrow::ListArray>(mCurrentArray);
330 mListSize = list->value_length((*pos - mFirstIndex));
331 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(list->values())->values()->data() + mCurrentArray->offset() + list->value_offset((*pos - mFirstIndex)) * mElementType.size;
332 mBranch->SetAddress((void*)buffer);
333 mSizeBranch->SetAddress(&mListSize);
334 };
335 break;
336 case arrow::Type::FIXED_SIZE_LIST:
337 default: {
338 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(mCurrentArray)->values()->data() + mCurrentArray->offset() + (*pos - mFirstIndex) * mListSize * mElementType.size;
339 mBranch->SetAddress((void*)buffer);
340 };
341 }
342}
343
344void ColumnToBranch::accessChunk()
345{
346 auto array = mColumn->chunk(mCurrentChunk);
347 switch (mFieldType) {
348 case arrow::Type::FIXED_SIZE_LIST: {
349 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(array);
350 mChunkLength = list->length();
351 mCurrentArray = list->values();
352 };
353 break;
354 case arrow::Type::LIST: {
355 auto list = std::static_pointer_cast<arrow::ListArray>(array);
356 mChunkLength = list->length();
357 mCurrentArray = list;
358 };
359 break;
360 default:
361 mCurrentArray = array;
362 mChunkLength = mCurrentArray->length();
363 }
364}
365
366void ColumnToBranch::nextChunk()
367{
368 mFirstIndex += mChunkLength;
369 ++mCurrentChunk;
370 accessChunk();
371}
372
373TableToTree::TableToTree(std::shared_ptr<arrow::Table> const& table, TFile* file, const char* treename)
374{
375 mTable = table.get();
376 mTree.reset(static_cast<TTree*>(file->Get(treename)));
377 if (mTree) {
378 return;
379 }
380 std::string treeName(treename);
381 auto pos = treeName.find_first_of('/');
382 if (pos != std::string::npos) {
383 file->cd(treeName.substr(0, pos).c_str());
384 treeName = treeName.substr(pos + 1, std::string::npos);
385 }
386 mTree = std::make_shared<TTree>(treeName.c_str(), treeName.c_str());
387}
388
390{
391 mRows = mTable->num_rows();
392 auto columns = mTable->columns();
393 auto fields = mTable->schema()->fields();
394 assert(columns.size() == fields.size());
395 for (auto i = 0u; i < columns.size(); ++i) {
396 addBranch(columns[i], fields[i]);
397 }
398}
399
400void TableToTree::addBranch(std::shared_ptr<arrow::ChunkedArray> const& column, std::shared_ptr<arrow::Field> const& field)
401{
402 if (mRows == 0) {
403 mRows = column->length();
404 } else if (mRows != column->length()) {
405 throw runtime_error_f("Adding incompatible column with size %d (num rows = %d)", column->length(), mRows);
406 }
407 mColumnReaders.emplace_back(new ColumnToBranch{mTree.get(), column, field});
408}
409
410std::shared_ptr<TTree> TableToTree::process()
411{
412 int64_t row = 0;
413 if (mTree->GetNbranches() == 0 || mRows == 0) {
414 mTree->Write("", TObject::kOverwrite);
415 mTree->SetDirectory(nullptr);
416 return mTree;
417 }
418
419 for (auto& reader : mColumnReaders) {
420 int idealBasketSize = 1024 + reader->fieldSize() * reader->columnEntries(); // minimal additional size needed, otherwise we get 2 baskets
421 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
422 // std::cout << "Setting baskets size for " << reader->branchName() << " to " << basketSize << " = 1024 + "
423 // << reader->fieldSize() << " * " << reader->columnEntries() << ". mRows was " << mRows << std::endl;
424 mTree->SetBasketSize(reader->branchName(), basketSize);
425 // If it starts with fIndexArray, also set the size branch basket size
426 if (strncmp(reader->branchName(), "fIndexArray", strlen("fIndexArray")) == 0) {
427 std::string sizeBranch = reader->branchName();
428 sizeBranch += "_size";
429 // std::cout << "Setting baskets size for " << sizeBranch << " to " << basketSize << " = 1024 + "
430 // << reader->fieldSize() << " * " << reader->columnEntries() << ". mRows was " << mRows << std::endl;
431 // One int per array to keep track of the size
432 int idealBasketSize = 4 * mRows + 1024 + reader->fieldSize() * reader->columnEntries(); // minimal additional size needed, otherwise we get 2 baskets
433 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
434 mTree->SetBasketSize(sizeBranch.c_str(), basketSize);
435 mTree->SetBasketSize(reader->branchName(), basketSize);
436 }
437 }
438
439 while (row < mRows) {
440 for (auto& reader : mColumnReaders) {
441 reader->at(&row);
442 }
443 mTree->Fill();
444 ++row;
445 }
446 mTree->Write("", TObject::kOverwrite);
447 mTree->SetDirectory(nullptr);
448 return mTree;
449}
450
451TreeToTable::TreeToTable(arrow::MemoryPool* pool)
452 : mArrowMemoryPool{pool}
453{
454}
455
456namespace
457{
458struct BranchInfo {
459 std::string name;
460 TBranch* ptr;
461 bool mVLA;
462};
463} // namespace
464
465void TreeToTable::addAllColumns(TTree* tree, std::vector<std::string>&& names)
466{
467 auto branches = tree->GetListOfBranches();
468 auto n = branches->GetEntries();
469 if (n == 0) {
470 throw runtime_error("Tree has no branches");
471 }
472
473 std::vector<BranchInfo> branchInfos;
474 for (auto i = 0; i < n; ++i) {
475 auto branch = static_cast<TBranch*>(branches->At(i));
476 auto name = std::string{branch->GetName()};
477 auto pos = name.find(TableTreeHelpers::sizeBranchSuffix);
478 if (pos != std::string::npos) {
479 name.erase(pos);
480 branchInfos.emplace_back(BranchInfo{name, (TBranch*)nullptr, true});
481 } else {
482 auto lookup = std::find_if(branchInfos.begin(), branchInfos.end(), [&](BranchInfo const& bi) {
483 return bi.name == name;
484 });
485 if (lookup == branchInfos.end()) {
486 branchInfos.emplace_back(BranchInfo{name, branch, false});
487 } else {
488 lookup->ptr = branch;
489 }
490 }
491 }
492
493 if (names.empty()) {
494 for (auto& bi : branchInfos) {
495 addReader(bi.ptr, bi.name, bi.mVLA);
496 }
497 } else {
498 for (auto& name : names) {
499 auto lookup = std::find_if(branchInfos.begin(), branchInfos.end(), [&](BranchInfo const& bi) {
500 return name == bi.name;
501 });
502 if (lookup != branchInfos.end()) {
503 addReader(lookup->ptr, lookup->name, lookup->mVLA);
504 }
505 }
506 if (names.size() != mBranchReaders.size()) {
507 LOGF(warn, "Not all requested columns were found in the tree");
508 }
509 }
510 if (mBranchReaders.empty()) {
511 throw runtime_error("No columns will be read");
512 }
513 // Was affected by https://github.com/root-project/root/issues/8962
514 // Re-enabling this seems to cut the number of IOPS in half
515 tree->SetCacheSize(25000000);
516 // tree->SetClusterPrefetch(true);
517 for (auto& reader : mBranchReaders) {
518 tree->AddBranchToCache(reader->branch());
519 if (strncmp(reader->branch()->GetName(), "fIndexArray", strlen("fIndexArray")) == 0) {
520 std::string sizeBranchName = reader->branch()->GetName();
521 sizeBranchName += "_size";
522 auto* sizeBranch = (TBranch*)tree->GetBranch(sizeBranchName.c_str());
523 if (sizeBranch) {
524 tree->AddBranchToCache(sizeBranch);
525 }
526 }
527 }
528 tree->StopCacheLearningPhase();
529}
530
532{
533 mTableLabel = label;
534}
535
537{
538 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
539 std::vector<std::shared_ptr<arrow::Field>> fields;
540 static TBufferFile buffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
541 O2_SIGNPOST_ID_FROM_POINTER(sid, tabletree_helpers, &buffer);
542 O2_SIGNPOST_START(tabletree_helpers, sid, "TreeToTable", "Filling %{public}s", tree->GetName());
543 for (auto& reader : mBranchReaders) {
544 buffer.Reset();
545 auto arrayAndField = reader->read(&buffer);
546 columns.push_back(arrayAndField.first);
547 fields.push_back(arrayAndField.second);
548 }
549 O2_SIGNPOST_END(tabletree_helpers, sid, "TreeToTable", "Done filling.");
550
551 auto schema = std::make_shared<arrow::Schema>(fields, std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{"label"}}, std::vector{mTableLabel}));
552 mTable = arrow::Table::Make(schema, columns);
553}
554
555void TreeToTable::addReader(TBranch* branch, std::string const& name, bool VLA)
556{
557 static TClass* cls;
558 EDataType type;
559 branch->GetExpectedType(cls, type);
560 auto listSize = -1;
561 if (!VLA) {
562 listSize = static_cast<TLeaf*>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
563 }
564 mBranchReaders.emplace_back(std::make_unique<BranchToColumn>(branch, VLA, name, type, listSize, mArrowMemoryPool));
565}
566
567std::shared_ptr<arrow::Table> TreeToTable::finalize()
568{
569 return mTable;
570}
571
572FragmentToBatch::FragmentToBatch(arrow::MemoryPool* pool)
573 : mArrowMemoryPool{pool}
574{
575}
576
578{
579 mTableLabel = label;
580}
581
582void FragmentToBatch::fill(std::shared_ptr<arrow::dataset::FileFragment> fragment, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat> format)
583{
584 auto options = std::make_shared<arrow::dataset::ScanOptions>();
585 options->dataset_schema = schema;
586 auto scanner = format->ScanBatchesAsync(options, fragment);
587 auto batch = (*scanner)();
588 mRecordBatch = *batch.result();
589}
590
591std::shared_ptr<arrow::RecordBatch> FragmentToBatch::finalize()
592{
593 return mRecordBatch;
594}
595
596} // namespace o2::framework
#define O2_BUILTIN_UNLIKELY(x)
uint8_t lookup(const char input) noexcept
#define swap32_
Definition Endian.h:27
void swapCopy(unsigned char *dest, char *source, int size, int typeSize) noexcept
Definition Endian.h:61
int32_t i
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
TBranch * ptr
bool mVLA
std::pair< std::shared_ptr< arrow::ChunkedArray >, std::shared_ptr< arrow::Field > > read(TBuffer *buffer)
BranchToColumn(TBranch *branch, bool VLA, std::string name, EDataType type, int listSize, arrow::MemoryPool *pool)
ColumnToBranch(TTree *tree, std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
void at(const int64_t *pos)
FragmentToBatch(arrow::MemoryPool *pool=arrow::default_memory_pool())
void fill(std::shared_ptr< arrow::dataset::FileFragment >, std::shared_ptr< arrow::Schema > dataSetSchema, std::shared_ptr< arrow::dataset::FileFormat >)
std::shared_ptr< arrow::RecordBatch > finalize()
void setLabel(const char *label)
void addBranch(std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
TableToTree(std::shared_ptr< arrow::Table > const &table, TFile *file, const char *treename)
std::shared_ptr< TTree > process()
void addAllColumns(TTree *tree, std::vector< std::string > &&names={})
void setLabel(const char *label)
TreeToTable(arrow::MemoryPool *pool=arrow::default_memory_pool())
std::shared_ptr< arrow::Table > finalize()
GLdouble n
Definition glcorearb.h:1982
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLenum array
Definition glcorearb.h:4274
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
auto basicROOTTypeFromArrow(arrow::Type::type id)
auto arrowTypeFromROOT(EDataType type, int size)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
Definition list.h:40
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::vector< int > row