149 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
150 auto& msgMap = *messagesPerRoute.get();
152 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
153 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](
const o2::header::DataHeader*
h,
size_t tslice) ->
const std::string& {
154 if (!rawChannel.empty()) {
157 auto& chFromMap = channelsMap[*
h];
158 if (chFromMap.first.empty() && !chFromMap.second) {
159 chFromMap.second =
true;
160 for (
auto& oroute : outputRoutes) {
161 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
162 if (o2f::DataSpecUtils::match(oroute.matcher,
h->dataOrigin,
h->dataDescription,
h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
163 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
164 chFromMap.first = oroute.channel;
169 return chFromMap.first;
172 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl,
const std::string& fairMQChannel) {
173 fair::mq::Parts* parts =
nullptr;
174 parts = msgMap[fairMQChannel].get();
176 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
177 parts = msgMap[fairMQChannel].get();
179 parts->AddPart(std::move(hd));
180 parts->AddPart(std::move(pl));
184 const auto lTfStartPosition = mFile->position();
186 if (lTfStartPosition == mFile->size() || !mFile || !mFile->isGood() || mFile->eof()) {
193 bool negativeOrbitNotified =
false, noRunNumberNotified =
false, creation0Notified =
false;
194 std::size_t lMetaHdrStackSize = 0;
199 auto dph = o2::header::get<o2f::DataProcessingHeader*>(
st.data());
200 auto dh = o2::header::get<o2::header::DataHeader*>(
st.data());
201 LOGP(info,
"TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
202 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
203 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
204 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
208 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
209 if (lMetaHdrStackSize == 0) {
210 LOG(error) <<
"Failed to read the TF file header. The file might be corrupted.";
211 mFile.reset(
nullptr);
215 if (mVerbosity > 0) {
216 LOGP(info,
"read filemeta, pos = {}, size = {}", mFile->position(),
sizeof(
SubTimeFrameFileMeta));
221 if (mVerbosity > 0) {
222 LOGP(info,
"TFMeta : {}", lStfFileMeta.
info());
224 if (lStfFileMeta.
mWriteTimeMs == 0 && creationFallBack != 0) {
225 if (!creation0Notified) {
226 creation0Notified =
true;
227 LOGP(warn,
"Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
236 LOGP(warning,
"Reading bad data: SubTimeFrame META header");
237 mFile.reset(
nullptr);
244 LOGP(warning,
"Reading an empty TF from file. Only meta information present");
245 mFile.reset(
nullptr);
250 if ((lTfStartPosition + lStfSizeInFile) > mFile->size()) {
251 LOGP(warning,
"Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (mFile->size() - lTfStartPosition));
252 mFile.reset(
nullptr);
257 std::size_t lStfIndexHdrStackSize = 0;
261 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
262 if (lStfIndexHdrStackSize == 0) {
263 mFile.reset(
nullptr);
268 LOG(error) <<
"Failed to read the TF index structure. The file might be corrupted.";
272 if (!mFile->ignore_nbytes(lStfIndexHdr->
payloadSize)) {
275#ifdef _RUN_TIMING_MEASUREMENT_
276 TStopwatch readSW, findChanSW, msgSW, addPartSW;
285 std::int64_t lLeftToRead = lStfDataSize;
289 while (lLeftToRead > 0) {
291 std::size_t lDataHeaderStackSize = 0;
292 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
293 if (lDataHeaderStackSize == 0) {
294 mFile.reset(
nullptr);
299 LOG(error) <<
"Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
300 mFile.reset(
nullptr);
305 if (mRepaireHeaders) {
306 if (locDataHeader == prevHeader) {
308 if (mVerbosity > 3) {
315 if (mVerbosity > 2) {
321 prevHeader = locDataHeader;
325 if (!negativeOrbitNotified) {
326 LOGP(warn,
"Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.
tfCounter, locDataHeader.
runNumber, firstTForbitFallBack);
327 negativeOrbitNotified =
true;
332 if (!noRunNumberNotified) {
333 LOGP(warn,
"runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.
tfCounter, runNumberFallBack);
334 noRunNumberNotified =
true;
336 locDataHeader.
runNumber = runNumberFallBack;
338 const std::uint64_t lDataSize = locDataHeader.
payloadSize;
341 if (mVerbosity > 0) {
342 LOGP(warn,
"Ignoring stored {}", describeHeader(locDataHeader));
344 if (!mFile->ignore_nbytes(lDataSize)) {
347 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
351 if (stfHeader.runNumber == -1) {
353 stfHeader.runNumber = locDataHeader.
runNumber;
355 std::lock_guard<std::mutex> lock(
stfMtx);
360 auto detOrigStatus = mDetOrigMap.find(locDataHeader.
dataOrigin);
361 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) {
362 if (!mFile->ignore_nbytes(lDataSize)) {
365 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
368#ifdef _RUN_TIMING_MEASUREMENT_
369 findChanSW.Start(
false);
371 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
372#ifdef _RUN_TIMING_MEASUREMENT_
375 if (fmqChannel.empty()) {
376 if (!mFile->ignore_nbytes(lDataSize)) {
379 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
386 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
387#ifdef _RUN_TIMING_MEASUREMENT_
390 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
391 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
392#ifdef _RUN_TIMING_MEASUREMENT_
395 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
396 LOGP(
debug,
"read data, pos = {}, size = {} leftToRead {}", mFile->position(), lDataSize, lLeftToRead);
398 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
401 if (mVerbosity > 0) {
403 printStack(headerStack);
409#ifdef _RUN_TIMING_MEASUREMENT_
410 addPartSW.Start(
false);
412 if (mVerbosity > 2) {
413 LOGP(info,
"addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader,
true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize());
415 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
416#ifdef _RUN_TIMING_MEASUREMENT_
420 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
423 if (lLeftToRead < 0) {
424 LOG(error) <<
"FileRead: Read more data than it is indicated in the META header!";
429 if (stfHeader.runNumber == -1u) {
430 stfHeader.runNumber = runNumberFallBack;
431 stfHeader.firstOrbit = firstTForbitFallBack;
432 LOGP(info,
"Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
435 unsigned stfSS[2] = {0, 0xccdb};
436 for (
int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
440 stfDistDataHeader.
runNumber = stfHeader.runNumber;
441 stfDistDataHeader.
tfCounter = stfHeader.id;
442 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
443 if (!fmqChannel.empty()) {
444 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
446 if (mVerbosity > 0) {
447 printStack(headerStackSTF);
449 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
450 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.
payloadSize, fair::mq::Alignment{64});
451 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
452 memcpy(plMessageSTF->GetData(), &stfHeader,
sizeof(
STFHeader));
453#ifdef _RUN_TIMING_MEASUREMENT_
454 addPartSW.Start(
false);
456 if (mVerbosity > 2) {
457 LOGP(info,
"addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader,
true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize());
459 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
460#ifdef _RUN_TIMING_MEASUREMENT_
466#ifdef _RUN_TIMING_MEASUREMENT_
468 LOG(info) <<
"TF creation time: CPU: " << readSW.CpuTime() <<
" Wall: " << readSW.RealTime() <<
" s";
469 LOG(info) <<
"AddPart Timer CPU: " << addPartSW.CpuTime() <<
" Wall: " << addPartSW.RealTime() <<
" s";
470 LOG(info) <<
"CreMsg Timer CPU: " << msgSW.CpuTime() <<
" Wall: " << msgSW.RealTime() <<
" s";
471 LOG(info) <<
"FndChan Timer CPU: " << findChanSW.CpuTime() <<
" Wall: " << findChanSW.RealTime() <<
" s";
473 return messagesPerRoute;