Project
Loading...
Searching...
No Matches
DataInputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DataInputDirector.h"
13#include "Framework/Logger.h"
17#include "Framework/Output.h"
18#include "Framework/Signpost.h"
19#include "Headers/DataHeader.h"
21#include "Monitoring/Tags.h"
22#include "Monitoring/Metric.h"
23#include "Monitoring/Monitoring.h"
24
25#include "rapidjson/document.h"
26#include "rapidjson/prettywriter.h"
27#include "rapidjson/filereadstream.h"
28
29#include "TGrid.h"
30#include "TObjString.h"
31#include "TMap.h"
32#include "TFile.h"
33
34#include <arrow/dataset/file_base.h>
35#include <arrow/dataset/dataset.h>
36#include <uv.h>
37#include <memory>
38
39#if __has_include(<TJAlienFile.h>)
40#include <TJAlienFile.h>
41
42#include <utility>
43#endif
44
45#include <dlfcn.h>
46O2_DECLARE_DYNAMIC_LOG(reader_memory_dump);
47
48namespace o2::framework
49{
50using namespace rapidjson;
51
52FileNameHolder* makeFileNameHolder(std::string fileName)
53{
54 auto fileNameHolder = new FileNameHolder();
55 fileNameHolder->fileName = fileName;
56
57 return fileNameHolder;
58}
59
60DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
61 : mAlienSupport(alienSupport),
62 mMonitoring(monitoring),
63 mAllowedParentLevel(allowedParentLevel),
64 mParentFileReplacement(std::move(parentFileReplacement)),
65 mLevel(level)
66{
67 std::vector<char const*> capabilitiesSpecs = {
68 "O2Framework:RNTupleObjectReadingCapability",
69 "O2Framework:TTreeObjectReadingCapability",
70 };
71
72 std::vector<LoadablePlugin> plugins;
73 for (auto spec : capabilitiesSpecs) {
74 auto morePlugins = PluginManager::parsePluginSpecString(spec);
75 for (auto& extra : morePlugins) {
76 plugins.push_back(extra);
77 }
78 }
79
80 PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
81}
82
84{
85 LOGP(info, "DataInputDescriptor");
86 LOGP(info, " Table name : {}", tablename);
87 LOGP(info, " Tree name : {}", treename);
88 LOGP(info, " Input files file : {}", getInputfilesFilename());
89 LOGP(info, " File name regex : {}", getFilenamesRegexString());
90 LOGP(info, " Input files : {}", mfilenames.size());
91 for (auto fn : mfilenames) {
92 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
93 }
94 LOGP(info, " Total number of TF: {}", getNumberTimeFrames());
95}
96
98{
99 return (minputfilesFile.empty() && minputfilesFilePtr) ? (std::string)*minputfilesFilePtr : minputfilesFile;
100}
101
103{
104 return (mFilenameRegex.empty() && mFilenameRegexPtr) ? (std::string)*mFilenameRegexPtr : mFilenameRegex;
105}
106
108{
109 return std::regex(getFilenamesRegexString());
110}
111
113{
114 // remove leading file:// from file name
115 if (fn->fileName.rfind("file://", 0) == 0) {
116 fn->fileName.erase(0, 7);
117 } else if (!mAlienSupport && fn->fileName.rfind("alien://", 0) == 0 && !gGrid) {
118 LOGP(debug, "AliEn file requested. Enabling support.");
119 TGrid::Connect("alien://");
120 mAlienSupport = true;
121 }
122
123 mtotalNumberTimeFrames += fn->numberOfTimeFrames;
124 mfilenames.emplace_back(fn);
125}
126
128{
129 // no files left
130 if (counter >= getNumberInputfiles()) {
131 return false;
132 }
133
134 // open file
135 auto filename = mfilenames[counter]->fileName;
136 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
137 if (rootFS.get()) {
138 if (rootFS->GetFile()->GetName() == filename) {
139 return true;
140 }
142 }
143
144 mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
145 if (!mCurrentFilesystem.get()) {
146 throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
147 }
148 rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
150
151 // get the parent file map if exists
152 mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
153 if (mParentFileMap && !mParentFileReplacement.empty()) {
154 auto pos = mParentFileReplacement.find(';');
155 if (pos == std::string::npos) {
156 throw std::runtime_error(fmt::format("Invalid syntax in aod-parent-base-path-replacement: \"{}\"", mParentFileReplacement.c_str()));
157 }
158 auto from = mParentFileReplacement.substr(0, pos);
159 auto to = mParentFileReplacement.substr(pos + 1);
160
161 auto it = mParentFileMap->MakeIterator();
162 while (auto obj = it->Next()) {
163 auto objString = (TObjString*)mParentFileMap->GetValue(obj);
164 objString->String().ReplaceAll(from.c_str(), to.c_str());
165 }
166 delete it;
167 }
168
169 // get the directory names
170 if (mfilenames[counter]->numberOfTimeFrames <= 0) {
171 const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
172 TList* keyList = rootFS->GetFile()->GetListOfKeys();
173 std::vector<std::string> finalList;
174
175 // extract TF numbers and sort accordingly
176 // We use an extra seen set to make sure we preserve the order in which
177 // we insert things in the final list and to make sure we do not have duplicates.
178 // Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
179 std::unordered_set<size_t> seen;
180 for (auto key : *keyList) {
181 std::smatch matchResult;
182 std::string keyName = ((TObjString*)key)->GetString().Data();
183 bool match = std::regex_match(keyName, matchResult, TFRegex);
184 if (match) {
185 auto folderNumber = std::stoul(matchResult[1].str());
186 if (seen.find(folderNumber) == seen.end()) {
187 seen.insert(folderNumber);
188 mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
189 }
190 }
191 }
192
193 if (mParentFileMap != nullptr) {
194 // If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
195 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
196 [this](long const& l1, long const& l2) -> bool {
197 auto p1 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l1)).c_str());
198 auto p2 = (TObjString*)this->mParentFileMap->GetValue(("DF_" + std::to_string(l2)).c_str());
199 return p1->GetString().CompareTo(p2->GetString()) < 0;
200 });
201 } else {
202 std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
203 }
204
205 mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
206 mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
207 }
208
209 mCurrentFileID = counter;
210 mCurrentFileStartedAt = uv_hrtime();
211 mIOTime = 0;
212
213 return true;
214}
215
217{
218
219 // open file
220 if (!setFile(counter)) {
221 return 0ul;
222 }
223
224 // no TF left
225 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
226 return 0ul;
227 }
228
229 return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
230}
231
232arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
233{
234 // open file
235 if (!setFile(counter)) {
236 return {};
237 }
238
239 // no TF left
240 if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
241 return {};
242 }
243
244 mfilenames[counter]->alreadyRead[numTF] = true;
245
246 return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
247}
248
250{
251 if (!mParentFileMap) {
252 // This file has no parent map
253 return nullptr;
254 }
255 auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
256 auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
257 // The current DF is not found in the parent map (this should not happen and is a fatal error)
258 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
259 if (!parentFileName) {
260 throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
261 return nullptr;
262 }
263
264 if (mParentFile) {
265 // Is this still the corresponding to the correct file?
266 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
267 if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
268 return mParentFile;
269 } else {
270 mParentFile->closeInputFile();
271 delete mParentFile;
272 mParentFile = nullptr;
273 }
274 }
275
276 if (mLevel == mAllowedParentLevel) {
277 throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
278 rootFS->GetFile()->GetName()));
279 }
280
281 LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
282 mParentFile = new DataInputDescriptor(mAlienSupport, mLevel + 1, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
283 mParentFile->mdefaultFilenamesPtr = new std::vector<FileNameHolder*>;
284 mParentFile->mdefaultFilenamesPtr->emplace_back(makeFileNameHolder(parentFileName->GetString().Data()));
285 mParentFile->fillInputfiles();
286 mParentFile->setFile(0);
287 return mParentFile;
288}
289
291{
292 return mfilenames.at(counter)->numberOfTimeFrames;
293}
294
296{
297 auto& list = mfilenames.at(counter)->alreadyRead;
298 return std::count(list.begin(), list.end(), true);
299}
300
302{
303 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
304 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
305 std::string monitoringInfo(fmt::format("lfn={},size={}", f->GetName(), f->GetSize()));
306#if __has_include(<TJAlienFile.h>)
307 auto alienFile = dynamic_cast<TJAlienFile*>(f);
308 if (alienFile) {
309 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
310 }
311#endif
312 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-open-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
313 LOGP(info, "Opening file: {}", monitoringInfo);
314}
315
317{
318 int64_t wait_time = (int64_t)uv_hrtime() - (int64_t)mCurrentFileStartedAt - (int64_t)mIOTime;
319 if (wait_time < 0) {
320 wait_time = 0;
321 }
322 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
323 auto f = dynamic_cast<TFile*>(rootFS->GetFile());
324 std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
325 f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
326 ((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
327#if __has_include(<TJAlienFile.h>)
328 auto alienFile = dynamic_cast<TJAlienFile*>(f);
329 if (alienFile) {
330 monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
331 }
332#endif
333 mMonitoring->send(o2::monitoring::Metric{monitoringInfo, "aod-file-read-info"}.addTag(o2::monitoring::tags::Key::Subsystem, o2::monitoring::tags::Value::DPL));
334 LOGP(info, "Read info: {}", monitoringInfo);
335}
336
338{
339 if (mCurrentFilesystem.get()) {
340 if (mParentFile) {
341 mParentFile->closeInputFile();
342 delete mParentFile;
343 mParentFile = nullptr;
344 }
345
346 delete mParentFileMap;
347 mParentFileMap = nullptr;
348
350 mCurrentFilesystem.reset();
351 }
352}
353
355{
356 if (getNumberInputfiles() > 0) {
357 // 1. mfilenames
358 return getNumberInputfiles();
359 }
360
361 auto fileName = getInputfilesFilename();
362 if (!fileName.empty()) {
363 // 2. getFilenamesRegex() @ getInputfilesFilename()
364 try {
365 std::ifstream filelist(fileName);
366 if (!filelist.is_open()) {
367 throw std::runtime_error(fmt::format(R"(Couldn't open file "{}")", fileName));
368 }
369 while (std::getline(filelist, fileName)) {
370 // remove white spaces, empty lines are skipped
371 fileName.erase(std::remove_if(fileName.begin(), fileName.end(), ::isspace), fileName.end());
372 if (!fileName.empty() && (getFilenamesRegexString().empty() ||
373 std::regex_match(fileName, getFilenamesRegex()))) {
375 }
376 }
377 } catch (...) {
378 LOGP(error, "Check the input files file! Unable to process \"{}\"!", getInputfilesFilename());
379 return 0;
380 }
381 } else {
382 // 3. getFilenamesRegex() @ mdefaultFilenamesPtr
383 if (mdefaultFilenamesPtr) {
384 for (auto fileNameHolder : *mdefaultFilenamesPtr) {
385 if (getFilenamesRegexString().empty() ||
386 std::regex_match(fileNameHolder->fileName, getFilenamesRegex())) {
387 addFileNameHolder(fileNameHolder);
388 }
389 }
390 }
391 }
392
393 return getNumberInputfiles();
394}
395
396int DataInputDescriptor::findDFNumber(int file, std::string dfName)
397{
398 auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
399 auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
400 if (it == dfList.end()) {
401 return -1;
402 }
403 return it - dfList.begin();
404}
405
408 : mTarget(target)
409 {
410 start = uv_hrtime();
411 }
413 {
414 if (!active) {
415 return;
416 }
417 O2_SIGNPOST_ACTION(reader_memory_dump, [](void*) {
418 void (*dump_)(const char*);
419 if (void* sym = dlsym(nullptr, "igprof_dump_now")) {
420 dump_ = __extension__(void (*)(const char*)) sym;
421 if (dump_) {
422 std::string filename = fmt::format("reader-memory-dump-{}.gz", uv_hrtime());
423 dump_(filename.c_str());
424 }
425 }
426 });
427 mTarget += (uv_hrtime() - start);
428 }
429
430 void deactivate() {
431 active = false;
432 }
433
434 bool active = true;
435 uint64_t& mTarget;
436 uint64_t start;
437 uint64_t stop;
438};
439
440bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
441{
442 CalculateDelta t(mIOTime);
443 auto folder = getFileFolder(counter, numTF);
444 if (!folder.filesystem()) {
445 t.deactivate();
446 return false;
447 }
448
449 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());
450
451 if (!rootFS) {
452 t.deactivate();
453 throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
454 }
455 // FIXME: Ugly. We should detect the format from the treename, good enough for now.
456 std::shared_ptr<arrow::dataset::FileFormat> format;
457 FragmentToBatch::StreamerCreator creator = nullptr;
458
459 auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};
460
461 for (auto& capability : mFactory.capabilities) {
462 auto objectPath = capability.lfn2objectPath(fullpath.path());
463 void* handle = capability.getHandle(rootFS, objectPath);
464 if (handle) {
465 format = capability.factory().format();
466 creator = capability.factory().deferredOutputStreamer;
467 break;
468 }
469 }
470
471 // FIXME: we should distinguish between an actually missing object and one which has a non compatible
472 // format.
473 if (!format) {
474 t.deactivate();
475 LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
476 auto parentFile = getParentFile(counter, numTF, treename);
477 if (parentFile != nullptr) {
478 int parentNumTF = parentFile->findDFNumber(0, folder.path());
479 if (parentNumTF == -1) {
480 auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
481 throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
482 }
483 // first argument is 0 as the parent file object contains only 1 file
484 return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
485 }
486 auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
487 throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
488 }
489
490 auto schemaOpt = format->Inspect(fullpath);
491 auto physicalSchema = schemaOpt;
492 std::vector<std::shared_ptr<arrow::Field>> fields;
493 for (auto& original : (*schemaOpt)->fields()) {
494 if (original->name().ends_with("_size")) {
495 continue;
496 }
497 fields.push_back(original);
498 }
499 auto datasetSchema = std::make_shared<arrow::Schema>(fields);
500
501 auto fragment = format->MakeFragment(fullpath, {}, *physicalSchema);
502
503 // create table output
504 auto o = Output(dh);
505
506 // FIXME: This should allow me to create a memory pool
507 // which I can then use to scan the dataset.
508 auto f2b = outputs.make<FragmentToBatch>(o, creator, *fragment);
509
512 f2b->setLabel(treename.c_str());
513 f2b->fill(datasetSchema, format);
514
515 return true;
516}
517
522
523DataInputDirector::DataInputDirector(std::string inputFile, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
524{
525 if (inputFile.size() && inputFile[0] == '@') {
526 inputFile.erase(0, 1);
527 setInputfilesFile(inputFile);
528 } else {
529 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
530 }
531
533}
534
535DataInputDirector::DataInputDirector(std::vector<std::string> inputFiles, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mMonitoring(monitoring), mAllowedParentLevel(allowedParentLevel), mParentFileReplacement(std::move(parentFileReplacement))
536{
537 for (auto inputFile : inputFiles) {
538 mdefaultInputFiles.emplace_back(makeFileNameHolder(inputFile));
539 }
540
542}
543
545{
546 for (auto fn : mdefaultInputFiles) {
547 delete fn;
548 }
549 mdefaultInputFiles.clear();
550 mdefaultDataInputDescriptor = nullptr;
551
552 for (auto fn : mdataInputDescriptors) {
553 delete fn;
554 }
555 mdataInputDescriptors.clear();
556}
557
559{
560 mdataInputDescriptors.clear();
561 mdefaultInputFiles.clear();
562 mFilenameRegex = std::string("");
563};
564
566{
567 if (mdefaultDataInputDescriptor) {
568 delete mdefaultDataInputDescriptor;
569 }
570 mdefaultDataInputDescriptor = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
571
572 mdefaultDataInputDescriptor->setInputfilesFile(minputfilesFile);
573 mdefaultDataInputDescriptor->setFilenamesRegex(mFilenameRegex);
574 mdefaultDataInputDescriptor->setDefaultInputfiles(&mdefaultInputFiles);
575 mdefaultDataInputDescriptor->tablename = "any";
576 mdefaultDataInputDescriptor->treename = "any";
577 mdefaultDataInputDescriptor->fillInputfiles();
578
579 mAlienSupport &= mdefaultDataInputDescriptor->isAlienSupportOn();
580}
581
582bool DataInputDirector::readJson(std::string const& fnjson)
583{
584 // open the file
585 FILE* f = fopen(fnjson.c_str(), "r");
586 if (!f) {
587 LOGP(error, "Could not open JSON file \"{}\"!", fnjson);
588 return false;
589 }
590
591 // create streamer
592 char readBuffer[65536];
593 FileReadStream inputStream(f, readBuffer, sizeof(readBuffer));
594
595 // parse the json file
596 Document jsonDoc;
597 jsonDoc.ParseStream(inputStream);
598 auto status = readJsonDocument(&jsonDoc);
599
600 // clean up
601 fclose(f);
602
603 return status;
604}
605
606bool DataInputDirector::readJsonDocument(Document* jsonDoc)
607{
608 // initialisations
609 std::string fileName("");
610 const char* itemName;
611
612 // is it a proper json document?
613 if (jsonDoc->HasParseError()) {
614 LOGP(error, "Check the JSON document! There is a problem with the format!");
615 return false;
616 }
617
618 // InputDirector
619 itemName = "InputDirector";
620 const Value& didirItem = (*jsonDoc)[itemName];
621 if (!didirItem.IsObject()) {
622 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
623 return true;
624 }
625
626 // now read various items
627 itemName = "debugmode";
628 if (didirItem.HasMember(itemName)) {
629 if (didirItem[itemName].IsBool()) {
630 mDebugMode = (didirItem[itemName].GetBool());
631 } else {
632 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
633 return false;
634 }
635 } else {
636 mDebugMode = false;
637 }
638
639 if (mDebugMode) {
640 StringBuffer buffer;
641 buffer.Clear();
642 PrettyWriter<StringBuffer> writer(buffer);
643 didirItem.Accept(writer);
644 LOGP(info, "InputDirector object: {}", std::string(buffer.GetString()));
645 }
646
647 itemName = "fileregex";
648 if (didirItem.HasMember(itemName)) {
649 if (didirItem[itemName].IsString()) {
650 setFilenamesRegex(didirItem[itemName].GetString());
651 } else {
652 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
653 return false;
654 }
655 }
656
657 itemName = "resfiles";
658 if (didirItem.HasMember(itemName)) {
659 if (didirItem[itemName].IsString()) {
660 fileName = didirItem[itemName].GetString();
661 if (fileName.size() && fileName[0] == '@') {
662 fileName.erase(0, 1);
663 setInputfilesFile(fileName);
664 } else {
666 mdefaultInputFiles.emplace_back(makeFileNameHolder(fileName));
667 }
668 } else if (didirItem[itemName].IsArray()) {
670 auto fns = didirItem[itemName].GetArray();
671 for (auto& fn : fns) {
672 mdefaultInputFiles.emplace_back(makeFileNameHolder(fn.GetString()));
673 }
674 } else {
675 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
676 return false;
677 }
678 }
679
680 itemName = "InputDescriptors";
681 if (didirItem.HasMember(itemName)) {
682 if (!didirItem[itemName].IsArray()) {
683 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
684 return false;
685 }
686
687 // loop over DataInputDescriptors
688 for (auto& didescItem : didirItem[itemName].GetArray()) {
689 if (!didescItem.IsObject()) {
690 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
691 return false;
692 }
693 // create a new dataInputDescriptor
694 auto didesc = new DataInputDescriptor(mAlienSupport, 0, mMonitoring, mAllowedParentLevel, mParentFileReplacement);
695 didesc->setDefaultInputfiles(&mdefaultInputFiles);
696
697 itemName = "table";
698 if (didescItem.HasMember(itemName)) {
699 if (didescItem[itemName].IsString()) {
700 didesc->tablename = didescItem[itemName].GetString();
701 didesc->matcher = DataDescriptorQueryBuilder::buildNode(didesc->tablename);
702 } else {
703 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
704 return false;
705 }
706 } else {
707 LOGP(error, "Check the JSON document! Item \"{}\" is missing!", itemName);
708 return false;
709 }
710
711 itemName = "treename";
712 if (didescItem.HasMember(itemName)) {
713 if (didescItem[itemName].IsString()) {
714 didesc->treename = didescItem[itemName].GetString();
715 } else {
716 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
717 return false;
718 }
719 } else {
720 auto m = DataDescriptorQueryBuilder::getTokens(didesc->tablename);
721 didesc->treename = m[2];
722 }
723
724 itemName = "fileregex";
725 if (didescItem.HasMember(itemName)) {
726 if (didescItem[itemName].IsString()) {
727 if (didesc->getNumberInputfiles() == 0) {
728 didesc->setFilenamesRegex(didescItem[itemName].GetString());
729 }
730 } else {
731 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
732 return false;
733 }
734 } else {
735 if (didesc->getNumberInputfiles() == 0) {
736 didesc->setFilenamesRegex(mFilenameRegexPtr);
737 }
738 }
739
740 itemName = "resfiles";
741 if (didescItem.HasMember(itemName)) {
742 if (didescItem[itemName].IsString()) {
743 fileName = didescItem[itemName].GetString();
744 if (fileName.size() && fileName[0] == '@') {
745 didesc->setInputfilesFile(fileName.erase(0, 1));
746 } else {
747 if (didesc->getFilenamesRegexString().empty() ||
748 std::regex_match(fileName, didesc->getFilenamesRegex())) {
749 didesc->addFileNameHolder(makeFileNameHolder(fileName));
750 }
751 }
752 } else if (didescItem[itemName].IsArray()) {
753 auto fns = didescItem[itemName].GetArray();
754 for (auto& fn : fns) {
755 if (didesc->getFilenamesRegexString().empty() ||
756 std::regex_match(fn.GetString(), didesc->getFilenamesRegex())) {
757 didesc->addFileNameHolder(makeFileNameHolder(fn.GetString()));
758 }
759 }
760 } else {
761 LOGP(error, "Check the JSON document! Item \"{}\" must be a string or an array!", itemName);
762 return false;
763 }
764 } else {
765 didesc->setInputfilesFile(minputfilesFilePtr);
766 }
767
768 // fill mfilenames and add InputDescriptor to InputDirector
769 if (didesc->fillInputfiles() > 0) {
770 mdataInputDescriptors.emplace_back(didesc);
771 } else {
772 didesc->printOut();
773 LOGP(info, "This DataInputDescriptor is ignored because its file list is empty!");
774 }
775 mAlienSupport &= didesc->isAlienSupportOn();
776 }
777 }
778
779 // add a default DataInputDescriptor
781
782 // check that all DataInputDescriptors have the same number of input files
783 if (!isValid()) {
784 printOut();
785 return false;
786 }
787
788 // print the DataInputDirector
789 if (mDebugMode) {
790 printOut();
791 }
792
793 return true;
794}
795
797{
798 DataInputDescriptor* result = nullptr;
799
800 // compute list of matching outputs
802
803 for (auto didesc : mdataInputDescriptors) {
804 if (didesc->matcher->match(dh, context)) {
805 result = didesc;
806 break;
807 }
808 }
809
810 return result;
811}
812
813arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
814{
815 auto didesc = getDataInputDescriptor(dh);
816 // if NOT match then use defaultDataInputDescriptor
817 if (!didesc) {
818 didesc = mdefaultDataInputDescriptor;
819 }
820
821 return didesc->getFileFolder(counter, numTF);
822}
823
825{
826 auto didesc = getDataInputDescriptor(dh);
827 // if NOT match then use defaultDataInputDescriptor
828 if (!didesc) {
829 didesc = mdefaultDataInputDescriptor;
830 }
831
832 return didesc->getTimeFramesInFile(counter);
833}
834
836{
837 auto didesc = getDataInputDescriptor(dh);
838 // if NOT match then use defaultDataInputDescriptor
839 if (!didesc) {
840 didesc = mdefaultDataInputDescriptor;
841 }
842
843 return didesc->getTimeFrameNumber(counter, numTF);
844}
845
846bool DataInputDirector::readTree(DataAllocator& outputs, header::DataHeader dh, int counter, int numTF, size_t& totalSizeCompressed, size_t& totalSizeUncompressed)
847{
848 std::string treename;
849
850 auto didesc = getDataInputDescriptor(dh);
851 if (didesc) {
852 // if match then use filename and treename from DataInputDescriptor
853 treename = didesc->treename;
854 } else {
855 // if NOT match then use
856 // . filename from defaultDataInputDescriptor
857 // . treename from DataHeader
858 didesc = mdefaultDataInputDescriptor;
859 treename = aod::datamodel::getTreeName(dh);
860 }
861
862 auto result = didesc->readTree(outputs, dh, counter, numTF, treename, totalSizeCompressed, totalSizeUncompressed);
863 return result;
864}
865
867{
868 mdefaultDataInputDescriptor->closeInputFile();
869 for (auto didesc : mdataInputDescriptors) {
870 didesc->closeInputFile();
871 }
872}
873
874bool DataInputDirector::isValid()
875{
876 bool status = true;
877 int numberFiles = mdefaultDataInputDescriptor->getNumberInputfiles();
878 for (auto didesc : mdataInputDescriptors) {
879 status &= didesc->getNumberInputfiles() == numberFiles;
880 }
881
882 return status;
883}
884
886{
887 bool status = mdefaultDataInputDescriptor->getNumberInputfiles() <= counter;
888 for (auto didesc : mdataInputDescriptors) {
889 status &= (didesc->getNumberInputfiles() <= counter);
890 }
891
892 return status;
893}
894
896{
897 LOGP(info, "DataInputDirector");
898 LOGP(info, " Default input files file : {}", minputfilesFile);
899 LOGP(info, " Default file name regex : {}", mFilenameRegex);
900 LOGP(info, " Default file names : {}", mdefaultInputFiles.size());
901 for (auto const& fn : mdefaultInputFiles) {
902 LOGP(info, " {} {}", fn->fileName, fn->numberOfTimeFrames);
903 }
904 LOGP(info, " Default DataInputDescriptor:");
905 mdefaultDataInputDescriptor->printOut();
906 LOGP(info, " DataInputDescriptors : {}", getNumberInputDescriptors());
907 for (auto const& didesc : mdataInputDescriptors) {
908 didesc->printOut();
909 }
910}
911
912} // namespace o2::framework
o2::monitoring::tags::Value Value
int32_t i
bool o
uint16_t pos
Definition RawData.h:3
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:483
#define O2_SIGNPOST_ACTION(log, callback)
Definition Signpost.h:504
std::ostringstream debug
StringRef key
decltype(auto) make(const Output &spec, Args... args)
void addFileNameHolder(FileNameHolder *fn)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, std::string treename, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
uint64_t getTimeFrameNumber(int counter, int numTF)
arrow::dataset::FileSource getFileFolder(int counter, int numTF)
DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring *monitoring=nullptr, int allowedParentLevel=0, std::string parentFileReplacement="")
void setDefaultInputfiles(std::vector< FileNameHolder * > *difnptr)
void setInputfilesFile(std::string dffn)
DataInputDescriptor * getParentFile(int counter, int numTF, std::string treename)
int findDFNumber(int file, std::string dfName)
bool readTree(DataAllocator &outputs, header::DataHeader dh, int counter, int numTF, size_t &totalSizeCompressed, size_t &totalSizeUncompressed)
DataInputDescriptor * getDataInputDescriptor(header::DataHeader dh)
arrow::dataset::FileSource getFileFolder(header::DataHeader dh, int counter, int numTF)
void setInputfilesFile(std::string iffn)
int getTimeFramesInFile(header::DataHeader dh, int counter)
uint64_t getTimeFrameNumber(header::DataHeader dh, int counter, int numTF)
void setFilenamesRegex(std::string dfn)
bool readJson(std::string const &fnjson)
std::function< std::shared_ptr< arrow::io::OutputStream >(std::shared_ptr< arrow::dataset::FileFragment >, const std::shared_ptr< arrow::ResizableBuffer > &buffer)> StreamerCreator
void setLabel(const char *label)
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLdouble f
Definition glcorearb.h:310
GLenum target
Definition glcorearb.h:1641
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint start
Definition glcorearb.h:469
GLint GLint GLsizei GLint GLenum format
Definition glcorearb.h:275
GLuint counter
Definition glcorearb.h:3987
std::string getTreeName(header::DataHeader dh)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
FileNameHolder * makeFileNameHolder(std::string fileName)
Defining DataPointCompositeObject explicitly as copiable.
std::string filename()
void empty(int)
Definition list.h:40
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
static std::vector< LoadablePlugin > parsePluginSpecString(char const *str)
Parse a comma separated list of <library>:<plugin-name> plugin declarations.
std::vector< RootObjectReadingCapability > capabilities
the main header struct
Definition DataHeader.h:618
const std::string str