// Read the debugging switches from the options object `ic.options()`; each get<T>("key") result is
// stored in the corresponding member.
71 mWriteDebug = ic.options().get<
bool>(
"write-debug");
72 mWriteDebugOnError = ic.options().get<
bool>(
"write-debug-on-error");
73 mWriteRawDataOnError = ic.options().get<
bool>(
"write-raw-data-on-error");
// Integer selector; it is compared against 2 and 4 where the raw output file name is built.
74 mRawDataType = ic.options().get<
int>(
"raw-data-type");
// File name patterns for the debug stream and the raw dump.
// NOTE(review): `.data()` is called on the temporary std::string returned by get<std::string>(). The
// temporary lives until the end of the full expression, so the assignment is safe, but if the members
// are std::string the round trip through a C string looks redundant (and would cut the value at an
// embedded NUL) — confirm the member types.
77 mDebugStreamFileName = ic.options().get<std::string>(
"debug-file-name").
data();
78 mRawOutputFileName = ic.options().get<std::string>(
"raw-file-name").
data();
// When set, the link index is mirrored during decoding (iLink = idc::Links - iLinkTmp - 1).
80 mSwapLinks = ic.options().get<
bool>(
"swap-links");
// Pedestal source taken from the "pedestal-url" option. Loading is only attempted for a non-empty value.
// NOTE: this excerpt omits several lines (closing braces, `else`, `try`), so the exact nesting of the
// branches below cannot be read off the listing.
82 auto pedestalFile = ic.options().get<std::string>(
"pedestal-url");
83 if (pedestalFile.length()) {
// Split off an optional "@<timestamp>" suffix. Without '@', tsPos is npos and substr(0, npos) yields
// the complete string.
86 const auto tsPos = pedestalFile.find(
"@");
87 std::string pedestalURL = pedestalFile.substr(0, tsPos);
// A URL containing "ccdb" selects the CCDB code path.
88 if (pedestalURL.find(
"ccdb") != std::string::npos) {
// Special handling for URLs containing "-default"; the body of this branch is not part of the excerpt.
89 if (pedestalURL.find(
"-default") != std::string::npos) {
92 LOGP(info,
"Loading pedestals from ccdb: {}", pedestalURL);
93 cdb.setURL(pedestalURL);
94 if (cdb.isHostReachable()) {
// A custom time stamp was supplied after '@'.
// NOTE(review): std::stol throws std::invalid_argument / std::out_of_range on malformed input; whether
// this call is covered by the try block belonging to the catch below is not visible here — confirm.
95 if (tsPos != std::string::npos) {
96 timeStamp = std::stol(pedestalFile.substr(tsPos + 1));
97 LOGP(info,
"Using custom time stamp {}", timeStamp);
// Fetch the name -> CalPad map stored under "TPC/Calib/PedestalNoise" for the selected time stamp.
99 auto pedestalNoise = cdb.getForTimeStamp<std::unordered_map<std::string, CalPad>>(
"TPC/Calib/PedestalNoise", timeStamp);
// A null result is turned into an exception.
// NOTE(review): the message text reads "PedestaNoise" (missing 'l'); left untouched because it is
// runtime output.
101 if (!pedestalNoise) {
102 throw std::runtime_error(
"Couldn't retrieve PedestaNoise map");
// Copy the "Pedestals" entry into a newly allocated CalPad owned by mPedestal; at() throws
// std::out_of_range if the key is missing.
104 mPedestal = std::make_unique<CalPad>(pedestalNoise->at(
"Pedestals"));
105 }
catch (
const std::exception& e) {
// Any std::exception raised inside the (not shown) try block is reported at fatal level.
106 LOGP(fatal,
"could not load pedestals from {} ({}), required for IDC processing", pedestalURL, e.what());
// Reported when CCDB access was requested but isHostReachable() returned false (presumably the else
// branch of that check — the `else` line is not part of the excerpt).
109 LOGP(fatal,
"ccdb access to {} requested, but host is not reachable. Cannot load pedestals, required for IDC processing", pedestalURL);
// File-based loading (presumably the branch for URLs without "ccdb"). The statement that fills
// `calPads` is not part of the excerpt.
112 LOGP(info,
"Loading pedestals from file: {}", pedestalURL);
// Exactly one CalPad object is expected from the file; anything else is reported at fatal level.
114 if (calPads.size() != 1) {
115 LOGP(fatal,
"Pedestal could not be loaded from file {}, required for IDC processing", pedestalURL);
// mPedestal takes over the first (only) entry of calPads.
117 mPedestal.reset(calPads[0]);
// Reported when no pedestal source was configured (presumably the else branch of the length check at
// the top of this section).
121 LOGP(error,
"No pedestal file set, IDCs will be without pedestal subtraction!");
// Create the debug stream at most once: only if one of the two debug options is enabled and no stream
// exists yet.
134 if ((mWriteDebug || mWriteDebugOnError) && !mDebugStream) {
// The configured file name is used as a runtime format pattern; the named argument "run" is filled with
// runNumber (defined outside this excerpt).
135 const auto debugFileName = fmt::format(fmt::runtime(mDebugStreamFileName), fmt::arg(
"run", runNumber));
136 LOGP(info,
"creating debug stream {}", debugFileName);
// The stream is opened with the "recreate" option.
137 mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(debugFileName.data(),
"recreate");
// Open the raw dump file only when raw dumping on error is enabled and the file is not open yet.
140 if (mWriteRawDataOnError && !mRawOutputFile.is_open()) {
// Label used in the file name: "tf" for raw data types below 2, "raw" otherwise.
141 std::string_view rawType = (mRawDataType < 2) ?
"tf" :
"raw";
// Type 4 is special-cased; the body of this branch is not part of the excerpt.
142 if (mRawDataType == 4) {
// The file name pattern accepts the named arguments "run" and "raw_type".
145 const auto rawFileName = fmt::format(fmt::runtime(mRawOutputFileName), fmt::arg(
"run", runNumber), fmt::arg(
"raw_type", rawType));
146 LOGP(info,
"creating raw debug file {}", rawFileName);
// Opened in binary mode.
147 mRawOutputFile.open(rawFileName, std::ios::binary);
// Decoding of the raw input pages into the per-CRU IDC vectors.
// NOTE: this excerpt omits lines (closing braces, loop headers, some declarations); names such as `ref`,
// `parser`, `rdhPtr`, `rdhVersion`, `idcs`, `iLinkTmp`, `padInRegion` and `numberPads` are defined on
// lines that are not shown.
150 uint32_t heartbeatOrbit = 0;
151 uint32_t heartbeatBC = 0;
152 uint32_t tfCounter = 0;
// Set when something goes wrong; after the loop it also accumulates the result of snapshotIDCs() and
// gates the on-error debug/raw output.
154 bool hasErrors =
false;
// Non-owning view of the loaded pedestals (mPedestal keeps ownership); null if mPedestal holds nothing.
156 CalPad* pedestals = mPedestal.get();
// Per-input bookkeeping taken from the DataHeader of `ref`.
159 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(
ref);
160 tfCounter = dh->tfCounter;
161 const auto subSpecification = dh->subSpecification;
// Payload of this input as a span of chars.
165 const gsl::span<const char> raw = pc.inputs().get<gsl::span<char>>(
ref);
168 size_t lastErrorCount = 0;
// Walk over all elements delivered by `parser`; the end iterator is evaluated once.
170 for (
auto it = parser.
begin(),
end = parser.
end(); it !=
end; ++it) {
171 const auto size = it.size();
// A missing RDH or an RDH version below 6 aborts processing with an exception.
// NOTE(review): `.data()` on the formatted temporary is redundant if the std::string constructor of
// std::runtime_error is acceptable here.
185 if (!rdhPtr || rdhVersion < 6) {
186 throw std::runtime_error(fmt::format(
"could not get RDH from packet, or version {} < 6", rdhVersion).
data());
// Decompose the FEE ID into link, CRU and end point, and read the detector field of the RDH.
190 const auto feeId = (
FEEIDType)RDHUtils::getFEEID(*rdhPtr);
191 const auto link = rdh_utils::getLink(feeId);
192 const uint32_t cruID = rdh_utils::getCRU(feeId);
193 const auto endPoint = rdh_utils::getEndPoint(feeId);
194 const auto detField = RDHUtils::getDetectorField(*rdhPtr);
// Matches pages that are not IDC data (detector field differs from RawDataType::IDC or link differs
// from rdh_utils::IDCLinkID); the body is not shown — presumably such pages are skipped.
198 if ((detField != (
decltype(detField))RawDataType::IDC) || (link != rdh_utils::IDCLinkID)) {
201 LOGP(
debug,
"IDC Processing firstTForbit {:9}, tfCounter {:5}, run {:6}, feeId {:6} ({:3}/{}/{:2})", dh->firstTForbit, dh->tfCounter, dh->runNumber, feeId, cruID, endPoint, link);
// Only CRUs listed in mCRUs are processed; others are reported.
203 if (std::find(mCRUs.begin(), mCRUs.end(), cruID) == mCRUs.end()) {
204 LOGP(error,
"IDC CRU {:3} not configured in CRUs, skipping", cruID);
// Mapping information for this CRU.
208 const CRU cru(cruID);
209 const int sector = cru.
sector();
210 const auto& partInfo = mapper.getPartitionInfo(cru.
partition());
// Half of the partition's FEC count, rounded up by the integer division; used as the FEC offset per
// end point.
211 const int fecLinkOffsetCRU = (partInfo.getNumberOfFECs() + 1) / 2;
212 const int fecSectorOffset = partInfo.getSectorFECOffset();
// Zero-initialised here; the statement that fills them for a given channel is not part of the excerpt.
215 int sampaOnFEC{}, channelOnSAMPA{};
// operator[] on the per-CRU containers; for map-like containers this inserts a default entry when the
// CRU has none yet (container types are not visible here).
216 auto& idcVec = mIDCvectors[cruID];
217 auto& infoVec = mIDCInfos[cruID];
220 auto data = it.data();
// Heartbeat orbit and bunch crossing from the IDC packet header.
222 const uint32_t
orbit = idcs.header.heartbeatOrbit;
223 const uint32_t
bc = idcs.header.heartbeatBC;
// Look for an existing info entry of this orbit. The search runs before the emptiness check; on an
// empty vector it returns end() and infoIt is overwritten in the first branch.
226 auto infoIt = std::find(infoVec.begin(), infoVec.end(),
orbit);
227 if (!infoVec.size()) {
228 infoVec.emplace_back(
orbit,
bc);
229 infoIt = infoVec.end() - 1;
230 }
else if (infoIt == infoVec.end()) {
// Orbit not seen before: the distance to the most recent entry is compared with mNOrbitsIDC.
// NOTE(review): the comparison subtracts two uint32_t values (wraps modulo 2^32 if orbit is smaller),
// while the log message prints the difference computed on int casts.
231 auto& lastInfo = infoVec.back();
232 if ((
orbit - lastInfo.heartbeatOrbit) != mNOrbitsIDC) {
233 LOGP(error,
"received packet with invalid jump in idc orbit ({} - {} == {} != {})",
orbit, lastInfo.heartbeatOrbit,
int(
orbit) -
int(lastInfo.heartbeatOrbit), mNOrbitsIDC);
// Append the new entry and point infoIt at it.
236 infoVec.emplace_back(
orbit,
bc);
237 infoIt = infoVec.end() - 1;
// Track which end points already delivered data for this orbit.
241 auto& lastInfo = *infoIt;
242 if (lastInfo.wasEPseen(endPoint)) {
243 LOGP(
debug,
"Already received another data packet for CRU {}, ep {}, orbit {}, bc {}", cruID, endPoint,
orbit,
bc);
247 lastInfo.setEPseen(endPoint);
// Index of the info entry inside infoVec; used as a multiplier when computing vectorPosition.
249 const size_t idcOffset = std::distance(infoVec.begin(), infoIt);
// The division is evaluated in double (literal `1.`) and narrowed to float on initialisation; where
// `norm` is applied is not visible in this excerpt.
254 const float norm = 1. / float(mTimeStampsPerIntegrationInterval);
// Optional mirroring of the link index, controlled by the "swap-links" option.
256 const uint32_t iLink = mSwapLinks ? (idc::Links - iLinkTmp - 1) : iLinkTmp;
// Links without data in this packet; the body is not shown.
258 if (!idcs.hasLink(iLink)) {
// NOTE(review): the FEC position is derived from the unswapped index iLinkTmp, while the data access
// uses iLink — looks intentional, confirm.
262 const int fecInSector = iLinkTmp + endPoint * fecLinkOffsetCRU + fecSectorOffset;
264 for (uint32_t iChannel = 0; iChannel < idc::Channels; ++iChannel) {
265 auto val = idcs.getChannelValueFloat(iLink, iChannel);
267 const GlobalPadNumber padInSector = mapper.globalPadNumber(fecInSector, sampaOnFEC, channelOnSAMPA);
// Subtract the pad pedestal multiplied by mTimeStampsPerIntegrationInterval.
// NOTE(review): `pedestals` is null when no pedestal was loaded; a guard is presumably on a line that
// is not part of this excerpt — confirm.
269 val -= pedestals->
getValue(sector, padInSector) * mTimeStampsPerIntegrationInterval;
// Position inside idcVec: pad index plus idcOffset blocks of numberPads entries.
273 const GlobalPadNumber vectorPosition = padInRegion + idcOffset * numberPads;
278 idcVec[vectorPosition] =
val;
282 }
catch (
const std::exception& e) {
// Handler for std::exception raised while decoding: counts errors per sub-specification and limits
// how many are reported.
// NOTE: this excerpt omits lines; `sleepTime` and `payloadSize` are defined on lines that are not shown.
284 using namespace std::literals::chrono_literals;
// Function-local statics: this state persists across calls for the lifetime of the process.
285 static std::unordered_map<uint32_t, size_t> nErrorPerSubspec;
286 static std::chrono::time_point<std::chrono::steady_clock> lastReport = std::chrono::steady_clock::now();
287 const auto now = std::chrono::steady_clock::now();
288 static size_t reportedErrors = 0;
289 const size_t MAXERRORS = 10;
// operator[] inserts a zero counter on first use, so the later at() on the same key cannot throw.
291 ++nErrorPerSubspec[subSpecification];
// Report only while fewer than MAXERRORS errors were reported.
294 if (reportedErrors < MAXERRORS) {
296 std::string sleepInfo;
// NOTE(review): as listed, this condition cannot hold directly after the `< MAXERRORS` check; the
// embedded numbering skips a line in between, presumably the increment of reportedErrors — confirm.
297 if (reportedErrors == MAXERRORS) {
298 sleepInfo = fmt::format(
", maximum error count ({}) reached, not reporting for the next {}", MAXERRORS,
sleepTime);
// NOTE(review): the message text reads "EXCEPTIION" (double 'I'); left untouched because it is runtime
// output that log filters may match on.
300 LOGP(alarm,
"EXCEPTIION in processRawData: {} -> skipping part:{}/{} of spec:{}/{}/{}, size:{}, error count for subspec: {}{}", e.what(), dh->splitPayloadIndex, dh->splitPayloadParts,
301 dh->dataOrigin, dh->dataDescription, subSpecification, payloadSize, nErrorPerSubspec.at(subSpecification), sleepInfo);
// Send out the collected IDCs. `|=` has no short-circuit, so snapshotIDCs() is always called and its
// result is merged into hasErrors.
312 hasErrors |= snapshotIDCs(pc.outputs(), tfCounter);
// Debug output: always with "write-debug", or only for time frames with errors with
// "write-debug-on-error".
314 if (mWriteDebug || (mWriteDebugOnError && hasErrors)) {
315 writeDebugOutput(tfCounter);
// The raw input is dumped only when errors occurred and "write-raw-data-on-error" is enabled.
318 if (mWriteRawDataOnError && hasErrors) {
319 writeRawData(pc.inputs());