Project
Loading...
Searching...
No Matches
test_Root2ArrowTable.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
13
15#include "Framework/ASoA.h"
17#include "../src/ArrowDebugHelpers.h"
18
19#include <ROOT/RDataFrame.hxx>
20#include <ROOT/RArrowDS.hxx>
21#include <TBufferFile.h>
22#include <TClass.h>
23#include <TDirectoryFile.h>
24#include <TMemFile.h>
25#include <TDirectory.h>
26#include <TTree.h>
27#include <TRandom.h>
28#include <TFile.h>
29#include <ROOT/RField.hxx>
30#include <ROOT/RNTuple.hxx>
31#include <ROOT/RNTupleDescriptor.hxx>
32#include <ROOT/RNTupleModel.hxx>
33#include <ROOT/RNTupleReader.hxx>
34#include <ROOT/RNTupleUtil.hxx>
35#include <ROOT/RNTupleWriter.hxx>
36#include <memory>
37
38#include <arrow/array/array_primitive.h>
39#include <arrow/array/builder_primitive.h>
40#include <arrow/buffer.h>
41#include <arrow/dataset/scanner.h>
42#include <arrow/record_batch.h>
43#include <arrow/table.h>
44#include <arrow/ipc/writer.h>
45#include <arrow/io/memory.h>
46#include <arrow/ipc/writer.h>
47#include <arrow/ipc/reader.h>
49
50using namespace o2::framework;
51
// Column and table declarations whose labels match the branch names written by the tests below.
// NOTE(review): the `Test` table is not referenced anywhere else in the visible part of this file.
52namespace o2::aod
53{
54namespace test
55{
// Macro arguments are (_Name_, _Getter_, _Type_, _Label_).
// Scalar columns: three floats, one double and one int.
56DECLARE_SOA_COLUMN_FULL(Px, px, float, "px");
57DECLARE_SOA_COLUMN_FULL(Py, py, float, "py");
58DECLARE_SOA_COLUMN_FULL(Pz, pz, float, "pz");
// Fixed-size array columns (float[3] and int[2]).
59DECLARE_SOA_COLUMN_FULL(Xyz, xyz, float[3], "xyz");
60DECLARE_SOA_COLUMN_FULL(Ij, ij, int[2], "ij");
61DECLARE_SOA_COLUMN_FULL(Random, random, double, "random");
62DECLARE_SOA_COLUMN_FULL(Ev, ev, int, "ev");
63} // namespace test
64
// Macro arguments are (_Name_, _Origin_, _Desc_, columns...): table `Test`, origin "AOD",
// description "ETAPHI", built from the seven columns declared above.
65DECLARE_SOA_TABLE(Test, "AOD", "ETAPHI",
66 test::Px, test::Py, test::Pz, test::Xyz, test::Ij,
67 test::Random, test::Ev);
68} // namespace o2::aod
69
// Serialises a TTree with seven branches and 1000 entries into a TBufferFile, exposes that
// buffer through a TBufferFileFS and reads it back with the format provided by the "ttree"
// capability, checking the inspected schema (7 fields) and the scanned batch (7 columns, 1000 rows).
70TEST_CASE("RootTree2Fragment")
71{
72 using namespace o2::framework;
74
 // NOTE(review): `file` (and `fileRead` below) are allocated with `new` and no matching
 // `delete` appears in this test; whether TBufferFileFS takes ownership of `fileRead`
 // cannot be told from here.
76 TBufferFile* file = new TBufferFile(TBuffer::kWrite);
77
78 TTree t1("t1", "a simple Tree with simple variables");
79 Float_t xyz[3];
80 Int_t ij[2];
81 Float_t px = 0, py = 1, pz = 2;
82 Double_t random;
83 Int_t ev;
 // Branch creation order: px, py, pz, random, ev, xyz, ij. The schema checks further down
 // expect the fields in this same order.
84 t1.Branch("px", &px, "px/F");
85 t1.Branch("py", &py, "py/F");
86 t1.Branch("pz", &pz, "pz/F");
87 t1.Branch("random", &random, "random/D");
88 t1.Branch("ev", &ev, "ev/I");
89 t1.Branch("xyz", xyz, "xyz[3]/F");
90 t1.Branch("ij", ij, "ij[2]/I");
91 // fill the tree
92 for (Int_t i = 0; i < 1000; i++) {
93 xyz[0] = 1;
94 xyz[1] = 2;
 // This value is overwritten with i + 1 a few lines below, before Fill().
95 xyz[2] = 3;
96 gRandom->Rannor(px, py);
97 pz = px * px + py * py;
98 xyz[2] = i + 1;
99 ij[0] = i;
100 ij[1] = i + 1;
101 random = gRandom->Rndm();
102 ev = i + 1;
103 t1.Fill();
104 }
105 file->WriteObjectAny(&t1, t1.Class());
 // The read buffer is constructed over the memory of `file` (file->Buffer()), so `file`
 // must stay alive while `fileRead` is in use.
 // NOTE(review): the `false` argument is presumably the "adopt buffer" flag - confirm.
106 auto* fileRead = new TBufferFile(TBuffer::kRead, file->BufferSize(), file->Buffer(), false, nullptr);
107
108 std::vector<char const*> capabilitiesSpecs = {
109 "O2Framework:RNTupleObjectReadingCapability",
110 "O2Framework:TTreeObjectReadingCapability",
111 };
112
 // Each spec string is expected to yield exactly one plugin (two in total, checked below).
113 std::vector<LoadablePlugin> plugins;
114 for (auto spec : capabilitiesSpecs) {
115 auto morePlugins = PluginManager::parsePluginSpecString(spec);
116 for (auto& extra : morePlugins) {
117 plugins.push_back(extra);
118 }
119 }
120 REQUIRE(plugins.size() == 2);
121
 // NOTE(review): `factory` is used from here on but its declaration is not visible in this
 // listing (the embedded line numbering skips 122); presumably it is declared on that line.
 // NOTE(review): `configDiscoverySpec` is not used anywhere in this test.
123 std::vector<char const*> configDiscoverySpec = {};
124 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.capabilities);
125 REQUIRE(factory.capabilities.size() == 2);
126 REQUIRE(factory.capabilities[0].name == "rntuple");
127 REQUIRE(factory.capabilities[1].name == "ttree");
128
129 // Plugins are hardcoded for now...
 // Index 1 is the "ttree" capability, as asserted just above.
