Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInputDirector.h"
13#include "Framework/Logger.h"
17#include "Framework/Output.h"
18#include "Framework/Signpost.h"
20#include "Headers/DataHeader.h"
21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
24
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
28
29#include "TGrid.h"
30#include "TObjString.h"
31#include "TMap.h"
32#include "TFile.h"
33
34#include <arrow/dataset/file_base.h>
35#include <arrow/dataset/dataset.h>
36#include <uv.h>
37#include <memory>
38
39#if __has_include(<TJAlienFile.h>)
40#include <TJAlienFile.h>
41
42#include <utility>
43#endif
44
45#include <dlfcn.h>
46O2_DECLARE_DYNAMIC_LOG(reader_memory_dump);
47
48namespace o2::framework
49{
50using namespace rapidjson;
51
52FileNameHolder* makeFileNameHolder(std::string fileName)
53{
54 auto fileNameHolder = new FileNameHolder();
55 fileNameHolder->fileName = fileName;
56
57 return fileNameHolder;
58}
59
61 : mAlienSupport(alienSupport),
62 mLevel(level),
63 mContext(context)
64{
65 std::vector<char const*> capabilitiesSpecs = {
66 "O2Framework:RNTupleObjectReadingCapability",
67 "O2Framework:TTreeObjectReadingCapability",
68 };
69
70 std::vector<LoadablePlugin> plugins;
71 for (auto spec : capabilitiesSpecs) {
72 auto morePlugins = PluginManager::parsePluginSpecString(spec);
73 for (auto& extra : morePlugins) {
74 plugins.push_back(extra);
75 }
76 }
77
78 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
79}
80
82{
83 LOGP(info, "DataInputDescriptor");
84 LOGP(info, " Table name : {}", tablename);
85 LOGP(info, " Tree name : {}", treename);
86 LOGP(info, " Input files file : {}", getInputfilesFilename());
87 LOGP(info, " File name regex : {}", getFilenamesRegexString());
88 LOGP(info, " Input files : {}", mfilenames.size());
89 for (auto fn : mfilenames) {
90 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
91 }
92 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
93}
94
96{
97 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
98}
99
101{
102 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
103}
104
106{
107 return std::regex(getFilenamesRegexString());
108}
109
111{
112 // remove leading file:// from file name
113 if (fn->fileName.rfind("file://", 0) == 0) {
114 fn->fileName.erase(0, 7);
115 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0 && !gGrid) {
116 LOGP(debug, "AliEn file requested. Enabling support.");
117 TGrid::Connect("alien://");
118 mAlienSupport = true;
119 }
120
121 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
122 mfilenames.emplace_back(fn);
123}
124
// Make input file number `counter` the currently selected file of this descriptor.
//
// Returns false when `counter` is not below getNumberInputfiles(); returns true otherwise,
// including the case where the requested file is already the one held by mCurrentFilesystem.
// Throws std::runtime_error when the parent-file replacement string contains no ';'.
// When the holder's numberOfTimeFrames is not yet positive, the top-level keys of the file
// are scanned for names of the form DF_<number>[-...] to fill listOfTimeFrameNumbers.
//
// NOTE(review): the embedded listing numbers skip 148 and 168, so at least two statements of
// the original function appear to be missing from this text — confirm against upstream.
125bool DataInputDescriptor::setFile(int counter, int wantedParentLevel, std::string_view origin)
126{
127 // no files left
128 if (counter >= getNumberInputfiles()) {
129 return false;
130 }
131
132 // In case the origin starts with anything but AOD, we add the origin as the suffix
133 // of the filename. In the future we might expand this for proper rewriting of the
134 // filename based on the origin and the original file information.
135 std::string filename = mfilenames[counter]->fileName;
136 // In case we do not need to remap parent levels, the requested origin is what
137 // drives the filename.
138 if (wantedParentLevel == -1 && !origin.starts_with("AOD")) {
139 filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
140 }
141
142 // open file
143 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
144 if (rootFS.get()) {
// Nothing to do when the wanted file is the one already open.
145 if (rootFS->GetFile()->GetName() == filename) {
146 return true;
147 }
149 }
150
// Reuse a TFile registered in mContext.openFiles under the same name, if any; such a
// file is flagged as external and the flag is forwarded (negated) to TFileFileSystem.
151 TFile* tfile = nullptr;
152 bool externalFile = false;
153 for (auto& [name, f] : mContext.openFiles) {
154 if (name == filename) {
155 tfile = f;
156 externalFile = true;
157 break;
158 }
159 }
160 if (tfile == nullptr) {
161 tfile = TFile::Open(filename.c_str());
162 }
// NOTE(review): tfile is handed on without a null check; TFile::Open presumably returns
// nullptr when the file cannot be opened — confirm and handle.
163 mCurrentFilesystem = std::make_shared<TFileFileSystem>(tfile, 50 * 1024 * 1024, mFactory, !externalFile);
// NOTE(review): std::make_shared does not return an empty pointer, so this check
// presumably never fires and does not detect a failed open.
164 if (!mCurrentFilesystem.get()) {
165 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
166 }
167 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
169
170 // get the parent file map if exists
171 mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
172 if (mParentFileMap && !mContext.parentFileReplacement.empty()) {
// The replacement string has the form "<from>;<to>"; every value of the parent map
// gets <from> replaced by <to>.
173 auto pos = mContext.parentFileReplacement.find(';');
174 if (pos == std::string::npos) {
175 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mContext.parentFileReplacement.c_str()));
176 }
177 auto from = mContext.parentFileReplacement.substr(0, pos);
178 auto to = mContext.parentFileReplacement.substr(pos + 1);
179
180 auto it = mParentFileMap->MakeIterator();
181 while (auto obj = it->Next()) {
182 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
183 objString->String().ReplaceAll(from.c_str(), to.c_str());
184 }
185 delete it;
186 }
187
188 // get the directory names
189 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
190 const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
191 TList* keyList = rootFS->GetFile()->GetListOfKeys();
192 std::vector<std::string> finalList;
193
194 // extract TF numbers and sort accordingly
195 // We use an extra seen set to make sure we preserve the order in which
196 // we insert things in the final list and to make sure we do not have duplicates.
197 // Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
198 std::unordered_set<size_t> seen;
199 for (auto key : *keyList) {
200 std::smatch matchResult;
201 std::string keyName = ((TObjString*)key)->GetString().Data();
202 bool match = std::regex_match(keyName, matchResult, TFRegex);
203 if (match) {
204 auto folderNumber = std::stoul(matchResult[1].str());
205 if (seen.find(folderNumber) == seen.end()) {
206 seen.insert(folderNumber);
207 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
208 }
209 }
210 }
211
212 if (mParentFileMap != nullptr) {
213 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
// NOTE(review): p1 / p2 are dereferenced without a null check; a DF missing from the
// parent map would presumably yield a null pointer here — confirm.
214 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
215 [this](long const& l1, long const& l2) -> bool {
216 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
217 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
218 return p1->GetString().CompareTo(p2->GetString()) < 0;
219 });
220 } else {
221 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
222 }
223
// Grow the read flags by one entry per DF number just collected and record the count.
224 mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
225 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
226 }
227
// Remember which file is current and restart the per-file timing counters.
228 mCurrentFileID = counter;
229 mCurrentFileStartedAt = uv_hrtime();
230 mIOTime = 0;
231
232 return true;
233}
234
235uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
236{
237
238 // open file
239 if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
240 return 0ul;
241 }
242
243 // no TF left
244 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
245 return 0ul;
246 }
247
248 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
249}
250
251std::pair<DataInputDescriptor*, int> DataInputDescriptor::navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
252{
253 if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
254 return {nullptr, -1};
255 }
256 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
257 auto parentFile = getParentFile(counter, numTF, "", wantedParentLevel, wantedOrigin);
258 if (parentFile == nullptr) {
259 return {nullptr, -1};
260 }
261 return {parentFile, parentFile->findDFNumber(0, folderName)};
262}
263
264arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
265{
266 // If mapped to a parent level deeper than current, skip directly to the right level.
267 if (wantedParentLevel != -1 && mLevel < wantedParentLevel) {
268 auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedParentLevel, wantedOrigin);
269 if (parentFile == nullptr || parentNumTF == -1) {
270 return {};
271 }
272 return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
273 }
274
275 // open file
276 if (!setFile(counter, wantedParentLevel, wantedOrigin)) {
277 return {};
278 }
279
280 // no TF left
281 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
282 return {};
283 }
284
285 mfilenames[counter]->alreadyRead[numTF] = true;
286
287 return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
288}
289
290DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
291{
292 if (!mParentFileMap) {
293 // This file has no parent map
294 return nullptr;
295 }
296 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
297 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
298 // The current DF is not found in the parent map (this should not happen and is a fatal error)
299 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
300 if (!parentFileName) {
301 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
302 return nullptr;
303 }
304
305 if (mParentFile) {
306 // Is this still the corresponding to the correct file?
307 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
308 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
309 return mParentFile;
310 } else {
311 mParentFile->closeInputFile();
312 delete mParentFile;
313 mParentFile = nullptr;
314 }
315 }
316
317 if (mLevel == mContext.allowedParentLevel) {
318 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mContext.allowedParentLevel, folderName.c_str(),
319 rootFS->GetFile()->GetName()));
320 }
321
322 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
323 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mContext);
324 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
325 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
326 mParentFile->fillInputfiles();
327 mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
328 return mParentFile;
329}
330
332{
333 return mfilenames.at(counter)->numberOfTimeFrames;
334}
335
337{
338 auto& list = mfilenames.at(counter)->alreadyRead;
339 return std::count(list.begin(), list.end(), true);
340}
341
343{
344 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
345 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
346 std::string monitoringInfo(fmt::format("lfn={},size={}", f->GetName(), f->GetSize()));
347#if __has_include(<TJAlienFile.h>)
348 auto alienFile = dynamic_cast<TJAlienFile*>(f);
349 if (alienFile) {
350 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
351 }
352#endif
353 if (mContext.monitoring) {
354 mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
355 }
356 LOGP(info, "Opening file: {}", monitoringInfo);
357}
358
360{
361 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
362 if (wait_time < 0) {
363 wait_time = 0;
364 }
365 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
366 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
367 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
368 f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
369 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
370#if __has_include(<TJAlienFile.h>)
371 auto alienFile = dynamic_cast<TJAlienFile*>(f);
372 if (alienFile) {
373 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
374 }
375#endif
376 if (mContext.monitoring) {
377 mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
378 }
379 LOGP(info, "Read info: {}", monitoringInfo);
380}
381
383{
384 if (mCurrentFilesystem.get()) {
385 if (mParentFile) {
386 mParentFile->closeInputFile();
387 delete mParentFile;
388 mParentFile = nullptr;
389 }
390
391 delete mParentFileMap;
392 mParentFileMap = nullptr;
393
395 mCurrentFilesystem.reset();
396 }
397}
398
400{
401 if (getNumberInputfiles() > 0) {
402 // 1. mfilenames
403 return getNumberInputfiles();
404 }
405
406 auto fileName = getInputfilesFilename();
407 if (!fileName.empty()) {
408 // 2. getFilenamesRegex() @ getInputfilesFilename()
409 try {
410 std::ifstream filelist(fileName);
411 if (!filelist.is_open()) {
412 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
413 }
414 while (std::getline(filelist, fileName)) {
415 // remove white spaces, empty lines are skipped
416 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
417 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
418 std::regex_match(fileName, getFilenamesRegex()))) {
420 }
421 }
422 } catch (...) {
423 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
424 return 0;
425 }
426 } else {
427 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
428 if (mdefaultFilenamesPtr) {
429 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
430 if (getFilenamesRegexString().empty() ||
431 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
432 addFileNameHolder(fileNameHolder);
433 }
434 }
435 }
436 }
437
438 return getNumberInputfiles();
439}
440
441int DataInputDescriptor::findDFNumber(int file, std::string dfName)
442{
443 auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
444 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
445 if (it == dfList.end()) {
446 return -1;
447 }
448 return it - dfList.begin();
449}
450
453 : mTarget(target)
454 {
455 start = uv_hrtime();
456 }
458 {
459 if (!active) {
460 return;
461 }
462 O2_SIGNPOST_ACTION(reader_memory_dump, [](void*) {
463 void (*dump_)(const char*);
464 if (void* sym = dlsym(nullptr, "igprof_dump_now")) {
465 dump_ = __extension__(void (*)(const char*)) sym;
466 if (dump_) {
467 std::string filename = fmt::format("reader-memory-dump-{}.gz", uv_hrtime());
468 dump_(filename.c_str());
469 }
470 }
471 });
472 mTarget += (uv_hrtime() - start);
473 }
474
476 {
477 active = false;
478 }
479
480 bool active = true;
481 uint64_t& mTarget;
482 uint64_t start;
483 uint64_t stop;
484};
485
// Read object `treename` of DF `numTF` in input file `counter` and hand it to `outputs`
// as a FragmentToBatch created for the header `dh`.
//
// The origin of `dh` is mapped to a parent level via mContext.levelForOrigin(); when that
// level is deeper than this descriptor's level the call is forwarded to the parent
// descriptor. When no registered capability can provide the object in the current file,
// the parent file (if any) is tried instead.
// Returns false when getFileFolder() yields no filesystem, true after the output has been
// created and filled. Throws std::runtime_error when a required parent file or DF is not
// found, when the filesystem is not a TFileFileSystem, or when the object is found neither
// here nor in a parent.
// The CalculateDelta object `t` is constructed on mIOTime and deactivated on the early-exit
// paths below.
// NOTE(review): totalSizeCompressed / totalSizeUncompressed are only forwarded in the code
// visible here; the embedded listing numbers skip 575-576, so lines may be missing.
486bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
487{
488 CalculateDelta t(mIOTime);
489 std::string wantedOrigin = dh.dataOrigin.as<std::string>();
490 int wantedLevel = mContext.levelForOrigin(wantedOrigin);
491
492 // If this origin is mapped to a parent level deeper than current, skip directly without
493 // attempting to read from this level.
494 if (wantedLevel != -1 && mLevel < wantedLevel) {
495 auto [parentFile, parentNumTF] = navigateToLevel(counter, numTF, wantedLevel, wantedOrigin);
496 if (parentFile == nullptr) {
497 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
498 throw std::runtime_error(fmt::format(R"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
499 }
500 if (parentNumTF == -1) {
501 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
502 throw std::runtime_error(fmt::format(R"(DF not found in parent file "{}")", parentRootFS->GetFile()->GetName()));
503 }
504 t.deactivate();
505 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
506 }
507
508 auto folder = getFileFolder(counter, numTF, wantedLevel, wantedOrigin);
509 if (!folder.filesystem()) {
510 t.deactivate();
511 return false;
512 }
513
514 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
515
516 if (!rootFS) {
517 t.deactivate();
518 throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
519 }
520 // FIXME: Ugly. We should detect the format from the treename, good enough for now.
521 std::shared_ptr<arrow::dataset::FileFormat> format;
522 FragmentToBatch::StreamerCreator creator = nullptr;
523
524 auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
525
// The first capability that returns a handle for the object decides format and streamer.
526 for (auto& capability : mFactory.capabilities) {
527 auto objectPath = capability.lfn2objectPath(fullpath.path());
528 void* handle = capability.getHandle(rootFS, objectPath);
529 if (handle) {
530 format = capability.factory().format();
531 creator = capability.factory().deferredOutputStreamer;
532 break;
533 }
534 }
535
536 // FIXME: we should distinguish between an actually missing object and one which has a non compatible
537 // format.
538 if (!format) {
539 t.deactivate();
540 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
541 auto parentFile = getParentFile(counter, numTF, treename, wantedLevel, wantedOrigin);
542 if (parentFile != nullptr) {
543 int parentNumTF = parentFile->findDFNumber(0, folder.path());
544 if (parentNumTF == -1) {
545 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
546 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
547 }
548 // first argument is 0 as the parent file object contains only 1 file
549 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
550 }
// This rootFS shadows the outer variable of the same name and is taken from
// mCurrentFilesystem rather than from folder.filesystem().
551 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
552 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
553 }
554
// NOTE(review): schemaOpt and fragment are dereferenced below without a status check;
// presumably both are arrow::Result values that may hold an error — confirm.
555 auto schemaOpt = format->Inspect(fullpath);
556 auto physicalSchema = schemaOpt;
// Fields whose name ends with "_size" are left out of the dataset schema.
557 std::vector<std::shared_ptr<arrow::Field>> fields;
558 for (auto& original : (*schemaOpt)->fields()) {
559 if (original->name().ends_with("_size")) {
560 continue;
561 }
562 fields.push_back(original);
563 }
564 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
565
566 auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
567
568 // create table output
569 auto o = Output(dh);
570
571 // FIXME: This should allow me to create a memory pool
572 // which I can then use to scan the dataset.
573 auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
574
577 f2b->setLabel(treename.c_str());
578 f2b->fill(datasetSchema, format);
579
580 return true;
581}
582
// Build a director from a list of input file names.
//
// A single non-empty entry starting with '@' is treated as the name of an input-files
// file: the text after the '@' is passed to setInputfilesFile(). Otherwise every entry is
// wrapped in a FileNameHolder and appended to mdefaultInputFiles.
// NOTE(review): `context` is a named rvalue reference, so mContext{context} presumably
// copies rather than moves — confirm against the declaration of mContext.
// NOTE(review): the embedded listing numbers skip 594, so a statement at the end of the
// body appears to be missing from this text — confirm against upstream.
583DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, DataInputDirectorContext&& context)
584 : mContext{context}
585{
586 if (inputFiles.size() == 1 && !inputFiles[0].empty() && inputFiles[0][0] == '@') {
// substr(1, -1): the -1 converts to the largest size value, i.e. "up to the end".
587 setInputfilesFile(inputFiles.back().substr(1, -1));
588 } else {
589 for (auto inputFile : inputFiles) {
590 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
591 }
592 }
593
595}
596
598{
599 for (auto fn : mdefaultInputFiles) {
600 delete fn;
601 }
602 mdefaultInputFiles.clear();
603 mdefaultDataInputDescriptor = nullptr;
604
605 for (auto fn : mdataInputDescriptors) {
606 delete fn;
607 }
608 mdataInputDescriptors.clear();
609}
610
612{
613 mdataInputDescriptors.clear();
614 mdefaultInputFiles.clear();
615 mFilenameRegex = std::string("");
616};
617
619{
620 if (mdefaultDataInputDescriptor) {
621 delete mdefaultDataInputDescriptor;
622 }
623 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mContext);
624
625 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
626 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
627 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
628 mdefaultDataInputDescriptor->tablename = "any";
629 mdefaultDataInputDescriptor->treename = "any";
630 mdefaultDataInputDescriptor->fillInputfiles();
631
632 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
633}
634
635bool DataInputDirector::readJson(std::string const& fnjson)
636{
637 // open the file
638 FILE* f = fopen(fnjson.c_str(), "r");
639 if (!f) {
640 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
641 return false;
642 }
643
644 // create streamer
645 char readBuffer[65536];
646 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
647
648 // parse the json file
649 Document jsonDoc;
650 jsonDoc.ParseStream(inputStream);
651 auto status = readJsonDocument(&jsonDoc);
652
653 // clean up
654 fclose(f);
655
656 return status;
657}
658
// Configure this director from an already parsed JSON document.
//
// Reads the "InputDirector" object and its items "debugmode", "fileregex", "resfiles" and
// "InputDescriptors"; each descriptor entry may carry "table" (required), "treename",
// "fileregex" and "resfiles".
// Returns false on a parse error, on an item of the wrong type, on a descriptor without
// "table", or when isValid() fails at the end. Returns true otherwise, including when
// "InputDirector" is not an object.
// NOTE(review): the embedded listing numbers skip 718, 722 and 833, so statements appear
// to be missing from this text — confirm against upstream.
659bool DataInputDirector::readJsonDocument(Document* jsonDoc)
660{
661 // initialisations
662 std::string fileName("");
663 const char* itemName;
664
665 // is it a proper json document?
666 if (jsonDoc->HasParseError()) {
667 LOGP(error, "Check the JSON document! There is a problem with the format!");
668 return false;
669 }
670
671 // InputDirector
672 itemName = "InputDirector";
// NOTE(review): the member is accessed without a prior HasMember()/IsObject() check on
// the document; presumably rapidjson asserts on a missing member — confirm.
673 const Value& didirItem = (*jsonDoc)[itemName];
674 if (!didirItem.IsObject()) {
675 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
676 return true;
677 }
678
679 // now read various items
680 itemName = "debugmode";
681 if (didirItem.HasMember(itemName)) {
682 if (didirItem[itemName].IsBool()) {
683 mDebugMode = (didirItem[itemName].GetBool());
684 } else {
685 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
686 return false;
687 }
688 } else {
689 mDebugMode = false;
690 }
691
// In debug mode the whole InputDirector object is pretty-printed to the log.
692 if (mDebugMode) {
693 StringBuffer buffer;
694 buffer.Clear();
695 PrettyWriter<StringBuffer> writer(buffer);
696 didirItem.Accept(writer);
697 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
698 }
699
700 itemName = "fileregex";
701 if (didirItem.HasMember(itemName)) {
702 if (didirItem[itemName].IsString()) {
703 setFilenamesRegex(didirItem[itemName].GetString());
704 } else {
705 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
706 return false;
707 }
708 }
709
// "resfiles" is either a string (a file name, or "@<name>" naming an input-files file)
// or an array of file names.
710 itemName = "resfiles";
711 if (didirItem.HasMember(itemName)) {
712 if (didirItem[itemName].IsString()) {
713 fileName = didirItem[itemName].GetString();
714 if (fileName.size() && fileName[0] == '@') {
715 fileName.erase(0, 1);
716 setInputfilesFile(fileName);
717 } else {
719 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
720 }
721 } else if (didirItem[itemName].IsArray()) {
723 auto fns = didirItem[itemName].GetArray();
// NOTE(review): array elements are read with GetString() without an IsString() check.
724 for (auto& fn : fns) {
725 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
726 }
727 } else {
728 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
729 return false;
730 }
731 }
732
733 itemName = "InputDescriptors";
734 if (didirItem.HasMember(itemName)) {
735 if (!didirItem[itemName].IsArray()) {
736 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
737 return false;
738 }
739
740 // loop over DataInputDescriptors
741 for (auto& didescItem : didirItem[itemName].GetArray()) {
// NOTE(review): itemName is reassigned inside this loop, so from the second iteration
// on the message below names the last item of the previous iteration instead of
// "InputDescriptors".
742 if (!didescItem.IsObject()) {
743 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
744 return false;
745 }
746 // create a new dataInputDescriptor
// NOTE(review): didesc is allocated with new; on every `return false` below, and when
// the descriptor is ignored for having an empty file list, it is neither stored nor
// deleted in the code visible here.
747 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mContext);
748 didesc->setDefaultInputfiles(&mdefaultInputFiles);
749
750 itemName = "table";
751 if (didescItem.HasMember(itemName)) {
752 if (didescItem[itemName].IsString()) {
753 didesc->tablename = didescItem[itemName].GetString();
754 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
755 } else {
756 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
757 return false;
758 }
759 } else {
760 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
761 return false;
762 }
763
764 itemName = "treename";
765 if (didescItem.HasMember(itemName)) {
766 if (didescItem[itemName].IsString()) {
767 didesc->treename = didescItem[itemName].GetString();
768 } else {
769 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
770 return false;
771 }
772 } else {
// Without an explicit tree name, the third token of the table name is used.
773 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
774 didesc->treename = m[2];
775 }
776
777 itemName = "fileregex";
778 if (didescItem.HasMember(itemName)) {
779 if (didescItem[itemName].IsString()) {
780 if (didesc->getNumberInputfiles() == 0) {
781 didesc->setFilenamesRegex(didescItem[itemName].GetString());
782 }
783 } else {
784 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
785 return false;
786 }
787 } else {
// No descriptor-specific regex: fall back to the director's regex pointer.
788 if (didesc->getNumberInputfiles() == 0) {
789 didesc->setFilenamesRegex(mFilenameRegexPtr);
790 }
791 }
792
793 itemName = "resfiles";
794 if (didescItem.HasMember(itemName)) {
795 if (didescItem[itemName].IsString()) {
796 fileName = didescItem[itemName].GetString();
797 if (fileName.size() && fileName[0] == '@') {
798 didesc->setInputfilesFile(fileName.erase(0, 1));
799 } else {
// A plain file name is added only when it passes the descriptor's regex (if any).
800 if (didesc->getFilenamesRegexString().empty() ||
801 std::regex_match(fileName, didesc->getFilenamesRegex())) {
802 didesc->addFileNameHolder(makeFileNameHolder(fileName));
803 }
804 }
805 } else if (didescItem[itemName].IsArray()) {
806 auto fns = didescItem[itemName].GetArray();
807 for (auto& fn : fns) {
808 if (didesc->getFilenamesRegexString().empty() ||
809 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
810 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
811 }
812 }
813 } else {
814 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
815 return false;
816 }
817 } else {
// No descriptor-specific "resfiles": fall back to the director's input-files pointer.
818 didesc->setInputfilesFile(minputfilesFilePtr);
819 }
820
821 // fill mfilenames and add InputDescriptor to InputDirector
822 if (didesc->fillInputfiles() > 0) {
823 mdataInputDescriptors.emplace_back(didesc);
824 } else {
825 didesc->printOut();
826 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
827 }
828 mAlienSupport &= didesc->isAlienSupportOn();
829 }
830 }
831
832 // add a default DataInputDescriptor
834
835 // check that all DataInputDescriptors have the same number of input files
836 if (!isValid()) {
837 printOut();
838 return false;
839 }
840
841 // print the DataInputDirector
842 if (mDebugMode) {
843 printOut();
844 }
845
846 return true;
847}
848
850{
851 DataInputDescriptor* result = nullptr;
852
853 // compute list of matching outputs
855
856 for (auto didesc : mdataInputDescriptors) {
857 if (didesc->matcher->match(dh, context)) {
858 result = didesc;
859 break;
860 }
861 }
862
863 return result;
864}
865
866arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
867{
868 auto didesc = getDataInputDescriptor(dh);
869 // if NOT match then use defaultDataInputDescriptor
870 if (!didesc) {
871 didesc = mdefaultDataInputDescriptor;
872 }
873 std::string origin = dh.dataOrigin.as<std::string>();
874 int wantedLevel = mContext.levelForOrigin(origin);
875
876 return didesc->getFileFolder(counter, numTF, wantedLevel, origin);
877}
878
880{
881 auto didesc = getDataInputDescriptor(dh);
882 // if NOT match then use defaultDataInputDescriptor
883 if (!didesc) {
884 didesc = mdefaultDataInputDescriptor;
885 }
886
887 return didesc->getTimeFramesInFile(counter);
888}
889
891{
892 auto didesc = getDataInputDescriptor(dh);
893 // if NOT match then use defaultDataInputDescriptor
894 if (!didesc) {
895 didesc = mdefaultDataInputDescriptor;
896 }
897 std::string origin = dh.dataOrigin.as<std::string>();
898 int wantedLevel = mContext.levelForOrigin(origin);
899
900 return didesc->getTimeFrameNumber(counter, numTF, wantedLevel, origin);
901}
902
903bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
904{
905 std::string treename;
906
907 auto didesc = getDataInputDescriptor(dh);
908 if (didesc) {
909 // if match then use filename and treename from DataInputDescriptor
910 treename = didesc->treename;
911 } else {
912 // if NOT match then use
913 // . filename from defaultDataInputDescriptor
914 // . treename from DataHeader
915 didesc = mdefaultDataInputDescriptor;
916 treename = aod::datamodel::getTreeName(dh);
917 }
918 std::string origin = dh.dataOrigin.as<std::string>();
919
920 auto result = didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
921 return result;
922}
923
925{
926 mdefaultDataInputDescriptor->closeInputFile();
927 for (auto didesc : mdataInputDescriptors) {
928 didesc->closeInputFile();
929 }
930}
931
932bool DataInputDirector::isValid()
933{
934 bool status = true;
935 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
936 for (auto didesc : mdataInputDescriptors) {
937 status &= didesc->getNumberInputfiles() == numberFiles;
938 }
939
940 return status;
941}
942
944{
945 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
946 for (auto didesc : mdataInputDescriptors) {
947 status &= (didesc->getNumberInputfiles() <= counter);
948 }
949
950 return status;
951}
952
954{
955 LOGP(info, "DataInputDirector");
956 LOGP(info, " Default input files file : {}", minputfilesFile);
957 LOGP(info, " Default file name regex : {}", mFilenameRegex);
958 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
959 for (auto const& fn : mdefaultInputFiles) {
960 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
961 }
962 LOGP(info, " Default DataInputDescriptor:");
963 mdefaultDataInputDescriptor->printOut();
964 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
965 for (auto const& didesc : mdataInputDescriptors) {
966 didesc->printOut();
967 }
968}
969
970} // namespace o2::framework
header::DataOrigin origin
o2::monitoring::tags::Value Value
std::vector< std::shared_ptr< arrow::Field > > fields
std::ostringstream debug
int32_t i
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_ACTION(log, callback)
Definition Signpost.h:511
StringRef key
decltype(auto) make(const Output &spec, Args... args)
uint64_t getTimeFrameNumber(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
arrow::dataset::FileSource getFileFolder(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
std::pair< DataInputDescriptor *, int > navigateToLevel(int counter, int numTF, int wantedParentLevel, std::string_view wantedOrigin)
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
DataInputDescriptor(bool alienSupport, int level, DataInputDirectorContext &context)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename, int wantedParentLevel, std::string_view wantedOrigin)
bool setFile(int counter, int wantedParentLevel, std::string_view wantedOrigin)
int findDFNumber(int file, std::string dfName)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
DataInputDirector(std::vector< std::string > inputFiles, DataInputDirectorContext &&context)
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint const GLchar * name
Definition glcorearb.h:781
GLdouble f
Definition glcorearb.h:310
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
FileNameHolder * makeFileNameHolder(std::string fileName)
std::string filename()
void empty(int)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
o2::monitoring::Monitoring * monitoring
std::vector< std::pair< std::string, TFile * > > openFiles
int levelForOrigin(std::string_view origin) const
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
the main header struct
Definition DataHeader.h:620
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
const std::string str