Project
Loading...
Searching...
No Matches
DataOutputDirector.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
15#include "Framework/Logger.h"
16
17#include <filesystem>
18#include <regex>
19
20#include "rapidjson/document.h"
21#include "rapidjson/prettywriter.h"
22#include "rapidjson/filereadstream.h"
23
24#include "TMap.h"
25#include "TObjString.h"
26
27namespace fs = std::filesystem;
28
29namespace o2::framework
30{
31using namespace rapidjson;
32
34{
35 // inString is an item consisting of 4 parts which are separated by a ':'
36 // "origin/description/subSpec:treename:col1/col2/col3:filename"
37 // the 1st part is used to create a DataDescriptorMatcher
38 // the other parts are used to fill treename, colnames, and filename
39 // remove all spaces
40 inString.erase(std::remove_if(inString.begin(), inString.end(), isspace), inString.end());
41
42 // reset
43 treename = "";
44 colnames.clear();
45 mfilenameBase = "";
46
47 // analyze the parts of the input string
48 static const std::regex delim1(":");
49 std::sregex_token_iterator end;
50 std::sregex_token_iterator iter1(inString.begin(),
51 inString.end(),
52 delim1,
53 -1);
54
55 // create the DataDescriptorMatcher
56 if (iter1 == end) {
57 return;
58 }
59 auto tableString = iter1->str();
61
62 // get the table name
63 auto tableItems = DataDescriptorQueryBuilder::getTokens(tableString);
64 if (!std::string(tableItems[2]).empty()) {
65 tablename = tableItems[2];
66 }
67 if (!std::string(tableItems[3]).empty() && std::atoi(std::string(tableItems[3]).c_str()) > 0) {
68 version = std::string{"_"}.append(std::string(3 - std::string(tableItems[3]).length(), '0')).append(std::string(tableItems[3]));
69 }
70
71 // get the tree name
72 // default tree name is the O2 + table name (lower case)
74 std::transform(treename.begin(), treename.end(), treename.begin(), [](unsigned char c) { return std::tolower(c); });
75 treename = std::string("O2") + treename + version;
76 ++iter1;
77 if (iter1 == end) {
78 return;
79 }
80 if (!iter1->str().empty()) {
81 treename = iter1->str();
82 }
83
84 // get column names
85 ++iter1;
86 if (iter1 == end) {
87 return;
88 }
89 if (!iter1->str().empty()) {
90 auto cns = iter1->str();
91
92 static const std::regex delim2("/");
93 std::sregex_token_iterator iter2(cns.begin(),
94 cns.end(),
95 delim2,
96 -1);
97 for (; iter2 != end; ++iter2) {
98 if (!iter2->str().empty()) {
99 colnames.emplace_back(iter2->str());
100 }
101 }
102 }
103
104 // get the file name base
105 ++iter1;
106 if (iter1 == end) {
107 return;
108 }
109 if (!iter1->str().empty()) {
110 mfilenameBase = iter1->str();
111 }
112}
113
115{
116 return (mfilenameBase.empty() && mfilenameBasePtr) ? (std::string)*mfilenameBasePtr : mfilenameBase;
117}
118
120{
121 LOGP(info, "DataOutputDescriptor");
122 LOGP(info, " Table name : {}", tablename);
123 LOGP(info, " File name base : {}", getFilenameBase());
124 LOGP(info, " Tree name : {}", treename);
125 if (colnames.empty()) {
126 LOGP(info, " Columns : \"all\"");
127 } else {
128 LOGP(info, " Columns : {}", colnames.size());
129 }
130 for (auto cn : colnames) {
131 LOGP(info, " {}", cn);
132 }
133}
134
136{
137 mfilenameBase = std::string("");
138}
139
141{
142 mDataOutputDescriptors.clear();
143 mfilenameBases.clear();
144 mtreeFilenames.clear();
146 mfilePtrs.clear();
147 mfilenameBase = std::string("");
148 mfileCounter = 1;
149};
150
151void DataOutputDirector::readString(std::string const& keepString)
152{
153 // the keep-string keepString consists of ','-separated items
154 // create for each item a corresponding DataOutputDescriptor
155 static const std::regex delim(",");
156 std::sregex_token_iterator end;
157 std::sregex_token_iterator iter(keepString.begin(),
158 keepString.end(),
159 delim,
160 -1);
161
162 // loop over ','-separated items
163 for (; iter != end; ++iter) {
164 auto itemString = iter->str();
165
166 // create a new DataOutputDescriptor and add it to the list
167 auto dodesc = new DataOutputDescriptor(itemString);
168 if (dodesc->getFilenameBase().empty()) {
169 dodesc->setFilenameBase(mfilenameBasePtr);
170 }
171 mDataOutputDescriptors.emplace_back(dodesc);
172 mfilenameBases.emplace_back(dodesc->getFilenameBase());
173 mtreeFilenames.emplace_back(dodesc->treename + dodesc->getFilenameBase());
174 }
175
176 // the combination [tree name/file name] must be unique
177 // throw exception if this is not the case
178 auto it = std::unique(mtreeFilenames.begin(), mtreeFilenames.end());
179 if (it != mtreeFilenames.end()) {
180 printOut();
181 LOGP(fatal, "Dublicate tree names in a file!");
182 }
183
184 // make unique/sorted list of filenameBases
185 std::sort(mfilenameBases.begin(), mfilenameBases.end());
186 auto last = std::unique(mfilenameBases.begin(), mfilenameBases.end());
187 mfilenameBases.erase(last, mfilenameBases.end());
188
189 // prepare list mfilePtrs of TFile
190 for (auto fn : mfilenameBases) {
191 mfilePtrs.emplace_back(new TFile());
192 mParentMaps.emplace_back(new TMap());
193 }
194}
195
196// creates a keep string from a InputSpec
197std::string SpectoString(InputSpec input)
198{
199 std::string keepString;
200 std::string delim("/");
201
202 auto matcher = DataSpecUtils::asConcreteDataMatcher(input);
203 keepString += matcher.origin.str + delim;
204 keepString += matcher.description.str + delim;
205 keepString += std::to_string(matcher.subSpec);
206
207 return keepString;
208}
209
210void DataOutputDirector::readSpecs(std::vector<InputSpec> inputs)
211{
212 for (auto input : inputs) {
213 auto keepString = SpectoString(input);
214 readString(keepString);
215 }
216}
217
218std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJson(std::string const& fnjson)
219{
220 // open the file
221 FILE* fjson = fopen(fnjson.c_str(), "r");
222 if (!fjson) {
223 LOGP(info, "Could not open JSON file \"{}\"", fnjson);
224 return memptyanswer;
225 }
226
227 // create streamer
228 char readBuffer[65536];
229 FileReadStream jsonStream(fjson, readBuffer, sizeof(readBuffer));
230
231 // parse the json file
232 Document jsonDocument;
233 jsonDocument.ParseStream(jsonStream);
234 auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
235
236 // clean up
237 fclose(fjson);
238
239 return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
240}
241
242std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonString(std::string const& jsonString)
243{
244 // parse the json string
245 Document jsonDocument;
246 jsonDocument.Parse(jsonString.c_str());
247 auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
248
249 return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
250}
251
252std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonDocument(Document* jsonDocument)
253{
254 std::string smc(":");
255 std::string slh("/");
256 const char* itemName;
257
258 // initialisations
259 std::string resdir(".");
260 std::string dfn("");
261 std::string fmode("");
262 float maxfs = -1.;
263 int ntfm = -1;
264
265 // is it a proper json document?
266 if (jsonDocument->HasParseError()) {
267 LOGP(error, "Check the JSON document! There is a problem with the format!");
268 return memptyanswer;
269 }
270
271 // OutputDirector
272 itemName = "OutputDirector";
273 const Value& dodirItem = (*jsonDocument)[itemName];
274 if (!dodirItem.IsObject()) {
275 LOGP(info, "No \"{}\" object found in the JSON document!", itemName);
276 return memptyanswer;
277 }
278
279 // now read various items
280 itemName = "debugmode";
281 if (dodirItem.HasMember(itemName)) {
282 if (dodirItem[itemName].IsBool()) {
283 mdebugmode = dodirItem[itemName].GetBool();
284 } else {
285 LOGP(error, "Check the JSON document! Item \"{}\" must be a boolean!", itemName);
286 return memptyanswer;
287 }
288 } else {
289 mdebugmode = false;
290 }
291
292 if (mdebugmode) {
293 StringBuffer buffer;
294 buffer.Clear();
295 Writer<rapidjson::StringBuffer> writer(buffer);
296 dodirItem.Accept(writer);
297 }
298
299 itemName = "resdir";
300 if (dodirItem.HasMember(itemName)) {
301 if (dodirItem[itemName].IsString()) {
302 resdir = dodirItem[itemName].GetString();
303 setResultDir(resdir);
304 } else {
305 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
306 return memptyanswer;
307 }
308 }
309
310 itemName = "resfile";
311 if (dodirItem.HasMember(itemName)) {
312 if (dodirItem[itemName].IsString()) {
313 dfn = dodirItem[itemName].GetString();
314 setFilenameBase(dfn);
315 } else {
316 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
317 return memptyanswer;
318 }
319 }
320
321 itemName = "resfilemode";
322 if (dodirItem.HasMember(itemName)) {
323 if (dodirItem[itemName].IsString()) {
324 fmode = dodirItem[itemName].GetString();
325 setFileMode(fmode);
326 } else {
327 LOGP(error, "Check the JSON document! Item \"{}\" must be a string!", itemName);
328 return memptyanswer;
329 }
330 }
331
332 itemName = "maxfilesize";
333 if (dodirItem.HasMember(itemName)) {
334 if (dodirItem[itemName].IsNumber()) {
335 maxfs = dodirItem[itemName].GetFloat();
336 setMaximumFileSize(maxfs);
337 } else {
338 LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName);
339 return memptyanswer;
340 }
341 }
342
343 itemName = "ntfmerge";
344 if (dodirItem.HasMember(itemName)) {
345 if (dodirItem[itemName].IsNumber()) {
346 ntfm = dodirItem[itemName].GetInt();
348 } else {
349 LOGP(error, "Check the JSON document! Item \"{}\" must be a number!", itemName);
350 return memptyanswer;
351 }
352 }
353
354 itemName = "OutputDescriptors";
355 if (dodirItem.HasMember(itemName)) {
356 if (!dodirItem[itemName].IsArray()) {
357 LOGP(error, "Check the JSON document! Item \"{}\" must be an array!", itemName);
358 return memptyanswer;
359 }
360
361 // loop over DataOutputDescriptors
362 for (auto& dodescItem : dodirItem[itemName].GetArray()) {
363 if (!dodescItem.IsObject()) {
364 LOGP(error, "Check the JSON document! \"{}\" must be objects!", itemName);
365 return memptyanswer;
366 }
367
368 std::string dodString = "";
369 itemName = "table";
370 if (dodescItem.HasMember(itemName)) {
371 if (dodescItem[itemName].IsString()) {
372 dodString += dodescItem[itemName].GetString();
373 } else {
374 LOGP(error, "Check the JSON document! \"{}\" must be a string!", itemName);
375 return memptyanswer;
376 }
377 }
378 dodString += smc;
379 itemName = "treename";
380 if (dodescItem.HasMember(itemName)) {
381 if (dodescItem[itemName].IsString()) {
382 dodString += dodescItem[itemName].GetString();
383 } else {
384 LOGP(error, "Check the JSON document! \"{}\" must be a string!", itemName);
385 return memptyanswer;
386 }
387 }
388 dodString += smc;
389 itemName = "columns";
390 if (dodescItem.HasMember(itemName)) {
391 if (dodescItem[itemName].IsArray()) {
392 auto columnNames = dodescItem[itemName].GetArray();
393 for (auto& c : columnNames) {
394 dodString += (c == columnNames[0]) ? c.GetString() : slh + c.GetString();
395 }
396 } else {
397 LOGP(error, "Check the JSON document! \"{}\" must be an array!", itemName);
398 return memptyanswer;
399 }
400 }
401 dodString += smc;
402 itemName = "filename";
403 if (dodescItem.HasMember(itemName)) {
404 if (dodescItem[itemName].IsString()) {
405 dodString += dodescItem[itemName].GetString();
406 } else {
407 LOGP(error, "Check the JSON document! \"{}\" must be a string!", itemName);
408 return memptyanswer;
409 }
410 }
411
412 // convert s to DataOutputDescription object
413 readString(dodString);
414 }
415 }
416
417 // print the DataOutputDirector
418 if (mdebugmode) {
419 printOut();
420 }
421
422 return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm);
423}
424
426{
427 std::vector<DataOutputDescriptor*> result;
428
429 // compute list of matching outputs
431
432 for (auto dodescr : mDataOutputDescriptors) {
433 if (dodescr->matcher->match(dh, context)) {
434 result.emplace_back(dodescr);
435 }
436 }
437
438 return result;
439}
440
441std::vector<DataOutputDescriptor*> DataOutputDirector::getDataOutputDescriptors(InputSpec spec)
442{
443 std::vector<DataOutputDescriptor*> result;
444
445 // compute list of matching outputs
447 auto concrete = std::get<ConcreteDataMatcher>(spec.matcher);
448
449 for (auto dodescr : mDataOutputDescriptors) {
450 if (dodescr->matcher->match(concrete, context)) {
451 result.emplace_back(dodescr);
452 }
453 }
454
455 return result;
456}
457
458FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName, int compression)
459{
460 // initialisation
461 FileAndFolder fileAndFolder;
462
463 // search dodesc->filename in mfilenameBases and return corresponding filePtr
464 auto it = std::find(mfilenameBases.begin(), mfilenameBases.end(), dodesc->getFilenameBase());
465 if (it != mfilenameBases.end()) {
466 int ind = std::distance(mfilenameBases.begin(), it);
467
468 // open new output file
469 if (!mfilePtrs[ind]->IsOpen()) {
470 // output directory
471 auto resdirname = mresultDirectory;
472 // is the maximum-file-size check enabled?
473 if (mmaxfilesize > 0.) {
474 // subdirectory ./xxx
475 char chcnt[4];
476 std::snprintf(chcnt, sizeof(chcnt), "%03d", mfileCounter);
477 resdirname += "/" + std::string(chcnt);
478 }
479 auto resdir = fs::path{resdirname.c_str()};
480
481 if (!fs::is_directory(resdir)) {
482 if (!fs::create_directories(resdir)) {
483 LOGF(fatal, "Could not create output directory %s", resdirname.c_str());
484 }
485 }
486
487 // complete file name
488 auto fn = resdirname + "/" + mfilenameBases[ind] + ".root";
489 delete mfilePtrs[ind];
490 mParentMaps[ind]->Clear();
491 mfilePtrs[ind] = TFile::Open(fn.c_str(), mfileMode.c_str(), "", compression);
492 }
493 fileAndFolder.file = mfilePtrs[ind];
494
495 // check if folder DF_* exists
496 fileAndFolder.folderName = "DF_" + std::to_string(folderNumber);
497 auto key = fileAndFolder.file->GetKey(fileAndFolder.folderName.c_str());
498 if (!key) {
499 fileAndFolder.file->mkdir(fileAndFolder.folderName.c_str());
500 // TODO not clear why we get a " " in case we sent empty over DPL, put the limit to 1 for now
501 if (parentFileName.length() > 1) {
502 mParentMaps[ind]->Add(new TObjString(fileAndFolder.folderName.c_str()), new TObjString(parentFileName.c_str()));
503 }
504 }
505 fileAndFolder.file->cd(fileAndFolder.folderName.c_str());
506 }
507
508 return fileAndFolder;
509}
510
512{
513 // is the maximum-file-size check enabled?
514 if (mmaxfilesize <= 0.) {
515 return true;
516 }
517
518 // current result directory
519 char chcnt[4];
520 std::snprintf(chcnt, sizeof(chcnt), "%03d", mfileCounter);
521 std::string strcnt{chcnt};
522 auto resdirname = mresultDirectory + "/" + strcnt;
523
524 // loop over all files
525 // if one file is large, then all files need to be closed
526 for (auto i = 0U; i < mfilenameBases.size(); i++) {
527 if (!mfilePtrs[i]) {
528 continue;
529 }
530 // size of fn
531 auto fn = resdirname + "/" + mfilenameBases[i] + ".root";
532 auto resfile = fs::path{fn.c_str()};
533 if (!fs::exists(resfile)) {
534 continue;
535 }
536 auto fsize = (float)fs::file_size(resfile) / 1.E6; // MBytes
537 LOGF(debug, "File %s: %f MBytes", fn.c_str(), fsize);
538 if (fsize >= mmaxfilesize) {
540 // increment the subdirectory counter
541 mfileCounter++;
542 return false;
543 }
544 }
545
546 return true;
547}
548
550{
551 for (auto i = 0U; i < mfilePtrs.size(); i++) {
552 auto filePtr = mfilePtrs[i];
553 if (filePtr) {
554 if (filePtr->IsOpen() && mParentMaps[i]->GetEntries() > 0) {
555 filePtr->cd("/");
556 filePtr->WriteObject(mParentMaps[i], "parentFiles");
557 }
558 filePtr->Close();
559 }
560 }
561}
562
564{
565 LOGP(info, "DataOutputDirector");
566 LOGP(info, " Output directory : {}", mresultDirectory);
567 LOGP(info, " Default file name : {}", mfilenameBase);
568 LOGP(info, " Maximum file size : {} megabytes", mmaxfilesize);
569 LOGP(info, " Number of files : {}", mfilenameBases.size());
570
571 LOGP(info, " DataOutputDescriptors: {}", mDataOutputDescriptors.size());
572 for (auto const& ds : mDataOutputDescriptors) {
573 ds->printOut();
574 }
575
576 LOGP(info, " File name bases :");
577 for (auto const& fb : mfilenameBases) {
578 LOGP(info, "{}", fb);
579 }
580}
581
582void DataOutputDirector::setResultDir(std::string resDir)
583{
584 mresultDirectory = resDir;
585}
586
588{
589 // reset
590 mfilenameBase = dfn;
591
592 mfilenameBases.clear();
593 mtreeFilenames.clear();
595 mfilePtrs.clear();
596
597 // loop over DataOutputDescritors
598 for (auto dodesc : mDataOutputDescriptors) {
599 mfilenameBases.emplace_back(dodesc->getFilenameBase());
600 mtreeFilenames.emplace_back(dodesc->treename + dodesc->getFilenameBase());
601 }
602
603 // the combination [tree name/file name] must be unique
604 // throw exception if this is not the case
605 auto it = std::unique(mtreeFilenames.begin(), mtreeFilenames.end());
606 if (it != mtreeFilenames.end()) {
607 printOut();
608 LOG(fatal) << "Duplicate tree names in a file!";
609 }
610
611 // make unique/sorted list of filenameBases
612 std::sort(mfilenameBases.begin(), mfilenameBases.end());
613 auto last = std::unique(mfilenameBases.begin(), mfilenameBases.end());
614 mfilenameBases.erase(last, mfilenameBases.end());
615
616 // prepare list mfilePtrs of TFile
617 for (auto fn : mfilenameBases) {
618 mfilePtrs.emplace_back(new TFile());
619 mParentMaps.emplace_back(new TMap());
620 }
621}
622
624{
625 mmaxfilesize = maxfs;
626}
627} // namespace o2::framework
o2::monitoring::tags::Value Value
int32_t i
uint32_t c
Definition RawData.h:2
std::ostringstream debug
StringRef key
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint buffer
Definition glcorearb.h:655
GLuint GLuint end
Definition glcorearb.h:469
GLuint GLsizei GLsizei * length
Definition glcorearb.h:790
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::string SpectoString(InputSpec input)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void empty(int)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
std::vector< std::string > colnames
std::unique_ptr< data_matcher::DataDescriptorMatcher > matcher
void setFileMode(std::string filemode)
void readString(std::string const &keepString)
FileAndFolder getFileFolder(DataOutputDescriptor *dodesc, uint64_t folderNumber, std::string parentFileName, int compression)
void readSpecs(std::vector< InputSpec > inputs)
std::tuple< std::string, std::string, std::string, float, int > readJson(std::string const &fnjson)
std::tuple< std::string, std::string, std::string, float, int > readJsonString(std::string const &stjson)
void setNumberTimeFramesToMerge(int ntfmerge)
std::vector< DataOutputDescriptor * > getDataOutputDescriptors(header::DataHeader dh)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
std::variant< ConcreteDataMatcher, data_matcher::DataDescriptorMatcher > matcher
The actual matcher for the input spec.
Definition InputSpec.h:70
the main header struct
Definition DataHeader.h:618
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"