130 auto format = factory.capabilities[1].factory().format();
131
132 auto fs = std::make_shared<TBufferFileFS>(fileRead, factory);
133
134 arrow::dataset::FileSource source("p", fs);
135 REQUIRE(format->IsSupported(source) == true);
136 auto schemaOpt = format->Inspect(source);
137 REQUIRE(schemaOpt.ok());
138 auto schema = *schemaOpt;
 // Only the type ids are compared below.
 // NOTE(review): presumably this means the element type and size of the two fixed-size
 // list fields are not verified by these checks - confirm.
139 REQUIRE(schema->num_fields() == 7);
140 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
141 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
142 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
143 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
144 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
145 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
146 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
147 auto fragment = format->MakeFragment(source, {}, schema);
148 REQUIRE(fragment.ok());
 // The inspected schema doubles as the dataset schema for the scan.
149 auto options = std::make_shared<arrow::dataset::ScanOptions>();
150 options->dataset_schema = schema;
151 auto scanner = format->ScanBatchesAsync(options, *fragment);
152 REQUIRE(scanner.ok());
 // Pull the first batch from the generator and check it holds all 7 columns and 1000 rows.
153 auto batches = (*scanner)();
154 auto result = batches.result();
155 REQUIRE(result.ok());
156 REQUIRE((*result)->columns().size() == 7);
157 REQUIRE((*result)->num_rows() == 1000);
158}
159
160bool validateContents(std::shared_ptr<arrow::RecordBatch> batch)
161{
162 {
163 auto int_array = std::static_pointer_cast<arrow::Int32Array>(batch->GetColumnByName("ev"));
164 REQUIRE(int_array->length() == 100);
165 for (int64_t j = 0; j < int_array->length(); j++) {
166 REQUIRE(int_array->Value(j) == j + 1);
167 }
168 }
169
170 {
171 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("xyz"));
172
173 REQUIRE(list_array->length() == 100);
174 // Iterate over the FixedSizeListArray
175 for (int64_t i = 0; i < list_array->length(); i++) {
176 auto value_slice = list_array->value_slice(i);
177 auto float_array = std::static_pointer_cast<arrow::FloatArray>(value_slice);
178
179 REQUIRE(float_array->Value(0) == 1);
180 REQUIRE(float_array->Value(1) == 2);
181 REQUIRE(float_array->Value(2) == i + 1);
182 }
183 }
184
185 {
186 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("ij"));
187
188 REQUIRE(list_array->length() == 100);
189 // Iterate over the FixedSizeListArray
190 for (int64_t i = 0; i < list_array->length(); i++) {
191 auto value_slice = list_array->value_slice(i);
192 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
193 REQUIRE(int_array->Value(0) == i);
194 REQUIRE(int_array->Value(1) == i + 1);
195 }
196 }
197
198 {
199 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(batch->GetColumnByName("bools"));
200
201 REQUIRE(bool_array->length() == 100);
202 for (int64_t j = 0; j < bool_array->length(); j++) {
203 REQUIRE(bool_array->Value(j) == (j % 3 == 0));
204 }
205 }
206
207 {
208 auto list_array = std::static_pointer_cast<arrow::FixedSizeListArray>(batch->GetColumnByName("manyBools"));
209
210 REQUIRE(list_array->length() == 100);
211 for (int64_t i = 0; i < list_array->length(); i++) {
212 auto value_slice = list_array->value_slice(i);
213 auto bool_array = std::static_pointer_cast<arrow::BooleanArray>(value_slice);
214 REQUIRE(bool_array->Value(0) == (i % 4 == 0));
215 REQUIRE(bool_array->Value(1) == (i % 5 == 0));
216 }
217 }
218
219 {
220 auto list_array = std::static_pointer_cast<arrow::ListArray>(batch->GetColumnByName("vla"));
221
222 REQUIRE(list_array->length() == 100);
223 for (int64_t i = 0; i < list_array->length(); i++) {
224 auto value_slice = list_array->value_slice(i);
225 REQUIRE(value_slice->length() == (i % 10));
226 auto int_array = std::static_pointer_cast<arrow::Int32Array>(value_slice);
227 for (size_t j = 0; j < value_slice->length(); j++) {
228 REQUIRE(int_array->Value(j) == j);
229 }
230 }
231 }
232 return true;
233}
234
235bool validateSchema(std::shared_ptr<arrow::Schema> schema)
236{
237 REQUIRE(schema->num_fields() == 11);
238 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
239 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
240 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
241 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
242 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
243 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
244 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
245 REQUIRE(schema->field(7)->type()->id() == arrow::boolean()->id());
246 REQUIRE(schema->field(8)->type()->id() == arrow::fixed_size_list(arrow::boolean(), 2)->id());
247 REQUIRE(schema->field(9)->type()->id() == arrow::list(arrow::int32())->id());
248 REQUIRE(schema->field(10)->type()->id() == arrow::int8()->id());
249 return true;
250}
251
252bool validatePhysicalSchema(std::shared_ptr<arrow::Schema> schema)
253{
254 REQUIRE(schema->num_fields() == 12);
255 REQUIRE(schema->field(0)->type()->id() == arrow::float32()->id());
256 REQUIRE(schema->field(0)->name() == "px");
257 REQUIRE(schema->field(1)->type()->id() == arrow::float32()->id());
258 REQUIRE(schema->field(2)->type()->id() == arrow::float32()->id());
259 REQUIRE(schema->field(3)->type()->id() == arrow::float64()->id());
260 REQUIRE(schema->field(4)->type()->id() == arrow::int32()->id());
261 REQUIRE(schema->field(5)->type()->id() == arrow::fixed_size_list(arrow::float32(), 3)->id());
262 REQUIRE(schema->field(6)->type()->id() == arrow::fixed_size_list(arrow::int32(), 2)->id());
263 REQUIRE(schema->field(7)->type()->id() == arrow::boolean()->id());
264 REQUIRE(schema->field(8)->type()->id() == arrow::fixed_size_list(arrow::boolean(), 2)->id());
265 REQUIRE(schema->field(9)->type()->id() == arrow::int32()->id());
266 REQUIRE(schema->field(10)->type()->id() == arrow::list(arrow::int32())->id());
267 REQUIRE(schema->field(11)->type()->id() == arrow::int8()->id());
268 return true;
269}
270
// Round-trip test: writes two "tracks" trees into an in-memory ROOT file (DF_1 with 1000
// entries, DF_2 with 100 entries and extra bool / variable-length / byte branches), reads
// DF_2/tracks through the "ttree" format, writes the resulting batch to a second in-memory
// file both as a TTree (/DF_3) and as an RNTuple (/rntuple), and reads both back.
271TEST_CASE("RootTree2Dataset")
272{
273 using namespace o2::framework;
275 // auto *f = new TFile("Foo.root", "RECREATE");
 // NOTE(review): `f` is allocated with `new` and no matching `delete` appears in this test.
276 auto* f = new TMemFile("foo", "RECREATE");
277 f->mkdir("DF_1");
278 f->mkdir("DF_2");
279
 // First tree: created after cd("DF_1"); seven branches, 1000 entries.
280 f->cd("DF_1");
281 auto* t = new TTree("tracks", "a simple Tree with simple variables");
282 {
283 Float_t xyz[3];
284 Int_t ij[2];
285 Float_t px = 0, py = 1, pz = 2;
286 Double_t random;
287 Int_t ev;
288 t->Branch("px", &px, "px/F");
289 t->Branch("py", &py, "py/F");
290 t->Branch("pz", &pz, "pz/F");
291 t->Branch("random", &random, "random/D");
292 t->Branch("ev", &ev, "ev/I");
293 t->Branch("xyz", xyz, "xyz[3]/F");
294 t->Branch("ij", ij, "ij[2]/I");
295 // fill the tree
296 for (Int_t i = 0; i < 1000; i++) {
297 xyz[0] = 1;
298 xyz[1] = 2;
299 xyz[2] = 3;
300 gRandom->Rannor(px, py);
301 pz = px * px + py * py;
302 xyz[2] = i + 1;
303 ij[0] = i;
304 ij[1] = i + 1;
305 random = gRandom->Rndm();
306 ev = i + 1;
307 t->Fill();
308 }
309 }
310
 // Second tree: created after cd("DF_2"); twelve branches, 100 entries. This is the tree
 // read back below, and its values are what validateContents() expects.
311 f->cd("DF_2");
312 t = new TTree("tracks", "a simple Tree with simple variables");
313 {
314 Float_t xyz[3];
315 Int_t ij[2];
316 Float_t px = 0, py = 1, pz = 2;
317 Double_t random;
318 Int_t ev;
319 bool oneBool;
320 bool manyBool[2];
 // The array contents stay fixed; only `vlaSize` changes per entry, so entry i stores the
 // first (i % 10) values.
321 int vla[10] = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
322 int vlaSize = 0;
323 char byte;
324
325 t->Branch("px", &px, "px/F");
326 t->Branch("py", &py, "py/F");
327 t->Branch("pz", &pz, "pz/F");
328 t->Branch("random", &random, "random/D");
329 t->Branch("ev", &ev, "ev/I");
330 t->Branch("xyz", xyz, "xyz[3]/F");
331 t->Branch("ij", ij, "ij[2]/I");
332 t->Branch("bools", &oneBool, "bools/O");
333 t->Branch("manyBools", &manyBool, "manyBools[2]/O");
 // "vla_size" is the counter branch referenced by the "vla[vla_size]" leaf list below.
334 t->Branch("vla_size", &vlaSize, "vla_size/I");
335 t->Branch("vla", vla, "vla[vla_size]/I");
336 t->Branch("byte", &byte, "byte/B");
337 // fill the tree
338 for (Int_t i = 0; i < 100; i++) {
339 xyz[0] = 1;
340 xyz[1] = 2;
341 xyz[2] = 3;
342 gRandom->Rannor(px, py);
343 pz = px * px + py * py;
344 xyz[2] = i + 1;
345 ij[0] = i;
346 ij[1] = i + 1;
347 random = gRandom->Rndm();
348 ev = i + 1;
349 oneBool = (i % 3 == 0);
350 manyBool[0] = (i % 4 == 0);
351 manyBool[1] = (i % 5 == 0);
352 vlaSize = i % 10;
353 byte = i;
354 t->Fill();
355 }
356 }
357 f->Write();
358
359 std::vector<char const*> capabilitiesSpecs = {
360 "O2Framework:RNTupleObjectReadingCapability",
361 "O2Framework:TTreeObjectReadingCapability",
362 };
363
 // NOTE(review): `factory` is used below but its declaration is not visible in this
 // listing (the embedded line numbering skips 364); presumably it is declared on that line.
365
 // Each spec string is expected to yield exactly one plugin (two in total, checked below).
366 std::vector<LoadablePlugin> plugins;
367 for (auto spec : capabilitiesSpecs) {
368 auto morePlugins = PluginManager::parsePluginSpecString(spec);
369 for (auto& extra : morePlugins) {
370 plugins.push_back(extra);
371 }
372 }
373 REQUIRE(plugins.size() == 2);
374
375 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, factory.capabilities);
376
377 REQUIRE(factory.capabilities.size() == 2);
378 REQUIRE(factory.capabilities[0].name == "rntuple");
379 REQUIRE(factory.capabilities[1].name == "ttree");
380
381 // Plugins are hardcoded for now...
 // Index 0 is the "rntuple" capability and index 1 the "ttree" one, as asserted just above.
