Project
Loading...
Searching...
No Matches
AODWriterHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
14#include "AODWriterHelpers.h"
25
26#include <Monitoring/Monitoring.h>
27#include <TFile.h>
28#include <TFile.h>
29#include <TTree.h>
30#include <TMap.h>
31#include <TObjString.h>
32#include <arrow/table.h>
33
35{
36
45
47 TClass* kind = nullptr;
48 void* obj = nullptr;
49 std::string name;
50 int count = -1;
51};
52
53const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNames = {{OutputObjHandlingPolicy::AnalysisObject, "AnalysisResults.root"},
54 {OutputObjHandlingPolicy::QAObject, "QAResults.root"}};
55
57{
58 auto& ac = ctx.services().get<AnalysisContext>();
60 int compressionLevel = 505;
61 if (ctx.options().hasOption("aod-writer-compression")) {
62 compressionLevel = ctx.options().get<int>("aod-writer-compression");
63 }
64 return AlgorithmSpec{[dod, outputInputs = ac.outputsInputsAOD, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
65 LOGP(debug, "======== getGlobalAODSink::Init ==========");
66
67 // find out if any table needs to be saved
68 bool hasOutputsToWrite = false;
69 for (auto& outobj : outputInputs) {
70 auto ds = dod->getDataOutputDescriptors(outobj);
71 if (ds.size() > 0) {
72 hasOutputsToWrite = true;
73 break;
74 }
75 }
76
77 // if nothing needs to be saved then return a trivial functor
78 // this happens when nothing needs to be saved but there are dangling outputs
79 if (!hasOutputsToWrite) {
80 return [](ProcessingContext&) mutable -> void {
81 static bool once = false;
82 if (!once) {
83 LOG(info) << "No AODs to be saved.";
84 once = true;
85 }
86 };
87 }
88
89 // end of data functor is called at the end of the data stream
90 auto endofdatacb = [dod](EndOfStreamContext& context) {
91 dod->closeDataFiles();
92 context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
93 };
94
95 auto& callbacks = ic.services().get<CallbackService>();
96 callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
97
98 // prepare map<uint64_t, uint64_t>(startTime, tfNumber)
99 std::map<uint64_t, uint64_t> tfNumbers;
100 std::map<uint64_t, std::string> tfFilenames;
101
102 std::vector<TString> aodMetaDataKeys;
103 std::vector<TString> aodMetaDataVals;
104
105 // this functor is called once per time frame
106 return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void {
107 LOGP(debug, "======== getGlobalAODSink::processing ==========");
108 LOGP(debug, " processing data set with {} entries", pc.inputs().size());
109
110 // return immediately if pc.inputs() is empty. This should never happen!
111 if (pc.inputs().size() == 0) {
112 LOGP(info, "No inputs available!");
113 return;
114 }
115
116 // update tfNumbers
117 uint64_t startTime = 0;
118 uint64_t tfNumber = 0;
119 auto ref = pc.inputs().get("tfn");
120 if (ref.spec && ref.payload) {
121 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
122 tfNumber = pc.inputs().get<uint64_t>("tfn");
123 tfNumbers.insert(std::pair<uint64_t, uint64_t>(startTime, tfNumber));
124 }
125 // update tfFilenames
126 std::string aodInputFile;
127 auto ref2 = pc.inputs().get("tff");
128 if (ref2.spec && ref2.payload) {
129 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref2)->startTime;
130 aodInputFile = pc.inputs().get<std::string>("tff");
131 tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
132 }
133
134 // close all output files if one has reached size limit
135 dod->checkFileSizes();
136
137 // loop over the DataRefs which are contained in pc.inputs()
138 for (const auto& ref : pc.inputs()) {
139 if (!ref.spec) {
140 LOGP(debug, "Invalid input will be skipped!");
141 continue;
142 }
143
144 // get metadata
145 if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataKeys"))) {
146 aodMetaDataKeys = pc.inputs().get<std::vector<TString>>(ref.spec->binding);
147 }
148 if (DataSpecUtils::partialMatch(*ref.spec, header::DataDescription("AODMetadataVals"))) {
149 aodMetaDataVals = pc.inputs().get<std::vector<TString>>(ref.spec->binding);
150 }
151
152 // skip non-AOD refs
153 if (!DataSpecUtils::partialMatch(*ref.spec, writableAODOrigins)) {
154 continue;
155 }
156 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref)->startTime;
157
158 // does this need to be saved?
159 auto dh = DataRefUtils::getHeader<header::DataHeader*>(ref);
160 auto tableName = dh->dataDescription.as<std::string>();
161 auto ds = dod->getDataOutputDescriptors(*dh);
162 if (ds.empty()) {
163 continue;
164 }
165
166 // get TF number from startTime
167 auto it = tfNumbers.find(startTime);
168 if (it != tfNumbers.end()) {
169 tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
170 } else {
171 LOGP(fatal, "No time frame number found for output with start time {}", startTime);
172 throw std::runtime_error("Processing is stopped!");
173 }
174 // get aod input file from startTime
175 auto it2 = tfFilenames.find(startTime);
176 if (it2 != tfFilenames.end()) {
177 aodInputFile = it2->second;
178 }
179
180 // get the TableConsumer and corresponding arrow table
181 auto msg = pc.inputs().get(ref.spec->binding);
182 if (msg.header == nullptr) {
183 LOGP(error, "No header for message {}:{}", ref.spec->binding, DataSpecUtils::describe(*ref.spec));
184 continue;
185 }
186 auto s = pc.inputs().get<TableConsumer>(ref.spec->binding);
187 auto table = s->asArrowTable();
188 if (!table->Validate().ok()) {
189 LOGP(warning, "The table \"{}\" is not valid and will not be saved!", tableName);
190 continue;
191 }
192 if (table->schema()->fields().empty()) {
193 LOGP(debug, "The table \"{}\" is empty but will be saved anyway!", tableName);
194 }
195
196 // loop over all DataOutputDescriptors
197 // a table can be saved in multiple ways
198 // e.g. different selections of columns to different files
199 for (auto d : ds) {
200 auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
201 auto treename = fileAndFolder.folderName + "/" + d->treename;
202 TableToTree ta2tr(table,
203 fileAndFolder.file,
204 treename.c_str());
205
206 // update metadata
207 if (fileAndFolder.file->FindObjectAny("metaData")) {
208 LOGF(debug, "Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
209 } else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
210 TMap aodMetaDataMap;
211 for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
212 aodMetaDataMap.Add(new TObjString(aodMetaDataKeys[imd]), new TObjString(aodMetaDataVals[imd]));
213 }
214 fileAndFolder.file->WriteObject(&aodMetaDataMap, "metaData", "Overwrite");
215 }
216
217 if (!d->colnames.empty()) {
218 for (auto& cn : d->colnames) {
219 auto idx = table->schema()->GetFieldIndex(cn);
220 auto col = table->column(idx);
221 auto field = table->schema()->field(idx);
222 if (idx != -1) {
223 ta2tr.addBranch(col, field);
224 }
225 }
226 } else {
227 ta2tr.addAllBranches();
228 }
229 ta2tr.process();
230 }
231 }
232 };
233 }
234
235 };
236}
237
239{
240 using namespace monitoring;
241 auto& ac = ctx.services().get<AnalysisContext>();
242 auto tskmap = ac.outTskMap;
243 auto objmap = ac.outObjHistMap;
244
245 return AlgorithmSpec{[objmap, tskmap](InitContext& ic) -> std::function<void(ProcessingContext&)> {
246 auto& callbacks = ic.services().get<CallbackService>();
247 auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
248
250 for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
251 f[i] = nullptr;
252 }
253
254 static std::string currentDirectory = "";
255 static std::string currentFile = "";
256
257 auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
258 LOG(debug) << "Writing merged objects and histograms to file";
259 if (inputObjects->empty()) {
260 LOG(error) << "Output object map is empty!";
261 context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
262 return;
263 }
264 for (auto i = 0u; i < OutputObjHandlingPolicy::numPolicies; ++i) {
265 if (f[i] != nullptr) {
266 f[i]->Close();
267 }
268 }
269 LOG(debug) << "All outputs merged in their respective target files";
270 context.services().get<ControlService>().readyToQuit(QuitRequest::Me);
271 };
272
273 callbacks.set<CallbackService::Id::EndOfStream>(endofdatacb);
274 return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
275 auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](DataRef const& ref) {
276 if (!ref.header) {
277 LOG(error) << "Header not found";
278 return;
279 }
280 auto datah = o2::header::get<o2::header::DataHeader*>(ref.header);
281 if (!datah) {
282 LOG(error) << "No data header in stack";
283 return;
284 }
285
286 if (!ref.payload) {
287 LOGP(error, "Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
288 return;
289 }
290
291 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header);
292 if (!objh) {
293 LOGP(error, "No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
294 return;
295 }
296
297 InputObject obj;
298 FairInputTBuffer tm(const_cast<char*>(ref.payload), static_cast<int>(datah->payloadSize));
299 tm.InitMap();
300 obj.kind = tm.ReadClass();
301 tm.SetBufferOffset(0);
302 tm.ResetMap();
303 if (obj.kind == nullptr) {
304 LOGP(error, "Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
305 return;
306 }
307
308 auto policy = objh->mPolicy;
309 auto sourceType = objh->mSourceType;
310 auto hash = objh->mTaskHash;
311
312 obj.obj = tm.ReadObjectAny(obj.kind);
313 auto* named = static_cast<TNamed*>(obj.obj);
314 obj.name = named->GetName();
315 auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](auto&& x) { return x.id == hash; });
316 if (hpos == tskmap.end()) {
317 LOG(error) << "No task found for hash " << hash;
318 return;
319 }
320 auto taskname = hpos->name;
321 auto opos = std::find_if(objmap.begin(), objmap.end(), [&](auto&& x) { return x.id == hash; });
322 if (opos == objmap.end()) {
323 LOG(error) << "No object list found for task " << taskname << " (hash=" << hash << ")";
324 return;
325 }
326 auto objects = opos->bindings;
327 if (std::find(objects.begin(), objects.end(), obj.name) == objects.end()) {
328 LOG(error) << "No object " << obj.name << " in map for task " << taskname;
329 return;
330 }
331 auto nameHash = runtime_hash(obj.name.c_str());
332 InputObjectRoute key{obj.name, nameHash, taskname, hash, policy, sourceType};
333 auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](auto&& x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
334 // If it's the first one, we just add it to the list.
335 if (existing == inputObjects->end()) {
336 obj.count = objh->mPipelineSize;
337 inputObjects->push_back(std::make_pair(key, obj));
338 existing = inputObjects->end() - 1;
339 } else {
340 obj.count = existing->second.count;
341 // Otherwise, we merge it with the existing one.
342 auto merger = existing->second.kind->GetMerge();
343 if (!merger) {
344 LOG(error) << "Already one unmergeable object found for " << obj.name;
345 return;
346 }
347 TList coll;
348 coll.Add(static_cast<TObject*>(obj.obj));
349 merger(existing->second.obj, &coll, nullptr);
350 }
351 // We expect as many objects as the pipeline size, for
352 // a given object name and task hash.
353 existing->second.count -= 1;
354
355 if (existing->second.count != 0) {
356 return;
357 }
358 // Write the object here.
359 auto route = existing->first;
360 auto entry = existing->second;
361 auto file = ROOTfileNames.find(route.policy);
362 if (file == ROOTfileNames.end()) {
363 return;
364 }
365 auto filename = file->second;
366 if (f[route.policy] == nullptr) {
367 f[route.policy] = TFile::Open(filename.c_str(), "RECREATE");
368 }
369 auto nextDirectory = route.directory;
370 if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
371 if (!f[route.policy]->FindKey(nextDirectory.c_str())) {
372 f[route.policy]->mkdir(nextDirectory.c_str());
373 }
374 currentDirectory = nextDirectory;
375 currentFile = filename;
376 }
377
378 // translate the list-structure created by the registry into a directory structure within the file
379 std::function<void(TList*, TDirectory*)> writeListToFile;
380 writeListToFile = [&](TList* list, TDirectory* parentDir) {
381 TIter next(list);
382 TObject* object = nullptr;
383 while ((object = next())) {
384 if (object->InheritsFrom(TList::Class())) {
385 writeListToFile(static_cast<TList*>(object), parentDir->mkdir(object->GetName(), object->GetName(), true));
386 } else {
387 int objSize = parentDir->WriteObjectAny(object, object->Class(), object->GetName());
388 static int maxSizeWritten = 0;
389 if (objSize > maxSizeWritten) {
390 auto& monitoring = pc.services().get<Monitoring>();
391 maxSizeWritten = objSize;
392 monitoring.send(Metric{fmt::format("{}/{}:{}", object->ClassName(), object->GetName(), objSize), "aod-largest-object-written"}.addTag(tags::Key::Subsystem, tags::Value::DPL));
393 }
394 auto* written = list->Remove(object);
395 delete written;
396 }
397 }
398 };
399
400 TDirectory* currentDir = f[route.policy]->GetDirectory(currentDirectory.c_str());
401 if (route.sourceType == OutputObjSourceType::HistogramRegistrySource) {
402 auto* outputList = static_cast<TList*>(entry.obj);
403 outputList->SetOwner(false);
404
405 // if registry should live in dedicated folder a TNamed object is appended to the list
406 if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
407 delete outputList->Last();
408 outputList->RemoveLast();
409 currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(), true);
410 }
411
412 writeListToFile(outputList, currentDir);
413 outputList->SetOwner();
414 delete outputList;
415 entry.obj = nullptr;
416 } else {
417 currentDir->WriteObjectAny(entry.obj, entry.kind, entry.name.c_str());
418 delete (TObject*)entry.obj;
419 entry.obj = nullptr;
420 }
421 };
422 for (int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
423 mergePart(pc.inputs().get("x", pi));
424 }
425 };
426 }};
427}
428} // namespace o2::framework::writers
std::vector< std::string > objects
o2::monitoring::Metric Metric
int32_t i
uint32_t col
Definition RawData.h:4
constexpr uint32_t runtime_hash(char const *str)
std::ostringstream debug
o2::monitoring::Monitoring Monitoring
StringRef key
ServiceRegistryRef services() const
ConfigParamRegistry & options() const
bool hasOption(const char *key) const
void addBranch(std::shared_ptr< arrow::ChunkedArray > const &column, std::shared_ptr< arrow::Field > const &field)
std::shared_ptr< TTree > process()
GLint GLenum GLint x
Definition glcorearb.h:403
GLint GLsizei count
Definition glcorearb.h:399
GLuint entry
Definition glcorearb.h:5735
GLdouble f
Definition glcorearb.h:310
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLuint object
Definition glcorearb.h:4041
GLint ref
Definition glcorearb.h:291
@ Me
Only quit this data processor.
OutputObjHandlingPolicy
Policy enum to determine OutputObj handling when writing.
std::string filename()
Definition list.h:40
std::vector< OutputTaskInfo > outTskMap
std::vector< InputSpec > outputsInputsAOD
static std::shared_ptr< DataOutputDirector > getDataOutputDirector(ConfigContext const &ctx)
Get the data director.
static bool partialMatch(InputSpec const &spec, o2::header::DataOrigin const &origin)
static std::string describe(InputSpec const &spec)
static AlgorithmSpec getOutputObjHistWriter(ConfigContext const &context)
static AlgorithmSpec getOutputTTreeWriter(ConfigContext const &context)
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153