21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
30#include "TObjString.h"
34#include <arrow/dataset/file_base.h>
35#include <arrow/dataset/dataset.h>
39#if __has_include(<TJAlienFile.h>)
40#include <TJAlienFile.h>
50using namespace rapidjson;
55 fileNameHolder->fileName = fileName;
57 return fileNameHolder;
61 : mAlienSupport(alienSupport),
62 mMonitoring(monitoring),
63 mAllowedParentLevel(allowedParentLevel),
64 mParentFileReplacement(
std::move(parentFileReplacement)),
67 std::vector<char const*> capabilitiesSpecs = {
68 "O2Framework:RNTupleObjectReadingCapability",
69 "O2Framework:TTreeObjectReadingCapability",
72 std::vector<LoadablePlugin> plugins;
73 for (
auto spec : capabilitiesSpecs) {
75 for (
auto& extra : morePlugins) {
76 plugins.push_back(extra);
80 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.
capabilities);
85 LOGP(info,
"DataInputDescriptor");
86 LOGP(info,
" Table name : {}",
tablename);
87 LOGP(info,
" Tree name : {}",
treename);
90 LOGP(info,
" Input files : {}", mfilenames.size());
91 for (
auto fn : mfilenames) {
92 LOGP(info,
" {} {}", fn->fileName, fn->numberOfTimeFrames);
99 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
104 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
115 if (fn->
fileName.rfind(
"file://", 0) == 0) {
117 }
else if (!mAlienSupport && fn->
fileName.rfind(
"alien://", 0) == 0 && !gGrid) {
118 LOGP(
debug,
"AliEn file requested. Enabling support.");
119 TGrid::Connect(
"alien://");
120 mAlienSupport =
true;
124 mfilenames.emplace_back(fn);
138 if (!origin.starts_with(
"AOD")) {
139 filename = std::regex_replace(
filename, std::regex(
"[.]root$"), fmt::format(
"_{}.root", origin));
143 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
145 if (rootFS->GetFile()->GetName() ==
filename) {
151 mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(
filename.c_str()), 50 * 1024 * 1024, mFactory);
152 if (!mCurrentFilesystem.get()) {
153 throw std::runtime_error(fmt::format(
"Couldn't open file \"{}\"!",
filename));
155 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
159 mParentFileMap = (TMap*)rootFS->GetFile()->Get(
"parentFiles");
160 if (mParentFileMap && !mParentFileReplacement.empty()) {
161 auto pos = mParentFileReplacement.find(
';');
162 if (
pos == std::string::npos) {
163 throw std::runtime_error(fmt::format(
"Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
165 auto from = mParentFileReplacement.substr(0,
pos);
166 auto to = mParentFileReplacement.substr(
pos + 1);
168 auto it = mParentFileMap->MakeIterator();
169 while (
auto obj = it->Next()) {
170 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
171 objString->String().ReplaceAll(from.c_str(), to.c_str());
177 if (mfilenames[
counter]->numberOfTimeFrames <= 0) {
178 const std::regex TFRegex = std::regex(
"/?DF_([0-9]+)(|-.*)$");
179 TList* keyList = rootFS->GetFile()->GetListOfKeys();
180 std::vector<std::string> finalList;
186 std::unordered_set<size_t> seen;
187 for (
auto key : *keyList) {
188 std::smatch matchResult;
189 std::string keyName = ((TObjString*)
key)->GetString().Data();
190 bool match = std::regex_match(keyName, matchResult, TFRegex);
192 auto folderNumber = std::stoul(matchResult[1].
str());
193 if (seen.find(folderNumber) == seen.end()) {
194 seen.insert(folderNumber);
195 mfilenames[
counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
200 if (mParentFileMap !=
nullptr) {
202 std::sort(mfilenames[
counter]->listOfTimeFrameNumbers.begin(), mfilenames[
counter]->listOfTimeFrameNumbers.end(),
203 [
this](
long const& l1,
long const& l2) ->
bool {
204 auto p1 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" + std::to_string(l1)).c_str());
205 auto p2 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" + std::to_string(l2)).c_str());
206 return p1->GetString().CompareTo(p2->GetString()) < 0;
209 std::sort(mfilenames[
counter]->listOfTimeFrameNumbers.begin(), mfilenames[
counter]->listOfTimeFrameNumbers.end());
212 mfilenames[
counter]->alreadyRead.resize(mfilenames[
counter]->alreadyRead.size() + mfilenames[
counter]->listOfTimeFrameNumbers.size(),
false);
213 mfilenames[
counter]->numberOfTimeFrames = mfilenames[
counter]->listOfTimeFrameNumbers.size();
217 mCurrentFileStartedAt = uv_hrtime();
232 if (mfilenames[
counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[
counter]->numberOfTimeFrames) {
236 return (mfilenames[
counter]->listOfTimeFrameNumbers)[numTF];
247 if (mfilenames[
counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[
counter]->numberOfTimeFrames) {
251 mfilenames[
counter]->alreadyRead[numTF] =
true;
253 return {fmt::format(
"DF_{}", mfilenames[
counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
258 if (!mParentFileMap) {
262 auto folderName = fmt::format(
"DF_{}", mfilenames[
counter]->listOfTimeFrameNumbers[numTF]);
263 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
265 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
266 if (!parentFileName) {
267 throw std::runtime_error(fmt::format(R
"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
273 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
274 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
279 mParentFile =
nullptr;
283 if (mLevel == mAllowedParentLevel) {
284 throw std::runtime_error(fmt::format(R
"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
285 rootFS->GetFile()->GetName()));
288 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
289 mParentFile =
new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
290 mParentFile->mdefaultFilenamesPtr =
new std::vector<FileNameHolder*>;
291 mParentFile->mdefaultFilenamesPtr->emplace_back(
makeFileNameHolder(parentFileName->GetString().Data()));
293 mParentFile->
setFile(0, origin);
299 return mfilenames.at(
counter)->numberOfTimeFrames;
305 return std::count(
list.begin(),
list.end(),
true);
310 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
311 auto f =
dynamic_cast<TFile*
>(rootFS->GetFile());
312 std::string monitoringInfo(fmt::format(
"lfn={},size={}",
f->GetName(),
f->GetSize()));
313#if __has_include(<TJAlienFile.h>)
314 auto alienFile =
dynamic_cast<TJAlienFile*
>(
f);
316 monitoringInfo += fmt::format(
",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
319 mMonitoring->send(o2::monitoring::Metric{monitoringInfo,
"aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
320 LOGP(info,
"Opening file: {}", monitoringInfo);
325 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
329 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
330 auto f =
dynamic_cast<TFile*
>(rootFS->GetFile());
331 std::string monitoringInfo(fmt::format(
"lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}",
f->GetName(),
333 ((
float)mIOTime / 1e9), ((
float)wait_time / 1e9), mLevel));
334#if __has_include(<TJAlienFile.h>)
335 auto alienFile =
dynamic_cast<TJAlienFile*
>(
f);
337 monitoringInfo += fmt::format(
",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
340 mMonitoring->send(o2::monitoring::Metric{monitoringInfo,
"aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
341 LOGP(info,
"Read info: {}", monitoringInfo);
346 if (mCurrentFilesystem.get()) {
350 mParentFile =
nullptr;
353 delete mParentFileMap;
354 mParentFileMap =
nullptr;
357 mCurrentFilesystem.reset();
369 if (!fileName.empty()) {
372 std::ifstream filelist(fileName);
373 if (!filelist.is_open()) {
374 throw std::runtime_error(fmt::format(R
"(Couldn't open file "{}")", fileName));
376 while (std::getline(filelist, fileName)) {
378 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
390 if (mdefaultFilenamesPtr) {
391 for (
auto fileNameHolder : *mdefaultFilenamesPtr) {
405 auto dfList = mfilenames[
file]->listOfTimeFrameNumbers;
406 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](
size_t i) { return fmt::format(
"DF_{}", i) == dfName; });
407 if (it == dfList.end()) {
410 return it - dfList.begin();
425 void (*dump_)(
const char*);
426 if (
void* sym = dlsym(
nullptr,
"igprof_dump_now")) {
427 dump_ = __extension__(
void (*)(
const char*)) sym;
429 std::string
filename = fmt::format(
"reader-memory-dump-{}.gz", uv_hrtime());
453 if (!folder.filesystem()) {
458 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
462 throw std::runtime_error(fmt::format(R
"(Not a TFile filesystem!)"));
465 std::shared_ptr<arrow::dataset::FileFormat>
format;
468 auto fullpath = arrow::dataset::FileSource{folder.path() +
"/" +
treename, folder.filesystem()};
471 auto objectPath = capability.lfn2objectPath(fullpath.path());
472 void* handle = capability.getHandle(rootFS, objectPath);
474 format = capability.factory().format();
475 creator = capability.factory().deferredOutputStreamer;
484 LOGP(
debug,
"Could not find tree {}. Trying in parent file.", fullpath.path());
486 if (parentFile !=
nullptr) {
487 int parentNumTF = parentFile->findDFNumber(0, folder.path());
488 if (parentNumTF == -1) {
489 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
490 throw std::runtime_error(fmt::format(R
"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
493 return parentFile->readTree(outputs, dh, 0, parentNumTF,
treename, totalSizeCompressed, totalSizeUncompressed);
495 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
496 throw std::runtime_error(fmt::format(R
"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
499 auto schemaOpt =
format->Inspect(fullpath);
500 auto physicalSchema = schemaOpt;
501 std::vector<std::shared_ptr<arrow::Field>> fields;
502 for (
auto& original : (*schemaOpt)->fields()) {
503 if (original->name().ends_with(
"_size")) {
506 fields.push_back(original);
508 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
510 auto fragment =
format->MakeFragment(fullpath, {}, *physicalSchema);
522 f2b->fill(datasetSchema,
format);
532DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring,
int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(
std::move(parentFileReplacement))
534 if (inputFile.size() && inputFile[0] ==
'@') {
535 inputFile.erase(0, 1);
544DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring,
int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(
std::move(parentFileReplacement))
546 for (
auto inputFile : inputFiles) {
555 for (
auto fn : mdefaultInputFiles) {
558 mdefaultInputFiles.clear();
559 mdefaultDataInputDescriptor =
nullptr;
561 for (
auto fn : mdataInputDescriptors) {
564 mdataInputDescriptors.clear();
569 mdataInputDescriptors.clear();
570 mdefaultInputFiles.clear();
571 mFilenameRegex = std::string(
"");
576 if (mdefaultDataInputDescriptor) {
577 delete mdefaultDataInputDescriptor;
579 mdefaultDataInputDescriptor =
new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
584 mdefaultDataInputDescriptor->
tablename =
"any";
585 mdefaultDataInputDescriptor->
treename =
"any";
594 FILE*
f = fopen(fnjson.c_str(),
"r");
596 LOGP(error,
"Could not open JSON file \"{}\"!", fnjson);
601 char readBuffer[65536];
602 FileReadStream inputStream(
f, readBuffer,
sizeof(readBuffer));
606 jsonDoc.ParseStream(inputStream);
607 auto status = readJsonDocument(&jsonDoc);
615bool DataInputDirector::readJsonDocument(Document* jsonDoc)
618 std::string fileName(
"");
619 const char* itemName;
622 if (jsonDoc->HasParseError()) {
623 LOGP(error,
"Check the JSON document! There is a problem with the format!");
628 itemName =
"InputDirector";
629 const Value& didirItem = (*jsonDoc)[itemName];
630 if (!didirItem.IsObject()) {
631 LOGP(info,
"No \"{}\" object found in the JSON document!", itemName);
636 itemName =
"debugmode";
637 if (didirItem.HasMember(itemName)) {
638 if (didirItem[itemName].IsBool()) {
639 mDebugMode = (didirItem[itemName].GetBool());
641 LOGP(error,
"Check the JSON document! Item \"{}\" must be a boolean!", itemName);
651 PrettyWriter<StringBuffer> writer(
buffer);
652 didirItem.Accept(writer);
653 LOGP(info,
"InputDirector object: {}", std::string(
buffer.GetString()));
656 itemName =
"fileregex";
657 if (didirItem.HasMember(itemName)) {
658 if (didirItem[itemName].IsString()) {
661 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
666 itemName =
"resfiles";
667 if (didirItem.HasMember(itemName)) {
668 if (didirItem[itemName].IsString()) {
669 fileName = didirItem[itemName].GetString();
670 if (fileName.size() && fileName[0] ==
'@') {
671 fileName.erase(0, 1);
677 }
else if (didirItem[itemName].IsArray()) {
679 auto fns = didirItem[itemName].GetArray();
680 for (
auto& fn : fns) {
684 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
689 itemName =
"InputDescriptors";
690 if (didirItem.HasMember(itemName)) {
691 if (!didirItem[itemName].IsArray()) {
692 LOGP(error,
"Check the JSON document! Item \"{}\" must be an array!", itemName);
697 for (
auto& didescItem : didirItem[itemName].GetArray()) {
698 if (!didescItem.IsObject()) {
699 LOGP(error,
"Check the JSON document! \"{}\" must be objects!", itemName);
703 auto didesc =
new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
704 didesc->setDefaultInputfiles(&mdefaultInputFiles);
707 if (didescItem.HasMember(itemName)) {
708 if (didescItem[itemName].IsString()) {
709 didesc->tablename = didescItem[itemName].GetString();
712 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
716 LOGP(error,
"Check the JSON document! Item \"{}\" is missing!", itemName);
720 itemName =
"treename";
721 if (didescItem.HasMember(itemName)) {
722 if (didescItem[itemName].IsString()) {
723 didesc->treename = didescItem[itemName].GetString();
725 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
730 didesc->treename =
m[2];
733 itemName =
"fileregex";
734 if (didescItem.HasMember(itemName)) {
735 if (didescItem[itemName].IsString()) {
736 if (didesc->getNumberInputfiles() == 0) {
737 didesc->setFilenamesRegex(didescItem[itemName].GetString());
740 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
744 if (didesc->getNumberInputfiles() == 0) {
745 didesc->setFilenamesRegex(mFilenameRegexPtr);
749 itemName =
"resfiles";
750 if (didescItem.HasMember(itemName)) {
751 if (didescItem[itemName].IsString()) {
752 fileName = didescItem[itemName].GetString();
753 if (fileName.size() && fileName[0] ==
'@') {
754 didesc->setInputfilesFile(fileName.erase(0, 1));
756 if (didesc->getFilenamesRegexString().empty() ||
757 std::regex_match(fileName, didesc->getFilenamesRegex())) {
761 }
else if (didescItem[itemName].IsArray()) {
762 auto fns = didescItem[itemName].GetArray();
763 for (
auto& fn : fns) {
764 if (didesc->getFilenamesRegexString().empty() ||
765 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
770 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
774 didesc->setInputfilesFile(minputfilesFilePtr);
778 if (didesc->fillInputfiles() > 0) {
779 mdataInputDescriptors.emplace_back(didesc);
782 LOGP(info,
"This DataInputDescriptor is ignored because its file list is empty!");
784 mAlienSupport &= didesc->isAlienSupportOn();
812 for (
auto didesc : mdataInputDescriptors) {
813 if (didesc->matcher->match(dh, context)) {
827 didesc = mdefaultDataInputDescriptor;
831 return didesc->getFileFolder(
counter, numTF, origin);
839 didesc = mdefaultDataInputDescriptor;
850 didesc = mdefaultDataInputDescriptor;
854 return didesc->getTimeFrameNumber(
counter, numTF, origin);
859 std::string treename;
864 treename = didesc->treename;
869 didesc = mdefaultDataInputDescriptor;
874 auto result = didesc->readTree(outputs, dh,
counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
881 for (
auto didesc : mdataInputDescriptors) {
882 didesc->closeInputFile();
886bool DataInputDirector::isValid()
890 for (
auto didesc : mdataInputDescriptors) {
891 status &= didesc->getNumberInputfiles() == numberFiles;
900 for (
auto didesc : mdataInputDescriptors) {
901 status &= (didesc->getNumberInputfiles() <=
counter);
909 LOGP(info,
"DataInputDirector");
910 LOGP(info,
" Default input files file : {}", minputfilesFile);
911 LOGP(info,
" Default file name regex : {}", mFilenameRegex);
912 LOGP(info,
" Default file names : {}", mdefaultInputFiles.size());
913 for (
auto const& fn : mdefaultInputFiles) {
914 LOGP(info,
" {} {}", fn->fileName, fn->numberOfTimeFrames);
916 LOGP(info,
" Default DataInputDescriptor:");
917 mdefaultDataInputDescriptor->
printOut();
919 for (
auto const& didesc : mdataInputDescriptors) {
o2::monitoring::tags::Value Value
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ACTION(log, callback)
decltype(auto) make(const Output &spec, Args... args)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLint GLint GLsizei GLint GLenum format
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
CalculateDelta(uint64_t &target)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities