20#include "rapidjson/document.h"
21#include "rapidjson/prettywriter.h"
22#include "rapidjson/filereadstream.h"
25#include "TObjString.h"
27namespace fs = std::filesystem;
31using namespace rapidjson;
40 inString.erase(std::remove_if(inString.begin(), inString.end(), isspace), inString.end());
48 static const std::regex delim1(
":");
49 std::sregex_token_iterator
end;
50 std::sregex_token_iterator iter1(inString.begin(),
59 auto tableString = iter1->str();
64 if (!std::string(tableItems[2]).
empty()) {
67 if (!std::string(tableItems[3]).
empty() && std::atoi(std::string(tableItems[3]).c_str()) > 0) {
68 version = std::string{
"_"}.append(std::string(3 - std::string(tableItems[3]).
length(),
'0')).append(std::string(tableItems[3]));
80 if (!iter1->str().empty()) {
89 if (!iter1->str().empty()) {
90 auto cns = iter1->str();
92 static const std::regex delim2(
"/");
93 std::sregex_token_iterator iter2(cns.begin(),
97 for (; iter2 !=
end; ++iter2) {
98 if (!iter2->str().empty()) {
109 if (!iter1->str().empty()) {
110 mfilenameBase = iter1->str();
116 return (mfilenameBase.empty() && mfilenameBasePtr) ? (std::string)*mfilenameBasePtr : mfilenameBase;
121 LOGP(info,
"DataOutputDescriptor");
122 LOGP(info,
" Table name : {}",
tablename);
124 LOGP(info,
" Tree name : {}",
treename);
126 LOGP(info,
" Columns : \"all\"");
128 LOGP(info,
" Columns : {}",
colnames.size());
131 LOGP(info,
" {}", cn);
137 mfilenameBase = std::string(
"");
142 mDataOutputDescriptors.clear();
143 mfilenameBases.clear();
144 mtreeFilenames.clear();
147 mfilenameBase = std::string(
"");
155 static const std::regex delim(
",");
156 std::sregex_token_iterator
end;
157 std::sregex_token_iterator iter(keepString.begin(),
163 for (; iter !=
end; ++iter) {
164 auto itemString = iter->str();
168 if (dodesc->getFilenameBase().empty()) {
169 dodesc->setFilenameBase(mfilenameBasePtr);
171 mDataOutputDescriptors.emplace_back(dodesc);
172 mfilenameBases.emplace_back(dodesc->getFilenameBase());
173 mtreeFilenames.emplace_back(dodesc->treename + dodesc->getFilenameBase());
178 auto it = std::unique(mtreeFilenames.begin(), mtreeFilenames.end());
179 if (it != mtreeFilenames.end()) {
181 LOGP(fatal,
"Dublicate tree names in a file!");
185 std::sort(mfilenameBases.begin(), mfilenameBases.end());
186 auto last = std::unique(mfilenameBases.begin(), mfilenameBases.end());
187 mfilenameBases.erase(last, mfilenameBases.end());
190 for (
auto fn : mfilenameBases) {
191 mfilePtrs.emplace_back(
new TFile());
192 mParentMaps.emplace_back(
new TMap());
199 std::string keepString;
200 std::string delim(
"/");
203 keepString += matcher.origin.str + delim;
204 keepString += matcher.description.str + delim;
212 for (
auto input : inputs) {
221 FILE* fjson = fopen(fnjson.c_str(),
"r");
223 LOGP(info,
"Could not open JSON file \"{}\"", fnjson);
228 char readBuffer[65536];
229 FileReadStream jsonStream(fjson, readBuffer,
sizeof(readBuffer));
232 Document jsonDocument;
233 jsonDocument.ParseStream(jsonStream);
234 auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
239 return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
245 Document jsonDocument;
246 jsonDocument.Parse(jsonString.c_str());
247 auto [rdn, dfn, fmode, mfs, ntfm] = readJsonDocument(&jsonDocument);
249 return std::make_tuple(rdn, dfn, fmode, mfs, ntfm);
252std::tuple<std::string, std::string, std::string, float, int> DataOutputDirector::readJsonDocument(Document* jsonDocument)
254 std::string smc(
":");
255 std::string slh(
"/");
256 const char* itemName;
259 std::string resdir(
".");
261 std::string fmode(
"");
266 if (jsonDocument->HasParseError()) {
267 LOGP(error,
"Check the JSON document! There is a problem with the format!");
272 itemName =
"OutputDirector";
273 const Value& dodirItem = (*jsonDocument)[itemName];
274 if (!dodirItem.IsObject()) {
275 LOGP(info,
"No \"{}\" object found in the JSON document!", itemName);
280 itemName =
"debugmode";
281 if (dodirItem.HasMember(itemName)) {
282 if (dodirItem[itemName].IsBool()) {
283 mdebugmode = dodirItem[itemName].GetBool();
285 LOGP(error,
"Check the JSON document! Item \"{}\" must be a boolean!", itemName);
295 Writer<rapidjson::StringBuffer> writer(
buffer);
296 dodirItem.Accept(writer);
300 if (dodirItem.HasMember(itemName)) {
301 if (dodirItem[itemName].IsString()) {
302 resdir = dodirItem[itemName].GetString();
305 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
310 itemName =
"resfile";
311 if (dodirItem.HasMember(itemName)) {
312 if (dodirItem[itemName].IsString()) {
313 dfn = dodirItem[itemName].GetString();
316 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
321 itemName =
"resfilemode";
322 if (dodirItem.HasMember(itemName)) {
323 if (dodirItem[itemName].IsString()) {
324 fmode = dodirItem[itemName].GetString();
327 LOGP(error,
"Check the JSON document! Item \"{}\" must be a string!", itemName);
332 itemName =
"maxfilesize";
333 if (dodirItem.HasMember(itemName)) {
334 if (dodirItem[itemName].IsNumber()) {
335 maxfs = dodirItem[itemName].GetFloat();
338 LOGP(error,
"Check the JSON document! Item \"{}\" must be a number!", itemName);
343 itemName =
"ntfmerge";
344 if (dodirItem.HasMember(itemName)) {
345 if (dodirItem[itemName].IsNumber()) {
346 ntfm = dodirItem[itemName].GetInt();
349 LOGP(error,
"Check the JSON document! Item \"{}\" must be a number!", itemName);
354 itemName =
"OutputDescriptors";
355 if (dodirItem.HasMember(itemName)) {
356 if (!dodirItem[itemName].IsArray()) {
357 LOGP(error,
"Check the JSON document! Item \"{}\" must be an array!", itemName);
362 for (
auto& dodescItem : dodirItem[itemName].GetArray()) {
363 if (!dodescItem.IsObject()) {
364 LOGP(error,
"Check the JSON document! \"{}\" must be objects!", itemName);
368 std::string dodString =
"";
370 if (dodescItem.HasMember(itemName)) {
371 if (dodescItem[itemName].IsString()) {
372 dodString += dodescItem[itemName].GetString();
374 LOGP(error,
"Check the JSON document! \"{}\" must be a string!", itemName);
379 itemName =
"treename";
380 if (dodescItem.HasMember(itemName)) {
381 if (dodescItem[itemName].IsString()) {
382 dodString += dodescItem[itemName].GetString();
384 LOGP(error,
"Check the JSON document! \"{}\" must be a string!", itemName);
389 itemName =
"columns";
390 if (dodescItem.HasMember(itemName)) {
391 if (dodescItem[itemName].IsArray()) {
392 auto columnNames = dodescItem[itemName].GetArray();
393 for (
auto&
c : columnNames) {
394 dodString += (
c == columnNames[0]) ?
c.GetString() : slh +
c.GetString();
397 LOGP(error,
"Check the JSON document! \"{}\" must be an array!", itemName);
402 itemName =
"filename";
403 if (dodescItem.HasMember(itemName)) {
404 if (dodescItem[itemName].IsString()) {
405 dodString += dodescItem[itemName].GetString();
407 LOGP(error,
"Check the JSON document! \"{}\" must be a string!", itemName);
422 return std::make_tuple(resdir, dfn, fmode, maxfs, ntfm);
427 std::vector<DataOutputDescriptor*>
result;
432 for (
auto dodescr : mDataOutputDescriptors) {
433 if (dodescr->matcher->match(dh, context)) {
434 result.emplace_back(dodescr);
443 std::vector<DataOutputDescriptor*>
result;
447 auto concrete = std::get<ConcreteDataMatcher>(spec.
matcher);
449 for (
auto dodescr : mDataOutputDescriptors) {
450 if (dodescr->matcher->match(concrete, context)) {
451 result.emplace_back(dodescr);
464 auto it = std::find(mfilenameBases.begin(), mfilenameBases.end(), dodesc->
getFilenameBase());
465 if (it != mfilenameBases.end()) {
466 int ind = std::distance(mfilenameBases.begin(), it);
469 if (!mfilePtrs[ind]->IsOpen()) {
471 auto resdirname = mresultDirectory;
473 if (mmaxfilesize > 0.) {
476 std::snprintf(chcnt,
sizeof(chcnt),
"%03d", mfileCounter);
477 resdirname +=
"/" + std::string(chcnt);
479 auto resdir = fs::path{resdirname.c_str()};
481 if (!fs::is_directory(resdir)) {
482 if (!fs::create_directories(resdir)) {
483 LOGF(fatal,
"Could not create output directory %s", resdirname.c_str());
488 auto fn = resdirname +
"/" + mfilenameBases[ind] +
".root";
489 delete mfilePtrs[ind];
490 mParentMaps[ind]->Clear();
491 mfilePtrs[ind] = TFile::Open(fn.c_str(), mfileMode.c_str(),
"", compression);
493 fileAndFolder.
file = mfilePtrs[ind];
501 if (parentFileName.length() > 1) {
502 mParentMaps[ind]->Add(
new TObjString(fileAndFolder.
folderName.c_str()),
new TObjString(parentFileName.c_str()));
508 return fileAndFolder;
514 if (mmaxfilesize <= 0.) {
520 std::snprintf(chcnt,
sizeof(chcnt),
"%03d", mfileCounter);
521 std::string strcnt{chcnt};
522 auto resdirname = mresultDirectory +
"/" + strcnt;
526 for (
auto i = 0U;
i < mfilenameBases.size();
i++) {
531 auto fn = resdirname +
"/" + mfilenameBases[
i] +
".root";
532 auto resfile = fs::path{fn.c_str()};
533 if (!fs::exists(resfile)) {
536 auto fsize = (float)fs::file_size(resfile) / 1.E6;
537 LOGF(
debug,
"File %s: %f MBytes", fn.c_str(), fsize);
538 if (fsize >= mmaxfilesize) {
551 for (
auto i = 0U;
i < mfilePtrs.size();
i++) {
552 auto filePtr = mfilePtrs[
i];
554 if (filePtr->IsOpen() && mParentMaps[
i]->GetEntries() > 0) {
556 filePtr->WriteObject(mParentMaps[
i],
"parentFiles");
565 LOGP(info,
"DataOutputDirector");
566 LOGP(info,
" Output directory : {}", mresultDirectory);
567 LOGP(info,
" Default file name : {}", mfilenameBase);
568 LOGP(info,
" Maximum file size : {} megabytes", mmaxfilesize);
569 LOGP(info,
" Number of files : {}", mfilenameBases.size());
571 LOGP(info,
" DataOutputDescriptors: {}", mDataOutputDescriptors.size());
572 for (
auto const&
ds : mDataOutputDescriptors) {
576 LOGP(info,
" File name bases :");
577 for (
auto const& fb : mfilenameBases) {
578 LOGP(info,
"{}", fb);
584 mresultDirectory = resDir;
592 mfilenameBases.clear();
593 mtreeFilenames.clear();
598 for (
auto dodesc : mDataOutputDescriptors) {
599 mfilenameBases.emplace_back(dodesc->getFilenameBase());
600 mtreeFilenames.emplace_back(dodesc->treename + dodesc->getFilenameBase());
605 auto it = std::unique(mtreeFilenames.begin(), mtreeFilenames.end());
606 if (it != mtreeFilenames.end()) {
608 LOG(fatal) <<
"Duplicate tree names in a file!";
612 std::sort(mfilenameBases.begin(), mfilenameBases.end());
613 auto last = std::unique(mfilenameBases.begin(), mfilenameBases.end());
614 mfilenameBases.erase(last, mfilenameBases.end());
617 for (
auto fn : mfilenameBases) {
618 mfilePtrs.emplace_back(
new TFile());
619 mParentMaps.emplace_back(
new TMap());
625 mmaxfilesize = maxfs;
o2::monitoring::tags::Value Value
GLuint GLsizei GLsizei * length
Defining PrimaryVertex explicitly as messageable.
std::string SpectoString(InputSpec input)
std::string to_string(gsl::span< T, Size > span)
static std::unique_ptr< data_matcher::DataDescriptorMatcher > buildNode(std::string const &nodeString)
static std::vector< std::string > getTokens(std::string const &nodeString)
std::string getFilenameBase()
DataOutputDescriptor(std::string sin)
std::vector< std::string > colnames
std::unique_ptr< data_matcher::DataDescriptorMatcher > matcher
void setFileMode(std::string filemode)
void readString(std::string const &keepString)
FileAndFolder getFileFolder(DataOutputDescriptor *dodesc, uint64_t folderNumber, std::string parentFileName, int compression)
void readSpecs(std::vector< InputSpec > inputs)
std::tuple< std::string, std::string, std::string, float, int > readJson(std::string const &fnjson)
std::tuple< std::string, std::string, std::string, float, int > readJsonString(std::string const &stjson)
void setMaximumFileSize(float maxfs)
void setNumberTimeFramesToMerge(int ntfmerge)
std::vector< DataOutputDescriptor * > getDataOutputDescriptors(header::DataHeader dh)
void setResultDir(std::string resDir)
void setFilenameBase(std::string dfn)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"