Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInputDirector.h"
13#include "Framework/Logger.h"
17#include "Framework/Output.h"
18#include "Framework/Signpost.h"
19#include "Headers/DataHeader.h"
21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
24
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
28
29#include "TGrid.h"
30#include "TObjString.h"
31#include "TMap.h"
32#include "TFile.h"
33
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/dataset.h>
#include <uv.h>
#include <memory>
#include <utility>

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
#endif

#include <dlfcn.h>
46O2_DECLARE_DYNAMIC_LOG(reader_memory_dump);
47
48namespace o2::framework
49{
50using namespace rapidjson;
51
52FileNameHolder* makeFileNameHolder(std::string fileName)
53{
54 auto fileNameHolder = new FileNameHolder();
55 fileNameHolder->fileName = fileName;
56
57 return fileNameHolder;
58}
59
60DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
61 : mAlienSupport(alienSupport),
62 mMonitoring(monitoring),
63 mAllowedParentLevel(allowedParentLevel),
64 mParentFileReplacement(std::move(parentFileReplacement)),
65 mLevel(level)
66{
67 std::vector<char const*> capabilitiesSpecs = {
68 "O2Framework:RNTupleObjectReadingCapability",
69 "O2Framework:TTreeObjectReadingCapability",
70 };
71
72 std::vector<LoadablePlugin> plugins;
73 for (auto spec : capabilitiesSpecs) {
74 auto morePlugins = PluginManager::parsePluginSpecString(spec);
75 for (auto& extra : morePlugins) {
76 plugins.push_back(extra);
77 }
78 }
79
80 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
81}
82
84{
85 LOGP(info, "DataInputDescriptor");
86 LOGP(info, " Table name : {}", tablename);
87 LOGP(info, " Tree name : {}", treename);
88 LOGP(info, " Input files file : {}", getInputfilesFilename());
89 LOGP(info, " File name regex : {}", getFilenamesRegexString());
90 LOGP(info, " Input files : {}", mfilenames.size());
91 for (auto fn : mfilenames) {
92 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
93 }
94 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
95}
96
98{
99 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
100}
101
103{
104 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
105}
106
108{
109 return std::regex(getFilenamesRegexString());
110}
111
113{
114 // remove leading file:// from file name
115 if (fn->fileName.rfind("file://", 0) == 0) {
116 fn->fileName.erase(0, 7);
117 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0 && !gGrid) {
118 LOGP(debug, "AliEn file requested. Enabling support.");
119 TGrid::Connect("alien://");
120 mAlienSupport = true;
121 }
122
123 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
124 mfilenames.emplace_back(fn);
125}
126
127bool DataInputDescriptor::setFile(int counter, std::string_view origin)
128{
129 // no files left
130 if (counter >= getNumberInputfiles()) {
131 return false;
132 }
133
134 // In case the origin starts with a anything but AOD, we add the origin as the suffix
135 // of the filename. In the future we might expand this for proper rewriting of the
136 // filename based on the origin and the original file information.
137 std::string filename = mfilenames[counter]->fileName;
138 if (!origin.starts_with("AOD")) {
139 filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
140 }
141
142 // open file
143 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
144 if (rootFS.get()) {
145 if (rootFS->GetFile()->GetName() == filename) {
146 return true;
147 }
149 }
150
151 mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
152 if (!mCurrentFilesystem.get()) {
153 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
154 }
155 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
157
158 // get the parent file map if exists
159 mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
160 if (mParentFileMap && !mParentFileReplacement.empty()) {
161 auto pos = mParentFileReplacement.find(';');
162 if (pos == std::string::npos) {
163 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
164 }
165 auto from = mParentFileReplacement.substr(0, pos);
166 auto to = mParentFileReplacement.substr(pos + 1);
167
168 auto it = mParentFileMap->MakeIterator();
169 while (auto obj = it->Next()) {
170 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
171 objString->String().ReplaceAll(from.c_str(), to.c_str());
172 }
173 delete it;
174 }
175
176 // get the directory names
177 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
178 const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
179 TList* keyList = rootFS->GetFile()->GetListOfKeys();
180 std::vector<std::string> finalList;
181
182 // extract TF numbers and sort accordingly
183 // We use an extra seen set to make sure we preserve the order in which
184 // we instert things in the final list and to make sure we do not have duplicates.
185 // Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
186 std::unordered_set<size_t> seen;
187 for (auto key : *keyList) {
188 std::smatch matchResult;
189 std::string keyName = ((TObjString*)key)->GetString().Data();
190 bool match = std::regex_match(keyName, matchResult, TFRegex);
191 if (match) {
192 auto folderNumber = std::stoul(matchResult[1].str());
193 if (seen.find(folderNumber) == seen.end()) {
194 seen.insert(folderNumber);
195 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
196 }
197 }
198 }
199
200 if (mParentFileMap != nullptr) {
201 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
202 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
203 [this](long const& l1, long const& l2) -> bool {
204 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
205 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
206 return p1->GetString().CompareTo(p2->GetString()) < 0;
207 });
208 } else {
209 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
210 }
211
212 mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
213 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
214 }
215
216 mCurrentFileID = counter;
217 mCurrentFileStartedAt = uv_hrtime();
218 mIOTime = 0;
219
220 return true;
221}
222
223uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
224{
225
226 // open file
227 if (!setFile(counter, origin)) {
228 return 0ul;
229 }
230
231 // no TF left
232 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
233 return 0ul;
234 }
235
236 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
237}
238
239arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
240{
241 // open file
242 if (!setFile(counter, origin)) {
243 return {};
244 }
245
246 // no TF left
247 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
248 return {};
249 }
250
251 mfilenames[counter]->alreadyRead[numTF] = true;
252
253 return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
254}
255
256DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
257{
258 if (!mParentFileMap) {
259 // This file has no parent map
260 return nullptr;
261 }
262 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
263 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
264 // The current DF is not found in the parent map (this should not happen and is a fatal error)
265 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
266 if (!parentFileName) {
267 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
268 return nullptr;
269 }
270
271 if (mParentFile) {
272 // Is this still the corresponding to the correct file?
273 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
274 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
275 return mParentFile;
276 } else {
277 mParentFile->closeInputFile();
278 delete mParentFile;
279 mParentFile = nullptr;
280 }
281 }
282
283 if (mLevel == mAllowedParentLevel) {
284 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
285 rootFS->GetFile()->GetName()));
286 }
287
288 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
289 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
290 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
291 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
292 mParentFile->fillInputfiles();
293 mParentFile->setFile(0, origin);
294 return mParentFile;
295}
296
298{
299 return mfilenames.at(counter)->numberOfTimeFrames;
300}
301
303{
304 auto& list = mfilenames.at(counter)->alreadyRead;
305 return std::count(list.begin(), list.end(), true);
306}
307
309{
310 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
311 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
312 std::string monitoringInfo(fmt::format("lfn={},size={}", f->GetName(), f->GetSize()));
313#if __has_include(<TJAlienFile.h>)
314 auto alienFile = dynamic_cast<TJAlienFile*>(f);
315 if (alienFile) {
316 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
317 }
318#endif
319 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
320 LOGP(info, "Opening file: {}", monitoringInfo);
321}
322
324{
325 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
326 if (wait_time < 0) {
327 wait_time = 0;
328 }
329 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
330 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
331 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
332 f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
333 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
334#if __has_include(<TJAlienFile.h>)
335 auto alienFile = dynamic_cast<TJAlienFile*>(f);
336 if (alienFile) {
337 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
338 }
339#endif
340 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
341 LOGP(info, "Read info: {}", monitoringInfo);
342}
343
345{
346 if (mCurrentFilesystem.get()) {
347 if (mParentFile) {
348 mParentFile->closeInputFile();
349 delete mParentFile;
350 mParentFile = nullptr;
351 }
352
353 delete mParentFileMap;
354 mParentFileMap = nullptr;
355
357 mCurrentFilesystem.reset();
358 }
359}
360
362{
363 if (getNumberInputfiles() > 0) {
364 // 1. mfilenames
365 return getNumberInputfiles();
366 }
367
368 auto fileName = getInputfilesFilename();
369 if (!fileName.empty()) {
370 // 2. getFilenamesRegex() @ getInputfilesFilename()
371 try {
372 std::ifstream filelist(fileName);
373 if (!filelist.is_open()) {
374 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
375 }
376 while (std::getline(filelist, fileName)) {
377 // remove white spaces, empty lines are skipped
378 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
379 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
380 std::regex_match(fileName, getFilenamesRegex()))) {
382 }
383 }
384 } catch (...) {
385 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
386 return 0;
387 }
388 } else {
389 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
390 if (mdefaultFilenamesPtr) {
391 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
392 if (getFilenamesRegexString().empty() ||
393 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
394 addFileNameHolder(fileNameHolder);
395 }
396 }
397 }
398 }
399
400 return getNumberInputfiles();
401}
402
403int DataInputDescriptor::findDFNumber(int file, std::string dfName)
404{
405 auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
406 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
407 if (it == dfList.end()) {
408 return -1;
409 }
410 return it - dfList.begin();
411}
412
415 : mTarget(target)
416 {
417 start = uv_hrtime();
418 }
420 {
421 if (!active) {
422 return;
423 }
424 O2_SIGNPOST_ACTION(reader_memory_dump, [](void*) {
425 void (*dump_)(const char*);
426 if (void* sym = dlsym(nullptr, "igprof_dump_now")) {
427 dump_ = __extension__(void (*)(const char*)) sym;
428 if (dump_) {
429 std::string filename = fmt::format("reader-memory-dump-{}.gz", uv_hrtime());
430 dump_(filename.c_str());
431 }
432 }
433 });
434 mTarget += (uv_hrtime() - start);
435 }
436
438 {
439 active = false;
440 }
441
442 bool active = true;
443 uint64_t& mTarget;
444 uint64_t start;
445 uint64_t stop;
446};
447
448bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
449{
450 CalculateDelta t(mIOTime);
451 std::string origin = dh.dataOrigin.as<std::string>();
452 auto folder = getFileFolder(counter, numTF, origin);
453 if (!folder.filesystem()) {
454 t.deactivate();
455 return false;
456 }
457
458 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
459
460 if (!rootFS) {
461 t.deactivate();
462 throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
463 }
464 // FIXME: Ugly. We should detect the format from the treename, good enough for now.
465 std::shared_ptr<arrow::dataset::FileFormat> format;
466 FragmentToBatch::StreamerCreator creator = nullptr;
467
468 auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
469
470 for (auto& capability : mFactory.capabilities) {
471 auto objectPath = capability.lfn2objectPath(fullpath.path());
472 void* handle = capability.getHandle(rootFS, objectPath);
473 if (handle) {
474 format = capability.factory().format();
475 creator = capability.factory().deferredOutputStreamer;
476 break;
477 }
478 }
479
480 // FIXME: we should distinguish between an actually missing object and one which has a non compatible
481 // format.
482 if (!format) {
483 t.deactivate();
484 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
485 auto parentFile = getParentFile(counter, numTF, treename, origin);
486 if (parentFile != nullptr) {
487 int parentNumTF = parentFile->findDFNumber(0, folder.path());
488 if (parentNumTF == -1) {
489 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
490 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
491 }
492 // first argument is 0 as the parent file object contains only 1 file
493 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
494 }
495 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
496 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
497 }
498
499 auto schemaOpt = format->Inspect(fullpath);
500 auto physicalSchema = schemaOpt;
501 std::vector<std::shared_ptr<arrow::Field>> fields;
502 for (auto& original : (*schemaOpt)->fields()) {
503 if (original->name().ends_with("_size")) {
504 continue;
505 }
506 fields.push_back(original);
507 }
508 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
509
510 auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
511
512 // create table output
513 auto o = Output(dh);
514
515 // FIXME: This should allow me to create a memory pool
516 // which I can then use to scan the dataset.
517 auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
518
521 f2b->setLabel(treename.c_str());
522 f2b->fill(datasetSchema, format);
523
524 return true;
525}
526
531
532DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
533{
534 if (inputFile.size() && inputFile[0] == '@') {
535 inputFile.erase(0, 1);
536 setInputfilesFile(inputFile);
537 } else {
538 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
539 }
540
542}
543
544DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
545{
546 for (auto inputFile : inputFiles) {
547 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
548 }
549
551}
552
554{
555 for (auto fn : mdefaultInputFiles) {
556 delete fn;
557 }
558 mdefaultInputFiles.clear();
559 mdefaultDataInputDescriptor = nullptr;
560
561 for (auto fn : mdataInputDescriptors) {
562 delete fn;
563 }
564 mdataInputDescriptors.clear();
565}
566
568{
569 mdataInputDescriptors.clear();
570 mdefaultInputFiles.clear();
571 mFilenameRegex = std::string("");
572};
573
575{
576 if (mdefaultDataInputDescriptor) {
577 delete mdefaultDataInputDescriptor;
578 }
579 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
580
581 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
582 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
583 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
584 mdefaultDataInputDescriptor->tablename = "any";
585 mdefaultDataInputDescriptor->treename = "any";
586 mdefaultDataInputDescriptor->fillInputfiles();
587
588 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
589}
590
591bool DataInputDirector::readJson(std::string const& fnjson)
592{
593 // open the file
594 FILE* f = fopen(fnjson.c_str(), "r");
595 if (!f) {
596 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
597 return false;
598 }
599
600 // create streamer
601 char readBuffer[65536];
602 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
603
604 // parse the json file
605 Document jsonDoc;
606 jsonDoc.ParseStream(inputStream);
607 auto status = readJsonDocument(&jsonDoc);
608
609 // clean up
610 fclose(f);
611
612 return status;
613}
614
615bool DataInputDirector::readJsonDocument(Document* jsonDoc)
616{
617 // initialisations
618 std::string fileName("");
619 const char* itemName;
620
621 // is it a proper json document?
622 if (jsonDoc->HasParseError()) {
623 LOGP(error, "Check the JSON document! There is a problem with the format!");
624 return false;
625 }
626
627 // InputDirector
628 itemName = "InputDirector";
629 const Value& didirItem = (*jsonDoc)[itemName];
630 if (!didirItem.IsObject()) {
631 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
632 return true;
633 }
634
635 // now read various items
636 itemName = "debugmode";
637 if (didirItem.HasMember(itemName)) {
638 if (didirItem[itemName].IsBool()) {
639 mDebugMode = (didirItem[itemName].GetBool());
640 } else {
641 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
642 return false;
643 }
644 } else {
645 mDebugMode = false;
646 }
647
648 if (mDebugMode) {
649 StringBuffer buffer;
650 buffer.Clear();
651 PrettyWriter<StringBuffer> writer(buffer);
652 didirItem.Accept(writer);
653 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
654 }
655
656 itemName = "fileregex";
657 if (didirItem.HasMember(itemName)) {
658 if (didirItem[itemName].IsString()) {
659 setFilenamesRegex(didirItem[itemName].GetString());
660 } else {
661 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
662 return false;
663 }
664 }
665
666 itemName = "resfiles";
667 if (didirItem.HasMember(itemName)) {
668 if (didirItem[itemName].IsString()) {
669 fileName = didirItem[itemName].GetString();
670 if (fileName.size() && fileName[0] == '@') {
671 fileName.erase(0, 1);
672 setInputfilesFile(fileName);
673 } else {
675 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
676 }
677 } else if (didirItem[itemName].IsArray()) {
679 auto fns = didirItem[itemName].GetArray();
680 for (auto& fn : fns) {
681 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
682 }
683 } else {
684 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
685 return false;
686 }
687 }
688
689 itemName = "InputDescriptors";
690 if (didirItem.HasMember(itemName)) {
691 if (!didirItem[itemName].IsArray()) {
692 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
693 return false;
694 }
695
696 // loop over DataInputDescriptors
697 for (auto& didescItem : didirItem[itemName].GetArray()) {
698 if (!didescItem.IsObject()) {
699 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
700 return false;
701 }
702 // create a new dataInputDescriptor
703 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
704 didesc->setDefaultInputfiles(&mdefaultInputFiles);
705
706 itemName = "table";
707 if (didescItem.HasMember(itemName)) {
708 if (didescItem[itemName].IsString()) {
709 didesc->tablename = didescItem[itemName].GetString();
710 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
711 } else {
712 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
713 return false;
714 }
715 } else {
716 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
717 return false;
718 }
719
720 itemName = "treename";
721 if (didescItem.HasMember(itemName)) {
722 if (didescItem[itemName].IsString()) {
723 didesc->treename = didescItem[itemName].GetString();
724 } else {
725 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
726 return false;
727 }
728 } else {
729 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
730 didesc->treename = m[2];
731 }
732
733 itemName = "fileregex";
734 if (didescItem.HasMember(itemName)) {
735 if (didescItem[itemName].IsString()) {
736 if (didesc->getNumberInputfiles() == 0) {
737 didesc->setFilenamesRegex(didescItem[itemName].GetString());
738 }
739 } else {
740 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
741 return false;
742 }
743 } else {
744 if (didesc->getNumberInputfiles() == 0) {
745 didesc->setFilenamesRegex(mFilenameRegexPtr);
746 }
747 }
748
749 itemName = "resfiles";
750 if (didescItem.HasMember(itemName)) {
751 if (didescItem[itemName].IsString()) {
752 fileName = didescItem[itemName].GetString();
753 if (fileName.size() && fileName[0] == '@') {
754 didesc->setInputfilesFile(fileName.erase(0, 1));
755 } else {
756 if (didesc->getFilenamesRegexString().empty() ||
757 std::regex_match(fileName, didesc->getFilenamesRegex())) {
758 didesc->addFileNameHolder(makeFileNameHolder(fileName));
759 }
760 }
761 } else if (didescItem[itemName].IsArray()) {
762 auto fns = didescItem[itemName].GetArray();
763 for (auto& fn : fns) {
764 if (didesc->getFilenamesRegexString().empty() ||
765 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
766 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
767 }
768 }
769 } else {
770 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
771 return false;
772 }
773 } else {
774 didesc->setInputfilesFile(minputfilesFilePtr);
775 }
776
777 // fill mfilenames and add InputDescriptor to InputDirector
778 if (didesc->fillInputfiles() > 0) {
779 mdataInputDescriptors.emplace_back(didesc);
780 } else {
781 didesc->printOut();
782 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
783 }
784 mAlienSupport &= didesc->isAlienSupportOn();
785 }
786 }
787
788 // add a default DataInputDescriptor
790
791 // check that all DataInputDescriptors have the same number of input files
792 if (!isValid()) {
793 printOut();
794 return false;
795 }
796
797 // print the DataIputDirector
798 if (mDebugMode) {
799 printOut();
800 }
801
802 return true;
803}
804
806{
807 DataInputDescriptor* result = nullptr;
808
809 // compute list of matching outputs
811
812 for (auto didesc : mdataInputDescriptors) {
813 if (didesc->matcher->match(dh, context)) {
814 result = didesc;
815 break;
816 }
817 }
818
819 return result;
820}
821
822arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
823{
824 auto didesc = getDataInputDescriptor(dh);
825 // if NOT match then use defaultDataInputDescriptor
826 if (!didesc) {
827 didesc = mdefaultDataInputDescriptor;
828 }
829 std::string origin = dh.dataOrigin.as<std::string>();
830
831 return didesc->getFileFolder(counter, numTF, origin);
832}
833
835{
836 auto didesc = getDataInputDescriptor(dh);
837 // if NOT match then use defaultDataInputDescriptor
838 if (!didesc) {
839 didesc = mdefaultDataInputDescriptor;
840 }
841
842 return didesc->getTimeFramesInFile(counter);
843}
844
846{
847 auto didesc = getDataInputDescriptor(dh);
848 // if NOT match then use defaultDataInputDescriptor
849 if (!didesc) {
850 didesc = mdefaultDataInputDescriptor;
851 }
852 std::string origin = dh.dataOrigin.as<std::string>();
853
854 return didesc->getTimeFrameNumber(counter, numTF, origin);
855}
856
857bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
858{
859 std::string treename;
860
861 auto didesc = getDataInputDescriptor(dh);
862 if (didesc) {
863 // if match then use filename and treename from DataInputDescriptor
864 treename = didesc->treename;
865 } else {
866 // if NOT match then use
867 // . filename from defaultDataInputDescriptor
868 // . treename from DataHeader
869 didesc = mdefaultDataInputDescriptor;
870 treename = aod::datamodel::getTreeName(dh);
871 }
872 std::string origin = dh.dataOrigin.as<std::string>();
873
874 auto result = didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
875 return result;
876}
877
879{
880 mdefaultDataInputDescriptor->closeInputFile();
881 for (auto didesc : mdataInputDescriptors) {
882 didesc->closeInputFile();
883 }
884}
885
886bool DataInputDirector::isValid()
887{
888 bool status = true;
889 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
890 for (auto didesc : mdataInputDescriptors) {
891 status &= didesc->getNumberInputfiles() == numberFiles;
892 }
893
894 return status;
895}
896
898{
899 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
900 for (auto didesc : mdataInputDescriptors) {
901 status &= (didesc->getNumberInputfiles() <= counter);
902 }
903
904 return status;
905}
906
908{
909 LOGP(info, "DataInputDirector");
910 LOGP(info, " Default input files file : {}", minputfilesFile);
911 LOGP(info, " Default file name regex : {}", mFilenameRegex);
912 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
913 for (auto const& fn : mdefaultInputFiles) {
914 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
915 }
916 LOGP(info, " Default DataInputDescriptor:");
917 mdefaultDataInputDescriptor->printOut();
918 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
919 for (auto const& didesc : mdataInputDescriptors) {
920 didesc->printOut();
921 }
922}
923
924} // namespace o2::framework
o2::monitoring::tags::Value Value
int32_t i
bool o
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ACTION(log, callback)
Definition Signpost.h:510
std::ostringstream debug
StringRef key
decltype(auto) make(const Output &spec, Args... args)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring *monitoring=nullptr, int allowedParentLevel=0, std::string parentFileReplacement="")
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
int findDFNumber(int file, std::string dfName)
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin)
bool setFile(int counter, std::string_view origin)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
std::string filename()
void empty(int)
Definition list.h:40
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
the main header struct
Definition DataHeader.h:618
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
const std::string str