  const std::shared_ptr<arrow::dataset::ScanOptions>& options,
  const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
{
  auto dataset_schema = options->dataset_schema;
  auto ntupleFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);

  auto generator = [pool = options->pool, ntupleFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
                    &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
    using namespace ROOT::Experimental;
    std::vector<std::shared_ptr<arrow::Array>> columns;
    std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();

    // Common number of entries across all fields; fixed by the first field read.
    int64_t rows = -1;
    rns::RNTuple* rntuple = ntupleFragment->GetRNTuple();
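    // Open the RNTuple with whichever RNTupleReader::Open overload this ROOT version provides.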
#if __has_include(<ROOT/RFieldBase.hxx>)
    auto reader = rns::RNTupleReader::Open(*rntuple);
#else
    auto reader = rns::RNTupleReader::Open(rntuple);
#endif
    auto& model = reader->GetModel();
    for (auto& physicalField : fields) {
      auto bulk = model.CreateBulk(physicalField->name());
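      // Columns of FixedSizeListType are multi-element (array) columns; plain scalars leave listType empty.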
      auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type());

      auto& descriptor = reader->GetDescriptor();
      auto totalEntries = reader->GetNEntries();

      if (rows == -1) {
        rows = totalEntries;
      }
      if (rows != totalEntries) {
        throw runtime_error_f("Unmatching number of rows for branch %s", physicalField->name().c_str());
      }
      arrow::Status status;
      int readEntries = 0;
      std::shared_ptr<arrow::Array> array;
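      // Booleans cannot be memcpy'd into an Arrow buffer: Arrow packs them into a bitmap,
      // so they are appended through builders instead.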
      if (physicalField->type() == arrow::boolean() ||
          (listType && physicalField->type()->field(0)->type() == arrow::boolean())) {
        if (listType) {
          std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
          auto status = arrow::MakeBuilder(pool, physicalField->type()->field(0)->type(), &builder);
          if (!status.ok()) {
            throw runtime_error("Cannot create value builder");
          }
          auto listBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), listType->list_size());
          auto valueBuilder = listBuilder.get()->value_builder();
          status = valueBuilder->Reserve(totalEntries * listType->list_size());
          status &= listBuilder->Reserve(totalEntries);
          if (!status.ok()) {
            throw runtime_error("Failed to reserve memory for array builder");
          }
          auto clusterIt = descriptor.FindClusterId(0, 0);
          while (clusterIt != rns::kInvalidDescriptorId) {
            auto& index = descriptor.GetClusterDescriptor(clusterIt);
            auto mask = std::make_unique<bool[]>(index.GetNEntries());
            std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
            auto ptr = bulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
            int readLast = index.GetNEntries();
            readEntries += readLast;
            status &= static_cast<arrow::BooleanBuilder*>(valueBuilder)->AppendValues(reinterpret_cast<uint8_t const*>(ptr), readLast * listType->list_size());
            clusterIt = descriptor.FindNextClusterId(clusterIt);
          }
          status &= static_cast<arrow::FixedSizeListBuilder*>(listBuilder.get())->AppendValues(readEntries);
          if (!status.ok()) {
            throw runtime_error("Failed to append values to array");
          }
          status &= listBuilder->Finish(&array);
          if (!status.ok()) {
            throw runtime_error("Failed to create array");
          }
        } else if (listType == nullptr) {
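          // Scalar boolean column: same builder-based path, but without the enclosing fixed-size list.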
          std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
          auto status = arrow::MakeBuilder(pool, physicalField->type(), &builder);
          if (!status.ok()) {
            throw runtime_error("Cannot create builder");
          }
          auto valueBuilder = static_cast<arrow::BooleanBuilder*>(builder.get());
          status = valueBuilder->Reserve(totalEntries);
          if (!status.ok()) {
            throw runtime_error("Failed to reserve memory for array builder");
          }
          auto clusterIt = descriptor.FindClusterId(0, 0);
          while (clusterIt != rns::kInvalidDescriptorId) {
            auto& index = descriptor.GetClusterDescriptor(clusterIt);
            auto mask = std::make_unique<bool[]>(index.GetNEntries());
            std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
            auto ptr = bulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
            int readLast = index.GetNEntries();
            readEntries += readLast;
            status &= valueBuilder->AppendValues(reinterpret_cast<uint8_t const*>(ptr), readLast);
            clusterIt = descriptor.FindNextClusterId(clusterIt);
          }
          if (!status.ok()) {
            throw runtime_error("Failed to append values to array");
          }
          status &= valueBuilder->Finish(&array);
          if (!status.ok()) {
            throw runtime_error("Failed to create array");
          }
        }
      } else {
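        // All remaining types are bulk-read and copied directly into preallocated Arrow buffers.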
        auto typeSize = physicalField->type()->byte_width();
        // Start from the expected payload size when available, otherwise reserve 1 MB.
        auto branchSize = bytes ? bytes : 1000000;
        auto&& result = arrow::AllocateResizableBuffer(branchSize, pool);
        if (!result.ok()) {
          throw runtime_error("Cannot allocate values buffer");
        }
        std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(result).ValueUnsafe();
        auto ptr = arrowValuesBuffer->mutable_data();
        if (ptr == nullptr) {
          throw runtime_error("Invalid buffer");
        }
        std::unique_ptr<TBufferFile> offsetBuffer = nullptr;
        std::shared_ptr<arrow::Buffer> arrowOffsetBuffer;
        std::span<int> offsets;
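        // Three layouts are handled below: fixed-size lists, variable-size lists
        // (which also need an offsets buffer), and plain scalars.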
        uint32_t totalSize = 0;
        int64_t listSize = 1;
        if (auto fixedSizeList = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type())) {
          listSize = fixedSizeList->list_size();
          typeSize = fixedSizeList->field(0)->type()->byte_width();
          auto clusterIt = descriptor.FindClusterId(0, 0);
          while (clusterIt != rns::kInvalidDescriptorId) {
            auto& index = descriptor.GetClusterDescriptor(clusterIt);
            auto mask = std::make_unique<bool[]>(index.GetNEntries());
            std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
            auto inPtr = bulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
            int readLast = index.GetNEntries();
            size_t size = 0;
            if (listSize == -1) {
              size = offsets[readEntries + readLast] - offsets[readEntries];
            } else {
              size = readLast * listSize;
            }
            readEntries += readLast;
            memcpy(ptr, inPtr, size * typeSize);
            ptr += (ptrdiff_t)(size * typeSize);
            clusterIt = descriptor.FindNextClusterId(clusterIt);
          }
        } else if (auto vlaListType = std::dynamic_pointer_cast<arrow::ListType>(physicalField->type())) {
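          // Variable-size lists: the element counts are only known while reading,
          // so an offsets buffer is built alongside the values buffer.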
          listSize = -1;
          typeSize = vlaListType->field(0)->type()->byte_width();
          offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
          result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
          if (!result.ok()) {
            throw runtime_error("Cannot allocate offset buffer");
          }
          arrowOffsetBuffer = result.MoveValueUnsafe();
          auto offsetBulk = model.CreateBulk(physicalField->name());
          bulk = model.CreateBulk(physicalField->name());
          auto clusterIt = descriptor.FindClusterId(0, 0);
          auto* ptrOffset = reinterpret_cast<int*>(arrowOffsetBuffer->mutable_data());
          auto* tPtrOffset = reinterpret_cast<int*>(ptrOffset);
          offsets = std::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
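          // copyOffsets records the running offset for each entry and copies the
          // flattened RVec payloads into the (re)allocated values buffer.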
          auto copyOffsets = [&arrowValuesBuffer, &pool, &ptrOffset, &ptr, &totalSize](auto inPtr, size_t total) {
            using value_type = typename std::decay_t<decltype(*inPtr)>::value_type;
            for (size_t i = 0; i < total; i++) {
              *ptrOffset++ = totalSize;
              totalSize += inPtr[i].size();
            }
            *ptrOffset = totalSize;
            auto&& result = arrow::AllocateResizableBuffer(totalSize * sizeof(value_type), pool);
            if (!result.ok()) {
              throw runtime_error("Cannot allocate values buffer");
            }
            arrowValuesBuffer = result.MoveValueUnsafe();
            ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
            for (size_t i = 0; i < total; i++) {
              int vlaSizeInBytes = inPtr[i].size() * sizeof(value_type);
              if (vlaSizeInBytes == 0) {
                continue;
              }
              memcpy(ptr, inPtr[i].data(), vlaSizeInBytes);
              ptr += vlaSizeInBytes;
            }
          };
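          // Read each cluster in bulk and dispatch on the element type of the list.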
          while (clusterIt != rns::kInvalidDescriptorId) {
            auto& index = descriptor.GetClusterDescriptor(clusterIt);
            auto mask = std::make_unique<bool[]>(index.GetNEntries());
            std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
            int readLast = index.GetNEntries();
            switch (vlaListType->field(0)->type()->id()) {
              case arrow::Type::FLOAT: {
                copyOffsets((ROOT::Internal::VecOps::RVec<float>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::DOUBLE: {
                copyOffsets((ROOT::Internal::VecOps::RVec<double>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::INT8: {
                copyOffsets((ROOT::Internal::VecOps::RVec<int8_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::INT16: {
                copyOffsets((ROOT::Internal::VecOps::RVec<int16_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::INT32: {
                copyOffsets((ROOT::Internal::VecOps::RVec<int32_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::INT64: {
                copyOffsets((ROOT::Internal::VecOps::RVec<int64_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::UINT8: {
                copyOffsets((ROOT::Internal::VecOps::RVec<uint8_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::UINT16: {
                copyOffsets((ROOT::Internal::VecOps::RVec<uint16_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::UINT32: {
                copyOffsets((ROOT::Internal::VecOps::RVec<uint32_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              case arrow::Type::UINT64: {
                copyOffsets((ROOT::Internal::VecOps::RVec<uint64_t>*)offsetBulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
              } break;
              default: {
                throw runtime_error("Unsupported value type for variable size list");
              } break;
            }
            readEntries += readLast;
            clusterIt = descriptor.FindNextClusterId(clusterIt);
          }
        } else {
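          // Plain fixed-size values: memcpy each cluster's bulk straight into the values buffer.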
          auto clusterIt = descriptor.FindClusterId(0, 0);
          while (clusterIt != rns::kInvalidDescriptorId) {
            auto& index = descriptor.GetClusterDescriptor(clusterIt);
            auto mask = std::make_unique<bool[]>(index.GetNEntries());
            std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
            auto inPtr = bulk.ReadBulk(DPLLocalIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
            int readLast = index.GetNEntries();
            size_t size = 0;
            if (listSize == -1) {
              size = offsets[readEntries + readLast] - offsets[readEntries];
            } else {
              size = readLast * listSize;
            }
            readEntries += readLast;
            memcpy(ptr, inPtr, size * typeSize);
            ptr += (ptrdiff_t)(size * typeSize);
            clusterIt = descriptor.FindNextClusterId(clusterIt);
          }
        }
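        // Wrap the filled buffers into the Arrow array matching the column layout.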
        switch (listSize) {
          case -1: {
            auto vdata = std::make_shared<arrow::ArrayData>(physicalField->type()->field(0)->type(), totalSize, std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, arrowValuesBuffer});
            array = std::make_shared<arrow::ListArray>(physicalField->type(), readEntries, arrowOffsetBuffer, arrow::MakeArray(vdata));
          } break;
          case 1: {
            totalSize = readEntries * listSize;
            auto data = std::make_shared<arrow::ArrayData>(physicalField->type(), readEntries, std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, arrowValuesBuffer});
            array = arrow::MakeArray(data);
          } break;
          default: {
            totalSize = readEntries * listSize;
            auto vdata = std::make_shared<arrow::ArrayData>(physicalField->type()->field(0)->type(), totalSize, std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, arrowValuesBuffer});
            array = std::make_shared<arrow::FixedSizeListArray>(physicalField->type(), readEntries, arrow::MakeArray(vdata));
          } break;
        }
      }

      columns.push_back(array);
    }
    auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);