AODWriterHelpers.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "AODWriterHelpers.h"
#include "Framework/Signpost.h"

#include <Monitoring/Monitoring.h>
#include <TDirectory.h>
#include <TFile.h>
#include <TTree.h>
#include <TMap.h>
#include <TObjString.h>
#include <arrow/table.h>
#include <chrono>
#include <ios>

O2_DECLARE_DYNAMIC_LOG(histogram_registry);

namespace o2::framework::writers
{

struct InputObjectRoute {
  std::string name;
  uint32_t uniqueId;
  std::string directory;
  uint32_t taskHash;
  OutputObjHandlingPolicy policy;
  OutputObjSourceType sourceType;
};

struct InputObject {
  TClass* kind = nullptr;
  void* obj = nullptr;
  std::string container;
  std::string name;
  int count = -1;
};

const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"},
                                                                                        {OutputObjHandlingPolicy::QAObject, "QAResults.root"}};

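// Creates the algorithm which writes the requested AOD tables to ROOT TTrees.
// The DataOutputDirector decides which table goes to which file, folder and tree
// and with which column selection. The default compression setting 505 follows
// ROOT's <algorithm> * 100 + <level> encoding, i.e. ZSTD (algorithm 5) at level 5,
// and can be overridden with the aod-writer-compression option.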
AlgorithmSpec AODWriterHelpers::getOutputTTreeWriter(ConfigContext const& ctx)
{
  auto dod = AnalysisSupportHelpers::getDataOutputDirector(ctx);
  int compressionLevel = 505;
  if (ctx.options().hasOption("aod-writer-compression")) {
    compressionLevel = ctx.options().get<int>("aod-writer-compression");
  }
  return AlgorithmSpec{[dod, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
    auto outputInputs = ic.services().get<DanglingEdgesContext>().outputsInputsAOD;
    LOGP(debug, "======== getGlobalAODSink::Init ==========");

    // find out if any table needs to be saved
    bool hasOutputsToWrite = false;
    for (auto& outobj : outputInputs) {
      auto ds = dod->getDataOutputDescriptors(outobj);
      if (ds.size() > 0) {
        hasOutputsToWrite = true;
        break;
      }
    }

    // if nothing needs to be saved then return a trivial functor
    // this happens when nothing needs to be saved but there are dangling outputs
    if (!hasOutputsToWrite) {
      return [](ProcessingContext&) mutable -> void {
        static bool once = false;
        if (!once) {
          LOG(info) << "No AODs to be saved.";
          once = true;
        }
      };
    }

    // end of data functor is called at the end of the data stream
    auto endofdatacb = [dod](EndOfStreamContext& context) {
      dod->closeDataFiles();
      context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
    };

    auto& callbacks = ic.services().get<CallbackService>();
    callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);

    // prepare map<uint64_t, uint64_t>(startTime, tfNumber)
    std::map<uint64_t, uint64_t> tfNumbers;
    std::map<uint64_t, std::string> tfFilenames;

    std::vector<TString> aodMetaDataKeys;
    std::vector<TString> aodMetaDataVals;

    // this functor is called once per time frame
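    // (tfNumbers, tfFilenames and the metadata vectors are captured by value in a mutable
    // lambda, so they persist and accumulate across the time frames processed by this writer)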
    return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void {
      LOGP(debug, "======== getGlobalAODSink::processing ==========");
      LOGP(debug, " processing data set with {} entries", pc.inputs().size());

      // return immediately if pc.inputs() is empty. This should never happen!
      if (pc.inputs().size() == 0) {
        LOGP(info, "No inputs available!");
        return;
      }

      // update tfNumbers
      uint64_t startTime = 0;
      uint64_t tfNumber = 0;
      auto ref = pc.inputs().get("tfn");
      if (ref.spec && ref.payload) {
        startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
        tfNumber = pc.inputs().get<uint64_t>("tfn");
        tfNumbers.insert(std::pair<uint64_t, uint64_t>(startTime, tfNumber));
      }
      // update tfFilenames
      std::string aodInputFile;
      auto ref2 = pc.inputs().get("tff");
      if (ref2.spec && ref2.payload) {
        startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref2)->startTime;
        aodInputFile = pc.inputs().get<std::string>("tff");
        tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
      }

      // close all output files if one has reached size limit
      dod->checkFileSizes();

      // loop over the DataRefs which are contained in pc.inputs()
      for (const auto& ref : pc.inputs()) {
        if (!ref.spec) {
          LOGP(debug, "Invalid input will be skipped!");
          continue;
        }

        // get metadata
        if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) {
          aodMetaDataKeys = pc.inputs().get<std::vector<TString>>(ref.spec->binding);
        }
        if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) {
          aodMetaDataVals = pc.inputs().get<std::vector<TString>>(ref.spec->binding);
        }

        // skip non-AOD refs
        if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) {
          continue;
        }
        startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;

        // does this need to be saved?
        auto dh = DataRefUtils::getHeader<header::DataHeader*>(ref);
        auto tableName = dh->dataDescription.as<std::string>();
        auto ds = dod->getDataOutputDescriptors(*dh);
        if (ds.empty()) {
          continue;
        }

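        // Time frames are grouped in batches of getNumberTimeFramesToMerge(): the integer
        // division below floors the TF number to a multiple of the batch size (e.g. with a
        // batch size of 4, TFs 0-3 map to 0 and TFs 4-7 map to 4), so all time frames of a
        // batch end up in the same output file.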
        // get TF number from startTime
        auto it = tfNumbers.find(startTime);
        if (it != tfNumbers.end()) {
          tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
        } else {
          LOGP(fatal, "No time frame number found for output with start time {}", startTime);
          throw std::runtime_error("Processing is stopped!");
        }
        // get aod input file from startTime
        auto it2 = tfFilenames.find(startTime);
        if (it2 != tfFilenames.end()) {
          aodInputFile = it2->second;
        }

        // get the TableConsumer and corresponding arrow table
        if (ref.header == nullptr) {
          LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
          continue;
        }

        auto table = pc.inputs().get<TableConsumer>(std::get<ConcreteDataMatcher>(ref.spec->matcher))->asArrowTable();
        if (!table->Validate().ok()) {
          LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
          continue;
        }
        if (table->schema()->fields().empty()) {
          LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName);
        }

        // loop over all DataOutputDescriptors
        // a table can be saved in multiple ways
        // e.g. different selections of columns to different files
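        // TableToTree converts the Arrow table into a TTree inside the target file and folder:
        // addBranch()/addAllBranches() select which columns become branches and process()
        // performs the actual conversion.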
        for (auto d : ds) {
          auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
          auto treename = fileAndFolder.folderName + "/" + d->treename;
          TableToTree ta2tr(table,
                            fileAndFolder.file,
                            treename.c_str());

          // update metadata
          if (fileAndFolder.file->FindObjectAny("metaData")) {
            LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
          } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
            TMap aodMetaDataMap;
            for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
              aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd]));
            }
            fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite");
          }

          if (!d->colnames.empty()) {
            for (auto& cn : d->colnames) {
              // look the requested column up in the schema and skip it if it is not present
              auto idx = table->schema()->GetFieldIndex(cn);
              if (idx != -1) {
                auto col = table->column(idx);
                auto field = table->schema()->field(idx);
                ta2tr.addBranch(col, field);
              }
            }
          } else {
            ta2tr.addAllBranches();
          }
          ta2tr.process();
        }
      }
    };
  }};
}

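// Creates the algorithm which collects the OutputObj/HistogramRegistry products of the analysis
// tasks, merges the contributions arriving from parallel pipeline lanes and writes each fully
// merged object to the file selected by its handling policy (AnalysisResults.root or
// QAResults.root, see ROOTfileNames above); the files are closed at the end of the data stream.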
AlgorithmSpec AODWriterHelpers::getOutputObjHistWriter(ConfigContext const&)
{
  return AlgorithmSpec{[](InitContext& ic) -> std::function<void(ProcessingContext&)> {
    using namespace monitoring;
    auto& dec = ic.services().get<DanglingEdgesContext>();
    auto tskmap = dec.outTskMap;
    auto objmap = dec.outObjHistMap;
    auto& callbacks = ic.services().get<CallbackService>();
    auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();

    static TFile* f[OutputObjHandlingPolicy::numPolicies];
    for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
      f[i] = nullptr;
    }

    static std::string currentDirectory = "";
    static std::string currentFile = "";

    auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
      LOG(debug) << "Writing merged objects and histograms to file";
      if (inputObjects->empty()) {
        LOG(error) << "Output object map is empty!";
        context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
        return;
      }
      for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
        if (f[i] != nullptr) {
          f[i]->Close();
        }
      }
      LOG(debug) << "All outputs merged in their respective target files";
      context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
    };

    callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
    return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
      auto mergePart = [&inputObjects, &objmap, &tskmap](DataRef const& ref) {
        O2_SIGNPOST_ID_GENERATE(hid, histogram_registry);
        O2_SIGNPOST_START(histogram_registry, hid, "mergePart", "Merging histogram");
        if (!ref.header) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Header not found.");
          return;
        }
        auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
        if (!datah) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No data header in stack");
          return;
        }

        if (!ref.payload) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Payload not found for %{public}s/%{public}s/%d",
                                     datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                                     datah->subSpecification);
          return;
        }

        auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
        if (!objh) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No output object header in stack of %{public}s/%{public}s/%d.",
                                     datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                                     datah->subSpecification);
          return;
        }

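        // Deserialise the ROOT object from the payload: first peek at the streamed class
        // information with ReadClass(), then rewind the buffer so that the object itself
        // can be read once its TClass is known.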
        InputObject obj;
        FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
        tm.InitMap();
        obj.kind = tm.ReadClass();
        tm.SetBufferOffset(0);
        tm.ResetMap();
        O2_SIGNPOST_ID_GENERATE(did, histogram_registry);
        O2_SIGNPOST_START(histogram_registry, did, "initialising root", "Starting deserialization of %{public}s/%{public}s/%d",
                          datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                          datah->subSpecification);
        if (obj.kind == nullptr) {
          O2_SIGNPOST_END(histogram_registry, did, "initialising root", "Failed to deserialise");
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "Cannot read class info from buffer of %{public}s/%{public}s/%d.",
                                     datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                                     datah->subSpecification);
          return;
        }
        O2_SIGNPOST_END(histogram_registry, did, "initialising root", "Done init.");

        auto policy = objh->mPolicy;
        auto sourceType = objh->mSourceType;
        auto hash = objh->mTaskHash;
        O2_SIGNPOST_START(histogram_registry, did, "deserialization", "Starting deserialization of %{public}s/%{public}s/%d",
                          datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                          datah->subSpecification);

        obj.obj = tm.ReadObjectAny(obj.kind);
        auto* named = static_cast<TNamed*>(obj.obj);
        obj.name = named->GetName();
        O2_SIGNPOST_END(histogram_registry, did, "deserialization", "Done deserialization.");
        // If we have a folder, we assume the first element of the path
        // to be the name of the registry.
        bool folderForContainer = false;
        if (sourceType == HistogramRegistrySource) {
          folderForContainer = objh->createContainer != 0;
          obj.container = objh->containerName;
        } else {
          obj.container = obj.name;
        }
        auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
        if (hpos == tskmap.end()) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No task found for hash %d.", hash);
          return;
        }
        auto taskname = hpos->name;
        auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
        if (opos == objmap.end()) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No object list found for task %{public}s (hash=%d).",
                                     taskname.c_str(), hash);
          return;
        }
        auto objects = opos->bindings;
        if (std::find(objects.begin(), objects.end(), obj.container) == objects.end()) {
          O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "mergePart", "No container %{public}s in map for task %{public}s.",
                                     obj.container.c_str(), taskname.c_str());
          return;
        }
        auto nameHash = runtime_hash(obj.name.c_str());
        InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
        auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
        // If it's the first one, we just add it to the list.
        O2_SIGNPOST_START(histogram_registry, did, "merging", "Starting merging of %{public}s/%{public}s/%d",
                          datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                          datah->subSpecification);
        if (existing == inputObjects->end()) {
          obj.count = objh->mPipelineSize;
          inputObjects->emplace_back(key, obj);
          existing = inputObjects->end() - 1;
        } else {
          obj.count = existing->second.count;
          // Otherwise, we merge it with the existing one.
          auto merger = existing->second.kind->GetMerge();
          if (!merger) {
            O2_SIGNPOST_END(histogram_registry, did, "merging", "Unable to merge");
            O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid, "merging", "Already one unmergeable object found for %{public}s", obj.name.c_str());
            return;
          }
          TList coll;
          coll.Add(static_cast<TObject*>(obj.obj));
          merger(existing->second.obj, &coll, nullptr);
        }
        // We expect as many objects as the pipeline size, for
        // a given object name and task hash.
        existing->second.count -= 1;

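        // count starts at the pipeline size and is decremented once per received contribution;
        // only when it reaches zero have all parallel lanes delivered their part and the
        // merged object can be written out.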
        if (existing->second.count != 0) {
          O2_SIGNPOST_END(histogram_registry, did, "merging", "Done partial merging.");
          O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Pipeline lanes still missing.");
          return;
        }
        O2_SIGNPOST_END(histogram_registry, did, "merging", "Done merging.");
        // Write the object here.
        auto route = existing->first;
        auto entry = existing->second;
        auto file = ROOTfileNames.find(route.policy);
        if (file == ROOTfileNames.end()) {
          O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Not matching any file.");
          return;
        }
        O2_SIGNPOST_START(histogram_registry, did, "writing", "Starting writing of %{public}s/%{public}s/%d",
                          datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
                          datah->subSpecification);
        auto filename = file->second;
        if (f[route.policy] == nullptr) {
          f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
        }
        auto nextDirectory = route.directory;
        if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
          if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
            f[route.policy]->mkdir(nextDirectory.c_str());
          }
          currentDirectory = nextDirectory;
          currentFile = filename;
        }

        // FIXME: handle folders
        f[route.policy]->cd("/");
        auto* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());

        // In case we need a folder for the registry, let's create it.
        if (folderForContainer) {
          auto* histogramRegistryFolder = currentDir->GetDirectory(obj.container.data());
          if (!histogramRegistryFolder) {
            histogramRegistryFolder = currentDir->mkdir(obj.container.c_str(), "", kTRUE);
          }
          currentDir = histogramRegistryFolder;
        }

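        // For registry objects the name may encode a path: everything before the last '/' is
        // (re)created as a directory hierarchy and the object is written under the remaining
        // leaf name, e.g. "qa/tracks/pt" becomes histogram "pt" inside folder "qa/tracks".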
        // The name contains a path...
        int objSize = 0;
        if (sourceType == HistogramRegistrySource) {
          TDirectory* currentFolder = currentDir;
          O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Toplevel folder is %{public}s.",
                                 currentDir->GetName());
          std::string objName = entry.name;
          auto lastSlash = entry.name.rfind('/');

          if (lastSlash != std::string::npos) {
            auto dirname = entry.name.substr(0, lastSlash);
            objName = entry.name.substr(lastSlash + 1);
            currentFolder = currentDir->GetDirectory(dirname.c_str());
            if (!currentFolder) {
              O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Creating folder %{public}s",
                                     dirname.c_str());
              currentFolder = currentDir->mkdir(dirname.c_str(), "", kTRUE);
            } else {
              O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Folder %{public}s already there.",
                                     currentFolder->GetName());
            }
          }
          O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
                                 entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
          objSize = currentFolder->WriteObjectAny(entry.obj, entry.kind, objName.c_str());
          O2_SIGNPOST_END(histogram_registry, did, "writing", "End writing %{public}s", entry.name.c_str());
          delete (TObject*)entry.obj;
          entry.obj = nullptr;
        } else {
          O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid, "mergePart", "Writing %{public}s of kind %{public}s in %{public}s",
                                 entry.name.c_str(), entry.kind->GetName(), currentDir->GetName());
          objSize = currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
          O2_SIGNPOST_END(histogram_registry, did, "writing", "End writing %{public}s", entry.name.c_str());
          delete (TObject*)entry.obj;
          entry.obj = nullptr;
        }
        O2_SIGNPOST_END(histogram_registry, hid, "mergePart", "Done merging object of %d bytes.", objSize);
      };
      O2_SIGNPOST_ID_GENERATE(rid, histogram_registry);
      O2_SIGNPOST_START(histogram_registry, rid, "processParts", "Start merging %zu parts received together.", pc.inputs().getNofParts(0));
      for (auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
        mergePart(pc.inputs().get("x", pi));
      }
      O2_SIGNPOST_END(histogram_registry, rid, "processParts", "Done merging histograms in multipart message.");
    };
  }};
}
} // namespace o2::framework::writers