Project
Loading...
Searching...
No Matches
aodMerger.cxx
Go to the documentation of this file.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include <algorithm>
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <getopt.h>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "TSystem.h"
#include "TFile.h"
#include "TTree.h"
#include "TList.h"
#include "TKey.h"
#include "TDirectory.h"
#include "TObjString.h"
#include <TGrid.h>
#include <TMap.h>
#include <TLeaf.h>

#include "aodMerger.h"

31// AOD merger with correct index rewriting
32// No need to know the datamodel because the branch names follow a canonical standard (identified by fIndex)
33int main(int argc, char* argv[])
34{
35 std::string inputCollection("input.txt");
36 std::string outputFileName("AO2D.root");
37 long maxDirSize = 100000000;
38 bool skipNonExistingFiles = false;
39 bool skipParentFilesList = false;
40 int verbosity = 2;
41 int exitCode = 0; // 0: success, >0: failure
42 int compression = 505;
43
44 int option_index = 0;
45 static struct option long_options[] = {
46 {"input", required_argument, nullptr, 0},
47 {"output", required_argument, nullptr, 1},
48 {"max-size", required_argument, nullptr, 2},
49 {"skip-non-existing-files", no_argument, nullptr, 3},
50 {"skip-parent-files-list", no_argument, nullptr, 4},
51 {"compression", required_argument, nullptr, 5},
52 {"verbosity", required_argument, nullptr, 'v'},
53 {"help", no_argument, nullptr, 'h'},
54 {nullptr, 0, nullptr, 0}};
55
56 while (true) {
57 int c = getopt_long(argc, argv, "", long_options, &option_index);
58 if (c == -1) {
59 break;
60 } else if (c == 0) {
61 inputCollection = optarg;
62 } else if (c == 1) {
63 outputFileName = optarg;
64 } else if (c == 2) {
65 maxDirSize = atol(optarg);
66 } else if (c == 3) {
67 skipNonExistingFiles = true;
68 } else if (c == 4) {
69 skipParentFilesList = true;
70 } else if (c == 5) {
71 compression = atoi(optarg);
72 } else if (c == 'v') {
73 verbosity = atoi(optarg);
74 } else if (c == 'h') {
75 printf("AO2D merging tool. Options: \n");
76 printf(" --input <inputfile.txt> Contains path to files to be merged. Default: %s\n", inputCollection.c_str());
77 printf(" --output <outputfile.root> Target output ROOT file. Default: %s\n", outputFileName.c_str());
78 printf(" --max-size <size in Bytes> Target directory size. Default: %ld. Set to 0 if file is not self-contained.\n", maxDirSize);
79 printf(" --skip-non-existing-files Flag to allow skipping of non-existing files in the input list.\n");
80 printf(" --skip-parent-files-list Flag to allow skipping the merging of the parent files list.\n");
81 printf(" --compression <root compression id> Compression algorithm / level to use (default: %d)\n", compression);
82 printf(" --verbosity <flag> Verbosity of output (default: %d).\n", verbosity);
83 return -1;
84 } else {
85 return -2;
86 }
87 }
88
89 printf("AOD merger started with:\n");
90 printf(" Input file: %s\n", inputCollection.c_str());
91 printf(" Output file name: %s\n", outputFileName.c_str());
92 printf(" Maximal folder size (uncompressed): %ld\n", maxDirSize);
93 if (skipNonExistingFiles) {
94 printf(" WARNING: Skipping non-existing files.\n");
95 }
96
97 std::map<std::string, TTree*> trees;
98 std::map<std::string, uint64_t> sizeCompressed;
99 std::map<std::string, uint64_t> sizeUncompressed;
100 std::map<std::string, int> offsets;
101 std::map<std::string, int> unassignedIndexOffset;
102
103 auto outputFile = TFile::Open(outputFileName.c_str(), "RECREATE", "", compression);
104 TDirectory* outputDir = nullptr;
105 long currentDirSize = 0;
106
107 std::ifstream in;
108 in.open(inputCollection);
109 TString line;
110 bool connectedToAliEn = false;
111 TMap* metaData = nullptr;
112 TMap* parentFiles = nullptr;
113 int totalMergedDFs = 0;
114 int mergedDFs = 0;
115 while (in.good() && exitCode == 0) {
116 in >> line;
117
118 if (line.Length() == 0) {
119 continue;
120 }
121
122 if (line.BeginsWith("alien:") && !connectedToAliEn) {
123 printf("Connecting to AliEn...");
124 TGrid::Connect("alien:");
125 connectedToAliEn = true; // Only try once
126 }
127
128 printf("Processing input file: %s\n", line.Data());
129
130 auto inputFile = TFile::Open(line);
131 if (!inputFile || inputFile->IsZombie()) {
132 printf("Error: %s input file %s.\n", !inputFile ? "Could not open" : "Zombie", line.Data());
133 if (skipNonExistingFiles) {
134 continue;
135 } else {
136 printf("Aborting merge!\n");
137 exitCode = 1;
138 break;
139 }
140 }
141
142 TList* keyList = inputFile->GetListOfKeys();
143 keyList->Sort();
144
145 for (auto key1 : *keyList) {
146 if (((TObjString*)key1)->GetString().EqualTo("metaData")) {
147 auto metaDataCurrentFile = (TMap*)inputFile->Get("metaData");
148 if (metaData == nullptr) {
149 metaData = metaDataCurrentFile;
150 outputFile->cd();
151 metaData->Write("metaData", TObject::kSingleKey);
152 } else {
153 for (auto metaDataPair : *metaData) {
154 auto metaDataKey = ((TPair*)metaDataPair)->Key();
155 if (metaDataCurrentFile->Contains(((TObjString*)metaDataKey)->GetString())) {
156 auto value = (TObjString*)metaData->GetValue(((TObjString*)metaDataKey)->GetString());
157 auto valueCurrentFile = (TObjString*)metaDataCurrentFile->GetValue(((TObjString*)metaDataKey)->GetString());
158 if (!value->GetString().EqualTo(valueCurrentFile->GetString())) {
159 printf("WARNING: Metadata differs between input files. Key %s : %s vs. %s\n", ((TObjString*)metaDataKey)->GetString().Data(),
160 value->GetString().Data(), valueCurrentFile->GetString().Data());
161 }
162 } else {
163 printf("WARNING: Metadata differs between input files. Key %s is not present in current file\n", ((TObjString*)metaDataKey)->GetString().Data());
164 }
165 }
166 }
167 }
168
169 if (((TObjString*)key1)->GetString().EqualTo("parentFiles") && !skipParentFilesList) {
170 auto parentFilesCurrentFile = (TMap*)inputFile->Get("parentFiles");
171 if (parentFiles == nullptr) {
172 parentFiles = new TMap;
173 }
174 for (auto pair : *parentFilesCurrentFile) {
175 parentFiles->Add(((TPair*)pair)->Key(), ((TPair*)pair)->Value());
176 }
177 delete parentFilesCurrentFile;
178 }
179
180 if (!((TObjString*)key1)->GetString().BeginsWith("DF_")) {
181 continue;
182 }
183
184 auto dfName = ((TObjString*)key1)->GetString().Data();
185
186 if (verbosity > 0) {
187 printf(" Processing folder %s\n", dfName);
188 }
189 ++mergedDFs;
190 ++totalMergedDFs;
191 auto folder = (TDirectoryFile*)inputFile->Get(dfName);
192 auto treeList = folder->GetListOfKeys();
193
194 treeList->Sort();
195
196 // purging keys from duplicates
197 for (auto i = 0; i < treeList->GetEntries(); ++i) {
198 TKey* ki = (TKey*)treeList->At(i);
199 for (int j = i + 1; j < treeList->GetEntries(); ++j) {
200 TKey* kj = (TKey*)treeList->At(j);
201 if (std::strcmp(ki->GetName(), kj->GetName()) == 0 && std::strcmp(ki->GetTitle(), kj->GetTitle()) == 0) {
202 if (ki->GetCycle() < kj->GetCycle()) {
203 printf(" *** FATAL *** we had ordered the keys, first cycle should be higher, please check");
204 exitCode = 5;
205 } else {
206 // key is a duplicate, let's remove it
207 treeList->Remove(kj);
208 j--;
209 }
210 } else {
211 // we changed key, since they are sorted, we won't have the same anymore
212 break;
213 }
214 }
215 }
216
217 std::list<std::string> foundTrees;
218
219 for (auto key2 : *treeList) {
220 auto treeName = ((TObjString*)key2)->GetString().Data();
221 bool found = (std::find(foundTrees.begin(), foundTrees.end(), treeName) != foundTrees.end());
222 if (found == true) {
223 printf(" ***WARNING*** Tree %s was already merged (even if we purged duplicated trees before, so this should not happen), skipping\n", treeName);
224 continue;
225 }
226 foundTrees.push_back(treeName);
227
228 auto inputTree = (TTree*)inputFile->Get(Form("%s/%s", dfName, treeName));
229 bool fastCopy = (inputTree->GetTotBytes() > 10000000); // Only do this for large enough trees to avoid that baskets are too small
230 if (verbosity > 1) {
231 printf(" Processing tree %s with %lld entries with total size %lld (fast copy: %d)\n", treeName, inputTree->GetEntries(), inputTree->GetTotBytes(), fastCopy);
232 }
233
234 bool alreadyCopied = false;
235 if (trees.count(treeName) == 0) {
236 if (mergedDFs > 1) {
237 printf(" *** FATAL ***: The tree %s was not in the previous dataframe(s)\n", treeName);
238 exitCode = 3;
239 }
240
241 // Connect trees but do not copy entries (using the clone function) unless fast copy is on
242 // NOTE Basket size etc. are copied in CloneTree()
243 if (!outputDir) {
244 outputDir = outputFile->mkdir(dfName);
245 currentDirSize = 0;
246 if (verbosity > 0) {
247 printf("Writing to output folder %s\n", dfName);
248 }
249 }
250 outputDir->cd();
251 auto outputTree = inputTree->CloneTree(-1, (fastCopy) ? "fast" : "");
252 currentDirSize += inputTree->GetTotBytes(); // NOTE outputTree->GetTotBytes() is 0, so we use the inputTree here
253 alreadyCopied = true;
254 outputTree->SetAutoFlush(0);
255 trees[treeName] = outputTree;
256 } else {
257 // adjust addresses tree
258 trees[treeName]->CopyAddresses(inputTree);
259 }
260
261 auto outputTree = trees[treeName];
262 // register index and connect VLA columns
263 std::vector<std::pair<int*, int>> indexList;
264 std::vector<char*> vlaPointers;
265 std::vector<int*> indexPointers;
266 TObjArray* branches = inputTree->GetListOfBranches();
267 for (int i = 0; i < branches->GetEntriesFast(); ++i) {
268 TBranch* br = (TBranch*)branches->UncheckedAt(i);
269 TString branchName(br->GetName());
270
271 // detect VLA
272 if (((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount() != nullptr) {
273 int maximum = ((TLeaf*)br->GetListOfLeaves()->First())->GetLeafCount()->GetMaximum();
274
275 // get type
276 static TClass* cls;
277 EDataType type;
278 br->GetExpectedType(cls, type);
279 auto typeSize = TDataType::GetDataType(type)->Size();
280
281 char* buffer = new char[maximum * typeSize];
282 memset(buffer, 0, maximum * typeSize);
283 vlaPointers.push_back(buffer);
284 if (verbosity > 2) {
285 printf(" Allocated VLA buffer of length %d with %d bytes each for branch name %s\n", maximum, typeSize, br->GetName());
286 }
287 inputTree->SetBranchAddress(br->GetName(), buffer);
288 outputTree->SetBranchAddress(br->GetName(), buffer);
289
290 if (branchName.BeginsWith("fIndexArray")) {
291 for (int i = 0; i < maximum; i++) {
292 indexList.push_back({reinterpret_cast<int*>(buffer + i * typeSize), offsets[getTableName(branchName, treeName)]});
293 }
294 }
295 } else if (branchName.BeginsWith("fIndexSlice")) {
296 int* buffer = new int[2];
297 memset(buffer, 0, 2 * sizeof(buffer[0]));
298 vlaPointers.push_back(reinterpret_cast<char*>(buffer));
299
300 inputTree->SetBranchAddress(br->GetName(), buffer);
301 outputTree->SetBranchAddress(br->GetName(), buffer);
302
303 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
304 indexList.push_back({buffer + 1, offsets[getTableName(branchName, treeName)]});
305 } else if (branchName.BeginsWith("fIndex") && !branchName.EndsWith("_size")) {
306 int* buffer = new int;
307 *buffer = 0;
308 indexPointers.push_back(buffer);
309
310 inputTree->SetBranchAddress(br->GetName(), buffer);
311 outputTree->SetBranchAddress(br->GetName(), buffer);
312
313 indexList.push_back({buffer, offsets[getTableName(branchName, treeName)]});
314 }
315 }
316
317 if (indexList.size() > 0) {
318 auto entries = inputTree->GetEntries();
319 int minIndexOffset = unassignedIndexOffset[treeName];
320 auto newMinIndexOffset = minIndexOffset;
321 for (int i = 0; i < entries; i++) {
322 for (auto& index : indexList) {
323 *(index.first) = 0; // Any positive number will do, in any case it will not be filled in the output. Otherwise the previous entry is used and manipulated in the following.
324 }
325 inputTree->GetEntry(i);
326 // shift index columns by offset
327 for (const auto& idx : indexList) {
328 // if negative, the index is unassigned. In this case, the different unassigned blocks have to get unique negative IDs
329 if (*(idx.first) < 0) {
330 *(idx.first) += minIndexOffset;
331 newMinIndexOffset = std::min(newMinIndexOffset, *(idx.first));
332 } else {
333 *(idx.first) += idx.second;
334 }
335 }
336 if (!alreadyCopied) {
337 int nbytes = outputTree->Fill();
338 if (nbytes > 0) {
339 currentDirSize += nbytes;
340 }
341 }
342 }
343 unassignedIndexOffset[treeName] = newMinIndexOffset;
344 } else if (!alreadyCopied) {
345 auto nbytes = outputTree->CopyEntries(inputTree, -1, (fastCopy) ? "fast" : "");
346 if (nbytes > 0) {
347 currentDirSize += nbytes;
348 }
349 }
350
351 delete inputTree;
352
353 for (auto& buffer : indexPointers) {
354 delete buffer;
355 }
356 for (auto& buffer : vlaPointers) {
357 delete[] buffer;
358 }
359 }
360 if (exitCode > 0) {
361 break;
362 }
363
364 // check if all trees were present
365 if (mergedDFs > 1) {
366 for (auto const& tree : trees) {
367 bool found = (std::find(foundTrees.begin(), foundTrees.end(), tree.first) != foundTrees.end());
368 if (found == false) {
369 printf(" *** FATAL ***: The tree %s was not in the current dataframe\n", tree.first.c_str());
370 exitCode = 4;
371 }
372 }
373 }
374
375 // set to -1 to identify not found tables
376 for (auto& offset : offsets) {
377 offset.second = -1;
378 }
379
380 // update offsets
381 for (auto const& tree : trees) {
382 offsets[removeVersionSuffix(tree.first.c_str())] = tree.second->GetEntries();
383 }
384
385 // check for not found tables
386 for (auto& offset : offsets) {
387 if (offset.second < 0) {
388 if (maxDirSize > 0) {
389 // if maxDirSize is 0 then we do not merge DFs and this error is not an error actually (e.g. for not self-contained derived data)
390 printf("ERROR: Index on %s but no tree found\n", offset.first.c_str());
391 }
392 offset.second = 0;
393 }
394 }
395
396 if (maxDirSize == 0 || currentDirSize > maxDirSize) {
397 if (verbosity > 0) {
398 printf("Maximum size reached: %ld. Closing folder %s.\n", currentDirSize, dfName);
399 }
400 for (auto const& tree : trees) {
401 // printf("Writing %s\n", tree.first.c_str());
402 outputDir->cd();
403 tree.second->Write();
404
405 // stats
406 sizeCompressed[tree.first] += tree.second->GetZipBytes();
407 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
408
409 delete tree.second;
410 }
411 outputDir = nullptr;
412 trees.clear();
413 offsets.clear();
414 mergedDFs = 0;
415 }
416 }
417 inputFile->Close();
418 }
419
420 if (parentFiles) {
421 outputFile->cd();
422 parentFiles->Write("parentFiles", TObject::kSingleKey);
423 }
424
425 for (auto const& tree : trees) {
426 outputDir->cd();
427 tree.second->Write();
428
429 // stats
430 sizeCompressed[tree.first] += tree.second->GetZipBytes();
431 sizeUncompressed[tree.first] += tree.second->GetTotBytes();
432
433 delete tree.second;
434 }
435
436 outputFile->Write();
437 outputFile->Close();
438
439 if (totalMergedDFs == 0) {
440 printf("ERROR: Did not merge a single DF. This does not seem right.\n");
441 exitCode = 2;
442 }
443
444 // in case of failure, remove the incomplete file
445 if (exitCode != 0) {
446 printf("Removing incomplete output file %s.\n", outputFile->GetName());
447 gSystem->Unlink(outputFile->GetName());
448 } else {
449 printf("AOD merger finished. Size overview follows:\n");
450
451 uint64_t totalCompressed = 0;
452 uint64_t totalUncompressed = 0;
453 for (auto const& tree : sizeCompressed) {
454 totalCompressed += tree.second;
455 totalUncompressed += sizeUncompressed[tree.first];
456 }
457 if (totalCompressed > 0 && totalUncompressed > 0) {
458 for (auto const& tree : sizeCompressed) {
459 printf(" Tree %20s | Compressed: %12" PRIu64 " (%2.0f%%) | Uncompressed: %12" PRIu64 " (%2.0f%%)\n", tree.first.c_str(), tree.second, 100.0 * tree.second / totalCompressed, sizeUncompressed[tree.first], 100.0 * sizeUncompressed[tree.first] / totalUncompressed);
460 }
461 }
462 }
463 printf("\n");
464
465 return exitCode;
466}
#define verbosity
int32_t i
uint32_t j
Definition RawData.h:0
uint32_t c
Definition RawData.h:2
const char * getTableName(const char *branchName, const char *treeName)
Definition aodMerger.h:26
const char * removeVersionSuffix(const char *treeName)
Definition aodMerger.h:14
GLuint buffer
Definition glcorearb.h:655
GLuint GLsizei const GLuint const GLintptr * offsets
Definition glcorearb.h:2595
GLuint index
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLintptr offset
Definition glcorearb.h:660
#define main
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))