TTreePlugin.cxx
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "Framework/Plugins.h"
14#include "Framework/Signpost.h"
15#include "Framework/Endian.h"
16#include <TBufferFile.h>
17#include <TBufferIO.h>
18#include <arrow/buffer.h>
19#include <arrow/dataset/file_base.h>
20#include <arrow/extension_type.h>
21#include <arrow/memory_pool.h>
22#include <arrow/status.h>
23#include <arrow/type.h>
24#include <arrow/type_fwd.h>
25#include <arrow/util/key_value_metadata.h>
26#include <arrow/array/array_nested.h>
27#include <arrow/array/array_primitive.h>
28#include <arrow/array/builder_nested.h>
29#include <arrow/array/builder_primitive.h>
30#include <arrow/array/util.h>
31#include <arrow/record_batch.h>
32#include <TTree.h>
33#include <TBranch.h>
34#include <TFile.h>
35#include <TLeaf.h>
36#include <unistd.h>
37#include <cstdint>
38#include <memory>
39#include <stdexcept>
40
41O2_DECLARE_DYNAMIC_LOG(root_arrow_fs);
42
43namespace o2::framework
44{
45
46enum struct ReadOpKind {
47 Unknown,
48 Offsets,
49 Values,
50 Booleans,
51 VLA
52};
53
54struct ReadOps {
55 TBranch* branch = nullptr;
56 std::shared_ptr<arrow::Buffer> targetBuffer = nullptr;
57 int64_t rootBranchEntries = 0;
58 size_t typeSize = 0;
59 size_t listSize = 0;
60 // If this is an offset reading op, keep track of the actual
61 // range for the offsets, not only how many VLAs are there.
62 int64_t offsetCount = 0;
63 ReadOpKind kind = ReadOpKind::Unknown;
64};
65
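66/// An OutputStream which, rather than copying the data passed to Write(),
67/// recognises placeholder buffers created by TTreeFileFragment and performs
68/// the corresponding deferred read from the TTree branches.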
70class TTreeDeferredReadOutputStream : public arrow::io::OutputStream
71{
72 public:
73 explicit TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
74 const std::shared_ptr<arrow::ResizableBuffer>& buffer);
75
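76 /// \brief Create an in-memory output stream with indicated capacity using a
77 /// memory pool.
78 /// \param[in] ops the deferred read operations to execute on Write()
79 /// \param[in] initial_capacity the initial allocated internal capacity of
80 /// the OutputStream
81 /// \param[in,out] pool a MemoryPool to allocate memory from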
82 static arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> Create(
83 std::vector<ReadOps>& ops,
84 int64_t initial_capacity = 4096,
85 arrow::MemoryPool* pool = arrow::default_memory_pool());
86
87 // By the time we call the destructor, the contents
88 // of the buffer have already been moved to FairMQ
89 // to be sent.
90 ~TTreeDeferredReadOutputStream() override = default;
91
92 // Implement the OutputStream interface
93
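94 /// Close the stream, preserving the buffer (retrieve it with Finish()).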
95 arrow::Status Close() override;
96 [[nodiscard]] bool closed() const override;
97 [[nodiscard]] arrow::Result<int64_t> Tell() const override;
98 arrow::Status Write(const void* data, int64_t nbytes) override;
99
101 using OutputStream::Write;
103
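104 /// Close the stream and return the buffer.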
105 arrow::Result<std::shared_ptr<arrow::Buffer>> Finish();
106
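107 /// Initialize the state of the OutputStream with newly allocated memory and
108 /// set position to 0.
109 /// \param[in] ops the deferred read operations to execute on Write()
110 /// \param[in] initial_capacity the starting allocated capacity
111 /// \param[in,out] pool the memory pool to allocate memory from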
112 arrow::Status Reset(std::vector<ReadOps> ops,
113 int64_t initial_capacity, arrow::MemoryPool* pool);
114
115 [[nodiscard]] int64_t capacity() const { return capacity_; }
116
117 private:
118 TTreeDeferredReadOutputStream();
119 std::vector<ReadOps> ops_;
120
121 // Ensures there is sufficient space available to write nbytes
122 arrow::Status Reserve(int64_t nbytes);
123
124 std::shared_ptr<arrow::ResizableBuffer> buffer_;
125 bool is_open_;
126 int64_t capacity_;
127 int64_t position_;
128 uint8_t* mutable_data_;
129};
130
131static constexpr int64_t kBufferMinimumSize = 256;
132
133TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream()
134 : is_open_(false), capacity_(0), position_(0), mutable_data_(nullptr) {}
135
136TTreeDeferredReadOutputStream::TTreeDeferredReadOutputStream(std::vector<ReadOps>& ops,
137 const std::shared_ptr<arrow::ResizableBuffer>& buffer)
138 : ops_(ops),
139 buffer_(buffer),
140 is_open_(true),
141 capacity_(buffer->size()),
142 position_(0),
143 mutable_data_(buffer->mutable_data()) {}
144
145arrow::Result<std::shared_ptr<TTreeDeferredReadOutputStream>> TTreeDeferredReadOutputStream::Create(
146 std::vector<ReadOps>& ops,
147 int64_t initial_capacity, arrow::MemoryPool* pool)
148{
149 // ctor is private, so cannot use make_shared
150 auto ptr = std::shared_ptr<TTreeDeferredReadOutputStream>(new TTreeDeferredReadOutputStream);
151 RETURN_NOT_OK(ptr->Reset(ops, initial_capacity, pool));
152 return ptr;
153}
154
155arrow::Status TTreeDeferredReadOutputStream::Reset(std::vector<ReadOps> ops,
156 int64_t initial_capacity, arrow::MemoryPool* pool)
157{
158 ARROW_ASSIGN_OR_RAISE(buffer_, AllocateResizableBuffer(initial_capacity, pool));
159 ops_ = ops;
160 is_open_ = true;
161 capacity_ = initial_capacity;
162 position_ = 0;
163 mutable_data_ = buffer_->mutable_data();
164 return arrow::Status::OK();
165}
166
167arrow::Status TTreeDeferredReadOutputStream::Close()
168{
169 if (is_open_) {
170 is_open_ = false;
171 if (position_ < capacity_) {
172 RETURN_NOT_OK(buffer_->Resize(position_, false));
173 }
174 }
175 return arrow::Status::OK();
176}
177
178bool TTreeDeferredReadOutputStream::closed() const { return !is_open_; }
179
180arrow::Result<std::shared_ptr<arrow::Buffer>> TTreeDeferredReadOutputStream::Finish()
181{
182 RETURN_NOT_OK(Close());
183 buffer_->ZeroPadding();
184 is_open_ = false;
185 return std::move(buffer_);
186}
187
188arrow::Result<int64_t> TTreeDeferredReadOutputStream::Tell() const { return position_; }
189
190auto readValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
191 int readEntries = 0;
192 rootBuffer.Reset();
193 while (readEntries < op.rootBranchEntries) {
194 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
195 if (readLast < 0) {
196 throw runtime_error_f("Error while reading branch %s starting from %zu.", op.branch->GetName(), readEntries);
197 }
198 int size = readLast * op.listSize;
199 readEntries += readLast;
200 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
201 target += (ptrdiff_t)(size * op.typeSize);
202 }
203};
204
205auto readBoolValues = [](uint8_t* target, ReadOps& op, TBufferFile& rootBuffer) {
206 int readEntries = 0;
207 rootBuffer.Reset();
208 // Zero the target first: ROOT provides one byte per boolean, which the loop below packs into Arrow's bit representation.
209 memset(target, 0, op.targetBuffer->size());
210 int readLast = 0;
211 while (readEntries < op.rootBranchEntries) {
212 auto beginValue = readEntries;
213 readLast = op.branch->GetBulkRead().GetBulkEntries(readEntries, rootBuffer);
214 int size = readLast * op.listSize;
215 readEntries += readLast;
216 for (int i = beginValue; i < beginValue + size; ++i) {
217 auto value = static_cast<uint8_t>(rootBuffer.GetCurrent()[i - beginValue] << (i % 8));
218 target[i / 8] |= value;
219 }
220 }
221};
222
223auto readVLAValues = [](uint8_t* target, ReadOps& op, ReadOps const& offsetOp, TBufferFile& rootBuffer) {
224 int readEntries = 0;
225 auto* tPtrOffset = reinterpret_cast<const int*>(offsetOp.targetBuffer->data());
226 std::span<int const> const offsets{tPtrOffset, tPtrOffset + offsetOp.rootBranchEntries + 1};
227
228 rootBuffer.Reset();
229 while (readEntries < op.rootBranchEntries) {
230 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
231 int size = offsets[readEntries + readLast] - offsets[readEntries];
232 readEntries += readLast;
233 swapCopy(target, rootBuffer.GetCurrent(), size, op.typeSize);
234 target += (ptrdiff_t)(size * op.typeSize);
235 }
236};
237
238TBufferFile& rootBuffer()
239{
240 // FIXME: we will need more than one once we have multithreaded reading.
241 static TBufferFile rootBuffer{TBuffer::EMode::kWrite, 4 * 1024 * 1024};
242 return rootBuffer;
243}
244
245arrow::Status TTreeDeferredReadOutputStream::Write(const void* data, int64_t nbytes)
246{
247 if (ARROW_PREDICT_FALSE(!is_open_)) {
248 return arrow::Status::IOError("OutputStream is closed");
249 }
250 if (ARROW_PREDICT_TRUE(nbytes == 0)) {
251 return arrow::Status::OK();
252 }
253 if (ARROW_PREDICT_FALSE(position_ + nbytes >= capacity_)) {
254 RETURN_NOT_OK(Reserve(nbytes));
255 }
256 // If "data" is a real address, it needs to be copied. Small values are placeholder indices into ops_, created by TTreeFileFragment::GetPlaceholderForOp, and trigger the deferred read below.
257 auto ref = (int64_t)data;
258 if (ref >= ops_.size()) {
259 memcpy(mutable_data_ + position_, data, nbytes);
260 position_ += nbytes;
261 return arrow::Status::OK();
262 }
263 auto& op = ops_[ref];
264
265 switch (op.kind) {
266 // Offsets need to be read in advance because we need to know
267 // how many elements there are in total (since TTree does not allow discovering such information)
268 case ReadOpKind::Offsets:
269 break;
270 case ReadOpKind::Values:
271 readValues(mutable_data_ + position_, op, rootBuffer());
272 break;
273 case ReadOpKind::VLA:
274 readVLAValues(mutable_data_ + position_, op, ops_[ref - 1], rootBuffer());
275 break;
276 case ReadOpKind::Booleans:
277 readBoolValues(mutable_data_ + position_, op, rootBuffer());
278 break;
279 case ReadOpKind::Unknown:
280 throw runtime_error("Unknown Op");
281 }
282 op.branch->SetStatus(false);
283 op.branch->DropBaskets("all");
284 op.branch->Reset();
285 op.branch->GetTransientBuffer(0)->Expand(0);
286
287 position_ += nbytes;
288 return arrow::Status::OK();
289}
290
291arrow::Status TTreeDeferredReadOutputStream::Reserve(int64_t nbytes)
292{
293 // Always overallocate by doubling. It seems that it is a better growth
294 // strategy, at least for memory_benchmark.cc.
295 // This may be because it helps match the allocator's allocation buckets
296 // more exactly. Or perhaps it hits a sweet spot in jemalloc.
297 int64_t new_capacity = std::max(kBufferMinimumSize, capacity_);
298 new_capacity = position_ + nbytes;
299 if (new_capacity > capacity_) {
300 RETURN_NOT_OK(buffer_->Resize(new_capacity));
301 capacity_ = new_capacity;
302 mutable_data_ = buffer_->mutable_data();
303 }
304 return arrow::Status::OK();
305}
306
307class TTreeFileWriteOptions : public arrow::dataset::FileWriteOptions
308{
309 public:
310 TTreeFileWriteOptions(std::shared_ptr<arrow::dataset::FileFormat> format)
311 : FileWriteOptions(format)
312 {
313 }
314};
315
316// A filesystem which allows me to get a TTree
317class TTreeFileSystem : public VirtualRootFileSystemBase
318{
319 public:
321
322 arrow::Result<std::shared_ptr<arrow::io::OutputStream>> OpenOutputStream(
323 const std::string& path,
324 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata) override;
325
326 virtual std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource source) = 0;
327};
328
329class TTreeFileFormat : public arrow::dataset::FileFormat
330{
331 size_t& mTotCompressedSize;
332 size_t& mTotUncompressedSize;
333
334 public:
335 TTreeFileFormat(size_t& totalCompressedSize, size_t& totalUncompressedSize)
336 : FileFormat({}),
337 mTotCompressedSize(totalCompressedSize),
338 mTotUncompressedSize(totalUncompressedSize)
339 {
340 }
341
342 ~TTreeFileFormat() override = default;
343
344 std::string type_name() const override
345 {
346 return "ttree";
347 }
348
349 bool Equals(const FileFormat& other) const override
350 {
351 return other.type_name() == this->type_name();
352 }
353
354 arrow::Result<bool> IsSupported(const arrow::dataset::FileSource& source) const override
355 {
356 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
357 if (!fs) {
358 return false;
359 }
360 return fs->CheckSupport(source);
361 }
362
363 arrow::Result<std::shared_ptr<arrow::Schema>> Inspect(const arrow::dataset::FileSource& source) const override;
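364 /// Create a FileFragment for a FileSource.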
365 arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> MakeFragment(
366 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
367 std::shared_ptr<arrow::Schema> physical_schema) override;
368
369 arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const override;
370
371 std::shared_ptr<arrow::dataset::FileWriteOptions> DefaultWriteOptions() override;
372
373 arrow::Result<arrow::RecordBatchGenerator> ScanBatchesAsync(
374 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
375 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const override;
376};
377
378class SingleTreeFileSystem : public TTreeFileSystem
379{
380 public:
381 SingleTreeFileSystem(TTree* tree)
382 : TTreeFileSystem(),
383 mTree(tree)
384 {
385 }
386
387 arrow::Result<arrow::fs::FileInfo> GetFileInfo(std::string const& path) override;
388
389 std::string type_name() const override
390 {
391 return "ttree";
392 }
393
394 std::shared_ptr<RootObjectHandler> GetObjectHandler(arrow::dataset::FileSource source) override
395 {
396 return std::make_shared<RootObjectHandler>((void*)mTree.get(), std::make_shared<TTreeFileFormat>(mTotCompressedSize, mTotUncompressedSize));
397 }
398
399 std::unique_ptr<TTree>& GetTree(arrow::dataset::FileSource) override
400 {
401 // Simply return the only TTree we have
402 return mTree;
403 }
404
405 private:
406 size_t mTotUncompressedSize;
407 size_t mTotCompressedSize;
408 std::unique_ptr<TTree> mTree;
409};
410
411arrow::Result<arrow::fs::FileInfo> SingleTreeFileSystem::GetFileInfo(std::string const& path)
412{
413 arrow::dataset::FileSource source(path, shared_from_this());
414 arrow::fs::FileInfo result;
415 result.set_path(path);
416 result.set_type(arrow::fs::FileType::File);
417 return result;
418}
419
420// A fragment which holds a tree
421class TTreeFileFragment : public arrow::dataset::FileFragment
422{
423 public:
424 TTreeFileFragment(arrow::dataset::FileSource source,
425 std::shared_ptr<arrow::dataset::FileFormat> format,
426 arrow::compute::Expression partition_expression,
427 std::shared_ptr<arrow::Schema> physical_schema)
428 : FileFragment(source, format, std::move(partition_expression), physical_schema)
429 {
430 auto rootFS = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(this->source().filesystem());
431 if (rootFS.get() == nullptr) {
432 throw runtime_error_f("Unknown filesystem %s when reading %s.",
433 source.filesystem()->type_name().c_str(), source.path().c_str());
434 }
435 auto objectHandler = rootFS->GetObjectHandler(source);
436 if (!objectHandler->format->Equals(*format)) {
437 throw runtime_error_f("Cannot read source %s with format %s to pupulate a TTreeFileFragment.",
438 source.path().c_str(), objectHandler->format->type_name().c_str());
439 };
440 mTree = objectHandler->GetObjectAsOwner<TTree>();
441 }
442
443 TTree* GetTree()
444 {
445 return mTree.get();
446 }
447
448 std::vector<ReadOps>& ops()
449 {
450 return mOps;
451 }
452
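453 /// Create a placeholder buffer whose data pointer encodes the index of the last
454 /// ReadOps entry, so that the deferred output stream can perform the read on Write().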
455 std::shared_ptr<arrow::Buffer> GetPlaceholderForOp(size_t size)
456 {
457 return std::make_shared<arrow::Buffer>((uint8_t*)(mOps.size() - 1), size);
458 }
459
460 private:
461 std::unique_ptr<TTree> mTree;
462 std::vector<ReadOps> mOps;
463};
464
465// An arrow OutputStream which allows one to write to a TTree,
466// optionally with a prefix for the branches.
467class TTreeOutputStream : public arrow::io::OutputStream
468{
469 public:
470 // Using a pointer means that the tree itself is owned by another
471 // class
472 TTreeOutputStream(TTree*, std::string branchPrefix);
473
474 arrow::Status Close() override;
475
476 arrow::Result<int64_t> Tell() const override;
477
478 arrow::Status Write(const void* data, int64_t nbytes) override;
479
480 bool closed() const override;
481
482 TBranch* CreateBranch(char const* branchName, char const* sizeBranch);
483
484 TTree* GetTree()
485 {
486 return mTree;
487 }
488
489 private:
490 TTree* mTree;
491 std::string mBranchPrefix;
492};
493
494// An arrow OutputStream which allows one to write to a TTree.
495// @a branchPrefix is used to identify a set of branches which all belong to
496// the same table.
497TTreeOutputStream::TTreeOutputStream(TTree* f, std::string branchPrefix)
498 : mTree(f),
499 mBranchPrefix(std::move(branchPrefix))
500{
501}
502
503arrow::Status TTreeOutputStream::Close()
504{
505 if (mTree->GetCurrentFile() == nullptr) {
506 return arrow::Status::Invalid("Cannot close a tree not attached to a file");
507 }
508 mTree->GetCurrentFile()->Close();
509 return arrow::Status::OK();
510}
511
512arrow::Result<int64_t> TTreeOutputStream::Tell() const
513{
514 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
515}
516
517arrow::Status TTreeOutputStream::Write(const void* data, int64_t nbytes)
518{
519 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
520}
521
522bool TTreeOutputStream::closed() const
523{
524 // A standalone tree is never closed.
525 if (mTree->GetCurrentFile() == nullptr) {
526 return false;
527 }
528 return mTree->GetCurrentFile()->IsOpen() == false;
529}
530
531TBranch* TTreeOutputStream::CreateBranch(char const* branchName, char const* sizeBranch)
532{
533 if (mBranchPrefix.empty() == true) {
534 return mTree->Branch(branchName, (char*)nullptr, sizeBranch);
535 }
536 return mTree->Branch((mBranchPrefix + "/" + branchName).c_str(), (char*)nullptr, (mBranchPrefix + sizeBranch).c_str());
537}
538
539struct TTreePluginContext {
540 size_t totalCompressedSize = 0;
541 size_t totalUncompressedSize = 0;
542 std::shared_ptr<o2::framework::TTreeFileFormat> format = nullptr;
543};
544
547 {
548 auto context = new TTreePluginContext;
549 context->format = std::make_shared<o2::framework::TTreeFileFormat>(context->totalCompressedSize, context->totalUncompressedSize);
550 return new RootArrowFactory{
551 .options = [context]() { return context->format->DefaultWriteOptions(); },
552 .format = [context]() { return context->format; },
553 .deferredOutputStreamer = [](std::shared_ptr<arrow::dataset::FileFragment> fragment, const std::shared_ptr<arrow::ResizableBuffer>& buffer) -> std::shared_ptr<arrow::io::OutputStream> {
554 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
555 return std::make_shared<TTreeDeferredReadOutputStream>(treeFragment->ops(), buffer);
556 }};
557 }
558};
559
559// Mapping between a dataset field, the main branch and the (optional) size branch.
560struct BranchFieldMapping {
561 int mainBranchIdx;
562 int vlaIdx;
563 int datasetFieldIdx;
564};
565
566auto readOffsets = [](ReadOps& op, TBufferFile& rootBuffer) {
567 uint32_t offset = 0;
568 std::span<int> offsets;
569 int readEntries = 0;
570 int count = 0;
571 auto* tPtrOffset = reinterpret_cast<int*>(op.targetBuffer->mutable_data());
572 offsets = std::span<int>{tPtrOffset, tPtrOffset + op.rootBranchEntries + 1};
573
574 // read sizes first
575 rootBuffer.Reset();
576 while (readEntries < op.rootBranchEntries) {
577 auto readLast = op.branch->GetBulkRead().GetEntriesSerialized(readEntries, rootBuffer);
578 if (readLast == -1) {
579 throw runtime_error_f("Unable to read from branch %s.", op.branch->GetName());
580 }
581 readEntries += readLast;
582 for (auto i = 0; i < readLast; ++i) {
583 offsets[count++] = (int)offset;
584 offset += swap32_(reinterpret_cast<uint32_t*>(rootBuffer.GetCurrent())[i]);
585 }
586 }
587 offsets[count] = (int)offset;
588 op.offsetCount = offset;
589};
590
591arrow::Result<arrow::RecordBatchGenerator> TTreeFileFormat::ScanBatchesAsync(
592 const std::shared_ptr<arrow::dataset::ScanOptions>& options,
593 const std::shared_ptr<arrow::dataset::FileFragment>& fragment) const
594{
595 assert(options->dataset_schema != nullptr);
596 // This is the schema we want to read
597 auto dataset_schema = options->dataset_schema;
598 auto treeFragment = std::dynamic_pointer_cast<TTreeFileFragment>(fragment);
599 if (treeFragment.get() == nullptr) {
600 return {arrow::Status::NotImplemented("Not a ttree fragment")};
601 }
602
603 auto generator = [pool = options->pool, treeFragment, dataset_schema, &totalCompressedSize = mTotCompressedSize,
604 &totalUncompressedSize = mTotUncompressedSize]() -> arrow::Future<std::shared_ptr<arrow::RecordBatch>> {
605 O2_SIGNPOST_ID_FROM_POINTER(tid, root_arrow_fs, treeFragment->GetTree());
606 O2_SIGNPOST_START(root_arrow_fs, tid, "Generator", "Creating batch for tree %{public}s", treeFragment->GetTree()->GetName());
607 std::vector<std::shared_ptr<arrow::Array>> columns;
608 std::vector<std::shared_ptr<arrow::Field>> fields = dataset_schema->fields();
609 auto physical_schema = *treeFragment->ReadPhysicalSchema();
610
611 if (dataset_schema->num_fields() > physical_schema->num_fields()) {
612 throw runtime_error_f("One TTree must have all the fields requested in a table");
613 }
614
615 // Register physical fields into the cache
616 std::vector<BranchFieldMapping> mappings;
617
618 // We need to count the number of readops to avoid moving the vector.
619 int opsCount = 0;
620 for (int fi = 0; fi < dataset_schema->num_fields(); ++fi) {
621 auto dataset_field = dataset_schema->field(fi);
622 // This is needed because for now the dataset_field
623 // is actually the schema of the ttree
624 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Processing dataset field %{public}s.", dataset_field->name().c_str());
625 int physicalFieldIdx = physical_schema->GetFieldIndex(dataset_field->name());
626
627 if (physicalFieldIdx < 0) {
628 throw runtime_error_f("Cannot find physical field associated to %s. Possible fields: %s",
629 dataset_field->name().c_str(), physical_schema->ToString().c_str());
630 }
631 if (physicalFieldIdx > 0 && physical_schema->field(physicalFieldIdx - 1)->name().ends_with("_size")) {
632 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s has sizes in %{public}s.", dataset_field->name().c_str(),
633 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
634 mappings.push_back({physicalFieldIdx, physicalFieldIdx - 1, fi});
635 opsCount += 2;
636 } else {
637 if (physicalFieldIdx > 0) {
638 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Field %{public}s previous field is %{public}s.", dataset_field->name().c_str(),
639 physical_schema->field(physicalFieldIdx - 1)->name().c_str());
640 }
641 mappings.push_back({physicalFieldIdx, -1, fi});
642 opsCount++;
643 }
644 }
645
646 auto* tree = treeFragment->GetTree();
647 auto branches = tree->GetListOfBranches();
648 size_t totalTreeSize = 0;
649 std::vector<TBranch*> selectedBranches;
650 for (auto& mapping : mappings) {
651 selectedBranches.push_back((TBranch*)branches->At(mapping.mainBranchIdx));
652 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
653 totalTreeSize += selectedBranches.back()->GetTotalSize();
654 if (mapping.vlaIdx != -1) {
655 selectedBranches.push_back((TBranch*)branches->At(mapping.vlaIdx));
656 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Adding branch %{public}s to stream.", selectedBranches.back()->GetName());
657 totalTreeSize += selectedBranches.back()->GetTotalSize();
658 }
659 }
660
661 size_t cacheSize = std::max(std::min(totalTreeSize, 25000000UL), 1000000UL);
662 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Generator", "Resizing cache to %zu.", cacheSize);
663 tree->SetCacheSize(cacheSize);
664 for (auto* branch : selectedBranches) {
665 tree->AddBranchToCache(branch, false);
666 }
667 tree->StopCacheLearningPhase();
668
669 // Intermediate buffer to bulk read. Two for now
670 std::vector<ReadOps>& ops = treeFragment->ops();
671 ops.clear();
672 ops.reserve(opsCount);
673 for (size_t mi = 0; mi < mappings.size(); ++mi) {
674 BranchFieldMapping mapping = mappings[mi];
675 // The field actually on disk
676 auto datasetField = dataset_schema->field(mapping.datasetFieldIdx);
677 auto physicalField = physical_schema->field(mapping.mainBranchIdx);
678
679 if (mapping.vlaIdx != -1) {
680 auto* branch = (TBranch*)branches->At(mapping.vlaIdx);
681 ops.emplace_back(ReadOps{
682 .branch = branch,
683 .rootBranchEntries = branch->GetEntries(),
684 .typeSize = 4,
685 .listSize = 1,
686 .kind = ReadOpKind::Offsets,
687 });
688 auto& op = ops.back();
689 ARROW_ASSIGN_OR_RAISE(op.targetBuffer, arrow::AllocateBuffer((op.rootBranchEntries + 1) * op.typeSize, pool));
690 // Offsets need to be read immediately to know how many values are there
691 readOffsets(op, rootBuffer());
692 }
693 ops.push_back({});
694 auto& valueOp = ops.back();
695 valueOp.branch = (TBranch*)branches->At(mapping.mainBranchIdx);
696 valueOp.rootBranchEntries = valueOp.branch->GetEntries();
697 // In case this is a vla, we set the offsetCount as totalEntries
698 // In case we read booleans we need a special conversion from bytes to bits.
699 auto listType = std::dynamic_pointer_cast<arrow::FixedSizeListType>(datasetField->type());
700 valueOp.typeSize = physicalField->type()->byte_width();
701 // Notice how we are not (yet) allocating buffers at this point. We merely
702 // create placeholders to subsequently fill.
703 if ((datasetField->type() == arrow::boolean())) {
704 valueOp.kind = ReadOpKind::Booleans;
705 valueOp.listSize = 1;
706 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries + 7) / 8);
707 } else if (listType && datasetField->type()->field(0)->type() == arrow::boolean()) {
708 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
709 valueOp.listSize = listType->list_size();
710 valueOp.kind = ReadOpKind::Booleans;
711 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp((valueOp.rootBranchEntries * valueOp.listSize) / 8 + 1);
712 } else if (mapping.vlaIdx != -1) {
713 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
714 valueOp.listSize = -1;
715 // -1 is the current one, -2 is the one with the offsets
716 valueOp.kind = ReadOpKind::VLA;
717 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(ops[ops.size() - 2].offsetCount * valueOp.typeSize);
718 } else if (listType) {
719 valueOp.kind = ReadOpKind::Values;
720 valueOp.listSize = listType->list_size();
721 valueOp.typeSize = physicalField->type()->field(0)->type()->byte_width();
722 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize * valueOp.listSize);
723 } else {
724 valueOp.typeSize = physicalField->type()->byte_width();
725 valueOp.kind = ReadOpKind::Values;
726 valueOp.listSize = 1;
727 valueOp.targetBuffer = treeFragment->GetPlaceholderForOp(valueOp.rootBranchEntries * valueOp.typeSize);
728 }
729 arrow::Status status;
730 std::shared_ptr<arrow::Array> array;
731
732 if (listType) {
733 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), valueOp.rootBranchEntries * valueOp.listSize,
734 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
735 array = std::make_shared<arrow::FixedSizeListArray>(datasetField->type(), valueOp.rootBranchEntries, arrow::MakeArray(vdata));
736 // This is a fixed size list: values are stored contiguously, no offsets branch is involved
737 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
738 valueOp.branch->GetName(),
739 valueOp.rootBranchEntries,
740 valueOp.targetBuffer->size());
741 } else if (mapping.vlaIdx != -1) {
742 auto& offsetOp = ops[ops.size() - 2];
743 auto vdata = std::make_shared<arrow::ArrayData>(datasetField->type()->field(0)->type(), offsetOp.offsetCount,
744 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
745 // We have pushed an offset op if this was the case.
746 array = std::make_shared<arrow::ListArray>(datasetField->type(), offsetOp.rootBranchEntries, offsetOp.targetBuffer, arrow::MakeArray(vdata));
747 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
748 offsetOp.branch->GetName(), offsetOp.rootBranchEntries, offsetOp.targetBuffer->size());
749 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
750 valueOp.branch->GetName(),
751 offsetOp.offsetCount,
752 valueOp.targetBuffer->size());
753 } else {
754 auto data = std::make_shared<arrow::ArrayData>(datasetField->type(), valueOp.rootBranchEntries,
755 std::vector<std::shared_ptr<arrow::Buffer>>{nullptr, valueOp.targetBuffer});
756 array = arrow::MakeArray(data);
757 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, tid, "Op", "Created op for branch %{public}s with %lli entries, size of the buffer %lli.",
758 valueOp.branch->GetName(),
759 valueOp.rootBranchEntries,
760 valueOp.targetBuffer->size());
761 }
762
763 columns.push_back(array);
764 }
765
766 // Do the actual filling of the buffers. This happens after we have created the whole structure
767 // so that we can read directly in shared memory.
768 int64_t rows = -1;
769 for (size_t i = 0; i < ops.size(); ++i) {
770 auto& op = ops[i];
771 if (rows == -1 && op.kind != ReadOpKind::VLA) {
772 rows = op.rootBranchEntries;
773 }
774 if (rows == -1 && op.kind == ReadOpKind::VLA) {
775 auto& offsetOp = ops[i - 1];
776 rows = offsetOp.rootBranchEntries;
777 }
778 if (op.kind != ReadOpKind::VLA && rows != op.rootBranchEntries) {
779 throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, op.rootBranchEntries);
780 }
781 if (op.kind == ReadOpKind::VLA && rows != ops[i - 1].rootBranchEntries) {
782 throw runtime_error_f("Unmatching number of rows for branch %s. Expected %lli, found %lli", op.branch->GetName(), rows, ops[i - 1].offsetCount);
783 }
784 }
785
786 auto batch = arrow::RecordBatch::Make(dataset_schema, rows, columns);
787 totalCompressedSize += tree->GetZipBytes();
788 totalUncompressedSize += tree->GetTotBytes();
789 O2_SIGNPOST_END(root_arrow_fs, tid, "Generator", "Done creating batch compressed:%zu uncompressed:%zu", totalCompressedSize, totalUncompressedSize);
790 return batch;
791 };
792 return generator;
793}
794
795char const* rootSuffixFromArrow(arrow::Type::type id)
796{
797 switch (id) {
798 case arrow::Type::BOOL:
799 return "/O";
800 case arrow::Type::UINT8:
801 return "/b";
802 case arrow::Type::UINT16:
803 return "/s";
804 case arrow::Type::UINT32:
805 return "/i";
806 case arrow::Type::UINT64:
807 return "/l";
808 case arrow::Type::INT8:
809 return "/B";
810 case arrow::Type::INT16:
811 return "/S";
812 case arrow::Type::INT32:
813 return "/I";
814 case arrow::Type::INT64:
815 return "/L";
816 case arrow::Type::FLOAT:
817 return "/F";
818 case arrow::Type::DOUBLE:
819 return "/D";
820 default:
821 throw runtime_error("Unsupported arrow column type");
822 }
823}
824
825arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TTreeFileSystem::OpenOutputStream(
826 const std::string& path,
827 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
828{
829 arrow::dataset::FileSource source{path, shared_from_this()};
830 auto prefix = metadata->Get("branch_prefix");
831 if (prefix.ok()) {
832 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), *prefix);
833 }
834 return std::make_shared<TTreeOutputStream>(GetTree(source).get(), "");
835}
836
837namespace
838{
839struct BranchInfo {
840 std::string name;
841 TBranch* ptr;
842 bool mVLA;
843};
844} // namespace
845
846auto arrowTypeFromROOT(EDataType type, int size)
847{
848 auto typeGenerator = [](std::shared_ptr<arrow::DataType> const& type, int size) -> std::shared_ptr<arrow::DataType> {
849 switch (size) {
850 case -1:
851 return arrow::list(type);
852 case 1:
853 return std::move(type);
854 default:
855 return arrow::fixed_size_list(type, size);
856 }
857 };
858
859 switch (type) {
860 case EDataType::kBool_t:
861 return typeGenerator(arrow::boolean(), size);
862 case EDataType::kUChar_t:
863 return typeGenerator(arrow::uint8(), size);
864 case EDataType::kUShort_t:
865 return typeGenerator(arrow::uint16(), size);
866 case EDataType::kUInt_t:
867 return typeGenerator(arrow::uint32(), size);
868 case EDataType::kULong64_t:
869 return typeGenerator(arrow::uint64(), size);
870 case EDataType::kChar_t:
871 return typeGenerator(arrow::int8(), size);
872 case EDataType::kShort_t:
873 return typeGenerator(arrow::int16(), size);
874 case EDataType::kInt_t:
875 return typeGenerator(arrow::int32(), size);
876 case EDataType::kLong64_t:
877 return typeGenerator(arrow::int64(), size);
878 case EDataType::kFloat_t:
879 return typeGenerator(arrow::float32(), size);
880 case EDataType::kDouble_t:
881 return typeGenerator(arrow::float64(), size);
882 default:
883 throw o2::framework::runtime_error_f("Unsupported branch type: %d", static_cast<int>(type));
884 }
885}
886
887// This is a datatype for branches which implies a transient index
888struct RootTransientIndexType : arrow::ExtensionType {
889};
890
891arrow::Result<std::shared_ptr<arrow::Schema>> TTreeFileFormat::Inspect(const arrow::dataset::FileSource& source) const
892{
893 auto fs = std::dynamic_pointer_cast<VirtualRootFileSystemBase>(source.filesystem());
894
895 if (!fs.get()) {
896 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
897 }
898 auto objectHandler = fs->GetObjectHandler(source);
899
900 if (!objectHandler->format->Equals(*this)) {
901 throw runtime_error_f("Unknown filesystem %s\n", source.filesystem()->type_name().c_str());
902 }
903
904 // Notice that we abuse the API here and do not take ownership of the TTree,
905 // so that it's still managed by ROOT.
906 auto tree = objectHandler->GetObjectAsOwner<TTree>().release();
907
908 auto branches = tree->GetListOfBranches();
909 auto n = branches->GetEntries();
910
911 std::vector<std::shared_ptr<arrow::Field>> fields;
912
913 bool prevIsSize = false;
914 for (auto i = 0; i < n; ++i) {
915 auto branch = static_cast<TBranch*>(branches->At(i));
916 std::string name = branch->GetName();
917 if (prevIsSize && fields.back()->name() != name + "_size") {
918 throw runtime_error_f("Unexpected layout for VLA container %s.", branch->GetName());
919 }
920
921 if (name.ends_with("_size")) {
922 fields.emplace_back(std::make_shared<arrow::Field>(name, arrow::int32()));
923 prevIsSize = true;
924 } else {
925 static TClass* cls;
926 EDataType type;
927 branch->GetExpectedType(cls, type);
928
929 if (prevIsSize) {
930 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, -1)));
931 } else {
932 auto listSize = static_cast<TLeaf*>(branch->GetListOfLeaves()->At(0))->GetLenStatic();
933 fields.emplace_back(std::make_shared<arrow::Field>(name, arrowTypeFromROOT(type, listSize)));
934 }
935 prevIsSize = false;
936 }
937 }
938
939 if (fields.back()->name().ends_with("_size")) {
940 throw runtime_error_f("Missing values for VLA indices %s.", fields.back()->name().c_str());
941 }
942 return std::make_shared<arrow::Schema>(fields);
943}
944
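945// Create a TTreeFileFragment for the given source, reusing this format instance.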
946arrow::Result<std::shared_ptr<arrow::dataset::FileFragment>> TTreeFileFormat::MakeFragment(
947 arrow::dataset::FileSource source, arrow::compute::Expression partition_expression,
948 std::shared_ptr<arrow::Schema> physical_schema)
949{
950
951 return std::make_shared<TTreeFileFragment>(source, std::dynamic_pointer_cast<arrow::dataset::FileFormat>(shared_from_this()),
952 std::move(partition_expression),
953 physical_schema);
954}
955
956class TTreeFileWriter : public arrow::dataset::FileWriter
957{
958 std::vector<TBranch*> branches;
959 std::vector<TBranch*> sizesBranches;
960 std::vector<std::shared_ptr<arrow::Array>> valueArrays;
961 std::vector<std::shared_ptr<arrow::Array>> sizeArrays;
962 std::vector<std::shared_ptr<arrow::DataType>> valueTypes;
963
964 std::vector<int64_t> valuesIdealBasketSize;
965 std::vector<int64_t> sizeIdealBasketSize;
966
967 std::vector<int64_t> typeSizes;
968 std::vector<int64_t> listSizes;
969 bool firstBasket = true;
970
971 // This is to set the basket size according to the first batch.
972 void finaliseBasketSize(std::shared_ptr<arrow::RecordBatch> firstBatch)
973 {
974 O2_SIGNPOST_ID_FROM_POINTER(sid, root_arrow_fs, this);
975 O2_SIGNPOST_START(root_arrow_fs, sid, "finaliseBasketSize", "First batch with %lli rows received and %zu columns",
976 firstBatch->num_rows(), firstBatch->columns().size());
977 for (size_t i = 0; i < branches.size(); i++) {
978 auto* branch = branches[i];
979 auto* sizeBranch = sizesBranches[i];
980
981 int valueSize = valueTypes[i]->byte_width();
982 if (listSizes[i] == 1) {
983 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry for %lli entries.",
984 branch->GetName(), valueSize, firstBatch->num_rows());
985 assert(sizeBranch == nullptr);
986 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize);
987 } else if (listSizes[i] == -1) {
988 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s exists and uses %d bytes per entry.",
989 branch->GetName(), valueSize);
990 // This should probably lookup the
991 auto column = firstBatch->GetColumnByName(schema_->field(i)->name());
992 auto list = std::static_pointer_cast<arrow::ListArray>(column);
993 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. Associated size branch %s and there are %lli entries of size %d in that list.",
994 branch->GetName(), sizeBranch->GetName(), list->length(), valueSize);
995 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * list->length());
996 sizeBranch->SetBasketSize(1024 + firstBatch->num_rows() * 4);
997 } else {
998 O2_SIGNPOST_EVENT_EMIT(root_arrow_fs, sid, "finaliseBasketSize", "Branch %s needed. There are %lli entries per array of size %d in that list.",
999 branch->GetName(), listSizes[i], valueSize);
1000 assert(sizeBranch == nullptr);
1001 branch->SetBasketSize(1024 + firstBatch->num_rows() * valueSize * listSizes[i]);
1002 }
1003
1004 auto field = firstBatch->schema()->field(i);
1005 if (field->name().starts_with("fIndexArray")) {
1006 // One int per array to keep track of the size
1007 int idealBasketSize = 4 * firstBatch->num_rows() + 1024 + field->type()->byte_width() * firstBatch->num_rows(); // minimal additional size needed, otherwise we get 2 baskets
1008 int basketSize = std::max(32000, idealBasketSize); // keep a minimum value
1009 sizeBranch->SetBasketSize(basketSize);
1010 branch->SetBasketSize(basketSize);
1011 }
1012 }
1013 O2_SIGNPOST_END(root_arrow_fs, sid, "finaliseBasketSize", "Done");
1014 }
1015
1016 public:
1017 // Create the TTree based on the physical_schema, not the one in the batch.
1018 // The write method will have to reconcile the two schemas.
1019 TTreeFileWriter(std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options,
1020 std::shared_ptr<arrow::io::OutputStream> destination,
1021 arrow::fs::FileLocator destination_locator)
1022 : FileWriter(schema, options, destination, destination_locator)
1023 {
1024 // Batches have the same number of entries for each column.
1025 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1026 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1027
1028 if (directoryStream.get()) {
1029 TDirectoryFile* dir = directoryStream->GetDirectory();
1030 dir->cd();
1031 auto* tree = new TTree(destination_locator_.path.c_str(), "");
1032 treeStream = std::make_shared<TTreeOutputStream>(tree, "");
1033 } else if (treeStream.get()) {
1034 // We already have a tree stream, let's derive a new one
1035 // with the destination_locator_.path as prefix for the branches
1036 // This way we can multiplex multiple tables in the same tree.
1037 auto* tree = treeStream->GetTree();
1038 treeStream = std::make_shared<TTreeOutputStream>(tree, destination_locator_.path);
1039 } else {
1040 // I could simply set a prefix here to merge to an already existing tree.
1041 throw std::runtime_error("Unsupported backend.");
1042 }
1043
1044 for (auto i = 0u; i < schema->fields().size(); ++i) {
1045 auto& field = schema->field(i);
1046 listSizes.push_back(1);
1047
1048 int valuesIdealBasketSize = 0;
1049 // Construct all the needed branches.
1050 switch (field->type()->id()) {
1051 case arrow::Type::FIXED_SIZE_LIST: {
1052 listSizes.back() = std::static_pointer_cast<arrow::FixedSizeListType>(field->type())->list_size();
1053 valuesIdealBasketSize = 1024 + valueTypes.back()->byte_width() * listSizes.back();
1054 valueTypes.push_back(field->type()->field(0)->type());
1055 sizesBranches.push_back(nullptr);
1056 std::string leafList = fmt::format("{}[{}]{}", field->name(), listSizes.back(), rootSuffixFromArrow(valueTypes.back()->id()));
1057 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1058 } break;
1059 case arrow::Type::LIST: {
1060 valueTypes.push_back(field->type()->field(0)->type());
1061 std::string leafList = fmt::format("{}[{}_size]{}", field->name(), field->name(), rootSuffixFromArrow(valueTypes.back()->id()));
1062 listSizes.back() = -1; // VLA, we need to calculate it on the fly;
1063 std::string sizeLeafList = field->name() + "_size/I";
1064 sizesBranches.push_back(treeStream->CreateBranch((field->name() + "_size").c_str(), sizeLeafList.c_str()));
1065 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1066 // Notice that this could be replaced by a better guess of the
1067 // average size of the list elements, but this is not trivial.
1068 } break;
1069 default: {
1070 valueTypes.push_back(field->type());
1071 std::string leafList = field->name() + rootSuffixFromArrow(valueTypes.back()->id());
1072 sizesBranches.push_back(nullptr);
1073 branches.push_back(treeStream->CreateBranch(field->name().c_str(), leafList.c_str()));
1074 } break;
1075 }
1076 }
1077 // We create the branches from the schema
1078 }
1079
1080 arrow::Status Write(const std::shared_ptr<arrow::RecordBatch>& batch) override
1081 {
1082 if (firstBasket) {
1083 firstBasket = false;
1084 finaliseBasketSize(batch);
1085 }
1086
1087 // Support writing empty tables
1088 if (batch->columns().empty() || batch->num_rows() == 0) {
1089 return arrow::Status::OK();
1090 }
1091
1092 // Batches have the same number of entries for each column.
1093 auto directoryStream = std::dynamic_pointer_cast<TDirectoryFileOutputStream>(destination_);
1094 TTree* tree = nullptr;
1095 if (directoryStream.get()) {
1096 TDirectoryFile* dir = directoryStream->GetDirectory();
1097 tree = (TTree*)dir->Get(destination_locator_.path.c_str());
1098 }
1099 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1100
1101 if (!tree) {
1102 // I could simply set a prefix here to merge to an already existing tree.
1103 throw std::runtime_error("Unsupported backend.");
1104 }
1105
1106 for (auto i = 0u; i < batch->columns().size(); ++i) {
1107 auto column = batch->column(i);
1108 auto& field = batch->schema()->field(i);
1109
1110 valueArrays.push_back(nullptr);
1111
1112 switch (field->type()->id()) {
1113 case arrow::Type::FIXED_SIZE_LIST: {
1114 auto list = std::static_pointer_cast<arrow::FixedSizeListArray>(column);
1115 if (list->list_type()->field(0)->type()->id() == arrow::Type::BOOL) {
1116 int64_t length = list->length() * list->list_type()->list_size();
1117 arrow::UInt8Builder builder;
1118 auto ok = builder.Reserve(length);
1119 // I need to build an array of uint8_t for the conversion to ROOT, which uses
1120 // bytes for booleans.
1121 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(list->values());
1122 for (int64_t i = 0; i < length; ++i) {
1123 if (boolArray->IsValid(i)) {
1124 // Expand each boolean value (true/false) to uint8 (1/0)
1125 uint8_t value = boolArray->Value(i) ? 1 : 0;
1126 auto ok = builder.Append(value);
1127 } else {
1128 // Append null for invalid entries
1129 auto ok = builder.AppendNull();
1130 }
1131 }
1132 valueArrays.back() = *builder.Finish();
1133 } else {
1134 valueArrays.back() = list->values();
1135 }
1136 } break;
1137 case arrow::Type::LIST: {
1138 auto list = std::static_pointer_cast<arrow::ListArray>(column);
1139 valueArrays.back() = list->values();
1140 } break;
1141 case arrow::Type::BOOL: {
1142 // In case of arrays of booleans, we need to go back to their
1143 // char based representation for ROOT to save them.
1144 auto boolArray = std::static_pointer_cast<arrow::BooleanArray>(column);
1145
1146 int64_t length = boolArray->length();
1147 arrow::UInt8Builder builder;
1148 auto ok = builder.Reserve(length);
1149
1150 for (int64_t i = 0; i < length; ++i) {
1151 if (boolArray->IsValid(i)) {
1152 // Expand each boolean value (true/false) to uint8 (1/0)
1153 uint8_t value = boolArray->Value(i) ? 1 : 0;
1154 auto ok = builder.Append(value);
1155 } else {
1156 // Append null for invalid entries
1157 auto ok = builder.AppendNull();
1158 }
1159 }
1160 valueArrays.back() = *builder.Finish();
1161 } break;
1162 default:
1163 valueArrays.back() = column;
1164 }
1165 }
1166
1167 int64_t pos = 0;
1168 while (pos < batch->num_rows()) {
1169 for (size_t bi = 0; bi < branches.size(); ++bi) {
1170 auto* branch = branches[bi];
1171 auto* sizeBranch = sizesBranches[bi];
1172 auto array = batch->column(bi);
1173 auto& field = batch->schema()->field(bi);
1174 auto& listSize = listSizes[bi];
1175 auto valueType = valueTypes[bi];
1176 auto valueArray = valueArrays[bi];
1177
1178 switch (field->type()->id()) {
1179 case arrow::Type::LIST: {
1180 auto list = std::static_pointer_cast<arrow::ListArray>(array);
1181 listSize = list->value_length(pos);
1182 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + list->value_offset(pos) * valueType->byte_width();
1183 branch->SetAddress((void*)buffer);
1184 sizeBranch->SetAddress(&listSize);
1185 } break;
1186 case arrow::Type::FIXED_SIZE_LIST:
1187 default: {
1188 // needed for the boolean case, I should probably cache this.
1189 auto byteWidth = valueType->byte_width() ? valueType->byte_width() : 1;
1190 uint8_t const* buffer = std::static_pointer_cast<arrow::PrimitiveArray>(valueArray)->values()->data() + array->offset() + pos * listSize * byteWidth;
1191 branch->SetAddress((void*)buffer);
1192 };
1193 }
1194 }
1195 tree->Fill();
1196 ++pos;
1197 }
1198 return arrow::Status::OK();
1199 }
1200
1201 arrow::Future<> FinishInternal() override
1202 {
1203 auto treeStream = std::dynamic_pointer_cast<TTreeOutputStream>(destination_);
1204 auto* tree = treeStream->GetTree();
1205 tree->Write("", TObject::kOverwrite);
1206 tree->SetDirectory(nullptr);
1207
1208 return {};
1209 };
1210};
1211arrow::Result<std::shared_ptr<arrow::dataset::FileWriter>> TTreeFileFormat::MakeWriter(std::shared_ptr<arrow::io::OutputStream> destination, std::shared_ptr<arrow::Schema> schema, std::shared_ptr<arrow::dataset::FileWriteOptions> options, arrow::fs::FileLocator destination_locator) const
1212{
1213 auto writer = std::make_shared<TTreeFileWriter>(schema, options, destination, destination_locator);
1214 return std::dynamic_pointer_cast<arrow::dataset::FileWriter>(writer);
1215}
1216
1217std::shared_ptr<arrow::dataset::FileWriteOptions> TTreeFileFormat::DefaultWriteOptions()
1218{
1219 std::shared_ptr<TTreeFileWriteOptions> options(
1220 new TTreeFileWriteOptions(shared_from_this()));
1221 return options;
1222}
1223
1225
1229} // namespace o2::framework