16#include <arrow/dataset/file_base.h>
17#include <arrow/record_batch.h>
18#include <arrow/type.h>
19#include <arrow/util/key_value_metadata.h>
20#include <TBufferFile.h>
// Suffix appended to a branch name to build the name of its companion size branch
// (used in this file as mBranchName + TableTreeHelpers::sizeBranchSuffix).
29static constexpr char const* sizeBranchSuffix =
"_size";
// Fragment of the ROOT -> Arrow type mapping. The leading numbers are line numbers of the
// original listing; they skip (36 -> 39 -> 41 -> 43 -> 48), so the conditions selecting between
// the three returns of the lambda, and the header of the enclosing switch, are not visible here.
//
// typeGenerator wraps an element type and yields one of:
//   - arrow::list(type)
//   - type itself
//   - arrow::fixed_size_list(type, size)
// NOTE(review): presumably `size` selects which of the three is returned - confirm against the
// complete source.
36 auto typeGenerator = [](std::shared_ptr<arrow::DataType>
const&
type,
int size) -> std::shared_ptr<arrow::DataType> {
39 return arrow::list(
type);
// NOTE(review): `type` is a reference to const, so std::move cannot move from it and the
// shared_ptr is copied here; the std::move looks redundant.
41 return std::move(
type);
43 return arrow::fixed_size_list(
type,
size);
// One case per supported ROOT EDataType: pick the matching Arrow primitive type and hand it,
// together with `size`, to typeGenerator. Handling of other EDataType values is not visible here.
48 case EDataType::kBool_t:
49 return typeGenerator(arrow::boolean(),
size);
50 case EDataType::kUChar_t:
51 return typeGenerator(arrow::uint8(),
size);
52 case EDataType::kUShort_t:
53 return typeGenerator(arrow::uint16(),
size);
54 case EDataType::kUInt_t:
55 return typeGenerator(arrow::uint32(),
size);
56 case EDataType::kULong64_t:
57 return typeGenerator(arrow::uint64(),
size);
58 case EDataType::kChar_t:
59 return typeGenerator(arrow::int8(),
size);
60 case EDataType::kShort_t:
61 return typeGenerator(arrow::int16(),
size);
62 case EDataType::kInt_t:
63 return typeGenerator(arrow::int32(),
size);
64 case EDataType::kLong64_t:
65 return typeGenerator(arrow::int64(),
size);
66 case EDataType::kFloat_t:
67 return typeGenerator(arrow::float32(),
size);
68 case EDataType::kDouble_t:
69 return typeGenerator(arrow::float64(),
size);
// Fragment of the Arrow -> ROOT type mapping: one case per supported arrow::Type id.
// Each case returns a ROOTTypeInfo initialised with three values: the ROOT EDataType, a "/X"
// type-code string, and the size reported by TDataType::GetDataType(...)->Size() for that type.
// NOTE(review): the string is presumably what the branch code reads as mElementType.suffix when
// building leaf lists - confirm against the ROOTTypeInfo declaration.
// The switch header and the handling of unsupported ids are not visible in this listing.
78 case arrow::Type::BOOL:
79 return ROOTTypeInfo{EDataType::kBool_t,
"/O", TDataType::GetDataType(EDataType::kBool_t)->Size()};
80 case arrow::Type::UINT8:
81 return ROOTTypeInfo{EDataType::kUChar_t,
"/b", TDataType::GetDataType(EDataType::kUChar_t)->Size()};
82 case arrow::Type::UINT16:
83 return ROOTTypeInfo{EDataType::kUShort_t,
"/s", TDataType::GetDataType(EDataType::kUShort_t)->Size()};
84 case arrow::Type::UINT32:
85 return ROOTTypeInfo{EDataType::kUInt_t,
"/i", TDataType::GetDataType(EDataType::kUInt_t)->Size()};
86 case arrow::Type::UINT64:
87 return ROOTTypeInfo{EDataType::kULong64_t,
"/l", TDataType::GetDataType(EDataType::kULong64_t)->Size()};
88 case arrow::Type::INT8:
89 return ROOTTypeInfo{EDataType::kChar_t,
"/B", TDataType::GetDataType(EDataType::kChar_t)->Size()};
90 case arrow::Type::INT16:
91 return ROOTTypeInfo{EDataType::kShort_t,
"/S", TDataType::GetDataType(EDataType::kShort_t)->Size()};
92 case arrow::Type::INT32:
93 return ROOTTypeInfo{EDataType::kInt_t,
"/I", TDataType::GetDataType(EDataType::kInt_t)->Size()};
94 case arrow::Type::INT64:
95 return ROOTTypeInfo{EDataType::kLong64_t,
"/L", TDataType::GetDataType(EDataType::kLong64_t)->Size()};
96 case arrow::Type::FLOAT:
97 return ROOTTypeInfo{EDataType::kFloat_t,
"/F", TDataType::GetDataType(EDataType::kFloat_t)->Size()};
98 case arrow::Type::DOUBLE:
99 return ROOTTypeInfo{EDataType::kDouble_t,
"/D", TDataType::GetDataType(EDataType::kDouble_t)->Size()};
// Fragment of a constructor (member-initializer list plus body); the header line and several
// body lines (embedded numbers 118-119, 121, 124, 127-128, 130-132, ...) are not visible here.
//
// Initializers: the branch is named after the Arrow field, the column is kept as a raw pointer
// obtained from column.get(), and the field size starts as the byte width of the field's type.
// NOTE(review): mColumn does not share ownership of the ChunkedArray; presumably the caller
// keeps it alive for the lifetime of this object - confirm.
106 : mBranchName{field->
name()},
107 mColumn{column.get()},
108 mFieldSize{field->
type()->byte_width()}
110 std::string leafList;
111 std::string sizeLeafList;
// Dispatch on the Arrow type id of the field to derive list size, field size and leaf lists.
112 auto arrowType = field->type();
113 mFieldType = arrowType->id();
114 switch (mFieldType) {
// Fixed-size list: the list size comes from the type itself, the element type is child field 0,
// and the field size becomes element byte width times list size.
115 case arrow::Type::FIXED_SIZE_LIST:
116 mListSize = std::static_pointer_cast<arrow::FixedSizeListType>(arrowType)->list_size();
117 arrowType = arrowType->field(0)->type();
120 mFieldSize = arrowType->byte_width() * mListSize;
// Variable-size list: the element type is child field 0. The data leaf list has the form
// "<name>[<name>_size]<suffix>" and the size leaf list is "<name>_size/I"; the field size is the
// byte width of a single element.
122 case arrow::Type::LIST:
123 arrowType = arrowType->field(0)->type();
125 leafList = mBranchName +
"[" + mBranchName + TableTreeHelpers::sizeBranchSuffix +
"]" + mElementType.
suffix;
126 sizeLeafList = mBranchName + TableTreeHelpers::sizeBranchSuffix +
"/I";
129 mFieldSize = arrowType->byte_width();
// Leaf list without an array dimension: "<name><suffix>".
// NOTE(review): presumably the default (non-list) case - its label is not visible here.
133 leafList = mBranchName + mElementType.
suffix;
// A size leaf list was built: look up the "<name>_size" branch in the tree and create it
// (with a null address) when GetBranch returns nullptr.
136 if (!sizeLeafList.empty()) {
137 mSizeBranch =
tree->GetBranch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str());
138 if (mSizeBranch ==
nullptr) {
139 mSizeBranch =
tree->Branch((mBranchName + TableTreeHelpers::sizeBranchSuffix).c_str(), (
char*)
nullptr, sizeLeafList.c_str());
// Same lookup-or-create step for the data branch, using leafList.
142 mBranch =
tree->GetBranch(mBranchName.c_str());
143 if (mBranch ==
nullptr) {
144 mBranch =
tree->Branch(mBranchName.c_str(), (
char*)
nullptr, leafList.c_str());
// Boolean elements: size the cache to hold mListSize values.
146 if (mElementType.
type == EDataType::kBool_t) {
147 cache.resize(mListSize);
// Fragment that points the branch(es) at the data of the row *pos; the function header and
// several lines (embedded numbers 161, 163-165, 173-174, 176, ...) are not visible here.
// Rows are addressed relative to the current chunk via (*pos - mFirstIndex).
//
// Boolean elements: copy the mListSize values of this row out of the BooleanArray into `cache`
// one by one and let the branch read from the cache.
// NOTE(review): presumably needed because the boolean array cannot be exposed as a plain
// per-element buffer - confirm.
157 if (mElementType.
type == EDataType::kBool_t) {
158 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(mCurrentArray);
159 for (
auto i = 0;
i < mListSize; ++
i) {
160 cache[
i] = boolArray->Value((*
pos - mFirstIndex) * mListSize +
i);
162 mBranch->SetAddress((
void*)(cache.data()));
// All other element types: hand ROOT a pointer directly into the Arrow value buffer.
166 switch (mFieldType) {
// Variable-size list: mListSize is updated to the length of this row's list, the data branch is
// pointed at the first element of that list, and the size branch is pointed at mListSize.
// NOTE(review): mCurrentArray->offset() is added to the pointer returned by data() without being
// multiplied by mElementType.size, unlike the value_offset term next to it - verify the units,
// and whether value_offset() already accounts for the array offset.
167 case arrow::Type::LIST: {
168 auto list = std::static_pointer_cast<arrow::ListArray>(mCurrentArray);
169 mListSize =
list->value_length((*
pos - mFirstIndex));
170 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(
list->values())->values()->data() + mCurrentArray->offset() +
list->value_offset((*
pos - mFirstIndex)) * mElementType.
size;
171 mBranch->SetAddress((
void*)
buffer);
172 mSizeBranch->SetAddress(&mListSize);
// Fixed-size list: the row starts (*pos - mFirstIndex) * mListSize elements into the values
// buffer of mCurrentArray.
// NOTE(review): the same unscaled offset() term appears here; a label on the missing line 176
// may make other field types share this code - confirm.
175 case arrow::Type::FIXED_SIZE_LIST:
177 buffer = std::static_pointer_cast<arrow::PrimitiveArray>(mCurrentArray)->values()->data() + mCurrentArray->offset() + (*
pos - mFirstIndex) * mListSize * mElementType.
size;
178 mBranch->SetAddress((
void*)
buffer);
// Loads chunk number mCurrentChunk of the column and records its length and the array that the
// row accessor reads from. Braces, breaks and the label of the last branch are not visible in
// this listing (embedded line numbers skip).
183void ColumnToBranch::accessChunk()
185 auto array = mColumn->chunk(mCurrentChunk);
186 switch (mFieldType) {
// Fixed-size list: the chunk length is the number of lists, while the current array is the
// child values array of the list.
187 case arrow::Type::FIXED_SIZE_LIST: {
188 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(
array);
189 mChunkLength =
list->length();
190 mCurrentArray =
list->values();
// Variable-size list: the list array itself is kept as the current array.
193 case arrow::Type::LIST: {
194 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
195 mChunkLength =
list->length();
196 mCurrentArray =
list;
// Remaining field types (presumably the default label): use the chunk as is.
200 mCurrentArray =
array;
201 mChunkLength = mCurrentArray->length();
// Only one statement of this function is visible in the listing: the index of the first row of
// the current chunk is advanced by the length of the chunk being left.
// NOTE(review): presumably followed by moving to the next chunk and loading it - not visible here.
205void ColumnToBranch::nextChunk()
207 mFirstIndex += mChunkLength;
// Fragment that binds a table to a tree in `file`; the function header and the lines between
// the visible statements (embedded numbers 216-218, 224, ...) are not visible here.
//
// The table is kept as a raw pointer obtained from table.get().
214 mTable = table.get();
// Try to pick up an object called `treename` from the file.
// NOTE(review): the result of Get() is static_cast to TTree* without a type check - confirm that
// only trees can be stored under this name.
215 mTree.reset(
static_cast<TTree*
>(
file->Get(treename)));
// Split `treename` at its first '/': the part before it is used as the directory to cd into,
// the remainder becomes both name and title of a newly created tree.
// NOTE(review): only the first '/' is considered, so for "a/b/c" the tree would be named "b/c";
// presumably this path is taken only when no existing tree was found - confirm.
219 std::string treeName(treename);
220 auto pos = treeName.find_first_of(
'/');
221 if (
pos != std::string::npos) {
222 file->cd(treeName.substr(0,
pos).c_str());
223 treeName = treeName.substr(
pos + 1, std::string::npos);
225 mTree = std::make_shared<TTree>(treeName.c_str(), treeName.c_str());
// Fragment whose enclosing function header is not visible (embedded line numbers jump from 225
// to 230). It takes the row count from the table, fetches columns and schema fields, asserts
// that both have the same number of entries, and iterates over the columns by index.
// NOTE(review): the loop body is not visible; presumably each column is registered together
// with its field - confirm.
230 mRows = mTable->num_rows();
231 auto columns = mTable->columns();
232 auto fields = mTable->schema()->fields();
233 assert(columns.size() == fields.size());
234 for (
auto i = 0u;
i < columns.size(); ++
i) {
// Registers one column/field pair for writing: a ColumnToBranch bound to the tree is appended
// to mColumnReaders. The opening brace and the first condition (embedded lines 240-241) are not
// visible in this listing.
//
// @param column chunked array holding the data of the column
// @param field  Arrow field describing the column
239void TableToTree::addBranch(std::shared_ptr<arrow::ChunkedArray>
const& column, std::shared_ptr<arrow::Field>
const& field)
// Adopt the column length as row count (presumably when no row count has been set yet - the
// condition is not visible); otherwise a column of different length is rejected with an error.
242 mRows = column->length();
243 }
else if (mRows != column->length()) {
// NOTE(review): column->length() is formatted with %d; if it is a 64-bit integer, as in the
// Arrow API, the specifier does not match the argument - verify, also for mRows.
244 throw runtime_error_f(
"Adding incompatible column with size %d (num rows = %d)", column->length(), mRows);
// NOTE(review): the reader is created with a raw `new` inside emplace_back; presumably
// mColumnReaders holds owning smart pointers - confirm.
246 mColumnReaders.emplace_back(
new ColumnToBranch{mTree.get(), column, field});
// Fragment of the routine that fills and writes the tree; the function header, returns and
// several lines (embedded numbers 255-257, 261-262, 264, 268-270, 275-277, 280-284) are not
// visible in this listing.
//
// Nothing to fill (no branches or no rows): write the tree, overwriting an existing key, and
// detach it from its directory.
252 if (mTree->GetNbranches() == 0 || mRows == 0) {
253 mTree->Write(
"", TObject::kOverwrite);
254 mTree->SetDirectory(
nullptr);
// Per reader, set a basket size of 1024 plus fieldSize() * columnEntries(), but never less
// than 32000.
// NOTE(review): the computation is done in `int`; for large columns the product may exceed the
// range of int - verify the types returned by fieldSize() and columnEntries().
258 for (
auto& reader : mColumnReaders) {
259 int idealBasketSize = 1024 + reader->fieldSize() * reader->columnEntries();
260 int basketSize = std::max(32000, idealBasketSize);
263 mTree->SetBasketSize(reader->branchName(), basketSize);
// Branches whose name starts with "fIndexArray" also get the basket size of their "_size" branch
// set; apparently inside this branch, 4 * mRows is added to the estimate.
// NOTE(review): the "_size" literal duplicates TableTreeHelpers::sizeBranchSuffix.
265 if (strncmp(reader->branchName(),
"fIndexArray", strlen(
"fIndexArray")) == 0) {
266 std::string sizeBranch = reader->branchName();
267 sizeBranch +=
"_size";
271 int idealBasketSize = 4 * mRows + 1024 + reader->fieldSize() * reader->columnEntries();
272 int basketSize = std::max(32000, idealBasketSize);
273 mTree->SetBasketSize(sizeBranch.c_str(), basketSize);
274 mTree->SetBasketSize(reader->branchName(), basketSize);
// Main loop over rows and, within each row, over all readers (loop body not visible here).
278 while (
row < mRows) {
279 for (
auto& reader : mColumnReaders) {
// After the loop: write the tree, overwriting an existing key, and detach it from its directory.
285 mTree->Write(
"", TObject::kOverwrite);
286 mTree->SetDirectory(
nullptr);
// Member-initializer list of a constructor whose header and body are not visible here:
// the fragment and the creator are moved into their members, the memory pool is stored as given.
300 : mFragment{
std::move(fragment)},
301 mArrowMemoryPool{pool},
302 mCreator{
std::move(creator)}
// Fragment that reads one record batch from mFragment; the function header is not visible here.
// Scan options carrying the dataset schema are created, `format` is asked for an asynchronous
// batch scan of the fragment, the returned generator is invoked once, and the resulting batch is
// stored in mRecordBatch.
// NOTE(review): both `*scanner` and `*batch.result()` dereference their results without a
// visible status check - verify how a failed scan is expected to be reported.
313 auto options = std::make_shared<arrow::dataset::ScanOptions>();
314 options->dataset_schema = schema;
315 auto scanner =
format->ScanBatchesAsync(options, mFragment);
316 auto batch = (*scanner)();
317 mRecordBatch = *batch.result();
#define O2_BUILTIN_UNLIKELY(x)
#define O2_DECLARE_DYNAMIC_LOG(name)
ColumnToBranch(TTree *tree, std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
void at(const int64_t *pos)
FragmentToBatch(StreamerCreator, std::shared_ptr< arrow::dataset::FileFragment >, arrow::MemoryPool *pool=arrow::default_memory_pool())
std::shared_ptr< arrow::RecordBatch > finalize()
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
void fill(std::shared_ptr< arrow::Schema > dataSetSchema, std::shared_ptr< arrow::dataset::FileFormat >)
void addBranch(std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
TableToTree(std::shared_ptr< arrow::Table > const &table, TFile *file, const char *treename)
std::shared_ptr< TTree > process()
GLuint const GLchar * name
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei const GLchar * label
GLint GLint GLsizei GLint GLenum format
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
auto basicROOTTypeFromArrow(arrow::Type::type id)
auto arrowTypeFromROOT(EDataType type, int size)
RuntimeErrorRef runtime_error_f(const char *,...)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))