382 auto rNtupleFormat = factory.capabilities[0].factory().format();
383 auto format = factory.capabilities[1].factory().format();
384
385 auto fs = std::make_shared<TFileFileSystem>(f, 50 * 1024 * 1024, factory);
386
387 arrow::dataset::FileSource source("DF_2/tracks", fs);
388 REQUIRE(format->IsSupported(source) == true);
389 auto physicalSchema = format->Inspect(source);
390 REQUIRE(physicalSchema.ok());
391 REQUIRE(validatePhysicalSchema(*physicalSchema));
392 // Create the dataset schema rather than using the physical one
 // Fields whose name ends in "_size" (here "vla_size") are dropped from the dataset schema.
393 std::vector<std::shared_ptr<arrow::Field>> fields;
394 for (auto& field : (*(physicalSchema))->fields()) {
395 if (field->name().ends_with("_size")) {
396 continue;
397 }
398 fields.push_back(field);
399 }
400 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
401
 // The return value is ignored here; validateSchema() asserts internally through REQUIRE.
402 validateSchema(schema);
403
 // The fragment is built with the physical schema, while the scan uses the reduced dataset schema.
404 auto fragment = format->MakeFragment(source, {}, *physicalSchema);
405 REQUIRE(fragment.ok());
406 auto options = std::make_shared<arrow::dataset::ScanOptions>();
407 options->dataset_schema = schema;
408 auto scanner = format->ScanBatchesAsync(options, *fragment);
409 REQUIRE(scanner.ok());
410
411 // This batch has deferred contents. Therefore we need to use a DeferredOutputStream to
412 // write it to a real one and read it back with the BufferReader, which is hopefully zero copy
413 std::shared_ptr<arrow::RecordBatch> batch;
414
415 auto batches = (*scanner)();
416 auto result = batches.result();
417 REQUIRE(result.ok());
418 REQUIRE((*result)->columns().size() == 11);
419 REQUIRE((*result)->num_rows() == 100);
420 std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
421 auto deferredWriterStream = factory.capabilities[1].factory().deferredOutputStreamer(*fragment, buffer);
422 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream.get(), schema);
 // NOTE(review): `status` (and `next` below) are never checked in this test.
