Project
Loading...
Searching...
No Matches
TTreePlugin.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "Framework/Plugins.h"
14#include "Framework/Signpost.h"
15#include "Framework/BigEndian.h"
16#include <TBufferFile.h>
17#include <TBufferIO.h>
18#include <arrow/buffer.h>
19#include <arrow/dataset/file_base.h>
20#include <arrow/extension_type.h>
21#include <arrow/memory_pool.h>
22#include <arrow/status.h>
23#include <arrow/type.h>
24#include <arrow/type_fwd.h>
25#include <arrow/util/key_value_metadata.h>
26#include <arrow/array/array_nested.h>
27#include <arrow/array/array_primitive.h>
28#include <arrow/array/builder_nested.h>
29#include <arrow/array/builder_primitive.h>
30#include <arrow/array/util.h>
31#include <arrow/record_batch.h>
32#include <TTree.h>
33#include <TBranch.h>
34#include <TFile.h>
35#include <TLeaf.h>
36#include <unistd.h>
37#include <cstdint>
38#include <memory>
39#include <stdexcept>
40
42
43namespace o2::framework
44{
45
// Kind of bulk read operation a ReadOps entry performs.
// NOTE(review): Write() below also dispatches a boolean-reading operation
// and the numbering in this copy skips a line here, so an enumerator
// (presumably for booleans) appears to be missing — confirm upstream.
enum struct ReadOpKind {
  Unknown,
  Offsets,
  Values,
  VLA
};
53
// One deferred bulk-read operation: which ROOT branch to read and where the
// bytes must land once the destination buffer exists.
struct ReadOps {
  // Branch to bulk-read from.
  TBranch* branch = nullptr;
  // Destination buffer. May be a placeholder whose *address* encodes the op
  // index (see TTreeFileFragment::GetPlaceholderForOp) until resolved.
  std::shared_ptr<arrow::Buffer> targetBuffer = nullptr;
  // Number of TTree entries in the branch.
  int64_t rootBranchEntries = 0;
  // Size in bytes of a single element.
  size_t typeSize = 0;
  // Elements per entry (1 for scalars, N for fixed size lists).
  size_t listSize = 0;
  // If this is an offset reading op, keep track of the actual
  // range for the offsets, not only how many VLAs are there.
  int64_t offsetCount = 0;
  // NOTE(review): code below reads an `op.kind` member (of type ReadOpKind)
  // which is missing from this copy of the struct — confirm upstream.
};
65
71{
72 public:
73 explicit TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
74 const std::shared_ptr<arrow::ResizableBuffer>& buffer);
75
82 static arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> Create(
83 std::vector<ReadOps>& ops,
84 int64_t initial_capacity = 4096,
85 arrow::MemoryPool* pool = arrow::default_memory_pool());
86
87 // By the time we call the destructor, the contents
88 // of the buffer are already moved to fairmq
89 // for being sent.
90 ~TTreeDeferredReadOutputStream() override = default;
91
92 // Implement the OutputStream interface
93
95 arrow::Status Close() override;
96 [[nodiscard]] bool closed() const override;
97 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
98 arrow::Status Write(const void* data, int64_t nbytes) override;
99
101 using OutputStream::Write;
103
105 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
106
112 arrow::Status Reset(std::vector<ReadOps> ops,
113 int64_t initial_capacity, arrow::MemoryPool* pool);
114
115 [[nodiscard]] int64_t capacity() const { return capacity_; }
116
117 private:
119 std::vector<ReadOps> ops_;
120
121 // Ensures there is sufficient space available to write nbytes
122 arrow::Status Reserve(int64_t nbytes);
123
124 std::shared_ptr<arrow::ResizableBuffer> buffer_;
125 bool is_open_;
126 int64_t capacity_;
127 int64_t position_;
128 uint8_t* mutable_data_;
129};
130
// Smallest capacity we ever keep for the output buffer.
static constexpr int64_t kBufferMinimumSize = 256;
132
133TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream()
134 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
135
136TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
137 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
138 : ops_(ops),
139 buffer_(buffer),
140 is_open_(true),
141 capacity_(buffer->size()),
142 position_(0),
143 mutable_data_(buffer->mutable_data()) {}
144
145arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> TTreeDeferredReadOutputStream::Create(
146 std::vector<ReadOps>& ops,
147 int64_t initial_capacity, arrow::MemoryPool* pool)
148{
149 // ctor is private, so cannot use make_shared
150 auto ptr = std::shared_ptr<TTreeDeferredReadOutputStream>(new TTreeDeferredReadOutputStream);
151 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
152 return ptr;
153}
154
155arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector<ReadOps> ops,
156 int64_t initial_capacity, arrow::MemoryPool* pool)
157{
158 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
159 ops_ = ops;
160 is_open_ = true;
161 capacity_ = initial_capacity;
162 position_ = 0;
163 mutable_data_ = buffer_->mutable_data();
164 return arrow::Status::OK();
165}
166
168{
169 if (is_open_) {
170 is_open_ = false;
171 if (position_ < capacity_) {
172 RETURN_NOT_OK(buffer_->Resize(position_, false));
173 }
174 }
175 return arrow::Status::OK();
176}
177
178bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; }
179
arrow::Result<std::shared_ptr<arrow::Buffer>> TTreeDeferredReadOutputStream::Finish()
{
  // Close() must run first: it trims the buffer to the written size.
  RETURN_NOT_OK(Close());
  buffer_->ZeroPadding();
  is_open_ = false;
  // Moving the member out transfers ownership to the caller (and upcasts
  // ResizableBuffer -> Buffer inside the Result).
  return std::move(buffer_);
}
187
// Current write position, i.e. bytes logically written so far.
arrow::Result<int64_t> TTreeDeferredReadOutputStream::Tell() const { return position_; }
189
// NOTE(review): the head of this lambda (presumably
// `auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {`)
// is missing from this copy of the file.
  int readEntries = 0;
  rootBuffer.Reset();
  // Bulk-read the branch in chunks, copying each chunk into `target`
  // (bigEndianCopy presumably also converts from ROOT's on-disk byte order).
  while (readEntries < op.rootBranchEntries) {
    auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
    if (readLast < 0) {
      // NOTE(review): readEntries is an int but the format string uses %zu
      // (size_t) — format-specifier mismatch in a varargs call.
      throw runtime_error_f("Error while reading branch %s starting from %zu.", op.branch->GetName(), readEntries);
    }
    int size = readLast * op.listSize;
    readEntries += readLast;
    bigEndianCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
    target += (ptrdiff_t)(size * op.typeSize);
  }
};
204
// NOTE(review): the head of this lambda (presumably
// `auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {`)
// is missing from this copy of the file.
  int readEntries = 0;
  rootBuffer.Reset();
  // Set to 0
  memset(target, 0, op.targetBuffer->size());
  int readLast = 0;
  // Repack ROOT's one-byte booleans into arrow's bit-packed representation.
  while (readEntries < op.rootBranchEntries) {
    auto beginValue = readEntries;
    readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer);
    int size = readLast * op.listSize;
    readEntries += readLast;
    // NOTE(review): for listSize > 1, `beginValue` advances by entries while
    // `size` counts values, so the destination bit index drifts between
    // chunks — verify against upstream.
    for (int i = beginValue; i < beginValue + size; ++i) {
      auto value = static_cast<uint8_t>(rootBuffer.GetCurrent()[i - beginValue] << (i % 8));
      target[i / 8] |= value;
    }
  }
};
222
223auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) {
224 int readEntries = 0;
225 auto* tPtrOffset = reinterpret_cast<const int*>(offsetOp.targetBuffer->data());
226 std::span<int const> const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1};
227
228 rootBuffer.Reset();
229 while (readEntries < op.rootBranchEntries) {
230 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
231 int size = offsets[readEntries + readLast] - offsets[readEntries];
232 readEntries += readLast;
233 bigEndianCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
234 target += (ptrdiff_t)(size * op.typeSize);
235 }
236};
237
239{
240 // FIXME: we will need more than one once we have multithreaded reading.
241 static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
242 return rootBuffer;
243}
244
// Either copies @a data verbatim into the destination buffer or, when
// @a data is a placeholder whose address encodes an index into ops_,
// performs the corresponding deferred branch read straight into the
// destination (e.g. shared memory).
arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes)
{
  if (ARROW_PREDICT_FALSE(!is_open_)) {
    return arrow::Status::IOError("OutputStream is closed");
  }
  if (ARROW_PREDICT_TRUE(nbytes == 0)) {
    return arrow::Status::OK();
  }
  if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
    RETURN_NOT_OK(Reserve(nbytes));
  }
  // This is a real address which needs to be copied. Do it!
  // (Placeholders from GetPlaceholderForOp() have an address equal to the
  // op index, hence smaller than ops_.size(); note the signed/unsigned
  // comparison below.)
  auto ref = (int64_t)data;
  if (ref >= ops_.size()) {
    memcpy(mutable_data_ + position_, data, nbytes);
    position_ += nbytes;
    return arrow::Status::OK();
  }
  auto& op = ops_[ref];

  switch (op.kind) {
    // Offsets need to be read in advance because we need to know
    // how many elements are there in total (since TTree does not allow discovering such information)
    // NOTE(review): several `case` labels are missing from this copy of the
    // file (presumably Offsets / Values / Booleans / default) — the bodies
    // below are left exactly as found; confirm against upstream.
      break;
      readValues(mutable_data_ + position_, op, rootBuffer());
      break;
    case ReadOpKind::VLA:
      // The offsets op for a VLA always immediately precedes the values op.
      readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer());
      break;
      readBoolValues(mutable_data_ + position_, op, rootBuffer());
      break;
      throw runtime_error("Unknown Op");
  }
  // Release as much branch-held memory as possible once the data is copied.
  op.branch->SetStatus(false);
  op.branch->DropBaskets("all");
  op.branch->Reset();
  op.branch->GetTransientBuffer(0)->Expand(0);

  position_ += nbytes;
  return arrow::Status::OK();
}
290
291arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes)
292{
293 // Always overallocate by doubling. It seems that it is a better growth
294 // strategy, at least for memory_benchmark.cc.
295 // This may be because it helps match the allocator's allocation buckets
296 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
297 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
298 new_capacity = position_ + nbytes;
299 if (new_capacity > capacity_) {
300 RETURN_NOT_OK(buffer_->Resize(new_capacity));
301 capacity_ = new_capacity;
302 mutable_data_ = buffer_->mutable_data();
303 }
304 return arrow::Status::OK();
305}
306
308{
309 public:
310 TTreeFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
311 : FileWriteOptions(format)
312 {
313 }
314};
315
// A filesystem which allows me to get a TTree
// NOTE(review): the class head (and a destructor declaration, judging from
// the skipped lines) is missing from this copy of the file.
{
 public:
  // Opens a TTree-backed output stream for @a path. An optional
  // "branch_prefix" metadata entry selects the branch prefix.
  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
    const std::string& path,
    const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;

  // Concrete filesystems hand out the tree associated to @a source.
  virtual std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource source) = 0;
};
328
330{
331 size_t& mTotCompressedSize;
332 size_t& mTotUncompressedSize;
333
334 public:
335 TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
336 : FileFormat({}),
337 mTotCompressedSize(totalCompressedSize),
338 mTotUncompressedSize(totalUncompressedSize)
339 {
340 }
341
342 ~TTreeFileFormat() override = default;
343
344 std::string type_name() const override
345 {
346 return "ttree";
347 }
348
349 bool Equals(const FileFormat& other) const override
350 {
351 return other.type_name() == this->type_name();
352 }
353
354 arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
355 {
356 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
357 if (!fs) {
358 return false;
359 }
360 return fs->CheckSupport(source);
361 }
362
363 arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
365 arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
366 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
367 std::shared_ptr<arrow::Schema> physical_schema) override;
368
369 arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const override;
370
371 std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
372
373 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
374 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
375 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
376};
377
379{
380 public:
382 : TTreeFileSystem(),
383 mTree(tree)
384 {
385 }
386
387 arrow::Result<arrow::fs::FileInfo> GetFileInfo(std::string const& path) override;
388
389 std::string type_name() const override
390 {
391 return "ttree";
392 }
393
394 std::shared_ptr<RootObjectHandler> GetObjectHandler(arrow::dataset::FileSource source) override
395 {
396 return std::make_shared<RootObjectHandler>((void*)mTree.get(), std::make_shared<TTreeFileFormat>(mTotCompressedSize, mTotUncompressedSize));
397 }
398
399 std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource) override
400 {
401 // Simply return the only TTree we have
402 return mTree;
403 }
404
405 private:
406 size_t mTotUncompressedSize;
407 size_t mTotCompressedSize;
408 std::unique_ptr<TTree> mTree;
409};
410
411arrow::Result<arrow::fs::FileInfo> SingleTreeFileSystem::GetFileInfo(std::string const& path)
412{
413 arrow::dataset::FileSource source(path, shared_from_this());
414 arrow::fs::FileInfo result;
415 result.set_path(path);
416 result.set_type(arrow::fs::FileType::File);
417 return result;
418}
419
// A fragment which holds a tree
// NOTE(review): the class head (presumably `class TTreeFileFragment :
// public arrow::dataset::FileFragment`) is missing from this copy.
{
 public:
  // Resolves the source's filesystem to one of our virtual ROOT
  // filesystems and takes ownership of the TTree behind @a source.
  TTreeFileFragment(arrow::dataset::FileSource source,
                    std::shared_ptr<arrow::dataset::FileFormat> format,
                    arrow::compute::Expression partition_expression,
                    std::shared_ptr<arrow::Schema> physical_schema)
    : FileFragment(source, format, std::move(partition_expression), physical_schema)
  {
    auto rootFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(this->source().filesystem());
    if (rootFS.get() == nullptr) {
      throw runtime_error_f("Unknown filesystem %s when reading %s.",
                            source.filesystem()->type_name().c_str(), source.path().c_str());
    }
    auto objectHandler = rootFS->GetObjectHandler(source);
    // NOTE(review): "pupulate" in the message below is a typo ("populate"),
    // left untouched here because it is a runtime string.
    if (!objectHandler->format->Equals(*format)) {
      throw runtime_error_f("Cannot read source %s with format %s to pupulate a TTreeFileFragment.",
                            source.path().c_str(), objectHandler->format->type_name().c_str());
    };
    mTree = objectHandler->GetObjectAsOwner<TTree>();
  }

  TTree* GetTree()
  {
    return mTree.get();
  }

  // Pending bulk-read operations for this fragment.
  std::vector<ReadOps>& ops()
  {
    return mOps;
  }

  // Returns a buffer whose *address* encodes the index of the most recently
  // added op (mOps.size() - 1). The deferred output stream recognizes such
  // placeholders in Write() and performs the real read there.
  std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
  {
    return std::make_shared<arrow::Buffer>((uint8_t*)(mOps.size() - 1), size);
  }

 private:
  std::unique_ptr<TTree> mTree;
  std::vector<ReadOps> mOps;
};
464
// An arrow outputstream which allows to write to a TTree. Eventually
// with a prefix for the branches.
// NOTE(review): the class head (presumably `class TTreeOutputStream :
// public arrow::io::OutputStream`) is missing from this copy.
{
 public:
  // Using a pointer means that the tree itself is owned by another
  // class
  TTreeOutputStream(TTree*, std::string branchPrefix);

  arrow::Status Close() override;

  arrow::Result<int64_t> Tell() const override;

  arrow::Status Write(const void* data, int64_t nbytes) override;

  bool closed() const override;

  // Creates a (prefixed) branch on the underlying tree.
  TBranch* CreateBranch(char const* branchName, char const* sizeBranch);

  TTree* GetTree()
  {
    return mTree;
  }

 private:
  TTree* mTree;
  std::string mBranchPrefix;
};
493
494// An arrow outputstream which allows to write to a ttree
495// @a branch prefix is to be used to identify a set of branches which all belong to
496// the same table.
497TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
498 : mTree(f),
499 mBranchPrefix(std::move(branchPrefix))
500{
501}
502
504{
505 if (mTree->GetCurrentFile() == nullptr) {
506 return arrow::Status::Invalid("Cannot close a tree not attached to a file");
507 }
508 mTree->GetCurrentFile()->Close();
509 return arrow::Status::OK();
510}
511
512arrow::Result<int64_t> TTreeOutputStream::Tell() const
513{
514 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
515}
516
517arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
518{
519 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
520}
521
523{
524 // A standalone tree is never closed.
525 if (mTree->GetCurrentFile() == nullptr) {
526 return false;
527 }
528 return mTree->GetCurrentFile()->IsOpen() == false;
529}
530
531TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
532{
533 if (mBranchPrefix.empty() == true) {
534 return mTree->Branch(branchName, (char*)nullptr, sizeBranch);
535 }
536 return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str());
537}
538
  // NOTE(review): the head of this struct (and the size counters referenced
  // as context->totalCompressedSize below) is missing from this copy.
  std::shared_ptr<o2::framework::TTreeFileFormat> format = nullptr;
};
544
547 {
548 auto context = new TTreePluginContext;
549 context->format = std::make_shared<o2::framework::TTreeFileFormat>(context->totalCompressedSize, context->totalUncompressedSize);
550 return new RootArrowFactory{
551 .options = [context]() { return context->format->DefaultWriteOptions(); },
552 .format = [context]() { return context->format; },
553 .deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
554 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
555 return std::make_shared<TTreeDeferredReadOutputStream>(treeFragment->ops(), buffer);
556 }};
557 }
558};
559
565
// NOTE(review): the head of this lambda (presumably something like
// `static auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) {`)
// is missing from this copy of the file.
  uint32_t offset = 0;
  std::span<int> offsets;
  int readEntries = 0;
  int count = 0;
  auto* tPtrOffset = reinterpret_cast<int*>(op.targetBuffer->mutable_data());
  offsets = std::span<int>{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1};

  // read sizes first
  rootBuffer.Reset();
  while (readEntries < op.rootBranchEntries) {
    auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
    if (readLast == -1) {
      throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName());
    }
    readEntries += readLast;
    // Convert each per-entry size from ROOT's serialized (big-endian)
    // representation and accumulate it into running offsets.
    for (auto i = 0; i < readLast; ++i) {
      offsets[count++] = (int)offset;
      uint32_t raw = reinterpret_cast<uint32_t*>(rootBuffer.GetCurrent())[i];
      offset += (std::endian::native == std::endian::little) ? __builtin_bswap32(raw) : raw;
    }
  }
  // NOTE(review): a line appears to be missing here — most likely writing
  // the final sentinel entry (`offsets[count] = (int)offset;`), given the
  // span holds rootBranchEntries + 1 slots. Confirm upstream.
  op.offsetCount = offset;
};
591
// Builds a generator which, when invoked, produces one arrow::RecordBatch
// with one column per requested dataset field, bulk-reading the matching
// TTree branches. Buffers are not filled here: ReadOps placeholders are
// resolved later by TTreeDeferredReadOutputStream::Write().
arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
  const std::shared_ptr<arrow::dataset::ScanOptions>& options,
  const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
{
  assert(options->dataset_schema != nullptr);
  // This is the schema we want to read
  auto dataset_schema = options->dataset_schema;
  auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
  if (treeFragment.get() == nullptr) {
    return {arrow::Status::NotImplemented("Not a ttree fragment")};
  }

  auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
                    &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
    O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
    O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
    std::vector<std::shared_ptr<arrow::Array>> columns;
    std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
    auto physical_schema = *treeFragment->ReadPhysicalSchema();

    if (dataset_schema->num_fields() > physical_schema->num_fields()) {
      throw runtime_error_f("One TTree must have all the fields requested in a table");
    }

    // Register physical fields into the cache
    std::vector<BranchFieldMapping> mappings;

    // We need to count the number of readops to avoid moving the vector.
    int opsCount = 0;
    for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
      auto dataset_field = dataset_schema->field(fi);
      // This is needed because for now the dataset_field
      // is actually the schema of the ttree
      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
      int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());

      if (physicalFieldIdx < 0) {
        throw runtime_error_f("Cannot find physical field associated to %s. Possible fields: %s",
                              dataset_field->name().c_str(), physical_schema->ToString().c_str());
      }
      // A field preceded by "<name>_size" is a VLA: it needs an offsets op
      // and a values op.
      if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
                               physical_schema->field(physicalFieldIdx - 1)->name().c_str());
        mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
        opsCount += 2;
      } else {
        if (physicalFieldIdx > 0) {
          O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
                                 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
        }
        mappings.push_back({physicalFieldIdx, -1, fi});
        opsCount++;
      }
    }

    auto* tree = treeFragment->GetTree();
    auto branches = tree->GetListOfBranches();
    size_t totalTreeSize = 0;
    std::vector<TBranch*> selectedBranches;
    for (auto& mapping : mappings) {
      selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
      O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
      totalTreeSize += selectedBranches.back()->GetTotalSize();
      if (mapping.vlaIdx != -1) {
        selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
        totalTreeSize += selectedBranches.back()->GetTotalSize();
      }
    }

    // Clamp the TTree read cache between 1 MB and 25 MB.
    size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
    O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
    tree->SetCacheSize(cacheSize);
    for (auto* branch : selectedBranches) {
      tree->AddBranchToCache(branch, false);
    }
    tree->StopCacheLearningPhase();

    // Intermediate buffer to bulk read. Two for now
    std::vector<ReadOps>& ops = treeFragment->ops();
    ops.clear();
    ops.reserve(opsCount);
    for (size_t mi = 0; mi < mappings.size(); ++mi) {
      BranchFieldMapping mapping = mappings[mi];
      // The field actually on disk
      auto datasetField = dataset_schema->field(mapping.datasetFieldIdx);
      auto physicalField = physical_schema->field(mapping.mainBranchIdx);

      if (mapping.vlaIdx != -1) {
        auto* branch = (TBranch*)branches->At(mapping.vlaIdx);
        ops.emplace_back(ReadOps{
          .branch = branch,
          .rootBranchEntries = branch->GetEntries(),
          .typeSize = 4,
          .listSize = 1,
          .kind = ReadOpKind::Offsets,
        });
        auto& op = ops.back();
        ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool));
        // Offsets need to be read immediately to know how many values are there
        // NOTE(review): the statement performing that immediate read
        // (presumably invoking the offsets reader on `op`) is missing from
        // this copy of the file — confirm upstream.
      }
      ops.push_back({});
      auto& valueOp = ops.back();
      valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx);
      valueOp.rootBranchEntries = valueOp.branch->GetEntries();
      // In case this is a vla, we set the offsetCount as totalEntries
      // In case we read booleans we need a special coversion from bytes to bits.
      auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
      valueOp.typeSize = physicalField->type()->byte_width();
      // Notice how we are not (yet) allocating buffers at this point. We merely
      // create placeholders to subsequently fill.
      if ((datasetField->type() == arrow::boolean())) {
        valueOp.kind = ReadOpKind::Booleans;
        valueOp.listSize = 1;
        valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
      } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
        valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
        valueOp.listSize = listType->list_size();
        valueOp.kind = ReadOpKind::Booleans;
        valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
      } else if (mapping.vlaIdx != -1) {
        valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
        valueOp.listSize = -1;
        // -1 is the current one, -2 is the one with for the offsets
        valueOp.kind = ReadOpKind::VLA;
        valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
      } else if (listType) {
        valueOp.kind = ReadOpKind::Values;
        valueOp.listSize = listType->list_size();
        valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
        valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
      } else {
        valueOp.typeSize = physicalField->type()->byte_width();
        valueOp.kind = ReadOpKind::Values;
        valueOp.listSize = 1;
        valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
      }
      arrow::Status status;
      std::shared_ptr<arrow::Array> array;

      if (listType) {
        auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize,
                                                        std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
        array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, arrow::MakeArray(vdata));
        // This is a vla, there is also an offset op
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
                               valueOp.branch->GetName(),
                               valueOp.rootBranchEntries,
                               valueOp.targetBuffer->size());
      } else if (mapping.vlaIdx != -1) {
        auto& offsetOp = ops[ops.size() - 2];
        auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), offsetOp.offsetCount,
                                                        std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
        // We have pushed an offset op if this was the case.
        array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, arrow::MakeArray(vdata));
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
                               offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
                               valueOp.branch->GetName(),
                               offsetOp.offsetCount,
                               valueOp.targetBuffer->size());
      } else {
        auto data = std::make_shared<arrow::ArrayData>(datasetField->type(), valueOp.rootBranchEntries,
                                                       std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
        array = arrow::MakeArray(data);
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
                               valueOp.branch->GetName(),
                               valueOp.rootBranchEntries,
                               valueOp.targetBuffer->size());
      }

      columns.push_back(array);
    }

    // Do the actual filling of the buffers. This happens after we have created the whole structure
    // so that we can read directly in shared memory.
    int64_t rows = -1;
    for (size_t i = 0; i < ops.size(); ++i) {
      auto& op = ops[i];
      if (rows == -1 && op.kind != ReadOpKind::VLA) {
        rows = op.rootBranchEntries;
      }
      if (rows == -1 && op.kind == ReadOpKind::VLA) {
        auto& offsetOp = ops[i - 1];
        rows = offsetOp.rootBranchEntries;
      }
      if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) {
        throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries);
      }
      // NOTE(review): the message below prints ops[i - 1].offsetCount while
      // the condition compares ops[i - 1].rootBranchEntries — the reported
      // "found" value may not match what was tested. Confirm upstream.
      if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) {
        throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].offsetCount);
      }
    }

    auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
    totalCompressedSize += tree->GetZipBytes();
    totalUncompressedSize += tree->GetTotBytes();
    O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
    return batch;
  };
  return generator;
}
795
796char const* rootSuffixFromArrow(arrow::Type::type id)
797{
798 switch (id) {
799 case arrow::Type::BOOL:
800 return "/O";
801 case arrow::Type::UINT8:
802 return "/b";
803 case arrow::Type::UINT16:
804 return "/s";
805 case arrow::Type::UINT32:
806 return "/i";
807 case arrow::Type::UINT64:
808 return "/l";
809 case arrow::Type::INT8:
810 return "/B";
811 case arrow::Type::INT16:
812 return "/S";
813 case arrow::Type::INT32:
814 return "/I";
815 case arrow::Type::INT64:
816 return "/L";
817 case arrow::Type::FLOAT:
818 return "/F";
819 case arrow::Type::DOUBLE:
820 return "/D";
821 default:
822 throw runtime_error("Unsupported arrow column type");
823 }
824}
825
826arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOutputStream(
827 const std::string& path,
828 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
829{
830 arrow::dataset::FileSource source{path, shared_from_this()};
831 auto prefix = metadata->Get("branch_prefix");
832 if (prefix.ok()) {
833 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), *prefix);
834 }
835 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), "");
836}
837
namespace
{
// Bookkeeping for a TTree branch while building the writer: its name, the
// branch object, and whether it is a variable-length array.
struct BranchInfo {
  std::string name;
  TBranch* ptr;
  bool mVLA;
};
} // namespace
846
847auto arrowTypeFromROOT(EDataType type, int size)
848{
849 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
850 switch (size) {
851 case -1:
852 return arrow::list(type);
853 case 1:
854 return std::move(type);
855 default:
856 return arrow::fixed_size_list(type, size);
857 }
858 };
859
860 switch (type) {
861 case EDataType::kBool_t:
862 return typeGenerator(arrow::boolean(), size);
863 case EDataType::kUChar_t:
864 return typeGenerator(arrow::uint8(), size);
865 case EDataType::kUShort_t:
866 return typeGenerator(arrow::uint16(), size);
867 case EDataType::kUInt_t:
868 return typeGenerator(arrow::uint32(), size);
869 case EDataType::kULong64_t:
870 return typeGenerator(arrow::uint64(), size);
871 case EDataType::kChar_t:
872 return typeGenerator(arrow::int8(), size);
873 case EDataType::kShort_t:
874 return typeGenerator(arrow::int16(), size);
875 case EDataType::kInt_t:
876 return typeGenerator(arrow::int32(), size);
877 case EDataType::kLong64_t:
878 return typeGenerator(arrow::int64(), size);
879 case EDataType::kFloat_t:
880 return typeGenerator(arrow::float32(), size);
881 case EDataType::kDouble_t:
882 return typeGenerator(arrow::float64(), size);
883 default:
884 throw o2::framework::runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
885 }
886}
887
// This is a datatype for branches which implies
// NOTE(review): the sentence above is truncated in this copy and the
// extension type carries no implementation — confirm intent upstream.
struct RootTransientIndexType : arrow::ExtensionType {
};
891
892arrow::Result<std::shared_ptr<arrow::Schema>> TTreeFileFormat::Inspect(const arrow::dataset::FileSource& source) const
893{
894 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
895
896 if (!fs.get()) {
897 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
898 }
899 auto objectHandler = fs->GetObjectHandler(source);
900
901 if (!objectHandler->format->Equals(*this)) {
902 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
903 }
904
905 // Notice that we abuse of the API here and do not release the TTree,
906 // so that it's still managed by ROOT.
907 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
908
909 auto branches = tree->GetListOfBranches();
910 auto n = branches->GetEntries();
911
912 std::vector<std::shared_ptr<arrow::Field>> fields;
913
914 bool prevIsSize = false;
915 for (auto i = 0; i < n; ++i) {
916 auto branch = static_cast<TBranch*>(branches->At(i));
917 std::string name = branch->GetName();
918 if (prevIsSize && fields.back()->name() != name + "_size") {
919 throw runtime_error_f("Unexpected layout for VLA container %s.", branch->GetName());
920 }
921
922 if (name.ends_with("_size")) {
923 fields.emplace_back(std::make_shared<arrow::Field>(name, arrow::int32()));
924 prevIsSize = true;
925 } else {
926 static TClass* cls;
927 EDataType type;
928 branch->GetExpectedType(cls, type);
929
930 if (prevIsSize) {
931 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, -1)));
932 } else {
933 auto listSize = static_cast<TLeaf*>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
934 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, listSize)));
935 }
936 prevIsSize = false;
937 }
938 }
939
940 if (fields.back()->name().ends_with("_size")) {
941 throw runtime_error_f("Missing values for VLA indices %s.", fields.back()->name().c_str());
942 }
943 return std::make_shared<arrow::Schema>(fields);
944}
945
947arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::MakeFragment(
948 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
949 std::shared_ptr<arrow::Schema> physical_schema)
950{
951
952 return std::make_shared<TTreeFileFragment>(source, std::dynamic_pointer_cast<arrow::dataset::FileFormat>(shared_from_this()),
953 std::move(partition_expression),
954 physical_schema);
955}
956
958{
  // One entry per schema field, parallel vectors indexed together:
  std::vector<TBranch*> branches;      // value branch for each field
  std::vector<TBranch*> sizesBranches; // "_size" branch for VLA fields, nullptr otherwise
  std::vector<std::shared_ptr<arrow::Array>> valueArrays; // per-batch flattened value arrays (filled in Write)
  std::vector<std::shared_ptr<arrow::Array>> sizeArrays;  // NOTE(review): never populated in the visible code
  std::vector<std::shared_ptr<arrow::DataType>> valueTypes; // element type (list element type for list fields)

  // NOTE(review): neither vector is filled anywhere in the visible code —
  // basket sizing happens directly in finaliseBasketSize(); confirm these
  // are not dead members.
  std::vector<int64_t> valuesIdealBasketSize;
  std::vector<int64_t> sizeIdealBasketSize;

  std::vector<int64_t> typeSizes; // NOTE(review): never populated in the visible code
  std::vector<int64_t> listSizes; // 1 = scalar, -1 = VLA, N = fixed-size array of N
  bool firstBasket = true;        // true until the first batch fixes the basket sizes
971
  // Choose a basket size for every branch based on the shape of the first
  // batch, so that each basket can hold roughly one batch worth of data.
  void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
  {
    O2_SIGNPOST_ID_FROM_POINTER(sid, root_arrow_fs, this);
    O2_SIGNPOST_START(root_arrow_fs, sid, "finaliseBasketSize", "First batch with %lli rows received and %zu columns",
                      firstBatch->num_rows(), firstBatch->columns().size());
    for (size_t i = 0; i < branches.size(); i++) {
      auto* branch = branches[i];
      auto* sizeBranch = sizesBranches[i];

      int valueSize = valueTypes[i]->byte_width();
      if (listSizes[i] == 1) {
        // Scalar branch: one value of valueSize bytes per row.
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry for %lli entries.",
                               branch->GetName(), valueSize, firstBatch->num_rows());
        assert(sizeBranch == nullptr);
        branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
      } else if (listSizes[i] == -1) {
        // VLA branch: size it from the total number of list elements in the
        // first batch; the companion size branch holds one int32 per row.
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry.",
                               branch->GetName(), valueSize);
        // NOTE(review): this looks up the column by name; using the field
        // index i directly would avoid assuming names are unique — confirm.
        auto column = firstBatch->GetColumnByName(schema_->field(i)->name());
        auto list = std::static_pointer_cast<arrow::ListArray>(column);
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
                               branch->GetName(), sizeBranch->GetName(), list->length(), valueSize);
        branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * list->length());
        sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
      } else {
        // Fixed-size array branch: listSizes[i] elements per row.
        O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. There are %lli entries per array of size %d in that list.",
                               branch->GetName(), listSizes[i], valueSize);
        assert(sizeBranch == nullptr);
        branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[i]);
      }

      auto field = firstBatch->schema()->field(i);
      // Special case: index-array columns get a common, larger basket so
      // values and sizes end up in a single basket each.
      // NOTE(review): assumes every fIndexArray* field is a VLA — if one
      // were scalar, sizeBranch would be nullptr here; confirm.
      if (field->name().starts_with("fIndexArray")) {
        // One int per array to keep track of the size
        int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows(); // minimal additional size needed, otherwise we get 2 baskets
        int basketSize = std::max(32000, idealBasketSize);                                                             // keep a minimum value
        sizeBranch->SetBasketSize(basketSize);
        branch->SetBasketSize(basketSize);
      }
    }
    O2_SIGNPOST_END(root_arrow_fs, sid, "finaliseBasketSize", "Done");
  }
