214 TTree
t1(
"t1",
"a simple Tree with simple variables");
217 Float_t px = 0, py = 1, pz = 2;
220 t1.Branch(
"px", &px,
"px/F");
221 t1.Branch(
"py", &py,
"py/F");
222 t1.Branch(
"pz", &pz,
"pz/F");
223 t1.Branch(
"random", &random,
"random/D");
224 t1.Branch(
"ev", &ev,
"ev/I");
225 t1.Branch(
"xyz", xyz,
"xyz[3]/F");
226 t1.Branch(
"ij", ij,
"ij[2]/I");
228 for (Int_t
i = 0;
i < 1000;
i++) {
232 gRandom->Rannor(px, py);
233 pz = px * px + py * py;
237 random = gRandom->Rndm();
241 file->WriteObjectAny(&
t1,
t1.Class());
242 auto* fileRead =
new TBufferFile(TBuffer::kRead,
file->BufferSize(),
file->Buffer(),
false,
nullptr);
244 std::vector<char const*> capabilitiesSpecs = {
245 "O2Framework:RNTupleObjectReadingCapability",
246 "O2Framework:TTreeObjectReadingCapability",
249 std::vector<LoadablePlugin> plugins;
250 for (
auto spec : capabilitiesSpecs) {
252 for (
auto& extra : morePlugins) {
253 plugins.push_back(extra);
256 REQUIRE(plugins.size() == 2);
259 std::vector<char const*> configDiscoverySpec = {};
260 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
268 auto fs = std::make_shared<TBufferFileFS>(fileRead, factory);
270 arrow::dataset::FileSource
source(
"p", fs);
273 REQUIRE(schemaOpt.ok());
274 auto schema = *schemaOpt;
275 REQUIRE(schema->num_fields() == 7);
276 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
277 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
278 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
279 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
280 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
281 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
282 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
283 auto fragment =
format->MakeFragment(
source, {}, schema);
284 REQUIRE(fragment.ok());
285 auto options = std::make_shared<arrow::dataset::ScanOptions>();
286 options->dataset_schema = schema;
287 auto scanner =
format->ScanBatchesAsync(options, *fragment);
288 REQUIRE(scanner.ok());
289 auto batches = (*scanner)();
290 auto result = batches.result();
292 REQUIRE((*result)->columns().size() == 7);
293 REQUIRE((*result)->num_rows() == 1000);
299 auto int_array = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName(
"ev"));
300 REQUIRE(int_array->length() == 100);
301 for (int64_t
j = 0;
j < int_array->length();
j++) {
302 REQUIRE(int_array->Value(
j) ==
j + 1);
307 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"xyz"));
309 REQUIRE(list_array->length() == 100);
311 for (int64_t
i = 0;
i < list_array->length();
i++) {
312 auto value_slice = list_array->value_slice(
i);
313 auto float_array = std::static_pointer_cast<arrow::FloatArray>(value_slice);
315 REQUIRE(float_array->Value(0) == 1);
316 REQUIRE(float_array->Value(1) == 2);
317 REQUIRE(float_array->Value(2) ==
i + 1);
322 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"ij"));
324 REQUIRE(list_array->length() == 100);
326 for (int64_t
i = 0;
i < list_array->length();
i++) {
327 auto value_slice = list_array->value_slice(
i);
328 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
329 REQUIRE(int_array->Value(0) ==
i);
330 REQUIRE(int_array->Value(1) ==
i + 1);
335 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(batch->GetColumnByName(
"bools"));
337 REQUIRE(bool_array->length() == 100);
338 for (int64_t
j = 0;
j < bool_array->length();
j++) {
339 REQUIRE(bool_array->Value(
j) == (
j % 3 == 0));
344 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"manyBools"));
346 REQUIRE(list_array->length() == 100);
347 for (int64_t
i = 0;
i < list_array->length();
i++) {
348 auto value_slice = list_array->value_slice(
i);
349 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(value_slice);
350 REQUIRE(bool_array->Value(0) == (
i % 4 == 0));
351 REQUIRE(bool_array->Value(1) == (
i % 5 == 0));
356 auto list_array = std::static_pointer_cast<arrow::ListArray>(batch->GetColumnByName(
"vla"));
358 REQUIRE(list_array->length() == 100);
359 for (int64_t
i = 0;
i < list_array->length();
i++) {
360 auto value_slice = list_array->value_slice(
i);
361 REQUIRE(value_slice->length() == (
i % 10));
362 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
363 for (
size_t j = 0;
j < value_slice->length();
j++) {
364 REQUIRE(int_array->Value(
j) ==
j);
412 auto*
f =
new TMemFile(
"foo",
"RECREATE");
417 auto* t =
new TTree(
"tracks",
"a simple Tree with simple variables");
421 Float_t px = 0, py = 1, pz = 2;
424 t->Branch(
"px", &px,
"px/F");
425 t->Branch(
"py", &py,
"py/F");
426 t->Branch(
"pz", &pz,
"pz/F");
427 t->Branch(
"random", &random,
"random/D");
428 t->Branch(
"ev", &ev,
"ev/I");
429 t->Branch(
"xyz", xyz,
"xyz[3]/F");
430 t->Branch(
"ij", ij,
"ij[2]/I");
432 for (Int_t
i = 0;
i < 1000;
i++) {
436 gRandom->Rannor(px, py);
437 pz = px * px + py * py;
441 random = gRandom->Rndm();
448 t =
new TTree(
"tracks",
"a simple Tree with simple variables");
452 Float_t px = 0, py = 1, pz = 2;
457 int vla[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
461 t->Branch(
"px", &px,
"px/F");
462 t->Branch(
"py", &py,
"py/F");
463 t->Branch(
"pz", &pz,
"pz/F");
464 t->Branch(
"random", &random,
"random/D");
465 t->Branch(
"ev", &ev,
"ev/I");
466 t->Branch(
"xyz", xyz,
"xyz[3]/F");
467 t->Branch(
"ij", ij,
"ij[2]/I");
468 t->Branch(
"bools", &oneBool,
"bools/O");
469 t->Branch(
"manyBools", &manyBool,
"manyBools[2]/O");
470 t->Branch(
"vla_size", &vlaSize,
"vla_size/I");
471 t->Branch(
"vla", vla,
"vla[vla_size]/I");
472 t->Branch(
"byte", &
byte,
"byte/B");
474 for (Int_t
i = 0;
i < 100;
i++) {
478 gRandom->Rannor(px, py);
479 pz = px * px + py * py;
483 random = gRandom->Rndm();
485 oneBool = (
i % 3 == 0);
486 manyBool[0] = (
i % 4 == 0);
487 manyBool[1] = (
i % 5 == 0);
495 std::vector<char const*> capabilitiesSpecs = {
496 "O2Framework:RNTupleObjectReadingCapability",
497 "O2Framework:TTreeObjectReadingCapability",
502 std::vector<LoadablePlugin> plugins;
503 for (
auto spec : capabilitiesSpecs) {
505 for (
auto& extra : morePlugins) {
506 plugins.push_back(extra);
509 REQUIRE(plugins.size() == 2);
511 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
518 auto rNtupleFormat = factory.
capabilities[0].factory().format();
521 auto fs = std::make_shared<TFileFileSystem>(
f, 50 * 1024 * 1024, factory);
523 arrow::dataset::FileSource
source(
"DF_2/tracks", fs);
526 REQUIRE(physicalSchema.ok());
529 std::vector<std::shared_ptr<arrow::Field>> fields;
530 for (
auto& field : (*(physicalSchema))->fields()) {
531 if (field->name().ends_with(
"_size")) {
534 fields.push_back(field);
536 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
540 auto fragment =
format->MakeFragment(
source, {}, *physicalSchema);
541 REQUIRE(fragment.ok());
542 auto options = std::make_shared<arrow::dataset::ScanOptions>();
543 options->dataset_schema = schema;
544 auto scanner =
format->ScanBatchesAsync(options, *fragment);
545 REQUIRE(scanner.ok());
549 std::shared_ptr<arrow::RecordBatch> batch;
551 auto batches = (*scanner)();
552 auto result = batches.result();
554 REQUIRE((*result)->columns().size() == 11);
555 REQUIRE((*result)->num_rows() == 100);
556 std::shared_ptr<arrow::ResizableBuffer>
buffer = *arrow::AllocateResizableBuffer(1000, 64);
557 auto deferredWriterStream = factory.
capabilities[1].factory().deferredOutputStreamer(*fragment,
buffer);
558 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
559 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**
result);
560 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(
buffer);
561 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
562 auto batchReader = readerResult.ValueOrDie();
564 auto next = batchReader->ReadNext(&batch);
565 REQUIRE(batch !=
nullptr);
569 auto*
output =
new TMemFile(
"foo",
"RECREATE");
570 auto outFs = std::make_shared<TFileFileSystem>(
output, 0, factory);
573 auto destination = outFs->OpenOutputStream(
"/", {});
574 REQUIRE(destination.ok());
577 arrow::fs::FileLocator locator{outFs,
"/DF_3"};
578 auto writer =
format->MakeWriter(*destination, schema, {}, locator);
579 auto success = writer->get()->Write(batch);
580 REQUIRE(batch->schema()->field(0)->name() ==
"px");
581 auto rootDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
585 REQUIRE(success.ok());
587 auto tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(outFs);
588 REQUIRE(tfileFs.get());
589 REQUIRE(tfileFs->GetFile());
590 auto*
tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
591 REQUIRE(
tree !=
nullptr);
592 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
593 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetName() == std::string(
"px"));
595 arrow::dataset::FileSource source2(
"/DF_3", outFs);
597 REQUIRE(
format->IsSupported(source2) ==
true);
598 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
599 REQUIRE(tfileFs.get());
600 REQUIRE(tfileFs->GetFile());
601 REQUIRE(tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree")));
603 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
604 REQUIRE(
tree !=
nullptr);
605 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
607 auto schemaOptWritten =
format->Inspect(source2);
608 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
609 REQUIRE(tfileFs.get());
610 REQUIRE(tfileFs->GetFile());
611 REQUIRE(tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree")));
612 REQUIRE(schemaOptWritten.ok());
613 auto schemaWritten = *schemaOptWritten;
615 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
616 REQUIRE(
tree !=
nullptr);
617 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
620 std::vector<std::shared_ptr<arrow::Field>> fields;
621 for (
auto& field : schemaWritten->fields()) {
622 if (field->name().ends_with(
"_size")) {
625 fields.push_back(field);
627 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
630 auto fragmentWritten =
format->MakeFragment(source2, {}, *physicalSchema);
631 REQUIRE(fragmentWritten.ok());
632 auto optionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
633 optionsWritten->dataset_schema = schema;
634 auto scannerWritten =
format->ScanBatchesAsync(optionsWritten, *fragmentWritten);
635 REQUIRE(scannerWritten.ok());
636 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
637 REQUIRE(
tree !=
nullptr);
638 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
639 auto batchesWritten = (*scannerWritten)();
640 auto resultWritten = batchesWritten.result();
641 REQUIRE(resultWritten.ok());
642 REQUIRE((*resultWritten)->columns().size() == 11);
643 REQUIRE((*resultWritten)->num_rows() == 100);
645 std::shared_ptr<arrow::ResizableBuffer>
buffer = *arrow::AllocateResizableBuffer(1000, 64);
646 auto deferredWriterStream2 = factory.
capabilities[1].factory().deferredOutputStreamer(*fragmentWritten,
buffer);
647 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema);
648 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten);
649 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(
buffer);
650 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
651 auto batchReader = readerResult.ValueOrDie();
653 auto next = batchReader->ReadNext(&batch);
654 REQUIRE(batch !=
nullptr);
658 arrow::fs::FileLocator rnTupleLocator{outFs,
"/rntuple"};
660 auto rntupleDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
663 auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator);
664 auto rNtupleSuccess = rNtupleWriter->get()->Write(batch);
665 REQUIRE(rNtupleSuccess.ok());
669 arrow::dataset::FileSource writtenRntupleSource(
"/rntuple", outFs);
671 REQUIRE(rNtupleFormat->IsSupported(writtenRntupleSource) ==
true);
673 auto rntupleSchemaOpt = rNtupleFormat->Inspect(writtenRntupleSource);
674 REQUIRE(rntupleSchemaOpt.ok());
675 auto rntupleSchemaWritten = *rntupleSchemaOpt;
678 auto rntupleFragmentWritten = rNtupleFormat->MakeFragment(writtenRntupleSource, {}, rntupleSchemaWritten);
679 REQUIRE(rntupleFragmentWritten.ok());
680 auto rntupleOptionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
681 rntupleOptionsWritten->dataset_schema = rntupleSchemaWritten;
682 auto rntupleScannerWritten = rNtupleFormat->ScanBatchesAsync(rntupleOptionsWritten, *rntupleFragmentWritten);
683 REQUIRE(rntupleScannerWritten.ok());
684 auto rntupleBatchesWritten = (*rntupleScannerWritten)();
685 auto rntupleResultWritten = rntupleBatchesWritten.result();
686 REQUIRE(rntupleResultWritten.ok());
687 REQUIRE((*rntupleResultWritten)->columns().size() == 11);
689 REQUIRE((*rntupleResultWritten)->num_rows() == 100);