18#include "Monitoring/Tags.h"
19#include "Monitoring/Metric.h"
20#include "Monitoring/Monitoring.h"
22#include "rapidjson/document.h"
23#include "rapidjson/prettywriter.h"
24#include "rapidjson/filereadstream.h"
27#include "TObjString.h"
32#if __has_include(<TJAlienFile.h>)
33#include <TJAlienFile.h>
40using namespace rapidjson;
45 fileNameHolder->fileName = fileName;
47 return fileNameHolder;
51 mMonitoring(monitoring),
52 mAllowedParentLevel(allowedParentLevel),
53 mParentFileReplacement(
std::move(parentFileReplacement)),
60 LOGP(info,
"DataInputDescriptor");
61 LOGP(info,
" Table name : {}",
tablename);
62 LOGP(info,
" Tree name : {}",
treename);
65 LOGP(info,
" Input files : {}", mfilenames.size());
66 for (
auto fn : mfilenames) {
67 LOGP(info,
" {} {}", fn->fileName, fn->numberOfTimeFrames);
74 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
79 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
90 if (fn->
fileName.rfind(
"file://", 0) == 0) {
92 }
else if (!mAlienSupport && fn->
fileName.rfind(
"alien://", 0) == 0) {
93 LOGP(
debug,
"AliEn file requested. Enabling support.");
94 TGrid::Connect(
"alien://");
99 mfilenames.emplace_back(fn);
112 if (mcurrentFile->GetName() ==
filename) {
117 mcurrentFile = TFile::Open(
filename.c_str());
119 throw std::runtime_error(fmt::format(
"Couldn't open file \"{}\"!",
filename));
121 mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
124 mParentFileMap = (TMap*)mcurrentFile->Get(
"parentFiles");
125 if (mParentFileMap && !mParentFileReplacement.empty()) {
126 auto pos = mParentFileReplacement.find(
';');
127 if (
pos == std::string::npos) {
128 throw std::runtime_error(fmt::format(
"Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
130 auto from = mParentFileReplacement.substr(0,
pos);
131 auto to = mParentFileReplacement.substr(
pos + 1);
133 auto it = mParentFileMap->MakeIterator();
134 while (
auto obj = it->Next()) {
135 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
136 objString->String().ReplaceAll(from.c_str(), to.c_str());
142 if (mfilenames[
counter]->numberOfTimeFrames <= 0) {
143 std::regex TFRegex = std::regex(
"DF_[0-9]+");
144 TList* keyList = mcurrentFile->GetListOfKeys();
147 for (
auto key : *keyList) {
148 if (std::regex_match(((TObjString*)
key)->GetString().Data(), TFRegex)) {
149 auto folderNumber = std::stoul(std::string(((TObjString*)
key)->GetString().Data()).substr(3));
150 mfilenames[
counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
153 if (mParentFileMap !=
nullptr) {
155 std::sort(mfilenames[
counter]->listOfTimeFrameNumbers.begin(), mfilenames[
counter]->listOfTimeFrameNumbers.end(),
156 [
this](
long const& l1,
long const& l2) ->
bool {
157 auto p1 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" + std::to_string(l1)).c_str());
158 auto p2 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" + std::to_string(l2)).c_str());
159 return p1->GetString().CompareTo(p2->GetString()) < 0;
162 std::sort(mfilenames[
counter]->listOfTimeFrameNumbers.begin(), mfilenames[
counter]->listOfTimeFrameNumbers.end());
165 for (
auto folderNumber : mfilenames[
counter]->listOfTimeFrameNumbers) {
167 mfilenames[
counter]->listOfTimeFrameKeys.emplace_back(folderName);
168 mfilenames[
counter]->alreadyRead.emplace_back(
false);
170 mfilenames[
counter]->numberOfTimeFrames = mfilenames[
counter]->listOfTimeFrameKeys.size();
174 mCurrentFileStartedAt = uv_hrtime();
189 if (mfilenames[
counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[
counter]->numberOfTimeFrames) {
193 return (mfilenames[
counter]->listOfTimeFrameNumbers)[numTF];
202 return fileAndFolder;
206 if (mfilenames[
counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[
counter]->numberOfTimeFrames) {
207 return fileAndFolder;
210 fileAndFolder.
file = mcurrentFile;
213 mfilenames[
counter]->alreadyRead[numTF] =
true;
215 return fileAndFolder;
220 if (!mParentFileMap) {
224 auto folderName = (mfilenames[
counter]->listOfTimeFrameKeys)[numTF];
225 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
226 if (!parentFileName) {
228 throw std::runtime_error(fmt::format(R
"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
234 if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
239 mParentFile =
nullptr;
243 if (mLevel == mAllowedParentLevel) {
244 throw std::runtime_error(fmt::format(R
"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
247 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
248 mParentFile =
new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
249 mParentFile->mdefaultFilenamesPtr =
new std::vector<FileNameHolder*>;
250 mParentFile->mdefaultFilenamesPtr->emplace_back(
makeFileNameHolder(parentFileName->GetString().Data()));
258 return mfilenames.at(
counter)->numberOfTimeFrames;
264 return std::count(
list.begin(),
list.end(),
true);
269 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
273 std::string monitoringInfo(fmt::format(
"lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
275 ((
float)mIOTime / 1e9), ((
float)wait_time / 1e9), mLevel));
276#if __has_include(<TJAlienFile.h>)
277 auto alienFile =
dynamic_cast<TJAlienFile*
>(mcurrentFile);
279 monitoringInfo += fmt::format(
",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
282 mMonitoring->send(o2::monitoring::Metric{monitoringInfo,
"aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
283 LOGP(info,
"Read info: {}", monitoringInfo);
292 mParentFile =
nullptr;
295 delete mParentFileMap;
296 mParentFileMap =
nullptr;
299 mcurrentFile->Close();
301 mcurrentFile =
nullptr;
313 if (!fileName.empty()) {
316 std::ifstream filelist(fileName);
317 if (!filelist.is_open()) {
318 throw std::runtime_error(fmt::format(R
"(Couldn't open file "{}")", fileName));
320 while (std::getline(filelist, fileName)) {
322 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
334 if (mdefaultFilenamesPtr) {
335 for (
auto fileNameHolder : *mdefaultFilenamesPtr) {
349 auto dfList = mfilenames[
file]->listOfTimeFrameKeys;
350 auto it = std::find(dfList.begin(), dfList.end(), dfName);
351 if (it == dfList.end()) {
354 return it - dfList.begin();
359 auto ioStart = uv_hrtime();
362 if (!fileAndFolder.file) {
366 auto fullpath = fileAndFolder.folderName +
"/" +
treename;
367 auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
370 LOGP(
debug,
"Could not find tree {}. Trying in parent file.", fullpath.c_str());
372 if (parentFile !=
nullptr) {
373 int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
374 if (parentNumTF == -1) {
375 throw std::runtime_error(fmt::format(R
"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
378 return parentFile->readTree(outputs, dh, 0, parentNumTF,
treename, totalSizeCompressed, totalSizeUncompressed);
380 throw std::runtime_error(fmt::format(R
"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" +
treename, fileAndFolder.file->GetName()));
390 totalSizeCompressed +=
tree->GetZipBytes();
391 totalSizeUncompressed +=
tree->GetTotBytes();
392 t2t->addAllColumns(
tree);
396 mIOTime += (uv_hrtime() - ioStart);
406DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring,
int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(
std::move(parentFileReplacement))
408 if (inputFile.size() && inputFile[0] ==
'@') {
409 inputFile.erase(0, 1);
418DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring,
int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(
std::move(parentFileReplacement))
420 for (
auto inputFile : inputFiles) {
429 for (
auto fn : mdefaultInputFiles) {
432 mdefaultInputFiles.clear();
433 mdefaultDataInputDescriptor =
nullptr;
435 for (
auto fn : mdataInputDescriptors) {
438 mdataInputDescriptors.clear();
443 mdataInputDescriptors.clear();
444 mdefaultInputFiles.clear();
445 mFilenameRegex = std::string(
"");
450 if (mdefaultDataInputDescriptor) {
451 delete mdefaultDataInputDescriptor;
453 mdefaultDataInputDescriptor =
new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
458 mdefaultDataInputDescriptor->
tablename =
"any";
459 mdefaultDataInputDescriptor->
treename =
"any";
468 FILE*
f = fopen(fnjson.c_str(),
"r");
470 LOGP(error,
"Could not open JSON file \"{}\"!", fnjson);
475 char readBuffer[65536];
476 FileReadStream inputStream(
f, readBuffer,
sizeof(readBuffer));
480 jsonDoc.ParseStream(inputStream);
481 auto status = readJsonDocument(&jsonDoc);
489bool DataInputDirector::readJsonDocument(Document* jsonDoc)
492 std::string fileName(
"");
493 const char* itemName;
496 if (jsonDoc->HasParseError()) {
497 LOGP(error,
"Check the JSON document! There is a problem with the format!");
502 itemName =
"InputDirector";
503 const Value& didirItem = (*jsonDoc)[itemName];
504 if (!didirItem.IsObject()) {
505 LOGP(info,
"No \"{}\" object found in the JSON document!", itemName);
510 itemName =
"debugmode";
511 if (didirItem.HasMember(itemName)) {
512 if (didirItem[itemName].IsBool()) {
513 mDebugMode = (didirItem[itemName].GetBool());
515 LOGP(error,
"Check the JSON document! Item \"{}\" must be a boolean!", itemName);
525 PrettyWriter<StringBuffer> writer(
buffer);
526 didirItem.Accept(writer);
527 LOGP(info,
"InputDirector object: {}", std::string(
buffer.GetString()));
530 itemName =
"fileregex";
531 if (didirItem.HasMember(itemName)) {
532 if (didirItem[itemName].IsString()) {
535 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
540 itemName =
"resfiles";
541 if (didirItem.HasMember(itemName)) {
542 if (didirItem[itemName].IsString()) {
543 fileName = didirItem[itemName].GetString();
544 if (fileName.size() && fileName[0] ==
'@') {
545 fileName.erase(0, 1);
551 }
else if (didirItem[itemName].IsArray()) {
553 auto fns = didirItem[itemName].GetArray();
554 for (
auto& fn : fns) {
558 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
563 itemName =
"InputDescriptors";
564 if (didirItem.HasMember(itemName)) {
565 if (!didirItem[itemName].IsArray()) {
566 LOGP(error,
"Check the JSON document! Item \"{}\" must be an array!", itemName);
571 for (
auto& didescItem : didirItem[itemName].GetArray()) {
572 if (!didescItem.IsObject()) {
573 LOGP(error,
"Check the JSON document! \"{}\" must be objects!", itemName);
577 auto didesc =
new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
578 didesc->setDefaultInputfiles(&mdefaultInputFiles);
581 if (didescItem.HasMember(itemName)) {
582 if (didescItem[itemName].IsString()) {
583 didesc->tablename = didescItem[itemName].GetString();
586 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
590 LOGP(error,
"Check the JSON document! Item \"{}\" is missing!", itemName);
594 itemName =
"treename";
595 if (didescItem.HasMember(itemName)) {
596 if (didescItem[itemName].IsString()) {
597 didesc->treename = didescItem[itemName].GetString();
599 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
604 didesc->treename =
m[2];
607 itemName =
"fileregex";
608 if (didescItem.HasMember(itemName)) {
609 if (didescItem[itemName].IsString()) {
610 if (didesc->getNumberInputfiles() == 0) {
611 didesc->setFilenamesRegex(didescItem[itemName].GetString());
614 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
618 if (didesc->getNumberInputfiles() == 0) {
619 didesc->setFilenamesRegex(mFilenameRegexPtr);
623 itemName =
"resfiles";
624 if (didescItem.HasMember(itemName)) {
625 if (didescItem[itemName].IsString()) {
626 fileName = didescItem[itemName].GetString();
627 if (fileName.size() && fileName[0] ==
'@') {
628 didesc->setInputfilesFile(fileName.erase(0, 1));
630 if (didesc->getFilenamesRegexString().empty() ||
631 std::regex_match(fileName, didesc->getFilenamesRegex())) {
635 }
else if (didescItem[itemName].IsArray()) {
636 auto fns = didescItem[itemName].GetArray();
637 for (
auto& fn : fns) {
638 if (didesc->getFilenamesRegexString().empty() ||
639 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
644 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
648 didesc->setInputfilesFile(minputfilesFilePtr);
652 if (didesc->fillInputfiles() > 0) {
653 mdataInputDescriptors.emplace_back(didesc);
656 LOGP(info,
"This DataInputDescriptor is ignored because its file list is empty!");
658 mAlienSupport &= didesc->isAlienSupportOn();
686 for (
auto didesc : mdataInputDescriptors) {
687 if (didesc->matcher->match(dh, context)) {
701 didesc = mdefaultDataInputDescriptor;
712 didesc = mdefaultDataInputDescriptor;
723 didesc = mdefaultDataInputDescriptor;
731 std::string treename;
736 treename = didesc->treename;
741 didesc = mdefaultDataInputDescriptor;
745 return didesc->readTree(outputs, dh,
counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
751 for (
auto didesc : mdataInputDescriptors) {
752 didesc->closeInputFile();
756bool DataInputDirector::isValid()
760 for (
auto didesc : mdataInputDescriptors) {
761 status &= didesc->getNumberInputfiles() == numberFiles;
770 for (
auto didesc : mdataInputDescriptors) {
771 status &= (didesc->getNumberInputfiles() <=
counter);
779 LOGP(info,
"DataInputDirector");
780 LOGP(info,
" Default input files file : {}", minputfilesFile);
781 LOGP(info,
" Default file name regex : {}", mFilenameRegex);
782 LOGP(info,
" Default file names : {}", mdefaultInputFiles.size());
783 for (
auto const& fn : mdefaultInputFiles) {
784 LOGP(info,
" {} {}", fn->fileName, fn->numberOfTimeFrames);
786 LOGP(info,
" Default DataInputDescriptor:");
787 mdefaultDataInputDescriptor->
printOut();
789 for (
auto const& didesc : mdataInputDescriptors) {
o2::monitoring::tags::Value Value
decltype(auto) make(const Output &spec, Args... args)
void setLabel(const char *label)
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))