423 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**result);
424 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
425 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
426 auto batchReader = readerResult.ValueOrDie();
427
428 auto next = batchReader->ReadNext(&batch);
429 REQUIRE(batch != nullptr);
430
431 validateContents(batch);
432
 // Second in-memory file, used as the write target. It is given the same name ("foo") as
 // the input file and, like it, is never deleted in this test.
433 auto* output = new TMemFile("foo", "RECREATE");
434 auto outFs = std::make_shared<TFileFileSystem>(output, 0, factory);
435
436 // Open a stream at toplevel
437 auto destination = outFs->OpenOutputStream("/", {});
438 REQUIRE(destination.ok());
439
440 // Write to the /DF_3 tree at top level
441 arrow::fs::FileLocator locator{outFs, "/DF_3"};
 // NOTE(review): `writer` is dereferenced without checking that MakeWriter succeeded.
442 auto writer = format->MakeWriter(*destination, schema, {}, locator);
443 auto success = writer->get()->Write(batch);
444 REQUIRE(batch->schema()->field(0)->name() == "px");
 // NOTE(review): `rootDestination` is not used afterwards.
445 auto rootDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
446
 // Reads /DF_3 back through the "ttree" format. The tree lookup and entry-count check are
 // repeated after each format call (IsSupported, Inspect, ScanBatchesAsync).
 // Several locals declared inside this section (`fields`, `schema`, `buffer`, `outBatch`,
 // `status`, `bufferReader`, `readerResult`, `batchReader`, `next`) shadow same-named
 // variables of the enclosing scope.
