Project
Loading...
Searching...
No Matches
RNTuplePlugin.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/Plugins.h"
16#include <ROOT/RNTupleModel.hxx>
17#include <ROOT/RNTupleWriteOptions.hxx>
18#include <ROOT/RNTupleWriter.hxx>
19#include <ROOT/RField.hxx>
20#include <ROOT/RNTuple.hxx>
21#include <ROOT/RNTupleReader.hxx>
22#include <ROOT/RFieldVisitor.hxx>
23#include <ROOT/RNTupleInspector.hxx>
24#include <ROOT/RVec.hxx>
25#include <memory>
26#include <TBufferFile.h>
27
28#include <TDirectory.h>
29#include <arrow/array/array_nested.h>
30#include <arrow/array/array_primitive.h>
31#include <arrow/array/builder_nested.h>
32#include <arrow/array/builder_primitive.h>
33#include <arrow/dataset/file_base.h>
34
35template class
36 std::unique_ptr<ROOT::Experimental::RNTupleReader>;
37
38namespace o2::framework
39{
40
42{
43 public:
44 RNTupleFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
45 : FileWriteOptions(format)
46 {
47 }
48};
49
// A filesystem from which an RNTuple can be retrieved.
52{
53 public:
55
56 virtual ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource source) = 0;
57};
58
60{
61 public:
62 SingleRNTupleFileSystem(ROOT::Experimental::RNTuple* tuple)
64 mTuple(tuple)
65 {
66 }
67
68 arrow::Result<arrow::fs::FileInfo> GetFileInfo(std::string const& path) override;
69
70 std::string type_name() const override
71 {
72 return "rntuple";
73 }
74
75 ROOT::Experimental::RNTuple* GetRNTuple(arrow::dataset::FileSource) override
76 {
77 // Simply return the only TTree we have
78 return mTuple;
79 }
80
81 private:
82 ROOT::Experimental::RNTuple* mTuple;
83};
84
85arrow::Result<arrow::fs::FileInfo> SingleRNTupleFileSystem::GetFileInfo(std::string const& path)
86{
87 arrow::dataset::FileSource source(path, shared_from_this());
88 arrow::fs::FileInfo result;
89 result.set_path(path);
90 result.set_type(arrow::fs::FileType::File);
91 return result;
92}
93
95{
96 public:
97 RNTupleFileFragment(arrow::dataset::FileSource source,
98 std::shared_ptr<arrow::dataset::FileFormat> format,
99 arrow::compute::Expression partition_expression,
100 std::shared_ptr<arrow::Schema> physical_schema)
101 : FileFragment(source, format, partition_expression, physical_schema)
102 {
103 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
104 if (!fs.get()) {
105 throw runtime_error_f("Do not know how to extract %s from %s", source.path().c_str(), fs->type_name().c_str());
106 }
107 auto handler = fs->GetObjectHandler(source);
108 if (!handler->format->Equals(*format)) {
109 throw runtime_error_f("Format for %s does not match. Found %s, expected %s.", source.path().c_str(),
110 handler->format->type_name().c_str(),
111 format->type_name().c_str());
112 }
113 mNTuple = handler->GetObjectAsOwner<ROOT::Experimental::RNTuple>();
114 }
115
116 ROOT::Experimental::RNTuple* GetRNTuple()
117 {
118 return mNTuple.get();
119 }
120
121 private:
122 std::unique_ptr<ROOT::Experimental::RNTuple> mNTuple;
123};
124
126{
127 size_t& mTotCompressedSize;
128 size_t& mTotUncompressedSize;
129
130 public:
131 RNTupleFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
132 : FileFormat({}),
133 mTotCompressedSize(totalCompressedSize),
134 mTotUncompressedSize(totalUncompressedSize)
135 {
136 }
137
138 ~RNTupleFileFormat() override = default;
139
140 std::string type_name() const override
141 {
142 return "rntuple";
143 }
144
145 bool Equals(const FileFormat& other) const override
146 {
147 return other.type_name() == this->type_name();
148 }
149
150 arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
151 {
152 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
153 if (!fs) {
154 return false;
155 }
156 return fs->CheckSupport(source);
157 }
158
159 arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
160
161 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
162 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
163 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
164
165 std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
166
167 arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination,
168 std::shared_ptr<arrow::Schema> schema,
169 std::shared_ptr<arrow::dataset::FileWriteOptions> options,
170 arrow::fs::FileLocator destination_locator) const override;
171 arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
172 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
173 std::shared_ptr<arrow::Schema> physical_schema) override;
174};
175
176struct RootNTupleVisitor : public ROOT::Experimental::Detail::RFieldVisitor {
177 void VisitArrayField(const ROOT::Experimental::RArrayField& field) override
178 {
179 int size = field.GetLength();
180 RootNTupleVisitor valueVisitor{};
181 auto valueField = field.GetSubFields()[0];
182 valueField->AcceptVisitor(valueVisitor);
183 auto type = valueVisitor.datatype;
184 this->datatype = arrow::fixed_size_list(type, size);
185 }
186
187 void VisitRVecField(const ROOT::Experimental::RRVecField& field) override
188 {
189 RootNTupleVisitor valueVisitor{};
190 auto valueField = field.GetSubFields()[0];
191 valueField->AcceptVisitor(valueVisitor);
192 auto type = valueVisitor.datatype;
193 this->datatype = arrow::list(type);
194 }
195
196 void VisitField(const ROOT::Experimental::RFieldBase& field) override
197 {
198 throw o2::framework::runtime_error_f("Unknown field %s with type %s", field.GetFieldName().c_str(), field.GetTypeName().c_str());
199 }
200
201 void VisitIntField(const ROOT::Experimental::RField<int>& field) override
202 {
203 this->datatype = arrow::int32();
204 }
205
206 void VisitInt8Field(const ROOT::Experimental::RField<std::int8_t>& field) override
207 {
208 this->datatype = arrow::int8();
209 }
210
211 void VisitInt16Field(const ROOT::Experimental::RField<std::int16_t>& field) override
212 {
213 this->datatype = arrow::int16();
214 }
215
216 void VisitUInt32Field(const ROOT::Experimental::RField<std::uint32_t>& field) override
217 {
218 this->datatype = arrow::uint32();
219 }
220
221 void VisitUInt8Field(const ROOT::Experimental::RField<std::uint8_t>& field) override
222 {
223 this->datatype = arrow::uint8();
224 }
225
226 void VisitUInt16Field(const ROOT::Experimental::RField<std::uint16_t>& field) override
227 {
228 this->datatype = arrow::int16();
229 }
230
231 void VisitBoolField(const ROOT::Experimental::RField<bool>& field) override
232 {
233 this->datatype = arrow::boolean();
234 }
235
236 void VisitFloatField(const ROOT::Experimental::RField<float>& field) override
237 {
238 this->datatype = arrow::float32();
239 }
240
241 void VisitDoubleField(const ROOT::Experimental::RField<double>& field) override
242 {
243 this->datatype = arrow::float64();
244 }
245 std::shared_ptr<arrow::DataType> datatype;
246};
247} // namespace o2::framework
248
249auto arrowTypeFromRNTuple(ROOT::Experimental::RFieldBase const& field, int size)
250{
252 field.AcceptVisitor(visitor);
253 return visitor.datatype;
254}
255
256namespace o2::framework
257{
258std::unique_ptr<ROOT::Experimental::RFieldBase> rootFieldFromArrow(std::shared_ptr<arrow::Field> field, std::string name)
259{
260 using namespace ROOT::Experimental;
261 switch (field->type()->id()) {
262 case arrow::Type::BOOL:
263 return std::make_unique<RField<bool>>(name);
264 case arrow::Type::UINT8:
265 return std::make_unique<RField<uint8_t>>(name);
266 case arrow::Type::UINT16:
267 return std::make_unique<RField<uint16_t>>(name);
268 case arrow::Type::UINT32:
269 return std::make_unique<RField<uint32_t>>(name);
270 case arrow::Type::UINT64:
271 return std::make_unique<RField<uint64_t>>(name);
272 case arrow::Type::INT8:
273 return std::make_unique<RField<int8_t>>(name);
274 case arrow::Type::INT16:
275 return std::make_unique<RField<int16_t>>(name);
276 case arrow::Type::INT32:
277 return std::make_unique<RField<int32_t>>(name);
278 case arrow::Type::INT64:
279 return std::make_unique<RField<int64_t>>(name);
280 case arrow::Type::FLOAT:
281 return std::make_unique<RField<float>>(name);
282 case arrow::Type::DOUBLE:
283 return std::make_unique<RField<double>>(name);
284 case arrow::Type::STRING:
285 return std::make_unique<RField<std::string>>(name);
286 default:
287 throw runtime_error("Unsupported arrow column type");
288 }
289}
290
292{
293 std::shared_ptr<ROOT::Experimental::RNTupleWriter> mWriter;
294 bool firstBatch = true;
295 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
296 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
297 std::vector<size_t> valueCount;
298
299 public:
300 RNTupleFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
301 std::shared_ptr<arrow::io::OutputStream> destination,
302 arrow::fs::FileLocator destination_locator)
303 : FileWriter(schema, options, destination, destination_locator)
304 {
305 using namespace ROOT::Experimental;
306
307 auto model = RNTupleModel::CreateBare();
308 // Let's create a model from the physical schema
309 for (auto i = 0u; i < schema->fields().size(); ++i) {
310 auto& field = schema->field(i);
311
312 // Construct all the needed branches.
313 switch (field->type()->id()) {
314 case arrow::Type::FIXED_SIZE_LIST: {
315 auto list = std::static_pointer_cast<arrow::FixedSizeListType>(field->type());
316 auto valueField = field->type()->field(0);
317 model->AddField(std::make_unique<RArrayField>(field->name(), rootFieldFromArrow(valueField, "_0"), list->list_size()));
318 } break;
319 case arrow::Type::LIST: {
320 auto valueField = field->type()->field(0);
321 model->AddField(std::make_unique<RRVecField>(field->name(), rootFieldFromArrow(valueField, "_0")));
322 } break;
323 default: {
324 model->AddField(rootFieldFromArrow(field, field->name()));
325 } break;
326 }
327 }
328 auto fileStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
329 auto* file = dynamic_cast<TFile*>(fileStream->GetDirectory());
330 mWriter = RNTupleWriter::Append(std::move(model), destination_locator_.path, *file, {});
331 }
332
333 arrow::Status Write(const std::shared_ptr<arrow::RecordBatch>& batch) override
334 {
335 if (firstBatch) {
336 firstBatch = false;
337 }
338
339 // Support writing empty tables
340 if (batch->columns().empty() || batch->num_rows() == 0) {
341 return arrow::Status::OK();
342 }
343
344 for (auto i = 0u; i < batch->columns().size(); ++i) {
345 auto column = batch->column(i);
346 auto& field = batch->schema()->field(i);
347
348 valueArrays.push_back(nullptr);
349 valueTypes.push_back(nullptr);
350 valueCount.push_back(1);
351
352 switch (field->type()->id()) {
353 case arrow::Type::FIXED_SIZE_LIST: {
354 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
355 auto listType = std::static_pointer_cast<arrow::FixedSizeListType>(field->type());
356 if (field->type()->field(0)->type()->id() == arrow::Type::BOOL) {
357 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
358 int64_t length = boolArray->length();
359 arrow::UInt8Builder builder;
360 auto ok = builder.Reserve(length);
361
362 for (int64_t i = 0; i < length; ++i) {
363 if (boolArray->IsValid(i)) {
364 // Expand each boolean value (true/false) to uint8 (1/0)
365 uint8_t value = boolArray->Value(i) ? 1 : 0;
366 auto ok = builder.Append(value);
367 } else {
368 // Append null for invalid entries
369 auto ok = builder.AppendNull();
370 }
371 }
372 valueArrays.back() = *builder.Finish();
373 valueTypes.back() = valueArrays.back()->type();
374 } else {
375 valueArrays.back() = list->values();
376 valueTypes.back() = field->type()->field(0)->type();
377 }
378 valueCount.back() = listType->list_size();
379 } break;
380 case arrow::Type::LIST: {
381 auto list = std::static_pointer_cast<arrow::ListArray>(column);
382 valueArrays.back() = list;
383 valueTypes.back() = field->type()->field(0)->type();
384 valueCount.back() = -1;
385 } break;
386 case arrow::Type::BOOL: {
387 // We unpack the array
388 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
389 int64_t length = boolArray->length();
390 arrow::UInt8Builder builder;
391 auto ok = builder.Reserve(length);
392
393 for (int64_t i = 0; i < length; ++i) {
394 if (boolArray->IsValid(i)) {
395 // Expand each boolean value (true/false) to uint8 (1/0)
396 uint8_t value = boolArray->Value(i) ? 1 : 0;
397 auto ok = builder.Append(value);
398 } else {
399 // Append null for invalid entries
400 auto ok = builder.AppendNull();
401 }
402 }
403 valueArrays.back() = *builder.Finish();
404 valueTypes.back() = valueArrays.back()->type();
405 } break;
406 default:
407 valueArrays.back() = column;
408 valueTypes.back() = field->type();
409 break;
410 }
411 }
412
413 int64_t pos = 0;
414
415 auto entry = mWriter->CreateEntry();
416 std::vector<ROOT::Experimental::REntry::RFieldToken> tokens;
417 tokens.reserve(batch->num_columns());
418 std::vector<size_t> typeIds;
419 typeIds.reserve(batch->num_columns());
420
421 for (size_t ci = 0; ci < batch->num_columns(); ++ci) {
422 auto& field = batch->schema()->field(ci);
423 typeIds.push_back(batch->column(ci)->type()->id());
424 tokens.push_back(entry->GetToken(field->name()));
425 }
426
427 while (pos < batch->num_rows()) {
428 for (size_t ci = 0; ci < batch->num_columns(); ++ci) {
429 auto typeId = typeIds[ci];
430 auto token = tokens[ci];
431
432 switch (typeId) {
433 case arrow::Type::LIST: {
434 auto list = std::static_pointer_cast<arrow::ListArray>(valueArrays[ci]);
435 auto value_slice = list->value_slice(pos);
436
437 valueCount[ci] = value_slice->length();
438 auto bindValue = [&vc = valueCount, ci, token](auto array, std::unique_ptr<ROOT::Experimental::REntry>& entry) -> void {
439 using value_type = std::decay_t<decltype(*array.get())>::value_type;
440 auto v = std::make_shared<ROOT::VecOps::RVec<value_type>>((value_type*)array->raw_values(), vc[ci]);
441 entry->BindValue(token, v);
442 };
443 switch (valueTypes[ci]->id()) {
444 case arrow::Type::FLOAT: {
445 bindValue(std::static_pointer_cast<arrow::FloatArray>(value_slice), entry);
446 } break;
447 case arrow::Type::DOUBLE: {
448 bindValue(std::static_pointer_cast<arrow::DoubleArray>(value_slice), entry);
449 } break;
450 case arrow::Type::INT8: {
451 bindValue(std::static_pointer_cast<arrow::Int8Array>(value_slice), entry);
452 } break;
453 case arrow::Type::INT16: {
454 bindValue(std::static_pointer_cast<arrow::Int16Array>(value_slice), entry);
455 } break;
456 case arrow::Type::INT32: {
457 bindValue(std::static_pointer_cast<arrow::Int32Array>(value_slice), entry);
458 } break;
459 case arrow::Type::INT64: {
460 bindValue(std::static_pointer_cast<arrow::Int64Array>(value_slice), entry);
461 } break;
462 case arrow::Type::UINT8: {
463 bindValue(std::static_pointer_cast<arrow::UInt8Array>(value_slice), entry);
464 } break;
465 case arrow::Type::UINT16: {
466 bindValue(std::static_pointer_cast<arrow::UInt16Array>(value_slice), entry);
467 } break;
468 case arrow::Type::UINT32: {
469 bindValue(std::static_pointer_cast<arrow::UInt32Array>(value_slice), entry);
470 } break;
471 case arrow::Type::UINT64: {
472 bindValue(std::static_pointer_cast<arrow::UInt64Array>(value_slice), entry);
473 } break;
474 default: {
475 throw runtime_error("Unsupported kind of VLA");
476 } break;
477 }
478 } break;
479 case arrow::Type::FIXED_SIZE_LIST: {
480 entry->BindRawPtr<void>(token, (void*)(valueArrays[ci]->data()->buffers[1]->data() + pos * valueCount[ci] * valueTypes[ci]->byte_width()));
481 } break;
482 case arrow::Type::BOOL: {
483 // Not sure we actually need this
484 entry->BindRawPtr<bool>(token, (bool*)(valueArrays[ci]->data()->buffers[1]->data() + pos * 1));
485 } break;
486 default:
487 // By default we consider things scalars.
488 entry->BindRawPtr<void>(token, (void*)(valueArrays[ci]->data()->buffers[1]->data() + pos * valueTypes[ci]->byte_width()));
489 break;
490 }
491 }
492 mWriter->Fill(*entry);
493 ++pos;
494 }
495 // mWriter->CommitCluster();
496
497 return arrow::Status::OK();
498 }
499
500 arrow::Future<>
501 FinishInternal() override
502 {
503 return {};
504 };
505};
506
507arrow::Result<std::shared_ptr<arrow::Schema>> RNTupleFileFormat::Inspect(const arrow::dataset::FileSource& source) const
508{
509
510 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
511 // Actually get the TTree from the ROOT file.
512 auto objectHandler = fs->GetObjectHandler(source);
513 if (objectHandler->format->type_name() != this->type_name()) {
514 throw runtime_error_f("Unexpected kind of filesystem %s to handle payload %s.\n", source.filesystem()->type_name().c_str(), source.path().c_str());
515 }
516 // We know this is a RNTuple, so we can continue with the inspection.
517 auto rntuple = objectHandler->GetObjectAsOwner<ROOT::Experimental::RNTuple>().release();
518
519 auto inspector = ROOT::Experimental::RNTupleInspector::Create(rntuple);
520
521 auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
522
523 auto& tupleField0 = reader->GetModel().GetFieldZero();
524 std::vector<std::shared_ptr<arrow::Field>> fields;
525 for (auto& tupleField : tupleField0.GetSubFields()) {
526 auto field = std::make_shared<arrow::Field>(tupleField->GetFieldName(), arrowTypeFromRNTuple(*tupleField, tupleField->GetValueSize()));
527 fields.push_back(field);
528 }
529
530 return std::make_shared<arrow::Schema>(fields);
531}
532
533arrow::Result<arrow::RecordBatchGenerator> RNTupleFileFormat::ScanBatchesAsync(
534 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
535 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
536{
537 auto dataset_schema = options->dataset_schema;
538 auto ntupleFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
539
540 auto generator = [pool = options->pool, ntupleFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
541 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
542 using namespace ROOT::Experimental;
543 std::vector<std::shared_ptr<arrow::Array>> columns;
544 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
545
546 int64_t rows = -1;
547 ROOT::Experimental::RNTuple* rntuple = ntupleFragment->GetRNTuple();
548 auto reader = ROOT::Experimental::RNTupleReader::Open(rntuple);
549 auto& model = reader->GetModel();
550 for (auto& physicalField : fields) {
551 auto bulk = model.CreateBulk(physicalField->name());
552
553 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type());
554
555 auto& descriptor = reader->GetDescriptor();
556 auto totalEntries = reader->GetNEntries();
557
558 if (rows == -1) {
559 rows = totalEntries;
560 }
561 if (rows != totalEntries) {
562 throw runtime_error_f("Unmatching number of rows for branch %s", physicalField->name().c_str());
563 }
564 arrow::Status status;
565 int readEntries = 0;
566 std::shared_ptr<arrow::Array> array;
567 if (physicalField->type() == arrow::boolean() ||
568 (listType && physicalField->type()->field(0)->type() == arrow::boolean())) {
569 if (listType) {
570 std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
571 auto status = arrow::MakeBuilder(pool, physicalField->type()->field(0)->type(), &builder);
572 if (!status.ok()) {
573 throw runtime_error("Cannot create value builder");
574 }
575 auto listBuilder = std::make_unique<arrow::FixedSizeListBuilder>(pool, std::move(builder), listType->list_size());
576 auto valueBuilder = listBuilder.get()->value_builder();
577 // boolean array special case: we need to use builder to create the bitmap
578 status = valueBuilder->Reserve(totalEntries * listType->list_size());
579 status &= listBuilder->Reserve(totalEntries);
580 if (!status.ok()) {
581 throw runtime_error("Failed to reserve memory for array builder");
582 }
583 auto clusterIt = descriptor.FindClusterId(0, 0);
584 // No adoption for now...
585 // bulk.AdoptBuffer(buffer, totalEntries)
586 while (clusterIt != kInvalidDescriptorId) {
587 auto& index = descriptor.GetClusterDescriptor(clusterIt);
588 auto mask = std::make_unique<bool[]>(index.GetNEntries());
589 std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
590 void* ptr = bulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
591 int readLast = index.GetNEntries();
592 readEntries += readLast;
593 status &= static_cast<arrow::BooleanBuilder*>(valueBuilder)->AppendValues(reinterpret_cast<uint8_t const*>(ptr), readLast * listType->list_size());
594 clusterIt = descriptor.FindNextClusterId(clusterIt);
595 }
596 status &= static_cast<arrow::FixedSizeListBuilder*>(listBuilder.get())->AppendValues(readEntries);
597 if (!status.ok()) {
598 throw runtime_error("Failed to append values to array");
599 }
600 status &= listBuilder->Finish(&array);
601 if (!status.ok()) {
602 throw runtime_error("Failed to create array");
603 }
604 } else if (listType == nullptr) {
605 std::unique_ptr<arrow::ArrayBuilder> builder = nullptr;
606 auto status = arrow::MakeBuilder(pool, physicalField->type(), &builder);
607 if (!status.ok()) {
608 throw runtime_error("Cannot create builder");
609 }
610 auto valueBuilder = static_cast<arrow::BooleanBuilder*>(builder.get());
611 // boolean array special case: we need to use builder to create the bitmap
612 status = valueBuilder->Reserve(totalEntries);
613 if (!status.ok()) {
614 throw runtime_error("Failed to reserve memory for array builder");
615 }
616 auto clusterIt = descriptor.FindClusterId(0, 0);
617 while (clusterIt != kInvalidDescriptorId) {
618 auto& index = descriptor.GetClusterDescriptor(clusterIt);
619 auto mask = std::make_unique<bool[]>(index.GetNEntries());
620 std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
621 void* ptr = bulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
622 int readLast = index.GetNEntries();
623 readEntries += readLast;
624 status &= valueBuilder->AppendValues(reinterpret_cast<uint8_t const*>(ptr), readLast);
625 clusterIt = descriptor.FindNextClusterId(clusterIt);
626 }
627 if (!status.ok()) {
628 throw runtime_error("Failed to append values to array");
629 }
630 status &= valueBuilder->Finish(&array);
631 if (!status.ok()) {
632 throw runtime_error("Failed to create array");
633 }
634 }
635 } else {
636 // other types: use serialized read to build arrays directly.
637 auto typeSize = physicalField->type()->byte_width();
638 // FIXME: for now...
639 auto bytes = 0;
640 auto branchSize = bytes ? bytes : 1000000;
641 auto&& result = arrow::AllocateResizableBuffer(branchSize, pool);
642 if (!result.ok()) {
643 throw runtime_error("Cannot allocate values buffer");
644 }
645 std::shared_ptr<arrow::Buffer> arrowValuesBuffer = std::move(result).ValueUnsafe();
646 auto ptr = arrowValuesBuffer->mutable_data();
647 if (ptr == nullptr) {
648 throw runtime_error("Invalid buffer");
649 }
650
651 std::unique_ptr<TBufferFile> offsetBuffer = nullptr;
652
653 std::shared_ptr<arrow::Buffer> arrowOffsetBuffer;
654 std::span<int> offsets;
655 int size = 0;
656 uint32_t totalSize = 0;
657 int64_t listSize = 1;
658 if (auto fixedSizeList = std::dynamic_pointer_cast<arrow::FixedSizeListType>(physicalField->type())) {
659 listSize = fixedSizeList->list_size();
660 typeSize = fixedSizeList->field(0)->type()->byte_width();
661 auto clusterIt = descriptor.FindClusterId(0, 0);
662 while (clusterIt != kInvalidDescriptorId) {
663 auto& index = descriptor.GetClusterDescriptor(clusterIt);
664 auto mask = std::make_unique<bool[]>(index.GetNEntries());
665 std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
666 void* inPtr = bulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
667
668 int readLast = index.GetNEntries();
669 if (listSize == -1) {
670 size = offsets[readEntries + readLast] - offsets[readEntries];
671 } else {
672 size = readLast * listSize;
673 }
674 readEntries += readLast;
675 memcpy(ptr, inPtr, size * typeSize);
676 ptr += (ptrdiff_t)(size * typeSize);
677 clusterIt = descriptor.FindNextClusterId(clusterIt);
678 }
679 } else if (auto vlaListType = std::dynamic_pointer_cast<arrow::ListType>(physicalField->type())) {
680 listSize = -1;
681 typeSize = vlaListType->field(0)->type()->byte_width();
682 offsetBuffer = std::make_unique<TBufferFile>(TBuffer::EMode::kWrite, 4 * 1024 * 1024);
683 result = arrow::AllocateResizableBuffer((totalEntries + 1) * (int64_t)sizeof(int), pool);
684 if (!result.ok()) {
685 throw runtime_error("Cannot allocate offset buffer");
686 }
687 arrowOffsetBuffer = result.MoveValueUnsafe();
688
689 // Offset bulk
690 auto offsetBulk = model.CreateBulk(physicalField->name());
691 // Actual values are in a different place...
692 bulk = model.CreateBulk(physicalField->name());
693 auto clusterIt = descriptor.FindClusterId(0, 0);
694 auto* ptrOffset = reinterpret_cast<int*>(arrowOffsetBuffer->mutable_data());
695 auto* tPtrOffset = reinterpret_cast<int*>(ptrOffset);
696 offsets = std::span<int>{tPtrOffset, tPtrOffset + totalEntries + 1};
697
698 auto copyOffsets = [&arrowValuesBuffer, &pool, &ptrOffset, &ptr, &totalSize](auto inPtr, size_t total) {
699 using value_type = typename std::decay_t<decltype(*inPtr)>::value_type;
700 for (size_t i = 0; i < total; i++) {
701 *ptrOffset++ = totalSize;
702 totalSize += inPtr[i].size();
703 }
704 *ptrOffset = totalSize;
705 auto&& result = arrow::AllocateResizableBuffer(totalSize * sizeof(value_type), pool);
706 if (!result.ok()) {
707 throw runtime_error("Cannot allocate values buffer");
708 }
709 arrowValuesBuffer = result.MoveValueUnsafe();
710 ptr = (uint8_t*)(arrowValuesBuffer->mutable_data());
711 // Calculate the size of the buffer here.
712 for (size_t i = 0; i < total; i++) {
713 int vlaSizeInBytes = inPtr[i].size() * sizeof(value_type);
714 if (vlaSizeInBytes == 0) {
715 continue;
716 }
717 memcpy(ptr, inPtr[i].data(), vlaSizeInBytes);
718 ptr += vlaSizeInBytes;
719 }
720 };
721
722 while (clusterIt != kInvalidDescriptorId) {
723 auto& index = descriptor.GetClusterDescriptor(clusterIt);
724 auto mask = std::make_unique<bool[]>(index.GetNEntries());
725 std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
726 int readLast = index.GetNEntries();
727 switch (vlaListType->field(0)->type()->id()) {
728 case arrow::Type::FLOAT: {
729 copyOffsets((ROOT::Internal::VecOps::RVec<float>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
730 } break;
731 case arrow::Type::DOUBLE: {
732 copyOffsets((ROOT::Internal::VecOps::RVec<double>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
733 } break;
734 case arrow::Type::INT8: {
735 copyOffsets((ROOT::Internal::VecOps::RVec<int8_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
736 } break;
737 case arrow::Type::INT16: {
738 copyOffsets((ROOT::Internal::VecOps::RVec<int16_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
739 } break;
740 case arrow::Type::INT32: {
741 copyOffsets((ROOT::Internal::VecOps::RVec<int32_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
742 } break;
743 case arrow::Type::INT64: {
744 copyOffsets((ROOT::Internal::VecOps::RVec<int64_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
745 } break;
746 case arrow::Type::UINT8: {
747 copyOffsets((ROOT::Internal::VecOps::RVec<uint8_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
748 } break;
749 case arrow::Type::UINT16: {
750 copyOffsets((ROOT::Internal::VecOps::RVec<uint16_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
751 } break;
752 case arrow::Type::UINT32: {
753 copyOffsets((ROOT::Internal::VecOps::RVec<uint32_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
754 } break;
755 case arrow::Type::UINT64: {
756 copyOffsets((ROOT::Internal::VecOps::RVec<uint64_t>*)offsetBulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries()), readLast);
757 } break;
758 default: {
759 throw runtime_error("Unsupported kind of VLA");
760 } break;
761 }
762
763 readEntries += readLast;
764 clusterIt = descriptor.FindNextClusterId(clusterIt);
765 }
766 } else {
767 auto clusterIt = descriptor.FindClusterId(0, 0);
768 while (clusterIt != kInvalidDescriptorId) {
769 auto& index = descriptor.GetClusterDescriptor(clusterIt);
770 auto mask = std::make_unique<bool[]>(index.GetNEntries());
771 std::fill(mask.get(), mask.get() + index.GetNEntries(), true);
772 void* inPtr = bulk.ReadBulk(RClusterIndex(clusterIt, index.GetFirstEntryIndex()), mask.get(), index.GetNEntries());
773
774 int readLast = index.GetNEntries();
775 if (listSize == -1) {
776 size = offsets[readEntries + readLast] - offsets[readEntries];
777 } else {
778 size = readLast * listSize;
779 }
780 readEntries += readLast;
781 memcpy(ptr, inPtr, size * typeSize);
782 ptr += (ptrdiff_t)(size * typeSize);
783 clusterIt = descriptor.FindNextClusterId(clusterIt);
784 }
785 }
786 switch (listSize) {
787 case -1: {
788 auto varray = std::make_shared<arrow::PrimitiveArray>(physicalField->type()->field(0)->type(), totalSize, arrowValuesBuffer);
789 array = std::make_shared<arrow::ListArray>(physicalField->type(), readEntries, arrowOffsetBuffer, varray);
790 } break;
791 case 1: {
792 totalSize = readEntries * listSize;
793 array = std::make_shared<arrow::PrimitiveArray>(physicalField->type(), readEntries, arrowValuesBuffer);
794
795 } break;
796 default: {
797 totalSize = readEntries * listSize;
798 auto varray = std::make_shared<arrow::PrimitiveArray>(physicalField->type()->field(0)->type(), totalSize, arrowValuesBuffer);
799 array = std::make_shared<arrow::FixedSizeListArray>(physicalField->type(), readEntries, varray);
800 }
801 }
802 }
803 columns.push_back(array);
804 }
805
806 auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
807 return batch;
808 };
809
810 return generator;
811}
812
813arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> RNTupleFileFormat::MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination,
814 std::shared_ptr<arrow::Schema> schema,
815 std::shared_ptr<arrow::dataset::FileWriteOptions> options,
816 arrow::fs::FileLocator destination_locator) const
817{
818 auto writer = std::make_shared<RNTupleFileWriter>(schema, options, destination, destination_locator);
819 return std::dynamic_pointer_cast<arrow::dataset::FileWriter>(writer);
820}
821
822arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> RNTupleFileFormat::MakeFragment(
823 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
824 std::shared_ptr<arrow::Schema> physical_schema)
825{
826 std::shared_ptr<arrow::dataset::FileFormat> format = std::make_shared<RNTupleFileFormat>(mTotCompressedSize, mTotUncompressedSize);
827
828 auto fragment = std::make_shared<RNTupleFileFragment>(source, format,
829 partition_expression,
830 physical_schema);
831 return std::dynamic_pointer_cast<arrow::dataset::FileFragment>(fragment);
832}
833
835
836std::shared_ptr<arrow::dataset::FileWriteOptions>
838{
839 return std::make_shared<RNTupleFileWriteOptions>(shared_from_this());
840}
841
845 std::shared_ptr<o2::framework::RNTupleFileFormat> format = nullptr;
846};
847
850 {
851 auto context = new RNTuplePluginContext;
852 context->format = std::make_shared<o2::framework::RNTupleFileFormat>(context->totalCompressedSize, context->totalUncompressedSize);
853 return new RootArrowFactory{
854 .options = [context]() { return context->format->DefaultWriteOptions(); },
855 .format = [context]() { return context->format; },
856 .deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
857 auto treeFragment = std::dynamic_pointer_cast<RNTupleFileFragment>(fragment);
858 return std::make_shared<FairMQOutputStream>(buffer);
859 }};
860 }
861};
862
866} // namespace o2::framework
int32_t i
#define DEFINE_DPL_PLUGIN_INSTANCE(NAME, KIND)
Definition Plugins.h:112
#define DEFINE_DPL_PLUGINS_END
Definition Plugins.h:115
#define DEFINE_DPL_PLUGINS_BEGIN
Definition Plugins.h:107
auto arrowTypeFromRNTuple(ROOT::Experimental::RFieldBase const &field, int size)
uint16_t pos
Definition RawData.h:3
TBranch * ptr
~RNTupleFileFormat() override=default
RNTupleFileFormat(size_t &totalCompressedSize, size_t &totalUncompressedSize)
std::shared_ptr< arrow::dataset::FileWriteOptions > DefaultWriteOptions() override
bool Equals(const FileFormat &other) const override
arrow::Result< std::shared_ptr< arrow::dataset::FileWriter > > MakeWriter(std::shared_ptr< arrow::io::OutputStream > destination, std::shared_ptr< arrow::Schema > schema, std::shared_ptr< arrow::dataset::FileWriteOptions > options, arrow::fs::FileLocator destination_locator) const override
arrow::Result< std::shared_ptr< arrow::dataset::FileFragment > > MakeFragment(arrow::dataset::FileSource source, arrow::compute::Expression partition_expression, std::shared_ptr< arrow::Schema > physical_schema) override
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options, const std::shared_ptr< arrow::dataset::FileFragment > &fragment) const override
arrow::Result< bool > IsSupported(const arrow::dataset::FileSource &source) const override
arrow::Result< std::shared_ptr< arrow::Schema > > Inspect(const arrow::dataset::FileSource &source) const override
std::string type_name() const override
ROOT::Experimental::RNTuple * GetRNTuple()
RNTupleFileFragment(arrow::dataset::FileSource source, std::shared_ptr< arrow::dataset::FileFormat > format, arrow::compute::Expression partition_expression, std::shared_ptr< arrow::Schema > physical_schema)
virtual ROOT::Experimental::RNTuple * GetRNTuple(arrow::dataset::FileSource source)=0
RNTupleFileWriteOptions(std::shared_ptr< arrow::dataset::FileFormat > format)
arrow::Future FinishInternal() override
arrow::Status Write(const std::shared_ptr< arrow::RecordBatch > &batch) override
RNTupleFileWriter(std::shared_ptr< arrow::Schema > schema, std::shared_ptr< arrow::dataset::FileWriteOptions > options, std::shared_ptr< arrow::io::OutputStream > destination, arrow::fs::FileLocator destination_locator)
ROOT::Experimental::RNTuple * GetRNTuple(arrow::dataset::FileSource) override
std::string type_name() const override
SingleRNTupleFileSystem(ROOT::Experimental::RNTuple *tuple)
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
const GLdouble * v
Definition glcorearb.h:832
GLenum array
Definition glcorearb.h:4274
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLint GLuint mask
Definition glcorearb.h:291
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
std::unique_ptr< ROOT::Experimental::RFieldBase > rootFieldFromArrow(std::shared_ptr< arrow::Field > field, std::string name)
RuntimeErrorRef runtime_error_f(const char *,...)
Definition list.h:40
std::shared_ptr< o2::framework::RNTupleFileFormat > format
std::function< std::shared_ptr< arrow::dataset::FileWriteOptions >()> options
void VisitBoolField(const ROOT::Experimental::RField< bool > &field) override
void VisitUInt8Field(const ROOT::Experimental::RField< std::uint8_t > &field) override
void VisitField(const ROOT::Experimental::RFieldBase &field) override
void VisitIntField(const ROOT::Experimental::RField< int > &field) override
void VisitDoubleField(const ROOT::Experimental::RField< double > &field) override
void VisitInt16Field(const ROOT::Experimental::RField< std::int16_t > &field) override
void VisitUInt32Field(const ROOT::Experimental::RField< std::uint32_t > &field) override
void VisitInt8Field(const ROOT::Experimental::RField< std::int8_t > &field) override
std::shared_ptr< arrow::DataType > datatype
void VisitFloatField(const ROOT::Experimental::RField< float > &field) override
void VisitRVecField(const ROOT::Experimental::RRVecField &field) override
void VisitArrayField(const ROOT::Experimental::RArrayField &field) override
void VisitUInt16Field(const ROOT::Experimental::RField< std::uint16_t > &field) override
VectorOfTObjectPtrs other
std::vector< ReadoutWindowData > rows