// Init-time setup for the AOD sink (the debug banner below names it
// "getGlobalAODSink::Init"). NOTE(review): only a subset of the original lines
// is visible in this excerpt, so the surrounding control flow is not shown.
//
// The literal 505 is only a starting value: as far as visible here it is
// replaced right away by the integer option "aod-writer-compression".
67 int compressionLevel = 505;
69 compressionLevel = ctx.
options().
get<
int>(
"aod-writer-compression");
72 LOGP(
debug,
"======== getGlobalAODSink::Init ==========");
// Tracks whether anything has to be written. It is set to true inside the loop
// over outputInputs; the condition guarding that assignment is not visible
// here -- presumably it depends on `ds`, TODO confirm.
75 bool hasOutputsToWrite =
false;
76 for (
auto& outobj : outputInputs) {
77 auto ds = dod->getDataOutputDescriptors(outobj);
79 hasOutputsToWrite =
true;
// Nothing to write: an info message is logged and the data files are closed.
// `once` is a function-local static; presumably it limits the message to a
// single emission -- the code that tests/sets it is not visible, TODO confirm.
86 if (!hasOutputsToWrite) {
88 static bool once =
false;
90 LOG(info) <<
"No AODs to be saved.";
98 dod->closeDataFiles();
// State that the processing callback captures by value:
//  - tfNumbers / tfFilenames: maps with a uint64_t key,
//  - aodMetaDataKeys / aodMetaDataVals: two TString vectors.
106 std::map<uint64_t, uint64_t> tfNumbers;
107 std::map<uint64_t, std::string> tfFilenames;
109 std::vector<TString> aodMetaDataKeys;
110 std::vector<TString> aodMetaDataVals;
// Processing callback returned by the init code. Everything in the capture
// list is captured by value and the lambda is `mutable`, so the closure owns
// its own copies of the maps / metadata vectors and may modify them.
// NOTE(review): only a subset of the original lines is visible in this
// excerpt (closing braces and several statements are missing).
113 return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](
ProcessingContext& pc)
mutable ->
void {
114 LOGP(
debug,
"======== getGlobalAODSink::processing ==========");
115 LOGP(
debug,
" processing data set with {} entries", pc.inputs().size());
// An empty input record is reported at info level; what happens afterwards
// (e.g. an early return) is not visible here.
118 if (pc.inputs().size() == 0) {
119 LOGP(info,
"No inputs available!");
// Input "tfn": when the reference has both a spec and a payload, its
// DataProcessingHeader start time and its uint64_t value are stored as a
// (startTime -> tfNumber) entry. std::map::insert keeps an already existing
// entry for the same key.
124 uint64_t startTime = 0;
125 uint64_t tfNumber = 0;
126 auto ref = pc.inputs().get(
"tfn");
127 if (
ref.spec &&
ref.payload) {
128 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
129 tfNumber = pc.inputs().get<uint64_t>(
"tfn");
130 tfNumbers.insert(std::pair<uint64_t, uint64_t>(startTime, tfNumber));
// Input "tff": same pattern, storing (startTime -> std::string value).
133 std::string aodInputFile;
134 auto ref2 = pc.inputs().get(
"tff");
135 if (ref2.spec && ref2.payload) {
136 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref2)->startTime;
137 aodInputFile = pc.inputs().get<std::string>(
"tff");
138 tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
142 dod->checkFileSizes();
// Loop over all inputs of this invocation. Note that the loop variable `ref`
// shadows the `ref` obtained for "tfn" above.
145 for (
const auto&
ref : pc.inputs()) {
147 LOGP(
debug,
"Invalid input will be skipped!");
// Metadata keys and values are both read through ref.spec->binding; the
// conditions that decide which of the two assignments runs for a given input
// are not visible here.
153 aodMetaDataKeys = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
156 aodMetaDataVals = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
163 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
166 auto dh = DataRefUtils::getHeader<header::DataHeader*>(
ref);
167 auto tableName = dh->dataDescription.as<std::string>();
168 auto ds = dod->getDataOutputDescriptors(*dh);
// Look up the number stored for this start time. Dividing and multiplying by
// getNumberTimeFramesToMerge() rounds it down to a multiple of that value
// (assuming integer arithmetic -- the return type is not visible here).
174 auto it = tfNumbers.find(startTime);
175 if (it != tfNumbers.end()) {
176 tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
// Presumably the not-found branch (the `else` is not visible): logs at fatal
// level and throws std::runtime_error.
178 LOGP(fatal,
"No time frame number found for output with start time {}", startTime);
179 throw std::runtime_error(
"Processing is stopped!");
// Replace aodInputFile with the name stored for this start time, if any.
182 auto it2 = tfFilenames.find(startTime);
183 if (it2 != tfFilenames.end()) {
184 aodInputFile = it2->second;
188 auto msg = pc.inputs().get(
ref.spec->binding);
189 if (
msg.header ==
nullptr) {
// `s` is declared on a line that is not visible here. A table that fails
// Validate() triggers the warning below; a table whose schema has no fields
// is only reported at debug level.
194 auto table = s->asArrowTable();
195 if (!table->Validate().ok()) {
196 LOGP(warning,
"The table \"{}\" is not valid and will not be saved!", tableName);
199 if (table->schema()->fields().empty()) {
200 LOGP(
debug,
"The table \"{}\" is empty but will be saved anyway!", tableName);
// `d` is declared on a line that is not visible here. The tree name is
// "<folderName>/<d->treename>".
207 auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
208 auto treename = fileAndFolder.folderName +
"/" + d->treename;
// Metadata handling: an object named "metaData" already present in the target
// file is left untouched; otherwise, if both vectors are non-empty, the
// key/value pairs are added to aodMetaDataMap (declared on a line not visible
// here) and written to the file under the name "metaData".
214 if (fileAndFolder.file->FindObjectAny(
"metaData")) {
215 LOGF(
debug,
"Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
216 }
else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
// NOTE(review): the loop is bounded by aodMetaDataKeys.size() but also indexes
// aodMetaDataVals; only non-emptiness of both vectors is checked above, not
// that they have the same length -- confirm this is guaranteed upstream.
218 for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
219 aodMetaDataMap.Add(
new TObjString(aodMetaDataKeys[imd]),
new TObjString(aodMetaDataVals[imd]));
221 fileAndFolder.file->WriteObject(&aodMetaDataMap,
"metaData",
"Overwrite");
// When the descriptor lists explicit column names, each one is resolved to a
// schema index and the matching column and field are fetched.
// NOTE(review): column(idx) and field(idx) are called before any visible
// check of idx; Arrow's Schema::GetFieldIndex is documented to return -1 for
// an unknown name -- confirm the index is validated before these are used.
224 if (!d->colnames.empty()) {
225 for (
auto& cn : d->colnames) {
226 auto idx = table->schema()->GetFieldIndex(cn);
227 auto col = table->column(idx);
228 auto field = table->schema()->field(idx);
// Setup for the writer of merged objects and histograms.
// NOTE(review): the head of the enclosing function and several lines in
// between are not visible in this excerpt.
247 using namespace monitoring;
// `auto` without a reference: objmap is a copy of ac.outObjHistMap.
250 auto objmap = ac.outObjHistMap;
// Shared list of (route, object) pairs.
254 auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
// Function-local statics, both starting out as empty strings.
261 static std::string currentDirectory =
"";
262 static std::string currentFile =
"";
// The enclosing scope of the following statements (presumably a callback run
// when writing is finalised) is not visible here -- TODO confirm.
265 LOG(
debug) <<
"Writing merged objects and histograms to file";
266 if (inputObjects->empty()) {
267 LOG(error) <<
"Output object map is empty!";
// `f` and `i` are declared on lines not visible here; only non-null entries
// of `f` are acted upon.
272 if (
f[
i] !=
nullptr) {
276 LOG(
debug) <<
"All outputs merged in their respective target files";
// mergePart: handles a single message part given as a DataRef. From the
// visible statements it deserialises the object, merges it into the matching
// entry of inputObjects, and writes the merged object to a ROOT file once the
// entry's count reaches zero. inputObjects, objmap, tskmap and pc are
// captured by reference.
// NOTE(review): only a subset of the original lines is visible in this
// excerpt; several names used below (tm, obj, key, nameHash, coll, f,
// filename, ROOTfileNames, objSize, hid, did) are declared on lines that are
// not shown.
282 auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](
DataRef const&
ref) {
// DataHeader taken from the header stack of this part.
289 auto datah = o2::header::get<o2::header::DataHeader*>(
ref.header);
297 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
298 datah->subSpecification);
// OutputObjHeader taken from the same stack; the error signpost below is
// presumably emitted when it is missing (the check itself is not visible).
302 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(
ref.header);
304 O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid,
"mergePart",
"No output object header in stack of %{public}s/%{public}s/%d.",
305 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
306 datah->subSpecification);
// The class is read from the buffer first, then the buffer offset is reset
// to 0 before the object itself is read further down.
313 obj.
kind = tm.ReadClass();
314 tm.SetBufferOffset(0);
317 O2_SIGNPOST_START(histogram_registry, did,
"initialising root",
"Starting deserialization of %{public}s/%{public}s/%d",
318 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
319 datah->subSpecification);
// No class information could be read: both signpost intervals are closed.
320 if (obj.
kind ==
nullptr) {
321 O2_SIGNPOST_END(histogram_registry, did,
"initialising root",
"Failed to deserialise");
322 O2_SIGNPOST_END_WITH_ERROR(histogram_registry, hid,
"mergePart",
"Cannot read class info from buffer of %{public}s/%{public}s/%d.",
323 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
324 datah->subSpecification);
327 O2_SIGNPOST_END(histogram_registry, did,
"initialising root",
"Done init.");
// Routing information carried by the object header.
329 auto policy = objh->mPolicy;
330 auto sourceType = objh->mSourceType;
331 auto hash = objh->mTaskHash;
332 O2_SIGNPOST_START(histogram_registry, did,
"deserialization",
"Starting deserialization of %{public}s/%{public}s/%d",
333 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
334 datah->subSpecification);
// Read the object as an instance of the class obtained above.
// NOTE(review): the static_cast to TNamed* is unchecked and `named` is
// dereferenced immediately -- this assumes ReadObjectAny returned non-null
// and that the object derives from TNamed; confirm.
336 obj.
obj = tm.ReadObjectAny(obj.
kind);
337 auto* named =
static_cast<TNamed*
>(obj.
obj);
338 obj.
name = named->GetName();
339 O2_SIGNPOST_END(histogram_registry, did,
"deserialization",
"Done deserialization.");
// Find the task whose id equals the hash from the header.
347 auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](
auto&&
x) { return x.id == hash; });
348 if (hpos == tskmap.end()) {
// `auto` without a reference: taskname is a copy of hpos->name.
352 auto taskname = hpos->name;
// Same lookup by hash in objmap.
353 auto opos = std::find_if(objmap.begin(), objmap.end(), [&](
auto&&
x) { return x.id == hash; });
354 if (opos == objmap.end()) {
356 taskname.c_str(), hash);
362 obj.
container.c_str(), taskname.c_str());
// Look for an already registered entry with the same (uniqueId, taskHash).
367 auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](
auto&&
x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
369 O2_SIGNPOST_START(histogram_registry, did,
"merging",
"Starting merging of %{public}s/%{public}s/%d",
370 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
371 datah->subSpecification);
// First time this object is seen: its count starts at the pipeline size from
// the header, it is appended, and `existing` is pointed at the new last
// element.
372 if (existing == inputObjects->end()) {
373 obj.
count = objh->mPipelineSize;
374 inputObjects->emplace_back(
key, obj);
375 existing = inputObjects->end() - 1;
// Presumably the "already present" branch (the `else` is not visible): the
// count of the stored entry is copied into obj.
377 obj.
count = existing->second.count;
// Merge function taken from the stored class; the signpost below is
// presumably emitted when no such function is available (check not visible).
379 auto merger = existing->second.kind->GetMerge();
381 O2_SIGNPOST_END(histogram_registry, did,
"merging",
"Unabled to merge");
387 merger(existing->second.obj, &coll,
nullptr);
// One contribution accounted for; a non-zero count is reported below as
// "Pipeline lanes still missing."
391 existing->second.count -= 1;
393 if (existing->second.count != 0) {
394 O2_SIGNPOST_END(histogram_registry, did,
"merging",
"Done partial merging.");
395 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Pipeline lanes still missing.");
// `auto` without a reference: route and entry are copies of the stored pair.
400 auto route = existing->first;
401 auto entry = existing->second;
// The output file name is looked up by the route's policy.
402 auto file = ROOTfileNames.find(route.policy);
403 if (
file == ROOTfileNames.end()) {
404 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Not matching any file.");
407 O2_SIGNPOST_START(histogram_registry, did,
"writing",
"Starting writing of %{public}s/%{public}s/%d",
408 datah->dataOrigin.as<std::string>().c_str(), datah->dataDescription.as<std::string>().c_str(),
409 datah->subSpecification);
// The file for this policy is opened on first use, in "RECREATE" mode.
411 if (
f[route.policy] ==
nullptr) {
412 f[route.policy] = TFile::Open(
filename.c_str(),
"RECREATE");
// When the directory or the file differs from the remembered one, the
// directory is created if no key with that name exists in the file, and
// currentDirectory is updated.
414 auto nextDirectory = route.directory;
415 if ((nextDirectory != currentDirectory) || (
filename != currentFile)) {
416 if (!
f[route.policy]->FindKey(nextDirectory.c_str())) {
417 f[route.policy]->mkdir(nextDirectory.c_str());
419 currentDirectory = nextDirectory;
// Go to the file's top-level directory and fetch the target directory.
424 f[route.policy]->cd(
"/");
425 auto* currentDir =
f[route.policy]->GetDirectory(currentDirectory.c_str());
429 TDirectory* currentFolder = currentDir;
431 currentDir->GetName());
// Object names containing '/' are split at the last '/': the prefix is used
// as a sub-directory of currentDir and the remainder as the object name.
432 std::string objName =
entry.name;
433 auto lastSlash =
entry.name.rfind(
'/');
435 if (lastSlash != std::string::npos) {
436 auto dirname =
entry.name.substr(0, lastSlash);
437 objName =
entry.name.substr(lastSlash + 1);
438 currentFolder = currentDir->GetDirectory(dirname.c_str());
// Sub-directory missing: create it. kTRUE is the third mkdir argument
// (returnExistingDirectory in ROOT's TDirectory::mkdir -- see ROOT docs).
439 if (!currentFolder) {
442 currentFolder = currentDir->mkdir(dirname.c_str(),
"", kTRUE);
445 currentFolder->GetName());
// Write into the sub-directory under the short object name.
// NOTE(review): the message reports currentDir->GetName() although the
// object is written to currentFolder -- confirm this is intended.
448 O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid,
"mergePart",
"Writing %{public}s of kind %{public}s in %{public}s",
449 entry.name.c_str(),
entry.kind->GetName(), currentDir->GetName());
450 objSize = currentFolder->WriteObjectAny(
entry.obj,
entry.kind, objName.c_str());
451 O2_SIGNPOST_END(histogram_registry, did,
"writing",
"End writing %{public}s",
entry.name.c_str());
// Presumably the branch for names without '/' (the `else` is not visible):
// the object is written directly into currentDir under its full name.
455 O2_SIGNPOST_EVENT_EMIT(histogram_registry, hid,
"mergePart",
"Writing %{public}s of kind %{public}s in %{public}s",
456 entry.name.c_str(),
entry.kind->GetName(), currentDir->GetName());
457 objSize = currentDir->WriteObjectAny(
entry.obj,
entry.kind,
entry.name.c_str());
458 O2_SIGNPOST_END(histogram_registry, did,
"writing",
"End writing %{public}s",
entry.name.c_str());
// objSize is the value returned by WriteObjectAny above.
462 O2_SIGNPOST_END(histogram_registry, hid,
"mergePart",
"Done merging object of %d bytes.", objSize);
// Feed every part of the multipart input to mergePart: the part count is
// taken from getNofParts(0) and each part is fetched through the binding "x".
// NOTE(review): the closing brace of the loop is not visible in this excerpt.
465 O2_SIGNPOST_START(histogram_registry, rid,
"processParts",
"Start merging %zu parts received together.", pc.inputs().getNofParts(0));
// NOTE(review): `pi` is an int while the start message formats
// getNofParts(0) with %zu -- possible signed/unsigned comparison; confirm the
// return type.
466 for (
int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
467 mergePart(pc.inputs().get(
"x", pi));
469 O2_SIGNPOST_END(histogram_registry, rid,
"processParts",
"Done histograms in multipart message.");