TTreePlugin.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/RootArrowFilesystem.h"
13#include "Framework/Plugins.h"
14#include "Framework/Signpost.h"
15#include "Framework/Endian.h"
16#include <TBufferFile.h>
17#include <TBufferIO.h>
18#include <arrow/buffer.h>
19#include <arrow/dataset/file_base.h>
20#include <arrow/extension_type.h>
21#include <arrow/memory_pool.h>
22#include <arrow/status.h>
23#include <arrow/type.h>
24#include <arrow/type_fwd.h>
25#include <arrow/util/key_value_metadata.h>
26#include <arrow/array/array_nested.h>
27#include <arrow/array/array_primitive.h>
28#include <arrow/array/builder_nested.h>
29#include <arrow/array/builder_primitive.h>
30#include <TTree.h>
31#include <TBranch.h>
32#include <TFile.h>
33#include <TLeaf.h>
34#include <unistd.h>
35#include <cstdint>
36#include <memory>
37#include <stdexcept>
38#include <iostream>
39
40O2_DECLARE_DYNAMIC_LOG(root_arrow_fs);
41
42namespace o2::framework
43{
44
45enum struct ReadOpKind {
46 Unknown,
47 Offsets,
48 Values,
49 Booleans,
50 VLA
51};
52
53struct ReadOps {
54 TBranch* branch = nullptr;
55 std::shared_ptr<arrow::Buffer> targetBuffer = nullptr;
56 int64_t rootBranchEntries = 0;
57 size_t typeSize = 0;
58 size_t listSize = 0;
59 // If this is an offset reading op, keep track of the actual
60 // range for the offsets, not only how many VLAs are there.
61 int64_t offsetCount = 0;
62 ReadOpKind kind = ReadOpKind::Unknown;
63};
64
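65// An OutputStream which defers the actual reading of TTree data: when Arrow
66// writes out a placeholder buffer created by
67// TTreeFileFragment::GetPlaceholderForOp, Write() bulk-reads the associated
68// branch directly into the destination buffer instead of copying bytes.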
69class TTreeDeferredReadOutputStream : public arrow::io::OutputStream
70{
71 public:
72 explicit TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
73 const std::shared_ptr<arrow::ResizableBuffer>& buffer);
74
80 /// Create an in-memory output stream with the indicated capacity, using a memory pool.
81 static arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> Create(
82 std::vector<ReadOps>& ops,
83 int64_t initial_capacity = 4096,
84 arrow::MemoryPool* pool = arrow::default_memory_pool());
85
86 // By the time we call the destructor, the contents
87 // of the buffer have already been moved to fairmq
88 // to be sent.
89 ~TTreeDeferredReadOutputStream() override = default;
90
91 // Implement the OutputStream interface
92
93 /// Close the stream, preserving the buffer (retrieve it with Finish()).
94 arrow::Status Close() override;
95 [[nodiscard]] bool closed() const override;
96 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
97 arrow::Status Write(const void* data, int64_t nbytes) override;
98
100 using OutputStream::Write;
102
103 /// Close the stream and return the buffer.
104 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
105
110 /// Initialize the state of the OutputStream with newly allocated memory and set position to 0.
111 arrow::Status Reset(std::vector<ReadOps> ops,
112 int64_t initial_capacity, arrow::MemoryPool* pool);
113
114 [[nodiscard]] int64_t capacity() const { return capacity_; }
115
116 private:
117 TTreeDeferredReadOutputStream();
118 std::vector<ReadOps> ops_;
119
120 // Ensures there is sufficient space available to write nbytes
121 arrow::Status Reserve(int64_t nbytes);
122
123 std::shared_ptr<arrow::ResizableBuffer> buffer_;
124 bool is_open_;
125 int64_t capacity_;
126 int64_t position_;
127 uint8_t* mutable_data_;
128};
129
130static constexpr int64_t kBufferMinimumSize = 256;
131
132TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream()
133 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
134
135TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
136 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
137 : ops_(ops),
138 buffer_(buffer),
139 is_open_(true),
140 capacity_(buffer->size()),
141 position_(0),
142 mutable_data_(buffer->mutable_data()) {}
143
144arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> TTreeDeferredReadOutputStream::Create(
145 std::vector<ReadOps>& ops,
146 int64_t initial_capacity, arrow::MemoryPool* pool)
147{
148 // ctor is private, so cannot use make_shared
149 auto ptr = std::shared_ptr<TTreeDeferredReadOutputStream>(new TTreeDeferredReadOutputStream);
150 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
151 return ptr;
152}
153
154arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector<ReadOps> ops,
155 int64_t initial_capacity, arrow::MemoryPool* pool)
156{
157 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
158 ops_ = ops;
159 is_open_ = true;
160 capacity_ = initial_capacity;
161 position_ = 0;
162 mutable_data_ = buffer_->mutable_data();
163 return arrow::Status::OK();
164}
165
166arrow::Status TTreeDeferredReadOutputStream::Close()
167{
168 if (is_open_) {
169 is_open_ = false;
170 if (position_ < capacity_) {
171 RETURN_NOT_OK(buffer_->Resize(position_, false));
172 }
173 }
174 return arrow::Status::OK();
175}
176
177bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; }
178
179arrow::Result<std::shared_ptr<arrow::Buffer>> TTreeDeferredReadOutputStream::Finish()
180{
181 RETURN_NOT_OK(Close());
182 buffer_->ZeroPadding();
183 is_open_ = false;
184 return std::move(buffer_);
185}
186
187arrow::Result<int64_t> TTreeDeferredReadOutputStream::Tell() const { return position_; }
188
189auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
190 int readEntries = 0;
191 rootBuffer.Reset();
192 while (readEntries < op.rootBranchEntries) {
193 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
194 if (readLast < 0) {
195 throw runtime_error_f("Error while reading branch %s starting from %d.", op.branch->GetName(), readEntries);
196 }
197 int size = readLast * op.listSize;
198 readEntries += readLast;
199 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
200 target += (ptrdiff_t)(size * op.typeSize);
201 }
202};
203
204auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
205 int readEntries = 0;
206 rootBuffer.Reset();
207 // Set to 0
208 memset(target, 0, op.targetBuffer->size());
209 int readLast = 0;
210 while (readEntries < op.rootBranchEntries) {
211 auto beginValue = readEntries * static_cast<int>(op.listSize);
212 readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer);
213 int size = readLast * op.listSize;
214 readEntries += readLast;
215 for (int i = beginValue; i < beginValue + size; ++i) {
216 auto value = static_cast<uint8_t>(rootBuffer.GetCurrent()[i - beginValue] << (i % 8));
217 target[i / 8] |= value;
218 }
219 }
220};
221
222auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) {
223 int readEntries = 0;
224 auto* tPtrOffset = reinterpret_cast<const int*>(offsetOp.targetBuffer->data());
225 std::span<int const> const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1};
226
227 rootBuffer.Reset();
228 while (readEntries < op.rootBranchEntries) {
229 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
230 int size = offsets[readEntries + readLast] - offsets[readEntries];
231 readEntries += readLast;
232 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
233 target += (ptrdiff_t)(size * op.typeSize);
234 }
235};
236
237TBufferFile& rootBuffer()
238{
239 // FIXME: we will need more than one once we have multithreaded reading.
240 static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
241 return rootBuffer;
242}
243
244arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes)
245{
246 if (ARROW_PREDICT_FALSE(!is_open_)) {
247 return arrow::Status::IOError("OutputStream is closed");
248 }
249 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
250 return arrow::Status::OK();
251 }
252 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
253 RETURN_NOT_OK(Reserve(nbytes));
254 }
255 // Values smaller than ops_.size() are placeholder indices created by TTreeFileFragment::GetPlaceholderForOp; anything else is a real address whose contents need to be copied.
256 auto ref = (int64_t)data;
257 if (ref >= ops_.size()) {
258 memcpy(mutable_data_ + position_, data, nbytes);
259 position_ += nbytes;
260 return arrow::Status::OK();
261 }
262 auto& op = ops_[ref];
263
264 switch (op.kind) {
265 // Offsets need to be read in advance because we need to know
266 // how many elements there are in total (since TTree does not allow discovering such information).
267 case ReadOpKind::Offsets:
268 break;
269 case ReadOpKind::Values:
270 readValues(mutable_data_ + position_, op, rootBuffer());
271 break;
272 case ReadOpKind::VLA:
273 readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer());
274 break;
275 case ReadOpKind::Booleans:
276 readBoolValues(mutable_data_ + position_, op, rootBuffer());
277 break;
278 case ReadOpKind::Unknown:
279 throw runtime_error("Unknown Op");
280 }
281 op.branch->SetStatus(false);
282 op.branch->DropBaskets("all");
283 op.branch->Reset();
284 op.branch->GetTransientBuffer(0)->Expand(0);
285
286 position_ += nbytes;
287 return arrow::Status::OK();
288}
289
290arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes)
291{
292 // Grow the buffer so it fits the requested write, but never below the
293 // minimum buffer size. Note that, unlike arrow::io::BufferOutputStream,
294 // this stream grows to the exact size needed rather than overallocating
295 // by doubling.
296 int64_t new_capacity = std::max(kBufferMinimumSize, position_ + nbytes);
298 if (new_capacity > capacity_) {
299 RETURN_NOT_OK(buffer_->Resize(new_capacity));
300 capacity_ = new_capacity;
301 mutable_data_ = buffer_->mutable_data();
302 }
303 return arrow::Status::OK();
304}
305
306class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
307{
308 public:
309 TTreeFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
310 : FileWriteOptions(format)
311 {
312 }
313};
314
315// A filesystem which allows me to get a TTree
316class TTreeFileSystem : public VirtualRootFileSystemBase
317{
318 public:
320
321 arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
322 const std::string& path,
323 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;
324
325 virtual std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource source) = 0;
326};
327
328class TTreeFileFormat : public arrow::dataset::FileFormat
329{
330 size_t& mTotCompressedSize;
331 size_t& mTotUncompressedSize;
332
333 public:
334 TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
335 : FileFormat({}),
336 mTotCompressedSize(totalCompressedSize),
337 mTotUncompressedSize(totalUncompressedSize)
338 {
339 }
340
341 ~TTreeFileFormat() override = default;
342
343 std::string type_name() const override
344 {
345 return "ttree";
346 }
347
348 bool Equals(const FileFormat& other) const override
349 {
350 return other.type_name() == this->type_name();
351 }
352
353 arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
354 {
355 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
356 if (!fs) {
357 return false;
358 }
359 return fs->CheckSupport(source);
360 }
361
362 arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
364 arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
365 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
366 std::shared_ptr<arrow::Schema> physical_schema) override;
367
368 arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const override;
369
370 std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
371
372 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
373 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
374 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
375};
376
377class SingleTreeFileSystem : public TTreeFileSystem
378{
379 public:
380 SingleTreeFileSystem(TTree* tree)
381 : TTreeFileSystem(),
382 mTree(tree)
383 {
384 }
385
386 arrow::Result<arrow::fs::FileInfo> GetFileInfo(std::string const& path) override;
387
388 std::string type_name() const override
389 {
390 return "ttree";
391 }
392
393 std::shared_ptr<RootObjectHandler> GetObjectHandler(arrow::dataset::FileSource source) override
394 {
395 return std::make_shared<RootObjectHandler>((void*)mTree.get(), std::make_shared<TTreeFileFormat>(mTotCompressedSize, mTotUncompressedSize));
396 }
397
398 std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource) override
399 {
400 // Simply return the only TTree we have
401 return mTree;
402 }
403
404 private:
405 size_t mTotUncompressedSize;
406 size_t mTotCompressedSize;
407 std::unique_ptr<TTree> mTree;
408};
409
410arrow::Result<arrow::fs::FileInfo> SingleTreeFileSystem::GetFileInfo(std::string const& path)
411{
412 arrow::dataset::FileSource source(path, shared_from_this());
413 arrow::fs::FileInfo result;
414 result.set_path(path);
415 result.set_type(arrow::fs::FileType::File);
416 return result;
417}
418
419// A fragment which holds a tree
420class TTreeFileFragment : public arrow::dataset::FileFragment
421{
422 public:
423 TTreeFileFragment(arrow::dataset::FileSource source,
424 std::shared_ptr<arrow::dataset::FileFormat> format,
425 arrow::compute::Expression partition_expression,
426 std::shared_ptr<arrow::Schema> physical_schema)
427 : FileFragment(source, format, std::move(partition_expression), physical_schema)
428 {
429 auto rootFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(this->source().filesystem());
430 if (rootFS.get() == nullptr) {
431 throw runtime_error_f("Unknown filesystem %s when reading %s.",
432 source.filesystem()->type_name().c_str(), source.path().c_str());
433 }
434 auto objectHandler = rootFS->GetObjectHandler(source);
435 if (!objectHandler->format->Equals(*format)) {
436 throw runtime_error_f("Cannot read source %s with format %s to populate a TTreeFileFragment.",
437 source.path().c_str(), objectHandler->format->type_name().c_str());
438 };
439 mTree = objectHandler->GetObjectAsOwner<TTree>();
440 }
441
442 TTree* GetTree()
443 {
444 return mTree.get();
445 }
446
447 std::vector<ReadOps>& ops()
448 {
449 return mOps;
450 }
451
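452 // The returned buffer does not own memory: its data pointer encodes the index of
453 // the most recently added ReadOps entry, which TTreeDeferredReadOutputStream::Write() decodes to perform the deferred read.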
454 std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
455 {
456 return std::make_shared<arrow::Buffer>((uint8_t*)(mOps.size() - 1), size);
457 }
458
459 private:
460 std::unique_ptr<TTree> mTree;
461 std::vector<ReadOps> mOps;
462};
463
464// An arrow OutputStream which allows writing to a TTree, optionally
465// with a prefix for the branches.
466class TTreeOutputStream : public arrow::io::OutputStream
467{
468 public:
469 // Using a pointer means that the tree itself is owned by another
470 // class
471 TTreeOutputStream(TTree*, std::string branchPrefix);
472
473 arrow::Status Close() override;
474
475 arrow::Result<int64_t> Tell() const override;
476
477 arrow::Status Write(const void* data, int64_t nbytes) override;
478
479 bool closed() const override;
480
481 TBranch* CreateBranch(char const* branchName, char const* sizeBranch);
482
483 TTree* GetTree()
484 {
485 return mTree;
486 }
487
488 private:
489 TTree* mTree;
490 std::string mBranchPrefix;
491};
492
493// An arrow OutputStream which allows writing to a TTree.
494// @a branchPrefix is used to identify a set of branches which all belong to
495// the same table.
496TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
497 : mTree(f),
498 mBranchPrefix(std::move(branchPrefix))
499{
500}
501
502arrow::Status TTreeOutputStream::Close()
503{
504 if (mTree->GetCurrentFile() == nullptr) {
505 return arrow::Status::Invalid("Cannot close a tree not attached to a file");
506 }
507 mTree->GetCurrentFile()->Close();
508 return arrow::Status::OK();
509}
510
511arrow::Result<int64_t> TTreeOutputStream::Tell() const
512{
513 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
514}
515
516arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
517{
518 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
519}
520
521bool TTreeOutputStream::closed() const
522{
523 // A standalone tree is never closed.
524 if (mTree->GetCurrentFile() == nullptr) {
525 return false;
526 }
527 return mTree->GetCurrentFile()->IsOpen() == false;
528}
529
530TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
531{
532 if (mBranchPrefix.empty() == true) {
533 return mTree->Branch(branchName, (char*)nullptr, sizeBranch);
534 }
535 return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str());
536}
537
538struct TTreePluginContext {
539 size_t totalCompressedSize = 0;
540 size_t totalUncompressedSize = 0;
541 std::shared_ptr<o2::framework::TTreeFileFormat> format = nullptr;
542};
543
546 {
547 auto context = new TTreePluginContext;
548 context->format = std::make_shared<o2::framework::TTreeFileFormat>(context->totalCompressedSize, context->totalUncompressedSize);
549 return new RootArrowFactory{
550 .options = [context]() { return context->format->DefaultWriteOptions(); },
551 .format = [context]() { return context->format; },
552 .deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
553 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
554 return std::make_shared<TTreeDeferredReadOutputStream>(treeFragment->ops(), buffer);
555 }};
556 }
557};
558
559struct BranchFieldMapping {
560 int mainBranchIdx;
561 int vlaIdx;
562 int datasetFieldIdx;
563};
564
565auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) {
566 uint32_t offset = 0;
567 std::span<int> offsets;
568 int readEntries = 0;
569 int count = 0;
570 auto* tPtrOffset = reinterpret_cast<int*>(op.targetBuffer->mutable_data());
571 offsets = std::span<int>{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1};
572
573 // read sizes first
574 rootBuffer.Reset();
575 while (readEntries < op.rootBranchEntries) {
576 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
577 if (readLast == -1) {
578 throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName());
579 }
580 readEntries += readLast;
581 for (auto i = 0; i < readLast; ++i) {
582 offsets[count++] = (int)offset;
583 offset += swap32_(reinterpret_cast<uint32_t*>(rootBuffer.GetCurrent())[i]);
584 }
585 }
586 offsets[count] = (int)offset;
587 op.offsetCount = offset;
588};
589
590arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
591 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
592 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
593{
594 assert(options->dataset_schema != nullptr);
595 // This is the schema we want to read
596 auto dataset_schema = options->dataset_schema;
597 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
598 if (treeFragment.get() == nullptr) {
599 return {arrow::Status::NotImplemented("Not a ttree fragment")};
600 }
601
602 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
603 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
604 O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
605 O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
606 std::vector<std::shared_ptr<arrow::Array>> columns;
607 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
608 auto physical_schema = *treeFragment->ReadPhysicalSchema();
609
610 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
611 throw runtime_error_f("One TTree must have all the fields requested in a table");
612 }
613
614 // Register physical fields into the cache
615 std::vector<BranchFieldMapping> mappings;
616
617 // We need to count the number of readops to avoid moving the vector.
618 int opsCount = 0;
619 for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
620 auto dataset_field = dataset_schema->field(fi);
621 // This is needed because for now the dataset_field
622 // is actually the schema of the ttree
623 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
624 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
625
626 if (physicalFieldIdx < 0) {
627 throw runtime_error_f("Cannot find physical field associated to %s. Possible fields: %s",
628 dataset_field->name().c_str(), physical_schema->ToString().c_str());
629 }
630 if (physicalFieldIdx > 1 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
631 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
632 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
633 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
634 opsCount += 2;
635 } else {
636 mappings.push_back({physicalFieldIdx, -1, fi});
637 opsCount++;
638 }
639 }
640
641 auto* tree = treeFragment->GetTree();
642 auto branches = tree->GetListOfBranches();
643 size_t totalTreeSize = 0;
644 std::vector<TBranch*> selectedBranches;
645 for (auto& mapping : mappings) {
646 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
647 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
648 totalTreeSize += selectedBranches.back()->GetTotalSize();
649 if (mapping.vlaIdx != -1) {
650 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
651 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
652 totalTreeSize += selectedBranches.back()->GetTotalSize();
653 }
654 }
655
656 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
657 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
658 tree->SetCacheSize(cacheSize);
659 for (auto* branch : selectedBranches) {
660 tree->AddBranchToCache(branch, false);
661 }
662 tree->StopCacheLearningPhase();
663
664 // Intermediate buffer to bulk read. Two for now
665 std::vector<ReadOps>& ops = treeFragment->ops();
666 ops.clear();
667 ops.reserve(opsCount);
668 for (size_t mi = 0; mi < mappings.size(); ++mi) {
669 BranchFieldMapping mapping = mappings[mi];
670 // The field actually on disk
671 auto datasetField = dataset_schema->field(mapping.datasetFieldIdx);
672 auto physicalField = physical_schema->field(mapping.mainBranchIdx);
673
674 if (mapping.vlaIdx != -1) {
675 auto* branch = (TBranch*)branches->At(mapping.vlaIdx);
676 ops.emplace_back(ReadOps{
677 .branch = branch,
678 .rootBranchEntries = branch->GetEntries(),
679 .typeSize = 4,
680 .listSize = 1,
681 .kind = ReadOpKind::Offsets,
682 });
683 auto& op = ops.back();
684 ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool));
685 // Offsets need to be read immediately to know how many values are there
686 readOffsets(op, rootBuffer());
687 }
688 ops.push_back({});
689 auto& valueOp = ops.back();
690 valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx);
691 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
692 // In case this is a vla, we set the offsetCount as totalEntries
693 // In case we read booleans we need a special conversion from bytes to bits.
694 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
695 valueOp.typeSize = physicalField->type()->byte_width();
696 // Notice how we are not (yet) allocating buffers at this point. We merely
697 // create placeholders to subsequently fill.
698 if ((datasetField->type() == arrow::boolean())) {
699 valueOp.kind = ReadOpKind::Booleans;
700 valueOp.listSize = 1;
701 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries) / 8 + 1);
702 } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
703 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
704 valueOp.listSize = listType->list_size();
705 valueOp.kind = ReadOpKind::Booleans;
706 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
707 } else if (mapping.vlaIdx != -1) {
708 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
709 valueOp.listSize = -1;
710 // -1 is the current one, -2 is the one for the offsets
711 valueOp.kind = ReadOpKind::VLA;
712 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
713 } else if (listType) {
714 valueOp.kind = ReadOpKind::Values;
715 valueOp.listSize = listType->list_size();
716 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
717 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
718 } else {
719 valueOp.typeSize = physicalField->type()->byte_width();
720 valueOp.kind = ReadOpKind::Values;
721 valueOp.listSize = 1;
722 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
723 }
724 arrow::Status status;
725 std::shared_ptr<arrow::Array> array;
726
727 if (listType) {
728 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize, valueOp.targetBuffer);
729 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, varray);
730 // This is a vla, there is also an offset op
731 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
732 valueOp.branch->GetName(),
733 valueOp.rootBranchEntries,
734 valueOp.targetBuffer->size());
735 } else if (mapping.vlaIdx != -1) {
736 auto& offsetOp = ops[ops.size() - 2];
737 auto varray = std::make_shared<arrow::PrimitiveArray>(datasetField->type()->field(0)->type(), offsetOp.offsetCount, valueOp.targetBuffer);
738 // We have pushed an offset op if this was the case.
739 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, varray);
740 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
741 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
742 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
743 valueOp.branch->GetName(),
744 offsetOp.offsetCount,
745 valueOp.targetBuffer->size());
746 } else {
747 array = std::make_shared<arrow::PrimitiveArray>(datasetField->type(), valueOp.rootBranchEntries, valueOp.targetBuffer);
748 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
749 valueOp.branch->GetName(),
750 valueOp.rootBranchEntries,
751 valueOp.targetBuffer->size());
752 }
753
754 columns.push_back(array);
755 }
756
757 // Do the actual filling of the buffers. This happens after we have created the whole structure
758 // so that we can read directly in shared memory.
759 int64_t rows = -1;
760 for (size_t i = 0; i < ops.size(); ++i) {
761 auto& op = ops[i];
762 if (rows == -1 && op.kind != ReadOpKind::VLA) {
763 rows = op.rootBranchEntries;
764 }
765 if (rows == -1 && op.kind == ReadOpKind::VLA) {
766 auto& offsetOp = ops[i - 1];
767 rows = offsetOp.rootBranchEntries;
768 }
769 if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) {
770 throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries);
771 }
772 if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) {
773 throw runtime_error_f("Mismatched number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].rootBranchEntries);
774 }
775 }
776
777 auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
778 totalCompressedSize += tree->GetZipBytes();
779 totalUncompressedSize += tree->GetTotBytes();
780 O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
781 return batch;
782 };
783 return generator;
784}
785
786char const* rootSuffixFromArrow(arrow::Type::type id)
787{
788 switch (id) {
789 case arrow::Type::BOOL:
790 return "/O";
791 case arrow::Type::UINT8:
792 return "/b";
793 case arrow::Type::UINT16:
794 return "/s";
795 case arrow::Type::UINT32:
796 return "/i";
797 case arrow::Type::UINT64:
798 return "/l";
799 case arrow::Type::INT8:
800 return "/B";
801 case arrow::Type::INT16:
802 return "/S";
803 case arrow::Type::INT32:
804 return "/I";
805 case arrow::Type::INT64:
806 return "/L";
807 case arrow::Type::FLOAT:
808 return "/F";
809 case arrow::Type::DOUBLE:
810 return "/D";
811 default:
812 throw runtime_error("Unsupported arrow column type");
813 }
814}
815
816arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOutputStream(
817 const std::string& path,
818 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
819{
820 arrow::dataset::FileSource source{path, shared_from_this()};
821 auto prefix = metadata->Get("branch_prefix");
822 if (prefix.ok()) {
823 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), *prefix);
824 }
825 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), "");
826}
827
828namespace
829{
830struct BranchInfo {
831 std::string name;
832 TBranch* ptr;
833 bool mVLA;
834};
835} // namespace
836
837auto arrowTypeFromROOT(EDataType type, int size)
838{
839 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
840 switch (size) {
841 case -1:
842 return arrow::list(type);
843 case 1:
844 return std::move(type);
845 default:
846 return arrow::fixed_size_list(type, size);
847 }
848 };
849
850 switch (type) {
851 case EDataType::kBool_t:
852 return typeGenerator(arrow::boolean(), size);
853 case EDataType::kUChar_t:
854 return typeGenerator(arrow::uint8(), size);
855 case EDataType::kUShort_t:
856 return typeGenerator(arrow::uint16(), size);
857 case EDataType::kUInt_t:
858 return typeGenerator(arrow::uint32(), size);
859 case EDataType::kULong64_t:
860 return typeGenerator(arrow::uint64(), size);
861 case EDataType::kChar_t:
862 return typeGenerator(arrow::int8(), size);
863 case EDataType::kShort_t:
864 return typeGenerator(arrow::int16(), size);
865 case EDataType::kInt_t:
866 return typeGenerator(arrow::int32(), size);
867 case EDataType::kLong64_t:
868 return typeGenerator(arrow::int64(), size);
869 case EDataType::kFloat_t:
870 return typeGenerator(arrow::float32(), size);
871 case EDataType::kDouble_t:
872 return typeGenerator(arrow::float64(), size);
873 default:
874 throw o2::framework::runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
875 }
876}
877
878// This is a datatype for branches which implies
879struct RootTransientIndexType : arrow::ExtensionType {
880};
881
882arrow::Result<std::shared_ptr<arrow::Schema>> TTreeFileFormat::Inspect(const arrow::dataset::FileSource& source) const
883{
884 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
885
886 if (!fs.get()) {
887 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
888 }
889 auto objectHandler = fs->GetObjectHandler(source);
890
891 if (!objectHandler->format->Equals(*this)) {
892 throw runtime_error_f("Cannot read source %s with format %s.", source.path().c_str(), objectHandler->format->type_name().c_str());
893 }
894
895 // Notice that we abuse the API here and do not release the TTree,
896 // so that it's still managed by ROOT.
897 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
898
899 auto branches = tree->GetListOfBranches();
900 auto n = branches->GetEntries();
901
902 std::vector<std::shared_ptr<arrow::Field>> fields;
903
904 bool prevIsSize = false;
905 for (auto i = 0; i < n; ++i) {
906 auto branch = static_cast<TBranch*>(branches->At(i));
907 std::string name = branch->GetName();
908 if (prevIsSize && fields.back()->name() != name + "_size") {
909 throw runtime_error_f("Unexpected layout for VLA container %s.", branch->GetName());
910 }
911
912 if (name.ends_with("_size")) {
913 fields.emplace_back(std::make_shared<arrow::Field>(name, arrow::int32()));
914 prevIsSize = true;
915 } else {
916 static TClass* cls;
917 EDataType type;
918 branch->GetExpectedType(cls, type);
919
920 if (prevIsSize) {
921 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, -1)));
922 } else {
923 auto listSize = static_cast<TLeaf*>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
924 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, listSize)));
925 }
926 prevIsSize = false;
927 }
928 }
929
930 if (fields.back()->name().ends_with("_size")) {
931 throw runtime_error_f("Missing values for VLA indices %s.", fields.back()->name().c_str());
932 }
933 return std::make_shared<arrow::Schema>(fields);
934}
935
937arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::MakeFragment(
938 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
939 std::shared_ptr<arrow::Schema> physical_schema)
940{
941
942 return std::make_shared<TTreeFileFragment>(source, std::dynamic_pointer_cast<arrow::dataset::FileFormat>(shared_from_this()),
943 std::move(partition_expression),
944 physical_schema);
945}
946
947class TTreeFileWriter : public arrow::dataset::FileWriter
948{
949 std::vector<TBranch*> branches;
950 std::vector<TBranch*> sizesBranches;
951 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
952 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
953 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
954
955 std::vector<int64_t> valuesIdealBasketSize;
956 std::vector<int64_t> sizeIdealBasketSize;
957
958 std::vector<int64_t> typeSizes;
959 std::vector<int64_t> listSizes;
960 bool firstBasket = true;
961
962 // This is to create a basket size according to the first batch.
963 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
964 {
965 O2_SIGNPOST_ID_FROM_POINTER(sid, root_arrow_fs, this);
966 O2_SIGNPOST_START(root_arrow_fs, sid, "finaliseBasketSize", "First batch with %lli rows received and %zu columns",
967 firstBatch->num_rows(), firstBatch->columns().size());
968 for (size_t i = 0; i < branches.size(); i++) {
969 auto* branch = branches[i];
970 auto* sizeBranch = sizesBranches[i];
971
972 int valueSize = valueTypes[i]->byte_width();
973 if (listSizes[i] == 1) {
974 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry for %lli entries.",
975 branch->GetName(), valueSize, firstBatch->num_rows());
976 assert(sizeBranch == nullptr);
977 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
978 } else if (listSizes[i] == -1) {
979 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry.",
980 branch->GetName(), valueSize);
981 // This should probably lookup the
982 auto column = firstBatch->GetColumnByName(schema_->field(i)->name());
983 auto list = std::static_pointer_cast<arrow::ListArray>(column);
984 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
985 branch->GetName(), sizeBranch->GetName(), list->length(), valueSize);
986 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * list->length());
987 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
988 } else {
989 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. There are %lli entries per array of size %d in that list.",
990 branch->GetName(), listSizes[i], valueSize);
991 assert(sizeBranch == nullptr);
992 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[i]);
993 }
994
995 auto field = firstBatch->schema()->field(i);
996 if (field->name().starts_with("fIndexArray")) {
997 // One int per array to keep track of the size
998 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows(); // minimal additional size needed, otherwise we get 2 baskets
999 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
1000 sizeBranch->SetBasketSize(basketSize);
1001 branch->SetBasketSize(basketSize);
1002 }
1003 }
1004 O2_SIGNPOST_END(root_arrow_fs, sid, "finaliseBasketSize", "Done");
1005 }
1006
1007 public:
1008 // Create the TTree based on the physical_schema, not the one in the batch.
1009 // The write method will have to reconcile the two schemas.
1010 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1011 std::shared_ptr<arrow::io::OutputStream> destination,
1012 arrow::fs::FileLocator destination_locator)
1013 : FileWriter(schema, options, destination, destination_locator)
1014 {
1015 // Batches have the same number of entries for each column.
1016 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1017 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1018
1019 if (directoryStream.get()) {
1020 TDirectoryFile* dir = directoryStream->GetDirectory();
1021 dir->cd();
1022 auto* tree = new TTree(destination_locator_.path.c_str(), "");
1023 treeStream = std::make_shared<TTreeOutputStream>(tree, "");
1024 } else if (treeStream.get()) {
1025 // We already have a tree stream, let's derive a new one
1026 // with the destination_locator_.path as prefix for the branches
1027 // This way we can multiplex multiple tables in the same tree.
1028 auto* tree = treeStream->GetTree();
1029 treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
1030 } else {
1031 // I could simply set a prefix here to merge to an already existing tree.
1032 throw std::runtime_error("Unsupported backend.");
1033 }
1034
1035 for (auto i = 0u; i < schema->fields().size(); ++i) {
1036 auto& field = schema->field(i);
1037 listSizes.push_back(1);
1038
1039 int valuesIdealBasketSize = 0;
1040 // Construct all the needed branches.
1041 switch (field->type()->id()) {
1042 case arrow::Type::FIXED_SIZE_LIST: {
1043 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
1044 valueTypes.push_back(field->type()->field(0)->type());
1045 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1046 sizesBranches.push_back(nullptr);
1047 std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
1048 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1049 } break;
1050 case arrow::Type::LIST: {
1051 valueTypes.push_back(field->type()->field(0)->type());
1052 std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
1053 listSizes.back() = -1; // VLA, we need to calculate it on the fly;
1054 std::string sizeLeafList = field->name() + "_size/I";
1055 sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
1056 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1057 // Notice that this could be replaced by a better guess of the
1058 // average size of the list elements, but this is not trivial.
1059 } break;
1060 default: {
1061 valueTypes.push_back(field->type());
1062 std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
1063 sizesBranches.push_back(nullptr);
1064 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1065 } break;
1066 }
1067 }
1068 // We create the branches from the schema
1069 }
1070
1071 arrow::Status Write(const std::shared_ptr<arrow::RecordBatch>& batch) override
1072 {
1073 if (firstBasket) {
1074 firstBasket = false;
1075 finaliseBasketSize(batch);
1076 }
1077
1078 // Support writing empty tables
1079 if (batch->columns().empty() || batch->num_rows() == 0) {
1080 return arrow::Status::OK();
1081 }
1082
1083 // Batches have the same number of entries for each column.
1084 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1085 TTree* tree = nullptr;
1086 if (directoryStream.get()) {
1087 TDirectoryFile* dir = directoryStream->GetDirectory();
1088 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1089 }
1090 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1091
1092 if (!tree) {
1093 // I could simply set a prefix here to merge to an already existing tree.
1094 throw std::runtime_error("Unsupported backend.");
1095 }
1096 valueArrays.clear();
1097 for (auto i = 0u; i < batch->columns().size(); ++i) {
1098 auto column = batch->column(i);
1099 auto& field = batch->schema()->field(i);
1100
1101 valueArrays.push_back(nullptr);
1102
1103 switch (field->type()->id()) {
1104 case arrow::Type::FIXED_SIZE_LIST: {
1105 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
1106 if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1107 int64_t length = list->length() * list->list_type()->list_size();
1108 arrow::UInt8Builder builder;
1109 auto ok = builder.Reserve(length);
1110 // I need to build an array of uint8_t for the conversion to ROOT which uses
1111 // bytes for booleans.
1112 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
1113 for (int64_t i = 0; i < length; ++i) {
1114 if (boolArray->IsValid(i)) {
1115 // Expand each boolean value (true/false) to uint8 (1/0)
1116 uint8_t value = boolArray->Value(i) ? 1 : 0;
1117 auto ok = builder.Append(value);
1118 } else {
1119 // Append null for invalid entries
1120 auto ok = builder.AppendNull();
1121 }
1122 }
1123 valueArrays.back() = *builder.Finish();
1124 } else {
1125 valueArrays.back() = list->values();
1126 }
1127 } break;
1128 case arrow::Type::LIST: {
1129 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1130 valueArrays.back() = list->values();
1131 } break;
1132 case arrow::Type::BOOL: {
1133 // In case of arrays of booleans, we need to go back to their
1134 // char based representation for ROOT to save them.
1135 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1136
1137 int64_t length = boolArray->length();
1138 arrow::UInt8Builder builder;
1139 auto ok = builder.Reserve(length);
1140
1141 for (int64_t i = 0; i < length; ++i) {
1142 if (boolArray->IsValid(i)) {
1143 // Expand each boolean value (true/false) to uint8 (1/0)
1144 uint8_t value = boolArray->Value(i) ? 1 : 0;
1145 auto ok = builder.Append(value);
1146 } else {
1147 // Append null for invalid entries
1148 auto ok = builder.AppendNull();
1149 }
1150 }
1151 valueArrays.back() = *builder.Finish();
1152 } break;
1153 default:
1154 valueArrays.back() = column;
1155 }
1156 }
1157
1158 int64_t pos = 0;
1159 while (pos < batch->num_rows()) {
1160 for (size_t bi = 0; bi < branches.size(); ++bi) {
1161 auto* branch = branches[bi];
1162 auto* sizeBranch = sizesBranches[bi];
1163 auto array = batch->column(bi);
1164 auto& field = batch->schema()->field(bi);
1165 auto& listSize = listSizes[bi];
1166 auto valueType = valueTypes[bi];
1167 auto valueArray = valueArrays[bi];
1168
1169 switch (field->type()->id()) {
1170 case arrow::Type::LIST: {
1171 auto list = std::static_pointer_cast<arrow::ListArray>(array);
1172 listSize = list->value_length(pos);
1173 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width();
1174 branch->SetAddress((void*)buffer);
1175 sizeBranch->SetAddress(&listSize);
1176 } break;
1177 case arrow::Type::FIXED_SIZE_LIST:
1178 default: {
1179 // needed for the boolean case, I should probably cache this.
1180 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
1181 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth;
1182 branch->SetAddress((void*)buffer);
1183 };
1184 }
1185 }
1186 tree->Fill();
1187 ++pos;
1188 }
1189 return arrow::Status::OK();
1190 }
1191
1192 arrow::Future<> FinishInternal() override
1193 {
1194 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1195 auto* tree = treeStream->GetTree();
1196 tree->Write("", TObject::kOverwrite);
1197 tree->SetDirectory(nullptr);
1198
1199 return {};
1200 };
1201};
1202arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> TTreeFileFormat::MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const
1203{
1204 auto writer = std::make_shared<TTreeFileWriter>(schema, options, destination, destination_locator);
1205 return std::dynamic_pointer_cast<arrow::dataset::FileWriter>(writer);
1206}
1207
1208std::shared_ptr<arrow::dataset::FileWriteOptions> TTreeFileFormat::DefaultWriteOptions()
1209{
1210 std::shared_ptr<TTreeFileWriteOptions> options(
1211 new TTreeFileWriteOptions(shared_from_this()));
1212 return options;
1213}
1214
1216
1220} // namespace o2::framework