Project
Loading...
Searching...
No Matches
test_Root2ArrowTable.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
13
16#include "Framework/ASoA.h"
18#include "../src/ArrowDebugHelpers.h"
19
20#include <ROOT/RDataFrame.hxx>
21#include <ROOT/RArrowDS.hxx>
22#include <TBufferFile.h>
23#include <TClass.h>
24#include <TDirectoryFile.h>
25#include <TMemFile.h>
26#include <TDirectory.h>
27#include <TTree.h>
28#include <TRandom.h>
29#include <TFile.h>
30#include <ROOT/RField.hxx>
31#include <ROOT/RNTuple.hxx>
32#include <ROOT/RNTupleDescriptor.hxx>
33#include <ROOT/RNTupleModel.hxx>
34#include <ROOT/RNTupleReader.hxx>
35#include <ROOT/RNTupleUtil.hxx>
36#include <ROOT/RNTupleWriter.hxx>
37#include <memory>
38
39#include <arrow/array/array_primitive.h>
40#include <arrow/array/builder_primitive.h>
41#include <arrow/buffer.h>
42#include <arrow/dataset/scanner.h>
43#include <arrow/record_batch.h>
44#include <arrow/table.h>
45#include <arrow/ipc/writer.h>
46#include <arrow/io/memory.h>
47#include <arrow/ipc/writer.h>
48#include <arrow/ipc/reader.h>
50
51using namespace o2::framework;
52
53TEST_CASE("RootTree2Table")
54{
55 using namespace o2::framework;
57 TTree t1("t1", "a simple Tree with simple variables");
58 Float_t xyz[3];
59 Int_t ij[2];
60 Float_t px, py, pz;
61 Double_t random;
62 Int_t ev;
63 t1.Branch("px", &px, "px/F");
64 t1.Branch("py", &py, "py/F");
65 t1.Branch("pz", &pz, "pz/F");
66 t1.Branch("random", &random, "random/D");
67 t1.Branch("ev", &ev, "ev/I");
68 t1.Branch("xyz", xyz, "xyz[3]/F");
69 t1.Branch("ij", ij, "ij[2]/I");
70 // fill the tree
71 for (Int_t i = 0; i < 1000; i++) {
72 xyz[0] = 1;
73 xyz[1] = 2;
74 xyz[2] = 3;
75 gRandom->Rannor(px, py);
76 pz = px * px + py * py;
77 xyz[2] = i + 1;
78 ij[0] = i;
79 ij[1] = i + 1;
80 random = gRandom->Rndm();
81 ev = i + 1;
82 t1.Fill();
83 }
84
85 // Create an arrow table from this.
86 TableBuilder builder;
87 TTreeReader reader(&t1);
88 auto&& xyzReader = HolderMaker<float[3]>::make(reader, "xyz");
89 auto&& ijkReader = HolderMaker<int[2]>::make(reader, "ij");
90 auto&& pxReader = HolderMaker<float>::make(reader, "px");
91 auto&& pyReader = HolderMaker<float>::make(reader, "py");
92 auto&& pzReader = HolderMaker<float>::make(reader, "pz");
93 auto&& randomReader = HolderMaker<double>::make(reader, "random");
94 auto&& evReader = HolderMaker<int>::make(reader, "ev");
95
96 RootTableBuilderHelpers::convertTTree(builder, reader, std::move(xyzReader), std::move(ijkReader), std::move(pxReader), std::move(pyReader), std::move(pzReader), std::move(randomReader), std::move(evReader));
97 auto table = builder.finalize();
98 REQUIRE(table->num_rows() == 1000);
99 REQUIRE(table->num_columns() == 7);
100 REQUIRE(table->schema()->field(0)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
101 REQUIRE(table->schema()->field(1)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
102 REQUIRE(table->schema()->field(2)->type()->id() == arrow::float32()->id());
103 REQUIRE(table->schema()->field(3)->type()->id() == arrow::float32()->id());
104 REQUIRE(table->schema()->field(4)->type()->id() == arrow::float32()->id());
105 REQUIRE(table->schema()->field(5)->type()->id() == arrow::float64()->id());
106 REQUIRE(table->schema()->field(6)->type()->id() == arrow::int32()->id());
107
108 {
109 auto chunkToUse = table->column(0)->chunk(0);
110 chunkToUse = std::dynamic_pointer_cast<arrow::FixedSizeListArray>(chunkToUse)->values();
111 auto array = std::static_pointer_cast<arrow::FloatArray>(chunkToUse);
112 // array of 3 floats, time 1000.
113 REQUIRE(array->length() == 3000);
114 const float* c = reinterpret_cast<float const*>(array->values()->data());
115
116 CHECK(c[0] == 1);
117 CHECK(c[1] == 2);
118 CHECK(c[2] == 1);
119 }
120 {
121 auto chunkToUse = table->column(1)->chunk(0);
122 chunkToUse = std::dynamic_pointer_cast<arrow::FixedSizeListArray>(chunkToUse)->values();
123 auto array = std::static_pointer_cast<arrow::Int32Array>(chunkToUse);
124 REQUIRE(array->length() == 2000);
125
126 const int* ptr = reinterpret_cast<int const*>(array->values()->data());
127 for (size_t i = 0; i < 1000; i++) {
128 CHECK(ptr[2 * i + 0] == i);
129 CHECK(ptr[2 * i + 1] == i + 1);
130 }
131 }
132}
133
134namespace o2::aod
135{
136namespace test
137{
138DECLARE_SOA_COLUMN_FULL(Px, px, float, "px");
139DECLARE_SOA_COLUMN_FULL(Py, py, float, "py");
140DECLARE_SOA_COLUMN_FULL(Pz, pz, float, "pz");
141DECLARE_SOA_COLUMN_FULL(Xyz, xyz, float[3], "xyz");
142DECLARE_SOA_COLUMN_FULL(Ij, ij, int[2], "ij");
143DECLARE_SOA_COLUMN_FULL(Random, random, double, "random");
144DECLARE_SOA_COLUMN_FULL(Ev, ev, int, "ev");
145} // namespace test
146
147DECLARE_SOA_TABLE(Test, "AOD", "ETAPHI",
148 test::Px, test::Py, test::Pz, test::Xyz, test::Ij,
149 test::Random, test::Ev);
150} // namespace o2::aod
151
152TEST_CASE("RootTree2TableViaASoA")
153{
154 using namespace o2::framework;
156 TTree t2("t2", "a simple Tree with simple variables");
157 Float_t xyz[3];
158 Int_t ij[2];
159 Float_t px, py, pz;
160 Double_t random;
161 Int_t ev;
162 t2.Branch("px", &px, "px/F");
163 t2.Branch("py", &py, "py/F");
164 t2.Branch("pz", &pz, "pz/F");
165 t2.Branch("random", &random, "random/D");
166 t2.Branch("ev", &ev, "ev/I");
167 t2.Branch("xyz", xyz, "xyz[3]/F");
168 t2.Branch("ij", ij, "ij[2]/I");
169 // fill the tree
170 for (Int_t i = 0; i < 1000; i++) {
171 gRandom->Rannor(xyz[0], xyz[1]);
172 gRandom->Rannor(px, py);
173 pz = px * px + py * py;
174 xyz[2] = i + 1;
175 ij[0] = i;
176 ij[1] = i + 1;
177 random = gRandom->Rndm();
178 ev = i + 1;
179 t2.Fill();
180 }
181
182 // Create an arrow table from this.
183 TableBuilder builder;
184 TTreeReader reader(&t2);
185 REQUIRE(t2.GetEntries() == 1000);
186
187 RootTableBuilderHelpers::convertASoA<o2::aod::Test>(builder, reader);
188 auto table = builder.finalize();
189 REQUIRE(table->num_rows() == 1000);
190 REQUIRE(table->num_columns() == 7);
191 REQUIRE(table->column(0)->type()->id() == arrow::float32()->id());
192 REQUIRE(table->column(1)->type()->id() == arrow::float32()->id());
193 REQUIRE(table->column(2)->type()->id() == arrow::float32()->id());
194 REQUIRE(table->column(3)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
195 REQUIRE(table->column(4)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
196 REQUIRE(table->column(5)->type()->id() == arrow::float64()->id());
197 REQUIRE(table->column(6)->type()->id() == arrow::int32()->id());
198
199 o2::aod::Test testTable{table};
200 for (auto& row : testTable) {
201 REQUIRE(row.ij()[0] == row.ij()[1] - 1);
202 REQUIRE(row.ij()[1] == row.ev());
203 }
204}
205
/// Serialise a TTree into an in-memory TBufferFile and read it back through the
/// arrow dataset API. The RNTuple and TTree object-reading capabilities are
/// loaded as plugins, and the "ttree" format must:
///  - accept the buffer,
///  - inspect it as a 7-field schema,
///  - scan it into one 7-column, 1000-row record batch.
206TEST_CASE("RootTree2Fragment")
207{
208 using namespace o2::framework;
210

  // Write-mode buffer the tree is serialised into.
  // NOTE(review): `file` and `fileRead` are allocated with new and never
  // deleted; std::unique_ptr would release them when the test ends.
212 TBufferFile* file = new TBufferFile(TBuffer::kWrite);
213
214 TTree t1("t1", "a simple Tree with simple variables");
215 Float_t xyz[3];
216 Int_t ij[2];
217 Float_t px = 0, py = 1, pz = 2;
218 Double_t random;
219 Int_t ev;
220 t1.Branch("px", &px, "px/F");
221 t1.Branch("py", &py, "py/F");
222 t1.Branch("pz", &pz, "pz/F");
223 t1.Branch("random", &random, "random/D");
224 t1.Branch("ev", &ev, "ev/I");
225 t1.Branch("xyz", xyz, "xyz[3]/F");
226 t1.Branch("ij", ij, "ij[2]/I");
227 // fill the tree
  // For entry i: xyz = (1, 2, i + 1) (the 3 is overwritten), ij = (i, i + 1), ev = i + 1.
228 for (Int_t i = 0; i < 1000; i++) {
229 xyz[0] = 1;
230 xyz[1] = 2;
231 xyz[2] = 3;
232 gRandom->Rannor(px, py);
233 pz = px * px + py * py;
234 xyz[2] = i + 1;
235 ij[0] = i;
236 ij[1] = i + 1;
237 random = gRandom->Rndm();
238 ev = i + 1;
239 t1.Fill();
240 }
  // Serialise the tree, then open a read-mode buffer over the same memory.
  // NOTE(review): presumably the `false` argument means the buffer is not
  // adopted, so `file` keeps ownership of it — confirm against the TBufferFile ctor.
241 file->WriteObjectAny(&t1, t1.Class());
242 auto* fileRead = new TBufferFile(TBuffer::kRead, file->BufferSize(), file->Buffer(), false, nullptr);
243
  // Object-reading capabilities to load, as "<library>:<plugin-name>" specs.
244 std::vector<char const*> capabilitiesSpecs = {
245 "O2Framework:RNTupleObjectReadingCapability",
246 "O2Framework:TTreeObjectReadingCapability",
247 };
248
  // Resolve every spec into loadable plugins; one plugin per spec is expected.
249 std::vector<LoadablePlugin> plugins;
250 for (auto spec : capabilitiesSpecs) {
251 auto morePlugins = PluginManager::parsePluginSpecString(spec);
252 for (auto& extra : morePlugins) {
253 plugins.push_back(extra);
254 }
255 }
256 REQUIRE(plugins.size() == 2);
257
  // NOTE(review): `factory` is not declared on any line visible here. Its
  // declaration appears to be on a line missing from this rendering.
  // configDiscoverySpec is never used.
  // Capabilities are registered in spec order: [0] is "rntuple", [1] is "ttree".
259 std::vector<char const*> configDiscoverySpec = {};
260 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.capabilities);
261 REQUIRE(factory.capabilities.size() == 2);
262 REQUIRE(factory.capabilities[0].name == "rntuple");
263 REQUIRE(factory.capabilities[1].name == "ttree");
264
265 // Plugins are hardcoded for now...
266 auto format = factory.capabilities[1].factory().format();
267
  // Expose the read buffer to arrow as a filesystem.
  // NOTE(review): the path "p" below is presumably irrelevant for a single
  // buffer-backed filesystem — confirm.
268 auto fs = std::make_shared<TBufferFileFS>(fileRead, factory);
269
270 arrow::dataset::FileSource source("p", fs);
271 REQUIRE(format->IsSupported(source) == true);
  // Expected field order follows the order in which the branches were created.
  // NOTE(review): only type ids are compared, so fixed_size_list lengths (3, 2)
  // and their value types are not actually verified.
272 auto schemaOpt = format->Inspect(source);
273 REQUIRE(schemaOpt.ok());
274 auto schema = *schemaOpt;
275 REQUIRE(schema->num_fields() == 7);
276 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
277 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
278 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
279 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
280 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
281 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
282 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
  // Build a fragment for the whole tree and scan it. Invoking the generator
  // yields the first batch, which is expected to contain all 1000 entries.
283 auto fragment = format->MakeFragment(source, {}, schema);
284 REQUIRE(fragment.ok());
285 auto options = std::make_shared<arrow::dataset::ScanOptions>();
286 options->dataset_schema = schema;
287 auto scanner = format->ScanBatchesAsync(options, *fragment);
288 REQUIRE(scanner.ok());
289 auto batches = (*scanner)();
290 auto result = batches.result();
291 REQUIRE(result.ok());
292 REQUIRE((*result)->columns().size() == 7);
293 REQUIRE((*result)->num_rows() == 1000);
294}
295
296bool validateContents(std::shared_ptr<arrow::RecordBatch> batch)
297{
298 {
299 auto int_array = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName("ev"));
300 REQUIRE(int_array->length() == 100);
301 for (int64_t j = 0; j < int_array->length(); j++) {
302 REQUIRE(int_array->Value(j) == j + 1);
303 }
304 }
305
306 {
307 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("xyz"));
308
309 REQUIRE(list_array->length() == 100);
310 // Iterate over the FixedSizeListArray
311 for (int64_t i = 0; i < list_array->length(); i++) {
312 auto value_slice = list_array->value_slice(i);
313 auto float_array = std::static_pointer_cast<arrow::FloatArray>(value_slice);
314
315 REQUIRE(float_array->Value(0) == 1);
316 REQUIRE(float_array->Value(1) == 2);
317 REQUIRE(float_array->Value(2) == i + 1);
318 }
319 }
320
321 {
322 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("ij"));
323
324 REQUIRE(list_array->length() == 100);
325 // Iterate over the FixedSizeListArray
326 for (int64_t i = 0; i < list_array->length(); i++) {
327 auto value_slice = list_array->value_slice(i);
328 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
329 REQUIRE(int_array->Value(0) == i);
330 REQUIRE(int_array->Value(1) == i + 1);
331 }
332 }
333
334 {
335 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(batch->GetColumnByName("bools"));
336
337 REQUIRE(bool_array->length() == 100);
338 for (int64_t j = 0; j < bool_array->length(); j++) {
339 REQUIRE(bool_array->Value(j) == (j % 3 == 0));
340 }
341 }
342
343 {
344 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("manyBools"));
345
346 REQUIRE(list_array->length() == 100);
347 for (int64_t i = 0; i < list_array->length(); i++) {
348 auto value_slice = list_array->value_slice(i);
349 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(value_slice);
350 REQUIRE(bool_array->Value(0) == (i % 4 == 0));
351 REQUIRE(bool_array->Value(1) == (i % 5 == 0));
352 }
353 }
354
355 {
356 auto list_array = std::static_pointer_cast<arrow::ListArray>(batch->GetColumnByName("vla"));
357
358 REQUIRE(list_array->length() == 100);
359 for (int64_t i = 0; i < list_array->length(); i++) {
360 auto value_slice = list_array->value_slice(i);
361 REQUIRE(value_slice->length() == (i % 10));
362 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
363 for (size_t j = 0; j < value_slice->length(); j++) {
364 REQUIRE(int_array->Value(j) == j);
365 }
366 }
367 }
368 return true;
369}
370
371bool validateSchema(std::shared_ptr<arrow::Schema> schema)
372{
373 REQUIRE(schema->num_fields() == 11);
374 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
375 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
376 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
377 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
378 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
379 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
380 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
381 REQUIRE(schema->field(7)->type()->id() == arrow::boolean()->id());
382 REQUIRE(schema->field(8)->type()->id() == arrow::fixed_size_list(arrow::boolean(), 2)->id());
383 REQUIRE(schema->field(9)->type()->id() == arrow::list(arrow::int32())->id());
384 REQUIRE(schema->field(10)->type()->id() == arrow::int8()->id());
385 return true;
386}
387
388bool validatePhysicalSchema(std::shared_ptr<arrow::Schema> schema)
389{
390 REQUIRE(schema->num_fields() == 12);
391 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
392 REQUIRE(schema->field(0)->name() == "px");
393 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
394 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
395 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
396 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
397 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
398 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
399 REQUIRE(schema->field(7)->type()->id() == arrow::boolean()->id());
400 REQUIRE(schema->field(8)->type()->id() == arrow::fixed_size_list(arrow::boolean(), 2)->id());
401 REQUIRE(schema->field(9)->type()->id() == arrow::int32()->id());
402 REQUIRE(schema->field(10)->type()->id() == arrow::list(arrow::int32())->id());
403 REQUIRE(schema->field(11)->type()->id() == arrow::int8()->id());
404 return true;
405}
406
/// End-to-end test of the TTree/RNTuple arrow filesystem support:
///  - builds two "tracks" trees in a TMemFile. DF_1 has 7 branches and 1000
///    entries. DF_2 has 12 branches (including booleans, a variable-length
///    array and a byte) and 100 entries.
///  - inspects and scans DF_2/tracks through the "ttree" format, round-trips
///    the batch through a deferred output stream + IPC, and validates it.
///  - writes that batch as the /DF_3 tree into a second TMemFile and reads it
///    back (SECTION "Read tree").
///  - writes the batch as an RNTuple under /rntuple in the same file and reads
///    it back.
407TEST_CASE("RootTree2Dataset")
408{
409 using namespace o2::framework;
411 // auto *f = new TFile("Foo.root", "RECREATE");
  // NOTE(review): `f` (and `output` below) are allocated with new and never
  // closed or deleted.
412 auto* f = new TMemFile("foo", "RECREATE");
413 f->mkdir("DF_1");
414 f->mkdir("DF_2");
415
  // DF_1/tracks: same layout as the other tests in this file (not read back below).
416 f->cd("DF_1");
417 auto* t = new TTree("tracks", "a simple Tree with simple variables");
418 {
419 Float_t xyz[3];
420 Int_t ij[2];
421 Float_t px = 0, py = 1, pz = 2;
422 Double_t random;
423 Int_t ev;
424 t->Branch("px", &px, "px/F");
425 t->Branch("py", &py, "py/F");
426 t->Branch("pz", &pz, "pz/F");
427 t->Branch("random", &random, "random/D");
428 t->Branch("ev", &ev, "ev/I");
429 t->Branch("xyz", xyz, "xyz[3]/F");
430 t->Branch("ij", ij, "ij[2]/I");
431 // fill the tree
432 for (Int_t i = 0; i < 1000; i++) {
433 xyz[0] = 1;
434 xyz[1] = 2;
435 xyz[2] = 3;
436 gRandom->Rannor(px, py);
437 pz = px * px + py * py;
438 xyz[2] = i + 1;
439 ij[0] = i;
440 ij[1] = i + 1;
441 random = gRandom->Rndm();
442 ev = i + 1;
443 t->Fill();
444 }
445 }
446
  // DF_2/tracks: 100 entries with the extra branch kinds checked by validateContents.
447 f->cd("DF_2");
448 t = new TTree("tracks", "a simple Tree with simple variables");
449 {
450 Float_t xyz[3];
451 Int_t ij[2];
452 Float_t px = 0, py = 1, pz = 2;
453 Double_t random;
454 Int_t ev;
455 bool oneBool;
456 bool manyBool[2];
  // Only the first vla_size (= i % 10) elements of vla are stored per entry.
457 int vla[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
458 int vlaSize = 0;
459 char byte;
460
461 t->Branch("px", &px, "px/F");
462 t->Branch("py", &py, "py/F");
463 t->Branch("pz", &pz, "pz/F");
464 t->Branch("random", &random, "random/D");
465 t->Branch("ev", &ev, "ev/I");
466 t->Branch("xyz", xyz, "xyz[3]/F");
467 t->Branch("ij", ij, "ij[2]/I");
468 t->Branch("bools", &oneBool, "bools/O");
469 t->Branch("manyBools", &manyBool, "manyBools[2]/O");
470 t->Branch("vla_size", &vlaSize, "vla_size/I");
471 t->Branch("vla", vla, "vla[vla_size]/I");
472 t->Branch("byte", &byte, "byte/B");
473 // fill the tree
474 for (Int_t i = 0; i < 100; i++) {
475 xyz[0] = 1;
476 xyz[1] = 2;
477 xyz[2] = 3;
478 gRandom->Rannor(px, py);
479 pz = px * px + py * py;
480 xyz[2] = i + 1;
481 ij[0] = i;
482 ij[1] = i + 1;
483 random = gRandom->Rndm();
484 ev = i + 1;
485 oneBool = (i % 3 == 0);
486 manyBool[0] = (i % 4 == 0);
487 manyBool[1] = (i % 5 == 0);
488 vlaSize = i % 10;
489 byte = i;
490 t->Fill();
491 }
492 }
493 f->Write();
494
495 std::vector<char const*> capabilitiesSpecs = {
496 "O2Framework:RNTupleObjectReadingCapability",
497 "O2Framework:TTreeObjectReadingCapability",
498 };
499
  // NOTE(review): `factory` is not declared on any line visible here. Its
  // declaration appears to be on a line missing from this rendering.
501
502 std::vector<LoadablePlugin> plugins;
503 for (auto spec : capabilitiesSpecs) {
504 auto morePlugins = PluginManager::parsePluginSpecString(spec);
505 for (auto& extra : morePlugins) {
506 plugins.push_back(extra);
507 }
508 }
509 REQUIRE(plugins.size() == 2);
510
511 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.capabilities);
512
513 REQUIRE(factory.capabilities.size() == 2);
514 REQUIRE(factory.capabilities[0].name == "rntuple");
515 REQUIRE(factory.capabilities[1].name == "ttree");
516
517 // Plugins are hardcoded for now...
518 auto rNtupleFormat = factory.capabilities[0].factory().format();
519 auto format = factory.capabilities[1].factory().format();
520
521 auto fs = std::make_shared<TFileFileSystem>(f, 50 * 1024 * 1024, factory);
522
523 arrow::dataset::FileSource source("DF_2/tracks", fs);
524 REQUIRE(format->IsSupported(source) == true);
525 auto physicalSchema = format->Inspect(source);
526 REQUIRE(physicalSchema.ok());
527 REQUIRE(validatePhysicalSchema(*physicalSchema));
528 // Create the dataset schema rather than using the physical one
  // i.e. drop the "*_size" columns that hold the lengths of variable-length arrays.
529 std::vector<std::shared_ptr<arrow::Field>> fields;
530 for (auto& field : (*(physicalSchema))->fields()) {
531 if (field->name().ends_with("_size")) {
532 continue;
533 }
534 fields.push_back(field);
535 }
536 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
537
  // NOTE(review): not wrapped in REQUIRE like the equivalent check inside the
  // SECTION below. validateSchema REQUIREs internally, so a failure still aborts.
538 validateSchema(schema);
539
  // The fragment is built from the physical schema; the scan uses the dataset schema.
540 auto fragment = format->MakeFragment(source, {}, *physicalSchema);
541 REQUIRE(fragment.ok());
542 auto options = std::make_shared<arrow::dataset::ScanOptions>();
543 options->dataset_schema = schema;
544 auto scanner = format->ScanBatchesAsync(options, *fragment);
545 REQUIRE(scanner.ok());
546
547 // This batch has deferred contents. Therefore we need to use a DeferredOutputStream to
548 // write it to a real one and read it back with the BufferReader, which is hopefully zero copy
549 std::shared_ptr<arrow::RecordBatch> batch;
550
551 auto batches = (*scanner)();
552 auto result = batches.result();
553 REQUIRE(result.ok());
554 REQUIRE((*result)->columns().size() == 11);
555 REQUIRE((*result)->num_rows() == 100);
  // NOTE(review): `status` (and `next` below) are never checked. A failed
  // write or read only shows up as a null `batch`.
556 std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
557 auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer);
558 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
559 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result);
560 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
561 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
562 auto batchReader = readerResult.ValueOrDie();
563
564 auto next = batchReader->ReadNext(&batch);
565 REQUIRE(batch != nullptr);
566
567 validateContents(batch);
568
  // Second in-memory file used as the write target.
