591 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
592 const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
const
594 assert(options->dataset_schema !=
nullptr);
596 auto dataset_schema = options->dataset_schema;
597 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
598 if (treeFragment.get() ==
nullptr) {
599 return {arrow::Status::NotImplemented(
"Not a ttree fragment")};
602 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
603 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
605 O2_SIGNPOST_START(root_arrow_fs, tid,
"Generator",
"Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
606 std::vector<std::shared_ptr<arrow::Array>> columns;
607 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
608 auto physical_schema = *treeFragment->ReadPhysicalSchema();
610 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
611 throw runtime_error_f(
"One TTree must have all the fields requested in a table");
615 std::vector<BranchFieldMapping> mappings;
619 for (
int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
620 auto dataset_field = dataset_schema->field(fi);
623 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Processing dataset field %{public}s.", dataset_field->name().c_str());
624 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
626 if (physicalFieldIdx < 0) {
627 throw runtime_error_f(
"Cannot find physical field associated to %s. Possible fields: %s",
628 dataset_field->name().c_str(), physical_schema->ToString().c_str());
630 if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with(
"_size")) {
631 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
632 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
633 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
636 mappings.push_back({physicalFieldIdx, -1, fi});
641 auto*
tree = treeFragment->GetTree();
642 auto branches =
tree->GetListOfBranches();
643 size_t totalTreeSize = 0;
644 std::vector<TBranch*> selectedBranches;
645 for (
auto& mapping : mappings) {
646 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
647 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
648 totalTreeSize += selectedBranches.back()->GetTotalSize();
649 if (mapping.vlaIdx != -1) {
650 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
651 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
652 totalTreeSize += selectedBranches.back()->GetTotalSize();
656 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
658 tree->SetCacheSize(cacheSize);
659 for (
auto* branch : selectedBranches) {
660 tree->AddBranchToCache(branch,
false);
662 tree->StopCacheLearningPhase();
665 std::vector<ReadOps>& ops = treeFragment->ops();
667 ops.reserve(opsCount);
668 for (
size_t mi = 0; mi < mappings.size(); ++mi) {
672 auto physicalField = physical_schema->field(mapping.
mainBranchIdx);
674 if (mapping.
vlaIdx != -1) {
675 auto* branch = (TBranch*)branches->At(mapping.
vlaIdx);
678 .rootBranchEntries = branch->GetEntries(),
683 auto&
op = ops.back();
684 ARROW_ASSIGN_OR_RAISE(
op.targetBuffer, arrow::AllocateBuffer((
op.rootBranchEntries + 1) *
op.typeSize, pool));
689 auto& valueOp = ops.back();
690 valueOp.branch = (TBranch*)branches->At(mapping.
mainBranchIdx);
691 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
694 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
695 valueOp.typeSize = physicalField->type()->byte_width();
698 if ((datasetField->type() == arrow::boolean())) {
700 valueOp.listSize = 1;
701 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1);
702 }
else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
703 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
704 valueOp.listSize = listType->list_size();
706 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
707 }
else if (mapping.
vlaIdx != -1) {
708 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
709 valueOp.listSize = -1;
712 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
713 }
else if (listType) {
715 valueOp.listSize = listType->list_size();
716 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
717 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
719 valueOp.typeSize = physicalField->type()->byte_width();
721 valueOp.listSize = 1;
722 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
724 arrow::Status status;
725 std::shared_ptr<arrow::Array>
array;
728 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer);
729 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, varray);
731 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
732 valueOp.branch->GetName(),
733 valueOp.rootBranchEntries,
734 valueOp.targetBuffer->size());
735 }
else if (mapping.
vlaIdx != -1) {
736 auto& offsetOp = ops[ops.size() - 2];
737 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer);
739 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray);
740 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
741 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
742 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
743 valueOp.branch->GetName(),
744 offsetOp.offsetCount,
745 valueOp.targetBuffer->size());
747 array = std::make_shared<arrow::PrimitiveArray>(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer);
748 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
749 valueOp.branch->GetName(),
750 valueOp.rootBranchEntries,
751 valueOp.targetBuffer->size());
754 columns.push_back(
array);
760 for (
size_t i = 0;
i < ops.size(); ++
i) {
763 rows =
op.rootBranchEntries;
766 auto& offsetOp = ops[
i - 1];
767 rows = offsetOp.rootBranchEntries;
770 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows,
op.rootBranchEntries);
773 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows, ops[
i - 1].offsetCount);
777 auto batch = arrow::RecordBatch::Make(dataset_schema,
rows, columns);
778 totalCompressedSize +=
tree->GetZipBytes();
779 totalUncompressedSize +=
tree->GetTotBytes();
780 O2_SIGNPOST_END(root_arrow_fs, tid,
"Generator",
"Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
884 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(
source.filesystem());
889 auto objectHandler = fs->GetObjectHandler(
source);
891 if (!objectHandler->format->Equals(*
this)) {
897 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
899 auto branches =
tree->GetListOfBranches();
900 auto n = branches->GetEntries();
902 std::vector<std::shared_ptr<arrow::Field>> fields;
904 bool prevIsSize =
false;
905 for (
auto i = 0;
i <
n; ++
i) {
906 auto branch =
static_cast<TBranch*
>(branches->At(
i));
907 std::string
name = branch->GetName();
908 if (prevIsSize && fields.back()->name() !=
name +
"_size") {
909 throw runtime_error_f(
"Unexpected layout for VLA container %s.", branch->GetName());
912 if (
name.ends_with(
"_size")) {
913 fields.emplace_back(std::make_shared<arrow::Field>(
name, arrow::int32()));
918 branch->GetExpectedType(cls,
type);
923 auto listSize =
static_cast<TLeaf*
>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
930 if (fields.back()->name().ends_with(
"_size")) {
931 throw runtime_error_f(
"Missing values for VLA indices %s.", fields.back()->name().c_str());
933 return std::make_shared<arrow::Schema>(fields);
// Per-column writer state. The constructor appends one entry per schema field,
// in schema order, to branches, sizesBranches, valueTypes and listSizes, so
// index i in those vectors refers to column i of the schema.
// TBranch that receives the values of each column.
949 std::vector<TBranch*> branches;
// "<field>_size" branch for variable-length (arrow LIST) columns; nullptr for
// every other column type.
950 std::vector<TBranch*> sizesBranches;
// Arrays whose data buffer Write() points the value branches at: the child
// values for list / fixed-size-list columns, a UInt8 copy for boolean data,
// the column itself otherwise.
951 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
// NOTE(review): not referenced in the visible part of this file.
952 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
// Element type stored in each value branch (the child type for list columns).
953 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
// NOTE(review): the next three vectors are not referenced in the visible part
// of this file (the constructor declares a local int that shadows
// valuesIdealBasketSize).
955 std::vector<int64_t> valuesIdealBasketSize;
956 std::vector<int64_t> sizeIdealBasketSize;
958 std::vector<int64_t> typeSizes;
// Elements per row: 1 for scalar columns, the fixed list size for
// FIXED_SIZE_LIST columns, -1 for LIST columns. For LIST columns Write()
// overwrites the -1 with the current row's length and hands the address of
// this slot to the "_size" branch.
959 std::vector<int64_t> listSizes;
// Cleared by Write() right before it calls finaliseBasketSize() on the batch
// it received; presumably guards that call so it runs only once (the guard
// itself is not visible in this chunk).
960 bool firstBasket =
true;
// Chooses the basket size of every branch from the first batch handed to
// Write(): 1024 bytes of headroom plus num_rows x (bytes per row) of the
// column, with a separate rule for columns whose name starts with
// "fIndexArray".
// @param firstBatch first batch received by Write(); its row count drives all
//                   sizes computed here.
963 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
// NOTE(review): `sid` is not declared in the lines visible here; presumably a
// signpost id created just above.
966 O2_SIGNPOST_START(root_arrow_fs, sid,
"finaliseBasketSize",
"First batch with %lli rows received and %zu columns",
967 firstBatch->num_rows(), firstBatch->columns().size());
968 for (
size_t i = 0;
i < branches.size();
i++) {
969 auto* branch = branches[
i];
970 auto* sizeBranch = sizesBranches[
i];
// Bytes per element of the stored value type.
// NOTE(review): arrow reports byte_width() == 0 for boolean, while Write()
// stores booleans as one byte each (it falls back to a width of 1), so boolean
// columns only get the 1024-byte headroom below.
972 int valueSize = valueTypes[
i]->byte_width();
// Scalar column: one value per row, no size branch.
973 if (listSizes[
i] == 1) {
974 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry for %lli entries.",
975 branch->GetName(), valueSize, firstBatch->num_rows());
976 assert(sizeBranch ==
nullptr);
977 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
978 }
// Variable-length (LIST) column: a value branch plus a "_size" branch.
else if (listSizes[
i] == -1) {
979 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry.",
980 branch->GetName(), valueSize);
982 auto column = firstBatch->GetColumnByName(schema_->field(
i)->name());
983 auto list = std::static_pointer_cast<arrow::ListArray>(column);
984 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
985 branch->GetName(), sizeBranch->GetName(),
list->length(), valueSize);
// NOTE(review): list->length() is the number of lists in the column, i.e.
// firstBatch->num_rows(), not the number of values they hold. The value
// branch size below therefore grows with num_rows squared. The total value
// count would be list->values()->length() (or the last offset minus the first).
986 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize *
list->length());
// 4 bytes per row for the "_size/I" leaf declared in the constructor.
987 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
// Fixed-size list column: listSizes[i] values per row, no size branch.
989 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. There are %lli entries per array of size %d in that list.",
990 branch->GetName(), listSizes[
i], valueSize);
991 assert(sizeBranch ==
nullptr);
992 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[
i]);
// Columns named "fIndexArray*" get one combined size for the value and size
// branches, with a floor of 32000 bytes. These calls replace any basket size
// set earlier in this iteration.
// NOTE(review): sizeBranch is non-null only for LIST columns, so this assumes
// every "fIndexArray*" column is a LIST. For such a field,
// field->type()->byte_width() is the list type's width, not the element's
// (arrow returns -1 for non-fixed-width types); valueSize may have been
// intended. The int64 products are also narrowed to int.
995 auto field = firstBatch->schema()->field(
i);
996 if (field->name().starts_with(
"fIndexArray")) {
998 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows();
999 int basketSize = std::max(32000, idealBasketSize);
1000 sizeBranch->SetBasketSize(basketSize);
1001 branch->SetBasketSize(basketSize);
// Sets up the output tree and creates one branch per field of `schema`.
// - TDirectoryFileOutputStream destination: a new TTree named after
//   destination_locator_.path is created. Write() later looks it up in that
//   directory by the same path.
// - TTreeOutputStream destination: its existing tree is reused.
// - Other destinations: rejected with std::runtime_error("Unsupported
//   backend."); the guard around that throw is not visible in this chunk.
// Per field:
//  - FIXED_SIZE_LIST: leaf list "<name>[N]<suffix>", listSizes = N;
//  - LIST: "<name>_size/I" size branch plus "<name>[<name>_size]<suffix>",
//    listSizes = -1;
//  - other types (presumably the default case): one branch of the field's
//    type, listSizes = 1; its leaf list is built on lines not visible here.
1010 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1011 std::shared_ptr<arrow::io::OutputStream> destination,
1012 arrow::fs::FileLocator destination_locator)
1013 : FileWriter(schema, options, destination, destination_locator)
1016 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1017 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1019 if (directoryStream.get()) {
1020 TDirectoryFile* dir = directoryStream->GetDirectory();
// NOTE(review): raw `new`; this object keeps no owning handle in the visible
// code. Presumably the ROOT directory takes ownership; confirm.
1022 auto*
tree =
new TTree(destination_locator_.path.c_str(),
"");
1023 treeStream = std::make_shared<TTreeOutputStream>(
tree,
"");
1024 }
else if (treeStream.get()) {
1028 auto*
tree = treeStream->GetTree();
1029 treeStream = std::make_shared<TTreeOutputStream>(
tree, destination_locator_.path);
1032 throw std::runtime_error(
"Unsupported backend.");
1035 for (
auto i = 0u;
i < schema->fields().
size(); ++
i) {
1036 auto& field = schema->field(
i);
// Default to one element per row; list cases below overwrite it.
1037 listSizes.push_back(1);
// NOTE(review): this local shadows the member vector of the same name and is
// not used in the lines visible here.
1039 int valuesIdealBasketSize = 0;
1041 switch (field->type()->id()) {
1042 case arrow::Type::FIXED_SIZE_LIST: {
1043 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
// NOTE(review): valueTypes.back() is read before this field's element type is
// pushed on the next line. It is therefore the previous column's type, or
// undefined behaviour on an empty vector when the first field is a
// fixed-size list.
1044 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1045 valueTypes.push_back(field->type()->field(0)->type());
1046 sizesBranches.push_back(
nullptr);
1047 std::string leafList = fmt::format(
"{}[{}]{}", field->name(), listSizes.back(),
rootSuffixFromArrow(valueTypes.back()->id()));
1048 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1050 case arrow::Type::LIST: {
1051 valueTypes.push_back(field->type()->field(0)->type());
// The value leaf is dimensioned by the companion "<name>_size" leaf.
1052 std::string leafList = fmt::format(
"{}[{}_size]{}", field->name(), field->name(),
rootSuffixFromArrow(valueTypes.back()->id()));
// -1 marks a variable-length column (see finaliseBasketSize and Write).
1053 listSizes.back() = -1;
1054 std::string sizeLeafList = field->name() +
"_size/I";
1055 sizesBranches.push_back(treeStream->CreateBranch((field->name() +
"_size").c_str(), sizeLeafList.c_str()));
1056 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1061 valueTypes.push_back(field->type());
1063 sizesBranches.push_back(
nullptr);
1064 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Writes one record batch into the output tree.
// 1. On the first call, derives branch basket sizes from this batch
//    (finaliseBasketSize).
// 2. Builds one flat value array per column; boolean data is copied into a
//    UInt8 array, one byte per value.
// 3. Walks the rows and points every branch at that row's data with
//    SetAddress. The per-row fill and the row advance are on lines not
//    visible in this chunk.
// Returns arrow::Status::OK(); throws std::runtime_error("Unsupported
// backend.") for destinations it does not handle.
1071 arrow::Status
Write(
const std::shared_ptr<arrow::RecordBatch>& batch)
override
// NOTE(review): basket sizes are finalised before the empty-batch early return
// below. An empty first batch therefore fixes every basket at the 1024-byte
// headroom, and since firstBasket is already cleared, later batches never
// revisit it.
1074 firstBasket =
false;
1075 finaliseBasketSize(batch);
1079 if (batch->columns().empty() || batch->num_rows() == 0) {
1080 return arrow::Status::OK();
1084 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1085 TTree*
tree =
nullptr;
1086 if (directoryStream.get()) {
1087 TDirectoryFile* dir = directoryStream->GetDirectory();
// Look up the tree the constructor created under the same path.
1088 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1090 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1094 throw std::runtime_error(
"Unsupported backend.");
// Build the flat value array for each column (appended to valueArrays).
1097 for (
auto i = 0u;
i < batch->columns().
size(); ++
i) {
1098 auto column = batch->column(
i);
1099 auto& field = batch->schema()->field(
i);
// NOTE(review): no valueArrays.clear() is visible in this chunk. If the
// vector is not reset per batch, valueArrays[bi] in the fill loop keeps
// returning the first batch's arrays.
1101 valueArrays.push_back(
nullptr);
1103 switch (field->type()->id()) {
1104 case arrow::Type::FIXED_SIZE_LIST: {
1105 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
// Arrow bit-packs booleans; copy them to one byte per value (presumably what
// the ROOT branch expects).
1106 if (
list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1107 int64_t
length =
list->length() *
list->list_type()->list_size();
1108 arrow::UInt8Builder builder;
// NOTE(review): the Status values stored in `ok` are never checked, and
// *builder.Finish() dereferences the Result without checking it.
1109 auto ok = builder.Reserve(
length);
1112 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(
list->values());
// NOTE(review): the loop declaring the element index `i` used below is on
// lines not visible here; it must range over `length`.
1114 if (boolArray->IsValid(
i)) {
1116 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1117 auto ok = builder.Append(
value);
1120 auto ok = builder.AppendNull();
1123 valueArrays.back() = *builder.Finish();
1125 valueArrays.back() =
list->values();
1128 case arrow::Type::LIST: {
1129 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1130 valueArrays.back() =
list->values();
1132 case arrow::Type::BOOL: {
// Same bit-to-byte conversion as above, for scalar boolean columns.
1135 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1137 int64_t
length = boolArray->length();
1138 arrow::UInt8Builder builder;
1139 auto ok = builder.Reserve(
length);
1142 if (boolArray->IsValid(
i)) {
1144 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1145 auto ok = builder.Append(
value);
1148 auto ok = builder.AppendNull();
1151 valueArrays.back() = *builder.Finish();
1154 valueArrays.back() = column;
// Row loop: point every branch at row `pos` (declared on a line not visible
// here).
1159 while (pos < batch->num_rows()) {
1160 for (
size_t bi = 0; bi < branches.size(); ++bi) {
1161 auto* branch = branches[bi];
1162 auto* sizeBranch = sizesBranches[bi];
1163 auto array = batch->column(bi);
1164 auto& field = batch->schema()->field(bi);
// Reference into listSizes: for LIST columns it is overwritten below with the
// current row's length (replacing the -1 marker).
1165 auto& listSize = listSizes[bi];
1166 auto valueType = valueTypes[bi];
1167 auto valueArray = valueArrays[bi];
1169 switch (field->type()->id()) {
1170 case arrow::Type::LIST: {
1171 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
1172 listSize =
list->value_length(
pos);
// NOTE(review): arrow's ListArray::value_offset already applies the list's own
// offset. Adding array->offset() (an element count, unscaled, to a byte
// pointer) double-counts it, and valueArray's own offset is ignored. The
// pointer is correct only for unsliced arrays (all offsets 0).
1173 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
list->value_offset(
pos) * valueType->byte_width();
1174 branch->SetAddress((
void*)
buffer);
// NOTE(review): listSize is an int64_t, but the size leaf is declared "/I"
// (32-bit int) in the constructor. Reading 4 bytes at this address yields the
// value only on little-endian hosts.
1175 sizeBranch->SetAddress(&listSize);
1177 case arrow::Type::FIXED_SIZE_LIST:
// Width 0 means boolean (arrow reports byte_width() == 0); those values were
// converted to one byte each above.
1180 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
// NOTE(review): as above, array->offset() is added unscaled to a byte pointer
// and the value array's offset is ignored. This is only correct for unsliced
// arrays.
1181 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
pos * listSize * byteWidth;
1182 branch->SetAddress((
void*)
buffer);
1189 return arrow::Status::OK();
1194 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1195 auto*
tree = treeStream->GetTree();
1196 tree->Write(
"", TObject::kOverwrite);
1197 tree->SetDirectory(
nullptr);
// NOTE(review): this makes O2_SIGNPOST_END expand to nothing, but only for text
// after this line; the O2_SIGNPOST_END uses earlier in the file are unaffected.
// If an included header already defines the macro with a different
// replacement list, this redefinition is ill-formed (compilers typically warn).
// Confirm whether this line is intentional or a stray leftover.
#define O2_SIGNPOST_END(log, id, name, format,...)