Project
Loading...
Searching...
No Matches
aodMerger.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <map>
13#include <list>
14#include <fstream>
15#include <getopt.h>
16
17#include "TSystem.h"
18#include "TFile.h"
19#include "TTree.h"
20#include "TList.h"
21#include "TKey.h"
22#include "TDirectory.h"
23#include "TObjString.h"
24#include <TGrid.h>
25#include <TMap.h>
26#include <TLeaf.h>
27
28#include "aodMerger.h"
29#include <cinttypes>
30
31// AOD merger with correct index rewriting
32// No need to know the datamodel because the branch names follow a canonical standard (identified by fIndex)
33int main(int argc, char* argv[])
34{
35 std::string inputCollection("input.txt");
36 std::string outputFileName("AO2D.root");
37 long maxDirSize = 100000000;
38 bool skipNonExistingFiles = false;
39 bool skipParentFilesList = false;
40 int verbosity = 2;
41 int exitCode = 0; // 0: success, >0: failure
42 int compression = 505;
43
44 int option_index = 0;
45 static struct option long_options[] = {
46 {"input", required_argument, nullptr, 0},
47 {"output", required_argument, nullptr, 1},
48 {"max-size", required_argument, nullptr, 2},
49 {"skip-non-existing-files", no_argument, nullptr, 3},
50 {"skip-parent-files-list", no_argument, nullptr, 4},
51 {"compression", required_argument, nullptr, 5},
52 {"verbosity", required_argument, nullptr, 'v'},
53 {"help", no_argument, nullptr, 'h'},
54 {nullptr, 0, nullptr, 0}};
55
56 while (true) {
57 int c = getopt_long(argc, argv, "", long_options, &option_index);
58 if (c == -1) {
59 break;
60 } else if (c == 0) {
61 inputCollection = optarg;
62 } else if (c == 1) {
63 outputFileName = optarg;
64 } else if (c == 2) {
65 maxDirSize = atol(optarg);
66 } else if (c == 3) {
67 skipNonExistingFiles = true;
68 } else if (c == 4) {
69 skipParentFilesList = true;
70 } else if (c == 5) {
71 compression = atoi(optarg);
72 } else if (c == 'v') {
73 verbosity = atoi(optarg);
74 } else if (c == 'h') {
75 printf("AO2D merging tool. Options: \n");
76 printf(" --input <inputfile.txt> Contains path to files to be merged. Default: %s\n", inputCollection.c_str());
77 printf(" --output <outputfile.root> Target output ROOT file. Default: %s\n", outputFileName.c_str());
78 printf(" --max-size <size in Bytes> Target directory size. Default: %ld. Set to 0 if file is not self-contained.\n", maxDirSize);
79 printf(" --skip-non-existing-files Flag to allow skipping of non-existing files in the input list.\n");
80 printf(" --skip-parent-files-list Flag to allow skipping the merging of the parent files list.\n");
81 printf(" --compression <root compression id> Compression algorithm / level to use (default: %d)\n", compression);
82 printf(" --verbosity <flag> Verbosity of output (default: %d).\n", verbosity);
83 return -1;
84 } else {
85 return -2;
86 }
87 }
88
89 printf("AOD merger started with:\n");
90 printf(" Input file: %s\n", inputCollection.c_str());
91 printf(" Output file name: %s\n", outputFileName.c_str());
92 printf(" Maximal folder size (uncompressed): %ld\n", maxDirSize);
93 if (skipNonExistingFiles) {
94 printf(" WARNING: Skipping non-existing files.\n");
95 }
96
97 std::map<std::string, TTree*> trees;
98 std::map<std::string, uint64_t> sizeCompressed;
99 std::map<std::string, uint64_t> sizeUncompressed;
100 std::map<std::string, int> offsets;
101 std::map<std::string, int> unassignedIndexOffset;
102
103 auto outputFile = TFile::Open(outputFileName.c_str(), "RECREATE", "", compression);
104 TDirectory* outputDir = nullptr;
105 long currentDirSize = 0;
106
107 std::ifstream in;
108 in.open(inputCollection);
109 TString line;
110 TMap* metaData = nullptr;
111 TMap* parentFiles = nullptr;
112 int totalMergedDFs = 0;
113 int mergedDFs = 0;
114 while (in.good() && exitCode == 0) {
115 in >> line;
116
117 if (line.Length() == 0) {
118 continue;
119 }
120
121 if (line.BeginsWith("alien:") && !gGrid) {
122 printf("Connecting to AliEn...");
123 TGrid::Connect("alien:");
124 }
125
126 printf("Processing input file: %s\n", line.Data());
127
128 auto inputFile = TFile::Open(line);
129 if (!inputFile || inputFile->IsZombie()) {
130 printf("Error: %s input file %s.\n", !inputFile ? "Could not open" : "Zombie", line.Data());
131 if (skipNonExistingFiles) {
132 continue;
133 } else {
134 printf("Aborting merge!\n");
135 exitCode = 1;
136 break;
137 }
138 }
139
140 TList* keyList = inputFile->GetListOfKeys();
141 keyList->Sort();
142
143 for (auto key1 : *keyList) {
144 if (((TObjString*)key1)->GetString().EqualTo("metaData")) {
145 auto metaDataCurrentFile = (TMap*)inputFile->Get("metaData");
146 if (metaData == nullptr) {
147 metaData = metaDataCurrentFile;
148 outputFile->cd();
149 metaData->Write("metaData", TObject::kSingleKey);
150 } else {
151 for (auto metaDataPair : *metaData) {
152 auto metaDataKey = ((TPair*)metaDataPair)->Key();
153 if (metaDataCurrentFile->Contains(((TObjString*)metaDataKey)->GetString())) {
154 auto value = (TObjString*)metaData->GetValue(((TObjString*)metaDataKey)->GetString());
155 auto valueCurrentFile = (TObjString*)metaDataCurrentFile->GetValue(((TObjString*)metaDataKey)->GetString());
156 if (!value->GetString().EqualTo(valueCurrentFile->GetString())) {
157 printf("WARNING: Metadata differs between input files. Key %s : %s vs. %s\n", ((TObjString*)metaDataKey)->GetString().Data(),
158 value->GetString().Data(), valueCurrentFile->GetString().Data());
159 }
160 } else {
161 printf("WARNING: Metadata differs between input files. Key %s is not present in current file\n", ((TObjString*)metaDataKey)->GetString().Data());
162 }
163 }
164 }
165 }
166
167 if (((TObjString*)key1)->GetString().EqualTo("parentFiles") && !skipParentFilesList) {
168 auto parentFilesCurrentFile = (TMap*)inputFile->Get("parentFiles");
169 if (parentFiles == nullptr) {
170 parentFiles = new TMap;
171 }
172 for (auto pair : *parentFilesCurrentFile) {
173 parentFiles->Add(((TPair*)pair)->Key(), ((TPair*)pair)->Value());
174 }
175 delete parentFilesCurrentFile;
176 }
177
178 if (!((TObjString*)key1)->GetString().BeginsWith("DF_")) {
179 continue;
180 }
181
182 auto dfName = ((TObjString*)key1)->GetString().Data();
183
184 if (verbosity > 0) {
185 printf(" Processing folder %s\n", dfName);
186 }
187 ++mergedDFs;
188 ++totalMergedDFs;
189 auto folder = (TDirectoryFile*)inputFile->Get(dfName);
190 auto treeList = folder->GetListOfKeys();
191
192 treeList->Sort();
193
194 // purging keys from duplicates
195 for (auto i = 0; i < treeList->GetEntries(); ++i) {
196 TKey* ki = (TKey*)treeList->At(i);
197 for (int j = i + 1; j < treeList->GetEntries(); ++j) {
198 TKey* kj = (TKey*)treeList->At(j);
199 if (std::strcmp(ki->GetName(), kj->GetName()) == 0 && std::strcmp(ki->GetTitle(), kj->GetTitle()) == 0) {
200 if (ki->GetCycle() < kj->GetCycle()) {
201 printf(" *** FATAL *** we had ordered the keys, first cycle should be higher, please check");
202 exitCode = 5;
203 } else {
204 // key is a duplicate, let's remove it
205 treeList->Remove(kj);
206 j--;
207 }
208 } else {
209 // we changed key, since they are sorted, we won't have the same anymore
210 break;
211 }
212 }
213 }
214
215 std::list<std::string> foundTrees;
216
217 for (auto key2 : *treeList) {
218 auto treeName = ((TObjString*)key2)->GetString().Data();
219 bool found = (std::find(foundTrees.begin(), foundTrees.end(), treeName) != foundTrees.end());
220 if (found == true) {
221 printf(" ***WARNING*** Tree %s was already merged (even if we purged duplicated trees before, so this should not happen), skipping\n", treeName);
222 continue;
223 }
224 foundTrees.push_back(treeName);
225
226 auto inputTree = (TTree*)inputFile->Get(Form("%s/%s", dfName, treeName));
227 bool fastCopy = (inputTree->GetTotBytes() > 10000000); // Only do this for large enough trees to avoid that baskets are too small
228 if (verbosity > 1) {
229 printf(" Processing tree %s with %lld entries with total size %lld (fast copy: %d)\n", treeName, inputTree->GetEntries(), inputTree->GetTotBytes(), fastCopy);
230 }
231
232 bool alreadyCopied = false;
233 if (trees.count(treeName) == 0) {
234 if (mergedDFs > 1) {
235 printf(" *** FATAL ***: The tree %s was not in the previous dataframe(s)\n", treeName);
236 exitCode = 3;
237 }
238
239 // Connect trees but do not copy entries (using the clone function) unless fast copy is on
240 // NOTE Basket size etc. are copied in CloneTree()
241 if (!outputDir) {
242 outputDir = outputFile->mkdir(dfName);
243 currentDirSize = 0;
244 if (verbosity > 0) {
245 printf("Writing to output folder %s\n", dfName);
246 }
247 }
248 outputDir->cd();
249 auto outputTree = inputTree->CloneTree(-1, (fastCopy) ? "fast" : "");
250 currentDirSize += inputTree->GetTotBytes(); // NOTE outputTree->GetTotBytes() is 0, so we use the inputTree here
251 alreadyCopied = true;
252 outputTree->SetAutoFlush(0);
253 trees[treeName] = outputTree;
254 } else {
255 // adjust addresses tree
256 trees[treeName]->CopyAddresses(inputTree);
257 }
258
259 auto outputTree = trees[treeName];
260 // register index and connect VLA columns
261 std::vector<std::pair<int*, int>> indexList;
262 std::vector<char*> vlaPointers;
263 std::vector<int*> indexPointers;
264 TObjArray* branches = inputTree->GetListOfBranches();
265 for (int i = 0; i < branches->GetEntriesFast(); ++i) {
266 TBranch* br = (TBranch*)branches->UncheckedAt(i);
267 TString branchName(br->GetName());
268
269 // detect VLA
270 if (((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount() != nullptr) {
271 int maximum = ((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount()->GetMaximum();
272
273 // get type
274 static TClass* cls;
275 EDataType type;
276 br->GetExpectedType(cls, type);
277 auto typeSize = TDataType::GetDataType(type)->Size();
278
279 char* buffer = new char[maximum * typeSize];
280 memset(buffer, 0, maximum * typeSize);
281 vlaPointers.push_back(buffer);
282 if (verbosity > 2) {
283 printf(" Allocated VLA buffer of length %d with %d bytes each for branch name %s\n", maximum, typeSize, br->GetName());
284 }
285 inputTree->SetBranchAddress(br->GetName(), buffer);
286 outputTree->SetBranchAddress(br->GetName(), buffer);
287
288 if (branchName.BeginsWith("fIndexArray")) {
289 for (int i = 0; i < maximum; i++) {
290 indexList.push_back({reinterpret_cast<int*>(buffer + i * typeSize), offsets[getTableName(branchName, treeName)]});
291 }
292 }
293 } else if (branchName.BeginsWith("fIndexSlice")) {
294 int* buffer = new int[2];
295 memset(buffer, 0, 2 * sizeof(buffer[0]));
296 vlaPointers.push_back(reinterpret_cast<char*>(buffer));
297
298 inputTree->SetBranchAddress(br->GetName(), buffer);
299 outputTree->SetBranchAddress(br->GetName(), buffer);
300
301 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
302 indexList.push_back({buffer + 1, offsets[getTableName(branchName, treeName)]});
303 } else if (branchName.BeginsWith("fIndex") && !branchName.EndsWith("_size")) {
304 int* buffer = new int;
305 *buffer = 0;
306 indexPointers.push_back(buffer);
307
308 inputTree->SetBranchAddress(br->GetName(), buffer);
309 outputTree->SetBranchAddress(br->GetName(), buffer);
310
311 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
312 }
313 }
314
315 if (indexList.size() > 0) {
316 auto entries = inputTree->GetEntries();
317 int minIndexOffset = unassignedIndexOffset[treeName];
318 auto newMinIndexOffset = minIndexOffset;
319 for (int i = 0; i < entries; i++) {
320 for (auto& index : indexList) {
321 *(index.first) = 0; // Any positive number will do, in any case it will not be filled in the output. Otherwise the previous entry is used and manipulated in the following.
322 }
323 inputTree->GetEntry(i);
324 // shift index columns by offset
325 for (const auto& idx : indexList) {
326 // if negative, the index is unassigned. In this case, the different unassigned blocks have to get unique negative IDs
327 if (*(idx.first) < 0) {
328 *(idx.first) += minIndexOffset;
329 newMinIndexOffset = std::min(newMinIndexOffset, *(idx.first));
330 } else {
331 *(idx.first) += idx.second;
332 }
333 }
334 if (!alreadyCopied) {
335 int nbytes = outputTree->Fill();
336 if (nbytes > 0) {
337 currentDirSize += nbytes;
338 }
339 }
340 }
341 unassignedIndexOffset[treeName] = newMinIndexOffset;
342 } else if (!alreadyCopied) {
343 auto nbytes = outputTree->CopyEntries(inputTree, -1, (fastCopy) ? "fast" : "");
344 if (nbytes > 0) {
345 currentDirSize += nbytes;
346 }
347 }
348
349 delete inputTree;
350
351 for (auto& buffer : indexPointers) {
352 delete buffer;
353 }
354 for (auto& buffer : vlaPointers) {
355 delete[] buffer;
356 }
357 }
358 if (exitCode > 0) {
359 break;
360 }
361
362 // check if all trees were present
363 if (mergedDFs > 1) {
364 for (auto const& tree : trees) {
365 bool found = (std::find(foundTrees.begin(), foundTrees.end(), tree.first) != foundTrees.end());
366 if (found == false) {
367 printf(" *** FATAL ***: The tree %s was not in the current dataframe\n", tree.first.c_str());
368 exitCode = 4;
369 }
370 }
371 }
372
373 // set to -1 to identify not found tables
374 for (auto& offset : offsets) {
375 offset.second = -1;
376 }
377
378 // update offsets
379 for (auto const& tree : trees) {
380 offsets[removeVersionSuffix(tree.first.c_str())] = tree.second->GetEntries();
381 }
382
383 // check for not found tables
384 for (auto& offset : offsets) {
385 if (offset.second < 0) {
386 if (maxDirSize > 0) {
387 // if maxDirSize is 0 then we do not merge DFs and this error is not an error actually (e.g. for not self-contained derived data)
388 printf("ERROR: Index on %s but no tree found\n", offset.first.c_str());
389 }
390 offset.second = 0;
391 }
392 }
393
394 if (maxDirSize == 0 || currentDirSize > maxDirSize) {
395 if (verbosity > 0) {
396 printf("Maximum size reached: %ld. Closing folder %s.\n", currentDirSize, dfName);
397 }
398 for (auto const& tree : trees) {
399 // printf("Writing %s\n", tree.first.c_str());
400 outputDir->cd();
401 tree.second->Write();
402
403 // stats
404 sizeCompressed[tree.first] += tree.second->GetZipBytes();
405 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
406
407 delete tree.second;
408 }
409 outputDir = nullptr;
410 trees.clear();
411 offsets.clear();
412 mergedDFs = 0;
413 }
414 }
415 inputFile->Close();
416 }
417
418 if (parentFiles) {
419 outputFile->cd();
420 parentFiles->Write("parentFiles", TObject::kSingleKey);
421 }
422
423 for (auto const& tree : trees) {
424 outputDir->cd();
425 tree.second->Write();
426
427 // stats
428 sizeCompressed[tree.first] += tree.second->GetZipBytes();
429 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
430
431 delete tree.second;
432 }
433
434 outputFile->Write();
435 outputFile->Close();
436
437 if (totalMergedDFs == 0) {
438 printf("ERROR: Did not merge a single DF. This does not seem right.\n");
439 exitCode = 2;
440 }
441
442 // in case of failure, remove the incomplete file
443 if (exitCode != 0) {
444 printf("Removing incomplete output file %s.\n", outputFile->GetName());
445 gSystem->Unlink(outputFile->GetName());
446 } else {
447 printf("AOD merger finished. Size overview follows:\n");
448
449 uint64_t totalCompressed = 0;
450 uint64_t totalUncompressed = 0;
451 for (auto const& tree : sizeCompressed) {
452 totalCompressed += tree.second;
453 totalUncompressed += sizeUncompressed[tree.first];
454 }
455 if (totalCompressed > 0 && totalUncompressed > 0) {
456 for (auto const& tree : sizeCompressed) {
457 printf(" Tree %20s | Compressed: %12" PRIu64 " (%2.0f%%) | Uncompressed: %12" PRIu64 " (%2.0f%%)\n", tree.first.c_str(), tree.second, 100.0 * tree.second / totalCompressed, sizeUncompressed[tree.first], 100.0 * sizeUncompressed[tree.first] / totalUncompressed);
458 }
459 }
460 }
461 printf("\n");
462
463 return exitCode;
464}
#define verbosity
int32_t i
uint32_t j
Definition RawData.h:0
uint32_t c
Definition RawData.h:2
const char * getTableName(const char *branchName, const char *treeName)
Definition aodMerger.h:26
const char * removeVersionSuffix(const char *treeName)
Definition aodMerger.h:14
GLuint buffer
Definition glcorearb.h:655
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLintptr offset
Definition glcorearb.h:660
#define main
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))