16#include "arrow/type_traits.h"
17#include <arrow/dataset/file_base.h>
18#include <arrow/record_batch.h>
19#include <arrow/type.h>
20#include <arrow/util/key_value_metadata.h>
21#include <TBufferFile.h>
// Suffix appended to a branch name to form the name of its companion size branch.
30static constexpr char const* sizeBranchSuffix =
"_size";
// typeGenerator wraps an element type according to `size`: it returns either
// arrow::list(type), the element type itself, or arrow::fixed_size_list(type, size).
// NOTE(review): the statements selecting between the three returns are not present
// in this listing, so which `size` values map to which return cannot be confirmed here.
37 auto typeGenerator = [](std::shared_ptr<arrow::DataType>
const&
type,
int size) -> std::shared_ptr<arrow::DataType> {
40 return arrow::list(
type);
// NOTE(review): `type` is a reference to const, so std::move() here yields a const
// rvalue and the shared_ptr is copied, not moved. A plain `return type;` is equivalent.
42 return std::move(
type);
44 return arrow::fixed_size_list(
type,
size);
// One case per supported ROOT basic type: each forwards the corresponding arrow
// primitive type, together with `size`, to typeGenerator.
49 case EDataType::kBool_t:
50 return typeGenerator(arrow::boolean(),
size);
51 case EDataType::kUChar_t:
52 return typeGenerator(arrow::uint8(),
size);
53 case EDataType::kUShort_t:
54 return typeGenerator(arrow::uint16(),
size);
55 case EDataType::kUInt_t:
56 return typeGenerator(arrow::uint32(),
size);
57 case EDataType::kULong64_t:
58 return typeGenerator(arrow::uint64(),
size);
59 case EDataType::kChar_t:
60 return typeGenerator(arrow::int8(),
size);
61 case EDataType::kShort_t:
62 return typeGenerator(arrow::int16(),
size);
63 case EDataType::kInt_t:
64 return typeGenerator(arrow::int32(),
size);
65 case EDataType::kLong64_t:
66 return typeGenerator(arrow::int64(),
size);
67 case EDataType::kFloat_t:
68 return typeGenerator(arrow::float32(),
size);
69 case EDataType::kDouble_t:
70 return typeGenerator(arrow::float64(),
size);
// Maps an arrow primitive type id to a ROOTTypeInfo made of: the ROOT EDataType,
// the leaf-list type suffix for that type, and the element size reported by
// TDataType::GetDataType(...)->Size().
// NOTE(review): the enclosing switch header and any default branch are not present
// in this listing; behaviour for other arrow type ids cannot be stated from here.
79 case arrow::Type::BOOL:
80 return ROOTTypeInfo{EDataType::kBool_t,
"/O", TDataType::GetDataType(EDataType::kBool_t)->Size()};
81 case arrow::Type::UINT8:
82 return ROOTTypeInfo{EDataType::kUChar_t,
"/b", TDataType::GetDataType(EDataType::kUChar_t)->Size()};
83 case arrow::Type::UINT16:
84 return ROOTTypeInfo{EDataType::kUShort_t,
"/s", TDataType::GetDataType(EDataType::kUShort_t)->Size()};
85 case arrow::Type::UINT32:
86 return ROOTTypeInfo{EDataType::kUInt_t,
"/i", TDataType::GetDataType(EDataType::kUInt_t)->Size()};
87 case arrow::Type::UINT64:
88 return ROOTTypeInfo{EDataType::kULong64_t,
"/l", TDataType::GetDataType(EDataType::kULong64_t)->Size()};
89 case arrow::Type::INT8:
90 return ROOTTypeInfo{EDataType::kChar_t,
"/B", TDataType::GetDataType(EDataType::kChar_t)->Size()};
91 case arrow::Type::INT16:
92 return ROOTTypeInfo{EDataType::kShort_t,
"/S", TDataType::GetDataType(EDataType::kShort_t)->Size()};
93 case arrow::Type::INT32:
94 return ROOTTypeInfo{EDataType::kInt_t,
"/I", TDataType::GetDataType(EDataType::kInt_t)->Size()};
95 case arrow::Type::INT64:
96 return ROOTTypeInfo{EDataType::kLong64_t,
"/L", TDataType::GetDataType(EDataType::kLong64_t)->Size()};
97 case arrow::Type::FLOAT:
98 return ROOTTypeInfo{EDataType::kFloat_t,
"/F", TDataType::GetDataType(EDataType::kFloat_t)->Size()};
99 case arrow::Type::DOUBLE:
100 return ROOTTypeInfo{EDataType::kDouble_t,
"/D", TDataType::GetDataType(EDataType::kDouble_t)->Size()};
// Builder setup, done only for boolean branches. Two alternatives are visible:
//  - an element builder for mArrowType->field(0)->type() wrapped in a
//    FixedSizeListBuilder of width mListSize, with mValueBuilder pointing at the
//    wrapped element builder;
//  - a builder for mArrowType itself, with mValueBuilder pointing at it.
// NOTE(review): the condition choosing between the two, and any check of `status`,
// are not present in this listing.
121 if (mType == EDataType::kBool_t) {
123 auto status = arrow::MakeBuilder(mPool, mArrowType->field(0)->type(), &mBuilder);
// mBuilder is moved into the list builder here; only mListBuilder owns it afterwards.
127 mListBuilder = std::make_unique<arrow::FixedSizeListBuilder>(mPool, std::move(mBuilder), mListSize);
128 mValueBuilder =
static_cast<arrow::FixedSizeListBuilder*
>(mListBuilder.get())->value_builder();
130 auto status = arrow::MakeBuilder(mPool, mArrowType, &mBuilder);
134 mValueBuilder = mBuilder.get();
// Reads the whole branch into an arrow array and returns it, wrapped in a
// ChunkedArray, together with an arrow::Field named after the branch.
// Boolean branches go through the arrow builders; every other type is read into a
// buffer allocated from mPool. See the declared signature:
//   std::pair<std::shared_ptr<arrow::ChunkedArray>, std::shared_ptr<arrow::Field>> read(TBuffer* buffer)
// NOTE(review): many lines of this function are not present in this listing
// (conditions, declarations of readEntries/offset/size/offsets, status checks), so
// the comments below describe only the statements that are visible.
142 auto totalEntries = mBranch->GetEntries();
143 arrow::Status status;
146 std::shared_ptr<arrow::Array>
array;
// --- boolean branches: fill via builders ---
148 if (mType == EDataType::kBool_t) {
150 status = mValueBuilder->Reserve(totalEntries * mListSize);
152 status &= mListBuilder->Reserve(totalEntries);
155 throw runtime_error(
"Failed to reserve memory for array builder");
// Bulk-read the branch and append each batch of bytes to the boolean builder.
157 while (readEntries < totalEntries) {
158 auto readLast = mBranch->GetBulkRead().GetBulkEntries(readEntries, *
buffer);
159 readEntries += readLast;
160 status &=
static_cast<arrow::BooleanBuilder*
>(mValueBuilder)->AppendValues(
reinterpret_cast<uint8_t const*
>(
buffer->GetCurrent()), readLast * mListSize);
163 status &=
static_cast<arrow::FixedSizeListBuilder*
>(mListBuilder.get())->AppendValues(readEntries);
169 status &= mListBuilder->Finish(&
array);
171 status &= mValueBuilder->Finish(&
array);
// --- all other types: read serialized entries into a pool-allocated buffer ---
// NOTE(review): branchSize is computed but the allocation below calls GetTotBytes()
// a second time instead of reusing it.
178 size_t branchSize = mBranch->GetTotBytes();
179 auto&&
result = arrow::AllocateResizableBuffer(mBranch->GetTotBytes(), mPool);
// NOTE(review): branchSize is a size_t but is formatted with "%ld"; "%zu" would be
// the matching printf conversion — confirm how the signpost macro treats its format.
180 O2_SIGNPOST_EVENT_EMIT(tabletree_helpers, sid,
"BranchToColumn",
"Allocating %ld bytes for %{public}s", branchSize, mBranch->GetName());
// NOTE(review): ValueUnsafe() skips the Result's status check; whether result.ok()
// is verified beforehand is not visible in this listing.
184 std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(
result).ValueUnsafe();
185 auto ptr = arrowValuesBuffer->mutable_data();
186 if (
ptr ==
nullptr) {
190 auto typeSize = TDataType::GetDataType(mType)->Size();
191 std::unique_ptr<TBufferFile> offsetBuffer =
nullptr;
195 std::shared_ptr<arrow::Buffer> arrowOffsetBuffer;
// NOTE(review): totalSize is 32-bit but is later assigned readEntries * mListSize;
// presumably fine for realistic branch sizes — confirm no truncation is possible.
198 uint32_t totalSize = 0;
// NOTE(review): this is a local variable despite the member-style `m` prefix.
199 TBranch* mSizeBranch =
nullptr;
// Variable-length case: locate the companion "<branch name><sizeBranchSuffix>" branch
// and allocate an offsets buffer with totalEntries + 1 int-sized slots.
201 mSizeBranch = mBranch->GetTree()->GetBranch((std::string{mBranch->GetName()} + TableTreeHelpers::sizeBranchSuffix).c_str());
202 offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
203 result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)
sizeof(
int), mPool);
207 arrowOffsetBuffer = std::move(
result).ValueUnsafe();
208 unsigned char* ptrOffset = arrowOffsetBuffer->mutable_data();
209 auto* tPtrOffset =
reinterpret_cast<int*
>(ptrOffset);
210 offsets = gsl::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
// Read the per-entry sizes and accumulate them into a running offset; each size is
// passed through swap32_ before being added.
213 while (readEntries < totalEntries) {
214 auto readLast = mSizeBranch->GetBulkRead().GetEntriesSerialized(readEntries, *offsetBuffer);
215 readEntries += readLast;
216 for (
auto i = 0;
i < readLast; ++
i) {
218 offset +=
swap32_(
reinterpret_cast<uint32_t*
>(offsetBuffer->GetCurrent())[
i]);
// Read the values themselves, advancing the destination pointer by the number of
// bytes consumed per batch (size * typeSize).
226 while (readEntries < totalEntries) {
227 auto readLast = mBranch->GetBulkRead().GetEntriesSerialized(readEntries, *
buffer);
231 size = readLast * mListSize;
233 readEntries += readLast;
235 ptr += (ptrdiff_t)(
size * typeSize);
238 totalSize = readEntries * mListSize;
// Wrap the filled buffers in the arrow array matching the column layout:
// ListArray (values + offsets), plain PrimitiveArray, or FixedSizeListArray.
240 std::shared_ptr<arrow::PrimitiveArray> varray;
243 varray = std::make_shared<arrow::PrimitiveArray>(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer);
244 array = std::make_shared<arrow::ListArray>(mArrowType, readEntries, arrowOffsetBuffer, varray);
247 array = std::make_shared<arrow::PrimitiveArray>(mArrowType, readEntries, arrowValuesBuffer);
250 varray = std::make_shared<arrow::PrimitiveArray>(mArrowType->field(0)->type(), totalSize, arrowValuesBuffer);
251 array = std::make_shared<arrow::FixedSizeListArray>(mArrowType, readEntries, varray);
255 auto fullArray = std::make_shared<arrow::ChunkedArray>(
array);
256 auto field = std::make_shared<arrow::Field>(mBranch->GetName(), mArrowType);
// After reading, the branch is disabled, its baskets are dropped and its transient
// buffer is expanded to size 0 — presumably to release ROOT-side memory (confirm).
258 mBranch->SetStatus(
false);
259 mBranch->DropBaskets(
"all");
261 mBranch->GetTransientBuffer(0)->Expand(0);
263 return std::make_pair(fullArray, field);
// Constructor: derives the ROOT leaf description from the arrow field and binds
// (or creates) the matching branch, plus a size branch for variable-length lists.
// Declared as: ColumnToBranch(TTree* tree, std::shared_ptr<arrow::ChunkedArray> const& column,
//                             std::shared_ptr<arrow::Field> const& field)
// NOTE(review): mColumn keeps a raw pointer obtained from column.get(); the caller
// must keep the ChunkedArray alive for as long as this object is used.
// NOTE(review): several lines (including where mElementType is assigned and the
// `break`/`default` labels) are not present in this listing.
267 : mBranchName{field->
name()},
268 mColumn{column.get()},
269 mFieldSize{field->
type()->byte_width()}
271 std::string leafList;
272 std::string sizeLeafList;
273 auto arrowType = field->type();
274 mFieldType = arrowType->id();
275 switch (mFieldType) {
// Fixed-size list: element type comes from field(0); the per-entry size is the
// element byte width times the list size.
276 case arrow::Type::FIXED_SIZE_LIST:
277 mListSize = std::static_pointer_cast<arrow::FixedSizeListType>(arrowType)->list_size();
278 arrowType = arrowType->field(0)->type();
281 mFieldSize = arrowType->byte_width() * mListSize;
// Variable-length list: the leaf list is "<name>[<name><suffix>]<type suffix>" and a
// second, "/I"-typed leaf list is prepared for the size branch.
283 case arrow::Type::LIST:
284 arrowType = arrowType->field(0)->type();
286 leafList = mBranchName +
"[" + mBranchName + TableTreeHelpers::sizeBranchSuffix +
"]" + mElementType.
suffix;
287 sizeLeafList = mBranchName + TableTreeHelpers::sizeBranchSuffix +
"/I";
290 mFieldSize = arrowType->byte_width();
294 leafList = mBranchName + mElementType.
suffix;
// A non-empty sizeLeafList means a size branch is needed: reuse it if the tree
// already has one, otherwise create it with no address attached yet.
297 if (!sizeLeafList.empty()) {
298 mSizeBranch =
tree->GetBranch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str());
299 if (mSizeBranch ==
nullptr) {
300 mSizeBranch =
tree->Branch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str(), (
char*)
nullptr, sizeLeafList.c_str());
// Same reuse-or-create logic for the data branch itself.
303 mBranch =
tree->GetBranch(mBranchName.c_str());
304 if (mBranch ==
nullptr) {
305 mBranch =
tree->Branch(mBranchName.c_str(), (
char*)
nullptr, leafList.c_str());
// Booleans are staged through `cache` (one slot per list element) before filling.
307 if (mElementType.
type == EDataType::kBool_t) {
308 cache.resize(mListSize);
// Points the branch (and, for lists, the size branch) at the data of row *pos
// within the current chunk. Declared as: void at(const int64_t* pos).
// Row indices are made chunk-relative by subtracting mFirstIndex.
// NOTE(review): lines between the visible statements (returns, breaks, the default
// case, the declaration of `buffer`) are not present in this listing.
// Booleans: copy the mListSize values of this row from the BooleanArray into
// `cache` and hand the cache to the branch.
318 if (mElementType.
type == EDataType::kBool_t) {
319 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(mCurrentArray);
320 for (
auto i = 0;
i < mListSize; ++
i) {
321 cache[
i] = boolArray->Value((*
pos - mFirstIndex) * mListSize +
i);
323 mBranch->SetAddress((
void*)(cache.data()));
327 switch (mFieldType) {
// Variable-length list: mListSize becomes this row's length and is exposed through
// the size branch; the data branch points at the row's first element.
// NOTE(review): mCurrentArray->offset() is added to a byte pointer without being
// multiplied by mElementType.size; if offset() counts elements this is only exact
// for a zero offset or 1-byte elements — confirm.
328 case arrow::Type::LIST: {
329 auto list = std::static_pointer_cast<arrow::ListArray>(mCurrentArray);
330 mListSize =
list->value_length((*
pos - mFirstIndex));
331 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(
list->values())->values()->data() + mCurrentArray->offset() +
list->value_offset((*
pos - mFirstIndex)) * mElementType.
size;
332 mBranch->SetAddress((
void*)
buffer);
333 mSizeBranch->SetAddress(&mListSize);
// Fixed-size list: the row starts at (row index * mListSize * element size) bytes into
// the values buffer. The same unscaled-offset remark as above applies.
336 case arrow::Type::FIXED_SIZE_LIST:
338 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(mCurrentArray)->values()->data() + mCurrentArray->offset() + (*
pos - mFirstIndex) * mListSize * mElementType.
size;
339 mBranch->SetAddress((
void*)
buffer);
// Loads chunk number mCurrentChunk of the column and caches its length
// (mChunkLength) and the array to read from (mCurrentArray).
// NOTE(review): the braces, `break` statements and the `default:` label are not
// present in this listing; the last two assignments are presumably the default case.
344void ColumnToBranch::accessChunk()
346 auto array = mColumn->chunk(mCurrentChunk);
347 switch (mFieldType) {
// Fixed-size list: length is the number of lists, but reads go to the flat values array.
348 case arrow::Type::FIXED_SIZE_LIST: {
349 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(
array);
350 mChunkLength =
list->length();
351 mCurrentArray =
list->values();
// Variable-length list: keep the ListArray itself, since per-row offsets are needed.
354 case arrow::Type::LIST: {
355 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
356 mChunkLength =
list->length();
357 mCurrentArray =
list;
// Any other type: use the chunk as is.
361 mCurrentArray =
array;
362 mChunkLength = mCurrentArray->length();
// Moves on to the following chunk: the global index of the first row advances by the
// length of the chunk just finished.
// NOTE(review): the remaining statements of this function are not present in this listing.
366void ColumnToBranch::nextChunk()
368 mFirstIndex += mChunkLength;
// Constructor body. Declared as:
//   TableToTree(std::shared_ptr<arrow::Table> const& table, TFile* file, const char* treename)
// NOTE(review): mTable is a raw pointer taken from table.get(); the caller must keep
// the table alive while this object is in use.
375 mTable = table.get();
// Try to pick up an already existing tree called `treename` from the file.
// NOTE(review): mTree takes over the pointer returned by file->Get(); confirm the
// intended ownership between mTree and the TFile.
376 mTree.reset(
static_cast<TTree*
>(
file->Get(treename)));
// Otherwise create one. A name of the form "dir/name" is split at the FIRST '/':
// the part before it is used as the directory to cd into, the rest as the tree name.
// NOTE(review): with more than one '/', the remainder still contains a '/' — confirm
// that nested directories are not expected here.
380 std::string treeName(treename);
381 auto pos = treeName.find_first_of(
'/');
382 if (
pos != std::string::npos) {
383 file->cd(treeName.substr(0,
pos).c_str());
384 treeName = treeName.substr(
pos + 1, std::string::npos);
386 mTree = std::make_shared<TTree>(treeName.c_str(), treeName.c_str());
// Walk all table columns with their schema fields; the loop body is not present in
// this listing.
391 mRows = mTable->num_rows();
392 auto columns = mTable->columns();
393 auto fields = mTable->schema()->fields();
394 assert(columns.size() == fields.size());
395 for (
auto i = 0u;
i < columns.size(); ++
i) {
// Registers one column to be written as a branch of mTree.
// mRows is set from the column length in one branch; in the other, a column whose
// length differs from mRows is rejected with a runtime error.
// NOTE(review): the leading `if (...) {` of this if/else is not present in this listing.
400void TableToTree::addBranch(std::shared_ptr<arrow::ChunkedArray>
const& column, std::shared_ptr<arrow::Field>
const& field)
403 mRows = column->length();
404 }
else if (mRows != column->length()) {
// NOTE(review): column->length() is passed through varargs with "%d"; if length()
// (and mRows) are 64-bit, the conversion should be "%lld" with an explicit
// long long cast — confirm the member type and fix the format.
405 throw runtime_error_f(
"Adding incompatible column with size %d (num rows = %d)", column->length(), mRows);
// The new reader is created against the raw tree pointer held by mTree.
407 mColumnReaders.emplace_back(
new ColumnToBranch{mTree.get(), column, field});
// Fills the tree from the registered column readers and writes it out.
// Declared as: std::shared_ptr<TTree> process().
// NOTE(review): the return statements, the row loop body and several closing braces
// are not present in this listing.
// Nothing to fill: write the (empty) tree and detach it from its directory.
413 if (mTree->GetNbranches() == 0 || mRows == 0) {
414 mTree->Write(
"", TObject::kOverwrite);
415 mTree->SetDirectory(
nullptr);
// Size each branch's basket to hold the whole column, with a floor of 32000.
// NOTE(review): the product is evaluated and stored as `int`; for large columns
// fieldSize() * columnEntries() may not fit — confirm the operand types and range.
419 for (
auto& reader : mColumnReaders) {
420 int idealBasketSize = 1024 + reader->fieldSize() * reader->columnEntries();
421 int basketSize = std::max(32000, idealBasketSize);
424 mTree->SetBasketSize(reader->branchName(), basketSize);
// Branches whose name starts with "fIndexArray" also get their size branch resized,
// with 4 extra bytes per row added to the estimate.
// NOTE(review): "_size" is spelled out here instead of using
// TableTreeHelpers::sizeBranchSuffix; the two must be kept in sync.
426 if (strncmp(reader->branchName(),
"fIndexArray", strlen(
"fIndexArray")) == 0) {
427 std::string sizeBranch = reader->branchName();
428 sizeBranch +=
"_size";
432 int idealBasketSize = 4 * mRows + 1024 + reader->fieldSize() * reader->columnEntries();
433 int basketSize = std::max(32000, idealBasketSize);
434 mTree->SetBasketSize(sizeBranch.c_str(), basketSize);
435 mTree->SetBasketSize(reader->branchName(), basketSize);
// Row-major fill: for every row, every reader is visited.
439 while (
row < mRows) {
440 for (
auto& reader : mColumnReaders) {
// Final write, then detach the tree from its directory.
446 mTree->Write(
"", TObject::kOverwrite);
447 mTree->SetDirectory(
nullptr);
// TreeToTable constructor initializer: remembers the arrow memory pool passed by the caller.
452 : mArrowMemoryPool{pool}
// Creates one branch reader per tree branch (or per requested name) and configures
// the tree cache. Declared as:
//   void addAllColumns(TTree* tree, std::vector<std::string>&& names = {})
// NOTE(review): many lines (conditions, else branches, early returns) are not present
// in this listing; comments describe only the visible statements.
467 auto branches =
tree->GetListOfBranches();
468 auto n = branches->GetEntries();
// First pass: classify branches. A branch whose name contains the size suffix is
// recorded as a variable-length entry with no branch pointer yet.
// NOTE(review): find() matches the suffix anywhere in the name, not only at the end,
// so a branch such as "a_size_b" would also be treated as a size branch — confirm.
473 std::vector<BranchInfo> branchInfos;
474 for (
auto i = 0;
i <
n; ++
i) {
475 auto branch =
static_cast<TBranch*
>(branches->At(
i));
476 auto name = std::string{branch->GetName()};
477 auto pos =
name.find(TableTreeHelpers::sizeBranchSuffix);
478 if (
pos != std::string::npos) {
480 branchInfos.emplace_back(BranchInfo{
name, (TBranch*)
nullptr,
true});
// Other branches are looked up by name among the entries recorded so far and are
// added as non-variable-length entries when no entry with that name exists.
482 auto lookup = std::find_if(branchInfos.begin(), branchInfos.end(), [&](BranchInfo
const& bi) {
483 return bi.name == name;
485 if (
lookup == branchInfos.end()) {
486 branchInfos.emplace_back(BranchInfo{
name, branch,
false});
// Either every recorded branch gets a reader ...
494 for (
auto& bi : branchInfos) {
495 addReader(bi.ptr, bi.name, bi.mVLA);
// ... or only the requested names that were found.
498 for (
auto&
name : names) {
499 auto lookup = std::find_if(branchInfos.begin(), branchInfos.end(), [&](BranchInfo
const& bi) {
500 return name == bi.name;
502 if (
lookup != branchInfos.end()) {
506 if (names.size() != mBranchReaders.size()) {
507 LOGF(warn,
"Not all requested columns were found in the tree");
510 if (mBranchReaders.empty()) {
// Cache setup: fixed cache size, every reader's branch added explicitly, then the
// learning phase is stopped.
515 tree->SetCacheSize(25000000);
517 for (
auto& reader : mBranchReaders) {
518 tree->AddBranchToCache(reader->branch());
// Branches whose name starts with "fIndexArray" also have their size branch cached.
// NOTE(review): "_size" is spelled out here instead of using
// TableTreeHelpers::sizeBranchSuffix; whether sizeBranch is checked for nullptr
// before use is not visible in this listing.
519 if (strncmp(reader->branch()->GetName(),
"fIndexArray", strlen(
"fIndexArray")) == 0) {
520 std::string sizeBranchName = reader->branch()->GetName();
521 sizeBranchName +=
"_size";
522 auto* sizeBranch = (TBranch*)
tree->GetBranch(sizeBranchName.c_str());
524 tree->AddBranchToCache(sizeBranch);
528 tree->StopCacheLearningPhase();
// Reads every registered branch into a column and assembles the arrow table.
// NOTE(review): the function header and the declaration of `buffer` and `sid` are not
// present in this listing.
538 std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
539 std::vector<std::shared_ptr<arrow::Field>> fields;
// Each reader returns an (array, field) pair; both are collected in reader order.
543 for (
auto& reader : mBranchReaders) {
545 auto arrayAndField = reader->read(&
buffer);
546 columns.push_back(arrayAndField.first);
547 fields.push_back(arrayAndField.second);
549 O2_SIGNPOST_END(tabletree_helpers, sid,
"TreeToTable",
"Done filling.");
// The schema carries a single metadata entry, key "label" with value mTableLabel.
551 auto schema = std::make_shared<arrow::Schema>(fields, std::make_shared<arrow::KeyValueMetadata>(std::vector{std::string{
"label"}}, std::vector{mTableLabel}));
552 mTable = arrow::Table::Make(schema, columns);
// Creates a BranchToColumn reader for `branch` and appends it to mBranchReaders.
// NOTE(review): the declarations of `cls`, `type` and `listSize`, and the condition
// guarding the GetLenStatic() lookup, are not present in this listing.
555void TreeToTable::addReader(TBranch* branch, std::string
const&
name,
bool VLA)
// Ask ROOT for the branch's expected class / basic type.
559 branch->GetExpectedType(cls,
type);
// List size is taken from the static length of the branch's first leaf.
562 listSize =
static_cast<TLeaf*
>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
564 mBranchReaders.emplace_back(std::make_unique<BranchToColumn>(branch,
VLA,
name,
type, listSize, mArrowMemoryPool));
// FragmentToBatch constructor initializer: remembers the arrow memory pool passed by the caller.
573 : mArrowMemoryPool{pool}
582void FragmentToBatch::fill(std::shared_ptr<arrow::dataset::FileFragment> fragment, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileFormat>
format)
584 auto options = std::make_shared<arrow::dataset::ScanOptions>();
585 options->dataset_schema = schema;
586 auto scanner =
format->ScanBatchesAsync(options, fragment);
587 auto batch = (*scanner)();
588 mRecordBatch = *batch.result();
#define O2_BUILTIN_UNLIKELY(x)
uint8_t lookup(const char input) noexcept
void swapCopy(unsigned char *dest, char *source, int size, int typeSize) noexcept
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
std::pair< std::shared_ptr< arrow::ChunkedArray >, std::shared_ptr< arrow::Field > > read(TBuffer *buffer)
BranchToColumn(TBranch *branch, bool VLA, std::string name, EDataType type, int listSize, arrow::MemoryPool *pool)
ColumnToBranch(TTree *tree, std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
void at(const int64_t *pos)
FragmentToBatch(arrow::MemoryPool *pool=arrow::default_memory_pool())
void fill(std::shared_ptr< arrow::dataset::FileFragment >, std::shared_ptr< arrow::Schema > dataSetSchema, std::shared_ptr< arrow::dataset::FileFormat >)
std::shared_ptr< arrow::RecordBatch > finalize()
void setLabel(const char *label)
void addBranch(std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
TableToTree(std::shared_ptr< arrow::Table > const &table, TFile *file, const char *treename)
std::shared_ptr< TTree > process()
void addAllColumns(TTree *tree, std::vector< std::string > &&names={})
void setLabel(const char *label)
TreeToTable(arrow::MemoryPool *pool=arrow::default_memory_pool())
std::shared_ptr< arrow::Table > finalize()
GLuint GLsizei const GLuint const GLintptr * offsets
GLuint const GLchar * name
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei const GLchar * label
GLint GLint GLsizei GLint GLenum format
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
auto basicROOTTypeFromArrow(arrow::Type::type id)
auto arrowTypeFromROOT(EDataType type, int size)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))