21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
30#include "TObjString.h"
34#include <arrow/dataset/file_base.h>
35#include <arrow/dataset/dataset.h>
39#if __has_include(<TJAlienFile.h>)
40#include <TJAlienFile.h>
50using namespace rapidjson;
61 : mAlienSupport(alienSupport),
65 std::vector<char const*> capabilitiesSpecs = {
66 "O2Framework:RNTupleObjectReadingCapability",
67 "O2Framework:TTreeObjectReadingCapability",
70 std::vector<LoadablePlugin> plugins;
71 for (
auto spec : capabilitiesSpecs) {
73 for (
auto& extra : morePlugins) {
74 plugins.push_back(extra);
78 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.
capabilities);
83 LOGP(info,
"DataInputDescriptor");
84 LOGP(info,
" Table name : {}",
tablename);
85 LOGP(info,
" Tree name : {}",
treename);
88 LOGP(info,
" Input files : {}", mfilenames.size());
89 for (
auto& fn : mfilenames) {
90 LOGP(info,
" {} {}", fn.fileName, fn.numberOfTimeFrames);
97 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
102 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
113 if (fn.
fileName.rfind(
"file://", 0) == 0) {
115 }
else if (!mAlienSupport && fn.
fileName.rfind(
"alien://", 0) == 0 && !gGrid) {
116 LOGP(
debug,
"AliEn file requested. Enabling support.");
117 TGrid::Connect(
"alien://");
118 mAlienSupport =
true;
122 mfilenames.emplace_back(fn);
138 if (wantedParentLevel == -1 && !
origin.starts_with(
"AOD")) {
143 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
145 if (rootFS->GetFile()->GetName() ==
filename) {
151 TFile* tfile =
nullptr;
152 bool externalFile =
false;
160 if (tfile ==
nullptr) {
161 tfile = TFile::Open(
filename.c_str());
163 mCurrentFilesystem = std::make_shared<TFileFileSystem>(tfile, 50 * 1024 * 1024, mFactory, !externalFile);
164 if (!mCurrentFilesystem.get()) {
165 throw std::runtime_error(fmt::format(
"Couldn't open file \"{}\"!",
filename));
167 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
171 mParentFileMap = (TMap*)rootFS->GetFile()->Get(
"parentFiles");
174 if (
pos == std::string::npos) {
175 throw std::runtime_error(fmt::format(
"Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mContext.
parentFileReplacement.c_str()));
180 auto it = mParentFileMap->MakeIterator();
181 while (
auto obj = it->Next()) {
182 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
183 objString->String().ReplaceAll(from.c_str(), to.c_str());
189 if (mfilenames[
counter].numberOfTimeFrames <= 0) {
190 const std::regex TFRegex = std::regex(
"/?DF_([0-9]+)(|-.*)$");
191 TList* keyList = rootFS->GetFile()->GetListOfKeys();
192 std::vector<std::string> finalList;
198 std::unordered_set<size_t> seen;
199 for (
auto key : *keyList) {
200 std::smatch matchResult;
201 std::string keyName = ((TObjString*)
key)->GetString().Data();
202 bool match = std::regex_match(keyName, matchResult, TFRegex);
204 auto folderNumber = std::stoul(matchResult[1].
str());
205 if (seen.find(folderNumber) == seen.end()) {
206 seen.insert(folderNumber);
207 mfilenames[
counter].listOfTimeFrameNumbers.emplace_back(folderNumber);
212 if (mParentFileMap !=
nullptr) {
214 std::ranges::sort(mfilenames[
counter].listOfTimeFrameNumbers,
215 [
this](
long const& l1,
long const& l2) ->
bool {
216 auto p1 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" +
std::to_string(l1)).c_str());
217 auto p2 = (TObjString*)this->mParentFileMap->GetValue((
"DF_" +
std::to_string(l2)).c_str());
218 return p1->GetString().CompareTo(
p2->GetString()) < 0;
221 std::sort(mfilenames[
counter].listOfTimeFrameNumbers.begin(), mfilenames[
counter].listOfTimeFrameNumbers.end());
224 mfilenames[
counter].alreadyRead.resize(mfilenames[
counter].alreadyRead.size() + mfilenames[
counter].listOfTimeFrameNumbers.size(),
false);
225 mfilenames[
counter].numberOfTimeFrames = mfilenames[
counter].listOfTimeFrameNumbers.size();
229 mCurrentFileStartedAt = uv_hrtime();
244 if (mfilenames[
counter].numberOfTimeFrames > 0 && numTF >= mfilenames[
counter].numberOfTimeFrames) {
248 return (mfilenames[
counter].listOfTimeFrameNumbers)[numTF];
254 return {
nullptr, -1};
256 auto folderName = fmt::format(
"DF_{}", mfilenames[
counter].listOfTimeFrameNumbers[numTF]);
258 if (parentFile ==
nullptr) {
259 return {
nullptr, -1};
261 return {parentFile, parentFile->findDFNumber(0, folderName)};
267 if ((wantedParentLevel != -1) && (mLevel < wantedParentLevel)) {
269 if (parentFile ==
nullptr || parentNumTF == -1) {
272 return parentFile->getFileFolder(0, parentNumTF, wantedParentLevel, wantedOrigin);
281 if ((mfilenames[
counter].numberOfTimeFrames > 0) && (numTF >= mfilenames[
counter].numberOfTimeFrames)) {
285 mfilenames[
counter].alreadyRead[numTF] =
true;
287 return {fmt::format(
"DF_{}", mfilenames[
counter].listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
292 if (!mParentFileMap) {
297 auto folderName = fmt::format(
"DF_{}", mfilenames[
counter].listOfTimeFrameNumbers[numTF]);
298 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
300 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
301 if (!parentFileName) {
302 throw std::runtime_error(fmt::format(R
"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
308 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
309 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
312 mParentFile->closeInputFile();
318 throw std::runtime_error(fmt::format(R
"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mContext.allowedParentLevel, folderName.c_str(),
319 rootFS->GetFile()->GetName()));
322 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
323 mParentFile = std::make_shared<DataInputDescriptor>(mAlienSupport, mLevel + 1, mContext);
324 mParentFile->mdefaultFilenamesPtr.emplace_back(
makeFileNameHolder(parentFileName->GetString().Data()));
325 mParentFile->fillInputfiles();
326 mParentFile->setFile(0, wantedParentLevel, wantedOrigin);
332 return mfilenames.at(
counter).numberOfTimeFrames;
337 auto& list = mfilenames.at(
counter).alreadyRead;
338 return std::count(list.begin(), list.end(),
true);
343 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
344 auto f =
dynamic_cast<TFile*
>(rootFS->GetFile());
345 std::string monitoringInfo(fmt::format(
"lfn={},size={}",
f->GetName(),
f->GetSize()));
346#if __has_include(<TJAlienFile.h>)
347 auto alienFile =
dynamic_cast<TJAlienFile*
>(
f);
349 monitoringInfo += fmt::format(
",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
353 mContext.
monitoring->send(o2::monitoring::Metric{monitoringInfo,
"aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
355 LOGP(info,
"Opening file: {}", monitoringInfo);
360 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
364 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
365 auto f =
dynamic_cast<TFile*
>(rootFS->GetFile());
366 std::string monitoringInfo(fmt::format(
"lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}",
f->GetName(),
368 ((
float)mIOTime / 1e9), ((
float)wait_time / 1e9), mLevel));
369#if __has_include(<TJAlienFile.h>)
370 auto alienFile =
dynamic_cast<TJAlienFile*
>(
f);
372 monitoringInfo += fmt::format(
",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
376 mContext.
monitoring->send(o2::monitoring::Metric{monitoringInfo,
"aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
378 LOGP(info,
"Read info: {}", monitoringInfo);
383 if (mCurrentFilesystem.get()) {
385 mParentFile->closeInputFile();
389 delete mParentFileMap;
390 mParentFileMap =
nullptr;
393 mCurrentFilesystem.reset();
405 if (!fileName.empty()) {
408 std::ifstream filelist(fileName);
409 if (!filelist.is_open()) {
410 throw std::runtime_error(fmt::format(R
"(Couldn't open file "{}")", fileName));
412 while (std::getline(filelist, fileName)) {
414 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
426 if (!mdefaultFilenamesPtr.empty()) {
427 for (
auto& fileNameHolder : mdefaultFilenamesPtr) {
441 auto dfList = mfilenames[file].listOfTimeFrameNumbers;
442 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](
size_t i) { return fmt::format(
"DF_{}", i) == dfName; });
443 if (it == dfList.end()) {
446 return it - dfList.begin();
461 void (*dump_)(
const char*);
462 if (
void* sym = dlsym(
nullptr,
"igprof_dump_now")) {
463 dump_ = __extension__(
void (*)(
const char*)) sym;
465 std::string
filename = fmt::format(
"reader-memory-dump-{}.gz", uv_hrtime());
487 std::string wantedOrigin = dh.
dataOrigin.
as<std::string>();
492 if (wantedLevel != -1 && mLevel < wantedLevel) {
494 if (parentFile ==
nullptr) {
495 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
496 throw std::runtime_error(fmt::format(R
"(No parent file found for "{}" while looking for level {} in "{}")", treename, wantedLevel, rootFS->GetFile()->GetName()));
498 if (parentNumTF == -1) {
499 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
500 throw std::runtime_error(fmt::format(R
"(DF not found in parent file "{}")", parentRootFS->GetFile()->GetName()));
503 return parentFile->readTree(outputs, dh, 0, parentNumTF,
treename, totalSizeCompressed, totalSizeUncompressed);
507 if (!folder.filesystem()) {
512 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
516 throw std::runtime_error(fmt::format(R
"(Not a TFile filesystem!)"));
519 std::shared_ptr<arrow::dataset::FileFormat>
format;
522 auto fullpath = arrow::dataset::FileSource{folder.path() +
"/" +
treename, folder.filesystem()};
525 auto objectPath = capability.lfn2objectPath(fullpath.path());
526 void* handle = capability.getHandle(rootFS, objectPath);
528 format = capability.factory().format();
529 creator = capability.factory().deferredOutputStreamer;
538 LOGP(
debug,
"Could not find tree {}. Trying in parent file.", fullpath.path());
540 if (parentFile !=
nullptr) {
541 int parentNumTF = parentFile->findDFNumber(0, folder.path());
542 if (parentNumTF == -1) {
543 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
544 throw std::runtime_error(fmt::format(R
"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
547 return parentFile->readTree(outputs, dh, 0, parentNumTF,
treename, totalSizeCompressed, totalSizeUncompressed);
549 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
550 throw std::runtime_error(fmt::format(R
"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
553 auto schemaOpt =
format->Inspect(fullpath);
554 auto physicalSchema = schemaOpt;
555 std::vector<std::shared_ptr<arrow::Field>>
fields;
556 for (
auto& original : (*schemaOpt)->fields()) {
557 if (original->name().ends_with(
"_size")) {
560 fields.push_back(original);
562 auto datasetSchema = std::make_shared<arrow::Schema>(
fields);
564 auto fragment =
format->MakeFragment(fullpath, {}, *physicalSchema);
576 f2b->fill(datasetSchema,
format);
584 if (inputFiles.size() == 1 && !inputFiles[0].empty() && inputFiles[0][0] ==
'@') {
587 for (
auto inputFile : inputFiles) {
597 mdefaultInputFiles.clear();
598 mdefaultDataInputDescriptor =
nullptr;
600 mdataInputDescriptors.clear();
605 mdataInputDescriptors.clear();
606 mdefaultInputFiles.clear();
607 mFilenameRegex = std::string(
"");
612 if (mdefaultDataInputDescriptor) {
613 mdefaultDataInputDescriptor.reset();
615 mdefaultDataInputDescriptor = std::make_shared<DataInputDescriptor>(mAlienSupport, 0, mContext);
617 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
618 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
619 mdefaultDataInputDescriptor->setDefaultInputfiles(mdefaultInputFiles);
620 mdefaultDataInputDescriptor->tablename =
"any";
621 mdefaultDataInputDescriptor->treename =
"any";
622 mdefaultDataInputDescriptor->fillInputfiles();
624 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
630 FILE*
f = fopen(fnjson.c_str(),
"r");
632 LOGP(error,
"Could not open JSON file \"{}\"!", fnjson);
637 char readBuffer[65536];
638 FileReadStream inputStream(
f, readBuffer,
sizeof(readBuffer));
642 jsonDoc.ParseStream(inputStream);
643 auto status = readJsonDocument(&jsonDoc);
651bool DataInputDirector::readJsonDocument(Document* jsonDoc)
654 std::string fileName(
"");
655 const char* itemName;
658 if (jsonDoc->HasParseError()) {
659 LOGP(error,
"Check the JSON document! There is a problem with the format!");
664 itemName =
"InputDirector";
665 const Value& didirItem = (*jsonDoc)[itemName];
666 if (!didirItem.IsObject()) {
667 LOGP(info,
"No \"{}\" object found in the JSON document!", itemName);
672 itemName =
"debugmode";
673 if (didirItem.HasMember(itemName)) {
674 if (didirItem[itemName].IsBool()) {
675 mDebugMode = (didirItem[itemName].GetBool());
677 LOGP(error,
"Check the JSON document! Item \"{}\" must be a boolean!", itemName);
687 PrettyWriter<StringBuffer> writer(
buffer);
688 didirItem.Accept(writer);
689 LOGP(info,
"InputDirector object: {}", std::string(
buffer.GetString()));
692 itemName =
"fileregex";
693 if (didirItem.HasMember(itemName)) {
694 if (didirItem[itemName].IsString()) {
697 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
702 itemName =
"resfiles";
703 if (didirItem.HasMember(itemName)) {
704 if (didirItem[itemName].IsString()) {
705 fileName = didirItem[itemName].GetString();
706 if (fileName.size() && fileName[0] ==
'@') {
707 fileName.erase(0, 1);
713 }
else if (didirItem[itemName].IsArray()) {
715 auto fns = didirItem[itemName].GetArray();
716 for (
auto& fn : fns) {
720 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
725 itemName =
"InputDescriptors";
726 if (didirItem.HasMember(itemName)) {
727 if (!didirItem[itemName].IsArray()) {
728 LOGP(error,
"Check the JSON document! Item \"{}\" must be an array!", itemName);
733 for (
auto& didescItem : didirItem[itemName].GetArray()) {
734 if (!didescItem.IsObject()) {
735 LOGP(error,
"Check the JSON document! \"{}\" must be objects!", itemName);
739 auto didesc = DataInputDescriptor(mAlienSupport, 0, mContext);
740 didesc.setDefaultInputfiles(mdefaultInputFiles);
743 if (didescItem.HasMember(itemName)) {
744 if (didescItem[itemName].IsString()) {
745 didesc.tablename = didescItem[itemName].GetString();
748 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
752 LOGP(error,
"Check the JSON document! Item \"{}\" is missing!", itemName);
756 itemName =
"treename";
757 if (didescItem.HasMember(itemName)) {
758 if (didescItem[itemName].IsString()) {
759 didesc.treename = didescItem[itemName].GetString();
761 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
766 didesc.treename =
m[2];
769 itemName =
"fileregex";
770 if (didescItem.HasMember(itemName)) {
771 if (didescItem[itemName].IsString()) {
772 if (didesc.getNumberInputfiles() == 0) {
773 didesc.setFilenamesRegex(didescItem[itemName].GetString());
776 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
780 if (didesc.getNumberInputfiles() == 0) {
781 didesc.setFilenamesRegex(mFilenameRegexPtr);
785 itemName =
"resfiles";
786 if (didescItem.HasMember(itemName)) {
787 if (didescItem[itemName].IsString()) {
788 fileName = didescItem[itemName].GetString();
789 if (fileName.size() && fileName[0] ==
'@') {
790 didesc.setInputfilesFile(fileName.erase(0, 1));
792 if (didesc.getFilenamesRegexString().empty() ||
793 std::regex_match(fileName, didesc.getFilenamesRegex())) {
797 }
else if (didescItem[itemName].IsArray()) {
798 auto fns = didescItem[itemName].GetArray();
799 for (
auto& fn : fns) {
800 if (didesc.getFilenamesRegexString().empty() ||
801 std::regex_match(fn.GetString(), didesc.getFilenamesRegex())) {
806 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
810 didesc.setInputfilesFile(minputfilesFilePtr);
814 if (didesc.fillInputfiles() > 0) {
815 mdataInputDescriptors.emplace_back(didesc);
818 LOGP(info,
"This DataInputDescriptor is ignored because its file list is empty!");
820 mAlienSupport &= didesc.isAlienSupportOn();
846 for (
auto& didesc : mdataInputDescriptors) {
847 if (didesc.matcher->match(dh, context)) {
860 didesc = mdefaultDataInputDescriptor.get();
865 return didesc->getFileFolder(
counter, numTF, wantedLevel,
origin);
873 didesc = mdefaultDataInputDescriptor.get();
876 return didesc->getTimeFramesInFile(
counter);
884 didesc = mdefaultDataInputDescriptor.get();
889 return didesc->getTimeFrameNumber(
counter, numTF, wantedLevel,
origin);
894 std::string treename;
899 treename = didesc->treename;
904 didesc = mdefaultDataInputDescriptor.get();
909 auto result = didesc->readTree(outputs, dh,
counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
915 mdefaultDataInputDescriptor->closeInputFile();
916 for (
auto& didesc : mdataInputDescriptors) {
917 didesc.closeInputFile();
921bool DataInputDirector::isValid()
924 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
925 for (
auto& didesc : mdataInputDescriptors) {
926 status &= didesc.getNumberInputfiles() == numberFiles;
934 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <=
counter;
935 for (
auto& didesc : mdataInputDescriptors) {
936 status &= (didesc.getNumberInputfiles() <=
counter);
944 LOGP(info,
"DataInputDirector");
945 LOGP(info,
" Default input files file : {}", minputfilesFile);
946 LOGP(info,
" Default file name regex : {}", mFilenameRegex);
947 LOGP(info,
" Default file names : {}", mdefaultInputFiles.size());
948 for (
auto const& fn : mdefaultInputFiles) {
949 LOGP(info,
" {} {}", fn.fileName, fn.numberOfTimeFrames);
951 LOGP(info,
" Default DataInputDescriptor:");
952 mdefaultDataInputDescriptor->printOut();
954 for (
auto const& didesc : mdataInputDescriptors) {
header::DataOrigin origin
o2::monitoring::tags::Value Value
std::vector< std::shared_ptr< arrow::Field > > fields
constexpr int p1()
constexpr to accelerate the coordinates changing
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_ACTION(log, callback)
decltype(auto) make(const Output &spec, Args... args)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
GLuint const GLchar * name
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLint GLint GLsizei GLint GLenum format
std::string getTreeName(header::DataHeader dh, bool wasAOD)
Defining ITS Vertex explicitly as messageable.
FileNameHolder makeFileNameHolder(std::string fileName)
std::string to_string(gsl::span< T, Size > span)
CalculateDelta(uint64_t &target)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
std::string parentFileReplacement
o2::monitoring::Monitoring * monitoring
std::vector< std::pair< std::string, TFile * > > openFiles
int levelForOrigin(std::string_view origin) const
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities