// Test setup: build a small in-memory TTree "t1", serialize it into `file`
// with WriteObjectAny, then wrap the written bytes in a read-mode TBufferFile.
// NOTE(review): this is a non-contiguous excerpt. The leading numbers are
// original line numbers, and intermediate lines are not visible here: the
// declarations of px/py/pz/random/ev/xyz/ij and `file`, the Fill() call, and
// the closing braces.
78 TTree
t1(
"t1",
"a simple Tree with simple variables");
// Scalar leaves: px/py/pz are floats (F), random is a double (D), ev is an
// int (I).
84 t1.Branch(
"px", &px,
"px/F");
85 t1.Branch(
"py", &py,
"py/F");
86 t1.Branch(
"pz", &pz,
"pz/F");
87 t1.Branch(
"random", &random,
"random/D");
88 t1.Branch(
"ev", &ev,
"ev/I");
// Fixed-size array leaves: xyz holds 3 floats and ij holds 2 ints.
89 t1.Branch(
"xyz", xyz,
"xyz[3]/F");
90 t1.Branch(
"ij", ij,
"ij[2]/I");
// 1000 iterations. The rest of the loop body, including the entry fill, is
// not visible in this excerpt.
92 for (Int_t
i = 0;
i < 1000;
i++) {
96 gRandom->Rannor(px, py);
97 pz = px * px + py * py;
101 random = gRandom->Rndm();
// Serialize the whole tree object into `file`. Presumably `file` is a
// write-mode TBufferFile; its declaration is not visible here.
105 file->WriteObjectAny(&
t1,
t1.Class());
// Re-open the written bytes for reading. The `false` argument is presumably
// TBufferFile's `adopt` flag, so fileRead would not take ownership of
// file->Buffer(). Confirm against the ROOT TBufferFile constructor.
// NOTE(review): fileRead is allocated with raw `new` and no matching release
// is visible. Confirm something owns and frees it.
106 auto* fileRead =
new TBufferFile(TBuffer::kRead,
file->BufferSize(),
file->Buffer(),
false,
nullptr);
// Load the RNTuple and TTree object-reading capability plugins into
// `factory.capabilities`. Then read the serialized tree back through the
// arrow dataset API and check its schema and row count.
108 std::vector<char const*> capabilitiesSpecs = {
109 "O2Framework:RNTupleObjectReadingCapability",
110 "O2Framework:TTreeObjectReadingCapability",
// NOTE(review): the outer loop body that turns each spec into plugins, and the
// declaration of morePlugins, are not visible in this excerpt.
113 std::vector<LoadablePlugin> plugins;
114 for (
auto spec : capabilitiesSpecs) {
116 for (
auto& extra : morePlugins) {
117 plugins.push_back(extra);
// One plugin is expected per capability spec listed above.
120 REQUIRE(plugins.size() == 2);
// NOTE(review): configDiscoverySpec is not used anywhere in the visible code.
123 std::vector<char const*> configDiscoverySpec = {};
124 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
// Expose the in-memory TBufferFile as an arrow filesystem so the dataset API
// can address the serialized tree through a FileSource.
132 auto fs = std::make_shared<TBufferFileFS>(fileRead, factory);
134 arrow::dataset::FileSource
source(
"p", fs);
// schemaOpt presumably comes from inspecting `source`; that call is not
// visible here.
137 REQUIRE(schemaOpt.ok());
138 auto schema = *schemaOpt;
// Seven fields whose expected types follow the branch declaration order:
// px, py, pz (float32), random (float64), ev (int32), then xyz and ij as
// fixed-size lists.
// NOTE(review): DataType::id() only identifies the type category (for
// example FIXED_SIZE_LIST). These checks do not verify the list element type
// or size. type()->Equals(...) would be a stricter check.
139 REQUIRE(schema->num_fields() == 7);
140 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
141 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
142 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
143 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
144 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
145 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
146 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
// Build a fragment for the source, scan it with the inspected schema, and
// pull the first batch from the async generator.
147 auto fragment =
format->MakeFragment(
source, {}, schema);
148 REQUIRE(fragment.ok());
149 auto options = std::make_shared<arrow::dataset::ScanOptions>();
150 options->dataset_schema = schema;
151 auto scanner =
format->ScanBatchesAsync(options, *fragment);
152 REQUIRE(scanner.ok());
153 auto batches = (*scanner)();
154 auto result = batches.result();
// The first batch is expected to hold all 7 columns and all 1000 entries.
// NOTE(review): no result.ok() check is visible before `result` is
// dereferenced.
156 REQUIRE((*result)->columns().size() == 7);
157 REQUIRE((*result)->num_rows() == 1000);
// Column-by-column validation of a 100-row `batch`. Presumably the batch
// comes from the "tracks" tree that has the bools/manyBools/vla branches.
// The assignments to ev, xyz, ij and vla_size are not visible in this
// excerpt; the expected values below are what the assertions encode.
// NOTE(review): std::static_pointer_cast does not check the dynamic type, and
// GetColumnByName returns null for a missing column. A wrong or missing
// column would therefore be undefined behavior rather than a clean failure.
// ev: one int per row, 1-based (row j holds j + 1).
163 auto int_array = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName(
"ev"));
164 REQUIRE(int_array->length() == 100);
165 for (int64_t
j = 0;
j < int_array->length();
j++) {
166 REQUIRE(int_array->Value(
j) ==
j + 1);
// xyz: a 3-float fixed-size list per row, equal to {1, 2, i + 1}.
171 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"xyz"));
173 REQUIRE(list_array->length() == 100);
175 for (int64_t
i = 0;
i < list_array->length();
i++) {
176 auto value_slice = list_array->value_slice(
i);
177 auto float_array = std::static_pointer_cast<arrow::FloatArray>(value_slice);
179 REQUIRE(float_array->Value(0) == 1);
180 REQUIRE(float_array->Value(1) == 2);
181 REQUIRE(float_array->Value(2) ==
i + 1);
// ij: a 2-int fixed-size list per row, equal to {i, i + 1}.
186 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"ij"));
188 REQUIRE(list_array->length() == 100);
190 for (int64_t
i = 0;
i < list_array->length();
i++) {
191 auto value_slice = list_array->value_slice(
i);
192 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
193 REQUIRE(int_array->Value(0) ==
i);
194 REQUIRE(int_array->Value(1) ==
i + 1);
// bools: one boolean per row, true on every third row (j % 3 == 0).
199 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(batch->GetColumnByName(
"bools"));
201 REQUIRE(bool_array->length() == 100);
202 for (int64_t
j = 0;
j < bool_array->length();
j++) {
203 REQUIRE(bool_array->Value(
j) == (
j % 3 == 0));
// manyBools: a 2-element boolean list per row, {i % 4 == 0, i % 5 == 0}.
208 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName(
"manyBools"));
210 REQUIRE(list_array->length() == 100);
211 for (int64_t
i = 0;
i < list_array->length();
i++) {
212 auto value_slice = list_array->value_slice(
i);
213 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(value_slice);
214 REQUIRE(bool_array->Value(0) == (
i % 4 == 0));
215 REQUIRE(bool_array->Value(1) == (
i % 5 == 0));
// vla: a variable-length int list. Row i has length i % 10 and holds
// 0, 1, ..., length - 1.
// NOTE(review): the inner loop compares `size_t j` with the int64_t
// value_slice->length(), which triggers -Wsign-compare. The other loops use
// int64_t.
220 auto list_array = std::static_pointer_cast<arrow::ListArray>(batch->GetColumnByName(
"vla"));
222 REQUIRE(list_array->length() == 100);
223 for (int64_t
i = 0;
i < list_array->length();
i++) {
224 auto value_slice = list_array->value_slice(
i);
225 REQUIRE(value_slice->length() == (
i % 10));
226 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
227 for (
size_t j = 0;
j < value_slice->length();
j++) {
228 REQUIRE(int_array->Value(
j) ==
j);
// Build the input file: an in-memory TMemFile that receives two trees, both
// named "tracks". The first has the same 7 branches as t1; the second adds
// boolean, variable-length-array and byte branches.
// NOTE(review): the code that creates or selects the directory each tree
// lives in is not visible. Presumably they are DF_1 and DF_2, given the later
// "DF_2/tracks" source path; confirm.
276 auto*
f =
new TMemFile(
"foo",
"RECREATE");
// First "tracks" tree: 7 branches, 1000 iterations (the Fill() call is not
// visible in this excerpt).
281 auto* t =
new TTree(
"tracks",
"a simple Tree with simple variables");
285 Float_t px = 0, py = 1, pz = 2;
288 t->Branch(
"px", &px,
"px/F");
289 t->Branch(
"py", &py,
"py/F");
290 t->Branch(
"pz", &pz,
"pz/F");
291 t->Branch(
"random", &random,
"random/D");
292 t->Branch(
"ev", &ev,
"ev/I");
293 t->Branch(
"xyz", xyz,
"xyz[3]/F");
294 t->Branch(
"ij", ij,
"ij[2]/I");
296 for (Int_t
i = 0;
i < 1000;
i++) {
300 gRandom->Rannor(px, py);
301 pz = px * px + py * py;
305 random = gRandom->Rndm();
// Second "tracks" tree. `t` is reassigned here.
// NOTE(review): the previous TTree pointer is overwritten. Presumably its
// owning directory keeps and writes it before this point; confirm it neither
// leaks nor is lost.
312 t =
new TTree(
"tracks",
"a simple Tree with simple variables");
316 Float_t px = 0, py = 1, pz = 2;
// Backing buffer for the variable-length "vla" branch. Each entry stores the
// first vla_size elements, which is why readers see 0, 1, 2, ...
// (the assignment to vlaSize is not visible in this excerpt).
321 int vla[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
325 t->Branch(
"px", &px,
"px/F");
326 t->Branch(
"py", &py,
"py/F");
327 t->Branch(
"pz", &pz,
"pz/F");
328 t->Branch(
"random", &random,
"random/D");
329 t->Branch(
"ev", &ev,
"ev/I");
330 t->Branch(
"xyz", xyz,
"xyz[3]/F");
331 t->Branch(
"ij", ij,
"ij[2]/I");
// Extra branches compared with the first tree:
//   bools     - one boolean per entry (leaf type O)
//   manyBools - a 2-element boolean array
//   vla_size  - int counter leaf referenced by "vla[vla_size]"
//   vla       - variable-length int array sized by vla_size
//   byte      - leaf type B (8-bit)
332 t->Branch(
"bools", &oneBool,
"bools/O");
333 t->Branch(
"manyBools", &manyBool,
"manyBools[2]/O");
334 t->Branch(
"vla_size", &vlaSize,
"vla_size/I");
335 t->Branch(
"vla", vla,
"vla[vla_size]/I");
336 t->Branch(
"byte", &
byte,
"byte/B");
// 100 iterations. The boolean patterns set here (i % 3, i % 4, i % 5) are
// the same patterns the column validation asserts on.
338 for (Int_t
i = 0;
i < 100;
i++) {
342 gRandom->Rannor(px, py);
343 pz = px * px + py * py;
347 random = gRandom->Rndm();
349 oneBool = (
i % 3 == 0);
350 manyBool[0] = (
i % 4 == 0);
351 manyBool[1] = (
i % 5 == 0);
// Load the same two capability plugins, read "DF_2/tracks" from the TMemFile
// through TFileFileSystem, and round-trip the scanned batch through an arrow
// IPC stream.
359 std::vector<char const*> capabilitiesSpecs = {
360 "O2Framework:RNTupleObjectReadingCapability",
361 "O2Framework:TTreeObjectReadingCapability",
// NOTE(review): the outer loop body and the declaration of morePlugins are not
// visible in this excerpt.
366 std::vector<LoadablePlugin> plugins;
367 for (
auto spec : capabilitiesSpecs) {
369 for (
auto& extra : morePlugins) {
370 plugins.push_back(extra);
373 REQUIRE(plugins.size() == 2);
375 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.
capabilities);
// capabilities[0] is presumably the RNTuple capability, since it is listed
// first above. Its format is kept for the RNTuple write/read check later.
382 auto rNtupleFormat = factory.
capabilities[0].factory().format();
// The second argument (50 MiB) is presumably a read buffer or cache size;
// confirm against TFileFileSystem.
385 auto fs = std::make_shared<TFileFileSystem>(
f, 50 * 1024 * 1024, factory);
387 arrow::dataset::FileSource
source(
"DF_2/tracks", fs);
// physicalSchema presumably comes from inspecting `source`; that call is not
// visible here.
390 REQUIRE(physicalSchema.ok());
// Dataset schema = physical schema minus every field whose name ends in
// "_size". The body of that branch is not visible; presumably it is
// `continue`. This drops the vla_size counter column.
393 std::vector<std::shared_ptr<arrow::Field>> fields;
394 for (
auto& field : (*(physicalSchema))->fields()) {
395 if (field->name().ends_with(
"_size")) {
398 fields.push_back(field);
400 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
// The fragment is created against the full physical schema, while the scan
// uses the filtered schema as its dataset schema.
404 auto fragment =
format->MakeFragment(
source, {}, *physicalSchema);
405 REQUIRE(fragment.ok());
406 auto options = std::make_shared<arrow::dataset::ScanOptions>();
407 options->dataset_schema = schema;
408 auto scanner =
format->ScanBatchesAsync(options, *fragment);
409 REQUIRE(scanner.ok());
413 std::shared_ptr<arrow::RecordBatch> batch;
415 auto batches = (*scanner)();
416 auto result = batches.result();
// Expect 11 columns (presumably the second tree's 12 branches minus
// vla_size) and 100 rows.
418 REQUIRE((*result)->columns().size() == 11);
419 REQUIRE((*result)->num_rows() == 100);
// IPC round trip: write the batch with an arrow stream writer through
// capabilities[1]'s deferredOutputStreamer into `buffer`, then read it back
// with RecordBatchStreamReader. The buffer starts at 1000 bytes; 64 is
// presumably the alignment.
// NOTE(review): `status` (WriteRecordBatch) and `next` (ReadNext) are never
// checked, and no Close() on the writer is visible. A failure would only
// surface indirectly, as batch == nullptr.
420 std::shared_ptr<arrow::ResizableBuffer>
buffer = *arrow::AllocateResizableBuffer(1000, 64);
421 auto deferredWriterStream = factory.
capabilities[1].factory().deferredOutputStreamer(*fragment,
buffer);
422 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
423 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**
result);
424 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(
buffer);
425 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
426 auto batchReader = readerResult.ValueOrDie();
428 auto next = batchReader->ReadNext(&batch);
429 REQUIRE(batch !=
nullptr);
// Write the round-tripped batch as a TTree at "/DF_3" in a fresh output
// TMemFile. Then check through ROOT that the tree exists, that its first
// branch is "px", and that this branch has 100 entries.
// NOTE(review): the output file reuses the name "foo" already used for `f`;
// confirm the duplicate name is intended.
433 auto*
output =
new TMemFile(
"foo",
"RECREATE");
// 0 here versus 50 MiB for the input filesystem; presumably this disables
// the read buffer. Confirm against TFileFileSystem.
434 auto outFs = std::make_shared<TFileFileSystem>(
output, 0, factory);
437 auto destination = outFs->OpenOutputStream(
"/", {});
438 REQUIRE(destination.ok());
// The locator path appears to name the written tree; it is read back as
// "/DF_3" below.
441 arrow::fs::FileLocator locator{outFs,
"/DF_3"};
// NOTE(review): writer.ok() is not checked before writer->get(), and
// success.ok() is asserted only after the schema-name check.
442 auto writer =
format->MakeWriter(*destination, schema, {}, locator);
443 auto success = writer->get()->Write(batch);
444 REQUIRE(batch->schema()->field(0)->name() ==
"px");
// NOTE(review): rootDestination is not used anywhere in the visible code.
445 auto rootDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
449 REQUIRE(success.ok());
// Verify the write directly through ROOT rather than through arrow.
451 auto tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(outFs);
452 REQUIRE(tfileFs.get());
453 REQUIRE(tfileFs->GetFile());
454 auto*
tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
455 REQUIRE(
tree !=
nullptr);
456 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
457 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetName() == std::string(
"px"));
// Read "/DF_3" back through the arrow dataset API and repeat the IPC round
// trip. Between the IsSupported, Inspect and ScanBatchesAsync steps, the code
// re-fetches the TTree and checks that it still exists with 100 entries in
// its first branch. Presumably this guards against those calls detaching or
// replacing the tree.
459 arrow::dataset::FileSource source2(
"/DF_3", outFs);
461 REQUIRE(
format->IsSupported(source2) ==
true);
462 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
463 REQUIRE(tfileFs.get());
464 REQUIRE(tfileFs->GetFile());
465 REQUIRE(tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree")));
467 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
468 REQUIRE(
tree !=
nullptr);
469 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
471 auto schemaOptWritten =
format->Inspect(source2);
472 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
473 REQUIRE(tfileFs.get());
474 REQUIRE(tfileFs->GetFile());
475 REQUIRE(tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree")));
476 REQUIRE(schemaOptWritten.ok());
477 auto schemaWritten = *schemaOptWritten;
479 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
480 REQUIRE(
tree !=
nullptr);
481 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
// Dataset schema = inspected schema of the written tree minus "_size" fields.
// The body of that branch is not visible; presumably it is `continue`.
484 std::vector<std::shared_ptr<arrow::Field>> fields;
485 for (
auto& field : schemaWritten->fields()) {
486 if (field->name().ends_with(
"_size")) {
489 fields.push_back(field);
491 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
// NOTE(review): the fragment over the written tree is built with
// *physicalSchema, which is the schema of the original DF_2/tracks input,
// not schemaWritten. This only works if the two schemas agree. Confirm which
// one is intended.
494 auto fragmentWritten =
format->MakeFragment(source2, {}, *physicalSchema);
495 REQUIRE(fragmentWritten.ok());
496 auto optionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
497 optionsWritten->dataset_schema = schema;
498 auto scannerWritten =
format->ScanBatchesAsync(optionsWritten, *fragmentWritten);
499 REQUIRE(scannerWritten.ok());
500 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked(
"/DF_3", TClass::GetClass(
"TTree"));
501 REQUIRE(
tree !=
nullptr);
502 REQUIRE(((TBranch*)
tree->GetListOfBranches()->At(0))->GetEntries() == 100);
503 auto batchesWritten = (*scannerWritten)();
504 auto resultWritten = batchesWritten.result();
505 REQUIRE(resultWritten.ok());
506 REQUIRE((*resultWritten)->columns().size() == 11);
507 REQUIRE((*resultWritten)->num_rows() == 100);
// Second IPC round trip, this time starting from the fragment over the
// written tree. The result overwrites `batch`.
// NOTE(review): as in the first round trip, `status` and `next` are never
// checked.
509 std::shared_ptr<arrow::ResizableBuffer>
buffer = *arrow::AllocateResizableBuffer(1000, 64);
510 auto deferredWriterStream2 = factory.
capabilities[1].factory().deferredOutputStreamer(*fragmentWritten,
buffer);
511 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema);
512 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten);
513 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(
buffer);
514 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
515 auto batchReader = readerResult.ValueOrDie();
517 auto next = batchReader->ReadNext(&batch);
518 REQUIRE(batch !=
nullptr);
// Write `batch` as an RNTuple at "/rntuple" in the output file, using the
// format taken from capabilities[0]. Then read it back and check the shape.
522 arrow::fs::FileLocator rnTupleLocator{outFs,
"/rntuple"};
// NOTE(review): rntupleDestination is not used in the visible code. The
// RNTuple writer reuses *destination, the same stream already passed to the
// TTree writer; confirm that sharing is intended.
524 auto rntupleDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
527 auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator);
528 auto rNtupleSuccess = rNtupleWriter->get()->Write(batch);
529 REQUIRE(rNtupleSuccess.ok());
// Read-back: the written RNTuple must be recognized and inspectable, and must
// scan (with its own inspected schema) as 11 columns x 100 rows.
533 arrow::dataset::FileSource writtenRntupleSource(
"/rntuple", outFs);
535 REQUIRE(rNtupleFormat->IsSupported(writtenRntupleSource) ==
true);
537 auto rntupleSchemaOpt = rNtupleFormat->Inspect(writtenRntupleSource);
538 REQUIRE(rntupleSchemaOpt.ok());
539 auto rntupleSchemaWritten = *rntupleSchemaOpt;
542 auto rntupleFragmentWritten = rNtupleFormat->MakeFragment(writtenRntupleSource, {}, rntupleSchemaWritten);
543 REQUIRE(rntupleFragmentWritten.ok());
544 auto rntupleOptionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
545 rntupleOptionsWritten->dataset_schema = rntupleSchemaWritten;
546 auto rntupleScannerWritten = rNtupleFormat->ScanBatchesAsync(rntupleOptionsWritten, *rntupleFragmentWritten);
547 REQUIRE(rntupleScannerWritten.ok());
548 auto rntupleBatchesWritten = (*rntupleScannerWritten)();
549 auto rntupleResultWritten = rntupleBatchesWritten.result();
550 REQUIRE(rntupleResultWritten.ok());
551 REQUIRE((*rntupleResultWritten)->columns().size() == 11);
553 REQUIRE((*rntupleResultWritten)->num_rows() == 100);