Project
Loading...
Searching...
No Matches
aodMerger.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <getopt.h>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "TSystem.h"
#include "TFile.h"
#include "TTree.h"
#include "TBranch.h"
#include "TList.h"
#include "TKey.h"
#include "TDirectory.h"
#include "TDirectoryFile.h"
#include "TDataType.h"
#include "TObjString.h"
#include <TGrid.h>
#include <TMap.h>
#include <TLeaf.h>

#include "aodMerger.h"
31
32// AOD merger with correct index rewriting
33// No need to know the datamodel because the branch names follow a canonical standard (identified by fIndex)
34int main(int argc, char* argv[])
35{
36 std::string inputCollection("input.txt");
37 std::string outputFileName("AO2D.root");
38 long maxDirSize = 100000000;
39 bool skipNonExistingFiles = false;
40 bool skipParentFilesList = false;
41 int verbosity = 2;
42 int exitCode = 0; // 0: success, >0: failure
43 int compression = 505;
44
45 int option_index = 0;
46 static struct option long_options[] = {
47 {"input", required_argument, nullptr, 0},
48 {"output", required_argument, nullptr, 1},
49 {"max-size", required_argument, nullptr, 2},
50 {"skip-non-existing-files", no_argument, nullptr, 3},
51 {"skip-parent-files-list", no_argument, nullptr, 4},
52 {"compression", required_argument, nullptr, 5},
53 {"verbosity", required_argument, nullptr, 'v'},
54 {"help", no_argument, nullptr, 'h'},
55 {nullptr, 0, nullptr, 0}};
56
57 while (true) {
58 int c = getopt_long(argc, argv, "", long_options, &option_index);
59 if (c == -1) {
60 break;
61 } else if (c == 0) {
62 inputCollection = optarg;
63 } else if (c == 1) {
64 outputFileName = optarg;
65 } else if (c == 2) {
66 maxDirSize = atol(optarg);
67 } else if (c == 3) {
68 skipNonExistingFiles = true;
69 } else if (c == 4) {
70 skipParentFilesList = true;
71 } else if (c == 5) {
72 compression = atoi(optarg);
73 } else if (c == 'v') {
74 verbosity = atoi(optarg);
75 } else if (c == 'h') {
76 printf("AO2D merging tool. Options: \n");
77 printf(" --input <inputfile.txt> Contains path to files to be merged. Default: %s\n", inputCollection.c_str());
78 printf(" --output <outputfile.root> Target output ROOT file. Default: %s\n", outputFileName.c_str());
79 printf(" --max-size <size in Bytes> Target directory size. Default: %ld. Set to 0 if file is not self-contained.\n", maxDirSize);
80 printf(" --skip-non-existing-files Flag to allow skipping of non-existing files in the input list.\n");
81 printf(" --skip-parent-files-list Flag to allow skipping the merging of the parent files list.\n");
82 printf(" --compression <root compression id> Compression algorithm / level to use (default: %d)\n", compression);
83 printf(" --verbosity <flag> Verbosity of output (default: %d).\n", verbosity);
84 return -1;
85 } else {
86 return -2;
87 }
88 }
89
90 printf("AOD merger started with:\n");
91 printf(" Input file: %s\n", inputCollection.c_str());
92 printf(" Output file name: %s\n", outputFileName.c_str());
93 printf(" Maximal folder size (uncompressed): %ld\n", maxDirSize);
94 if (skipNonExistingFiles) {
95 printf(" WARNING: Skipping non-existing files.\n");
96 }
97
98 std::map<std::string, TTree*> trees;
99 std::map<std::string, uint64_t> sizeCompressed;
100 std::map<std::string, uint64_t> sizeUncompressed;
101 std::map<std::string, int> offsets;
102 std::map<std::string, int> unassignedIndexOffset;
103
104 auto outputFile = TFile::Open(outputFileName.c_str(), "RECREATE", "", compression);
105 TDirectory* outputDir = nullptr;
106 long currentDirSize = 0;
107
108 std::ifstream in;
109 in.open(inputCollection);
110 TString line;
111 TMap* metaData = nullptr;
112 TMap* parentFiles = nullptr;
113 int totalMergedDFs = 0;
114 int mergedDFs = 0;
115 while (in.good() && exitCode == 0) {
116 in >> line;
117
118 if (line.Length() == 0) {
119 continue;
120 }
121
122 if (line.BeginsWith("alien:") && !gGrid) {
123 printf("Connecting to AliEn...");
124 TGrid::Connect("alien:");
125 }
126
127 printf("Processing input file: %s\n", line.Data());
128
129 auto inputFile = TFile::Open(line);
130 if (!inputFile || inputFile->IsZombie()) {
131 printf("Error: %s input file %s.\n", !inputFile ? "Could not open" : "Zombie", line.Data());
132 if (skipNonExistingFiles) {
133 continue;
134 } else {
135 printf("Aborting merge!\n");
136 exitCode = 1;
137 break;
138 }
139 }
140
141 TList* keyList = inputFile->GetListOfKeys();
142 keyList->Sort();
143
144 for (auto key1 : *keyList) {
145 if (((TObjString*)key1)->GetString().EqualTo("metaData")) {
146 auto metaDataCurrentFile = (TMap*)inputFile->Get("metaData");
147 if (metaData == nullptr) {
148 metaData = metaDataCurrentFile;
149 outputFile->cd();
150 metaData->Write("metaData", TObject::kSingleKey);
151 } else {
152 for (auto metaDataPair : *metaData) {
153 auto metaDataKey = ((TPair*)metaDataPair)->Key();
154 if (metaDataCurrentFile->Contains(((TObjString*)metaDataKey)->GetString())) {
155 auto value = (TObjString*)metaData->GetValue(((TObjString*)metaDataKey)->GetString());
156 auto valueCurrentFile = (TObjString*)metaDataCurrentFile->GetValue(((TObjString*)metaDataKey)->GetString());
157 if (!value->GetString().EqualTo(valueCurrentFile->GetString())) {
158 printf("WARNING: Metadata differs between input files. Key %s : %s vs. %s\n", ((TObjString*)metaDataKey)->GetString().Data(),
159 value->GetString().Data(), valueCurrentFile->GetString().Data());
160 }
161 } else {
162 printf("WARNING: Metadata differs between input files. Key %s is not present in current file\n", ((TObjString*)metaDataKey)->GetString().Data());
163 }
164 }
165 }
166 }
167
168 if (((TObjString*)key1)->GetString().EqualTo("parentFiles") && !skipParentFilesList) {
169 auto parentFilesCurrentFile = (TMap*)inputFile->Get("parentFiles");
170 if (parentFiles == nullptr) {
171 parentFiles = new TMap;
172 }
173 for (auto pair : *parentFilesCurrentFile) {
174 parentFiles->Add(((TPair*)pair)->Key(), ((TPair*)pair)->Value());
175 }
176 delete parentFilesCurrentFile;
177 }
178
179 if (!((TObjString*)key1)->GetString().BeginsWith("DF_")) {
180 continue;
181 }
182
183 auto dfName = ((TObjString*)key1)->GetString().Data();
184
185 if (verbosity > 0) {
186 printf(" Processing folder %s\n", dfName);
187 }
188 ++mergedDFs;
189 ++totalMergedDFs;
190 auto folder = (TDirectoryFile*)inputFile->Get(dfName);
191 auto treeList = folder->GetListOfKeys();
192
193 treeList->Sort();
194
195 // purging keys from duplicates
196 for (auto i = 0; i < treeList->GetEntries(); ++i) {
197 TKey* ki = (TKey*)treeList->At(i);
198 for (int j = i + 1; j < treeList->GetEntries(); ++j) {
199 TKey* kj = (TKey*)treeList->At(j);
200 if (std::strcmp(ki->GetName(), kj->GetName()) == 0 && std::strcmp(ki->GetTitle(), kj->GetTitle()) == 0) {
201 if (ki->GetCycle() < kj->GetCycle()) {
202 printf(" *** FATAL *** we had ordered the keys, first cycle should be higher, please check");
203 exitCode = 5;
204 } else {
205 // key is a duplicate, let's remove it
206 treeList->Remove(kj);
207 j--;
208 }
209 } else {
210 // we changed key, since they are sorted, we won't have the same anymore
211 break;
212 }
213 }
214 }
215
216 std::list<std::string> foundTrees;
217
218 for (auto key2 : *treeList) {
219 auto treeName = ((TObjString*)key2)->GetString().Data();
220 bool found = (std::find(foundTrees.begin(), foundTrees.end(), treeName) != foundTrees.end());
221 if (found == true) {
222 printf(" ***WARNING*** Tree %s was already merged (even if we purged duplicated trees before, so this should not happen), skipping\n", treeName);
223 continue;
224 }
225 foundTrees.push_back(treeName);
226
227 auto inputTree = (TTree*)inputFile->Get(Form("%s/%s", dfName, treeName));
228 bool fastCopy = (inputTree->GetTotBytes() > 10000000); // Only do this for large enough trees to avoid that baskets are too small
229 if (verbosity > 1) {
230 printf(" Processing tree %s with %lld entries with total size %lld (fast copy: %d)\n", treeName, inputTree->GetEntries(), inputTree->GetTotBytes(), fastCopy);
231 }
232
233 bool alreadyCopied = false;
234 if (trees.count(treeName) == 0) {
235 if (mergedDFs > 1) {
236 printf(" *** FATAL ***: The tree %s was not in the previous dataframe(s)\n", treeName);
237 exitCode = 3;
238 }
239
240 // Connect trees but do not copy entries (using the clone function) unless fast copy is on
241 // NOTE Basket size etc. are copied in CloneTree()
242 if (!outputDir) {
243 outputDir = outputFile->mkdir(dfName);
244 currentDirSize = 0;
245 if (verbosity > 0) {
246 printf("Writing to output folder %s\n", dfName);
247 }
248 }
249 outputDir->cd();
250 auto outputTree = inputTree->CloneTree(-1, (fastCopy) ? "fast" : "");
251 currentDirSize += inputTree->GetTotBytes(); // NOTE outputTree->GetTotBytes() is 0, so we use the inputTree here
252 alreadyCopied = true;
253 outputTree->SetAutoFlush(0);
254 trees[treeName] = outputTree;
255 } else {
256 // adjust addresses tree
257 trees[treeName]->CopyAddresses(inputTree);
258 }
259
260 auto outputTree = trees[treeName];
261 // register index and connect VLA columns
262 std::vector<std::pair<int*, int>> indexList;
263 std::vector<char*> vlaPointers;
264 std::vector<int*> indexPointers;
265 TObjArray* branches = inputTree->GetListOfBranches();
266 for (int i = 0; i < branches->GetEntriesFast(); ++i) {
267 TBranch* br = (TBranch*)branches->UncheckedAt(i);
268 TString branchName(br->GetName());
269
270 // detect VLA
271 if (((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount() != nullptr) {
272 int maximum = ((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount()->GetMaximum();
273
274 // get type
275 static TClass* cls;
276 EDataType type;
277 br->GetExpectedType(cls, type);
278 auto typeSize = TDataType::GetDataType(type)->Size();
279
280 char* buffer = new char[maximum * typeSize];
281 memset(buffer, 0, maximum * typeSize);
282 vlaPointers.push_back(buffer);
283 if (verbosity > 2) {
284 printf(" Allocated VLA buffer of length %d with %d bytes each for branch name %s\n", maximum, typeSize, br->GetName());
285 }
286 inputTree->SetBranchAddress(br->GetName(), buffer);
287 outputTree->SetBranchAddress(br->GetName(), buffer);
288
289 if (branchName.BeginsWith("fIndexArray")) {
290 for (int i = 0; i < maximum; i++) {
291 indexList.push_back({reinterpret_cast<int*>(buffer + i * typeSize), offsets[getTableName(branchName, treeName)]});
292 }
293 }
294 } else if (branchName.BeginsWith("fIndexSlice")) {
295 int* buffer = new int[2];
296 memset(buffer, 0, 2 * sizeof(buffer[0]));
297 vlaPointers.push_back(reinterpret_cast<char*>(buffer));
298
299 inputTree->SetBranchAddress(br->GetName(), buffer);
300 outputTree->SetBranchAddress(br->GetName(), buffer);
301
302 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
303 indexList.push_back({buffer + 1, offsets[getTableName(branchName, treeName)]});
304 } else if (branchName.BeginsWith("fIndex") && !branchName.EndsWith("_size")) {
305 int* buffer = new int;
306 *buffer = 0;
307 indexPointers.push_back(buffer);
308
309 inputTree->SetBranchAddress(br->GetName(), buffer);
310 outputTree->SetBranchAddress(br->GetName(), buffer);
311
312 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
313 }
314 }
315
316 if (indexList.size() > 0) {
317 auto entries = inputTree->GetEntries();
318 int minIndexOffset = unassignedIndexOffset[treeName];
319 auto newMinIndexOffset = minIndexOffset;
320 for (int i = 0; i < entries; i++) {
321 for (auto& index : indexList) {
322 *(index.first) = 0; // Any positive number will do, in any case it will not be filled in the output. Otherwise the previous entry is used and manipulated in the following.
323 }
324 inputTree->GetEntry(i);
325 // shift index columns by offset
326 for (const auto& idx : indexList) {
327 // if negative, the index is unassigned. In this case, the different unassigned blocks have to get unique negative IDs
328 if (*(idx.first) < 0) {
329 *(idx.first) += minIndexOffset;
330 newMinIndexOffset = std::min(newMinIndexOffset, *(idx.first));
331 } else {
332 *(idx.first) += idx.second;
333 }
334 }
335 if (!alreadyCopied) {
336 int nbytes = outputTree->Fill();
337 if (nbytes > 0) {
338 currentDirSize += nbytes;
339 }
340 }
341 }
342 unassignedIndexOffset[treeName] = newMinIndexOffset;
343 } else if (!alreadyCopied) {
344 auto nbytes = outputTree->CopyEntries(inputTree, -1, (fastCopy) ? "fast" : "");
345 if (nbytes > 0) {
346 currentDirSize += nbytes;
347 }
348 }
349
350 delete inputTree;
351
352 for (auto& buffer : indexPointers) {
353 delete buffer;
354 }
355 for (auto& buffer : vlaPointers) {
356 delete[] buffer;
357 }
358 }
359 if (exitCode > 0) {
360 break;
361 }
362
363 // check if all trees were present
364 if (mergedDFs > 1) {
365 for (auto const& tree : trees) {
366 bool found = (std::find(foundTrees.begin(), foundTrees.end(), tree.first) != foundTrees.end());
367 if (found == false) {
368 printf(" *** FATAL ***: The tree %s was not in the current dataframe\n", tree.first.c_str());
369 exitCode = 4;
370 }
371 }
372 }
373
374 // set to -1 to identify not found tables
375 for (auto& offset : offsets) {
376 offset.second = -1;
377 }
378
379 // update offsets
380 for (auto const& tree : trees) {
381 offsets[removeVersionSuffix(tree.first.c_str())] = tree.second->GetEntries();
382 }
383
384 // check for not found tables
385 for (auto& offset : offsets) {
386 if (offset.second < 0) {
387 if (maxDirSize > 0) {
388 // if maxDirSize is 0 then we do not merge DFs and this error is not an error actually (e.g. for not self-contained derived data)
389 printf("ERROR: Index on %s but no tree found\n", offset.first.c_str());
390 }
391 offset.second = 0;
392 }
393 }
394
395 if (maxDirSize == 0 || currentDirSize > maxDirSize) {
396 if (verbosity > 0) {
397 printf("Maximum size reached: %ld. Closing folder %s.\n", currentDirSize, dfName);
398 }
399 for (auto const& tree : trees) {
400 // printf("Writing %s\n", tree.first.c_str());
401 outputDir->cd();
402 tree.second->Write();
403
404 // stats
405 sizeCompressed[tree.first] += tree.second->GetZipBytes();
406 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
407
408 delete tree.second;
409 }
410 outputDir = nullptr;
411 trees.clear();
412 offsets.clear();
413 mergedDFs = 0;
414 }
415 }
416 inputFile->Close();
417 }
418
419 if (parentFiles) {
420 outputFile->cd();
421 parentFiles->Write("parentFiles", TObject::kSingleKey);
422 }
423
424 for (auto const& tree : trees) {
425 outputDir->cd();
426 tree.second->Write();
427
428 // stats
429 sizeCompressed[tree.first] += tree.second->GetZipBytes();
430 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
431
432 delete tree.second;
433 }
434
435 outputFile->Write();
436 outputFile->Close();
437
438 if (totalMergedDFs == 0) {
439 printf("ERROR: Did not merge a single DF. This does not seem right.\n");
440 exitCode = 2;
441 }
442
443 // in case of failure, remove the incomplete file
444 if (exitCode != 0) {
445 printf("Removing incomplete output file %s.\n", outputFile->GetName());
446 gSystem->Unlink(outputFile->GetName());
447 } else {
448 printf("AOD merger finished. Size overview follows:\n");
449
450 uint64_t totalCompressed = 0;
451 uint64_t totalUncompressed = 0;
452 for (auto const& tree : sizeCompressed) {
453 totalCompressed += tree.second;
454 totalUncompressed += sizeUncompressed[tree.first];
455 }
456 if (totalCompressed > 0 && totalUncompressed > 0) {
457 for (auto const& tree : sizeCompressed) {
458 printf(" Tree %20s | Compressed: %12" PRIu64 " (%2.0f%%) | Uncompressed: %12" PRIu64 " (%2.0f%%)\n", tree.first.c_str(), tree.second, 100.0 * tree.second / totalCompressed, sizeUncompressed[tree.first], 100.0 * sizeUncompressed[tree.first] / totalUncompressed);
459 }
460 }
461 }
462 printf("\n");
463
464 return exitCode;
465}
#define verbosity
int32_t i
uint32_t j
Definition RawData.h:0
uint32_t c
Definition RawData.h:2
const char * getTableName(const char *branchName, const char *treeName)
Definition aodMerger.h:26
const char * removeVersionSuffix(const char *treeName)
Definition aodMerger.h:14
GLuint buffer
Definition glcorearb.h:655
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLintptr offset
Definition glcorearb.h:660
#define main
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))