Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInputDirector.h"
13#include "Framework/Logger.h"
17#include "Framework/Output.h"
18#include "Framework/Signpost.h"
19#include "Headers/DataHeader.h"
21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
24
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
28
29#include "TGrid.h"
30#include "TObjString.h"
31#include "TMap.h"
32#include "TFile.h"
33
34#include <arrow/dataset/file_base.h>
35#include <arrow/dataset/dataset.h>
36#include <uv.h>
37#include <memory>
38
39#if __has_include(<TJAlienFile.h>)
40#include <TJAlienFile.h>
41
42#include <utility>
43#endif
44
45#include <dlfcn.h>
46O2_DECLARE_DYNAMIC_LOG(reader_memory_dump);
47
48namespace o2::framework
49{
50using namespace rapidjson;
51
52FileNameHolder* makeFileNameHolder(std::string fileName)
53{
54 auto fileNameHolder = new FileNameHolder();
55 fileNameHolder->fileName = fileName;
56
57 return fileNameHolder;
58}
59
60DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
61 : mAlienSupport(alienSupport),
62 mMonitoring(monitoring),
63 mAllowedParentLevel(allowedParentLevel),
64 mParentFileReplacement(std::move(parentFileReplacement)),
65 mLevel(level)
66{
67 std::vector<char const*> capabilitiesSpecs = {
68 "O2Framework:RNTupleObjectReadingCapability",
69 "O2Framework:TTreeObjectReadingCapability",
70 };
71
72 std::vector<LoadablePlugin> plugins;
73 for (auto spec : capabilitiesSpecs) {
74 auto morePlugins = PluginManager::parsePluginSpecString(spec);
75 for (auto& extra : morePlugins) {
76 plugins.push_back(extra);
77 }
78 }
79
80 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
81}
82
84{
85 LOGP(info, "DataInputDescriptor");
86 LOGP(info, " Table name : {}", tablename);
87 LOGP(info, " Tree name : {}", treename);
88 LOGP(info, " Input files file : {}", getInputfilesFilename());
89 LOGP(info, " File name regex : {}", getFilenamesRegexString());
90 LOGP(info, " Input files : {}", mfilenames.size());
91 for (auto fn : mfilenames) {
92 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
93 }
94 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
95}
96
98{
99 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
100}
101
103{
104 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
105}
106
108{
109 return std::regex(getFilenamesRegexString());
110}
111
113{
114 // remove leading file:// from file name
115 if (fn->fileName.rfind("file://", 0) == 0) {
116 fn->fileName.erase(0, 7);
117 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0) {
118 LOGP(debug, "AliEn file requested. Enabling support.");
119 TGrid::Connect("alien://");
120 mAlienSupport = true;
121 }
122
123 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
124 mfilenames.emplace_back(fn);
125}
126
128{
129 // no files left
130 if (counter >= getNumberInputfiles()) {
131 return false;
132 }
133
134 // open file
135 auto filename = mfilenames[counter]->fileName;
136 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
137 if (rootFS.get()) {
138 if (rootFS->GetFile()->GetName() == filename) {
139 return true;
140 }
142 }
143
144 mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
145 if (!mCurrentFilesystem.get()) {
146 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
147 }
148 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
149
150 // get the parent file map if exists
151 mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
152 if (mParentFileMap && !mParentFileReplacement.empty()) {
153 auto pos = mParentFileReplacement.find(';');
154 if (pos == std::string::npos) {
155 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
156 }
157 auto from = mParentFileReplacement.substr(0, pos);
158 auto to = mParentFileReplacement.substr(pos + 1);
159
160 auto it = mParentFileMap->MakeIterator();
161 while (auto obj = it->Next()) {
162 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
163 objString->String().ReplaceAll(from.c_str(), to.c_str());
164 }
165 delete it;
166 }
167
168 // get the directory names
169 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
170 const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
171 TList* keyList = rootFS->GetFile()->GetListOfKeys();
172 std::vector<std::string> finalList;
173
174 // extract TF numbers and sort accordingly
175 // We use an extra seen set to make sure we preserve the order in which
176 // we insert things in the final list and to make sure we do not have duplicates.
177 // Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
178 std::unordered_set<size_t> seen;
179 for (auto key : *keyList) {
180 std::smatch matchResult;
181 std::string keyName = ((TObjString*)key)->GetString().Data();
182 bool match = std::regex_match(keyName, matchResult, TFRegex);
183 if (match) {
184 auto folderNumber = std::stoul(matchResult[1].str());
185 if (seen.find(folderNumber) == seen.end()) {
186 seen.insert(folderNumber);
187 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
188 }
189 }
190 }
191
192 if (mParentFileMap != nullptr) {
193 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
194 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
195 [this](long const& l1, long const& l2) -> bool {
196 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
197 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
198 return p1->GetString().CompareTo(p2->GetString()) < 0;
199 });
200 } else {
201 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
202 }
203
204 mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
205 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
206 }
207
208 mCurrentFileID = counter;
209 mCurrentFileStartedAt = uv_hrtime();
210 mIOTime = 0;
211
212 return true;
213}
214
216{
217
218 // open file
219 if (!setFile(counter)) {
220 return 0ul;
221 }
222
223 // no TF left
224 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
225 return 0ul;
226 }
227
228 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
229}
230
231arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
232{
233 // open file
234 if (!setFile(counter)) {
235 return {};
236 }
237
238 // no TF left
239 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
240 return {};
241 }
242
243 mfilenames[counter]->alreadyRead[numTF] = true;
244
245 return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
246}
247
249{
250 if (!mParentFileMap) {
251 // This file has no parent map
252 return nullptr;
253 }
254 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
255 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
256 // The current DF is not found in the parent map (this should not happen and is a fatal error)
257 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
258 if (!parentFileName) {
259 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
260 return nullptr;
261 }
262
263 if (mParentFile) {
264 // Does this still correspond to the correct file?
265 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
266 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
267 return mParentFile;
268 } else {
269 mParentFile->closeInputFile();
270 delete mParentFile;
271 mParentFile = nullptr;
272 }
273 }
274
275 if (mLevel == mAllowedParentLevel) {
276 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
277 rootFS->GetFile()->GetName()));
278 }
279
280 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
281 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
282 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
283 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
284 mParentFile->fillInputfiles();
285 mParentFile->setFile(0);
286 return mParentFile;
287}
288
290{
291 return mfilenames.at(counter)->numberOfTimeFrames;
292}
293
295{
296 auto& list = mfilenames.at(counter)->alreadyRead;
297 return std::count(list.begin(), list.end(), true);
298}
299
301{
302 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
303 if (wait_time < 0) {
304 wait_time = 0;
305 }
306 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
307 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
308 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
309 f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
310 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
311#if __has_include(<TJAlienFile.h>)
312 auto alienFile = dynamic_cast<TJAlienFile*>(f);
313 if (alienFile) {
314 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
315 }
316#endif
317 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
318 LOGP(info, "Read info: {}", monitoringInfo);
319}
320
322{
323 if (mCurrentFilesystem.get()) {
324 if (mParentFile) {
325 mParentFile->closeInputFile();
326 delete mParentFile;
327 mParentFile = nullptr;
328 }
329
330 delete mParentFileMap;
331 mParentFileMap = nullptr;
332
334 mCurrentFilesystem.reset();
335 }
336}
337
339{
340 if (getNumberInputfiles() > 0) {
341 // 1. mfilenames
342 return getNumberInputfiles();
343 }
344
345 auto fileName = getInputfilesFilename();
346 if (!fileName.empty()) {
347 // 2. getFilenamesRegex() @ getInputfilesFilename()
348 try {
349 std::ifstream filelist(fileName);
350 if (!filelist.is_open()) {
351 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
352 }
353 while (std::getline(filelist, fileName)) {
354 // remove white spaces, empty lines are skipped
355 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
356 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
357 std::regex_match(fileName, getFilenamesRegex()))) {
359 }
360 }
361 } catch (...) {
362 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
363 return 0;
364 }
365 } else {
366 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
367 if (mdefaultFilenamesPtr) {
368 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
369 if (getFilenamesRegexString().empty() ||
370 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
371 addFileNameHolder(fileNameHolder);
372 }
373 }
374 }
375 }
376
377 return getNumberInputfiles();
378}
379
380int DataInputDescriptor::findDFNumber(int file, std::string dfName)
381{
382 auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
383 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
384 if (it == dfList.end()) {
385 return -1;
386 }
387 return it - dfList.begin();
388}
389
392 : mTarget(target)
393 {
394 start = uv_hrtime();
395 }
397 {
398 if (!active) {
399 return;
400 }
401 O2_SIGNPOST_ACTION(reader_memory_dump, [](void*) {
402 void (*dump_)(const char*);
403 if (void* sym = dlsym(nullptr, "igprof_dump_now")) {
404 dump_ = __extension__(void (*)(const char*)) sym;
405 if (dump_) {
406 std::string filename = fmt::format("reader-memory-dump-{}.gz", uv_hrtime());
407 dump_(filename.c_str());
408 }
409 }
410 });
411 mTarget += (uv_hrtime() - start);
412 }
413
414 void deactivate() {
415 active = false;
416 }
417
418 bool active = true;
419 uint64_t& mTarget;
420 uint64_t start;
421 uint64_t stop;
422};
423
424bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
425{
426 CalculateDelta t(mIOTime);
427 auto folder = getFileFolder(counter, numTF);
428 if (!folder.filesystem()) {
429 t.deactivate();
430 return false;
431 }
432
433 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
434
435 if (!rootFS) {
436 t.deactivate();
437 throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
438 }
439 // FIXME: Ugly. We should detect the format from the treename, good enough for now.
440 std::shared_ptr<arrow::dataset::FileFormat> format;
441 FragmentToBatch::StreamerCreator creator = nullptr;
442
443 auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
444
445 for (auto& capability : mFactory.capabilities) {
446 auto objectPath = capability.lfn2objectPath(fullpath.path());
447 void* handle = capability.getHandle(rootFS, objectPath);
448 if (handle) {
449 format = capability.factory().format();
450 creator = capability.factory().deferredOutputStreamer;
451 break;
452 }
453 }
454
455 // FIXME: we should distinguish between an actually missing object and one which has a non compatible
456 // format.
457 if (!format) {
458 t.deactivate();
459 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
460 auto parentFile = getParentFile(counter, numTF, treename);
461 if (parentFile != nullptr) {
462 int parentNumTF = parentFile->findDFNumber(0, folder.path());
463 if (parentNumTF == -1) {
464 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
465 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
466 }
467 // first argument is 0 as the parent file object contains only 1 file
468 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
469 }
470 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
471 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
472 }
473
474 auto schemaOpt = format->Inspect(fullpath);
475 auto physicalSchema = schemaOpt;
476 std::vector<std::shared_ptr<arrow::Field>> fields;
477 for (auto& original : (*schemaOpt)->fields()) {
478 if (original->name().ends_with("_size")) {
479 continue;
480 }
481 fields.push_back(original);
482 }
483 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
484
485 auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
486
487 // create table output
488 auto o = Output(dh);
489
490 // FIXME: This should allow me to create a memory pool
491 // which I can then use to scan the dataset.
492 auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
493
496 f2b->setLabel(treename.c_str());
497 f2b->fill(datasetSchema, format);
498
499 return true;
500}
501
506
507DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
508{
509 if (inputFile.size() && inputFile[0] == '@') {
510 inputFile.erase(0, 1);
511 setInputfilesFile(inputFile);
512 } else {
513 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
514 }
515
517}
518
519DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
520{
521 for (auto inputFile : inputFiles) {
522 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
523 }
524
526}
527
529{
530 for (auto fn : mdefaultInputFiles) {
531 delete fn;
532 }
533 mdefaultInputFiles.clear();
534 mdefaultDataInputDescriptor = nullptr;
535
536 for (auto fn : mdataInputDescriptors) {
537 delete fn;
538 }
539 mdataInputDescriptors.clear();
540}
541
543{
544 mdataInputDescriptors.clear();
545 mdefaultInputFiles.clear();
546 mFilenameRegex = std::string("");
547};
548
550{
551 if (mdefaultDataInputDescriptor) {
552 delete mdefaultDataInputDescriptor;
553 }
554 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
555
556 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
557 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
558 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
559 mdefaultDataInputDescriptor->tablename = "any";
560 mdefaultDataInputDescriptor->treename = "any";
561 mdefaultDataInputDescriptor->fillInputfiles();
562
563 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
564}
565
566bool DataInputDirector::readJson(std::string const& fnjson)
567{
568 // open the file
569 FILE* f = fopen(fnjson.c_str(), "r");
570 if (!f) {
571 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
572 return false;
573 }
574
575 // create streamer
576 char readBuffer[65536];
577 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
578
579 // parse the json file
580 Document jsonDoc;
581 jsonDoc.ParseStream(inputStream);
582 auto status = readJsonDocument(&jsonDoc);
583
584 // clean up
585 fclose(f);
586
587 return status;
588}
589
590bool DataInputDirector::readJsonDocument(Document* jsonDoc)
591{
592 // initialisations
593 std::string fileName("");
594 const char* itemName;
595
596 // is it a proper json document?
597 if (jsonDoc->HasParseError()) {
598 LOGP(error, "Check the JSON document! There is a problem with the format!");
599 return false;
600 }
601
602 // InputDirector
603 itemName = "InputDirector";
604 const Value& didirItem = (*jsonDoc)[itemName];
605 if (!didirItem.IsObject()) {
606 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
607 return true;
608 }
609
610 // now read various items
611 itemName = "debugmode";
612 if (didirItem.HasMember(itemName)) {
613 if (didirItem[itemName].IsBool()) {
614 mDebugMode = (didirItem[itemName].GetBool());
615 } else {
616 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
617 return false;
618 }
619 } else {
620 mDebugMode = false;
621 }
622
623 if (mDebugMode) {
624 StringBuffer buffer;
625 buffer.Clear();
626 PrettyWriter<StringBuffer> writer(buffer);
627 didirItem.Accept(writer);
628 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
629 }
630
631 itemName = "fileregex";
632 if (didirItem.HasMember(itemName)) {
633 if (didirItem[itemName].IsString()) {
634 setFilenamesRegex(didirItem[itemName].GetString());
635 } else {
636 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
637 return false;
638 }
639 }
640
641 itemName = "resfiles";
642 if (didirItem.HasMember(itemName)) {
643 if (didirItem[itemName].IsString()) {
644 fileName = didirItem[itemName].GetString();
645 if (fileName.size() && fileName[0] == '@') {
646 fileName.erase(0, 1);
647 setInputfilesFile(fileName);
648 } else {
650 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
651 }
652 } else if (didirItem[itemName].IsArray()) {
654 auto fns = didirItem[itemName].GetArray();
655 for (auto& fn : fns) {
656 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
657 }
658 } else {
659 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
660 return false;
661 }
662 }
663
664 itemName = "InputDescriptors";
665 if (didirItem.HasMember(itemName)) {
666 if (!didirItem[itemName].IsArray()) {
667 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
668 return false;
669 }
670
671 // loop over DataInputDescriptors
672 for (auto& didescItem : didirItem[itemName].GetArray()) {
673 if (!didescItem.IsObject()) {
674 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
675 return false;
676 }
677 // create a new dataInputDescriptor
678 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
679 didesc->setDefaultInputfiles(&mdefaultInputFiles);
680
681 itemName = "table";
682 if (didescItem.HasMember(itemName)) {
683 if (didescItem[itemName].IsString()) {
684 didesc->tablename = didescItem[itemName].GetString();
685 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
686 } else {
687 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
688 return false;
689 }
690 } else {
691 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
692 return false;
693 }
694
695 itemName = "treename";
696 if (didescItem.HasMember(itemName)) {
697 if (didescItem[itemName].IsString()) {
698 didesc->treename = didescItem[itemName].GetString();
699 } else {
700 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
701 return false;
702 }
703 } else {
704 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
705 didesc->treename = m[2];
706 }
707
708 itemName = "fileregex";
709 if (didescItem.HasMember(itemName)) {
710 if (didescItem[itemName].IsString()) {
711 if (didesc->getNumberInputfiles() == 0) {
712 didesc->setFilenamesRegex(didescItem[itemName].GetString());
713 }
714 } else {
715 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
716 return false;
717 }
718 } else {
719 if (didesc->getNumberInputfiles() == 0) {
720 didesc->setFilenamesRegex(mFilenameRegexPtr);
721 }
722 }
723
724 itemName = "resfiles";
725 if (didescItem.HasMember(itemName)) {
726 if (didescItem[itemName].IsString()) {
727 fileName = didescItem[itemName].GetString();
728 if (fileName.size() && fileName[0] == '@') {
729 didesc->setInputfilesFile(fileName.erase(0, 1));
730 } else {
731 if (didesc->getFilenamesRegexString().empty() ||
732 std::regex_match(fileName, didesc->getFilenamesRegex())) {
733 didesc->addFileNameHolder(makeFileNameHolder(fileName));
734 }
735 }
736 } else if (didescItem[itemName].IsArray()) {
737 auto fns = didescItem[itemName].GetArray();
738 for (auto& fn : fns) {
739 if (didesc->getFilenamesRegexString().empty() ||
740 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
741 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
742 }
743 }
744 } else {
745 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
746 return false;
747 }
748 } else {
749 didesc->setInputfilesFile(minputfilesFilePtr);
750 }
751
752 // fill mfilenames and add InputDescriptor to InputDirector
753 if (didesc->fillInputfiles() > 0) {
754 mdataInputDescriptors.emplace_back(didesc);
755 } else {
756 didesc->printOut();
757 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
758 }
759 mAlienSupport &= didesc->isAlienSupportOn();
760 }
761 }
762
763 // add a default DataInputDescriptor
765
766 // check that all DataInputDescriptors have the same number of input files
767 if (!isValid()) {
768 printOut();
769 return false;
770 }
771
772 // print the DataInputDirector
773 if (mDebugMode) {
774 printOut();
775 }
776
777 return true;
778}
779
781{
782 DataInputDescriptor* result = nullptr;
783
784 // compute list of matching outputs
786
787 for (auto didesc : mdataInputDescriptors) {
788 if (didesc->matcher->match(dh, context)) {
789 result = didesc;
790 break;
791 }
792 }
793
794 return result;
795}
796
797arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
798{
799 auto didesc = getDataInputDescriptor(dh);
800 // if NOT match then use defaultDataInputDescriptor
801 if (!didesc) {
802 didesc = mdefaultDataInputDescriptor;
803 }
804
805 return didesc->getFileFolder(counter, numTF);
806}
807
809{
810 auto didesc = getDataInputDescriptor(dh);
811 // if NOT match then use defaultDataInputDescriptor
812 if (!didesc) {
813 didesc = mdefaultDataInputDescriptor;
814 }
815
816 return didesc->getTimeFramesInFile(counter);
817}
818
820{
821 auto didesc = getDataInputDescriptor(dh);
822 // if NOT match then use defaultDataInputDescriptor
823 if (!didesc) {
824 didesc = mdefaultDataInputDescriptor;
825 }
826
827 return didesc->getTimeFrameNumber(counter, numTF);
828}
829
830bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
831{
832 std::string treename;
833
834 auto didesc = getDataInputDescriptor(dh);
835 if (didesc) {
836 // if match then use filename and treename from DataInputDescriptor
837 treename = didesc->treename;
838 } else {
839 // if NOT match then use
840 // . filename from defaultDataInputDescriptor
841 // . treename from DataHeader
842 didesc = mdefaultDataInputDescriptor;
843 treename = aod::datamodel::getTreeName(dh);
844 }
845
846 auto result = didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
847 return result;
848}
849
851{
852 mdefaultDataInputDescriptor->closeInputFile();
853 for (auto didesc : mdataInputDescriptors) {
854 didesc->closeInputFile();
855 }
856}
857
858bool DataInputDirector::isValid()
859{
860 bool status = true;
861 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
862 for (auto didesc : mdataInputDescriptors) {
863 status &= didesc->getNumberInputfiles() == numberFiles;
864 }
865
866 return status;
867}
868
870{
871 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
872 for (auto didesc : mdataInputDescriptors) {
873 status &= (didesc->getNumberInputfiles() <= counter);
874 }
875
876 return status;
877}
878
880{
881 LOGP(info, "DataInputDirector");
882 LOGP(info, " Default input files file : {}", minputfilesFile);
883 LOGP(info, " Default file name regex : {}", mFilenameRegex);
884 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
885 for (auto const& fn : mdefaultInputFiles) {
886 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
887 }
888 LOGP(info, " Default DataInputDescriptor:");
889 mdefaultDataInputDescriptor->printOut();
890 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
891 for (auto const& didesc : mdataInputDescriptors) {
892 didesc->printOut();
893 }
894}
895
896} // namespace o2::framework
o2::monitoring::tags::Value Value
int32_t i
bool o
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ACTION(log, callback)
Definition Signpost.h:504
std::ostringstream debug
StringRef key
decltype(auto) make(const Output &spec, Args... args)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
uint64_t getTimeFrameNumber(int counter, int numTF)
arrow::dataset::FileSource getFileFolder(int counter, int numTF)
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring *monitoring=nullptr, int allowedParentLevel=0, std::string parentFileReplacement="")
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename)
int findDFNumber(int file, std::string dfName)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
std::string filename()
void empty(int)
Definition list.h:40
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
the main header struct
Definition DataHeader.h:618
const std::string str