1016
1017 public:
1018 // Create the TTree based on the physical_schema, not the one in the batch.
1019 // The write method will have to reconcile the two schemas.
1020 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1021 std::shared_ptr<arrow::io::OutputStream> destination,
1022 arrow::fs::FileLocator destination_locator)
1023 : FileWriter(schema, options, destination, destination_locator)
1024 {
1025 // Batches have the same number of entries for each column.
1026 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1027 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1028
1029 if (directoryStream.get()) {
1030 TDirectoryFile* dir = directoryStream->GetDirectory();
1031 dir->cd();
1032 auto* tree = new TTree(destination_locator_.path.c_str(), "");
1033 treeStream = std::make_shared<TTreeOutputStream>(tree, "");
1034 } else if (treeStream.get()) {
1035 // We already have a tree stream, let's derive a new one
1036 // with the destination_locator_.path as prefix for the branches
1037 // This way we can multiplex multiple tables in the same tree.
1038 auto* tree = treeStream->GetTree();
1039 treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
1040 } else {
1041 // I could simply set a prefix here to merge to an already existing tree.
1042 throw std::runtime_error("Unsupported backend.");
1043 }
1044
1045 for (auto i = 0u; i < schema->fields().size(); ++i) {
1046 auto& field = schema->field(i);
1047 listSizes.push_back(1);
1048
1049 int valuesIdealBasketSize = 0;
1050 // Construct all the needed branches.
1051 switch (field->type()->id()) {
1052 case arrow::Type::FIXED_SIZE_LIST: {
1053 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
1054 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1055 valueTypes.push_back(field->type()->field(0)->type());
1056 sizesBranches.push_back(nullptr);
1057 std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
1058 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1059 } break;
1060 case arrow::Type::LIST: {
1061 valueTypes.push_back(field->type()->field(0)->type());
1062 std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
1063 listSizes.back() = -1; // VLA, we need to calculate it on the fly;
1064 std::string sizeLeafList = field->name() + "_size/I";
1065 sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
1066 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1067 // Notice that this could be replaced by a better guess of the
1068 // average size of the list elements, but this is not trivial.
1069 } break;
1070 default: {
1071 valueTypes.push_back(field->type());
1072 std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
1073 sizesBranches.push_back(nullptr);
1074 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1075 } break;
1076 }
1077 }
1078 // We create the branches from the schema
1079 }
1080
1081 arrow::Status Write(const std::shared_ptr<arrow::RecordBatch>& batch) override
1082 {
1083 if (firstBasket) {
1084 firstBasket = false;
1085 finaliseBasketSize(batch);
1086 }
1087
1088 // Support writing empty tables
1089 if (batch->columns().empty() || batch->num_rows() == 0) {
1090 return arrow::Status::OK();
1091 }
1092
1093 // Batches have the same number of entries for each column.
1094 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1095 TTree* tree = nullptr;
1096 if (directoryStream.get()) {
1097 TDirectoryFile* dir = directoryStream->GetDirectory();
1098 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1099 }
1100 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1101
1102 if (!tree) {
1103 // I could simply set a prefix here to merge to an already existing tree.
1104 throw std::runtime_error("Unsupported backend.");
1105 }
1106
1107 for (auto i = 0u; i < batch->columns().size(); ++i) {
1108 auto column = batch->column(i);
1109 auto& field = batch->schema()->field(i);
1110
1111 valueArrays.push_back(nullptr);
1112
1113 switch (field->type()->id()) {
1114 case arrow::Type::FIXED_SIZE_LIST: {
1115 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
1116 if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1117 int64_t length = list->length() * list->list_type()->list_size();
1118 arrow::UInt8Builder builder;
1119 auto ok = builder.Reserve(length);
1120 // I need to build an array of uint8_t for the conversion to ROOT which uses
1121 // bytes for boolans.
1122 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
1123 for (int64_t i = 0; i < length; ++i) {
1124 if (boolArray->IsValid(i)) {
1125 // Expand each boolean value (true/false) to uint8 (1/0)
1126 uint8_t value = boolArray->Value(i) ? 1 : 0;
1127 auto ok = builder.Append(value);
1128 } else {
1129 // Append null for invalid entries
1130 auto ok = builder.AppendNull();
1131 }
1132 }
1133 valueArrays.back() = *builder.Finish();
1134 } else {
1135 valueArrays.back() = list->values();
1136 }
1137 } break;
1138 case arrow::Type::LIST: {
1139 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1140 valueArrays.back() = list->values();
1141 } break;
1142 case arrow::Type::BOOL: {
1143 // In case of arrays of booleans, we need to go back to their
1144 // char based representation for ROOT to save them.
1145 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1146
1147 int64_t length = boolArray->length();
1148 arrow::UInt8Builder builder;
1149 auto ok = builder.Reserve(length);
1150
1151 for (int64_t i = 0; i < length; ++i) {
1152 if (boolArray->IsValid(i)) {
1153 // Expand each boolean value (true/false) to uint8 (1/0)
1154 uint8_t value = boolArray->Value(i) ? 1 : 0;
1155 auto ok = builder.Append(value);
1156 } else {
1157 // Append null for invalid entries
1158 auto ok = builder.AppendNull();
1159 }
1160 }
1161 valueArrays.back() = *builder.Finish();
1162 } break;
1163 default:
1164 valueArrays.back() = column;
1165 }
1166 }
1167
1168 int64_t pos = 0;
1169 while (pos < batch->num_rows()) {
1170 for (size_t bi = 0; bi < branches.size(); ++bi) {
1171 auto* branch = branches[bi];
1172 auto* sizeBranch = sizesBranches[bi];
1173 auto array = batch->column(bi);
1174 auto& field = batch->schema()->field(bi);
1175 auto& listSize = listSizes[bi];
1176 auto valueType = valueTypes[bi];
1177 auto valueArray = valueArrays[bi];
1178
1179 switch (field->type()->id()) {
1180 case arrow::Type::LIST: {
1181 auto list = std::static_pointer_cast<arrow::ListArray>(array);
1182 listSize = list->value_length(pos);
1183 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width();
1184 branch->SetAddress((void*)buffer);
1185 sizeBranch->SetAddress(&listSize);
1186 } break;
1187 case arrow::Type::FIXED_SIZE_LIST:
1188 default: {
1189 // needed for the boolean case, I should probably cache this.
1190 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
1191 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth;
1192 branch->SetAddress((void*)buffer);
1193 };
1194 }
1195 }
1196 tree->Fill();
1197 ++pos;
1198 }
1199 return arrow::Status::OK();
1200 }
1201
  // Flush the tree to storage and detach it from its directory so ROOT
  // does not attempt to manage (and delete) it again on file close.
  // NOTE(review): this assumes a TTreeOutputStream destination; for the
  // directory-backed stream that Write() also supports, the cast yields
  // nullptr and GetTree() would dereference it — confirm FinishInternal is
  // only reached with a tree-backed stream.
  arrow::Future<> FinishInternal() override
  {
    auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
    auto* tree = treeStream->GetTree();
    // kOverwrite replaces any previous cycle of the same tree in the file.
    tree->Write("", TObject::kOverwrite);
    tree->SetDirectory(nullptr);

    return {};
  };
1211};
1212arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> TTreeFileFormat::MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const
1213{
1214 auto writer = std::make_shared<TTreeFileWriter>(schema, options, destination, destination_locator);
1215 return std::dynamic_pointer_cast<arrow::dataset::FileWriter>(writer);
1216}
1217
1218std::shared_ptr<arrow::dataset::FileWriteOptions> TTreeFileFormat::DefaultWriteOptions()
1219{
1220 std::shared_ptr<TTreeFileWriteOptions> options(
1221 new TTreeFileWriteOptions(shared_from_this()));
1222 return options;
1223}
1224
1226
1230} // namespace o2::framework
std::shared_ptr< arrow::Schema > schema
std::vector< std::shared_ptr< arrow::Field > > fields
int32_t i
o2::raw::RawFileWriter * raw
uint32_t op
#define DEFINE_DPL_PLUGIN_INSTANCE(NAME, KIND)
Definition Plugins.h:112
#define DEFINE_DPL_PLUGINS_END
Definition Plugins.h:115
#define DEFINE_DPL_PLUGINS_BEGIN
Definition Plugins.h:107
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ID_FROM_POINTER(name, log, pointer)
Definition Signpost.h:505
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
TBranch * ptr
bool mVLA
std::string type_name() const override
arrow::Result< arrow::fs::FileInfo > GetFileInfo(std::string const &path) override
std::shared_ptr< RootObjectHandler > GetObjectHandler(arrow::dataset::FileSource source) override
std::unique_ptr< TTree > & GetTree(arrow::dataset::FileSource) override
arrow::Status Reset(std::vector< ReadOps > ops, int64_t initial_capacity, arrow::MemoryPool *pool)
Initialize state of OutputStream with newly allocated memory and set position to 0.
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Result< std::shared_ptr< arrow::Buffer > > Finish()
Close the stream and return the buffer.
arrow::Result< int64_t > Tell() const override
static arrow::Result< std::shared_ptr< TTreeDeferredReadOutputStream > > Create(std::vector< ReadOps > &ops, int64_t initial_capacity=4096, arrow::MemoryPool *pool=arrow::default_memory_pool())
Create in-memory output stream with indicated capacity using a memory pool.
arrow::Status Close() override
Close the stream, preserving the buffer (retrieve it with Finish()).
TTreeFileFormat(size_t &totalCompressedSize, size_t &totalUncompressedSize)
arrow::Result< arrow::RecordBatchGenerator > ScanBatchesAsync(const std::shared_ptr< arrow::dataset::ScanOptions > &options, const std::shared_ptr< arrow::dataset::FileFragment > &fragment) const override
std::shared_ptr< arrow::dataset::FileWriteOptions > DefaultWriteOptions() override
arrow::Result< bool > IsSupported(const arrow::dataset::FileSource &source) const override
arrow::Result< std::shared_ptr< arrow::Schema > > Inspect(const arrow::dataset::FileSource &source) const override
std::string type_name() const override
bool Equals(const FileFormat &other) const override
arrow::Result< std::shared_ptr< arrow::dataset::FileWriter > > MakeWriter(std::shared_ptr< arrow::io::OutputStream > destination, std::shared_ptr< arrow::Schema > schema, std::shared_ptr< arrow::dataset::FileWriteOptions > options, arrow::fs::FileLocator destination_locator) const override
~TTreeFileFormat() override=default
arrow::Result< std::shared_ptr< arrow::dataset::FileFragment > > MakeFragment(arrow::dataset::FileSource source, arrow::compute::Expression partition_expression, std::shared_ptr< arrow::Schema > physical_schema) override
Create a FileFragment for a FileSource.
TTreeFileFragment(arrow::dataset::FileSource source, std::shared_ptr< arrow::dataset::FileFormat > format, arrow::compute::Expression partition_expression, std::shared_ptr< arrow::Schema > physical_schema)
std::vector< ReadOps > & ops()
std::shared_ptr< arrow::Buffer > GetPlaceholderForOp(size_t size)
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenOutputStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
virtual std::unique_ptr< TTree > & GetTree(arrow::dataset::FileSource source)=0
TTreeFileWriteOptions(std::shared_ptr< arrow::dataset::FileFormat > format)
arrow::Status Write(const std::shared_ptr< arrow::RecordBatch > &batch) override
arrow::Future FinishInternal() override
TTreeFileWriter(std::shared_ptr< arrow::Schema > schema, std::shared_ptr< arrow::dataset::FileWriteOptions > options, std::shared_ptr< arrow::io::OutputStream > destination, arrow::fs::FileLocator destination_locator)
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Result< int64_t > Tell() const override
arrow::Status Close() override
TBranch * CreateBranch(char const *branchName, char const *sizeBranch)
TTreeOutputStream(TTree *, std::string branchPrefix)
GLdouble n
Definition glcorearb.h:1982
GLint GLsizei count
Definition glcorearb.h:399
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLenum array
Definition glcorearb.h:4274
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLenum target
Definition glcorearb.h:1641
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLintptr offset
Definition glcorearb.h:660
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLint ref
Definition glcorearb.h:291
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
TBufferFile & rootBuffer()
char const * rootSuffixFromArrow(arrow::Type::type id)
auto arrowTypeFromROOT(EDataType type, int size)
void bigEndianCopy(void *dest, const void *src, int count, size_t typeSize)
Definition BigEndian.h:26
RuntimeErrorRef runtime_error_f(const char *,...)
std::shared_ptr< arrow::Buffer > targetBuffer
std::function< std::shared_ptr< arrow::dataset::FileWriteOptions >()> options
std::shared_ptr< o2::framework::TTreeFileFormat > format
VectorOfTObjectPtrs other
ctfTree Write()
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
std::vector< ReadoutWindowData > rows