592 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
593 const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
const
595 assert(options->dataset_schema !=
nullptr);
597 auto dataset_schema = options->dataset_schema;
598 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
599 if (treeFragment.get() ==
nullptr) {
600 return {arrow::Status::NotImplemented(
"Not a ttree fragment")};
603 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
604 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
606 O2_SIGNPOST_START(root_arrow_fs, tid,
"Generator",
"Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
607 std::vector<std::shared_ptr<arrow::Array>> columns;
608 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
609 auto physical_schema = *treeFragment->ReadPhysicalSchema();
611 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
612 throw runtime_error_f(
"One TTree must have all the fields requested in a table");
616 std::vector<BranchFieldMapping> mappings;
620 for (
int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
621 auto dataset_field = dataset_schema->field(fi);
624 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Processing dataset field %{public}s.", dataset_field->name().c_str());
625 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
627 if (physicalFieldIdx < 0) {
628 throw runtime_error_f(
"Cannot find physical field associated to %s. Possible fields: %s",
629 dataset_field->name().c_str(), physical_schema->ToString().c_str());
631 if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with(
"_size")) {
632 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
633 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
634 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
637 if (physicalFieldIdx > 0) {
638 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
639 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
641 mappings.push_back({physicalFieldIdx, -1, fi});
646 auto*
tree = treeFragment->GetTree();
647 auto branches =
tree->GetListOfBranches();
648 size_t totalTreeSize = 0;
649 std::vector<TBranch*> selectedBranches;
650 for (
auto& mapping : mappings) {
651 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
652 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
653 totalTreeSize += selectedBranches.back()->GetTotalSize();
654 if (mapping.vlaIdx != -1) {
655 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
656 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
657 totalTreeSize += selectedBranches.back()->GetTotalSize();
661 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
663 tree->SetCacheSize(cacheSize);
664 for (
auto* branch : selectedBranches) {
665 tree->AddBranchToCache(branch,
false);
667 tree->StopCacheLearningPhase();
670 std::vector<ReadOps>& ops = treeFragment->ops();
672 ops.reserve(opsCount);
673 for (
size_t mi = 0; mi < mappings.size(); ++mi) {
677 auto physicalField = physical_schema->field(mapping.
mainBranchIdx);
679 if (mapping.
vlaIdx != -1) {
680 auto* branch = (TBranch*)branches->At(mapping.
vlaIdx);
683 .rootBranchEntries = branch->GetEntries(),
688 auto&
op = ops.back();
689 ARROW_ASSIGN_OR_RAISE(
op.targetBuffer, arrow::AllocateBuffer((
op.rootBranchEntries + 1) *
op.typeSize, pool));
694 auto& valueOp = ops.back();
695 valueOp.branch = (TBranch*)branches->At(mapping.
mainBranchIdx);
696 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
699 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
700 valueOp.typeSize = physicalField->type()->byte_width();
703 if ((datasetField->type() == arrow::boolean())) {
705 valueOp.listSize = 1;
706 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
707 }
else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
708 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
709 valueOp.listSize = listType->list_size();
711 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
712 }
else if (mapping.
vlaIdx != -1) {
713 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
714 valueOp.listSize = -1;
717 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
718 }
else if (listType) {
720 valueOp.listSize = listType->list_size();
721 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
722 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
724 valueOp.typeSize = physicalField->type()->byte_width();
726 valueOp.listSize = 1;
727 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
729 arrow::Status status;
730 std::shared_ptr<arrow::Array>
array;
733 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize,
734 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
735 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, arrow::MakeArray(vdata));
737 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
738 valueOp.branch->GetName(),
739 valueOp.rootBranchEntries,
740 valueOp.targetBuffer->size());
741 }
else if (mapping.
vlaIdx != -1) {
742 auto& offsetOp = ops[ops.size() - 2];
743 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), offsetOp.offsetCount,
744 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
746 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, arrow::MakeArray(vdata));
747 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
748 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
749 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
750 valueOp.branch->GetName(),
751 offsetOp.offsetCount,
752 valueOp.targetBuffer->size());
754 auto data = std::make_shared<arrow::ArrayData>(datasetField->type(), valueOp.rootBranchEntries,
755 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
757 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
758 valueOp.branch->GetName(),
759 valueOp.rootBranchEntries,
760 valueOp.targetBuffer->size());
763 columns.push_back(
array);
769 for (
size_t i = 0;
i < ops.size(); ++
i) {
772 rows =
op.rootBranchEntries;
775 auto& offsetOp = ops[
i - 1];
776 rows = offsetOp.rootBranchEntries;
779 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows,
op.rootBranchEntries);
782 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows, ops[
i - 1].offsetCount);
786 auto batch = arrow::RecordBatch::Make(dataset_schema,
rows, columns);
787 totalCompressedSize +=
tree->GetZipBytes();
788 totalUncompressedSize +=
tree->GetTotBytes();
789 O2_SIGNPOST_END(root_arrow_fs, tid,
"Generator",
"Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
// Per-field writer state. The constructor appends exactly one entry per schema
// field to branches, sizesBranches, valueTypes and listSizes, so the same index
// addresses the same field in all four vectors.

// Branch holding the values of each field.
958 std::vector<TBranch*> branches;
// Branch named "<field>_size" for LIST fields; nullptr for every other field.
959 std::vector<TBranch*> sizesBranches;
// One entry per column is appended by Write(): the child values for list
// columns, a uint8 copy for boolean data, or the column itself otherwise.
960 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
// Not referenced by the code visible in this section.
961 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
// Element type for FIXED_SIZE_LIST / LIST fields, the field's own type otherwise.
962 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
// Not referenced by the code visible in this section. Note that the constructor
// declares a local int with the same name as the first of these two members.
964 std::vector<int64_t> valuesIdealBasketSize;
965 std::vector<int64_t> sizeIdealBasketSize;
// Not referenced by the code visible in this section.
967 std::vector<int64_t> typeSizes;
// 1 for plain fields, the list size for FIXED_SIZE_LIST fields and -1 for LIST
// fields. For LIST fields Write() overwrites the entry with the length of the
// row being written and uses it as the buffer for the "_size" branch.
968 std::vector<int64_t> listSizes;
// Starts out true; Write() clears it right before calling finaliseBasketSize().
969 bool firstBasket =
true;
// Sets the basket size of every branch (and of its "_size" companion where one
// exists) from the number of rows of the first batch handed to the writer.
// Every size computed below starts from a constant 1024 and adds
// rows * bytes-per-row for the kind of field being handled.
//
// firstBatch: the first record batch passed to Write().
972 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
// NOTE(review): the declaration of the signpost id `sid` is not visible in this
// section.
975 O2_SIGNPOST_START(root_arrow_fs, sid,
"finaliseBasketSize",
"First batch with %lli rows received and %zu columns",
976 firstBatch->num_rows(), firstBatch->columns().size());
// branches, sizesBranches, valueTypes and listSizes are indexed in parallel.
977 for (
size_t i = 0;
i < branches.size();
i++) {
978 auto* branch = branches[
i];
979 auto* sizeBranch = sizesBranches[
i];
// Width in bytes of one value of this field's value type.
981 int valueSize = valueTypes[
i]->byte_width();
982 if (listSizes[
i] == 1) {
// One value per row: such fields are expected to have no size branch.
983 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry for %lli entries.",
984 branch->GetName(), valueSize, firstBatch->num_rows());
985 assert(sizeBranch ==
nullptr);
986 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
987 }
else if (listSizes[
i] == -1) {
// -1 is the marker the constructor stores for LIST fields, which come with a
// "_size" branch.
988 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry.",
989 branch->GetName(), valueSize);
// The column is looked up by the name of the i-th field of the writer schema.
991 auto column = firstBatch->GetColumnByName(schema_->field(
i)->name());
992 auto list = std::static_pointer_cast<arrow::ListArray>(column);
993 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
994 branch->GetName(), sizeBranch->GetName(),
list->length(), valueSize);
// NOTE(review): list->length() is the length of the list column itself, so this
// multiplies the row count by it — presumably the number of child values was
// intended; confirm.
995 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize *
list->length());
// The size branch is budgeted 4 bytes per row.
996 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
// Remaining case (the branching line is not visible here): listSizes[i] values
// per row, again asserted to have no size branch.
998 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. There are %lli entries per array of size %d in that list.",
999 branch->GetName(), listSizes[
i], valueSize);
1000 assert(sizeBranch ==
nullptr);
1001 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[
i]);
// Fields whose name starts with "fIndexArray" get their basket sizes replaced:
// both branches use the same value, never below 32000.
// NOTE(review): sizeBranch is dereferenced here without a check although it is
// asserted to be nullptr for non-LIST fields above — presumably these fields are
// always LIST; confirm.
// NOTE(review): the size is computed in int from 64-bit row counts.
1004 auto field = firstBatch->schema()->field(
i);
1005 if (field->name().starts_with(
"fIndexArray")) {
1007 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows();
1008 int basketSize = std::max(32000, idealBasketSize);
1009 sizeBranch->SetBasketSize(basketSize);
1010 branch->SetBasketSize(basketSize);
1019 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1020 std::shared_ptr<arrow::io::OutputStream> destination,
1021 arrow::fs::FileLocator destination_locator)
1022 : FileWriter(schema, options, destination, destination_locator)
1025 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1026 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1028 if (directoryStream.get()) {
1029 TDirectoryFile* dir = directoryStream->GetDirectory();
1031 auto*
tree =
new TTree(destination_locator_.path.c_str(),
"");
1032 treeStream = std::make_shared<TTreeOutputStream>(
tree,
"");
1033 }
else if (treeStream.get()) {
1037 auto*
tree = treeStream->GetTree();
1038 treeStream = std::make_shared<TTreeOutputStream>(
tree, destination_locator_.path);
1041 throw std::runtime_error(
"Unsupported backend.");
1044 for (
auto i = 0u;
i < schema->fields().
size(); ++
i) {
1045 auto& field = schema->field(
i);
1046 listSizes.push_back(1);
1048 int valuesIdealBasketSize = 0;
1050 switch (field->type()->id()) {
1051 case arrow::Type::FIXED_SIZE_LIST: {
1052 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
1053 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1054 valueTypes.push_back(field->type()->field(0)->type());
1055 sizesBranches.push_back(
nullptr);
1056 std::string leafList = fmt::format(
"{}[{}]{}", field->name(), listSizes.back(),
rootSuffixFromArrow(valueTypes.back()->id()));
1057 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1059 case arrow::Type::LIST: {
1060 valueTypes.push_back(field->type()->field(0)->type());
1061 std::string leafList = fmt::format(
"{}[{}_size]{}", field->name(), field->name(),
rootSuffixFromArrow(valueTypes.back()->id()));
1062 listSizes.back() = -1;
1063 std::string sizeLeafList = field->name() +
"_size/I";
1064 sizesBranches.push_back(treeStream->CreateBranch((field->name() +
"_size").c_str(), sizeLeafList.c_str()));
1065 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1070 valueTypes.push_back(field->type());
1072 sizesBranches.push_back(
nullptr);
1073 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Writes one record batch to the tree found under the destination path.
//
// The method works in two phases: first, for every column, an array whose
// buffer can be handed to ROOT is stored in valueArrays (booleans are expanded
// to one uint8 per value); then the rows are walked and, for every branch, the
// branch address is pointed at the data of the current row.
// The statements that declare `pos`, fill the tree and advance to the next row
// are not visible in this section.
//
// Returns arrow::Status::OK(), also for batches without columns or rows.
// Throws std::runtime_error("Unsupported backend.").
1080 arrow::Status
Write(
const std::shared_ptr<arrow::RecordBatch>& batch)
override
// Basket sizes are derived from this batch; presumably these two statements are
// guarded by a check on firstBasket that is not visible here.
1083 firstBasket =
false;
1084 finaliseBasketSize(batch);
// Nothing to write for a batch without columns or without rows.
1088 if (batch->columns().empty() || batch->num_rows() == 0) {
1089 return arrow::Status::OK();
// When writing into a directory, the tree is looked up there by the
// destination path.
1093 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1094 TTree*
tree =
nullptr;
1095 if (directoryStream.get()) {
1096 TDirectoryFile* dir = directoryStream->GetDirectory();
1097 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1099 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
// The condition guarding this throw is not visible in this section.
1103 throw std::runtime_error(
"Unsupported backend.");
// Phase 1: one valueArrays entry per column of the batch.
// NOTE(review): entries are appended on every call while the row loop below
// indexes valueArrays from 0; no reset is visible here — confirm that a writer
// only ever receives one batch, or that the vector is cleared elsewhere.
1106 for (
auto i = 0u;
i < batch->columns().
size(); ++
i) {
1107 auto column = batch->column(
i);
1108 auto& field = batch->schema()->field(
i);
// Placeholder, replaced in every case below.
1110 valueArrays.push_back(
nullptr);
1112 switch (field->type()->id()) {
1113 case arrow::Type::FIXED_SIZE_LIST: {
1114 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
// Fixed-size lists of booleans are rebuilt as uint8 values (1 / 0 / null).
1115 if (
list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
// Total number of child values: rows times list size.
1116 int64_t
length =
list->length() *
list->list_type()->list_size();
1117 arrow::UInt8Builder builder;
// NOTE(review): no check of `ok` is visible for this or the following builder
// calls.
1118 auto ok = builder.Reserve(
length);
1121 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(
list->values());
// The header of the per-value loop is not visible in this section; `i` below is
// presumably that loop's index rather than the column index.
1123 if (boolArray->IsValid(
i)) {
1125 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1126 auto ok = builder.Append(
value);
1129 auto ok = builder.AppendNull();
// NOTE(review): the result of Finish() is dereferenced without a visible check.
1132 valueArrays.back() = *builder.Finish();
// Any other element type: the child values array is used directly.
1134 valueArrays.back() =
list->values();
// Variable length lists: the child values array is used directly.
1137 case arrow::Type::LIST: {
1138 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1139 valueArrays.back() =
list->values();
// Plain boolean columns get the same uint8 expansion as boolean lists above.
1141 case arrow::Type::BOOL: {
1144 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1146 int64_t
length = boolArray->length();
1147 arrow::UInt8Builder builder;
1148 auto ok = builder.Reserve(
length);
// As above, the per-value loop header is not visible here.
1151 if (boolArray->IsValid(
i)) {
1153 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1154 auto ok = builder.Append(
value);
1157 auto ok = builder.AppendNull();
1160 valueArrays.back() = *builder.Finish();
// Remaining types (presumably the default case): the column is used as it is.
1163 valueArrays.back() = column;
// Phase 2: walk the rows; `pos` is the index of the row being written.
1168 while (pos < batch->num_rows()) {
1169 for (
size_t bi = 0; bi < branches.size(); ++bi) {
1170 auto* branch = branches[bi];
1171 auto* sizeBranch = sizesBranches[bi];
1172 auto array = batch->column(bi);
1173 auto& field = batch->schema()->field(bi);
// Taken by reference: for LIST fields the entry is overwritten below and its
// address is given to the size branch.
1174 auto& listSize = listSizes[bi];
1175 auto valueType = valueTypes[bi];
1176 auto valueArray = valueArrays[bi];
1178 switch (field->type()->id()) {
1179 case arrow::Type::LIST: {
1180 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
// Length of the list stored in the current row.
1181 listSize =
list->value_length(
pos);
// Start of this row's values inside the child buffer.
// NOTE(review): the pointer is a byte pointer, so array->offset() is added as a
// number of bytes, unlike the value offset next to it which is scaled by the
// value width — confirm this is intended for sliced arrays.
1182 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
list->value_offset(
pos) * valueType->byte_width();
1183 branch->SetAddress((
void*)
buffer);
1184 sizeBranch->SetAddress(&listSize);
// Fixed-size lists share the code below with the remaining types.
1186 case arrow::Type::FIXED_SIZE_LIST:
// A byte width of 0 is replaced by 1 — presumably for booleans, which phase 1
// expanded to one byte per value.
1189 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
// Row `pos` starts listSize values of byteWidth bytes each into the buffer
// (same unscaled array->offset() as in the LIST case).
1190 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
pos * listSize * byteWidth;
1191 branch->SetAddress((
void*)
buffer);
1198 return arrow::Status::OK();
1203 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1204 auto*
tree = treeStream->GetTree();
1205 tree->Write(
"", TObject::kOverwrite);
1206 tree->SetDirectory(
nullptr);