187 const std::string& rawChannel,
size_t slice,
bool sup0xccdb,
int verbosity)
189 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
190 auto& msgMap = *messagesPerRoute.get();
192 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
193 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](
const o2::header::DataHeader*
h,
size_t tslice) ->
const std::string& {
194 if (!rawChannel.empty()) {
197 auto& chFromMap = channelsMap[*
h];
198 if (chFromMap.first.empty() && !chFromMap.second) {
199 chFromMap.second =
true;
200 for (
auto& oroute : outputRoutes) {
201 LOG(
debug) <<
"comparing with matcher to route " << oroute.matcher <<
" TSlice:" << oroute.timeslice;
202 if (o2f::DataSpecUtils::match(oroute.matcher,
h->dataOrigin,
h->dataDescription,
h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
203 LOG(
debug) <<
"picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) <<
" channel " << oroute.channel;
204 chFromMap.first = oroute.channel;
209 return chFromMap.first;
212 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl,
const std::string& fairMQChannel) {
213 fair::mq::Parts* parts =
nullptr;
214 parts = msgMap[fairMQChannel].get();
216 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
217 parts = msgMap[fairMQChannel].get();
219 parts->AddPart(std::move(hd));
220 parts->AddPart(std::move(pl));
224 const auto lTfStartPosition =
position();
226 if (lTfStartPosition ==
size() || !mFileMap.is_open() ||
eof()) {
233 bool negativeOrbitNotified =
false, noRunNumberNotified =
false, creation0Notified =
false;
234 std::size_t lMetaHdrStackSize = 0;
239 auto dph = o2::header::get<o2f::DataProcessingHeader*>(
st.data());
240 auto dh = o2::header::get<o2::header::DataHeader*>(
st.data());
241 LOGP(info,
"TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
242 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
243 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
244 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
248 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
249 if (lMetaHdrStackSize == 0) {
250 LOG(error) <<
"Failed to read the TF file header. The file might be corrupted.";
258 if (lStfFileMeta.
mWriteTimeMs == 0 && creationFallBack != 0) {
259 if (!creation0Notified) {
260 creation0Notified =
true;
261 LOGP(warn,
"Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
270 LOGP(warning,
"Reading bad data: SubTimeFrame META header");
278 LOGP(warning,
"Reading an empty TF from file. Only meta information present");
284 if ((lTfStartPosition + lStfSizeInFile) > this->
size()) {
285 LOGP(warning,
"Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (this->
size() - lTfStartPosition));
291 std::size_t lStfIndexHdrStackSize = 0;
295 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
296 if (lStfIndexHdrStackSize == 0) {
302 LOG(error) <<
"Failed to read the TF index structure. The file might be corrupted.";
309#ifdef _RUN_TIMING_MEASUREMENT_
310 TStopwatch readSW, findChanSW, msgSW, addPartSW;
319 std::int64_t lLeftToRead = lStfDataSize;
322 while (lLeftToRead > 0) {
325 std::size_t lDataHeaderStackSize = 0;
326 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
327 if (lDataHeaderStackSize == 0) {
333 LOG(error) <<
"Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
340 if (!negativeOrbitNotified) {
341 LOGP(warn,
"Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.
tfCounter, locDataHeader.
runNumber, firstTForbitFallBack);
342 negativeOrbitNotified =
true;
347 if (!noRunNumberNotified) {
348 LOGP(warn,
"runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.
tfCounter, runNumberFallBack);
349 noRunNumberNotified =
true;
351 locDataHeader.
runNumber = runNumberFallBack;
354 if (stfHeader.runNumber == -1) {
356 stfHeader.runNumber = locDataHeader.
runNumber;
358 std::lock_guard<std::mutex> lock(
stfMtx);
363 const std::uint64_t lDataSize = locDataHeader.
payloadSize;
365 auto detOrigStatus = mDetOrigMap.find(locDataHeader.
dataOrigin);
366 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) {
367 if (!ignore_nbytes(lDataSize)) {
370 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
373#ifdef _RUN_TIMING_MEASUREMENT_
374 findChanSW.Start(
false);
376 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
377#ifdef _RUN_TIMING_MEASUREMENT_
380 if (fmqChannel.empty()) {
381 if (!ignore_nbytes(lDataSize)) {
384 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
391 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
392#ifdef _RUN_TIMING_MEASUREMENT_
395 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
396 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
397#ifdef _RUN_TIMING_MEASUREMENT_
400 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
402 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
407 printStack(headerStack);
413#ifdef _RUN_TIMING_MEASUREMENT_
414 addPartSW.Start(
false);
416 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
417#ifdef _RUN_TIMING_MEASUREMENT_
421 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
424 if (lLeftToRead < 0) {
425 LOG(error) <<
"FileRead: Read more data than it is indicated in the META header!";
430 if (stfHeader.runNumber == -1u) {
431 stfHeader.runNumber = runNumberFallBack;
432 stfHeader.firstOrbit = firstTForbitFallBack;
433 LOGP(info,
"Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
436 unsigned stfSS[2] = {0, 0xccdb};
437 for (
int iss = 0; iss < (sup0xccdb ? 1 : 2); iss++) {
441 stfDistDataHeader.
runNumber = stfHeader.runNumber;
442 stfDistDataHeader.
tfCounter = stfHeader.id;
443 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
444 if (!fmqChannel.empty()) {
445 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
448 printStack(headerStackSTF);
450 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
451 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.
payloadSize, fair::mq::Alignment{64});
452 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
453 memcpy(plMessageSTF->GetData(), &stfHeader,
sizeof(
STFHeader));
454#ifdef _RUN_TIMING_MEASUREMENT_
455 addPartSW.Start(
false);
457 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
458#ifdef _RUN_TIMING_MEASUREMENT_
464#ifdef _RUN_TIMING_MEASUREMENT_
466 LOG(info) <<
"TF creation time: CPU: " << readSW.CpuTime() <<
" Wall: " << readSW.RealTime() <<
" s";
467 LOG(info) <<
"AddPart Timer CPU: " << addPartSW.CpuTime() <<
" Wall: " << addPartSW.RealTime() <<
" s";
468 LOG(info) <<
"CreMsg Timer CPU: " << msgSW.CpuTime() <<
" Wall: " << msgSW.RealTime() <<
" s";
469 LOG(info) <<
"FndChan Timer CPU: " << findChanSW.CpuTime() <<
" Wall: " << findChanSW.RealTime() <<
" s";
471 return messagesPerRoute;