197 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
198 auto& msgMap = *messagesPerRoute.get();
200 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
201 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](
const o2::header::DataHeader*
h,
size_t tslice) ->
const std::string& {
202 if (!rawChannel.empty()) {
205 auto& chFromMap = channelsMap[*
h];
206 if (chFromMap.first.empty() && !chFromMap.second) {
207 chFromMap.second =
true;
208 for (
auto& oroute : outputRoutes) {
209 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
210 if (o2f::DataSpecUtils::match(oroute.matcher,
h->dataOrigin,
h->dataDescription,
h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
211 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
212 chFromMap.first = oroute.channel;
217 return chFromMap.first;
220 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl,
const std::string& fairMQChannel) {
221 fair::mq::Parts* parts =
nullptr;
222 parts = msgMap[fairMQChannel].get();
224 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
225 parts = msgMap[fairMQChannel].get();
227 parts->AddPart(std::move(hd));
228 parts->AddPart(std::move(pl));
232 const auto lTfStartPosition =
position();
234 if (lTfStartPosition ==
size() || !mFileMap.is_open() ||
eof()) {
241 bool negativeOrbitNotified =
false, noRunNumberNotified =
false, creation0Notified =
false;
242 std::size_t lMetaHdrStackSize = 0;
247 auto dph = o2::header::get<o2f::DataProcessingHeader*>(
st.data());
248 auto dh = o2::header::get<o2::header::DataHeader*>(
st.data());
249 LOGP(info,
"TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
250 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
251 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
252 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
256 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
257 if (lMetaHdrStackSize == 0) {
258 LOG(error) <<
"Failed to read the TF file header. The file might be corrupted.";
263 if (mVerbosity > 0) {
269 if (mVerbosity > 0) {
270 LOGP(info,
"TFMeta : {}", lStfFileMeta.
info());
272 if (lStfFileMeta.
mWriteTimeMs == 0 && creationFallBack != 0) {
273 if (!creation0Notified) {
274 creation0Notified =
true;
275 LOGP(warn,
"Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
284 LOGP(warning,
"Reading bad data: SubTimeFrame META header");
292 LOGP(warning,
"Reading an empty TF from file. Only meta information present");
298 if ((lTfStartPosition + lStfSizeInFile) > this->
size()) {
299 LOGP(warning,
"Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (this->
size() - lTfStartPosition));
305 std::size_t lStfIndexHdrStackSize = 0;
309 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
310 if (lStfIndexHdrStackSize == 0) {
316 LOG(error) <<
"Failed to read the TF index structure. The file might be corrupted.";
323#ifdef _RUN_TIMING_MEASUREMENT_
324 TStopwatch readSW, findChanSW, msgSW, addPartSW;
333 std::int64_t lLeftToRead = lStfDataSize;
337 while (lLeftToRead > 0) {
339 std::size_t lDataHeaderStackSize = 0;
340 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
341 if (lDataHeaderStackSize == 0) {
347 LOG(error) <<
"Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
353 if (mRepaireHeaders) {
354 if (locDataHeader == prevHeader) {
356 if (mVerbosity > 3) {
363 if (mVerbosity > 2) {
369 prevHeader = locDataHeader;
373 if (!negativeOrbitNotified) {
374 LOGP(warn,
"Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.
tfCounter, locDataHeader.
runNumber, firstTForbitFallBack);
375 negativeOrbitNotified =
true;
380 if (!noRunNumberNotified) {
381 LOGP(warn,
"runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.
tfCounter, runNumberFallBack);
382 noRunNumberNotified =
true;
384 locDataHeader.
runNumber = runNumberFallBack;
386 const std::uint64_t lDataSize = locDataHeader.
payloadSize;
389 if (mVerbosity > 0) {
390 LOGP(warn,
"Ignoring stored {}", describeHeader(locDataHeader));
392 if (!ignore_nbytes(lDataSize)) {
395 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
399 if (stfHeader.runNumber == -1) {
401 stfHeader.runNumber = locDataHeader.
runNumber;
403 std::lock_guard<std::mutex> lock(
stfMtx);
408 auto detOrigStatus = mDetOrigMap.find(locDataHeader.
dataOrigin);
409 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) {
410 if (!ignore_nbytes(lDataSize)) {
413 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
416#ifdef _RUN_TIMING_MEASUREMENT_
417 findChanSW.Start(
false);
419 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
420#ifdef _RUN_TIMING_MEASUREMENT_
423 if (fmqChannel.empty()) {
424 if (!ignore_nbytes(lDataSize)) {
427 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
434 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
435#ifdef _RUN_TIMING_MEASUREMENT_
438 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
439 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
440#ifdef _RUN_TIMING_MEASUREMENT_
443 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
444 LOGP(
debug,
"read data, pos = {}, size = {} leftToRead {}",
position(), lDataSize, lLeftToRead);
446 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
449 if (mVerbosity > 0) {
451 printStack(headerStack);
457#ifdef _RUN_TIMING_MEASUREMENT_
458 addPartSW.Start(
false);
460 if (mVerbosity > 2) {
461 LOGP(info,
"addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader,
true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize());
463 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
464#ifdef _RUN_TIMING_MEASUREMENT_
468 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
471 if (lLeftToRead < 0) {
472 LOG(error) <<
"FileRead: Read more data than it is indicated in the META header!";
477 if (stfHeader.runNumber == -1u) {
478 stfHeader.runNumber = runNumberFallBack;
479 stfHeader.firstOrbit = firstTForbitFallBack;
480 LOGP(info,
"Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
483 unsigned stfSS[2] = {0, 0xccdb};
484 for (
int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
488 stfDistDataHeader.
runNumber = stfHeader.runNumber;
489 stfDistDataHeader.
tfCounter = stfHeader.id;
490 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
491 if (!fmqChannel.empty()) {
492 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
494 if (mVerbosity > 0) {
495 printStack(headerStackSTF);
497 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
498 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.
payloadSize, fair::mq::Alignment{64});
499 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
500 memcpy(plMessageSTF->GetData(), &stfHeader,
sizeof(
STFHeader));
501#ifdef _RUN_TIMING_MEASUREMENT_
502 addPartSW.Start(
false);
504 if (mVerbosity > 2) {
505 LOGP(info,
"addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader,
true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize());
507 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
508#ifdef _RUN_TIMING_MEASUREMENT_
514#ifdef _RUN_TIMING_MEASUREMENT_
516 LOG(info) <<
"TF creation time: CPU: " << readSW.CpuTime() <<
" Wall: " << readSW.RealTime() <<
" s";
517 LOG(info) <<
"AddPart Timer CPU: " << addPartSW.CpuTime() <<
" Wall: " << addPartSW.RealTime() <<
" s";
518 LOG(info) <<
"CreMsg Timer CPU: " << msgSW.CpuTime() <<
" Wall: " << msgSW.RealTime() <<
" s";
519 LOG(info) <<
"FndChan Timer CPU: " << findChanSW.CpuTime() <<
" Wall: " << findChanSW.RealTime() <<
" s";
521 return messagesPerRoute;