447 SECTION("Read tree")
448 {
449 REQUIRE(success.ok());
450 // Let's read it back...
451 auto tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(outFs);
452 REQUIRE(tfileFs.get());
453 REQUIRE(tfileFs->GetFile());
454 auto* tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
455 REQUIRE(tree != nullptr);
456 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
457 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetName() == std::string("px"));
458
459 arrow::dataset::FileSource source2("/DF_3", outFs);
460
461 REQUIRE(format->IsSupported(source2) == true);
462 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
463 REQUIRE(tfileFs.get());
464 REQUIRE(tfileFs->GetFile());
465 REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));
466
467 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
468 REQUIRE(tree != nullptr);
469 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
470
471 auto schemaOptWritten = format->Inspect(source2);
472 tfileFs = std::dynamic_pointer_cast<TFileFileSystem>(source2.filesystem());
473 REQUIRE(tfileFs.get());
474 REQUIRE(tfileFs->GetFile());
475 REQUIRE(tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree")));
476 REQUIRE(schemaOptWritten.ok());
477 auto schemaWritten = *schemaOptWritten;
478
479 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
480 REQUIRE(tree != nullptr);
481 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
482
 // The written tree must expose the same 12-field physical layout as the original one.
483 REQUIRE(validatePhysicalSchema(schemaWritten));
484 std::vector<std::shared_ptr<arrow::Field>> fields;
485 for (auto& field : schemaWritten->fields()) {
486 if (field->name().ends_with("_size")) {
487 continue;
488 }
489 fields.push_back(field);
490 }
491 std::shared_ptr<arrow::Schema> schema = std::make_shared<arrow::Schema>(fields);
492 REQUIRE(validateSchema(schema));
493
 // NOTE(review): this passes the physical schema inspected from the original source
 // (`physicalSchema`), not `schemaWritten` - confirm this is intended.
494 auto fragmentWritten = format->MakeFragment(source2, {}, *physicalSchema);
495 REQUIRE(fragmentWritten.ok());
496 auto optionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
497 optionsWritten->dataset_schema = schema;
498 auto scannerWritten = format->ScanBatchesAsync(optionsWritten, *fragmentWritten);
499 REQUIRE(scannerWritten.ok());
500 tree = (TTree*)tfileFs->GetFile()->GetObjectChecked("/DF_3", TClass::GetClass("TTree"));
501 REQUIRE(tree != nullptr);
502 REQUIRE(((TBranch*)tree->GetListOfBranches()->At(0))->GetEntries() == 100);
503 auto batchesWritten = (*scannerWritten)();
504 auto resultWritten = batchesWritten.result();
505 REQUIRE(resultWritten.ok());
506 REQUIRE((*resultWritten)->columns().size() == 11);
507 REQUIRE((*resultWritten)->num_rows() == 100);
508
509 std::shared_ptr<arrow::ResizableBuffer> buffer = *arrow::AllocateResizableBuffer(1000, 64);
510 auto deferredWriterStream2 = factory.capabilities[1].factory().deferredOutputStreamer(*fragmentWritten, buffer);
511 auto outBatch = arrow::ipc::MakeStreamWriter(deferredWriterStream2.get(), schema);
512 auto status = outBatch.ValueOrDie()->WriteRecordBatch(**resultWritten);
513 std::shared_ptr<arrow::io::InputStream> bufferReader = std::make_shared<arrow::io::BufferReader>(buffer);
514 auto readerResult = arrow::ipc::RecordBatchStreamReader::Open(bufferReader);
515 auto batchReader = readerResult.ValueOrDie();
516
 // This overwrites the outer `batch`, which is also what gets written as an RNTuple
 // after this section.
517 auto next = batchReader->ReadNext(&batch);
518 REQUIRE(batch != nullptr);
519 validateContents(batch);
520 }
521
522 arrow::fs::FileLocator rnTupleLocator{outFs, "/rntuple"};
523 // We write an RNTuple in the same TMemFile, using /rntuple as a location
 // NOTE(review): `rntupleDestination` is not used afterwards.
