// Fragment of the initialization path that logs itself as getGlobalAODSink::Init.
// The enclosing definition starts above this excerpt and several original lines are missing.
// The compression level starts at 505 and is then overwritten with the integer
// value of the aod-writer-compression option.
60 int compressionLevel = 505;
62 compressionLevel = ctx.
options().
get<
int>(
"aod-writer-compression");
65 LOGP(
debug,
"======== getGlobalAODSink::Init ==========");
// hasOutputsToWrite is set to true inside the loop over outputInputs; the condition
// guarding that assignment is not visible in this excerpt.
68 bool hasOutputsToWrite =
false;
69 for (
auto& outobj : outputInputs) {
70 auto ds = dod->getDataOutputDescriptors(outobj);
72 hasOutputsToWrite =
true;
// Nothing to write: an info message is logged. The static flag once presumably
// limits this to a single occurrence - TODO confirm, its use is not shown here.
79 if (!hasOutputsToWrite) {
81 static bool once =
false;
83 LOG(info) <<
"No AODs to be saved.";
// Calls closeDataFiles() on dod; the context that triggers the call is not visible here.
91 dod->closeDataFiles();
// State captured by value into the mutable callback below: two maps keyed by start time
// (time frame number and input file name) plus the metadata key and value lists.
99 std::map<uint64_t, uint64_t> tfNumbers;
100 std::map<uint64_t, std::string> tfFilenames;
102 std::vector<TString> aodMetaDataKeys;
103 std::vector<TString> aodMetaDataVals;
// Callback taking a ProcessingContext. It is declared mutable so that the captured
// copies of the maps and vectors can be modified inside the body.
106 return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](
ProcessingContext& pc)
mutable ->
void {
107 LOGP(
debug,
"======== getGlobalAODSink::processing ==========");
108 LOGP(
debug,
" processing data set with {} entries", pc.inputs().size());
// With zero inputs an info message is logged; the rest of that branch is not visible here.
111 if (pc.inputs().size() == 0) {
112 LOGP(info,
"No inputs available!");
// tfn input: when both spec and payload are set, startTime is read from the
// DataProcessingHeader, the payload is read as uint64_t, and the pair
// (startTime, tfNumber) is inserted into tfNumbers.
117 uint64_t startTime = 0;
118 uint64_t tfNumber = 0;
119 auto ref = pc.inputs().get(
"tfn");
120 if (
ref.spec &&
ref.payload) {
121 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
122 tfNumber = pc.inputs().get<uint64_t>(
"tfn");
123 tfNumbers.insert(std::pair<uint64_t, uint64_t>(startTime, tfNumber));
// tff input: same pattern, with the payload read as std::string and the pair
// (startTime, aodInputFile) inserted into tfFilenames.
126 std::string aodInputFile;
127 auto ref2 = pc.inputs().get(
"tff");
128 if (ref2.spec && ref2.payload) {
129 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(ref2)->startTime;
130 aodInputFile = pc.inputs().get<std::string>(
"tff");
131 tfFilenames.insert(std::pair<uint64_t, std::string>(startTime, aodInputFile));
// Calls checkFileSizes() on dod before iterating over the inputs.
135 dod->checkFileSizes();
// Loop over every input reference of this processing context. The condition that
// leads to the skip message below is not visible in this excerpt.
138 for (
const auto&
ref : pc.inputs()) {
140 LOGP(
debug,
"Invalid input will be skipped!");
// Metadata keys and values are both read as std::vector<TString> through the binding of
// the current input. The conditions selecting which input fills which vector are not shown.
146 aodMetaDataKeys = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
149 aodMetaDataVals = pc.inputs().get<std::vector<TString>>(
ref.spec->binding);
// Start time of the current input; used below as the lookup key into tfNumbers and tfFilenames.
156 startTime = DataRefUtils::getHeader<DataProcessingHeader*>(
ref)->startTime;
// The DataHeader supplies the table name (dataDescription as string) and is passed
// to dod to obtain the output descriptors.
159 auto dh = DataRefUtils::getHeader<header::DataHeader*>(
ref);
160 auto tableName = dh->dataDescription.as<std::string>();
161 auto ds = dod->getDataOutputDescriptors(*dh);
// Time frame number lookup by start time. When found, the stored value is divided by and
// then multiplied with getNumberTimeFramesToMerge(), which rounds it down to a multiple of
// that number provided the division is an integer division (return type not visible here).
// A fatal log followed by a std::runtime_error is visible for the failure path.
167 auto it = tfNumbers.find(startTime);
168 if (it != tfNumbers.end()) {
169 tfNumber = (it->second / dod->getNumberTimeFramesToMerge()) * dod->getNumberTimeFramesToMerge();
171 LOGP(fatal,
"No time frame number found for output with start time {}", startTime);
172 throw std::runtime_error(
"Processing is stopped!");
// Input file name for this start time, taken from tfFilenames when an entry exists.
175 auto it2 = tfFilenames.find(startTime);
176 if (it2 != tfFilenames.end()) {
177 aodInputFile = it2->second;
// Fetch the message for this binding and test its header against nullptr
// (the body of that branch is not visible).
181 auto msg = pc.inputs().get(
ref.spec->binding);
182 if (
msg.header ==
nullptr) {
// s is defined in lines not shown. A table failing Validate() is reported with a warning
// stating it will not be saved; a table whose schema has no fields is only reported
// at debug level as being saved anyway.
187 auto table = s->asArrowTable();
188 if (!table->Validate().ok()) {
189 LOGP(warning,
"The table \"{}\" is not valid and will not be saved!", tableName);
192 if (table->schema()->fields().empty()) {
193 LOGP(
debug,
"The table \"{}\" is empty but will be saved anyway!", tableName);
// Resolve the target file and folder for descriptor d (defined in lines not shown);
// the tree name is the folder name, a slash, and the treename of the descriptor.
200 auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
201 auto treename = fileAndFolder.folderName +
"/" + d->treename;
// Metadata handling: if the target file already contains an object named metaData it is
// kept. Otherwise, when both key and value lists are non-empty, key/value pairs are added
// to aodMetaDataMap (declared in lines not shown) and written under the name metaData
// with the Overwrite option.
207 if (fileAndFolder.file->FindObjectAny(
"metaData")) {
208 LOGF(
debug,
"Metadata: target file %s already has metadata, preserving it", fileAndFolder.file->GetName());
209 }
else if (!aodMetaDataKeys.empty() && !aodMetaDataVals.empty()) {
// NOTE(review): the loop bound is aodMetaDataKeys.size() while aodMetaDataVals is indexed
// with the same counter; no visible check ensures both vectors have equal length - confirm.
211 for (uint32_t imd = 0; imd < aodMetaDataKeys.size(); imd++) {
212 aodMetaDataMap.Add(
new TObjString(aodMetaDataKeys[imd]),
new TObjString(aodMetaDataVals[imd]));
214 fileAndFolder.file->WriteObject(&aodMetaDataMap,
"metaData",
"Overwrite");
// When the descriptor lists column names, each name is resolved to its index in the table
// schema, and the column and field at that index are fetched.
// NOTE(review): no check of idx is visible before it is used; per the Arrow documentation
// GetFieldIndex reports an unknown name with -1 - confirm how that case is handled.
217 if (!d->colnames.empty()) {
218 for (
auto& cn : d->colnames) {
219 auto idx = table->schema()->GetFieldIndex(cn);
220 auto col = table->column(idx);
221 auto field = table->schema()->field(idx);
// Fragment of a second routine that, according to its log messages, writes merged objects
// and histograms to files. The start of its definition is not part of this excerpt.
240 using namespace monitoring;
// auto without a reference qualifier: objmap is a copy of whatever ac.outObjHistMap holds.
243 auto objmap = ac.outObjHistMap;
// Shared vector of (InputObjectRoute, InputObject) pairs; it is searched and appended to
// by the mergePart lambda further down.
247 auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
// Function-local statics, so their values persist between invocations. They are compared
// against the next directory and file name before a write further down.
254 static std::string currentDirectory =
"";
255 static std::string currentFile =
"";
// An empty inputObjects vector is reported as an error.
258 LOG(
debug) <<
"Writing merged objects and histograms to file";
259 if (inputObjects->empty()) {
260 LOG(error) <<
"Output object map is empty!";
// f and i are defined in lines not shown; only the null check on f[i] is visible here.
265 if (
f[
i] !=
nullptr) {
269 LOG(
debug) <<
"All outputs merged in their respective target files";
// mergePart handles one DataRef part. It captures inputObjects, objmap, tskmap and pc by
// reference. Several guarding conditions of the error branches below are missing from this
// excerpt; only their log messages are visible.
275 auto mergePart = [&inputObjects, &objmap, &tskmap, &pc](
DataRef const&
ref) {
277 LOG(error) <<
"Header not found";
// The DataHeader is extracted from the header stack of the part.
280 auto datah = o2::header::get<o2::header::DataHeader*>(
ref.header);
282 LOG(error) <<
"No data header in stack";
287 LOGP(error,
"Payload not found for {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
// The OutputObjHeader is extracted from the same stack; policy, source type and task hash
// are read from it below.
291 auto objh = o2::header::get<o2::framework::OutputObjHeader*>(
ref.header);
293 LOGP(error,
"No output object header in stack of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
// Read the class information from the buffer tm (declared in lines not shown), then reset
// the buffer offset to 0 before the object itself is read.
300 obj.
kind = tm.ReadClass();
301 tm.SetBufferOffset(0);
303 if (obj.
kind ==
nullptr) {
304 LOGP(error,
"Cannot read class info from buffer of {}/{}/{}", datah->dataOrigin.as<std::string>(), datah->dataDescription.as<std::string>(), datah->subSpecification);
308 auto policy = objh->mPolicy;
309 auto sourceType = objh->mSourceType;
310 auto hash = objh->mTaskHash;
// Deserialize the object using the class read above and take its name.
// NOTE(review): the result is static_cast to TNamed* with no visible check that the class
// derives from TNamed - confirm this is guaranteed for every object sent here.
312 obj.
obj = tm.ReadObjectAny(obj.
kind);
313 auto* named =
static_cast<TNamed*
>(obj.
obj);
314 obj.
name = named->GetName();
// Find the task whose id equals the hash from the header; an error is logged if there is none.
315 auto hpos = std::find_if(tskmap.begin(), tskmap.end(), [&](
auto&&
x) { return x.id == hash; });
316 if (hpos == tskmap.end()) {
317 LOG(error) <<
"No task found for hash " << hash;
320 auto taskname = hpos->name;
// Same lookup by hash in objmap for the object list of that task.
321 auto opos = std::find_if(objmap.begin(), objmap.end(), [&](
auto&&
x) { return x.id == hash; });
322 if (opos == objmap.end()) {
323 LOG(error) <<
"No object list found for task " << taskname <<
" (hash=" << hash <<
")";
328 LOG(error) <<
"No object " << obj.
name <<
" in map for task " << taskname;
// Look for an already stored entry with the same uniqueId (nameHash, computed in lines not
// shown) and the same task hash.
333 auto existing = std::find_if(inputObjects->begin(), inputObjects->end(), [&](
auto&&
x) { return (x.first.uniqueId == nameHash) && (x.first.taskHash == hash); });
// Not stored yet: the count is initialised from mPipelineSize, the (key, obj) pair is
// appended, and existing is pointed at the newly appended last element.
335 if (existing == inputObjects->end()) {
336 obj.
count = objh->mPipelineSize;
337 inputObjects->push_back(std::make_pair(
key, obj));
338 existing = inputObjects->end() - 1;
// Count taken from the stored entry, and the merge function obtained from the stored class
// via GetMerge(). The condition for the error message below is not visible; presumably it
// covers a missing merge function - TODO confirm.
340 obj.
count = existing->second.count;
342 auto merger = existing->second.kind->GetMerge();
344 LOG(error) <<
"Already one unmergeable object found for " << obj.
name;
// Merge the collection coll (built in lines not shown) into the stored object.
349 merger(existing->second.obj, &coll,
nullptr);
// The stored count is decremented. The branch taken while it is still non-zero is not shown.
353 existing->second.count -= 1;
355 if (existing->second.count != 0) {
// Copies of the route and the entry; the output file name is looked up by route.policy.
359 auto route = existing->first;
360 auto entry = existing->second;
361 auto file = ROOTfileNames.find(route.policy);
362 if (
file == ROOTfileNames.end()) {
// The file for this policy is opened with the RECREATE option when its slot in f is still null.
366 if (
f[route.policy] ==
nullptr) {
367 f[route.policy] = TFile::Open(
filename.c_str(),
"RECREATE");
// When the directory or the file name differs from the values remembered in the static
// variables, the directory is created in the file if no key of that name exists yet, and
// currentDirectory is updated.
369 auto nextDirectory = route.directory;
370 if ((nextDirectory != currentDirectory) || (
filename != currentFile)) {
371 if (!
f[route.policy]->FindKey(nextDirectory.c_str())) {
372 f[route.policy]->mkdir(nextDirectory.c_str());
374 currentDirectory = nextDirectory;
// writeListToFile is declared as a std::function first so that the lambda assigned to it
// can call itself recursively through the by-reference capture.
379 std::function<
void(TList*, TDirectory*)> writeListToFile;
380 writeListToFile = [&](TList*
list, TDirectory* parentDir) {
// Iterate over the list elements; the iterator next and the variable object are declared
// in lines not shown.
383 while ((
object = next())) {
// An element that inherits from TList is written recursively into a directory created with
// mkdir, using the element name as both name and title and true as the third argument.
384 if (
object->InheritsFrom(TList::Class())) {
385 writeListToFile(
static_cast<TList*
>(
object), parentDir->mkdir(
object->GetName(),
object->GetName(),
true));
// Visible write path: WriteObjectAny returns a size that is compared against a static
// running maximum. When it is larger, the maximum is updated and a metric named
// aod-largest-object-written, tagged with Subsystem = DPL, is sent with class name,
// object name and size as its value.
387 int objSize = parentDir->WriteObjectAny(
object,
object->Class(),
object->GetName());
388 static int maxSizeWritten = 0;
389 if (objSize > maxSizeWritten) {
390 auto& monitoring = pc.services().get<
Monitoring>();
391 maxSizeWritten = objSize;
392 monitoring.send(
Metric{fmt::format(
"{}/{}:{}",
object->ClassName(),
object->GetName(), objSize),
"aod-largest-object-written"}.addTag(tags::Key::Subsystem, tags::Value::DPL));
// The element is removed from the list; what happens to the returned pointer is not shown.
394 auto* written =
list->Remove(
object);
// Directory object for currentDirectory inside the file selected by route.policy.
400 TDirectory* currentDir =
f[route.policy]->GetDirectory(currentDirectory.c_str());
// entry.obj is treated as a TList. SetOwner(false) is called before writing and SetOwner()
// with its default argument is called again after writeListToFile.
402 auto* outputList =
static_cast<TList*
>(
entry.obj);
403 outputList->SetOwner(
false);
// If the last element exists and its class is exactly TNamed (IsA() == TNamed::Class()),
// it is deleted and removed from the list, and currentDir is replaced by a sub-directory
// named after the list. The closing of this branch is not visible in this excerpt.
406 if (outputList->Last() && outputList->Last()->IsA() == TNamed::Class()) {
407 delete outputList->Last();
408 outputList->RemoveLast();
409 currentDir = currentDir->mkdir(outputList->GetName(), outputList->GetName(),
true);
412 writeListToFile(outputList, currentDir);
413 outputList->SetOwner();
// Every part of input position 0 is fetched through the binding x and passed to mergePart.
422 for (
int pi = 0; pi < pc.inputs().getNofParts(0); ++pi) {
423 mergePart(pc.inputs().get(
"x", pi));