Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
#include "DataInputDirector.h"
#include "Framework/Logger.h"
#include "Framework/Output.h"
#include "Framework/Signpost.h"
#include "Headers/DataHeader.h"
#include "Monitoring/Tags.h"
#include "Monitoring/Metric.h"
#include "Monitoring/Monitoring.h"

#include "rapidjson/document.h"
#include "rapidjson/prettywriter.h"
#include "rapidjson/filereadstream.h"

#include "TGrid.h"
#include "TObjString.h"
#include "TMap.h"
#include "TFile.h"

#include <arrow/dataset/file_base.h>
#include <arrow/dataset/dataset.h>
#include <uv.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <memory>
#include <regex>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
#endif

#include <dlfcn.h>
// Dynamic signpost log; CalculateDelta uses it (via O2_SIGNPOST_ACTION) to request an
// igprof memory dump when the "igprof_dump_now" symbol is available.
46O2_DECLARE_DYNAMIC_LOG(reader_memory_dump);
47
48namespace o2::framework
49{
50using namespace rapidjson;
51
52FileNameHolder* makeFileNameHolder(std::string fileName)
53{
54 auto fileNameHolder = new FileNameHolder();
55 fileNameHolder->fileName = fileName;
56
57 return fileNameHolder;
58}
59
61 : mAlienSupport(alienSupport),
62 mLevel(level),
63 mContext(context)
64{
65 std::vector<char const*> capabilitiesSpecs = {
66 "O2Framework:RNTupleObjectReadingCapability",
67 "O2Framework:TTreeObjectReadingCapability",
68 };
69
70 std::vector<LoadablePlugin> plugins;
71 for (auto spec : capabilitiesSpecs) {
72 auto morePlugins = PluginManager::parsePluginSpecString(spec);
73 for (auto& extra : morePlugins) {
74 plugins.push_back(extra);
75 }
76 }
77
78 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
79}
80
82{
83 LOGP(info, "DataInputDescriptor");
84 LOGP(info, " Table name : {}", tablename);
85 LOGP(info, " Tree name : {}", treename);
86 LOGP(info, " Input files file : {}", getInputfilesFilename());
87 LOGP(info, " File name regex : {}", getFilenamesRegexString());
88 LOGP(info, " Input files : {}", mfilenames.size());
89 for (auto fn : mfilenames) {
90 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
91 }
92 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
93}
94
96{
97 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
98}
99
101{
102 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
103}
104
106{
107 return std::regex(getFilenamesRegexString());
108}
109
111{
112 // remove leading file:// from file name
113 if (fn->fileName.rfind("file://", 0) == 0) {
114 fn->fileName.erase(0, 7);
115 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0 && !gGrid) {
116 LOGP(debug, "AliEn file requested. Enabling support.");
117 TGrid::Connect("alien://");
118 mAlienSupport = true;
119 }
120
121 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
122 mfilenames.emplace_back(fn);
123}
124
// Make file number `counter` the currently open file of this descriptor.
// Returns false if `counter` is beyond the file list, true otherwise (including when the
// requested file is already the open one). For a file whose DF count is not yet known
// (numberOfTimeFrames <= 0) the DF_<n> folders are enumerated and ordered here.
// Throws std::runtime_error for a malformed aod-parent-base-path-replacement.
125bool DataInputDescriptor::setFile(int counter, std::string_view origin)
126{
127 // no files left
128 if (counter >= getNumberInputfiles()) {
129 return false;
130 }
131
132 // In case the origin starts with anything but AOD, we add the origin as the suffix
133 // of the filename. In the future we might expand this for proper rewriting of the
134 // filename based on the origin and the original file information.
135 std::string filename = mfilenames[counter]->fileName;
136 if (!origin.starts_with("AOD")) {
137 filename = std::regex_replace(filename, std::regex("[.]root$"), fmt::format("_{}.root", origin));
138 }
139
140 // open file (nothing to do if this exact file is already the open one)
141 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
142 if (rootFS.get()) {
143 if (rootFS->GetFile()->GetName() == filename) {
144 return true;
145 }
// NOTE(review): as written, the previously open (different) file is not closed before
// mCurrentFilesystem is replaced below, and mParentFile is not reset. Confirm that
// closeInputFile() is meant to be called here.
147 }
148
149 mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
// NOTE(review): std::make_shared never yields null, so this check cannot fire. TFile::Open
// returns nullptr on failure, which is not detected here, and rootFS->GetFile()->Get(...)
// below would then dereference it. Checking the TFile* would make this error path effective.
150 if (!mCurrentFilesystem.get()) {
151 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
152 }
153 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
155
156 // get the parent file map if exists
157 mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
// Optional "from;to" rewrite of the parent paths. Every occurrence of `from` in the map values
// is replaced by `to`, in place on the TObjString objects held by the map.
158 if (mParentFileMap && !mContext.parentFileReplacement.empty()) {
159 auto pos = mContext.parentFileReplacement.find(';');
160 if (pos == std::string::npos) {
161 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mContext.parentFileReplacement.c_str()));
162 }
163 auto from = mContext.parentFileReplacement.substr(0, pos);
164 auto to = mContext.parentFileReplacement.substr(pos + 1);
165
166 auto it = mParentFileMap->MakeIterator();
167 while (auto obj = it->Next()) {
168 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
169 objString->String().ReplaceAll(from.c_str(), to.c_str());
170 }
171 delete it;
172 }
173
174 // get the directory names (skipped when the DF count of this file is already known)
175 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
// Matches "DF_<n>" and the flat layout "DF_<n>-<anything>", with an optional leading '/'.
176 const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
177 TList* keyList = rootFS->GetFile()->GetListOfKeys();
178 std::vector<std::string> finalList;
// NOTE(review): `finalList` (line above) is never used.
179
180 // extract TF numbers and sort accordingly
181 // We use an extra seen set to make sure we preserve the order in which
182 // we insert things in the final list and to make sure we do not have duplicates.
183 // Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
184 std::unordered_set<size_t> seen;
185 for (auto key : *keyList) {
186 std::smatch matchResult;
187 std::string keyName = ((TObjString*)key)->GetString().Data();
188 bool match = std::regex_match(keyName, matchResult, TFRegex);
189 if (match) {
190 auto folderNumber = std::stoul(matchResult[1].str());
191 if (seen.find(folderNumber) == seen.end()) {
192 seen.insert(folderNumber);
193 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
194 }
195 }
196 }
197
198 if (mParentFileMap != nullptr) {
199 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
// NOTE(review): GetValue() returns nullptr for a DF absent from the parent map, and p1/p2
// would then be dereferenced as null inside the comparator.
200 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
201 [this](long const& l1, long const& l2) -> bool {
202 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
203 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
204 return p1->GetString().CompareTo(p2->GetString()) < 0;
205 });
206 } else {
207 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
208 }
209
// One "already read" flag per discovered DF; numberOfTimeFrames becomes the DF count.
210 mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
211 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
212 }
213
// Remember which file is open and restart its timing (open timestamp, accumulated I/O time).
214 mCurrentFileID = counter;
215 mCurrentFileStartedAt = uv_hrtime();
216 mIOTime = 0;
217
218 return true;
219}
220
221uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF, std::string_view origin)
222{
223
224 // open file
225 if (!setFile(counter, origin)) {
226 return 0ul;
227 }
228
229 // no TF left
230 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
231 return 0ul;
232 }
233
234 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
235}
236
237arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF, std::string_view origin)
238{
239 // open file
240 if (!setFile(counter, origin)) {
241 return {};
242 }
243
244 // no TF left
245 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
246 return {};
247 }
248
249 mfilenames[counter]->alreadyRead[numTF] = true;
250
251 return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
252}
253
254DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
255{
256 if (!mParentFileMap) {
257 // This file has no parent map
258 return nullptr;
259 }
260 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
261 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
262 // The current DF is not found in the parent map (this should not happen and is a fatal error)
263 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
264 if (!parentFileName) {
265 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
266 return nullptr;
267 }
268
269 if (mParentFile) {
270 // Is this still the corresponding to the correct file?
271 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
272 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
273 return mParentFile;
274 } else {
275 mParentFile->closeInputFile();
276 delete mParentFile;
277 mParentFile = nullptr;
278 }
279 }
280
281 if (mLevel == mContext.allowedParentLevel) {
282 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mContext.allowedParentLevel, folderName.c_str(),
283 rootFS->GetFile()->GetName()));
284 }
285
286 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
287 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mContext);
288 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
289 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
290 mParentFile->fillInputfiles();
291 mParentFile->setFile(0, origin);
292 return mParentFile;
293}
294
296{
297 return mfilenames.at(counter)->numberOfTimeFrames;
298}
299
301{
302 auto& list = mfilenames.at(counter)->alreadyRead;
303 return std::count(list.begin(), list.end(), true);
304}
305
307{
308 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
309 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
310 std::string monitoringInfo(fmt::format("lfn={},size={}", f->GetName(), f->GetSize()));
311#if __has_include(<TJAlienFile.h>)
312 auto alienFile = dynamic_cast<TJAlienFile*>(f);
313 if (alienFile) {
314 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
315 }
316#endif
317 if (mContext.monitoring) {
318 mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
319 }
320 LOGP(info, "Opening file: {}", monitoringInfo);
321}
322
324{
325 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
326 if (wait_time < 0) {
327 wait_time = 0;
328 }
329 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
330 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
331 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
332 f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
333 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
334#if __has_include(<TJAlienFile.h>)
335 auto alienFile = dynamic_cast<TJAlienFile*>(f);
336 if (alienFile) {
337 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
338 }
339#endif
340 if (mContext.monitoring) {
341 mContext.monitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
342 }
343 LOGP(info, "Read info: {}", monitoringInfo);
344}
345
// Close the currently open file, if any. This recursively closes and deletes the parent-file
// descriptor, frees the parent-file map read from this file (this object owns it), and
// releases the filesystem.
347{
348 if (mCurrentFilesystem.get()) {
349 if (mParentFile) {
350 mParentFile->closeInputFile();
351 delete mParentFile;
352 mParentFile = nullptr;
353 }
354
355 delete mParentFileMap;
356 mParentFileMap = nullptr;
357
359 mCurrentFilesystem.reset();
360 }
361}
362
// Populate mfilenames (if still empty) and return the number of input files.
// Sources, in order of precedence:
//  (1) files already added;
//  (2) the lines of the input-files file (getInputfilesFilename()), with whitespace stripped
//      and filtered by the file-name regex;
//  (3) the default file list (mdefaultFilenamesPtr), filtered by the same regex.
// Returns 0 if the input-files file cannot be processed.
364{
365 if (getNumberInputfiles() > 0) {
366 // 1. mfilenames
367 return getNumberInputfiles();
368 }
369
370 auto fileName = getInputfilesFilename();
371 if (!fileName.empty()) {
372 // 2. getFilenamesRegex() @ getInputfilesFilename()
373 try {
374 std::ifstream filelist(fileName);
375 if (!filelist.is_open()) {
376 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
377 }
// Note: `fileName` is reused below to hold each line of the list file.
378 while (std::getline(filelist, fileName)) {
379 // remove white spaces, empty lines are skipped
// NOTE(review): ::isspace on a plain char is undefined for negative values (non-ASCII bytes);
// a predicate taking unsigned char would be safe. getFilenamesRegex() is also recompiled
// for every line.
380 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
381 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
382 std::regex_match(fileName, getFilenamesRegex()))) {
// NOTE(review): this branch has no statement as the code stands, so matching lines are never
// added. By analogy with branch 3 below, addFileNameHolder(makeFileNameHolder(fileName)) is
// expected here. Confirm.
384 }
385 }
// Any failure (unreadable file, invalid regex, ...) is logged and reported as 0 files.
386 } catch (...) {
387 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
388 return 0;
389 }
390 } else {
391 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
392 if (mdefaultFilenamesPtr) {
393 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
394 if (getFilenamesRegexString().empty() ||
395 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
396 addFileNameHolder(fileNameHolder);
397 }
398 }
399 }
400 }
401
402 return getNumberInputfiles();
403}
404
405int DataInputDescriptor::findDFNumber(int file, std::string dfName)
406{
407 auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
408 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
409 if (it == dfList.end()) {
410 return -1;
411 }
412 return it - dfList.begin();
413}
414
// Scoped timer. It stores uv_hrtime() at construction and, while still active, adds the elapsed
// uv_hrtime() difference (nanoseconds) to the referenced counter when it finishes.
417 : mTarget(target)
418 {
419 start = uv_hrtime();
420 }
// NOTE(review): presumably the destructor (its declarator line is missing above this brace).
// Through O2_SIGNPOST_ACTION on reader_memory_dump it looks up "igprof_dump_now" with dlsym
// and, if found, writes a "reader-memory-dump-<timestamp>.gz" dump.
422 {
423 if (!active) {
424 return;
425 }
426 O2_SIGNPOST_ACTION(reader_memory_dump, [](void*) {
427 void (*dump_)(const char*);
428 if (void* sym = dlsym(nullptr, "igprof_dump_now")) {
429 dump_ = __extension__(void (*)(const char*)) sym;
430 if (dump_) {
431 std::string filename = fmt::format("reader-memory-dump-{}.gz", uv_hrtime());
432 dump_(filename.c_str());
433 }
434 }
435 });
436 mTarget += (uv_hrtime() - start);
437 }
438
// NOTE(review): presumably deactivate() (readTree calls t.deactivate()); it stops the elapsed
// time from being added to the counter. Its declarator line is missing above this brace.
440 {
441 active = false;
442 }
443
444 bool active = true;
445 uint64_t& mTarget;
446 uint64_t start;
// NOTE(review): `stop` is never used in this file.
447 uint64_t stop;
448};
449
// Read tree `treename` of DF position `numTF` in file `counter` and hand it to `outputs` as a
// FragmentToBatch for header `dh`.
// Returns false when there is no such file or DF. If no registered capability can open the
// object, the lookup is delegated to the parent file (if any); otherwise std::runtime_error is
// thrown. Time spent here is accumulated into mIOTime by the CalculateDelta timer.
450bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
451{
452 CalculateDelta t(mIOTime);
453 std::string origin = dh.dataOrigin.as<std::string>();
454 auto folder = getFileFolder(counter, numTF, origin);
455 if (!folder.filesystem()) {
456 t.deactivate();
457 return false;
458 }
459
460 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
461
462 if (!rootFS) {
463 t.deactivate();
464 throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
465 }
466 // FIXME: Ugly. We should detect the format from the treename, good enough for now.
467 std::shared_ptr<arrow::dataset::FileFormat> format;
468 FragmentToBatch::StreamerCreator creator = nullptr;
469
470 auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
471
// Use the first capability that can obtain a handle for the object; it supplies both the
// arrow format and the deferred output streamer.
472 for (auto& capability : mFactory.capabilities) {
473 auto objectPath = capability.lfn2objectPath(fullpath.path());
474 void* handle = capability.getHandle(rootFS, objectPath);
475 if (handle) {
476 format = capability.factory().format();
477 creator = capability.factory().deferredOutputStreamer;
478 break;
479 }
480 }
481
482 // FIXME: we should distinguish between an actually missing object and one which has a non compatible
483 // format.
484 if (!format) {
// Stop timing here; the parent descriptor's readTree runs its own timer on its own mIOTime.
485 t.deactivate();
486 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
487 auto parentFile = getParentFile(counter, numTF, treename, origin);
488 if (parentFile != nullptr) {
489 int parentNumTF = parentFile->findDFNumber(0, folder.path());
490 if (parentNumTF == -1) {
491 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
492 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
493 }
494 // first argument is 0 as the parent file object contains only 1 file
495 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
496 }
// NOTE(review): this `rootFS` shadows the one declared above. Both refer to the current file here.
497 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
498 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
499 }
500
// NOTE(review): the results of Inspect() and MakeFragment() are dereferenced without checking
// for an error status.
501 auto schemaOpt = format->Inspect(fullpath);
502 auto physicalSchema = schemaOpt;
// Columns whose name ends in "_size" are left out of the dataset schema. They remain in the
// physical schema passed to MakeFragment().
503 std::vector<std::shared_ptr<arrow::Field>> fields;
504 for (auto& original : (*schemaOpt)->fields()) {
505 if (original->name().ends_with("_size")) {
506 continue;
507 }
508 fields.push_back(original);
509 }
510 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
511
512 auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
513
514 // create table output
515 auto o = Output(dh);
516
517 // FIXME: This should allow me to create a memory pool
518 // which I can then use to scan the dataset.
519 auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
520
523 f2b->setLabel(treename.c_str());
524 f2b->fill(datasetSchema, format);
525
526 return true;
527}
528
// Build the director from the input list. A single entry of the form "@<file>" names a text
// file listing the inputs (see setInputfilesFile); otherwise every entry becomes a default
// input file.
529DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, DataInputDirectorContext&& context)
// NOTE(review): `context` is a named rvalue reference, i.e. an lvalue, so this copies it.
// std::move(context) would avoid the copy if mContext is a value member.
530 : mContext{context}
531{
532 if (inputFiles.size() == 1 && !inputFiles[0].empty() && inputFiles[0][0] == '@') {
// substr(1, -1): -1 converts to std::string::npos, i.e. everything after the '@'.
533 setInputfilesFile(inputFiles.back().substr(1, -1));
534 } else {
535 for (auto inputFile : inputFiles) {
536 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
537 }
538 }
539
541}
542
544{
545 for (auto fn : mdefaultInputFiles) {
546 delete fn;
547 }
548 mdefaultInputFiles.clear();
549 mdefaultDataInputDescriptor = nullptr;
550
551 for (auto fn : mdataInputDescriptors) {
552 delete fn;
553 }
554 mdataInputDescriptors.clear();
555}
556
558{
559 mdataInputDescriptors.clear();
560 mdefaultInputFiles.clear();
561 mFilenameRegex = std::string("");
562};
563
565{
566 if (mdefaultDataInputDescriptor) {
567 delete mdefaultDataInputDescriptor;
568 }
569 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mContext);
570
571 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
572 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
573 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
574 mdefaultDataInputDescriptor->tablename = "any";
575 mdefaultDataInputDescriptor->treename = "any";
576 mdefaultDataInputDescriptor->fillInputfiles();
577
578 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
579}
580
581bool DataInputDirector::readJson(std::string const& fnjson)
582{
583 // open the file
584 FILE* f = fopen(fnjson.c_str(), "r");
585 if (!f) {
586 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
587 return false;
588 }
589
590 // create streamer
591 char readBuffer[65536];
592 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
593
594 // parse the json file
595 Document jsonDoc;
596 jsonDoc.ParseStream(inputStream);
597 auto status = readJsonDocument(&jsonDoc);
598
599 // clean up
600 fclose(f);
601
602 return status;
603}
604
// Apply the "InputDirector" object of a parsed JSON document.
// It reads the optional items "debugmode", "fileregex", "resfiles" and "InputDescriptors",
// then validates that all descriptors have the same number of input files.
// Returns false on a parse error, a malformed item or an invalid configuration, and true
// otherwise (including when there is no "InputDirector" object).
605bool DataInputDirector::readJsonDocument(Document* jsonDoc)
606{
607 // initialisations
608 std::string fileName("");
609 const char* itemName;
610
611 // is it a proper json document?
612 if (jsonDoc->HasParseError()) {
613 LOGP(error, "Check the JSON document! There is a problem with the format!");
614 return false;
615 }
616
617 // InputDirector
618 itemName = "InputDirector";
// NOTE(review): rapidjson's operator[] triggers RAPIDJSON_ASSERT when the member is missing (or
// the root is not an object). With assertions enabled, a document without "InputDirector"
// never reaches the IsObject() check below. Checking HasMember() first would avoid that.
619 const Value& didirItem = (*jsonDoc)[itemName];
620 if (!didirItem.IsObject()) {
621 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
622 return true;
623 }
624
625 // now read various items
626 itemName = "debugmode";
627 if (didirItem.HasMember(itemName)) {
628 if (didirItem[itemName].IsBool()) {
629 mDebugMode = (didirItem[itemName].GetBool());
630 } else {
631 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
632 return false;
633 }
634 } else {
635 mDebugMode = false;
636 }
637
638 if (mDebugMode) {
639 StringBuffer buffer;
640 buffer.Clear();
641 PrettyWriter<StringBuffer> writer(buffer);
642 didirItem.Accept(writer);
643 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
644 }
645
646 itemName = "fileregex";
647 if (didirItem.HasMember(itemName)) {
648 if (didirItem[itemName].IsString()) {
649 setFilenamesRegex(didirItem[itemName].GetString());
650 } else {
651 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
652 return false;
653 }
654 }
655
// "resfiles": either "@<list file>", a single file name, or an array of file names.
656 itemName = "resfiles";
657 if (didirItem.HasMember(itemName)) {
658 if (didirItem[itemName].IsString()) {
659 fileName = didirItem[itemName].GetString();
660 if (fileName.size() && fileName[0] == '@') {
661 fileName.erase(0, 1);
662 setInputfilesFile(fileName);
663 } else {
665 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
666 }
667 } else if (didirItem[itemName].IsArray()) {
// NOTE(review): array elements are not checked with IsString() before GetString().
669 auto fns = didirItem[itemName].GetArray();
670 for (auto& fn : fns) {
671 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
672 }
673 } else {
674 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
675 return false;
676 }
677 }
678
679 itemName = "InputDescriptors";
680 if (didirItem.HasMember(itemName)) {
681 if (!didirItem[itemName].IsArray()) {
682 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
683 return false;
684 }
685
686 // loop over DataInputDescriptors
687 for (auto& didescItem : didirItem[itemName].GetArray()) {
// NOTE(review): from the second iteration on, itemName still holds "resfiles" (set inside the
// loop), so this message names the wrong item.
688 if (!didescItem.IsObject()) {
689 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
690 return false;
691 }
692 // create a new dataInputDescriptor
// NOTE(review): this descriptor is leaked on every `return false` below, and also when it is
// ignored because its file list is empty.
693 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mContext);
694 didesc->setDefaultInputfiles(&mdefaultInputFiles);
695
696 itemName = "table";
697 if (didescItem.HasMember(itemName)) {
698 if (didescItem[itemName].IsString()) {
699 didesc->tablename = didescItem[itemName].GetString();
700 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
701 } else {
702 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
703 return false;
704 }
705 } else {
706 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
707 return false;
708 }
709
710 itemName = "treename";
711 if (didescItem.HasMember(itemName)) {
712 if (didescItem[itemName].IsString()) {
713 didesc->treename = didescItem[itemName].GetString();
714 } else {
715 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
716 return false;
717 }
718 } else {
// Default tree name: third token of the table specification (assumes getTokens() returns at
// least three tokens; TODO confirm).
719 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
720 didesc->treename = m[2];
721 }
722
723 itemName = "fileregex";
724 if (didescItem.HasMember(itemName)) {
725 if (didescItem[itemName].IsString()) {
726 if (didesc->getNumberInputfiles() == 0) {
727 didesc->setFilenamesRegex(didescItem[itemName].GetString());
728 }
729 } else {
730 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
731 return false;
732 }
733 } else {
734 if (didesc->getNumberInputfiles() == 0) {
735 didesc->setFilenamesRegex(mFilenameRegexPtr);
736 }
737 }
738
// Per-descriptor "resfiles". Explicit names are filtered by the descriptor's regex; without the
// item, the descriptor falls back to the director's input-files file.
739 itemName = "resfiles";
740 if (didescItem.HasMember(itemName)) {
741 if (didescItem[itemName].IsString()) {
742 fileName = didescItem[itemName].GetString();
743 if (fileName.size() && fileName[0] == '@') {
744 didesc->setInputfilesFile(fileName.erase(0, 1));
745 } else {
746 if (didesc->getFilenamesRegexString().empty() ||
747 std::regex_match(fileName, didesc->getFilenamesRegex())) {
748 didesc->addFileNameHolder(makeFileNameHolder(fileName));
749 }
750 }
751 } else if (didescItem[itemName].IsArray()) {
752 auto fns = didescItem[itemName].GetArray();
753 for (auto& fn : fns) {
754 if (didesc->getFilenamesRegexString().empty() ||
755 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
756 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
757 }
758 }
759 } else {
760 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
761 return false;
762 }
763 } else {
764 didesc->setInputfilesFile(minputfilesFilePtr);
765 }
766
767 // fill mfilenames and add InputDescriptor to InputDirector
768 if (didesc->fillInputfiles() > 0) {
769 mdataInputDescriptors.emplace_back(didesc);
770 } else {
771 didesc->printOut();
772 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
773 }
774 mAlienSupport &= didesc->isAlienSupportOn();
775 }
776 }
777
778 // add a default DataInputDescriptor
// NOTE(review): no statement follows the comment above as the code stands, yet isValid() below
// dereferences mdefaultDataInputDescriptor. Confirm that createDefaultDataInputDescriptor() is
// invoked here.
780
781 // check that all DataInputDescriptors have the same number of input files
782 if (!isValid()) {
783 printOut();
784 return false;
785 }
786
787 // print the DataInputDirector
788 if (mDebugMode) {
789 printOut();
790 }
791
792 return true;
793}
794
// Return the first descriptor in mdataInputDescriptors whose matcher accepts `dh`, or nullptr
// if none matches (callers then fall back to mdefaultDataInputDescriptor).
796{
797 DataInputDescriptor* result = nullptr;
798
799 // compute list of matching outputs
// NOTE(review): `context`, used by match() below, is not declared in this body as it stands.
// Confirm it is set up before the loop.
801
802 for (auto didesc : mdataInputDescriptors) {
803 if (didesc->matcher->match(dh, context)) {
804 result = didesc;
805 break;
806 }
807 }
808
809 return result;
810}
811
812arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
813{
814 auto didesc = getDataInputDescriptor(dh);
815 // if NOT match then use defaultDataInputDescriptor
816 if (!didesc) {
817 didesc = mdefaultDataInputDescriptor;
818 }
819 std::string origin = dh.dataOrigin.as<std::string>();
820
821 return didesc->getFileFolder(counter, numTF, origin);
822}
823
825{
826 auto didesc = getDataInputDescriptor(dh);
827 // if NOT match then use defaultDataInputDescriptor
828 if (!didesc) {
829 didesc = mdefaultDataInputDescriptor;
830 }
831
832 return didesc->getTimeFramesInFile(counter);
833}
834
836{
837 auto didesc = getDataInputDescriptor(dh);
838 // if NOT match then use defaultDataInputDescriptor
839 if (!didesc) {
840 didesc = mdefaultDataInputDescriptor;
841 }
842 std::string origin = dh.dataOrigin.as<std::string>();
843
844 return didesc->getTimeFrameNumber(counter, numTF, origin);
845}
846
847bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
848{
849 std::string treename;
850
851 auto didesc = getDataInputDescriptor(dh);
852 if (didesc) {
853 // if match then use filename and treename from DataInputDescriptor
854 treename = didesc->treename;
855 } else {
856 // if NOT match then use
857 // . filename from defaultDataInputDescriptor
858 // . treename from DataHeader
859 didesc = mdefaultDataInputDescriptor;
860 treename = aod::datamodel::getTreeName(dh);
861 }
862 std::string origin = dh.dataOrigin.as<std::string>();
863
864 auto result = didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
865 return result;
866}
867
869{
870 mdefaultDataInputDescriptor->closeInputFile();
871 for (auto didesc : mdataInputDescriptors) {
872 didesc->closeInputFile();
873 }
874}
875
876bool DataInputDirector::isValid()
877{
878 bool status = true;
879 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
880 for (auto didesc : mdataInputDescriptors) {
881 status &= didesc->getNumberInputfiles() == numberFiles;
882 }
883
884 return status;
885}
886
888{
889 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
890 for (auto didesc : mdataInputDescriptors) {
891 status &= (didesc->getNumberInputfiles() <= counter);
892 }
893
894 return status;
895}
896
898{
899 LOGP(info, "DataInputDirector");
900 LOGP(info, " Default input files file : {}", minputfilesFile);
901 LOGP(info, " Default file name regex : {}", mFilenameRegex);
902 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
903 for (auto const& fn : mdefaultInputFiles) {
904 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
905 }
906 LOGP(info, " Default DataInputDescriptor:");
907 mdefaultDataInputDescriptor->printOut();
908 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
909 for (auto const& didesc : mdataInputDescriptors) {
910 didesc->printOut();
911 }
912}
913
914} // namespace o2::framework
header::DataOrigin origin
o2::monitoring::tags::Value Value
std::vector< std::shared_ptr< arrow::Field > > fields
std::ostringstream debug
int32_t i
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_ACTION(log, callback)
Definition Signpost.h:510
StringRef key
decltype(auto) make(const Output &spec, Args... args)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
uint64_t getTimeFrameNumber(int counter, int numTF, std::string_view origin)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename, std::string_view origin)
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
DataInputDescriptor(bool alienSupport, int level, DataInputDirectorContext &context)
int findDFNumber(int file, std::string dfName)
arrow::dataset::FileSource getFileFolder(int counter, int numTF, std::string_view origin)
bool setFile(int counter, std::string_view origin)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
DataInputDirector(std::vector< std::string > inputFiles, DataInputDirectorContext &&context)
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
FileNameHolder * makeFileNameHolder(std::string fileName)
std::string filename()
void empty(int)
Definition list.h:40
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
o2::monitoring::Monitoring * monitoring
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
the main header struct
Definition DataHeader.h:620
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
const std::string str