TTreePlugin.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "Framework/Plugins.h"
14#include "Framework/Signpost.h"
15#include "Framework/Endian.h"
16#include <TBufferFile.h>
17#include <TBufferIO.h>
18#include <arrow/buffer.h>
19#include <arrow/dataset/file_base.h>
20#include <arrow/extension_type.h>
21#include <arrow/memory_pool.h>
22#include <arrow/status.h>
23#include <arrow/type.h>
24#include <arrow/type_fwd.h>
25#include <arrow/util/key_value_metadata.h>
26#include <arrow/array/array_nested.h>
27#include <arrow/array/array_primitive.h>
28#include <arrow/array/builder_nested.h>
29#include <arrow/array/builder_primitive.h>
30#include <TTree.h>
31#include <TBranch.h>
32#include <TFile.h>
33#include <TLeaf.h>
34#include <unistd.h>
35#include <cstdint>
36#include <memory>
37#include <stdexcept>
38#include <iostream>
39
40O2_DECLARE_DYNAMIC_LOG(root_arrow_fs);
41
42namespace o2::framework
43{
44
45enum struct ReadOpKind {
46 Unknown,
47 Offsets,
48 Values,
49 Booleans,
50 VLA
51};
52
53struct ReadOps {
54 TBranch* branch = nullptr;
55 std::shared_ptr<arrow::Buffer> targetBuffer = nullptr;
56 int64_t rootBranchEntries = 0;
57 size_t typeSize = 0;
58 size_t listSize = 0;
59 // If this is an offset reading op, keep track of the actual
60 // range for the offsets, not only how many VLAs are there.
61 int64_t offsetCount = 0;
62 ReadOpKind kind = ReadOpKind::Unknown;
63};
64
66// An OutputStream which defers reading the TTree branches: the actual data is
67// fetched only when Write() is invoked with one of the placeholder addresses
68// registered in the associated ReadOps vector.
69class TTreeDeferredReadOutputStream : public arrow::io::OutputStream
70{
71 public:
72 explicit TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
73 const std::shared_ptr<arrow::ResizableBuffer>& buffer);
74
81 static arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> Create(
82 std::vector<ReadOps>& ops,
83 int64_t initial_capacity = 4096,
84 arrow::MemoryPool* pool = arrow::default_memory_pool());
85
86 // By the time we call the destructor, the contents
87 // of the buffer have already been moved to FairMQ
88 // to be sent.
89 ~TTreeDeferredReadOutputStream() override = default;
90
91 // Implement the OutputStream interface
92
94 arrow::Status Close() override;
95 [[nodiscard]] bool closed() const override;
96 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
97 arrow::Status Write(const void* data, int64_t nbytes) override;
98
100 using OutputStream::Write;
102
104 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
105
111 arrow::Status Reset(std::vector<ReadOps> ops,
112 int64_t initial_capacity, arrow::MemoryPool* pool);
113
114 [[nodiscard]] int64_t capacity() const { return capacity_; }
115
116 private:
117 TTreeDeferredReadOutputStream();
118 std::vector<ReadOps> ops_;
119
120 // Ensures there is sufficient space available to write nbytes
121 arrow::Status Reserve(int64_t nbytes);
122
123 std::shared_ptr<arrow::ResizableBuffer> buffer_;
124 bool is_open_;
125 int64_t capacity_;
126 int64_t position_;
127 uint8_t* mutable_data_;
128};
129
130static constexpr int64_t kBufferMinimumSize = 256;
131
132TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream()
133 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
134
135TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
136 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
137 : ops_(ops),
138 buffer_(buffer),
139 is_open_(true),
140 capacity_(buffer->size()),
141 position_(0),
142 mutable_data_(buffer->mutable_data()) {}
143
144arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> TTreeDeferredReadOutputStream::Create(
145 std::vector<ReadOps>& ops,
146 int64_t initial_capacity, arrow::MemoryPool* pool)
147{
148 // ctor is private, so cannot use make_shared
149 auto ptr = std::shared_ptr<TTreeDeferredReadOutputStream>(new TTreeDeferredReadOutputStream);
150 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
151 return ptr;
152}
153
154arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector<ReadOps> ops,
155 int64_t initial_capacity, arrow::MemoryPool* pool)
156{
157 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
158 ops_ = ops;
159 is_open_ = true;
160 capacity_ = initial_capacity;
161 position_ = 0;
162 mutable_data_ = buffer_->mutable_data();
163 return arrow::Status::OK();
164}
165
166arrow::Status TTreeDeferredReadOutputStream::Close()
167{
168 if (is_open_) {
169 is_open_ = false;
170 if (position_ < capacity_) {
171 RETURN_NOT_OK(buffer_->Resize(position_, false));
172 }
173 }
174 return arrow::Status::OK();
175}
176
177bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; }
178
179arrow::Result<std::shared_ptr<arrow::Buffer>> TTreeDeferredReadOutputStream::Finish()
180{
181 RETURN_NOT_OK(Close());
182 buffer_->ZeroPadding();
183 is_open_ = false;
184 return std::move(buffer_);
185}
186
187arrow::Result<int64_t> TTreeDeferredReadOutputStream::Tell() const { return position_; }
188
189auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
190 int readEntries = 0;
191 rootBuffer.Reset();
192 while (readEntries < op.rootBranchEntries) {
193 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
194 if (readLast < 0) {
195 throw runtime_error_f("Error while reading branch %s starting from %d.", op.branch->GetName(), readEntries);
196 }
197 int size = readLast * op.listSize;
198 readEntries += readLast;
199 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
200 target += (ptrdiff_t)(size * op.typeSize);
201 }
202};
203
204auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
205 int readEntries = 0;
206 rootBuffer.Reset();
207 // Clear the target buffer: bits are OR-ed into it below.
208 memset(target, 0, op.targetBuffer->size());
209 while (readEntries < op.rootBranchEntries) {
210 // First bit of this chunk inside the packed target buffer.
211 int beginValue = readEntries * (int)op.listSize;
212 auto readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer);
213 int size = readLast * op.listSize;
214 readEntries += readLast;
215 for (int i = beginValue; i < beginValue + size; ++i) {
216 auto value = static_cast<uint8_t>(rootBuffer.GetCurrent()[i - beginValue] << (i % 8));
217 target[i / 8] |= value;
218 }
219 }
220};
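// A minimal sketch (illustrative only, not part of the plugin) of the
// byte-to-bit packing performed by readBoolValues above: ROOT hands back one
// byte per boolean while Arrow expects a packed bitmap, so value j of a chunk
// starting at bit beginValue lands in bit ((beginValue + j) % 8) of byte
// ((beginValue + j) / 8) of the target buffer.
//
//   uint8_t bytes[10] = {1, 0, 1, 1, 0, 0, 1, 0, 1, 1}; // as returned by ROOT
//   uint8_t bits[2] = {0, 0};                           // packed output
//   for (int i = 0; i < 10; ++i) {
//     bits[i / 8] |= static_cast<uint8_t>(bytes[i] << (i % 8));
//   }
//   // bits == {0b01001101, 0b00000011}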
221
222auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) {
223 int readEntries = 0;
224 auto* tPtrOffset = reinterpret_cast<const int*>(offsetOp.targetBuffer->data());
225 std::span<int const> const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1};
226
227 rootBuffer.Reset();
228 while (readEntries < op.rootBranchEntries) {
229 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
230 int size = offsets[readEntries + readLast] - offsets[readEntries];
231 readEntries += readLast;
232 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
233 target += (ptrdiff_t)(size * op.typeSize);
234 }
235};
236
237TBufferFile& rootBuffer()
238{
239 // FIXME: we will need more than one once we have multithreaded reading.
240 static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
241 return rootBuffer;
242}
243
244arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes)
245{
246 if (ARROW_PREDICT_FALSE(!is_open_)) {
247 return arrow::Status::IOError("OutputStream is closed");
248 }
249 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
250 return arrow::Status::OK();
251 }
252 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
253 RETURN_NOT_OK(Reserve(nbytes));
254 }
255 // Addresses smaller than ops_.size() are placeholder op indices; real addresses must be copied.
256 auto ref = (int64_t)data;
257 if (ref >= (int64_t)ops_.size()) {
258 memcpy(mutable_data_ + position_, data, nbytes);
259 position_ += nbytes;
260 return arrow::Status::OK();
261 }
262 auto& op = ops_[ref];
263
264 switch (op.kind) {
265 // Offsets need to be read in advance because we need to know
266 // how many elements there are in total (TTree does not expose that information).
267 case ReadOpKind::Offsets:
268 break;
269 case ReadOpKind::Values:
270 readValues(mutable_data_ + position_, op, rootBuffer());
271 break;
272 case ReadOpKind::VLA:
273 readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer());
274 break;
275 case ReadOpKind::Booleans:
276 readBoolValues(mutable_data_ + position_, op, rootBuffer());
277 break;
278 case ReadOpKind::Unknown:
279 throw runtime_error("Unknown Op");
280 }
281 op.branch->SetStatus(false);
282 op.branch->DropBaskets("all");
283 op.branch->Reset();
284 op.branch->GetTransientBuffer(0)->Expand(0);
285
286 position_ += nbytes;
287 return arrow::Status::OK();
288}
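// Usage sketch for the deferred stream (illustrative only; in practice the
// framework drives this through the Arrow IPC writer and the RootArrowFactory
// below): "addresses" smaller than ops_.size(), as produced by
// TTreeFileFragment::GetPlaceholderForOp, select a deferred branch read, while
// any real address is copied verbatim.
//
//   std::vector<ReadOps> ops = /* filled while building the record batch */;
//   auto stream = TTreeDeferredReadOutputStream::Create(ops).ValueOrDie();
//   auto st = stream->Write((void*)0, ops[0].targetBuffer->size()); // runs ops[0] against the TTree
//   int64_t header = 42;
//   st = stream->Write(&header, sizeof(header));                    // plain memcpy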
289
290arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes)
291{
292 // Resize the buffer so the requested write fits, but never shrink the
293 // capacity below kBufferMinimumSize.
294 int64_t new_capacity = std::max(kBufferMinimumSize, position_ + nbytes);
298 if (new_capacity > capacity_) {
299 RETURN_NOT_OK(buffer_->Resize(new_capacity));
300 capacity_ = new_capacity;
301 mutable_data_ = buffer_->mutable_data();
302 }
303 return arrow::Status::OK();
304}
305
306class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
307{
308 public:
309 TTreeFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
310 : FileWriteOptions(format)
311 {
312 }
313};
314
315// A filesystem which allows me to get a TTree
316class TTreeFileSystem : public VirtualRootFileSystemBase
317{
318 public:
320
321 arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
322 const std::string& path,
323 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;
324
325 virtual std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource source) = 0;
326};
327
328class TTreeFileFormat : public arrow::dataset::FileFormat
329{
330 size_t& mTotCompressedSize;
331 size_t& mTotUncompressedSize;
332
333 public:
334 TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
335 : FileFormat({}),
336 mTotCompressedSize(totalCompressedSize),
337 mTotUncompressedSize(totalUncompressedSize)
338 {
339 }
340
341 ~TTreeFileFormat() override = default;
342
343 std::string type_name() const override
344 {
345 return "ttree";
346 }
347
348 bool Equals(const FileFormat& other) const override
349 {
350 return other.type_name() == this->type_name();
351 }
352
353 arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
354 {
355 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
356 if (!fs) {
357 return false;
358 }
359 return fs->CheckSupport(source);
360 }
361
362 arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
364 arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
365 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
366 std::shared_ptr<arrow::Schema> physical_schema) override;
367
368 arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const override;
369
370 std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
371
372 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
373 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
374 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
375};
376
377class SingleTreeFileSystem : public TTreeFileSystem
378{
379 public:
380 SingleTreeFileSystem(TTree* tree)
381 : TTreeFileSystem(),
382 mTree(tree)
383 {
384 }
385
386 arrow::Result<arrow::fs::FileInfo> GetFileInfo(std::string const& path) override;
387
388 std::string type_name() const override
389 {
390 return "ttree";
391 }
392
393 std::shared_ptr<RootObjectHandler> GetObjectHandler(arrow::dataset::FileSource source) override
394 {
395 return std::make_shared<RootObjectHandler>((void*)mTree.get(), std::make_shared<TTreeFileFormat>(mTotCompressedSize, mTotUncompressedSize));
396 }
397
398 std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource) override
399 {
400 // Simply return the only TTree we have
401 return mTree;
402 }
403
404 private:
405 size_t mTotUncompressedSize = 0;
406 size_t mTotCompressedSize = 0;
407 std::unique_ptr<TTree> mTree;
408};
409
410arrow::Result<arrow::fs::FileInfo> SingleTreeFileSystem::GetFileInfo(std::string const& path)
411{
412 arrow::dataset::FileSource source(path, shared_from_this());
413 arrow::fs::FileInfo result;
414 result.set_path(path);
415 result.set_type(arrow::fs::FileType::File);
416 return result;
417}
418
419// A fragment which holds a tree
420class TTreeFileFragment : public arrow::dataset::FileFragment
421{
422 public:
423 TTreeFileFragment(arrow::dataset::FileSource source,
424 std::shared_ptr<arrow::dataset::FileFormat> format,
425 arrow::compute::Expression partition_expression,
426 std::shared_ptr<arrow::Schema> physical_schema)
427 : FileFragment(source, format, std::move(partition_expression), physical_schema)
428 {
429 auto rootFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(this->source().filesystem());
430 if (rootFS.get() == nullptr) {
431 throw runtime_error_f("Unknown filesystem %s when reading %s.",
432 source.filesystem()->type_name().c_str(), source.path().c_str());
433 }
434 auto objectHandler = rootFS->GetObjectHandler(source);
435 if (!objectHandler->format->Equals(*format)) {
436 throw runtime_error_f("Cannot read source %s with format %s to populate a TTreeFileFragment.",
437 source.path().c_str(), objectHandler->format->type_name().c_str());
438 };
439 mTree = objectHandler->GetObjectAsOwner<TTree>();
440 }
441
442 TTree* GetTree()
443 {
444 return mTree.get();
445 }
446
447 std::vector<ReadOps>& ops()
448 {
449 return mOps;
450 }
451
454 std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
455 {
456 return std::make_shared<arrow::Buffer>((uint8_t*)(mOps.size() - 1), size);
457 }
458
459 private:
460 std::unique_ptr<TTree> mTree;
461 std::vector<ReadOps> mOps;
462};
463
464// An Arrow OutputStream which allows writing to a TTree, eventually
465// with a prefix for the branches.
466class TTreeOutputStream : public arrow::io::OutputStream
467{
468 public:
469 // Using a pointer means that the tree itself is owned by another
470 // class
471 TTreeOutputStream(TTree*, std::string branchPrefix);
472
473 arrow::Status Close() override;
474
475 arrow::Result<int64_t> Tell() const override;
476
477 arrow::Status Write(const void* data, int64_t nbytes) override;
478
479 bool closed() const override;
480
481 TBranch* CreateBranch(char const* branchName, char const* sizeBranch);
482
483 TTree* GetTree()
484 {
485 return mTree;
486 }
487
488 private:
489 TTree* mTree;
490 std::string mBranchPrefix;
491};
492
493// An Arrow OutputStream which writes to a TTree. The branch prefix is used to
494// identify the set of branches which all belong to the same table, so that
495// multiple tables can be multiplexed onto the same tree.
496TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
497 : mTree(f),
498 mBranchPrefix(std::move(branchPrefix))
499{
500}
501
502arrow::Status TTreeOutputStream::Close()
503{
504 if (mTree->GetCurrentFile() == nullptr) {
505 return arrow::Status::Invalid("Cannot close a tree not attached to a file");
506 }
507 mTree->GetCurrentFile()->Close();
508 return arrow::Status::OK();
509}
510
511arrow::Result<int64_t> TTreeOutputStream::Tell() const
512{
513 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
514}
515
516arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
517{
518 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
519}
520
521bool TTreeOutputStream::closed() const
522{
523 // A standalone tree is never closed.
524 if (mTree->GetCurrentFile() == nullptr) {
525 return false;
526 }
527 return mTree->GetCurrentFile()->IsOpen() == false;
528}
529
530TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
531{
532 if (mBranchPrefix.empty() == true) {
533 return mTree->Branch(branchName, (char*)nullptr, sizeBranch);
534 }
535 return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str());
536}
537
538struct TTreePluginContext {
539 size_t totalCompressedSize = 0;
540 size_t totalUncompressedSize = 0;
541 std::shared_ptr<o2::framework::TTreeFileFormat> format = nullptr;
542};
543
544struct TTreeObjectReadingImplementation : public RootArrowFactoryPlugin {
545 RootArrowFactory* create() override
546 {
547 auto context = new TTreePluginContext;
548 context->format = std::make_shared<o2::framework::TTreeFileFormat>(context->totalCompressedSize, context->totalUncompressedSize);
549 return new RootArrowFactory{
550 .options = [context]() { return context->format->DefaultWriteOptions(); },
551 .format = [context]() { return context->format; },
552 .deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
553 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
554 return std::make_shared<TTreeDeferredReadOutputStream>(treeFragment->ops(), buffer);
555 }};
556 }
557};
558
559struct BranchFieldMapping {
560 int mainBranchIdx;
561 int vlaIdx;
562 int datasetFieldIdx;
563};
564
565auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) {
566 uint32_t offset = 0;
567 std::span<int> offsets;
568 int readEntries = 0;
569 int count = 0;
570 auto* tPtrOffset = reinterpret_cast<int*>(op.targetBuffer->mutable_data());
571 offsets = std::span<int>{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1};
572
573 // read sizes first
574 rootBuffer.Reset();
575 while (readEntries < op.rootBranchEntries) {
576 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
577 if (readLast == -1) {
578 throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName());
579 }
580 readEntries += readLast;
581 for (auto i = 0; i < readLast; ++i) {
582 offsets[count++] = (int)offset;
583 offset += swap32_(reinterpret_cast<uint32_t*>(rootBuffer.GetCurrent())[i]);
584 }
585 }
586 offsets[count] = (int)offset;
587 op.offsetCount = offset;
588};
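// Worked example (sizes are made up) of the offsets accumulated by readOffsets:
// for three entries whose size branch yields {2, 0, 3}, the prefix sum produces
// the Arrow list offsets {0, 2, 2, 5} and op.offsetCount becomes 5, the total
// number of values the matching VLA value branch has to provide.
//
//   int sizes[3] = {2, 0, 3};
//   int offsets[4] = {0};
//   for (int i = 0; i < 3; ++i) { offsets[i + 1] = offsets[i] + sizes[i]; }
//   // offsets == {0, 2, 2, 5}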
589
590arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
591 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
592 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
593{
594 assert(options->dataset_schema != nullptr);
595 // This is the schema we want to read
596 auto dataset_schema = options->dataset_schema;
597 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
598 if (treeFragment.get() == nullptr) {
599 return {arrow::Status::NotImplemented("Not a ttree fragment")};
600 }
601
602 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
603 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
604 O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
605 O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
606 std::vector<std::shared_ptr<arrow::Array>> columns;
607 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
608 auto physical_schema = *treeFragment->ReadPhysicalSchema();
609
610 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
611 throw runtime_error_f("One TTree must have all the fields requested in a table");
612 }
613
614 // Register physical fields into the cache
615 std::vector<BranchFieldMapping> mappings;
616
617 // We need to count the number of readops to avoid moving the vector.
618 int opsCount = 0;
619 for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
620 auto dataset_field = dataset_schema->field(fi);
621 // This is needed because for now the dataset_field
622 // is actually the schema of the ttree
623 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
624 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
625
626 if (physicalFieldIdx < 0) {
627 throw runtime_error_f("Cannot find physical field associated to %s. Possible fields: %s",
628 dataset_field->name().c_str(), physical_schema->ToString().c_str());
629 }
630 if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
631 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
632 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
633 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
634 opsCount += 2;
635 } else {
636 if (physicalFieldIdx > 0) {
637 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
638 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
639 }
640 mappings.push_back({physicalFieldIdx, -1, fi});
641 opsCount++;
642 }
643 }
644
645 auto* tree = treeFragment->GetTree();
646 auto branches = tree->GetListOfBranches();
647 size_t totalTreeSize = 0;
648 std::vector<TBranch*> selectedBranches;
649 for (auto& mapping : mappings) {
650 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
651 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
652 totalTreeSize += selectedBranches.back()->GetTotalSize();
653 if (mapping.vlaIdx != -1) {
654 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
655 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
656 totalTreeSize += selectedBranches.back()->GetTotalSize();
657 }
658 }
659
660 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
661 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
662 tree->SetCacheSize(cacheSize);
663 for (auto* branch : selectedBranches) {
664 tree->AddBranchToCache(branch, false);
665 }
666 tree->StopCacheLearningPhase();
667
668 // Intermediate buffer to bulk read. Two for now
669 std::vector<ReadOps>& ops = treeFragment->ops();
670 ops.clear();
671 ops.reserve(opsCount);
672 for (size_t mi = 0; mi < mappings.size(); ++mi) {
673 BranchFieldMapping mapping = mappings[mi];
674 // The field actually on disk
675 auto datasetField = dataset_schema->field(mapping.datasetFieldIdx);
676 auto physicalField = physical_schema->field(mapping.mainBranchIdx);
677
678 if (mapping.vlaIdx != -1) {
679 auto* branch = (TBranch*)branches->At(mapping.vlaIdx);
680 ops.emplace_back(ReadOps{
681 .branch = branch,
682 .rootBranchEntries = branch->GetEntries(),
683 .typeSize = 4,
684 .listSize = 1,
685 .kind = ReadOpKind::Offsets,
686 });
687 auto& op = ops.back();
688 ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool));
689 // Offsets need to be read immediately to know how many values are there
690 readOffsets(op, rootBuffer());
691 }
692 ops.push_back({});
693 auto& valueOp = ops.back();
694 valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx);
695 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
696 // In case this is a VLA, we set the offsetCount as totalEntries.
697 // In case we read booleans we need a special conversion from bytes to bits.
698 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
699 valueOp.typeSize = physicalField->type()->byte_width();
700 // Notice how we are not (yet) allocating buffers at this point. We merely
701 // create placeholders to subsequently fill.
702 if ((datasetField->type() == arrow::boolean())) {
703 valueOp.kind = ReadOpKind::Booleans;
704 valueOp.listSize = 1;
705 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
706 } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
707 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
708 valueOp.listSize = listType->list_size();
709 valueOp.kind = ReadOpKind::Booleans;
710 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
711 } else if (mapping.vlaIdx != -1) {
712 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
713 valueOp.listSize = -1;
714 // -1 is the current one, -2 is the one with the offsets
715 valueOp.kind = ReadOpKind::VLA;
716 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
717 } else if (listType) {
718 valueOp.kind = ReadOpKind::Values;
719 valueOp.listSize = listType->list_size();
720 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
721 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
722 } else {
723 valueOp.typeSize = physicalField->type()->byte_width();
724 valueOp.kind = ReadOpKind::Values;
725 valueOp.listSize = 1;
726 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
727 }
728 arrow::Status status;
729 std::shared_ptr<arrow::Array> array;
730
731 if (listType) {
732 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer);
733 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, varray);
734 // This is a vla, there is also an offset op
735 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
736 valueOp.branch->GetName(),
737 valueOp.rootBranchEntries,
738 valueOp.targetBuffer->size());
739 } else if (mapping.vlaIdx != -1) {
740 auto& offsetOp = ops[ops.size() - 2];
741 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer);
742 // We have pushed an offset op if this was the case.
743 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray);
744 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
745 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
746 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
747 valueOp.branch->GetName(),
748 offsetOp.offsetCount,
749 valueOp.targetBuffer->size());
750 } else {
751 array = std::make_shared<arrow::PrimitiveArray>(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer);
752 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
753 valueOp.branch->GetName(),
754 valueOp.rootBranchEntries,
755 valueOp.targetBuffer->size());
756 }
757
758 columns.push_back(array);
759 }
760
761 // Do the actual filling of the buffers. This happens after we have created the whole structure
762 // so that we can read directly in shared memory.
763 int64_t rows = -1;
764 for (size_t i = 0; i < ops.size(); ++i) {
765 auto& op = ops[i];
766 if (rows == -1 && op.kind != ReadOpKind::VLA) {
767 rows = op.rootBranchEntries;
768 }
769 if (rows == -1 && op.kind == ReadOpKind::VLA) {
770 auto& offsetOp = ops[i - 1];
771 rows = offsetOp.rootBranchEntries;
772 }
773 if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) {
774 throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries);
775 }
776 if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) {
777 throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].rootBranchEntries);
778 }
779 }
780
781 auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
782 totalCompressedSize += tree->GetZipBytes();
783 totalUncompressedSize += tree->GetTotBytes();
784 O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
785 return batch;
786 };
787 return generator;
788}
789
790char const* rootSuffixFromArrow(arrow::Type::type id)
791{
792 switch (id) {
793 case arrow::Type::BOOL:
794 return "/O";
795 case arrow::Type::UINT8:
796 return "/b";
797 case arrow::Type::UINT16:
798 return "/s";
799 case arrow::Type::UINT32:
800 return "/i";
801 case arrow::Type::UINT64:
802 return "/l";
803 case arrow::Type::INT8:
804 return "/B";
805 case arrow::Type::INT16:
806 return "/S";
807 case arrow::Type::INT32:
808 return "/I";
809 case arrow::Type::INT64:
810 return "/L";
811 case arrow::Type::FLOAT:
812 return "/F";
813 case arrow::Type::DOUBLE:
814 return "/D";
815 default:
816 throw runtime_error("Unsupported arrow column type");
817 }
818}
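// Example of how these suffixes are combined into ROOT leaflist descriptors by
// the writer below (the column names are made up for the sketch):
//
//   std::string scalar = "pt" + std::string(rootSuffixFromArrow(arrow::Type::FLOAT));         // "pt/F"
//   std::string fixed = fmt::format("eta[{}]{}", 3, rootSuffixFromArrow(arrow::Type::FLOAT)); // "eta[3]/F"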
819
820arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOutputStream(
821 const std::string& path,
822 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
823{
824 arrow::dataset::FileSource source{path, shared_from_this()};
825 auto prefix = metadata->Get("branch_prefix");
826 if (prefix.ok()) {
827 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), *prefix);
828 }
829 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), "");
830}
831
832namespace
833{
834struct BranchInfo {
835 std::string name;
836 TBranch* ptr;
837 bool mVLA;
838};
839} // namespace
840
841auto arrowTypeFromROOT(EDataType type, int size)
842{
843 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
844 switch (size) {
845 case -1:
846 return arrow::list(type);
847 case 1:
848 return std::move(type);
849 default:
850 return arrow::fixed_size_list(type, size);
851 }
852 };
853
854 switch (type) {
855 case EDataType::kBool_t:
856 return typeGenerator(arrow::boolean(), size);
857 case EDataType::kUChar_t:
858 return typeGenerator(arrow::uint8(), size);
859 case EDataType::kUShort_t:
860 return typeGenerator(arrow::uint16(), size);
861 case EDataType::kUInt_t:
862 return typeGenerator(arrow::uint32(), size);
863 case EDataType::kULong64_t:
864 return typeGenerator(arrow::uint64(), size);
865 case EDataType::kChar_t:
866 return typeGenerator(arrow::int8(), size);
867 case EDataType::kShort_t:
868 return typeGenerator(arrow::int16(), size);
869 case EDataType::kInt_t:
870 return typeGenerator(arrow::int32(), size);
871 case EDataType::kLong64_t:
872 return typeGenerator(arrow::int64(), size);
873 case EDataType::kFloat_t:
874 return typeGenerator(arrow::float32(), size);
875 case EDataType::kDouble_t:
876 return typeGenerator(arrow::float64(), size);
877 default:
878 throw o2::framework::runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
879 }
880}
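// Example: a branch holding three floats per entry maps to a fixed-size list,
// while a size of -1 (variable-length array) maps to a plain Arrow list:
//
//   auto fixedType = arrowTypeFromROOT(EDataType::kFloat_t, 3); // fixed_size_list<float>[3]
//   auto vlaType = arrowTypeFromROOT(EDataType::kInt_t, -1);    // list<int32>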
881
882// This is a datatype for branches which implies
883struct RootTransientIndexType : arrow::ExtensionType {
884};
885
886arrow::Result<std::shared_ptr<arrow::Schema>> TTreeFileFormat::Inspect(const arrow::dataset::FileSource& source) const
887{
888 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
889
890 if (!fs.get()) {
891 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
892 }
893 auto objectHandler = fs->GetObjectHandler(source);
894
895 if (!objectHandler->format->Equals(*this)) {
896 throw runtime_error_f("Cannot inspect source %s with format %s.", source.path().c_str(), objectHandler->format->type_name().c_str());
897 }
898
899 // Notice that we abuse the API here: the TTree is not deleted, it remains
900 // owned and managed by ROOT.
901 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
902
903 auto branches = tree->GetListOfBranches();
904 auto n = branches->GetEntries();
905
906 std::vector<std::shared_ptr<arrow::Field>> fields;
907
908 bool prevIsSize = false;
909 for (auto i = 0; i < n; ++i) {
910 auto branch = static_cast<TBranch*>(branches->At(i));
911 std::string name = branch->GetName();
912 if (prevIsSize && fields.back()->name() != name + "_size") {
913 throw runtime_error_f("Unexpected layout for VLA container %s.", branch->GetName());
914 }
915
916 if (name.ends_with("_size")) {
917 fields.emplace_back(std::make_shared<arrow::Field>(name, arrow::int32()));
918 prevIsSize = true;
919 } else {
920 static TClass* cls;
921 EDataType type;
922 branch->GetExpectedType(cls, type);
923
924 if (prevIsSize) {
925 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, -1)));
926 } else {
927 auto listSize = static_cast<TLeaf*>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
928 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, listSize)));
929 }
930 prevIsSize = false;
931 }
932 }
933
934 if (fields.back()->name().ends_with("_size")) {
935 throw runtime_error_f("Missing values for VLA indices %s.", fields.back()->name().c_str());
936 }
937 return std::make_shared<arrow::Schema>(fields);
938}
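// Sketch of the mapping performed by Inspect (branch names are made up): a tree
// with branches "pt/F", "vla_size/I" and "vla[vla_size]/I" is described by the
// Arrow schema
//
//   pt: float
//   vla_size: int32
//   vla: list<item: int32>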
939
941arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::MakeFragment(
942 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
943 std::shared_ptr<arrow::Schema> physical_schema)
944{
945
946 return std::make_shared<TTreeFileFragment>(source, std::dynamic_pointer_cast<arrow::dataset::FileFormat>(shared_from_this()),
947 std::move(partition_expression),
948 physical_schema);
949}
950
951class TTreeFileWriter : public arrow::dataset::FileWriter
952{
953 std::vector<TBranch*> branches;
954 std::vector<TBranch*> sizesBranches;
955 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
956 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
957 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
958
959 std::vector<int64_t> valuesIdealBasketSize;
960 std::vector<int64_t> sizeIdealBasketSize;
961
962 std::vector<int64_t> typeSizes;
963 std::vector<int64_t> listSizes;
964 bool firstBasket = true;
965
966 // This is to create a basket size according to the first batch.
967 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
968 {
969 O2_SIGNPOST_ID_FROM_POINTER(sid, root_arrow_fs, this);
970 O2_SIGNPOST_START(root_arrow_fs, sid, "finaliseBasketSize", "First batch with %lli rows received and %zu columns",
971 firstBatch->num_rows(), firstBatch->columns().size());
972 for (size_t i = 0; i < branches.size(); i++) {
973 auto* branch = branches[i];
974 auto* sizeBranch = sizesBranches[i];
975
976 int valueSize = valueTypes[i]->byte_width();
977 if (listSizes[i] == 1) {
978 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry for %lli entries.",
979 branch->GetName(), valueSize, firstBatch->num_rows());
980 assert(sizeBranch == nullptr);
981 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
982 } else if (listSizes[i] == -1) {
983 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry.",
984 branch->GetName(), valueSize);
985 // This should probably lookup the
986 auto column = firstBatch->GetColumnByName(schema_->field(i)->name());
987 auto list = std::static_pointer_cast<arrow::ListArray>(column);
988 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
989 branch->GetName(), sizeBranch->GetName(), list->length(), valueSize);
990 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * list->length());
991 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
992 } else {
993 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. There are %lli entries per array of size %d in that list.",
994 branch->GetName(), listSizes[i], valueSize);
995 assert(sizeBranch == nullptr);
996 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[i]);
997 }
998
999 auto field = firstBatch->schema()->field(i);
1000 if (field->name().starts_with("fIndexArray")) {
1001 // One int per array to keep track of the size
1002 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows(); // minimal additional size needed, otherwise we get 2 baskets
1003 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
1004 sizeBranch->SetBasketSize(basketSize);
1005 branch->SetBasketSize(basketSize);
1006 }
1007 }
1008 O2_SIGNPOST_END(root_arrow_fs, sid, "finaliseBasketSize", "Done");
1009 }
1010
1011 public:
1012 // Create the TTree based on the physical_schema, not the one in the batch.
1013 // The write method will have to reconcile the two schemas.
1014 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1015 std::shared_ptr<arrow::io::OutputStream> destination,
1016 arrow::fs::FileLocator destination_locator)
1017 : FileWriter(schema, options, destination, destination_locator)
1018 {
1019 // Batches have the same number of entries for each column.
1020 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1021 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1022
1023 if (directoryStream.get()) {
1024 TDirectoryFile* dir = directoryStream->GetDirectory();
1025 dir->cd();
1026 auto* tree = new TTree(destination_locator_.path.c_str(), "");
1027 treeStream = std::make_shared<TTreeOutputStream>(tree, "");
1028 } else if (treeStream.get()) {
1029 // We already have a tree stream, let's derive a new one
1030 // with the destination_locator_.path as prefix for the branches
1031 // This way we can multiplex multiple tables in the same tree.
1032 auto* tree = treeStream->GetTree();
1033 treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
1034 } else {
1035 // I could simply set a prefix here to merge to an already existing tree.
1036 throw std::runtime_error("Unsupported backend.");
1037 }
1038
1039 for (auto i = 0u; i < schema->fields().size(); ++i) {
1040 auto& field = schema->field(i);
1041 listSizes.push_back(1);
1042
1043 int valuesIdealBasketSize = 0;
1044 // Construct all the needed branches.
1045 switch (field->type()->id()) {
1046 case arrow::Type::FIXED_SIZE_LIST: {
1047 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
1048 valueTypes.push_back(field->type()->field(0)->type());
1049 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1050 sizesBranches.push_back(nullptr);
1051 std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
1052 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1053 } break;
1054 case arrow::Type::LIST: {
1055 valueTypes.push_back(field->type()->field(0)->type());
1056 std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
1057 listSizes.back() = -1; // VLA, we need to calculate it on the fly;
1058 std::string sizeLeafList = field->name() + "_size/I";
1059 sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
1060 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1061 // Notice that this could be replaced by a better guess of the
1062 // average size of the list elements, but this is not trivial.
1063 } break;
1064 default: {
1065 valueTypes.push_back(field->type());
1066 std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
1067 sizesBranches.push_back(nullptr);
1068 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1069 } break;
1070 }
1071 }
1072 // We create the branches from the schema
1073 }
1074
1075 arrow::Status Write(const std::shared_ptr<arrow::RecordBatch>& batch) override
1076 {
1077 if (firstBasket) {
1078 firstBasket = false;
1079 finaliseBasketSize(batch);
1080 }
1081
1082 // Support writing empty tables
1083 if (batch->columns().empty() || batch->num_rows() == 0) {
1084 return arrow::Status::OK();
1085 }
1086
1087 // Batches have the same number of entries for each column.
1088 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1089 TTree* tree = nullptr;
1090 if (directoryStream.get()) {
1091 TDirectoryFile* dir = directoryStream->GetDirectory();
1092 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1093 }
1094 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1095 if (!tree && treeStream.get()) { tree = treeStream->GetTree(); }
1096 if (!tree) {
1097 // I could simply set a prefix here to merge to an already existing tree.
1098 throw std::runtime_error("Unsupported backend.");
1099 }
1100 valueArrays.clear();
1101 for (auto i = 0u; i < batch->columns().size(); ++i) {
1102 auto column = batch->column(i);
1103 auto& field = batch->schema()->field(i);
1104
1105 valueArrays.push_back(nullptr);
1106
1107 switch (field->type()->id()) {
1108 case arrow::Type::FIXED_SIZE_LIST: {
1109 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
1110 if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1111 int64_t length = list->length() * list->list_type()->list_size();
1112 arrow::UInt8Builder builder;
1113 auto ok = builder.Reserve(length);
1114 // I need to build an array of uint8_t for the conversion to ROOT, which uses
1115 // bytes for booleans.
1116 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
1117 for (int64_t i = 0; i < length; ++i) {
1118 if (boolArray->IsValid(i)) {
1119 // Expand each boolean value (true/false) to uint8 (1/0)
1120 uint8_t value = boolArray->Value(i) ? 1 : 0;
1121 auto ok = builder.Append(value);
1122 } else {
1123 // Append null for invalid entries
1124 auto ok = builder.AppendNull();
1125 }
1126 }
1127 valueArrays.back() = *builder.Finish();
1128 } else {
1129 valueArrays.back() = list->values();
1130 }
1131 } break;
1132 case arrow::Type::LIST: {
1133 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1134 valueArrays.back() = list->values();
1135 } break;
1136 case arrow::Type::BOOL: {
1137 // In case of arrays of booleans, we need to go back to their
1138 // char based representation for ROOT to save them.
1139 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1140
1141 int64_t length = boolArray->length();
1142 arrow::UInt8Builder builder;
1143 auto ok = builder.Reserve(length);
1144
1145 for (int64_t i = 0; i < length; ++i) {
1146 if (boolArray->IsValid(i)) {
1147 // Expand each boolean value (true/false) to uint8 (1/0)
1148 uint8_t value = boolArray->Value(i) ? 1 : 0;
1149 auto ok = builder.Append(value);
1150 } else {
1151 // Append null for invalid entries
1152 auto ok = builder.AppendNull();
1153 }
1154 }
1155 valueArrays.back() = *builder.Finish();
1156 } break;
1157 default:
1158 valueArrays.back() = column;
1159 }
1160 }
1161
1162 int64_t pos = 0;
1163 while (pos < batch->num_rows()) {
1164 for (size_t bi = 0; bi < branches.size(); ++bi) {
1165 auto* branch = branches[bi];
1166 auto* sizeBranch = sizesBranches[bi];
1167 auto array = batch->column(bi);
1168 auto& field = batch->schema()->field(bi);
1169 auto& listSize = listSizes[bi];
1170 auto valueType = valueTypes[bi];
1171 auto valueArray = valueArrays[bi];
1172
1173 switch (field->type()->id()) {
1174 case arrow::Type::LIST: {
1175 auto list = std::static_pointer_cast<arrow::ListArray>(array);
1176 listSize = list->value_length(pos);
1177 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width();
1178 branch->SetAddress((void*)buffer);
1179 sizeBranch->SetAddress(&listSize);
1180 } break;
1181 case arrow::Type::FIXED_SIZE_LIST:
1182 default: {
1183 // needed for the boolean case, I should probably cache this.
1184 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
1185 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth;
1186 branch->SetAddress((void*)buffer);
1187 };
1188 }
1189 }
1190 tree->Fill();
1191 ++pos;
1192 }
1193 return arrow::Status::OK();
1194 }
1195
1196 arrow::Future<> FinishInternal() override
1197 {
1198 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1199 auto* tree = treeStream->GetTree();
1200 tree->Write("", TObject::kOverwrite);
1201 tree->SetDirectory(nullptr);
1202
1203 return {};
1204 };
1205};
1206arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> TTreeFileFormat::MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const
1207{
1208 auto writer = std::make_shared<TTreeFileWriter>(schema, options, destination, destination_locator);
1209 return std::dynamic_pointer_cast<arrow::dataset::FileWriter>(writer);
1210}
1211
1212std::shared_ptr<arrow::dataset::FileWriteOptions> TTreeFileFormat::DefaultWriteOptions()
1213{
1214 std::shared_ptr<TTreeFileWriteOptions> options(
1215 new TTreeFileWriteOptions(shared_from_this()));
1216 return options;
1217}
1218
1219DEFINE_DPL_PLUGINS_BEGIN
1220DEFINE_DPL_PLUGIN_INSTANCE(TTreeObjectReadingImplementation, RootObjectReadingImplementation);
1221DEFINE_DPL_PLUGINS_END
1222
1223
1224} // namespace o2::framework