81 if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
82 const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg(
"run", runNumber));
83 LOGP(info,
"Creating debug stream {}", debugFileName);
84 mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(debugFileName.data(),
"recreate");
87 if (mWriteRawDataOnError && !mRawOutputFile.is_open()) {
88 std::string_view rawType = (mRawDataType < 2) ?
"tf" :
"raw";
89 if (mRawDataType == 5) {
92 const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg(
"run", runNumber), fmt::arg(
"raw_type", rawType));
93 LOGP(info,
"Creating raw debug file {}", rawFileName);
94 mRawOutputFile.open(rawFileName, std::ios::binary);
97 uint32_t tfCounter = 0;
98 bool hasErrors =
false;
101 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
102 tfCounter = dh->tfCounter;
103 const auto subSpecification = dh->subSpecification;
105 LOGP(
debug,
"Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize);
108 const gsl::span<const char>
raw = pc.inputs().get<gsl::span<char>>(
ref);
111 size_t lastErrorCount = 0;
113 for (
auto it = parser.
begin(),
end = parser.
end(); it !=
end; ++it) {
114 const auto size = it.size();
128 if (!rdhPtr || rdhVersion < 6) {
129 throw std::runtime_error(fmt::format(
"could not get RDH from packet, or version {} < 6", rdhVersion).
data());
133 const auto feeId = (
FEEIDType)RDHUtils::getFEEID(*rdhPtr);
134 const auto link = rdh_utils::getLink(feeId);
135 const uint32_t cruID = rdh_utils::getCRU(feeId);
136 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
138 LOGP(
debug,
"Detected CMV packet: CRU {}, link {}, feeId {}", cruID,
link, feeId);
140 if ((detField != (
decltype(detField))RawDataType::CMV) || (
link != rdh_utils::CMVLinkID)) {
141 LOGP(
debug,
"Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (
decltype(detField))RawDataType::CMV,
link, rdh_utils::CMVLinkID);
145 LOGP(
debug,
"Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID,
link);
147 if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
152 auto& cmvVec = mCMVvectors[cruID];
153 auto& infoVec = mCMVInfos[cruID];
156 LOGP(warning,
"CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.",
size,
sizeof(
cmv::Container));
160 auto data = it.data();
162 const uint32_t
orbit = cmvs.header.heartbeatOrbit;
163 const uint16_t
bc = cmvs.header.heartbeatBC;
166 infoVec.emplace_back(
orbit,
bc);
167 cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket);
168 for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) {
169 cmvVec.push_back(cmvs.getCMV(tb));
173 }
catch (
const std::exception& e) {
175 using namespace std::literals::chrono_literals;
176 static std::unordered_map<uint32_t, size_t> nErrorPerSubspec;
177 static std::chrono::time_point<std::chrono::steady_clock> lastReport = std::chrono::steady_clock::now();
178 const auto now = std::chrono::steady_clock::now();
179 static size_t reportedErrors = 0;
180 const size_t MAXERRORS = 10;
182 ++nErrorPerSubspec[subSpecification];
185 if (reportedErrors < MAXERRORS) {
187 std::string sleepInfo;
188 if (reportedErrors == MAXERRORS) {
189 sleepInfo = fmt::format(
", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS,
sleepTime);
191 LOGP(alarm,
"EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts,
192 dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo);
203 hasErrors |= snapshotCMVs(pc.outputs());
205 if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
206 writeDebugOutput(tfCounter);
209 if (mWriteRawDataOnError && hasErrors) {
210 writeRawData(pc.inputs());