82 if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
83 const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg(
"run", runNumber));
84 LOGP(info,
"Creating debug stream {}", debugFileName);
85 mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(debugFileName.data(),
"recreate");
88 if (mWriteRawDataOnError && !mRawOutputFile.is_open()) {
89 std::string_view rawType = (mRawDataType < 2) ?
"tf" :
"raw";
90 if (mRawDataType == 5) {
93 const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg(
"run", runNumber), fmt::arg(
"raw_type", rawType));
94 LOGP(info,
"Creating raw debug file {}", rawFileName);
95 mRawOutputFile.open(rawFileName, std::ios::binary);
98 uint32_t heartbeatOrbit = 0;
99 uint16_t heartbeatBC = 0;
100 uint32_t tfCounter = 0;
102 bool hasErrors =
false;
105 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
106 tfCounter = dh->tfCounter;
107 const auto subSpecification = dh->subSpecification;
109 LOGP(
debug,
"Processing TF {}, subSpecification {}, payloadSize {}", tfCounter, subSpecification, payloadSize);
112 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(
ref);
115 size_t lastErrorCount = 0;
117 for (
auto it = parser.
begin(),
end = parser.
end(); it !=
end; ++it) {
118 const auto size = it.size();
132 if (!rdhPtr || rdhVersion < 6) {
133 throw std::runtime_error(fmt::format(
"could not get RDH from packet, or version {} < 6", rdhVersion).
data());
137 const auto feeId = (
FEEIDType)RDHUtils::getFEEID(*rdhPtr);
138 const auto link = rdh_utils::getLink(feeId);
139 const uint32_t cruID = rdh_utils::getCRU(feeId);
140 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
142 LOGP(
debug,
"Detected CMV packet: CRU {}, link {}, feeId {}", cruID, link, feeId);
144 if ((detField != (
decltype(detField))RawDataType::CMV) || (link != rdh_utils::CMVLinkID)) {
145 LOGP(
debug,
"Skipping packet: detField {}, (expected RawDataType {}), link {}, (expected CMVLinkID {})", detField, (
decltype(detField))RawDataType::CMV, link, rdh_utils::CMVLinkID);
149 LOGP(
debug,
"Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6}, cruID {:3}, link {:2}", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, link);
151 if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
152 LOGP(warning,
"CMV CRU {:3} not configured in CRUs, skipping", cruID);
156 auto& cmvVec = mCMVvectors[cruID];
157 auto& infoVec = mCMVInfos[cruID];
160 LOGP(warning,
"CMV packet size mismatch: got {} bytes, expected {} bytes (sizeof cmv::Container). Skipping package.",
size,
sizeof(
cmv::Container));
164 auto data = it.data();
166 const uint32_t
orbit = cmvs.header.heartbeatOrbit;
167 const uint16_t
bc = cmvs.header.heartbeatBC;
170 infoVec.emplace_back(
orbit,
bc);
171 cmvVec.reserve(cmvVec.size() + cmv::NTimeBinsPerPacket);
172 for (uint32_t tb = 0; tb < cmv::NTimeBinsPerPacket; ++tb) {
173 cmvVec.push_back(cmvs.getCMV(tb));
177 }
catch (
const std::exception& e) {
179 using namespace std::literals::chrono_literals;
180 static std::unordered_map<uint32_t, size_t> nErrorPerSubspec;
181 static std::chrono::time_point<std::chrono::steady_clock> lastReport = std::chrono::steady_clock::now();
182 const auto now = std::chrono::steady_clock::now();
183 static size_t reportedErrors = 0;
184 const size_t MAXERRORS = 10;
186 ++nErrorPerSubspec[subSpecification];
189 if (reportedErrors < MAXERRORS) {
191 std::string sleepInfo;
192 if (reportedErrors == MAXERRORS) {
193 sleepInfo = fmt::format(
", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS,
sleepTime);
195 LOGP(alarm,
"EXCEPTION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts,
196 dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo);
207 hasErrors |= snapshotCMVs(pc.outputs(), tfCounter);
209 if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
210 writeDebugOutput(tfCounter);
213 if (mWriteRawDataOnError && hasErrors) {
214 writeRawData(pc.inputs());