524 auto rntupleDestination = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(*destination);
525
 // The writer lives in its own scope, so it is destroyed before the RNTuple is read back.
526 {
527 auto rNtupleWriter = rNtupleFormat->MakeWriter(*destination, schema, {}, rnTupleLocator);
528 auto rNtupleSuccess = rNtupleWriter->get()->Write(batch);
529 REQUIRE(rNtupleSuccess.ok());
530 }
531
532 // And now we can read back the RNTuple into a RecordBatch
533 arrow::dataset::FileSource writtenRntupleSource("/rntuple", outFs);
534
535 REQUIRE(rNtupleFormat->IsSupported(writtenRntupleSource) == true);
536
 // The inspected RNTuple schema is validated against the 11-field dataset layout and is
 // then used both to build the fragment and as the dataset schema of the scan.
537 auto rntupleSchemaOpt = rNtupleFormat->Inspect(writtenRntupleSource);
538 REQUIRE(rntupleSchemaOpt.ok());
539 auto rntupleSchemaWritten = *rntupleSchemaOpt;
540 REQUIRE(validateSchema(rntupleSchemaWritten));
541
542 auto rntupleFragmentWritten = rNtupleFormat->MakeFragment(writtenRntupleSource, {}, rntupleSchemaWritten);
543 REQUIRE(rntupleFragmentWritten.ok());
544 auto rntupleOptionsWritten = std::make_shared<arrow::dataset::ScanOptions>();
545 rntupleOptionsWritten->dataset_schema = rntupleSchemaWritten;
546 auto rntupleScannerWritten = rNtupleFormat->ScanBatchesAsync(rntupleOptionsWritten, *rntupleFragmentWritten);
547 REQUIRE(rntupleScannerWritten.ok());
548 auto rntupleBatchesWritten = (*rntupleScannerWritten)();
549 auto rntupleResultWritten = rntupleBatchesWritten.result();
550 REQUIRE(rntupleResultWritten.ok());
551 REQUIRE((*rntupleResultWritten)->columns().size() == 11);
552 REQUIRE(validateSchema((*rntupleResultWritten)->schema()));
553 REQUIRE((*rntupleResultWritten)->num_rows() == 100);
 // Unlike the TTree path above, the scanned batch is validated directly, without the
 // IPC write/read round trip.
554 REQUIRE(validateContents(*rntupleResultWritten));
555}
#define DECLARE_SOA_TABLE(_Name_, _Origin_, _Desc_,...)
Definition ASoA.h:3092
#define DECLARE_SOA_COLUMN_FULL(_Name_, _Getter_, _Type_, _Label_)
Definition ASoA.h:2293
int32_t i
Test
Definition Utils.h:55
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t j
Definition RawData.h:0
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat t1
Definition glcorearb.h:5034
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
TEST_CASE("test_prepareArguments")
FIXME: do not use data model tables.
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
bool validateContents(std::shared_ptr< arrow::RecordBatch > batch)
bool validateSchema(std::shared_ptr< arrow::Schema > schema)
bool validatePhysicalSchema(std::shared_ptr< arrow::Schema > schema)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))