569 auto* output = new TMemFile("foo", "RECREATE");
570 auto outFs = std::make_shared<TFileFileSystem>(output, 0, factory);
571
572 // Open a stream at toplevel
573 auto destination = outFs->OpenOutputStream("/", {});
574 REQUIRE(destination.ok());
575
576 // Write to the /DF_3 tree at top level
577 arrow::fs::FileLocator locator{outFs, "/DF_3"};
578 auto writer = format->MakeWriter(*destination, schema, {}, locator);
579 auto success = writer->get()->Write(batch);
580 REQUIRE(batch->schema()->field(0)->name() == "px");
  // NOTE(review): rootDestination is never used.
581 auto rootDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
582
  // Read /DF_3 back and validate schema and contents again.
  // NOTE(review): this SECTION redeclares schema, fields, buffer, outBatch,
  // status, bufferReader, readerResult, batchReader and next, shadowing the
  // outer ones. It reads into the outer `batch`, which the RNTuple writer
  // below then uses.
583 SECTION("Read tree")
584 {
585 REQUIRE(success.ok());
586 // Let's read it back...
587 auto tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(outFs);
588 REQUIRE(tfileFs.get());
589 REQUIRE(tfileFs->GetFile());
590 auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
591 REQUIRE(tree != nullptr);
592 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
593 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px"));
594
595 arrow::dataset::FileSource source2("/DF_3", outFs);
596
597 REQUIRE(format->IsSupported(source2) == true);
598 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
599 REQUIRE(tfileFs.get());
600 REQUIRE(tfileFs->GetFile());
601 REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));
602
603 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
604 REQUIRE(tree != nullptr);
605 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
606
607 auto schemaOptWritten = format->Inspect(source2);
608 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
609 REQUIRE(tfileFs.get());
610 REQUIRE(tfileFs->GetFile());
611 REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));
612 REQUIRE(schemaOptWritten.ok());
613 auto schemaWritten = *schemaOptWritten;
614
615 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
616 REQUIRE(tree != nullptr);
617 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
618
  // The written tree must again expose a 12-field physical schema, i.e. the
  // writer recreated the "_size" helper column for the variable-length array.
