// NOTE(review): every line of this chunk carries a leading number ("593", "594",
// ...) and those numbers skip, so lines of the original source are missing; the
// chunk does not compile as shown. Comments only describe the visible lines.
//
// Tail of the signature and prologue of the batch-scanning routine for TTree
// fragments (the routine's name is on a line not visible here).
//   options  - scan options; options->dataset_schema must be non-null (asserted).
//   fragment - fragment to scan; it must be a TTreeFileFragment.
// Returns arrow::Status::NotImplemented("Not a ttree fragment") for any other
// fragment type.
593 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
594 const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
const
596 assert(options->dataset_schema !=
nullptr);
598 auto dataset_schema = options->dataset_schema;
// Only TTree-backed fragments are handled; anything else is reported through a
// status rather than an exception.
599 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
600 if (treeFragment.get() ==
nullptr) {
601 return {arrow::Status::NotImplemented(
"Not a ttree fragment")};
// Generator that builds one arrow::RecordBatch from this fragment's TTree each
// time it is invoked. It captures the memory pool, the fragment and the dataset
// schema by value. It captures the running counters mTotCompressedSize and
// mTotUncompressedSize by reference; they are incremented at the end of each
// invocation.
// NOTE(review): the by-reference captures require the object owning those
// counters to outlive every invocation of the generator — confirm.
604 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
605 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
// `tid` (the signpost id) is declared on a line not visible in this chunk.
607 O2_SIGNPOST_START(root_arrow_fs, tid,
"Generator",
"Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
// Output columns, one per requested dataset field, in dataset-schema order.
608 std::vector<std::shared_ptr<arrow::Array>> columns;
// NOTE(review): `fields` is not referenced in the visible lines.
609 std::vector<std::shared_ptr<arrow::Field>>
fields = dataset_schema->fields();
// NOTE(review): the arrow::Result returned by ReadPhysicalSchema() is
// dereferenced without checking its status.
610 auto physical_schema = *treeFragment->ReadPhysicalSchema();
// Every requested field must come from this single tree.
612 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
613 throw runtime_error_f(
"One TTree must have all the fields requested in a table");
// One entry per requested dataset field, built from
// {physical field index, index of the "_size" companion or -1, dataset field index}.
// Presumably these are the members mainBranchIdx, vlaIdx and datasetFieldIdx used
// below; the struct itself is not visible here.
617 std::vector<BranchFieldMapping> mappings;
621 for (
int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
622 auto dataset_field = dataset_schema->field(fi);
625 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Processing dataset field %{public}s.", dataset_field->name().c_str());
// Dataset fields are matched to physical fields by name. A missing match throws
// and lists the available physical schema.
626 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
628 if (physicalFieldIdx < 0) {
629 throw runtime_error_f(
"Cannot find physical field associated to %s. Possible fields: %s",
630 dataset_field->name().c_str(), physical_schema->ToString().c_str());
// If the physical field right before this one has a name ending in "_size", this
// field is treated as variable-length and that field is recorded as its size branch.
632 if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with(
"_size")) {
633 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
634 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
635 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
// Otherwise the field is recorded without a size branch (vlaIdx = -1); the
// previous field's name is only logged. The control flow between these two cases
// is on lines not visible here.
638 if (physicalFieldIdx > 0) {
639 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
640 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
642 mappings.push_back({physicalFieldIdx, -1, fi});
// Resolve the TBranch for every mapping (plus its size branch, if any). Their
// TBranch::GetTotalSize() values are summed to size the read cache.
// NOTE(review): branches are looked up with GetListOfBranches()->At(index) using
// physical-schema indices. This assumes the physical schema lists fields in the
// same order as the tree's top-level branches — confirm.
647 auto*
tree = treeFragment->GetTree();
648 auto branches =
tree->GetListOfBranches();
649 size_t totalTreeSize = 0;
650 std::vector<TBranch*> selectedBranches;
651 for (
auto& mapping : mappings) {
652 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
653 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
654 totalTreeSize += selectedBranches.back()->GetTotalSize();
655 if (mapping.vlaIdx != -1) {
656 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
657 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Generator",
"Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
658 totalTreeSize += selectedBranches.back()->GetTotalSize();
// Cache size: totalTreeSize clamped to the range [1000000, 25000000] bytes.
// NOTE(review): std::min/std::max deduce a single argument type. Mixing size_t
// with `UL` literals only compiles where size_t is unsigned long.
662 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
664 tree->SetCacheSize(cacheSize);
// Register exactly the selected branches with the cache; the second argument
// `false` excludes sub-branches. Then stop the cache's learning phase.
665 for (
auto* branch : selectedBranches) {
666 tree->AddBranchToCache(branch,
false);
668 tree->StopCacheLearningPhase();
// Build the read operations for this fragment. The ops vector belongs to the
// fragment and is reserved with `opsCount` before any op is appended; opsCount is
// computed on a line not visible here.
// NOTE(review): confirm that opsCount covers every op appended in this loop;
// otherwise push_back may reallocate the fragment-owned vector.
671 std::vector<ReadOps>& ops = treeFragment->ops();
673 ops.reserve(opsCount);
674 for (
size_t mi = 0; mi < mappings.size(); ++mi) {
// `mapping` refers to mappings[mi]; its declaration is not visible here.
677 auto datasetField = dataset_schema->field(mapping.datasetFieldIdx);
678 auto physicalField = physical_schema->field(mapping.mainBranchIdx);
// Variable-length field: an op for the size branch is appended first. Its target
// buffer holds rootBranchEntries + 1 values of typeSize bytes. It is later used as
// the offsets buffer of the ListArray built in the vlaIdx branch further down.
680 if (mapping.vlaIdx != -1) {
681 auto* branch = (TBranch*)branches->At(mapping.vlaIdx);
684 .rootBranchEntries = branch->GetEntries(),
689 auto&
op = ops.back();
690 ARROW_ASSIGN_OR_RAISE(
op.targetBuffer, arrow::AllocateBuffer((
op.rootBranchEntries + 1) *
op.typeSize, pool));
// Op for the main (value) branch. It is presumably appended on a line not visible
// here, since ops[ops.size() - 2] below refers to the size op.
695 auto& valueOp = ops.back();
696 valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx);
697 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
700 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
701 valueOp.typeSize = physicalField->type()->byte_width();
// Choose element size, list size and placeholder target buffer by field kind:
//   boolean scalar           -> listSize 1, (entries + 7) / 8 bytes (one bit per value)
//   fixed-size list of bool  -> (entries * listSize) / 8 + 1 bytes
//   variable-length list     -> listSize -1, sized from the size op's offsetCount
//   fixed-size list          -> entries * listSize elements of typeSize bytes
//   anything else            -> listSize 1, entries elements of typeSize bytes
// NOTE(review): offsetCount of the size op is assigned on a line not visible here.
704 if ((datasetField->type() == arrow::boolean())) {
706 valueOp.listSize = 1;
707 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
708 }
else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
709 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
710 valueOp.listSize = listType->list_size();
712 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
713 }
else if (mapping.vlaIdx != -1) {
714 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
715 valueOp.listSize = -1;
718 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
719 }
else if (listType) {
721 valueOp.listSize = listType->list_size();
722 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
723 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
725 valueOp.typeSize = physicalField->type()->byte_width();
727 valueOp.listSize = 1;
728 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
// Wrap the target buffers as an Arrow array of type datasetField->type(). The
// conditions selecting each form are on lines not visible here. The three forms are:
//   - a FixedSizeListArray over a flat child of entries * listSize values;
//   - a ListArray whose offsets are the size op's buffer and whose child has
//     offsetCount values;
//   - a flat array built from `data`. The MakeArray call for this form is not
//     visible here.
730 arrow::Status status;
731 std::shared_ptr<arrow::Array>
array;
734 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize,
735 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
736 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, arrow::MakeArray(vdata));
738 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
739 valueOp.branch->GetName(),
740 valueOp.rootBranchEntries,
741 valueOp.targetBuffer->size());
742 }
else if (mapping.vlaIdx != -1) {
743 auto& offsetOp = ops[ops.size() - 2];
744 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), offsetOp.offsetCount,
745 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
747 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, arrow::MakeArray(vdata));
748 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
749 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
750 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
751 valueOp.branch->GetName(),
752 offsetOp.offsetCount,
753 valueOp.targetBuffer->size());
755 auto data = std::make_shared<arrow::ArrayData>(datasetField->type(), valueOp.rootBranchEntries,
756 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
758 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid,
"Op",
"Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
759 valueOp.branch->GetName(),
760 valueOp.rootBranchEntries,
761 valueOp.targetBuffer->size());
// One column per mapping, appended in dataset-field order.
764 columns.push_back(
array);
// Row-count consistency check across all ops. The guarding conditions and the
// declarations of `op` and `rows` are on lines not visible here.
// `rows` is taken from an op's rootBranchEntries, or from the preceding size op's
// rootBranchEntries. The visible error paths throw when `rows` disagrees with
// either op.rootBranchEntries or the preceding op's offsetCount.
770 for (
size_t i = 0;
i < ops.size(); ++
i) {
773 rows =
op.rootBranchEntries;
776 auto& offsetOp = ops[
i - 1];
777 rows = offsetOp.rootBranchEntries;
780 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows,
op.rootBranchEntries);
783 throw runtime_error_f(
"Unmatching number of rows for branch %s. Expected %lli, found %lli",
op.branch->GetName(),
rows, ops[
i - 1].offsetCount);
// Assemble the batch with `rows` rows. Then add the tree's total compressed
// (GetZipBytes) and uncompressed (GetTotBytes) byte counts to the captured counters.
// NOTE(review): these are whole-tree totals. If the generator is invoked more than
// once for the same tree, they are counted again each time — confirm.
787 auto batch = arrow::RecordBatch::Make(dataset_schema,
rows, columns);
788 totalCompressedSize +=
tree->GetZipBytes();
789 totalUncompressedSize +=
tree->GetTotBytes();
790 O2_SIGNPOST_END(root_arrow_fs, tid,
"Generator",
"Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
// Per-column state of the TTree writer, indexed by column position.
// Value branch of each column.
959 std::vector<TBranch*> branches;
// Companion "<name>_size" branch for variable-length list columns, nullptr otherwise.
960 std::vector<TBranch*> sizesBranches;
// Flat value arrays taken from the batch being written; booleans are repacked as uint8.
961 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
// NOTE(review): not referenced in the visible lines.
962 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
// Element type for list columns, the column's own type otherwise.
963 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
// NOTE(review): the two basket-size vectors and typeSizes are not referenced in the
// visible lines. The constructor declares a local `int valuesIdealBasketSize` that
// shadows the member of the same name.
965 std::vector<int64_t> valuesIdealBasketSize;
966 std::vector<int64_t> sizeIdealBasketSize;
968 std::vector<int64_t> typeSizes;
// At construction: 1 for scalars, N for fixed-size lists, -1 for variable-length
// lists. For variable-length list columns, writing later overwrites the entry with
// the current row's list length.
969 std::vector<int64_t> listSizes;
// True until the first batch is written; basket sizes are fixed from that batch.
970 bool firstBasket =
true;
// Sets ROOT basket sizes from the first batch written. The formulas use
// rows = firstBatch->num_rows() and valueSize = byte width of the column's value type:
//   scalar (listSize 1):            1024 + rows * valueSize
//   variable list (listSize -1):    value branch 1024 + rows * valueSize * list->length()
//                                   size branch  1024 + rows * 4
//   fixed-size list (listSize N):   1024 + rows * valueSize * N
// For columns whose name starts with "fIndexArray", the visible lines then set
// max(32000, ideal) on both the value and the size branch.
// NOTE(review): the opening of the function body and the declaration of `sid` are
// on lines not visible here.
973 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
976 O2_SIGNPOST_START(root_arrow_fs, sid,
"finaliseBasketSize",
"First batch with %lli rows received and %zu columns",
977 firstBatch->num_rows(), firstBatch->columns().size());
978 for (
size_t i = 0;
i < branches.size();
i++) {
979 auto* branch = branches[
i];
980 auto* sizeBranch = sizesBranches[
i];
// NOTE(review): byte_width() appears to be 0 for boolean value types; the writer's
// row loop in this file falls back to 1 byte in that case. Boolean columns would
// then only get the 1024-byte constant here — confirm.
982 int valueSize = valueTypes[
i]->byte_width();
983 if (listSizes[
i] == 1) {
984 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry for %lli entries.",
985 branch->GetName(), valueSize, firstBatch->num_rows());
986 assert(sizeBranch ==
nullptr);
987 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
988 }
else if (listSizes[
i] == -1) {
989 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s exists and uses %d bytes per entry.",
990 branch->GetName(), valueSize);
992 auto column = firstBatch->GetColumnByName(schema_->field(
i)->name());
993 auto list = std::static_pointer_cast<arrow::ListArray>(column);
994 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
995 branch->GetName(), sizeBranch->GetName(), list->length(), valueSize);
// NOTE(review): list->length() is the number of list entries, which is the number
// of rows. This estimate therefore grows with rows^2. The total value count (e.g.
// list->values()->length()) may have been intended — confirm.
996 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * list->length());
// The size branch stores one 4-byte value per row.
997 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
999 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid,
"finaliseBasketSize",
"Branch %s needed. There are %lli entries per array of size %d in that list.",
1000 branch->GetName(), listSizes[
i], valueSize);
1001 assert(sizeBranch ==
nullptr);
1002 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[
i]);
1005 auto field = firstBatch->schema()->field(
i);
1006 if (field->name().starts_with(
"fIndexArray")) {
// NOTE(review): several issues in this branch:
//   - the int64_t row count is narrowed to int here;
//   - byte_width() of a non-fixed-width type such as a list is not a positive
//     width (recent Arrow documents -1);
//   - sizeBranch is dereferenced without a null check, although it is nullptr for
//     non-list columns (see the asserts above).
1008 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows();
1009 int basketSize = std::max(32000, idealBasketSize);
1010 sizeBranch->SetBasketSize(basketSize);
1011 branch->SetBasketSize(basketSize);
// NOTE(review): the start of this constructor's signature (class name, `schema` and
// `options` parameters) and several body lines are not visible here.
// Forwards to FileWriter, then resolves the output TTree from destination_:
//   - TDirectoryFileOutputStream: creates a new TTree named
//     destination_locator_.path and wraps it in a TTreeOutputStream with "" as the
//     second argument.
//   - TTreeOutputStream: re-wraps the existing tree with destination_locator_.path
//     as the second argument.
//   - anything else: presumably reaches the std::runtime_error("Unsupported
//     backend.") throw, whose `else` is not visible here.
// Then, for each schema field, it records the list size and value type and creates
// the ROOT branch(es) through treeStream->CreateBranch.
1021 std::shared_ptr<arrow::io::OutputStream> destination,
1022 arrow::fs::FileLocator destination_locator)
1023 : FileWriter(
schema, options, destination, destination_locator)
1026 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1027 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1029 if (directoryStream.get()) {
1030 TDirectoryFile* dir = directoryStream->GetDirectory();
// NOTE(review): raw `new TTree`; its owner is not visible here. ROOT normally
// attaches a new tree to the current directory — confirm who deletes it.
1032 auto*
tree =
new TTree(destination_locator_.path.c_str(),
"");
1033 treeStream = std::make_shared<TTreeOutputStream>(
tree,
"");
1034 }
else if (treeStream.get()) {
1038 auto*
tree = treeStream->GetTree();
1039 treeStream = std::make_shared<TTreeOutputStream>(
tree, destination_locator_.path);
1042 throw std::runtime_error(
"Unsupported backend.");
// Per-field setup; the loop header declaring `i` is not visible here.
1046 auto& field =
schema->field(
i);
// Defaults to 1 (scalar); overwritten below for list types.
1047 listSizes.push_back(1);
// NOTE(review): this local shadows the member vector of the same name and is not
// used afterwards in the visible lines.
1049 int valuesIdealBasketSize = 0;
1051 switch (field->type()->id()) {
1052 case arrow::Type::FIXED_SIZE_LIST: {
1053 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
// NOTE(review): BUG — valueTypes.back() is read before this field's value type is
// pushed on the next line. It therefore uses the previous column's type, and for
// the first column it calls back() on an empty vector, which is undefined
// behaviour. The push_back should come first.
1054 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1055 valueTypes.push_back(field->type()->field(0)->type());
1056 sizesBranches.push_back(
nullptr);
// Fixed-size array leaf: "<name>[<N>]<suffix>". The suffix comes from
// rootSuffixFromArrow, which is not visible here.
1057 std::string leafList = fmt::format(
"{}[{}]{}", field->name(), listSizes.back(),
rootSuffixFromArrow(valueTypes.back()->id()));
1058 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Variable-length list: a "<name>_size" branch with a 32-bit int leaf ("/I") holds
// the per-row lengths. The value leaf is declared as "<name>[<name>_size]<suffix>".
1060 case arrow::Type::LIST: {
1061 valueTypes.push_back(field->type()->field(0)->type());
1062 std::string leafList = fmt::format(
"{}[{}_size]{}", field->name(), field->name(),
rootSuffixFromArrow(valueTypes.back()->id()));
1063 listSizes.back() = -1;
1064 std::string sizeLeafList = field->name() +
"_size/I";
1065 sizesBranches.push_back(treeStream->CreateBranch((field->name() +
"_size").c_str(), sizeLeafList.c_str()));
1066 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Any other type: a single branch of the column's own type. Its leafList is built
// on a line not visible here.
1071 valueTypes.push_back(field->type());
1073 sizesBranches.push_back(
nullptr);
1074 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
// Writes one record batch into the output tree.
// On the first call, basket sizes are fixed from this batch via finaliseBasketSize.
// The guard on firstBasket is on a line not visible here.
// NOTE(review): this happens before the empty-batch check below, so an empty first
// batch would fix basket sizes from zero rows — confirm this is intended.
1081 arrow::Status
Write(
const std::shared_ptr<arrow::RecordBatch>& batch)
override
1084 firstBasket =
false;
1085 finaliseBasketSize(batch);
// Batches without columns or without rows are a no-op.
1089 if (batch->columns().empty() || batch->num_rows() == 0) {
1090 return arrow::Status::OK();
// Locate the target tree. It comes from the directory (looked up by
// destination_locator_.path) or from a TTreeOutputStream (lines partly not visible
// here); any other destination throws.
1094 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1095 TTree*
tree =
nullptr;
1096 if (directoryStream.get()) {
1097 TDirectoryFile* dir = directoryStream->GetDirectory();
1098 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1100 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1104 throw std::runtime_error(
"Unsupported backend.");
// First pass: collect each column's flat value array.
//   - Booleans, scalar or inside fixed-size lists, are repacked into UInt8 arrays
//     of 1/0, preserving nulls.
//   - Lists contribute their child values array.
//   - Other types are used as-is.
// NOTE(review): valueArrays is a member and is appended to on every Write() call,
// while the row loop below indexes it from 0. Unless it is cleared on a line not
// visible here, later batches would read the first batch's arrays.
// The Status results of Reserve/Append/AppendNull are stored but never checked,
// and the Result of Finish() is dereferenced unchecked.
1107 for (
auto i = 0u;
i < batch->columns().
size(); ++
i) {
1108 auto column = batch->column(
i);
1109 auto& field = batch->schema()->field(
i);
1111 valueArrays.push_back(
nullptr);
1113 switch (field->type()->id()) {
1114 case arrow::Type::FIXED_SIZE_LIST: {
1115 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
1116 if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1117 int64_t
length = list->length() * list->list_type()->list_size();
1118 arrow::UInt8Builder builder;
1119 auto ok = builder.Reserve(
length);
1122 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
1124 if (boolArray->IsValid(
i)) {
1126 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1127 auto ok = builder.Append(
value);
1130 auto ok = builder.AppendNull();
1133 valueArrays.back() = *builder.Finish();
1135 valueArrays.back() = list->values();
1138 case arrow::Type::LIST: {
1139 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1140 valueArrays.back() = list->values();
1142 case arrow::Type::BOOL: {
1145 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1147 int64_t
length = boolArray->length();
1148 arrow::UInt8Builder builder;
1149 auto ok = builder.Reserve(
length);
1152 if (boolArray->IsValid(
i)) {
1154 uint8_t
value = boolArray->Value(
i) ? 1 : 0;
1155 auto ok = builder.Append(
value);
1158 auto ok = builder.AppendNull();
1161 valueArrays.back() = *builder.Finish();
1164 valueArrays.back() = column;
// Second pass: for each row `pos`, point every branch at that row's data. `pos` is
// declared and advanced on lines not visible here. For LIST columns, the size
// branch is pointed at listSizes[bi], which is set to the row's list length.
// NOTE(review): listSizes holds int64_t, but the constructor declares the size leaf
// as "/I" (32-bit). That only reads correctly on little-endian hosts for lengths
// below 2^31 — confirm.
// NOTE(review): array->offset() is added as a raw byte count without scaling by the
// element width. This is only correct when the column offset is 0.
1169 while (pos < batch->num_rows()) {
1170 for (
size_t bi = 0; bi < branches.size(); ++bi) {
1171 auto* branch = branches[bi];
1172 auto* sizeBranch = sizesBranches[bi];
1173 auto array = batch->column(bi);
1174 auto& field = batch->schema()->field(bi);
1175 auto& listSize = listSizes[bi];
1176 auto valueType = valueTypes[bi];
1177 auto valueArray = valueArrays[bi];
1179 switch (field->type()->id()) {
1180 case arrow::Type::LIST: {
1181 auto list = std::static_pointer_cast<arrow::ListArray>(
array);
1182 listSize = list->value_length(
pos);
1183 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() + list->value_offset(
pos) * valueType->byte_width();
1184 branch->SetAddress((
void*)
buffer);
1185 sizeBranch->SetAddress(&listSize);
// FIXED_SIZE_LIST, and presumably the remaining types (case labels not visible
// here): each row spans listSize values. A byte width of 0 is treated as 1; this
// applies to booleans, which were repacked as uint8 above.
1187 case arrow::Type::FIXED_SIZE_LIST:
1190 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
1191 uint8_t
const*
buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() +
array->offset() +
pos * listSize * byteWidth;
1192 branch->SetAddress((
void*)
buffer);
1199 return arrow::Status::OK();
// Tail of the writer's finishing step; its signature is not visible here.
// It writes the tree with TObject::kOverwrite, then detaches it from its directory
// via SetDirectory(nullptr).
// NOTE(review): only TTreeOutputStream destinations are handled here. Write() also
// accepts a TDirectoryFileOutputStream destination_; unless destination_ is
// replaced on a line not visible here, treeStream would be null in that case and
// GetTree() would dereference a null pointer.
1204 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1205 auto*
tree = treeStream->GetTree();
1206 tree->Write(
"", TObject::kOverwrite);
1207 tree->SetDirectory(
nullptr);