Project
Loading...
Searching...
No Matches
aodMerger.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
#include <getopt.h>

#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "TSystem.h"
#include "TFile.h"
#include "TTree.h"
#include "TList.h"
#include "TKey.h"
#include "TDirectory.h"
#include "TObjString.h"
#include <TGrid.h>
#include <TMap.h>
#include <TLeaf.h>

#include "aodMerger.h"
31
32// AOD merger with correct index rewriting
33// No need to know the datamodel because the branch names follow a canonical standard (identified by fIndex)
34int main(int argc, char* argv[])
35{
36 std::string inputCollection("input.txt");
37 std::string outputFileName("AO2D.root");
38 long maxDirSize = 100000000;
39 bool skipNonExistingFiles = false;
40 bool skipParentFilesList = false;
41 bool mergeByName = false;
42 int verbosity = 2;
43 int exitCode = 0; // 0: success, >0: failure
44 int compression = 505;
45
46 int option_index = 0;
47 static struct option long_options[] = {
48 {"input", required_argument, nullptr, 0},
49 {"output", required_argument, nullptr, 1},
50 {"max-size", required_argument, nullptr, 2},
51 {"skip-non-existing-files", no_argument, nullptr, 3},
52 {"skip-parent-files-list", no_argument, nullptr, 4},
53 {"compression", required_argument, nullptr, 5},
54 {"merge-by-name", no_argument, nullptr, 6},
55 {"verbosity", required_argument, nullptr, 'v'},
56 {"help", no_argument, nullptr, 'h'},
57 {nullptr, 0, nullptr, 0}};
58
59 while (true) {
60 int c = getopt_long(argc, argv, "", long_options, &option_index);
61 if (c == -1) {
62 break;
63 } else if (c == 0) {
64 inputCollection = optarg;
65 } else if (c == 1) {
66 outputFileName = optarg;
67 } else if (c == 2) {
68 maxDirSize = atol(optarg);
69 } else if (c == 3) {
70 skipNonExistingFiles = true;
71 } else if (c == 4) {
72 skipParentFilesList = true;
73 } else if (c == 5) {
74 compression = atoi(optarg);
75 } else if (c == 6) {
76 mergeByName = true;
77 } else if (c == 'v') {
78 verbosity = atoi(optarg);
79 } else if (c == 'h') {
80 printf("AO2D merging tool. Options: \n");
81 printf(" --input <inputfile.txt> Contains path to files to be merged. Default: %s\n", inputCollection.c_str());
82 printf(" --output <outputfile.root> Target output ROOT file. Default: %s\n", outputFileName.c_str());
83 printf(" --max-size <size in Bytes> Target directory size. Default: %ld. Set to 0 if file is not self-contained.\n", maxDirSize);
84 printf(" --skip-non-existing-files Flag to allow skipping of non-existing files in the input list.\n");
85 printf(" --skip-parent-files-list Flag to allow skipping the merging of the parent files list.\n");
86 printf(" --compression <root compression id> Compression algorithm / level to use (default: %d)\n", compression);
87 printf(" --merge-by-name Only merge TTrees from folders with the same name.\n");
88 printf(" --verbosity <flag> Verbosity of output (default: %d).\n", verbosity);
89 return -1;
90 } else {
91 return -2;
92 }
93 }
94
95 printf("AOD merger started with:\n");
96 printf(" Input file: %s\n", inputCollection.c_str());
97 printf(" Output file name: %s\n", outputFileName.c_str());
98 printf(" Maximal folder size (uncompressed): %ld\n", maxDirSize);
99 if (skipNonExistingFiles) {
100 printf(" WARNING: Skipping non-existing files.\n");
101 }
102 if (mergeByName) {
103 printf(" Merging only folders with the same name.\n");
104 }
105
106 std::map<std::string, TTree*> trees;
107 std::map<std::string, uint64_t> sizeCompressed;
108 std::map<std::string, uint64_t> sizeUncompressed;
109 std::map<std::string, int> offsets;
110 std::map<std::string, int> unassignedIndexOffset;
111
112 auto outputFile = TFile::Open(outputFileName.c_str(), "RECREATE", "", compression);
113 TDirectory* outputDir = nullptr;
114 long currentDirSize = 0;
115
116 std::ifstream in;
117 in.open(inputCollection);
118 TString line;
119 TMap* metaData = nullptr;
120 TMap* parentFiles = nullptr;
121 int totalMergedDFs = 0;
122 int mergedDFs = 0;
123
124 // Write all accumulated trees to outputDir, update stats, and clean up state.
125 auto flushTrees = [&](bool resetState) {
126 if (!outputDir) {
127 return;
128 }
129 for (auto const& tree : trees) {
130 outputDir->cd();
131 tree.second->Write();
132 sizeCompressed[tree.first] += tree.second->GetZipBytes();
133 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
134 delete tree.second;
135 }
136 if (resetState) {
137 outputDir = nullptr;
138 trees.clear();
139 offsets.clear();
140 mergedDFs = 0;
141 currentDirSize = 0;
142 }
143 };
144
145 while (in.good() && exitCode == 0) {
146 in >> line;
147
148 if (line.Length() == 0) {
149 continue;
150 }
151
152 if (line.BeginsWith("alien:") && !gGrid) {
153 printf("Connecting to AliEn...");
154 TGrid::Connect("alien:");
155 }
156
157 printf("Processing input file: %s\n", line.Data());
158
159 auto inputFile = TFile::Open(line);
160 if (!inputFile || inputFile->IsZombie()) {
161 printf("Error: %s input file %s.\n", !inputFile ? "Could not open" : "Zombie", line.Data());
162 if (skipNonExistingFiles) {
163 continue;
164 } else {
165 printf("Aborting merge!\n");
166 exitCode = 1;
167 break;
168 }
169 }
170
171 TList* keyList = inputFile->GetListOfKeys();
172 keyList->Sort();
173
174 for (auto key1 : *keyList) {
175 if (((TObjString*)key1)->GetString().EqualTo("metaData")) {
176 auto metaDataCurrentFile = (TMap*)inputFile->Get("metaData");
177 if (metaData == nullptr) {
178 metaData = metaDataCurrentFile;
179 outputFile->cd();
180 metaData->Write("metaData", TObject::kSingleKey);
181 } else {
182 for (auto metaDataPair : *metaData) {
183 auto metaDataKey = ((TPair*)metaDataPair)->Key();
184 if (metaDataCurrentFile->Contains(((TObjString*)metaDataKey)->GetString())) {
185 auto value = (TObjString*)metaData->GetValue(((TObjString*)metaDataKey)->GetString());
186 auto valueCurrentFile = (TObjString*)metaDataCurrentFile->GetValue(((TObjString*)metaDataKey)->GetString());
187 if (!value->GetString().EqualTo(valueCurrentFile->GetString())) {
188 printf("WARNING: Metadata differs between input files. Key %s : %s vs. %s\n", ((TObjString*)metaDataKey)->GetString().Data(),
189 value->GetString().Data(), valueCurrentFile->GetString().Data());
190 }
191 } else {
192 printf("WARNING: Metadata differs between input files. Key %s is not present in current file\n", ((TObjString*)metaDataKey)->GetString().Data());
193 }
194 }
195 }
196 }
197
198 if (((TObjString*)key1)->GetString().EqualTo("parentFiles") && !skipParentFilesList) {
199 auto parentFilesCurrentFile = (TMap*)inputFile->Get("parentFiles");
200 if (parentFiles == nullptr) {
201 parentFiles = new TMap;
202 }
203 for (auto pair : *parentFilesCurrentFile) {
204 parentFiles->Add(((TPair*)pair)->Key(), ((TPair*)pair)->Value());
205 }
206 delete parentFilesCurrentFile;
207 }
208
209 if (!((TObjString*)key1)->GetString().BeginsWith("DF_")) {
210 continue;
211 }
212
213 auto dfName = ((TObjString*)key1)->GetString().Data();
214
215 // If merge-by-name is active, flush accumulated trees when the folder name changes
216 if (mergeByName && outputDir && std::string(outputDir->GetName()) != std::string(dfName)) {
217 if (verbosity > 0) {
218 printf("Folder name changed: closing folder %s.\n", outputDir->GetName());
219 }
220 flushTrees(true);
221 }
222
223 if (verbosity > 0) {
224 printf(" Processing folder %s\n", dfName);
225 }
226 ++mergedDFs;
227 ++totalMergedDFs;
228 auto folder = (TDirectoryFile*)inputFile->Get(dfName);
229 auto treeList = folder->GetListOfKeys();
230
231 treeList->Sort();
232
233 // purging keys from duplicates
234 for (auto i = 0; i < treeList->GetEntries(); ++i) {
235 TKey* ki = (TKey*)treeList->At(i);
236 for (int j = i + 1; j < treeList->GetEntries(); ++j) {
237 TKey* kj = (TKey*)treeList->At(j);
238 if (std::strcmp(ki->GetName(), kj->GetName()) == 0 && std::strcmp(ki->GetTitle(), kj->GetTitle()) == 0) {
239 if (ki->GetCycle() < kj->GetCycle()) {
240 printf(" *** FATAL *** we had ordered the keys, first cycle should be higher, please check");
241 exitCode = 5;
242 } else {
243 // key is a duplicate, let's remove it
244 treeList->Remove(kj);
245 j--;
246 }
247 } else {
248 // we changed key, since they are sorted, we won't have the same anymore
249 break;
250 }
251 }
252 }
253
254 std::list<std::string> foundTrees;
255
256 for (auto key2 : *treeList) {
257 auto treeName = ((TObjString*)key2)->GetString().Data();
258 bool found = (std::find(foundTrees.begin(), foundTrees.end(), treeName) != foundTrees.end());
259 if (found == true) {
260 printf(" ***WARNING*** Tree %s was already merged (even if we purged duplicated trees before, so this should not happen), skipping\n", treeName);
261 continue;
262 }
263 foundTrees.push_back(treeName);
264
265 auto inputTree = (TTree*)inputFile->Get(Form("%s/%s", dfName, treeName));
266 bool fastCopy = (inputTree->GetTotBytes() > 10000000); // Only do this for large enough trees to avoid that baskets are too small
267 if (verbosity > 1) {
268 printf(" Processing tree %s with %lld entries with total size %lld (fast copy: %d)\n", treeName, inputTree->GetEntries(), inputTree->GetTotBytes(), fastCopy);
269 }
270
271 bool alreadyCopied = false;
272 if (trees.count(treeName) == 0) {
273 if (mergedDFs > 1) {
274 printf(" *** FATAL ***: The tree %s was not in the previous dataframe(s)\n", treeName);
275 exitCode = 3;
276 }
277
278 // Connect trees but do not copy entries (using the clone function) unless fast copy is on
279 // NOTE Basket size etc. are copied in CloneTree()
280 if (!outputDir) {
281 outputDir = outputFile->mkdir(dfName);
282 currentDirSize = 0;
283 if (verbosity > 0) {
284 printf("Writing to output folder %s\n", dfName);
285 }
286 }
287 outputDir->cd();
288 auto outputTree = inputTree->CloneTree(-1, (fastCopy) ? "fast" : "");
289 currentDirSize += inputTree->GetTotBytes(); // NOTE outputTree->GetTotBytes() is 0, so we use the inputTree here
290 alreadyCopied = true;
291 outputTree->SetAutoFlush(0);
292 trees[treeName] = outputTree;
293 } else {
294 // adjust addresses tree
295 trees[treeName]->CopyAddresses(inputTree);
296 }
297
298 auto outputTree = trees[treeName];
299 // register index and connect VLA columns
300 std::vector<std::pair<int*, int>> indexList;
301 std::vector<char*> vlaPointers;
302 std::vector<int*> indexPointers;
303 TObjArray* branches = inputTree->GetListOfBranches();
304 for (int i = 0; i < branches->GetEntriesFast(); ++i) {
305 TBranch* br = (TBranch*)branches->UncheckedAt(i);
306 TString branchName(br->GetName());
307
308 // detect VLA
309 if (((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount() != nullptr) {
310 int maximum = ((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount()->GetMaximum();
311
312 // get type
313 static TClass* cls;
314 EDataType type;
315 br->GetExpectedType(cls, type);
316 auto typeSize = TDataType::GetDataType(type)->Size();
317
318 char* buffer = new char[maximum * typeSize];
319 memset(buffer, 0, maximum * typeSize);
320 vlaPointers.push_back(buffer);
321 if (verbosity > 2) {
322 printf(" Allocated VLA buffer of length %d with %d bytes each for branch name %s\n", maximum, typeSize, br->GetName());
323 }
324 inputTree->SetBranchAddress(br->GetName(), buffer);
325 outputTree->SetBranchAddress(br->GetName(), buffer);
326
327 if (branchName.BeginsWith("fIndexArray")) {
328 for (int i = 0; i < maximum; i++) {
329 indexList.push_back({reinterpret_cast<int*>(buffer + i * typeSize), offsets[getTableName(branchName, treeName)]});
330 }
331 }
332 } else if (branchName.BeginsWith("fIndexSlice")) {
333 int* buffer = new int[2];
334 memset(buffer, 0, 2 * sizeof(buffer[0]));
335 vlaPointers.push_back(reinterpret_cast<char*>(buffer));
336
337 inputTree->SetBranchAddress(br->GetName(), buffer);
338 outputTree->SetBranchAddress(br->GetName(), buffer);
339
340 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
341 indexList.push_back({buffer + 1, offsets[getTableName(branchName, treeName)]});
342 } else if (branchName.BeginsWith("fIndex") && !branchName.EndsWith("_size")) {
343 int* buffer = new int;
344 *buffer = 0;
345 indexPointers.push_back(buffer);
346
347 inputTree->SetBranchAddress(br->GetName(), buffer);
348 outputTree->SetBranchAddress(br->GetName(), buffer);
349
350 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
351 }
352 }
353
354 if (indexList.size() > 0) {
355 auto entries = inputTree->GetEntries();
356 int minIndexOffset = unassignedIndexOffset[treeName];
357 auto newMinIndexOffset = minIndexOffset;
358 for (int i = 0; i < entries; i++) {
359 for (auto& index : indexList) {
360 *(index.first) = 0; // Any positive number will do, in any case it will not be filled in the output. Otherwise the previous entry is used and manipulated in the following.
361 }
362 inputTree->GetEntry(i);
363 // shift index columns by offset
364 for (const auto& idx : indexList) {
365 // if negative, the index is unassigned. In this case, the different unassigned blocks have to get unique negative IDs
366 if (*(idx.first) < 0) {
367 *(idx.first) += minIndexOffset;
368 newMinIndexOffset = std::min(newMinIndexOffset, *(idx.first));
369 } else {
370 *(idx.first) += idx.second;
371 }
372 }
373 if (!alreadyCopied) {
374 int nbytes = outputTree->Fill();
375 if (nbytes > 0) {
376 currentDirSize += nbytes;
377 }
378 }
379 }
380 unassignedIndexOffset[treeName] = newMinIndexOffset;
381 } else if (!alreadyCopied) {
382 auto nbytes = outputTree->CopyEntries(inputTree, -1, (fastCopy) ? "fast" : "");
383 if (nbytes > 0) {
384 currentDirSize += nbytes;
385 }
386 }
387
388 delete inputTree;
389
390 for (auto& buffer : indexPointers) {
391 delete buffer;
392 }
393 for (auto& buffer : vlaPointers) {
394 delete[] buffer;
395 }
396 }
397 if (exitCode > 0) {
398 break;
399 }
400
401 // check if all trees were present
402 if (mergedDFs > 1) {
403 for (auto const& tree : trees) {
404 bool found = (std::find(foundTrees.begin(), foundTrees.end(), tree.first) != foundTrees.end());
405 if (found == false) {
406 printf(" *** FATAL ***: The tree %s was not in the current dataframe\n", tree.first.c_str());
407 exitCode = 4;
408 }
409 }
410 }
411
412 // set to -1 to identify not found tables
413 for (auto& offset : offsets) {
414 offset.second = -1;
415 }
416
417 // update offsets
418 for (auto const& tree : trees) {
419 offsets[removeVersionSuffix(tree.first.c_str())] = tree.second->GetEntries();
420 }
421
422 // check for not found tables
423 for (auto& offset : offsets) {
424 if (offset.second < 0) {
425 if (maxDirSize > 0) {
426 // if maxDirSize is 0 then we do not merge DFs and this error is not an error actually (e.g. for not self-contained derived data)
427 printf("ERROR: Index on %s but no tree found\n", offset.first.c_str());
428 }
429 offset.second = 0;
430 }
431 }
432
433 if (maxDirSize == 0 || currentDirSize > maxDirSize) {
434 if (verbosity > 0) {
435 printf("Maximum size reached: %ld. Closing folder %s.\n", currentDirSize, dfName);
436 }
437 flushTrees(true);
438 }
439 }
440 inputFile->Close();
441 }
442
443 if (parentFiles) {
444 outputFile->cd();
445 parentFiles->Write("parentFiles", TObject::kSingleKey);
446 }
447
448 flushTrees(false);
449
450 outputFile->Write();
451 outputFile->Close();
452
453 if (totalMergedDFs == 0) {
454 printf("ERROR: Did not merge a single DF. This does not seem right.\n");
455 exitCode = 2;
456 }
457
458 // in case of failure, remove the incomplete file
459 if (exitCode != 0) {
460 printf("Removing incomplete output file %s.\n", outputFile->GetName());
461 gSystem->Unlink(outputFile->GetName());
462 } else {
463 printf("AOD merger finished. Size overview follows:\n");
464
465 uint64_t totalCompressed = 0;
466 uint64_t totalUncompressed = 0;
467 for (auto const& tree : sizeCompressed) {
468 totalCompressed += tree.second;
469 totalUncompressed += sizeUncompressed[tree.first];
470 }
471 if (totalCompressed > 0 && totalUncompressed > 0) {
472 for (auto const& tree : sizeCompressed) {
473 printf(" Tree %20s | Compressed: %12" PRIu64 " (%2.0f%%) | Uncompressed: %12" PRIu64 " (%2.0f%%)\n", tree.first.c_str(), tree.second, 100.0 * tree.second / totalCompressed, sizeUncompressed[tree.first], 100.0 * sizeUncompressed[tree.first] / totalUncompressed);
474 }
475 }
476 }
477 printf("\n");
478
479 return exitCode;
480}
#define verbosity
int32_t i
uint32_t j
Definition RawData.h:0
uint32_t c
Definition RawData.h:2
const char * getTableName(const char *branchName, const char *treeName)
Definition aodMerger.h:26
const char * removeVersionSuffix(const char *treeName)
Definition aodMerger.h:14
GLuint buffer
Definition glcorearb.h:655
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLintptr offset
Definition glcorearb.h:660
#define main
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))