619 REQUIRE(validatePhysicalSchema(schemaWritten));
620 std::vector<std::shared_ptr<arrow::Field>> fields;
621 for (auto& field : schemaWritten->fields()) {
622 if (field->name().ends_with("_size")) {
623 continue;
624 }
625 fields.push_back(field);
626 }
627 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
628 REQUIRE(validateSchema(schema));
629
  // NOTE(review): the fragment uses the original DF_2 physical schema rather
  // than schemaWritten. Both pass validatePhysicalSchema above.
630 auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema);
631 REQUIRE(fragmentWritten.ok());
632 auto optionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
633 optionsWritten->dataset_schema = schema;
634 auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten);
635 REQUIRE(scannerWritten.ok());
636 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
637 REQUIRE(tree != nullptr);
638 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
639 auto batchesWritten = (*scannerWritten)();
640 auto resultWritten = batchesWritten.result();
641 REQUIRE(resultWritten.ok());
642 REQUIRE((*resultWritten)->columns().size() == 11);
643 REQUIRE((*resultWritten)->num_rows() == 100);
644
645 std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
646 auto deferredWriterStream2 = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer);
647 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema);
648 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten);
649 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
650 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
651 auto batchReader = readerResult.ValueOrDie();
652
653 auto next = batchReader->ReadNext(&batch);
654 REQUIRE(batch != nullptr);
655 validateContents(batch);
656 }
657
658 arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"};
659 // We write an RNTuple in the same TMemFile, using /rntuple as a location
  // NOTE(review): rntupleDestination is never used. The RNTuple writer below
  // reuses the same top-level `destination` stream.
