591 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
592 const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
const
594 assert(options->dataset_schema !=
nullptr);
596 auto dataset_schema = options->dataset_schema;
597 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
598 if (treeFragment.get() ==
nullptr) {
599 return {arrow::Status::NotImplemented(
"Not a ttree fragment")};
602 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
603 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
605 O2_SIGNPOST_START(root_arrow_fs, tid,
"Generator",
"Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
606 std::vector<std::shared_ptr<arrow::Array>> columns;
607 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
608 auto physical_schema = *treeFragment->ReadPhysicalSchema();
610 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
611 throw runtime_error_f(
"One TTree must have all the fields requested in a table");
615 std::vector<BranchFieldMapping> mappings;
619 for (
int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
620 auto dataset_field = dataset_schema->field(fi);
623 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Processing dataset field %{public}s.", dataset_field->name().c_str());
624 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
626 if (physicalFieldIdx < 0) {
627 throw runtime_error_f(
"Cannot find physical field associated to %s. Possible fields: %s",
628 dataset_field->name().c_str(), physical_schema->ToString().c_str());
630 if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with(
"_size")) {
631 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
632 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
633 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
636 if (physicalFieldIdx > 0) {
637 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
638 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
640 mappings.push_back({physicalFieldIdx, -1, fi});
645 auto*
tree = treeFragment->GetTree();
646 auto branches =
tree->GetListOfBranches();
647 size_t totalTreeSize = 0;
648 std::vector<TBranch*> selectedBranches;
649 for (
auto& mapping : mappings) {
650 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
651 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
652 totalTreeSize += selectedBranches.back()->GetTotalSize();
653 if (mapping.vlaIdx != -1) {
654 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
655 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
656 totalTreeSize += selectedBranches.back()->GetTotalSize();
660 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
662 tree->SetCacheSize(cacheSize);
663 for (
auto* branch : selectedBranches) {
664 tree->AddBranchToCache(branch,
false);
666 tree->StopCacheLearningPhase();
669 std::vector<ReadOps>& ops = treeFragment->ops();
671 ops.reserve(opsCount);
672 for (
size_t mi = 0; mi < mappings.size(); ++mi) {
676 auto physicalField = physical_schema->field(mapping.
mainBranchIdx);
678 if (mapping.
vlaIdx != -1) {
679 auto* branch = (TBranch*)branches->At(mapping.
vlaIdx);
682 .rootBranchEntries = branch->GetEntries(),
687 auto&
op = ops.back();
688 ARROW_ASSIGN_OR_RAISE(
op.targetBuffer, arrow::AllocateBuffer((
op.rootBranchEntries + 1) *
op.typeSize, pool));
693 auto& valueOp = ops.back();
694 valueOp.branch = (TBranch*)branches->At(mapping.
mainBranchIdx);
695 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
698 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
699 valueOp.typeSize = physicalField->type()->byte_width();
702 if ((datasetField->type() == arrow::boolean())) {
704 valueOp.listSize = 1;
705 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
706 }
else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
707 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
708 valueOp.listSize = listType->list_size();
710 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
711 }
else if (mapping.
vlaIdx != -1) {
712 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
713 valueOp.listSize = -1;
716 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
717 }
else if (listType) {
719 valueOp.listSize = listType->list_size();
720 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
721 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
723 valueOp.typeSize = physicalField->type()->byte_width();
725 valueOp.listSize = 1;
726 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
728 arrow::Status status;
729 std::shared_ptr<arrow::Array>
array;
732 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer);
733 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, varray);
735 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
736 valueOp.branch->GetName(),
737 valueOp.rootBranchEntries,
738 valueOp.targetBuffer->size());
739 }
else if (mapping.
vlaIdx != -1) {
740 auto& offsetOp = ops[ops.size() - 2];
741 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer);
743 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray);
744 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
745 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
746 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
747 valueOp.branch->GetName(),
748 offsetOp.offsetCount,
749 valueOp.targetBuffer->size());
751 array = std::make_shared<arrow::PrimitiveArray>(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer);
752 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
753 valueOp.branch->GetName(),
754 valueOp.rootBranchEntries,
755 valueOp.targetBuffer->size());
758 columns.push_back(
array);
764 for (
size_t i = 0;
i < ops.size(); ++
i) {
767 rows =
op.rootBranchEntries;
770 auto& offsetOp = ops[
i - 1];
771 rows = offsetOp.rootBranchEntries;
774 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows,
op.rootBranchEntries);
777 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows, ops[
i - 1].offsetCount);
781 auto batch = arrow::RecordBatch::Make(dataset_schema,
rows, columns);
782 totalCompressedSize +=
tree->GetZipBytes();
783 totalUncompressedSize +=
tree->GetTotBytes();
784 O2_SIGNPOST_END(root_arrow_fs, tid,
"Generator",
"Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
// Builds an arrow::Schema from the branches of the TTree behind `source`, emitting
// one field per branch in branch order. Several lines of this routine are missing
// from the listing; the comments describe only the statements that are shown.
// NOTE(review): `fs` is dereferenced by the next visible statement; a null check for
// a failed dynamic cast may live in the lines not shown — confirm.
888 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(
source.filesystem());
893 auto objectHandler = fs->GetObjectHandler(
source);
// Objects whose handler format is not this format take a separate path (body not shown).
895 if (!objectHandler->format->Equals(*
this)) {
// release() turns the owning handle into a raw TTree*.
// NOTE(review): no matching delete is visible in this excerpt — confirm who owns the tree.
901 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
903 auto branches =
tree->GetListOfBranches();
904 auto n = branches->GetEntries();
906 std::vector<std::shared_ptr<arrow::Field>> fields;
// prevIsSize is updated in lines not shown; presumably it records that the previous
// branch was a "<name>_size" branch — TODO confirm.
908 bool prevIsSize =
false;
909 for (
auto i = 0;
i <
n; ++
i) {
910 auto branch =
static_cast<TBranch*
>(branches->At(
i));
911 std::string
name = branch->GetName();
// The branch following a "<name>_size" branch must be named "<name>".
912 if (prevIsSize && fields.back()->name() !=
name +
"_size") {
913 throw runtime_error_f(
"Unexpected layout for VLA container %s.", branch->GetName());
// Size branches are exposed as int32 fields.
916 if (
name.ends_with(
"_size")) {
917 fields.emplace_back(std::make_shared<arrow::Field>(
name, arrow::int32()));
// `cls` and `type` are declared in lines not shown.
922 branch->GetExpectedType(cls,
type);
// Static length of the branch's first leaf; how it is turned into a field type is not shown.
927 auto listSize =
static_cast<TLeaf*
>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
// A size branch left without a following values branch is an error.
// NOTE(review): fields.back() on an empty vector is undefined behaviour; if a tree
// without branches can reach this point, guard with !fields.empty().
934 if (fields.back()->name().ends_with(
"_size")) {
935 throw runtime_error_f(
"Missing values for VLA indices %s.", fields.back()->name().c_str());
937 return std::make_shared<arrow::Schema>(fields);
// Per-column writer state. The constructor appends one entry per schema field to
// branches, sizesBranches, valueTypes and listSizes, so those are indexed by column.
// Value branch of each column.
953 std::vector<TBranch*> branches;
// "<name>_size" branch for LIST columns, nullptr for every other column kind.
954 std::vector<TBranch*> sizesBranches;
// Value arrays appended by Write(), one per batch column (booleans re-encoded as uint8).
// NOTE(review): Write() only appends here and later indexes from 0; no clear() is
// visible in this listing — confirm the vector is reset between batches.
955 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
// NOTE(review): not referenced anywhere in the visible code.
956 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
// Element type per column: the field type itself, or the child type for list columns.
957 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
// NOTE(review): neither vector is referenced in the visible code; the constructor
// declares a local int named valuesIdealBasketSize that shadows the first one.
959 std::vector<int64_t> valuesIdealBasketSize;
960 std::vector<int64_t> sizeIdealBasketSize;
// NOTE(review): not referenced anywhere in the visible code.
962 std::vector<int64_t> typeSizes;
// 1 for plain columns, list_size() for FIXED_SIZE_LIST, -1 for LIST columns.
// Write() takes a reference to a LIST entry and overwrites it with each row's length.
963 std::vector<int64_t> listSizes;
// Starts true; Write() sets it to false right before calling finaliseBasketSize().
964 bool firstBasket =
true;
// Sets the basket size of every output branch from the row count of the first batch.
// Each size is 1024 bytes plus an estimate of the payload for firstBatch->num_rows() rows.
// Some lines (closing braces, the final else) are missing from this listing.
// @param firstBatch first record batch handed to the writer; only its row count,
//        schema and (for LIST columns) the column looked up by name are read here.
967 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
970 O2_SIGNPOST_START(root_arrow_fs, sid,
"finaliseBasketSize",
"First batch with %lli rows received and %zu columns",
971 firstBatch->num_rows(), firstBatch->columns().size());
972 for (
size_t i = 0;
i < branches.size();
i++) {
973 auto* branch = branches[
i];
974 auto* sizeBranch = sizesBranches[
i];
976 int valueSize = valueTypes[
i]->byte_width();
// Plain column: one value of valueSize bytes per row.
977 if (listSizes[
i] == 1) {
978 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry for %lli entries.",
979 branch->GetName(), valueSize, firstBatch->num_rows());
980 assert(sizeBranch ==
nullptr);
981 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
982 }
// LIST column (marked with -1 by the constructor): has a companion size branch.
else if (listSizes[
i] == -1) {
983 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry.",
984 branch->GetName(), valueSize);
// NOTE(review): the column is looked up by the writer schema's field name and cast
// with static_pointer_cast; a missing column would yield a null `list` — confirm the
// batch schema always matches schema_.
986 auto column = firstBatch->GetColumnByName(schema_->field(
i)->name());
987 auto list = std::static_pointer_cast<arrow::ListArray>(column);
988 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
989 branch->GetName(), sizeBranch->GetName(),
list->length(), valueSize);
// NOTE(review): list->length() is presumably the number of list slots (i.e. rows),
// which would make this product grow with rows squared; the total number of values
// may have been intended — confirm against the Arrow ListArray API.
990 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize *
list->length());
// 4 bytes per row, consistent with the "_size/I" leaf the constructor creates.
991 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
// Remaining case (the enclosing else is not shown): fixed-size list with
// listSizes[i] values per row.
993 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. There are %lli entries per array of size %d in that list.",
994 branch->GetName(), listSizes[
i], valueSize);
995 assert(sizeBranch ==
nullptr);
996 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[
i]);
// Columns whose name starts with "fIndexArray" get the same basket size on both
// branches, at least 32000 bytes.
999 auto field = firstBatch->schema()->field(
i);
1000 if (field->name().starts_with(
"fIndexArray")) {
// NOTE(review): the int64 row count is narrowed into an int here — confirm it
// cannot overflow for large batches.
1002 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows();
1003 int basketSize = std::max(32000, idealBasketSize);
// NOTE(review): sizeBranch is dereferenced without a null check although it is
// nullptr for non-LIST columns (see the asserts above) — confirm fIndexArray
// columns are always LIST-typed.
1004 sizeBranch->SetBasketSize(basketSize);
1005 branch->SetBasketSize(basketSize);
// Constructs the writer and creates the output branches, one per schema field
// (plus a "<name>_size" branch for LIST fields).
// The destination must be a TDirectoryFileOutputStream or a TTreeOutputStream;
// the visible code throws std::runtime_error("Unsupported backend.") on the
// remaining path (the enclosing else is not shown in this listing).
// @param schema               schema of the batches that will be written.
// @param options              forwarded to the FileWriter base.
// @param destination          output stream; its dynamic type selects the backend.
// @param destination_locator  its path names the new TTree or is passed to the tree stream.
1014 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1015 std::shared_ptr<arrow::io::OutputStream> destination,
1016 arrow::fs::FileLocator destination_locator)
1017 : FileWriter(schema, options, destination, destination_locator)
1020 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1021 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
// Directory backend: create a fresh TTree named after the locator path.
1023 if (directoryStream.get()) {
// NOTE(review): the use of `dir` is not visible in this listing.
1024 TDirectoryFile* dir = directoryStream->GetDirectory();
// NOTE(review): raw `new`; presumably ownership passes to ROOT or the tree
// stream — confirm nothing leaks if a later statement throws.
1026 auto*
tree =
new TTree(destination_locator_.path.c_str(),
"");
1027 treeStream = std::make_shared<TTreeOutputStream>(
tree,
"");
1028 }
// Tree backend: reuse the stream's tree, passing the locator path to a new stream.
else if (treeStream.get()) {
1032 auto*
tree = treeStream->GetTree();
1033 treeStream = std::make_shared<TTreeOutputStream>(
tree, destination_locator_.path);
1036 throw std::runtime_error(
"Unsupported backend.");
1039 for (
auto i = 0u;
i < schema->fields().
size(); ++
i) {
1040 auto& field = schema->field(
i);
// Default list size of 1 (plain column); the list cases below overwrite it.
1041 listSizes.push_back(1);
// NOTE(review): this local shadows the member vector of the same name and is
// never read in the visible code.
1043 int valuesIdealBasketSize = 0;
1045 switch (field->type()->id()) {
1046 case arrow::Type::FIXED_SIZE_LIST: {
1047 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
// NOTE(review): valueTypes.back() is read BEFORE this field's type is pushed on
// the next statement. For the first field the vector is empty (undefined
// behaviour); otherwise the previous column's type is used. The two statements
// look swapped.
1048 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1049 valueTypes.push_back(field->type()->field(0)->type());
1050 sizesBranches.push_back(
nullptr);
// Leaf list of the form "name[N]<suffix>".
1051 std::string leafList = fmt::format(
"{}[{}]{}", field->name(), listSizes.back(),
rootSuffixFromArrow(valueTypes.back()->id()));
1052 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1054 case arrow::Type::LIST: {
1055 valueTypes.push_back(field->type()->field(0)->type());
// Leaf list of the form "name[name_size]<suffix>"; the length comes from the size branch.
1056 std::string leafList = fmt::format(
"{}[{}_size]{}", field->name(), field->name(),
rootSuffixFromArrow(valueTypes.back()->id()));
// -1 marks a variable-length column.
1057 listSizes.back() = -1;
1058 std::string sizeLeafList = field->name() +
"_size/I";
// The size branch is created before the values branch.
1059 sizesBranches.push_back(treeStream->CreateBranch((field->name() +
"_size").c_str(), sizeLeafList.c_str()));
1060 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Remaining case (label and the leafList definition are not shown): plain column.
1065 valueTypes.push_back(field->type());
1067 sizesBranches.push_back(
nullptr);
1068 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Writes one record batch into the output tree.
// Steps visible in this listing: size the baskets on first use, return early for
// empty batches, resolve the TTree, collect one value array per column (booleans
// are expanded to one uint8 per value), then walk the rows pointing each branch
// at the current row's data. Several lines are missing from the listing, so the
// comments below describe only what is shown.
// @param batch record batch whose columns are matched to branches by position.
// @return arrow::Status::OK() for empty batches and at the end of the visible body.
1075 arrow::Status
Write(
const std::shared_ptr<arrow::RecordBatch>& batch)
override
// Presumably guarded by `if (firstBasket)` in a line not shown — TODO confirm.
1078 firstBasket =
false;
1079 finaliseBasketSize(batch);
// Nothing to write for a batch without columns or rows.
1083 if (batch->columns().empty() || batch->num_rows() == 0) {
1084 return arrow::Status::OK();
1088 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1089 TTree*
tree =
nullptr;
// Directory backend: fetch the tree by the locator path.
1090 if (directoryStream.get()) {
1091 TDirectoryFile* dir = directoryStream->GetDirectory();
1092 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1094 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
// The condition guarding this throw is not shown.
1098 throw std::runtime_error(
"Unsupported backend.");
// Pass 1: pick (or build) the array holding each column's values.
1101 for (
auto i = 0u;
i < batch->columns().
size(); ++
i) {
1102 auto column = batch->column(
i);
1103 auto& field = batch->schema()->field(
i);
// NOTE(review): valueArrays is a member and is only appended to here, while the
// row loop below indexes it from 0; no clear() is visible — confirm it is reset
// between calls.
1105 valueArrays.push_back(
nullptr);
1107 switch (field->type()->id()) {
1108 case arrow::Type::FIXED_SIZE_LIST: {
1109 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
// Fixed-size lists of booleans are rebuilt as a uint8 array, 1 or 0 per value.
1110 if (
list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1111 int64_t
length =
list->length() *
list->list_type()->list_size();
1112 arrow::UInt8Builder builder;
// NOTE(review): the status stored in `ok` is not checked in the visible code.
1113 auto ok = builder.Reserve(
length);
1116 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(
list->values());
// NOTE(review): the loop header that should declare this `i` is not shown; if it
// does not introduce its own index, `i` here is the column index — confirm.
1118 if (boolArray->IsValid(
i)) {
1120 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1121 auto ok = builder.Append(
value);
1124 auto ok = builder.AppendNull();
// NOTE(review): Finish() is dereferenced without checking for an error.
1127 valueArrays.back() = *builder.Finish();
// Non-boolean fixed-size lists use the child values array directly.
1129 valueArrays.back() =
list->values();
1132 case arrow::Type::LIST: {
1133 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1134 valueArrays.back() =
list->values();
// Plain boolean column: same uint8 expansion as above.
1136 case arrow::Type::BOOL: {
1139 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1141 int64_t
length = boolArray->length();
1142 arrow::UInt8Builder builder;
// NOTE(review): the status stored in `ok` is not checked in the visible code.
1143 auto ok = builder.Reserve(
length);
// NOTE(review): same concern as above about which `i` indexes the array.
1146 if (boolArray->IsValid(
i)) {
1148 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1149 auto ok = builder.Append(
value);
1152 auto ok = builder.AppendNull();
// NOTE(review): Finish() is dereferenced without checking for an error.
1155 valueArrays.back() = *builder.Finish();
// Every other type: the column itself holds the values.
1158 valueArrays.back() = column;
// Pass 2: for each row, point every branch at that row's bytes. `pos` is declared,
// and the tree is filled and `pos` advanced, in lines not shown.
1163 while (pos < batch->num_rows()) {
1164 for (
size_t bi = 0; bi < branches.size(); ++bi) {
1165 auto* branch = branches[bi];
1166 auto* sizeBranch = sizesBranches[bi];
1167 auto array = batch->column(bi);
1168 auto& field = batch->schema()->field(bi);
// Reference into the member vector: the LIST case below writes through it.
1169 auto& listSize = listSizes[bi];
1170 auto valueType = valueTypes[bi];
1171 auto valueArray = valueArrays[bi];
1173 switch (field->type()->id()) {
1174 case arrow::Type::LIST: {
1175 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
// Overwrites the -1 marker stored by the constructor with this row's length;
// the size branch reads the value through the address set below.
1176 listSize =
list->value_length(
pos);
// NOTE(review): array->offset() is added to a uint8_t pointer without being
// multiplied by the value width, unlike value_offset(pos) — confirm this is
// correct for sliced arrays.
1177 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
list->value_offset(
pos) * valueType->byte_width();
1178 branch->SetAddress((
void*)
buffer);
1179 sizeBranch->SetAddress(&listSize);
// Fixed-size lists share the code below (further case labels may be elided).
1181 case arrow::Type::FIXED_SIZE_LIST:
// A byte width of 0 is treated as 1 byte per value.
1184 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
// NOTE(review): as in the LIST case, array->offset() is added unscaled.
1185 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
pos * listSize * byteWidth;
1186 branch->SetAddress((
void*)
buffer);
1193 return arrow::Status::OK();
1198 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1199 auto*
tree = treeStream->GetTree();
1200 tree->Write(
"", TObject::kOverwrite);
1201 tree->SetDirectory(
nullptr);