Project
Loading...
Searching...
No Matches
RootArrowFilesystem.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <Rtypes.h>
14#include <arrow/array/array_nested.h>
15#include <arrow/array/array_primitive.h>
16#include <arrow/array/builder_nested.h>
17#include <arrow/array/builder_primitive.h>
18#include <memory>
19#include <TFile.h>
20#include <TBufferFile.h>
21#include <TDirectoryFile.h>
22#include <arrow/type.h>
23#include <arrow/type_fwd.h>
24#include <arrow/dataset/file_base.h>
25#include <arrow/result.h>
26#include <arrow/status.h>
27#include <fmt/format.h>
28#include <TKey.h>
29
// Explicit instantiation definition of std::shared_ptr<arrow::Array> in this
// translation unit.
// NOTE(review): the reason for forcing this instantiation here is not visible
// in this file — confirm with the build/dictionary setup before removing it.
template class
  std::shared_ptr<arrow::Array>;
32
33namespace o2::framework
34{
35using arrow::Status;
36
/// Filesystem view over a ROOT file or directory.
/// @param f          file/directory backing this filesystem.
/// @param readahead  NOTE(review): currently unused — the body hard-codes 50 MiB.
/// @param factory    plugin factory used to create object handlers.
/// @param ownsFile   when true, the destructor closes and deletes @p f.
TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory& factory, bool ownsFile)
  mFile(f),
  mObjectFactory(factory),
  mOwnsFile(ownsFile)
{
  // NOTE(review): `readahead` is ignored; 50 MiB is always applied.
  // NOTE(review): GetSubFilesystem() constructs this class with sub-directories
  // obtained via GetObjectChecked(..., TDirectory), which may not be TFile
  // objects; this C-style cast to TFile* would then be invalid — confirm and
  // guard (e.g. dynamic_cast) before relying on it.
  ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024);
}
45
47{
48 if (mOwnsFile) {
49 mFile->Close();
50 delete mFile;
51 }
52}
53
54std::shared_ptr<RootObjectHandler> TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source)
55{
56 // We use a plugin to create the actual objects inside the
57 // file, so that we can support TTree and RNTuple at the same time
58 // without having to depend on both.
59 for (auto& capability : mObjectFactory.capabilities) {
60 auto objectPath = capability.lfn2objectPath(source.path());
61 void* handle = capability.getHandle(shared_from_this(), objectPath);
62 if (!handle) {
63 continue;
64 }
65 return std::make_shared<RootObjectHandler>(handle, capability.factory().format());
66 }
67 throw runtime_error_f("Unable to get handler for object %s", source.path().c_str());
68}
69
70bool TFileFileSystem::CheckSupport(arrow::dataset::FileSource source)
71{
72 // We use a plugin to create the actual objects inside the
73 // file, so that we can support TTree and RNTuple at the same time
74 // without having to depend on both.
75 for (auto& capability : mObjectFactory.capabilities) {
76 auto objectPath = capability.lfn2objectPath(source.path());
77
78 void* handle = capability.getHandle(shared_from_this(), objectPath);
79 if (handle) {
80 return true;
81 }
82 }
83 return false;
84}
85
86std::shared_ptr<VirtualRootFileSystemBase> TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source)
87{
88 auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass<TDirectory>());
89 if (directory) {
90 return std::shared_ptr<VirtualRootFileSystemBase>(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory));
91 }
92 throw runtime_error_f("Unsupported file layout");
93}
94
95arrow::Result<arrow::fs::FileInfo> TFileFileSystem::GetFileInfo(const std::string& path)
96{
97 arrow::fs::FileInfo result;
98 result.set_type(arrow::fs::FileType::NotFound);
99 result.set_path(path);
100 arrow::dataset::FileSource source(path, shared_from_this());
101
102 auto fs = GetSubFilesystem(source);
103
104 // For now we only support single trees.
105 if (std::dynamic_pointer_cast<TFileFileSystem>(fs)) {
106 result.set_type(arrow::fs::FileType::Directory);
107 return result;
108 }
109 // Everything else is a file, if it was created.
110 if (fs.get()) {
111 result.set_type(arrow::fs::FileType::File);
112 }
113 return result;
114}
115
116arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TFileFileSystem::OpenOutputStream(
117 const std::string& path,
118 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
119{
120 if (path == "/") {
121 return std::make_shared<TDirectoryFileOutputStream>(this->GetFile());
122 }
123
124 auto* dir = dynamic_cast<TDirectoryFile*>(this->GetFile()->Get(path.c_str()));
125 if (!dir) {
126 return arrow::Status::Invalid(fmt::format("Unable to open directory {} in file {} ", path.c_str(), GetFile()->GetName()));
127 }
128 auto stream = std::make_shared<TDirectoryFileOutputStream>(dir);
129 return stream;
130}
131
132arrow::Result<arrow::fs::FileInfo> VirtualRootFileSystemBase::GetFileInfo(std::string const&)
133{
134 arrow::fs::FileInfo result;
135 result.set_type(arrow::fs::FileType::NotFound);
136 return result;
137}
138
139arrow::Result<arrow::fs::FileInfoVector> VirtualRootFileSystemBase::GetFileInfo(const arrow::fs::FileSelector& select)
140{
141 arrow::fs::FileInfoVector results;
142 auto selected = this->GetFileInfo(select.base_dir);
143 if (selected.ok()) {
144 results.emplace_back(*selected);
145 }
146 return results;
147}
148
149arrow::Status VirtualRootFileSystemBase::CreateDir(const std::string& path, bool recursive)
150{
151 return arrow::Status::NotImplemented("Read only filesystem");
152}
153
154arrow::Status VirtualRootFileSystemBase::DeleteDir(const std::string& path)
155{
156 return arrow::Status::NotImplemented("Read only filesystem");
157}
158
159arrow::Status VirtualRootFileSystemBase::CopyFile(const std::string& src, const std::string& dest)
160{
161 return arrow::Status::NotImplemented("Read only filesystem");
162}
163
164arrow::Status VirtualRootFileSystemBase::Move(const std::string& src, const std::string& dest)
165{
166 return arrow::Status::NotImplemented("Read only filesystem");
167}
168
169arrow::Status VirtualRootFileSystemBase::DeleteDirContents(const std::string& path, bool missing_dir_ok)
170{
171 return arrow::Status::NotImplemented("Read only filesystem");
172}
173
175{
176 return arrow::Status::NotImplemented("Read only filesystem");
177}
178
179arrow::Status VirtualRootFileSystemBase::DeleteFile(const std::string& path)
180{
181 return arrow::Status::NotImplemented("Read only filesystem");
182}
183
184arrow::Result<std::shared_ptr<arrow::io::InputStream>> VirtualRootFileSystemBase::OpenInputStream(const std::string& path)
185{
186 return arrow::Status::NotImplemented("Non streamable filesystem");
187}
188
189arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> VirtualRootFileSystemBase::OpenInputFile(const std::string& path)
190{
191 return arrow::Status::NotImplemented("No random access file system");
192}
193
194arrow::Result<std::shared_ptr<arrow::io::OutputStream>> VirtualRootFileSystemBase::OpenOutputStream(
195 const std::string& path,
196 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
197{
198 return arrow::Status::NotImplemented("Non streamable filesystem");
199}
200
201arrow::Result<std::shared_ptr<arrow::io::OutputStream>> VirtualRootFileSystemBase::OpenAppendStream(
202 const std::string& path,
203 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
204{
205 return arrow::Status::NotImplemented("No random access file system");
206}
207
208// An arrow outputstream which allows to write to a ttree
210 : mDirectory(f)
211{
212}
213
215{
216 mDirectory->GetFile()->Close();
217 return arrow::Status::OK();
218}
219
220arrow::Result<int64_t> TDirectoryFileOutputStream::Tell() const
221{
222 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
223}
224
225arrow::Status TDirectoryFileOutputStream::Write(const void* data, int64_t nbytes)
226{
227 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
228}
229
231{
232 return mDirectory->GetFile()->IsOpen() == false;
233}
234
237 mBuffer(f),
238 mFilesystem(nullptr),
239 mObjectFactory(factory)
240{
241}
242
243arrow::Result<arrow::fs::FileInfo> TBufferFileFS::GetFileInfo(const std::string& path)
244{
245 arrow::fs::FileInfo result;
246 result.set_type(arrow::fs::FileType::NotFound);
247 result.set_path(path);
248 arrow::dataset::FileSource source(path, shared_from_this());
249
250 // Only once to avoid rereading the streamed tree.
251 if (!mFilesystem.get()) {
252 return result;
253 }
254
255 auto info = mFilesystem->GetFileInfo(path);
256 if (!info.ok()) {
257 return result;
258 }
259
260 result.set_type(info->type());
261 return result;
262}
263
264bool TBufferFileFS::CheckSupport(arrow::dataset::FileSource source)
265{
266 // We use a plugin to create the actual objects inside the
267 // file, so that we can support TTree and RNTuple at the same time
268 // without having to depend on both.
269 for (auto& capability : mObjectFactory.capabilities) {
270 auto objectPath = capability.lfn2objectPath(source.path());
271
272 mBuffer->SetBufferOffset(0);
273 mBuffer->InitMap();
274 TClass* serializedClass = mBuffer->ReadClass();
275 mBuffer->SetBufferOffset(0);
276 mBuffer->ResetMap();
277 mBuffer->Reset();
278 if (!serializedClass) {
279 continue;
280 }
281
282 bool supports = capability.checkSupport(serializedClass->GetName());
283 if (supports) {
284 return true;
285 }
286 }
287 return false;
288}
289
290std::shared_ptr<RootObjectHandler> TBufferFileFS::GetObjectHandler(arrow::dataset::FileSource source)
291{
292 // We use a plugin to create the actual objects inside the
293 // file, so that we can support TTree and RNTuple at the same time
294 // without having to depend on both.
295 for (auto& capability : mObjectFactory.capabilities) {
296 auto objectPath = capability.lfn2objectPath(source.path());
297 void* handle = capability.getHandle(shared_from_this(), objectPath);
298 if (!handle) {
299 continue;
300 }
301 return std::make_shared<RootObjectHandler>(handle, capability.factory().format());
302 }
303 throw runtime_error_f("Unable to get handler for object %s", source.path().c_str());
304}
305
307{
308 if (payload) {
309 throw runtime_error_f("Payload not owned");
310 }
311}
312
313} // namespace o2::framework
TBufferFileFS(TBufferFile *f, RootObjectReadingFactory &)
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
std::shared_ptr< RootObjectHandler > GetObjectHandler(arrow::dataset::FileSource source) override
bool CheckSupport(arrow::dataset::FileSource source) override
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Result< int64_t > Tell() const override
virtual std::shared_ptr< VirtualRootFileSystemBase > GetSubFilesystem(arrow::dataset::FileSource source)
bool CheckSupport(arrow::dataset::FileSource source) override
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenOutputStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
std::shared_ptr< RootObjectHandler > GetObjectHandler(arrow::dataset::FileSource source) override
arrow::Status DeleteDir(const std::string &path) override
arrow::Status Move(const std::string &src, const std::string &dest) override
arrow::Status DeleteFile(const std::string &path) override
arrow::Status DeleteDirContents(const std::string &path, bool missing_dir_ok) override
arrow::Status CreateDir(const std::string &path, bool recursive) override
arrow::Status CopyFile(const std::string &src, const std::string &dest) override
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenOutputStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
arrow::Result< std::shared_ptr< arrow::io::InputStream > > OpenInputStream(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenAppendStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
arrow::Result< std::shared_ptr< arrow::io::RandomAccessFile > > OpenInputFile(const std::string &path) override
GLenum src
Definition glcorearb.h:1767
GLuint64EXT * result
Definition glcorearb.h:5662
GLdouble f
Definition glcorearb.h:310
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLuint stream
Definition glcorearb.h:1806
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< InputSpec > select(char const *matcher="")
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< RootObjectReadingCapability > capabilities