660 auto rntupleDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
661
  // Scoped so the writer is destroyed before the RNTuple is read back below.
662 {
663 auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator);
664 auto rNtupleSuccess = rNtupleWriter->get()->Write(batch);
665 REQUIRE(rNtupleSuccess.ok());
666 }
667
668 // And now we can read back the RNTuple into a RecordBatch
  // Unlike the TTree, the RNTuple schema is expected to have no "_size" column
  // (validateSchema, 11 fields).
669 arrow::dataset::FileSource writtenRntupleSource("/rntuple", outFs);
670
671 REQUIRE(rNtupleFormat->IsSupported(writtenRntupleSource) == true);
672
673 auto rntupleSchemaOpt = rNtupleFormat->Inspect(writtenRntupleSource);
674 REQUIRE(rntupleSchemaOpt.ok());
675 auto rntupleSchemaWritten = *rntupleSchemaOpt;
676 REQUIRE(validateSchema(rntupleSchemaWritten));
677
678 auto rntupleFragmentWritten = rNtupleFormat->MakeFragment(writtenRntupleSource, {}, rntupleSchemaWritten);
679 REQUIRE(rntupleFragmentWritten.ok());
680 auto rntupleOptionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
681 rntupleOptionsWritten->dataset_schema = rntupleSchemaWritten;
682 auto rntupleScannerWritten = rNtupleFormat->ScanBatchesAsync(rntupleOptionsWritten, *rntupleFragmentWritten);
683 REQUIRE(rntupleScannerWritten.ok());
684 auto rntupleBatchesWritten = (*rntupleScannerWritten)();
685 auto rntupleResultWritten = rntupleBatchesWritten.result();
686 REQUIRE(rntupleResultWritten.ok());
687 REQUIRE((*rntupleResultWritten)->columns().size() == 11);
688 REQUIRE(validateSchema((*rntupleResultWritten)->schema()));
689 REQUIRE((*rntupleResultWritten)->num_rows() == 100);
690 REQUIRE(validateContents(*rntupleResultWritten));
691}
#define DECLARE_SOA_TABLE(_Name_, _Origin_, _Desc_,...)
Definition ASoA.h:3052
#define DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, _Label_)
Definition ASoA.h:2285
int32_t i
Test
Definition Utils.h:55
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
uint32_t c
Definition RawData.h:2
TBranch * ptr
std::shared_ptr< arrow::Table > finalize()
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLenum array
Definition glcorearb.h:4274
GLdouble f
Definition glcorearb.h:310
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t1
Definition glcorearb.h:5034
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
TEST_CASE("test_prepareArguments")
FIXME: do not use data model tables.
static auto make(TTreeReader &reader, char const *branchName)
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
static void convertTTree(TableBuilder &builder, TTreeReader &reader, ReaderHolder< T >... holders)
Use bulk insertion when TTreeReaderValue everywhere.
bool validateContents(std::shared_ptr< arrow::RecordBatch > batch)
bool validateSchema(std::shared_ptr< arrow::Schema > schema)
bool validatePhysicalSchema(std::shared_ptr< arrow::Schema > schema)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::vector< int > row