Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInputDirector.h"
13#include "Framework/Logger.h"
15#include "Framework/Output.h"
16#include "Headers/DataHeader.h"
18#include "Monitoring/Tags.h"
19#include "Monitoring/Metric.h"
20#include "Monitoring/Monitoring.h"
21
22#include "rapidjson/document.h"
23#include "rapidjson/prettywriter.h"
24#include "rapidjson/filereadstream.h"
25
26#include "TGrid.h"
27#include "TObjString.h"
28#include "TMap.h"
29
30#include <uv.h>
31
32#if __has_include(<TJAlienFile.h>)
33#include <TJAlienFile.h>
34
35#include <utility>
36#endif
37
38namespace o2::framework
39{
40using namespace rapidjson;
41
42FileNameHolder* makeFileNameHolder(std::string fileName)
43{
44 auto fileNameHolder = new FileNameHolder();
45 fileNameHolder->fileName = fileName;
46
47 return fileNameHolder;
48}
49
50DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport),
51 mMonitoring(monitoring),
52 mAllowedParentLevel(allowedParentLevel),
53 mParentFileReplacement(std::move(parentFileReplacement)),
54 mLevel(level)
55{
56}
57
59{
60 LOGP(info, "DataInputDescriptor");
61 LOGP(info, " Table name : {}", tablename);
62 LOGP(info, " Tree name : {}", treename);
63 LOGP(info, " Input files file : {}", getInputfilesFilename());
64 LOGP(info, " File name regex : {}", getFilenamesRegexString());
65 LOGP(info, " Input files : {}", mfilenames.size());
66 for (auto fn : mfilenames) {
67 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
68 }
69 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
70}
71
73{
74 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
75}
76
78{
79 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
80}
81
83{
84 return std::regex(getFilenamesRegexString());
85}
86
88{
89 // remove leading file:// from file name
90 if (fn->fileName.rfind("file://", 0) == 0) {
91 fn->fileName.erase(0, 7);
92 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0) {
93 LOGP(debug, "AliEn file requested. Enabling support.");
94 TGrid::Connect("alien://");
95 mAlienSupport = true;
96 }
97
98 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
99 mfilenames.emplace_back(fn);
100}
101
103{
104 // no files left
105 if (counter >= getNumberInputfiles()) {
106 return false;
107 }
108
109 // open file
110 auto filename = mfilenames[counter]->fileName;
111 if (mcurrentFile) {
112 if (mcurrentFile->GetName() == filename) {
113 return true;
114 }
116 }
117 mcurrentFile = TFile::Open(filename.c_str());
118 if (!mcurrentFile) {
119 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
120 }
121 mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
122
123 // get the parent file map if exists
124 mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
125 if (mParentFileMap && !mParentFileReplacement.empty()) {
126 auto pos = mParentFileReplacement.find(';');
127 if (pos == std::string::npos) {
128 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
129 }
130 auto from = mParentFileReplacement.substr(0, pos);
131 auto to = mParentFileReplacement.substr(pos + 1);
132
133 auto it = mParentFileMap->MakeIterator();
134 while (auto obj = it->Next()) {
135 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
136 objString->String().ReplaceAll(from.c_str(), to.c_str());
137 }
138 delete it;
139 }
140
141 // get the directory names
142 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
143 std::regex TFRegex = std::regex("DF_[0-9]+");
144 TList* keyList = mcurrentFile->GetListOfKeys();
145
146 // extract TF numbers and sort accordingly
147 for (auto key : *keyList) {
148 if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) {
149 auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3));
150 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
151 }
152 }
153 if (mParentFileMap != nullptr) {
154 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
155 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
156 [this](long const& l1, long const& l2) -> bool {
157 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
158 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
159 return p1->GetString().CompareTo(p2->GetString()) < 0;
160 });
161 } else {
162 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
163 }
164
165 for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) {
166 auto folderName = "DF_" + std::to_string(folderNumber);
167 mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName);
168 mfilenames[counter]->alreadyRead.emplace_back(false);
169 }
170 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size();
171 }
172
173 mCurrentFileID = counter;
174 mCurrentFileStartedAt = uv_hrtime();
175 mIOTime = 0;
176
177 return true;
178}
179
181{
182
183 // open file
184 if (!setFile(counter)) {
185 return 0ul;
186 }
187
188 // no TF left
189 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
190 return 0ul;
191 }
192
193 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
194}
195
197{
198 FileAndFolder fileAndFolder;
199
200 // open file
201 if (!setFile(counter)) {
202 return fileAndFolder;
203 }
204
205 // no TF left
206 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
207 return fileAndFolder;
208 }
209
210 fileAndFolder.file = mcurrentFile;
211 fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
212
213 mfilenames[counter]->alreadyRead[numTF] = true;
214
215 return fileAndFolder;
216}
217
219{
220 if (!mParentFileMap) {
221 // This file has no parent map
222 return nullptr;
223 }
224 auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
225 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
226 if (!parentFileName) {
227 // The current DF is not found in the parent map (this should not happen and is a fatal error)
228 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
229 return nullptr;
230 }
231
232 if (mParentFile) {
233 // Is this still the corresponding to the correct file?
234 if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
235 return mParentFile;
236 } else {
237 mParentFile->closeInputFile();
238 delete mParentFile;
239 mParentFile = nullptr;
240 }
241 }
242
243 if (mLevel == mAllowedParentLevel) {
244 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
245 }
246
247 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
248 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
249 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
250 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
251 mParentFile->fillInputfiles();
252 mParentFile->setFile(0);
253 return mParentFile;
254}
255
257{
258 return mfilenames.at(counter)->numberOfTimeFrames;
259}
260
262{
263 auto& list = mfilenames.at(counter)->alreadyRead;
264 return std::count(list.begin(), list.end(), true);
265}
266
268{
269 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
270 if (wait_time < 0) {
271 wait_time = 0;
272 }
273 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
274 mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
275 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
276#if __has_include(<TJAlienFile.h>)
277 auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
278 if (alienFile) {
279 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
280 }
281#endif
282 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
283 LOGP(info, "Read info: {}", monitoringInfo);
284}
285
287{
288 if (mcurrentFile) {
289 if (mParentFile) {
290 mParentFile->closeInputFile();
291 delete mParentFile;
292 mParentFile = nullptr;
293 }
294
295 delete mParentFileMap;
296 mParentFileMap = nullptr;
297
299 mcurrentFile->Close();
300 delete mcurrentFile;
301 mcurrentFile = nullptr;
302 }
303}
304
306{
307 if (getNumberInputfiles() > 0) {
308 // 1. mfilenames
309 return getNumberInputfiles();
310 }
311
312 auto fileName = getInputfilesFilename();
313 if (!fileName.empty()) {
314 // 2. getFilenamesRegex() @ getInputfilesFilename()
315 try {
316 std::ifstream filelist(fileName);
317 if (!filelist.is_open()) {
318 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
319 }
320 while (std::getline(filelist, fileName)) {
321 // remove white spaces, empty lines are skipped
322 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
323 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
324 std::regex_match(fileName, getFilenamesRegex()))) {
326 }
327 }
328 } catch (...) {
329 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
330 return 0;
331 }
332 } else {
333 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
334 if (mdefaultFilenamesPtr) {
335 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
336 if (getFilenamesRegexString().empty() ||
337 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
338 addFileNameHolder(fileNameHolder);
339 }
340 }
341 }
342 }
343
344 return getNumberInputfiles();
345}
346
347int DataInputDescriptor::findDFNumber(int file, std::string dfName)
348{
349 auto dfList = mfilenames[file]->listOfTimeFrameKeys;
350 auto it = std::find(dfList.begin(), dfList.end(), dfName);
351 if (it == dfList.end()) {
352 return -1;
353 }
354 return it - dfList.begin();
355}
356
357bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
358{
359 auto ioStart = uv_hrtime();
360
361 auto fileAndFolder = getFileFolder(counter, numTF);
362 if (!fileAndFolder.file) {
363 return false;
364 }
365
366 auto fullpath = fileAndFolder.folderName + "/" + treename;
367 auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
368
369 if (!tree) {
370 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
371 auto parentFile = getParentFile(counter, numTF, treename);
372 if (parentFile != nullptr) {
373 int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
374 if (parentNumTF == -1) {
375 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
376 }
377 // first argument is 0 as the parent file object contains only 1 file
378 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
379 }
380 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
381 }
382
383 // create table output
384 auto o = Output(dh);
385 auto t2t = outputs.make<TreeToTable>(o);
386
387 // add branches to read
388 // fill the table
389 t2t->setLabel(tree->GetName());
390 totalSizeCompressed += tree->GetZipBytes();
391 totalSizeUncompressed += tree->GetTotBytes();
392 t2t->addAllColumns(tree);
393 t2t->fill(tree);
394 delete tree;
395
396 mIOTime += (uv_hrtime() - ioStart);
397
398 return true;
399}
400
405
// Constructs a director from a single input specification.
// monitoring, allowedParentLevel and parentFileReplacement are stored in the members of the
// same name (parentFileReplacement is moved).
// If inputFile starts with '@', the '@' is stripped and the rest is passed to
// setInputfilesFile(); otherwise inputFile is wrapped in a FileNameHolder and appended to
// mdefaultInputFiles.
406DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
407{
408 if (inputFile.size() && inputFile[0] == '@') {
409 inputFile.erase(0, 1);
410 setInputfilesFile(inputFile);
411 } else {
412 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
413 }
414
// NOTE(review): listing line 415 is absent from this view; a statement may have been lost
// here when the listing was extracted - confirm against the original file.
416}
417
// Constructs a director from a list of input file names.
// monitoring, allowedParentLevel and parentFileReplacement are stored in the members of the
// same name (parentFileReplacement is moved).
// Every entry of inputFiles is wrapped in a FileNameHolder and appended to mdefaultInputFiles.
418DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
419{
// NOTE(review): the loop variable is taken by value, so each name is copied once per iteration.
420 for (auto inputFile : inputFiles) {
421 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
422 }
423
// NOTE(review): listing line 424 is absent from this view; a statement may have been lost
// here when the listing was extracted - confirm against the original file.
425}
426
428{
429 for (auto fn : mdefaultInputFiles) {
430 delete fn;
431 }
432 mdefaultInputFiles.clear();
433 mdefaultDataInputDescriptor = nullptr;
434
435 for (auto fn : mdataInputDescriptors) {
436 delete fn;
437 }
438 mdataInputDescriptors.clear();
439}
440
442{
443 mdataInputDescriptors.clear();
444 mdefaultInputFiles.clear();
445 mFilenameRegex = std::string("");
446};
447
449{
450 if (mdefaultDataInputDescriptor) {
451 delete mdefaultDataInputDescriptor;
452 }
453 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
454
455 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
456 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
457 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
458 mdefaultDataInputDescriptor->tablename = "any";
459 mdefaultDataInputDescriptor->treename = "any";
460 mdefaultDataInputDescriptor->fillInputfiles();
461
462 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
463}
464
465bool DataInputDirector::readJson(std::string const& fnjson)
466{
467 // open the file
468 FILE* f = fopen(fnjson.c_str(), "r");
469 if (!f) {
470 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
471 return false;
472 }
473
474 // create streamer
475 char readBuffer[65536];
476 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
477
478 // parse the json file
479 Document jsonDoc;
480 jsonDoc.ParseStream(inputStream);
481 auto status = readJsonDocument(&jsonDoc);
482
483 // clean up
484 fclose(f);
485
486 return status;
487}
488
// Configures this director from the "InputDirector" object of a parsed JSON document.
//
// Items read from "InputDirector": "debugmode" (bool), "fileregex" (string),
// "resfiles" (string or array) and "InputDescriptors" (array of objects). For each entry of
// "InputDescriptors" a DataInputDescriptor is created from the items "table" (required),
// "treename", "fileregex" and "resfiles".
//
// Returns false when the document has a parse error, when an item has the wrong type, when
// "table" is missing in a descriptor, or when isValid() fails at the end. Returns true when
// "InputDirector" is not an object (only logged) and on success.
489bool DataInputDirector::readJsonDocument(Document* jsonDoc)
490{
491 // initialisations
492 std::string fileName("");
493 const char* itemName;
494
495 // is it a proper json document?
496 if (jsonDoc->HasParseError()) {
497 LOGP(error, "Check the JSON document! There is a problem with the format!");
498 return false;
499 }
500
501 // InputDirector
502 itemName = "InputDirector";
// NOTE(review): this lookup is not guarded by IsObject()/HasMember(); check what rapidjson's
// operator[] does when the root is not an object or the member is absent.
503 const Value& didirItem = (*jsonDoc)[itemName];
504 if (!didirItem.IsObject()) {
505 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
506 return true;
507 }
508
509 // now read various items
510 itemName = "debugmode";
511 if (didirItem.HasMember(itemName)) {
512 if (didirItem[itemName].IsBool()) {
513 mDebugMode = (didirItem[itemName].GetBool());
514 } else {
515 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
516 return false;
517 }
518 } else {
519 mDebugMode = false;
520 }
521
// in debug mode the whole InputDirector object is pretty-printed to the log
522 if (mDebugMode) {
523 StringBuffer buffer;
524 buffer.Clear();
525 PrettyWriter<StringBuffer> writer(buffer);
526 didirItem.Accept(writer);
527 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
528 }
529
530 itemName = "fileregex";
531 if (didirItem.HasMember(itemName)) {
532 if (didirItem[itemName].IsString()) {
533 setFilenamesRegex(didirItem[itemName].GetString());
534 } else {
535 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
536 return false;
537 }
538 }
539
// "resfiles" is either a single string (a leading '@' means it is passed to
// setInputfilesFile() without the '@') or an array of file names
540 itemName = "resfiles";
541 if (didirItem.HasMember(itemName)) {
542 if (didirItem[itemName].IsString()) {
543 fileName = didirItem[itemName].GetString();
544 if (fileName.size() && fileName[0] == '@') {
545 fileName.erase(0, 1);
546 setInputfilesFile(fileName);
547 } else {
// NOTE(review): listing line 548 is absent from this view - confirm against the original file.
549 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
550 }
551 } else if (didirItem[itemName].IsArray()) {
// NOTE(review): listing line 552 is absent from this view - confirm against the original file.
553 auto fns = didirItem[itemName].GetArray();
554 for (auto& fn : fns) {
// NOTE(review): array entries are not checked with IsString() before GetString().
555 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
556 }
557 } else {
558 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
559 return false;
560 }
561 }
562
563 itemName = "InputDescriptors";
564 if (didirItem.HasMember(itemName)) {
565 if (!didirItem[itemName].IsArray()) {
566 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
567 return false;
568 }
569
570 // loop over DataInputDescriptors
// NOTE(review): itemName is reassigned inside the loop body, so from the second iteration on
// the "must be objects" message below reports the last inner item name instead of
// "InputDescriptors".
571 for (auto& didescItem : didirItem[itemName].GetArray()) {
572 if (!didescItem.IsObject()) {
573 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
574 return false;
575 }
576 // create a new dataInputDescriptor
// NOTE(review): didesc is a raw `new`; no delete is visible on the `return false` paths
// below, nor in the branch where the descriptor is ignored because its file list is empty.
577 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
578 didesc->setDefaultInputfiles(&mdefaultInputFiles);
579
// "table" is mandatory; it also provides the matcher used to select this descriptor
580 itemName = "table";
581 if (didescItem.HasMember(itemName)) {
582 if (didescItem[itemName].IsString()) {
583 didesc->tablename = didescItem[itemName].GetString();
584 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
585 } else {
586 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
587 return false;
588 }
589 } else {
590 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
591 return false;
592 }
593
// without an explicit "treename" the third token of the table name is used
594 itemName = "treename";
595 if (didescItem.HasMember(itemName)) {
596 if (didescItem[itemName].IsString()) {
597 didesc->treename = didescItem[itemName].GetString();
598 } else {
599 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
600 return false;
601 }
602 } else {
603 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
// NOTE(review): assumes getTokens() returns at least three entries - confirm.
604 didesc->treename = m[2];
605 }
606
// a descriptor-level "fileregex" takes precedence; otherwise the director's regex is used
607 itemName = "fileregex";
608 if (didescItem.HasMember(itemName)) {
609 if (didescItem[itemName].IsString()) {
610 if (didesc->getNumberInputfiles() == 0) {
611 didesc->setFilenamesRegex(didescItem[itemName].GetString());
612 }
613 } else {
614 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
615 return false;
616 }
617 } else {
618 if (didesc->getNumberInputfiles() == 0) {
619 didesc->setFilenamesRegex(mFilenameRegexPtr);
620 }
621 }
622
// descriptor-level "resfiles": file names are only added when they match the descriptor's
// regex (or when that regex is empty)
623 itemName = "resfiles";
624 if (didescItem.HasMember(itemName)) {
625 if (didescItem[itemName].IsString()) {
626 fileName = didescItem[itemName].GetString();
627 if (fileName.size() && fileName[0] == '@') {
628 didesc->setInputfilesFile(fileName.erase(0, 1));
629 } else {
630 if (didesc->getFilenamesRegexString().empty() ||
631 std::regex_match(fileName, didesc->getFilenamesRegex())) {
632 didesc->addFileNameHolder(makeFileNameHolder(fileName));
633 }
634 }
635 } else if (didescItem[itemName].IsArray()) {
636 auto fns = didescItem[itemName].GetArray();
637 for (auto& fn : fns) {
// NOTE(review): array entries are not checked with IsString() before GetString().
638 if (didesc->getFilenamesRegexString().empty() ||
639 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
640 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
641 }
642 }
643 } else {
644 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
645 return false;
646 }
647 } else {
648 didesc->setInputfilesFile(minputfilesFilePtr);
649 }
650
651 // fill mfilenames and add InputDescriptor to InputDirector
652 if (didesc->fillInputfiles() > 0) {
653 mdataInputDescriptors.emplace_back(didesc);
654 } else {
655 didesc->printOut();
656 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
657 }
658 mAlienSupport &= didesc->isAlienSupportOn();
659 }
660 }
661
662 // add a default DataInputDescriptor
// NOTE(review): listing line 663 is absent from this view; the statement announced by the
// comment above is not visible here - confirm against the original file.
664
665 // check that all DataInputDescriptors have the same number of input files
666 if (!isValid()) {
667 printOut();
668 return false;
669 }
670
671 // print the DataInputDirector
672 if (mDebugMode) {
673 printOut();
674 }
675
676 return true;
677}
678
680{
681 DataInputDescriptor* result = nullptr;
682
683 // compute list of matching outputs
685
686 for (auto didesc : mdataInputDescriptors) {
687 if (didesc->matcher->match(dh, context)) {
688 result = didesc;
689 break;
690 }
691 }
692
693 return result;
694}
695
697{
698 auto didesc = getDataInputDescriptor(dh);
699 // if NOT match then use defaultDataInputDescriptor
700 if (!didesc) {
701 didesc = mdefaultDataInputDescriptor;
702 }
703
704 return didesc->getFileFolder(counter, numTF);
705}
706
708{
709 auto didesc = getDataInputDescriptor(dh);
710 // if NOT match then use defaultDataInputDescriptor
711 if (!didesc) {
712 didesc = mdefaultDataInputDescriptor;
713 }
714
715 return didesc->getTimeFramesInFile(counter);
716}
717
719{
720 auto didesc = getDataInputDescriptor(dh);
721 // if NOT match then use defaultDataInputDescriptor
722 if (!didesc) {
723 didesc = mdefaultDataInputDescriptor;
724 }
725
726 return didesc->getTimeFrameNumber(counter, numTF);
727}
728
729bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
730{
731 std::string treename;
732
733 auto didesc = getDataInputDescriptor(dh);
734 if (didesc) {
735 // if match then use filename and treename from DataInputDescriptor
736 treename = didesc->treename;
737 } else {
738 // if NOT match then use
739 // . filename from defaultDataInputDescriptor
740 // . treename from DataHeader
741 didesc = mdefaultDataInputDescriptor;
742 treename = aod::datamodel::getTreeName(dh);
743 }
744
745 return didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
746}
747
749{
750 mdefaultDataInputDescriptor->closeInputFile();
751 for (auto didesc : mdataInputDescriptors) {
752 didesc->closeInputFile();
753 }
754}
755
756bool DataInputDirector::isValid()
757{
758 bool status = true;
759 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
760 for (auto didesc : mdataInputDescriptors) {
761 status &= didesc->getNumberInputfiles() == numberFiles;
762 }
763
764 return status;
765}
766
768{
769 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
770 for (auto didesc : mdataInputDescriptors) {
771 status &= (didesc->getNumberInputfiles() <= counter);
772 }
773
774 return status;
775}
776
778{
779 LOGP(info, "DataInputDirector");
780 LOGP(info, " Default input files file : {}", minputfilesFile);
781 LOGP(info, " Default file name regex : {}", mFilenameRegex);
782 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
783 for (auto const& fn : mdefaultInputFiles) {
784 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
785 }
786 LOGP(info, " Default DataInputDescriptor:");
787 mdefaultDataInputDescriptor->printOut();
788 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
789 for (auto const& didesc : mdataInputDescriptors) {
790 didesc->printOut();
791 }
792}
793
794} // namespace o2::framework
o2::monitoring::tags::Value Value
bool o
uint16_t pos
Definition RawData.h:3
std::ostringstream debug
StringRef key
decltype(auto) make(const Output &spec, Args... args)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
uint64_t getTimeFrameNumber(int counter, int numTF)
FileAndFolder getFileFolder(int counter, int numTF)
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename)
int findDFNumber(int file, std::string dfName)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
FileAndFolder getFileFolder(header::DataHeader dh, int counter, int numTF)
void setLabel(const char *label)
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::string filename()
void empty(int)
Definition list.h:40
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
the main header struct
Definition DataHeader.h:618
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))