34int main(
int argc,
char* argv[])
36 std::string inputCollection(
"input.txt");
37 std::string outputFileName(
"AO2D.root");
38 long maxDirSize = 100000000;
39 bool skipNonExistingFiles =
false;
40 bool skipParentFilesList =
false;
41 bool mergeByName =
false;
44 int compression = 505;
47 static struct option long_options[] = {
48 {
"input", required_argument,
nullptr, 0},
49 {
"output", required_argument,
nullptr, 1},
50 {
"max-size", required_argument,
nullptr, 2},
51 {
"skip-non-existing-files", no_argument,
nullptr, 3},
52 {
"skip-parent-files-list", no_argument,
nullptr, 4},
53 {
"compression", required_argument,
nullptr, 5},
54 {
"merge-by-name", no_argument,
nullptr, 6},
55 {
"verbosity", required_argument,
nullptr,
'v'},
56 {
"help", no_argument,
nullptr,
'h'},
57 {
nullptr, 0,
nullptr, 0}};
60 int c = getopt_long(argc, argv,
"", long_options, &option_index);
64 inputCollection = optarg;
66 outputFileName = optarg;
68 maxDirSize = atol(optarg);
70 skipNonExistingFiles =
true;
72 skipParentFilesList =
true;
74 compression = atoi(optarg);
77 }
else if (
c ==
'v') {
79 }
else if (
c ==
'h') {
80 printf(
"AO2D merging tool. Options: \n");
81 printf(
" --input <inputfile.txt> Contains path to files to be merged. Default: %s\n", inputCollection.c_str());
82 printf(
" --output <outputfile.root> Target output ROOT file. Default: %s\n", outputFileName.c_str());
83 printf(
" --max-size <size in Bytes> Target directory size. Default: %ld. Set to 0 if file is not self-contained.\n", maxDirSize);
84 printf(
" --skip-non-existing-files Flag to allow skipping of non-existing files in the input list.\n");
85 printf(
" --skip-parent-files-list Flag to allow skipping the merging of the parent files list.\n");
86 printf(
" --compression <root compression id> Compression algorithm / level to use (default: %d)\n", compression);
87 printf(
" --merge-by-name Only merge TTrees from folders with the same name.\n");
88 printf(
" --verbosity <flag> Verbosity of output (default: %d).\n",
verbosity);
95 printf(
"AOD merger started with:\n");
96 printf(
" Input file: %s\n", inputCollection.c_str());
97 printf(
" Output file name: %s\n", outputFileName.c_str());
98 printf(
" Maximal folder size (uncompressed): %ld\n", maxDirSize);
99 if (skipNonExistingFiles) {
100 printf(
" WARNING: Skipping non-existing files.\n");
103 printf(
" Merging only folders with the same name.\n");
106 std::map<std::string, TTree*> trees;
107 std::map<std::string, uint64_t> sizeCompressed;
108 std::map<std::string, uint64_t> sizeUncompressed;
109 std::map<std::string, int>
offsets;
110 std::map<std::string, int> unassignedIndexOffset;
112 auto outputFile = TFile::Open(outputFileName.c_str(),
"RECREATE",
"", compression);
113 TDirectory* outputDir =
nullptr;
114 long currentDirSize = 0;
117 in.open(inputCollection);
119 TMap* metaData =
nullptr;
120 TMap* parentFiles =
nullptr;
121 int totalMergedDFs = 0;
125 auto flushTrees = [&](
bool resetState) {
129 for (
auto const&
tree : trees) {
131 tree.second->Write();
132 sizeCompressed[
tree.first] +=
tree.second->GetZipBytes();
133 sizeUncompressed[
tree.first] +=
tree.second->GetTotBytes();
145 while (in.good() && exitCode == 0) {
148 if (line.Length() == 0) {
152 if (line.BeginsWith(
"alien:") && !gGrid) {
153 printf(
"Connecting to AliEn...");
154 TGrid::Connect(
"alien:");
157 printf(
"Processing input file: %s\n", line.Data());
159 auto inputFile = TFile::Open(line);
160 if (!inputFile || inputFile->IsZombie()) {
161 printf(
"Error: %s input file %s.\n", !inputFile ?
"Could not open" :
"Zombie", line.Data());
162 if (skipNonExistingFiles) {
165 printf(
"Aborting merge!\n");
171 TList* keyList = inputFile->GetListOfKeys();
174 for (
auto key1 : *keyList) {
175 if (((TObjString*)key1)->GetString().EqualTo(
"metaData")) {
176 auto metaDataCurrentFile = (TMap*)inputFile->Get(
"metaData");
177 if (metaData ==
nullptr) {
178 metaData = metaDataCurrentFile;
180 metaData->Write(
"metaData", TObject::kSingleKey);
182 for (
auto metaDataPair : *metaData) {
183 auto metaDataKey = ((TPair*)metaDataPair)->Key();
184 if (metaDataCurrentFile->Contains(((TObjString*)metaDataKey)->GetString())) {
185 auto value = (TObjString*)metaData->GetValue(((TObjString*)metaDataKey)->GetString());
186 auto valueCurrentFile = (TObjString*)metaDataCurrentFile->GetValue(((TObjString*)metaDataKey)->GetString());
187 if (!
value->GetString().EqualTo(valueCurrentFile->GetString())) {
188 printf(
"WARNING: Metadata differs between input files. Key %s : %s vs. %s\n", ((TObjString*)metaDataKey)->GetString().Data(),
189 value->GetString().Data(), valueCurrentFile->GetString().Data());
192 printf(
"WARNING: Metadata differs between input files. Key %s is not present in current file\n", ((TObjString*)metaDataKey)->GetString().Data());
198 if (((TObjString*)key1)->GetString().EqualTo(
"parentFiles") && !skipParentFilesList) {
199 auto parentFilesCurrentFile = (TMap*)inputFile->Get(
"parentFiles");
200 if (parentFiles ==
nullptr) {
201 parentFiles =
new TMap;
203 for (
auto pair : *parentFilesCurrentFile) {
204 parentFiles->Add(((TPair*)pair)->Key(), ((TPair*)pair)->Value());
206 delete parentFilesCurrentFile;
209 if (!((TObjString*)key1)->GetString().BeginsWith(
"DF_")) {
213 auto dfName = ((TObjString*)key1)->GetString().Data();
216 if (mergeByName && outputDir && std::string(outputDir->GetName()) != std::string(dfName)) {
218 printf(
"Folder name changed: closing folder %s.\n", outputDir->GetName());
224 printf(
" Processing folder %s\n", dfName);
228 auto folder = (TDirectoryFile*)inputFile->Get(dfName);
229 auto treeList = folder->GetListOfKeys();
234 for (
auto i = 0;
i < treeList->GetEntries(); ++
i) {
235 TKey* ki = (TKey*)treeList->At(
i);
236 for (
int j =
i + 1;
j < treeList->GetEntries(); ++
j) {
237 TKey* kj = (TKey*)treeList->At(
j);
238 if (std::strcmp(ki->GetName(), kj->GetName()) == 0 && std::strcmp(ki->GetTitle(), kj->GetTitle()) == 0) {
239 if (ki->GetCycle() < kj->GetCycle()) {
240 printf(
" *** FATAL *** we had ordered the keys, first cycle should be higher, please check");
244 treeList->Remove(kj);
254 std::list<std::string> foundTrees;
256 for (
auto key2 : *treeList) {
257 auto treeName = ((TObjString*)key2)->GetString().Data();
258 bool found = (std::find(foundTrees.begin(), foundTrees.end(), treeName) != foundTrees.end());
260 printf(
" ***WARNING*** Tree %s was already merged (even if we purged duplicated trees before, so this should not happen), skipping\n", treeName);
263 foundTrees.push_back(treeName);
265 auto inputTree = (TTree*)inputFile->Get(Form(
"%s/%s", dfName, treeName));
266 bool fastCopy = (inputTree->GetTotBytes() > 10000000);
268 printf(
" Processing tree %s with %lld entries with total size %lld (fast copy: %d)\n", treeName, inputTree->GetEntries(), inputTree->GetTotBytes(), fastCopy);
271 bool alreadyCopied =
false;
272 if (trees.count(treeName) == 0) {
274 printf(
" *** FATAL ***: The tree %s was not in the previous dataframe(s)\n", treeName);
281 outputDir = outputFile->mkdir(dfName);
284 printf(
"Writing to output folder %s\n", dfName);
288 auto outputTree = inputTree->CloneTree(-1, (fastCopy) ?
"fast" :
"");
289 currentDirSize += inputTree->GetTotBytes();
290 alreadyCopied =
true;
291 outputTree->SetAutoFlush(0);
292 trees[treeName] = outputTree;
295 trees[treeName]->CopyAddresses(inputTree);
298 auto outputTree = trees[treeName];
300 std::vector<std::pair<int*, int>> indexList;
301 std::vector<char*> vlaPointers;
302 std::vector<int*> indexPointers;
303 TObjArray* branches = inputTree->GetListOfBranches();
304 for (
int i = 0;
i < branches->GetEntriesFast(); ++
i) {
305 TBranch* br = (TBranch*)branches->UncheckedAt(
i);
306 TString branchName(br->GetName());
309 if (((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount() !=
nullptr) {
310 int maximum = ((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount()->GetMaximum();
315 br->GetExpectedType(cls,
type);
316 auto typeSize = TDataType::GetDataType(
type)->Size();
318 char*
buffer =
new char[maximum * typeSize];
319 memset(
buffer, 0, maximum * typeSize);
320 vlaPointers.push_back(
buffer);
322 printf(
" Allocated VLA buffer of length %d with %d bytes each for branch name %s\n", maximum, typeSize, br->GetName());
324 inputTree->SetBranchAddress(br->GetName(),
buffer);
325 outputTree->SetBranchAddress(br->GetName(),
buffer);
327 if (branchName.BeginsWith(
"fIndexArray")) {
328 for (
int i = 0;
i < maximum;
i++) {
332 }
else if (branchName.BeginsWith(
"fIndexSlice")) {
335 vlaPointers.push_back(
reinterpret_cast<char*
>(
buffer));
337 inputTree->SetBranchAddress(br->GetName(),
buffer);
338 outputTree->SetBranchAddress(br->GetName(),
buffer);
342 }
else if (branchName.BeginsWith(
"fIndex") && !branchName.EndsWith(
"_size")) {
345 indexPointers.push_back(
buffer);
347 inputTree->SetBranchAddress(br->GetName(),
buffer);
348 outputTree->SetBranchAddress(br->GetName(),
buffer);
354 if (indexList.size() > 0) {
355 auto entries = inputTree->GetEntries();
356 int minIndexOffset = unassignedIndexOffset[treeName];
357 auto newMinIndexOffset = minIndexOffset;
358 for (
int i = 0;
i < entries;
i++) {
359 for (
auto&
index : indexList) {
362 inputTree->GetEntry(
i);
364 for (
const auto& idx : indexList) {
366 if (*(idx.first) < 0) {
367 *(idx.first) += minIndexOffset;
368 newMinIndexOffset = std::min(newMinIndexOffset, *(idx.first));
370 *(idx.first) += idx.second;
373 if (!alreadyCopied) {
374 int nbytes = outputTree->Fill();
376 currentDirSize += nbytes;
380 unassignedIndexOffset[treeName] = newMinIndexOffset;
381 }
else if (!alreadyCopied) {
382 auto nbytes = outputTree->CopyEntries(inputTree, -1, (fastCopy) ?
"fast" :
"");
384 currentDirSize += nbytes;
390 for (
auto&
buffer : indexPointers) {
393 for (
auto&
buffer : vlaPointers) {
403 for (
auto const&
tree : trees) {
404 bool found = (std::find(foundTrees.begin(), foundTrees.end(),
tree.first) != foundTrees.end());
405 if (found ==
false) {
406 printf(
" *** FATAL ***: The tree %s was not in the current dataframe\n",
tree.first.c_str());
418 for (
auto const&
tree : trees) {
425 if (maxDirSize > 0) {
427 printf(
"ERROR: Index on %s but no tree found\n",
offset.first.c_str());
433 if (maxDirSize == 0 || currentDirSize > maxDirSize) {
435 printf(
"Maximum size reached: %ld. Closing folder %s.\n", currentDirSize, dfName);
445 parentFiles->Write(
"parentFiles", TObject::kSingleKey);
453 if (totalMergedDFs == 0) {
454 printf(
"ERROR: Did not merge a single DF. This does not seem right.\n");
460 printf(
"Removing incomplete output file %s.\n", outputFile->GetName());
461 gSystem->Unlink(outputFile->GetName());
463 printf(
"AOD merger finished. Size overview follows:\n");
465 uint64_t totalCompressed = 0;
466 uint64_t totalUncompressed = 0;
467 for (
auto const&
tree : sizeCompressed) {
468 totalCompressed +=
tree.second;
469 totalUncompressed += sizeUncompressed[
tree.first];
471 if (totalCompressed > 0 && totalUncompressed > 0) {
472 for (
auto const&
tree : sizeCompressed) {
473 printf(
" Tree %20s | Compressed: %12" PRIu64
" (%2.0f%%) | Uncompressed: %12" PRIu64
" (%2.0f%%)\n",
tree.first.c_str(),
tree.second, 100.0 *
tree.second / totalCompressed, sizeUncompressed[
tree.first], 100.0 * sizeUncompressed[
tree.first] / totalUncompressed);