// Compression level that is later handed to dod->getFileFolder().
// The initial value 505 is replaced right away by the value of the
// "aod-writer-compression" option.
63 int compressionLevel = 505;
65 compressionLevel = ctx.
options().
get<
int>(
"aod-writer-compression");
// Debug marker for the init phase of the AOD sink.
69 LOGP(
debug,
"======== getGlobalAODSink::Init ==========");
// Starts as false and is set to true while iterating over outputInputs.
// NOTE(review): any condition guarding the assignment (e.g. a check on `ds`)
// lies in lines that are not visible in this excerpt.
72 bool hasOutputsToWrite =
false;
73 for (
auto& outobj : outputInputs) {
// Output descriptors that dod associates with this entry.
74 auto ds = dod->getDataOutputDescriptors(outobj);
76 hasOutputsToWrite =
true;
// Nothing to write: report it at info level.
83 if (!hasOutputsToWrite) {
// NOTE(review): presumably a log-once guard; how `once` is used is not
// visible here - confirm.
85 static bool once =
false;
87 LOG(info) <<
"No AODs to be saved.";
// Asks dod to close its data files. NOTE(review): the enclosing scope of this
// call is not visible in this excerpt.
95 dod->closeDataFiles();
// startTime -> time frame number; filled from the "tfn" input and queried
// again per table in the processing callback.
103 std::map<uint64_t, uint64_t> tfNumbers;
// startTime -> input file name; filled from the "tff" input.
104 std::map<uint64_t, std::string> tfFilenames;
// Metadata keys and values; assigned from inputs in the processing callback
// and later written to the output file as the "metaData" object.
106 std::vector<TString> aodMetaDataKeys;
107 std::vector<TString> aodMetaDataVals;
// Processing callback. Everything is captured by value, so the closure owns
// its own copies of the maps, the metadata vectors and the compression level;
// `mutable` allows the body to modify those copies.
110 return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](
ProcessingContext& pc)
mutable ->
void {
111 LOGP(
debug,
"======== getGlobalAODSink::processing ==========");
112 LOGP(
debug,
" processing data set with {} entries", pc.inputs().size());
// No inputs at all: report it (what follows the message is not visible here).
115 if (pc.inputs().size() == 0) {
116 LOGP(info,
"No inputs available!");
// Both default to 0 and are overwritten below when the inputs provide them.
121 uint64_t startTime = 0;
122 uint64_t tfNumber = 0;
// "tfn" input: used only if both its spec and its payload are set.
123 auto ref = pc.inputs().get(
"tfn");
124 if (
ref.spec &&
ref.payload) {
// The start time comes from the DataProcessingHeader of that input,
// the time frame number from its payload.
125 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
126 tfNumber = pc.inputs().get<uint64_t>(
"tfn");
// std::map::insert keeps an already existing entry for the same startTime.
127 tfNumbers.insert(std::pair<uint64_t, uint64_t>(startTime, tfNumber));
// "tff" input: same pattern, yielding the input file name for this startTime.
130 std::string aodInputFile;
131 auto ref2 = pc.inputs().get(
"tff");
132 if (ref2.spec && ref2.payload) {
133 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref2)->startTime;
134 aodInputFile = pc.inputs().get<std::string>(
"tff");
135 tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
// Let dod check its file sizes before the inputs are handled (the semantics
// of checkFileSizes() are defined elsewhere).
139 dod->checkFileSizes();
// Walk over every input of this invocation.
142 for (
const auto&
ref : pc.inputs()) {
// NOTE(review): the condition that classifies an input as invalid is not
// visible in this excerpt.
144 LOGP(
debug,
"Invalid input will be skipped!");
// Metadata keys and values are read from inputs addressed by their binding.
// NOTE(review): the conditions selecting which input carries keys and which
// carries values are not visible here.
150 aodMetaDataKeys = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
153 aodMetaDataVals = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
// Start time of the current input, taken from its DataProcessingHeader.
160 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
// The DataHeader provides the table name (its data description) and is used
// to look up the output descriptors for this input.
163 auto dh = DataRefUtils::getHeader<header::DataHeader*>(
ref);
164 auto tableName = dh->dataDescription.as<std::string>();
165 auto ds = dod->getDataOutputDescriptors(*dh);
// Look up the time frame number recorded for this start time.
171 auto it = tfNumbers.find(startTime);
172 if (it != tfNumbers.end()) {
// Divide and multiply by the number of time frames to merge; with integer
// arithmetic this rounds the number down to a multiple of that count.
// NOTE(review): assumes getNumberTimeFramesToMerge() returns an integer - confirm.
173 tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
// NOTE(review): presumably the else branch (the branch line itself is not
// visible): log at fatal level and stop by throwing.
175 LOGP(fatal,
"No time frame number found for output with start time {}", startTime);
176 throw std::runtime_error(
"Processing is stopped!");
// Input file name recorded for this start time, if there is one.
179 auto it2 = tfFilenames.find(startTime);
180 if (it2 != tfFilenames.end()) {
181 aodInputFile = it2->second;
// Inputs without a header get special treatment (body not visible here).
185 if (
ref.header ==
nullptr) {
// Fetch the input as a TableConsumer and convert it to an Arrow table.
190 auto table = pc.inputs().get<
TableConsumer>(std::get<ConcreteDataMatcher>(
ref.spec->matcher))->asArrowTable();
// A table failing Arrow validation is reported and, per the message, not saved.
191 if (!table->Validate().ok()) {
192 LOGP(warning,
"The table \"{}\" is not valid and will not be saved!", tableName);
// A table whose schema has no fields is only reported at debug level.
195 if (table->schema()->fields().empty()) {
196 LOGP(
debug,
"The table \"{}\" is empty but will be saved anyway!", tableName);
// Target file and folder for descriptor `d` (declared in lines not visible here).
203 auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
204 auto treename = fileAndFolder.folderName +
"/" + d->treename;
// Metadata: keep a "metaData" object already present in the file; otherwise,
// if both keys and values are non-empty, build a map and write it.
210 if (fileAndFolder.file->FindObjectAny(
"metaData")) {
211 LOGF(
debug,
"Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
212 }
else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
// NOTE(review): the loop bound is the number of keys only, while
// aodMetaDataVals is indexed with the same index; this assumes both vectors
// have the same length - confirm.
214 for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
215 aodMetaDataMap.Add(
new TObjString(aodMetaDataKeys[imd]),
new TObjString(aodMetaDataVals[imd]));
217 fileAndFolder.file->WriteObject(&aodMetaDataMap,
"metaData",
"Overwrite");
// Explicit column selection: resolve each requested column name in the schema.
220 if (!d->colnames.empty()) {
221 for (
auto& cn : d->colnames) {
// NOTE(review): Arrow's GetFieldIndex returns -1 for an unknown name, and
// column(idx) / field(idx) are called before any check visible in this
// excerpt - confirm that a guard exists.
222 auto idx = table->schema()->GetFieldIndex(cn);
223 auto col = table->column(idx);
224 auto field = table->schema()->field(idx);
244 using namespace monitoring;
// `auto` without a reference: objmap is a copy of dec.outObjHistMap.
247 auto objmap = dec.outObjHistMap;
// Shared list of (route, object) pairs; objects received by mergePart are
// appended to it and merged in place.
249 auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
// Function-local statics remembering the directory and file used last;
// currentDirectory is compared against and updated when an object is written.
256 static std::string currentDirectory =
"";
257 static std::string currentFile =
"";
260 LOG(
debug) <<
"Writing merged objects and histograms to file";
// An empty object list is reported as an error.
261 if (inputObjects->empty()) {
262 LOG(error) <<
"Output object map is empty!";
// Per-file handling for entries of `f` that are set (body not visible here).
267 if (
f[
i] !=
nullptr) {
271 LOG(
debug) <<
"All outputs merged in their respective target files";
// Handles one message part: deserialises the object it carries, merges it
// with an already registered object of the same name and task, and writes the
// result to a ROOT file once the per-object counter reaches zero.
// inputObjects, objmap and tskmap are captured by reference.
277 auto mergePart = [&inputObjects, &objmap, &tskmap](
DataRef const&
ref) {
// DataHeader taken from the header stack of this part.
284 auto datah = o2::header::get<o2::header::DataHeader*>(
ref.header);
292 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
293 datah->subSpecification);
// OutputObjHeader taken from the same header stack; per the message below a
// missing header ends the "mergePart" signpost with an error.
297 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(
ref.header);
299 O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid,
"mergePart",
"No output object header in stack of %{public}s/%{public}s/%d.",
300 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
301 datah->subSpecification);
// Read the class of the serialised object, then reset the buffer offset to 0
// (`tm` is declared in lines not visible here).
308 obj.
kind = tm.ReadClass();
309 tm.SetBufferOffset(0);
312 O2_SIGNPOST_START(histogram_registry, did,
"initialising root",
"Starting deserialization of %{public}s/%{public}s/%d",
313 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
314 datah->subSpecification);
// No class information could be read: close both signposts with a failure.
315 if (obj.
kind ==
nullptr) {
316 O2_SIGNPOST_END(histogram_registry, did,
"initialising root",
"Failed to deserialise");
317 O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid,
"mergePart",
"Cannot read class info from buffer of %{public}s/%{public}s/%d.",
318 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
319 datah->subSpecification);
322 O2_SIGNPOST_END(histogram_registry, did,
"initialising root",
"Done init.");
// Routing information carried by the object header.
324 auto policy = objh->mPolicy;
325 auto sourceType = objh->mSourceType;
326 auto hash = objh->mTaskHash;
327 O2_SIGNPOST_START(histogram_registry, did,
"deserialization",
"Starting deserialization of %{public}s/%{public}s/%d",
328 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
329 datah->subSpecification);
// Deserialise the object and take its name through the TNamed interface.
// NOTE(review): the static_cast is unchecked; this assumes ReadObjectAny
// returned a non-null object deriving from TNamed - confirm.
331 obj.
obj = tm.ReadObjectAny(obj.
kind);
332 auto* named =
static_cast<TNamed*
>(obj.
obj);
333 obj.
name = named->GetName();
334 O2_SIGNPOST_END(histogram_registry, did,
"deserialization",
"Done deserialization.");
// Whether the object goes into a folder named after its container; taken from
// the header's createContainer field (the guarding condition is not visible).
337 bool folderForContainer =
false;
339 folderForContainer = objh->createContainer != 0;
// Find the task whose id equals the hash from the header.
344 auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](
auto&&
x) { return x.id == hash; });
345 if (hpos == tskmap.end()) {
349 auto taskname = hpos->name;
// Find the object map entry with the same id.
350 auto opos = std::find_if(objmap.begin(), objmap.end(), [&](
auto&&
x) { return x.id == hash; });
351 if (opos == objmap.end()) {
353 taskname.c_str(),
hash);
359 obj.
container.c_str(), taskname.c_str());
// Look for an already registered object with the same name hash and task hash.
364 auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](
auto&&
x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
366 O2_SIGNPOST_START(histogram_registry, did,
"merging",
"Starting merging of %{public}s/%{public}s/%d",
367 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
368 datah->subSpecification);
// First occurrence: the counter starts at the pipeline size from the header,
// the object is appended, and `existing` is pointed at the new last element.
369 if (existing == inputObjects->end()) {
370 obj.
count = objh->mPipelineSize;
371 inputObjects->emplace_back(
key, obj);
372 existing = inputObjects->end() - 1;
// NOTE(review): presumably the branch for an already known object (the branch
// line itself is not visible): take over its counter and merge into it.
374 obj.
count = existing->second.count;
// Merge function provided by the class of the stored object.
376 auto merger = existing->second.kind->GetMerge();
378 O2_SIGNPOST_END(histogram_registry, did,
"merging",
"Unabled to merge");
384 merger(existing->second.obj, &coll,
nullptr);
// One contribution has been accounted for.
388 existing->second.count -= 1;
// Counter not yet zero: per the messages, more pipeline lanes are expected.
390 if (existing->second.count != 0) {
391 O2_SIGNPOST_END(histogram_registry, did,
"merging",
"Done partial merging.");
392 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Pipeline lanes still missing.");
// `auto` without a reference: route and entry are copies of the stored pair.
397 auto route = existing->first;
398 auto entry = existing->second;
// Output file name registered for this policy; none means nothing is written.
399 auto file = ROOTfileNames.find(route.policy);
400 if (file == ROOTfileNames.end()) {
401 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Not matching any file.");
404 O2_SIGNPOST_START(histogram_registry, did,
"writing",
"Starting writing of %{public}s/%{public}s/%d",
405 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
406 datah->subSpecification);
// The file for this policy is opened on first use, with mode "RECREATE".
408 if (
f[route.policy] ==
nullptr) {
409 f[route.policy] = TFile::Open(
filename.c_str(),
"RECREATE");
// When the directory or the file differs from the last one used, create the
// directory if the file has no key for it and remember it as current.
411 auto nextDirectory = route.directory;
412 if ((nextDirectory != currentDirectory) || (
filename != currentFile)) {
413 if (!
f[route.policy]->FindKey(nextDirectory.c_str())) {
414 f[route.policy]->mkdir(nextDirectory.c_str());
416 currentDirectory = nextDirectory;
// Go to the file root and fetch the directory to write into.
421 f[route.policy]->cd(
"/");
422 auto* currentDir =
f[route.policy]->GetDirectory(currentDirectory.c_str());
// Optional extra level named after the container: reuse it if it exists,
// otherwise create it, then descend into it.
425 if (folderForContainer) {
426 auto* histogramRegistryFolder = currentDir->GetDirectory(obj.
container.data());
427 if (!histogramRegistryFolder) {
428 histogramRegistryFolder = currentDir->mkdir(obj.
container.c_str(),
"", kTRUE);
430 currentDir = histogramRegistryFolder;
436 TDirectory* currentFolder = currentDir;
438 currentDir->GetName());
// An object name containing '/' is split at the last slash: the part before
// it names a sub-folder, the part after it becomes the object name.
439 std::string objName =
entry.name;
440 auto lastSlash =
entry.name.rfind(
'/');
442 if (lastSlash != std::string::npos) {
443 auto dirname =
entry.name.substr(0, lastSlash);
444 objName =
entry.name.substr(lastSlash + 1);
// Reuse the sub-folder if it exists, otherwise create it.
445 currentFolder = currentDir->GetDirectory(dirname.c_str());
446 if (!currentFolder) {
449 currentFolder = currentDir->mkdir(dirname.c_str(),
"", kTRUE);
452 currentFolder->GetName());
// Write into the sub-folder under the short object name.
// NOTE(review): the message names currentDir while the object is written
// into currentFolder - confirm that this is intended.
455 O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid,
"mergePart",
"Writing %{public}s of kind %{public}s in %{public}s",
456 entry.name.c_str(),
entry.kind->GetName(), currentDir->GetName());
457 objSize = currentFolder->WriteObjectAny(
entry.obj,
entry.kind, objName.c_str());
458 O2_SIGNPOST_END(histogram_registry, did,
"writing",
"End writing %{public}s",
entry.name.c_str());
// Alternative path: write under the full entry name directly into currentDir
// (the condition selecting between the two paths is not visible here).
462 O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid,
"mergePart",
"Writing %{public}s of kind %{public}s in %{public}s",
463 entry.name.c_str(),
entry.kind->GetName(), currentDir->GetName());
464 objSize = currentDir->WriteObjectAny(
entry.obj,
entry.kind,
entry.name.c_str());
465 O2_SIGNPOST_END(histogram_registry, did,
"writing",
"End writing %{public}s",
entry.name.c_str());
// Close the "mergePart" signpost, reporting the value returned by the write.
469 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Done merging object of %d bytes.", objSize);
// Feed every part of the multipart input to mergePart, one at a time.
// NOTE(review): the part count is queried for input index 0 while the parts
// are fetched through the binding "x" - presumably the same input; confirm.
472 O2_SIGNPOST_START(histogram_registry, rid,
"processParts",
"Start merging %zu parts received together.", pc.inputs().getNofParts(0));
473 for (
auto pi = 0U; pi < pc.inputs().getNofParts(0); ++pi) {
474 mergePart(pc.inputs().get(
"x", pi));
476 O2_SIGNPOST_END(histogram_registry, rid,
"processParts",
"Done histograms in multipart message.");