Project
Loading...
Searching...
No Matches
RootArrowFilesystem.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <Rtypes.h>
14#include <arrow/array/array_nested.h>
15#include <arrow/array/array_primitive.h>
16#include <arrow/array/builder_nested.h>
17#include <arrow/array/builder_primitive.h>
18#include <memory>
19#include <TFile.h>
20#include <TBufferFile.h>
21#include <TDirectoryFile.h>
22#include <arrow/type.h>
23#include <arrow/type_fwd.h>
24#include <arrow/dataset/file_base.h>
25#include <arrow/result.h>
26#include <arrow/status.h>
27#include <fmt/format.h>
28#include <TKey.h>
29
30template class
31 std::shared_ptr<arrow::Array>;
32
33namespace o2::framework
34{
35using arrow::Status;
36
37TFileFileSystem::TFileFileSystem(TDirectoryFile* f, size_t readahead, RootObjectReadingFactory& factory)
39 mFile(f),
40 mObjectFactory(factory)
41{
42 ((TFile*)mFile)->SetReadaheadSize(50 * 1024 * 1024);
43}
44
46{
47 mFile->Close();
48 delete mFile;
49}
50
51std::shared_ptr<RootObjectHandler> TFileFileSystem::GetObjectHandler(arrow::dataset::FileSource source)
52{
53 // We use a plugin to create the actual objects inside the
54 // file, so that we can support TTree and RNTuple at the same time
55 // without having to depend on both.
56 for (auto& capability : mObjectFactory.capabilities) {
57 auto objectPath = capability.lfn2objectPath(source.path());
58 void* handle = capability.getHandle(shared_from_this(), objectPath);
59 if (!handle) {
60 continue;
61 }
62 return std::make_shared<RootObjectHandler>(handle, capability.factory().format());
63 }
64 throw runtime_error_f("Unable to get handler for object %s", source.path().c_str());
65}
66
67bool TFileFileSystem::CheckSupport(arrow::dataset::FileSource source)
68{
69 // We use a plugin to create the actual objects inside the
70 // file, so that we can support TTree and RNTuple at the same time
71 // without having to depend on both.
72 for (auto& capability : mObjectFactory.capabilities) {
73 auto objectPath = capability.lfn2objectPath(source.path());
74
75 void* handle = capability.getHandle(shared_from_this(), objectPath);
76 if (handle) {
77 return true;
78 }
79 }
80 return false;
81}
82
83std::shared_ptr<VirtualRootFileSystemBase> TFileFileSystem::GetSubFilesystem(arrow::dataset::FileSource source)
84{
85 auto directory = (TDirectoryFile*)mFile->GetObjectChecked(source.path().c_str(), TClass::GetClass<TDirectory>());
86 if (directory) {
87 return std::shared_ptr<VirtualRootFileSystemBase>(new TFileFileSystem(directory, 50 * 1024 * 1024, mObjectFactory));
88 }
89 throw runtime_error_f("Unsupported file layout");
90}
91
92arrow::Result<arrow::fs::FileInfo> TFileFileSystem::GetFileInfo(const std::string& path)
93{
94 arrow::fs::FileInfo result;
95 result.set_type(arrow::fs::FileType::NotFound);
96 result.set_path(path);
97 arrow::dataset::FileSource source(path, shared_from_this());
98
99 auto fs = GetSubFilesystem(source);
100
101 // For now we only support single trees.
102 if (std::dynamic_pointer_cast<TFileFileSystem>(fs)) {
103 result.set_type(arrow::fs::FileType::Directory);
104 return result;
105 }
106 // Everything else is a file, if it was created.
107 if (fs.get()) {
108 result.set_type(arrow::fs::FileType::File);
109 }
110 return result;
111}
112
113arrow::Result<std::shared_ptr<arrow::io::OutputStream>> TFileFileSystem::OpenOutputStream(
114 const std::string& path,
115 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
116{
117 if (path == "/") {
118 return std::make_shared<TDirectoryFileOutputStream>(this->GetFile());
119 }
120
121 auto* dir = dynamic_cast<TDirectoryFile*>(this->GetFile()->Get(path.c_str()));
122 if (!dir) {
123 return arrow::Status::Invalid(fmt::format("Unable to open directory {} in file {} ", path.c_str(), GetFile()->GetName()));
124 }
125 auto stream = std::make_shared<TDirectoryFileOutputStream>(dir);
126 return stream;
127}
128
129arrow::Result<arrow::fs::FileInfo> VirtualRootFileSystemBase::GetFileInfo(std::string const&)
130{
131 arrow::fs::FileInfo result;
132 result.set_type(arrow::fs::FileType::NotFound);
133 return result;
134}
135
136arrow::Result<arrow::fs::FileInfoVector> VirtualRootFileSystemBase::GetFileInfo(const arrow::fs::FileSelector& select)
137{
138 arrow::fs::FileInfoVector results;
139 auto selected = this->GetFileInfo(select.base_dir);
140 if (selected.ok()) {
141 results.emplace_back(*selected);
142 }
143 return results;
144}
145
146arrow::Status VirtualRootFileSystemBase::CreateDir(const std::string& path, bool recursive)
147{
148 return arrow::Status::NotImplemented("Read only filesystem");
149}
150
151arrow::Status VirtualRootFileSystemBase::DeleteDir(const std::string& path)
152{
153 return arrow::Status::NotImplemented("Read only filesystem");
154}
155
156arrow::Status VirtualRootFileSystemBase::CopyFile(const std::string& src, const std::string& dest)
157{
158 return arrow::Status::NotImplemented("Read only filesystem");
159}
160
161arrow::Status VirtualRootFileSystemBase::Move(const std::string& src, const std::string& dest)
162{
163 return arrow::Status::NotImplemented("Read only filesystem");
164}
165
166arrow::Status VirtualRootFileSystemBase::DeleteDirContents(const std::string& path, bool missing_dir_ok)
167{
168 return arrow::Status::NotImplemented("Read only filesystem");
169}
170
172{
173 return arrow::Status::NotImplemented("Read only filesystem");
174}
175
176arrow::Status VirtualRootFileSystemBase::DeleteFile(const std::string& path)
177{
178 return arrow::Status::NotImplemented("Read only filesystem");
179}
180
181arrow::Result<std::shared_ptr<arrow::io::InputStream>> VirtualRootFileSystemBase::OpenInputStream(const std::string& path)
182{
183 return arrow::Status::NotImplemented("Non streamable filesystem");
184}
185
186arrow::Result<std::shared_ptr<arrow::io::RandomAccessFile>> VirtualRootFileSystemBase::OpenInputFile(const std::string& path)
187{
188 return arrow::Status::NotImplemented("No random access file system");
189}
190
191arrow::Result<std::shared_ptr<arrow::io::OutputStream>> VirtualRootFileSystemBase::OpenOutputStream(
192 const std::string& path,
193 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
194{
195 return arrow::Status::NotImplemented("Non streamable filesystem");
196}
197
198arrow::Result<std::shared_ptr<arrow::io::OutputStream>> VirtualRootFileSystemBase::OpenAppendStream(
199 const std::string& path,
200 const std::shared_ptr<const arrow::KeyValueMetadata>& metadata)
201{
202 return arrow::Status::NotImplemented("No random access file system");
203}
204
// An Arrow output stream that allows writing to a TTree.
207 : mDirectory(f)
208{
209}
210
212{
213 mDirectory->GetFile()->Close();
214 return arrow::Status::OK();
215}
216
217arrow::Result<int64_t> TDirectoryFileOutputStream::Tell() const
218{
219 return arrow::Result<int64_t>(arrow::Status::NotImplemented("Cannot move"));
220}
221
222arrow::Status TDirectoryFileOutputStream::Write(const void* data, int64_t nbytes)
223{
224 return arrow::Status::NotImplemented("Cannot write raw bytes to a TTree");
225}
226
228{
229 return mDirectory->GetFile()->IsOpen() == false;
230}
231
234 mBuffer(f),
235 mFilesystem(nullptr),
236 mObjectFactory(factory)
237{
238}
239
240arrow::Result<arrow::fs::FileInfo> TBufferFileFS::GetFileInfo(const std::string& path)
241{
242 arrow::fs::FileInfo result;
243 result.set_type(arrow::fs::FileType::NotFound);
244 result.set_path(path);
245 arrow::dataset::FileSource source(path, shared_from_this());
246
247 // Only once to avoid rereading the streamed tree.
248 if (!mFilesystem.get()) {
249 return result;
250 }
251
252 auto info = mFilesystem->GetFileInfo(path);
253 if (!info.ok()) {
254 return result;
255 }
256
257 result.set_type(info->type());
258 return result;
259}
260
261bool TBufferFileFS::CheckSupport(arrow::dataset::FileSource source)
262{
263 // We use a plugin to create the actual objects inside the
264 // file, so that we can support TTree and RNTuple at the same time
265 // without having to depend on both.
266 for (auto& capability : mObjectFactory.capabilities) {
267 auto objectPath = capability.lfn2objectPath(source.path());
268
269 mBuffer->SetBufferOffset(0);
270 mBuffer->InitMap();
271 TClass* serializedClass = mBuffer->ReadClass();
272 mBuffer->SetBufferOffset(0);
273 mBuffer->ResetMap();
274 mBuffer->Reset();
275 if (!serializedClass) {
276 continue;
277 }
278
279 bool supports = capability.checkSupport(serializedClass->GetName());
280 if (supports) {
281 return true;
282 }
283 }
284 return false;
285}
286
287std::shared_ptr<RootObjectHandler> TBufferFileFS::GetObjectHandler(arrow::dataset::FileSource source)
288{
289 // We use a plugin to create the actual objects inside the
290 // file, so that we can support TTree and RNTuple at the same time
291 // without having to depend on both.
292 for (auto& capability : mObjectFactory.capabilities) {
293 auto objectPath = capability.lfn2objectPath(source.path());
294 void* handle = capability.getHandle(shared_from_this(), objectPath);
295 if (!handle) {
296 continue;
297 }
298 return std::make_shared<RootObjectHandler>(handle, capability.factory().format());
299 }
300 throw runtime_error_f("Unable to get handler for object %s", source.path().c_str());
301}
302
304{
305 if (payload) {
306 throw runtime_error_f("Payload not owned");
307 }
308}
309
310} // namespace o2::framework
TBufferFileFS(TBufferFile *f, RootObjectReadingFactory &)
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
std::shared_ptr< RootObjectHandler > GetObjectHandler(arrow::dataset::FileSource source) override
bool CheckSupport(arrow::dataset::FileSource source) override
arrow::Status Write(const void *data, int64_t nbytes) override
arrow::Result< int64_t > Tell() const override
virtual std::shared_ptr< VirtualRootFileSystemBase > GetSubFilesystem(arrow::dataset::FileSource source)
bool CheckSupport(arrow::dataset::FileSource source) override
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenOutputStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
std::shared_ptr< RootObjectHandler > GetObjectHandler(arrow::dataset::FileSource source) override
arrow::Status DeleteDir(const std::string &path) override
arrow::Status Move(const std::string &src, const std::string &dest) override
arrow::Status DeleteFile(const std::string &path) override
arrow::Status DeleteDirContents(const std::string &path, bool missing_dir_ok) override
arrow::Status CreateDir(const std::string &path, bool recursive) override
arrow::Status CopyFile(const std::string &src, const std::string &dest) override
arrow::Result< arrow::fs::FileInfo > GetFileInfo(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenOutputStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
arrow::Result< std::shared_ptr< arrow::io::InputStream > > OpenInputStream(const std::string &path) override
arrow::Result< std::shared_ptr< arrow::io::OutputStream > > OpenAppendStream(const std::string &path, const std::shared_ptr< const arrow::KeyValueMetadata > &metadata) override
arrow::Result< std::shared_ptr< arrow::io::RandomAccessFile > > OpenInputFile(const std::string &path) override
GLenum src
Definition glcorearb.h:1767
GLuint64EXT * result
Definition glcorearb.h:5662
GLdouble f
Definition glcorearb.h:310
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLboolean * data
Definition glcorearb.h:298
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLuint stream
Definition glcorearb.h:1806
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > select(char const *matcher="")
RuntimeErrorRef runtime_error_f(const char *,...)
std::vector< RootObjectReadingCapability > capabilities