// NOTE(review): this chunk is a lossy extraction — the embedded original line
// numbers (534, 537, 540, …) skip values, so the enclosing function's name,
// the declarations of `rows`, `readEntries`, `bytes`, `size`, `offsets`,
// several error checks, `break`s, and many closing braces are NOT visible
// here. Comments below annotate only what the visible code demonstrates;
// anything depending on the elided lines is hedged. No code tokens changed.
//
// Visible purpose: an Arrow Dataset scan over a ROOT RNTuple file fragment.
// It builds a `generator` lambda that, for each field of the dataset schema,
// bulk-reads the matching RNTuple column cluster by cluster into an Arrow
// array (separate paths for booleans, fixed-size lists, variable-size lists,
// and plain primitives), then assembles all columns into a RecordBatch.
534 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
535 const std::shared_ptr<arrow::dataset::FileFragment>& fragment)
const
537 auto dataset_schema = options->dataset_schema;
// Downcast to the RNTuple-backed fragment type. NOTE(review): no null check
// visible after the dynamic cast — presumably on elided line 539; confirm.
538 auto ntupleFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
// The generator captures the Arrow memory pool and fragment by value, and the
// running (de)compression byte counters of the enclosing object by reference —
// so the enclosing object must outlive every invocation of this generator.
540 auto generator = [pool = options->pool, ntupleFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
541 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
542 using namespace ROOT::Experimental;
543 std::vector<std::shared_ptr<arrow::Array>> columns;
544 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
// Open a reader on this fragment's RNTuple; the model gives access to
// per-field bulk readers.
547 ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple();
548 auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
549 auto& model = reader->GetModel();
// One Arrow column is produced per schema field, keyed by the field name.
550 for (
auto& physicalField : fields) {
551 auto bulk = model.CreateBulk(physicalField->name());
// Non-null only when the field is a fixed-size list; used to select paths.
553 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type());
555 auto& descriptor = reader->GetDescriptor();
556 auto totalEntries = reader->GetNEntries();
// Consistency check: every column must have the same number of rows.
// NOTE(review): `rows` is declared on an elided line (557-560) — presumably
// the batch row count established from the first column; confirm.
561 if (
rows != totalEntries) {
562 throw runtime_error_f(
"Unmatching number of rows for branch %s", physicalField->name().c_str());
564 arrow::Status status;
566 std::shared_ptr<arrow::Array>
array;
// --- Path 1: boolean scalar, or fixed-size list of booleans. ---
// Arrow bit-packs booleans, so the raw-memcpy path below cannot be used;
// builders convert the byte-per-value RNTuple data instead.
567 if (physicalField->type() == arrow::boolean() ||
568 (listType && physicalField->type()->field(0)->type() == arrow::boolean())) {
570 std::unique_ptr<arrow::ArrayBuilder> builder =
nullptr;
571 auto status = arrow::MakeBuilder(pool, physicalField->type()->field(0)->type(), &builder);
575 auto listBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), listType->list_size());
576 auto valueBuilder = listBuilder.get()->value_builder();
// Reserve up front for all entries to avoid incremental regrowth.
578 status = valueBuilder->Reserve(totalEntries * listType->list_size());
579 status &= listBuilder->Reserve(totalEntries);
581 throw runtime_error(
"Failed to reserve memory for array builder");
// Walk clusters in entry order; each cluster is read in full (mask all true).
583 auto clusterIt = descriptor.FindClusterId(0, 0);
586 while (clusterIt != kInvalidDescriptorId) {
587 auto&
index = descriptor.GetClusterDescriptor(clusterIt);
588 auto mask = std::make_unique<bool[]>(
index.GetNEntries());
589 std::fill(
mask.get(),
mask.get() +
index.GetNEntries(),
true);
590 void*
ptr = bulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries());
591 int readLast =
index.GetNEntries();
// NOTE(review): `readEntries` is declared on an elided line; it accumulates
// the row count across clusters for this column.
592 readEntries += readLast;
// Booleans arrive one-per-byte from ReadBulk; AppendValues(uint8_t*) packs.
593 status &=
static_cast<arrow::BooleanBuilder*
>(valueBuilder)->AppendValues(
reinterpret_cast<uint8_t const*
>(
ptr), readLast * listType->list_size());
594 clusterIt = descriptor.FindNextClusterId(clusterIt);
// Close out the list structure: one fixed-size slot per entry read.
596 status &=
static_cast<arrow::FixedSizeListBuilder*
>(listBuilder.get())->AppendValues(readEntries);
600 status &= listBuilder->Finish(&
array);
604 }
// --- Path 2: scalar boolean (no list wrapper) — same builder approach. ---
else if (listType ==
nullptr) {
605 std::unique_ptr<arrow::ArrayBuilder> builder =
nullptr;
606 auto status = arrow::MakeBuilder(pool, physicalField->type(), &builder);
610 auto valueBuilder =
static_cast<arrow::BooleanBuilder*
>(builder.get());
612 status = valueBuilder->Reserve(totalEntries);
614 throw runtime_error(
"Failed to reserve memory for array builder");
// Same cluster walk as above, appending scalar booleans directly.
616 auto clusterIt = descriptor.FindClusterId(0, 0);
617 while (clusterIt != kInvalidDescriptorId) {
618 auto&
index = descriptor.GetClusterDescriptor(clusterIt);
619 auto mask = std::make_unique<bool[]>(
index.GetNEntries());
620 std::fill(
mask.get(),
mask.get() +
index.GetNEntries(),
true);
621 void*
ptr = bulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries());
622 int readLast =
index.GetNEntries();
623 readEntries += readLast;
624 status &= valueBuilder->AppendValues(
reinterpret_cast<uint8_t const*
>(
ptr), readLast);
625 clusterIt = descriptor.FindNextClusterId(clusterIt);
630 status &= valueBuilder->Finish(&
array);
// --- Paths 3-5: non-boolean types — raw-buffer memcpy instead of builders. ---
637 auto typeSize = physicalField->type()->byte_width();
// NOTE(review): `bytes` is declared on an elided line (638-639) — presumably
// the column's on-disk/uncompressed size; 1000000 is a fallback guess when 0.
640 auto branchSize = bytes ? bytes : 1000000;
641 auto&&
result = arrow::AllocateResizableBuffer(branchSize, pool);
// ValueUnsafe: the allocation Status is presumably checked on elided lines
// 642-644 — confirm, otherwise a failed allocation is silently consumed here.
645 std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(
result).ValueUnsafe();
646 auto ptr = arrowValuesBuffer->mutable_data();
647 if (
ptr ==
nullptr) {
651 std::unique_ptr<TBufferFile> offsetBuffer =
nullptr;
653 std::shared_ptr<arrow::Buffer> arrowOffsetBuffer;
656 uint32_t totalSize = 0;
657 int64_t listSize = 1;
// --- Path 3: fixed-size list of a primitive — memcpy whole clusters. ---
658 if (
auto fixedSizeList = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type())) {
659 listSize = fixedSizeList->list_size();
660 typeSize = fixedSizeList->field(0)->type()->byte_width();
661 auto clusterIt = descriptor.FindClusterId(0, 0);
662 while (clusterIt != kInvalidDescriptorId) {
663 auto&
index = descriptor.GetClusterDescriptor(clusterIt);
664 auto mask = std::make_unique<bool[]>(
index.GetNEntries());
665 std::fill(
mask.get(),
mask.get() +
index.GetNEntries(),
true);
666 void* inPtr = bulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries());
668 int readLast =
index.GetNEntries();
// NOTE(review): `size` is declared on elided lines 670-671; the listSize==-1
// branch body is also elided — cannot tell what it does from here.
669 if (listSize == -1) {
672 size = readLast * listSize;
674 readEntries += readLast;
// Advance the write cursor into the shared values buffer after each cluster.
675 memcpy(
ptr, inPtr,
size * typeSize);
676 ptr += (ptrdiff_t)(
size * typeSize);
677 clusterIt = descriptor.FindNextClusterId(clusterIt);
679 }
// --- Path 4: variable-size list — two bulks: one pass computes offsets and
// the exact values size, the values are copied element by element. ---
else if (
auto vlaListType = std::dynamic_pointer_cast<arrow::ListType>(physicalField->type())) {
681 typeSize = vlaListType->field(0)->type()->byte_width();
682 offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
// Offsets buffer: one int32 per entry plus the trailing end offset.
683 result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)
sizeof(
int), pool);
687 arrowOffsetBuffer =
result.MoveValueUnsafe();
// NOTE(review): a second bulk is created with the same field name and `bulk`
// is re-created too — presumably to get independent read cursors; confirm.
690 auto offsetBulk = model.CreateBulk(physicalField->name());
692 bulk = model.CreateBulk(physicalField->name());
693 auto clusterIt = descriptor.FindClusterId(0, 0);
694 auto* ptrOffset =
reinterpret_cast<int*
>(arrowOffsetBuffer->mutable_data());
695 auto* tPtrOffset =
reinterpret_cast<int*
>(ptrOffset);
// NOTE(review): `offsets` span is declared on an elided line; its later use
// is not visible in this chunk.
696 offsets = std::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
// copyOffsets: given a cluster's RVec<T>* payload, (a) append the running
// element-count offsets, (b) reallocate the values buffer to the exact total,
// (c) copy each RVec's payload and advance `ptr`. Mutates the captured
// `ptrOffset`, `ptr`, `totalSize`, and `arrowValuesBuffer` in place.
// NOTE(review): the values buffer is reallocated from scratch on EVERY
// cluster (line 705) but copying resumes at the advanced `ptr` (line 710
// resets it) — whether multi-cluster VLA columns are handled correctly
// cannot be confirmed from the visible lines.
698 auto copyOffsets = [&arrowValuesBuffer, &pool, &ptrOffset, &
ptr, &totalSize](
auto inPtr,
size_t total) {
699 using value_type =
typename std::decay_t<
decltype(*inPtr)>::value_type;
700 for (
size_t i = 0;
i < total;
i++) {
701 *ptrOffset++ = totalSize;
702 totalSize += inPtr[
i].size();
704 *ptrOffset = totalSize;
705 auto&&
result = arrow::AllocateResizableBuffer(totalSize *
sizeof(value_type), pool);
709 arrowValuesBuffer =
result.MoveValueUnsafe();
710 ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
712 for (
size_t i = 0;
i < total;
i++) {
713 int vlaSizeInBytes = inPtr[
i].size() *
sizeof(value_type);
714 if (vlaSizeInBytes == 0) {
717 memcpy(
ptr, inPtr[
i].
data(), vlaSizeInBytes);
718 ptr += vlaSizeInBytes;
// Per-cluster dispatch on the list's element type; each case casts the bulk
// payload to the matching RVec<T> and runs copyOffsets.
// NOTE(review): `break`s / case-closing braces fall on elided lines
// (730, 733, …) — presumably present; confirm against the full file.
722 while (clusterIt != kInvalidDescriptorId) {
723 auto&
index = descriptor.GetClusterDescriptor(clusterIt);
724 auto mask = std::make_unique<bool[]>(
index.GetNEntries());
725 std::fill(
mask.get(),
mask.get() +
index.GetNEntries(),
true);
726 int readLast =
index.GetNEntries();
727 switch (vlaListType->field(0)->type()->id()) {
728 case arrow::Type::FLOAT: {
729 copyOffsets((ROOT::Internal::VecOps::RVec<float>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
731 case arrow::Type::DOUBLE: {
732 copyOffsets((ROOT::Internal::VecOps::RVec<double>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
734 case arrow::Type::INT8: {
735 copyOffsets((ROOT::Internal::VecOps::RVec<int8_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
737 case arrow::Type::INT16: {
738 copyOffsets((ROOT::Internal::VecOps::RVec<int16_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
740 case arrow::Type::INT32: {
741 copyOffsets((ROOT::Internal::VecOps::RVec<int32_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
743 case arrow::Type::INT64: {
744 copyOffsets((ROOT::Internal::VecOps::RVec<int64_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
746 case arrow::Type::UINT8: {
747 copyOffsets((ROOT::Internal::VecOps::RVec<uint8_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
749 case arrow::Type::UINT16: {
750 copyOffsets((ROOT::Internal::VecOps::RVec<uint16_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
752 case arrow::Type::UINT32: {
753 copyOffsets((ROOT::Internal::VecOps::RVec<uint32_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
755 case arrow::Type::UINT64: {
756 copyOffsets((ROOT::Internal::VecOps::RVec<uint64_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries()), readLast);
763 readEntries += readLast;
764 clusterIt = descriptor.FindNextClusterId(clusterIt);
// --- Path 5: plain primitive scalar (listSize stays 1) — same memcpy walk
// as the fixed-size-list path. Original line numbers 765-766 are elided;
// presumably the closing brace and `else {` live there.
767 auto clusterIt = descriptor.FindClusterId(0, 0);
768 while (clusterIt != kInvalidDescriptorId) {
769 auto&
index = descriptor.GetClusterDescriptor(clusterIt);
770 auto mask = std::make_unique<bool[]>(
index.GetNEntries());
771 std::fill(
mask.get(),
mask.get() +
index.GetNEntries(),
true);
772 void* inPtr = bulk.ReadBulk(RClusterIndex(clusterIt,
index.GetFirstEntryIndex()),
mask.get(),
index.GetNEntries());
774 int readLast =
index.GetNEntries();
775 if (listSize == -1) {
778 size = readLast * listSize;
780 readEntries += readLast;
781 memcpy(
ptr, inPtr,
size * typeSize);
782 ptr += (ptrdiff_t)(
size * typeSize);
783 clusterIt = descriptor.FindNextClusterId(clusterIt);
// Wrap the filled buffers into the final Arrow array for this column.
// Variable-size list: values array + offsets buffer -> ListArray.
788 auto varray = std::make_shared<arrow::PrimitiveArray>(physicalField->type()->field(0)->type(), totalSize, arrowValuesBuffer);
789 array = std::make_shared<arrow::ListArray>(physicalField->type(), readEntries, arrowOffsetBuffer, varray);
// Scalar primitive: the buffer itself is the array.
792 totalSize = readEntries * listSize;
793 array = std::make_shared<arrow::PrimitiveArray>(physicalField->type(), readEntries, arrowValuesBuffer);
// Fixed-size list: values array wrapped without an offsets buffer.
797 totalSize = readEntries * listSize;
798 auto varray = std::make_shared<arrow::PrimitiveArray>(physicalField->type()->field(0)->type(), totalSize, arrowValuesBuffer);
799 array = std::make_shared<arrow::FixedSizeListArray>(physicalField->type(), readEntries, varray);
803 columns.push_back(
array);
// All columns collected: emit one RecordBatch covering `rows` entries.
// NOTE(review): the generator's return statement and the enclosing
// function's tail fall outside this chunk.
806 auto batch = arrow::RecordBatch::Make(dataset_schema,
rows, columns);