63 mDecoder = std::make_unique<RawPixelDecoder<Mapping>>();
71 mDecoder->setUserDataOrigin(dataOrig);
72 mDecoder->setUserDataDescription(dataDesc);
74 }
catch (
const std::exception& e) {
75 LOG(error) <<
"exception was thrown in decoder creation: " << e.what();
78 LOG(error) <<
"non-std::exception was thrown in decoder creation";
81 mApplyNoiseMap = !ic.
options().
get<
bool>(
"ignore-noise-map");
82 mUseClusterDictionary = !ic.
options().
get<
bool>(
"ignore-cluster-dictionary");
84 float fr = ic.
options().
get<
float>(
"rof-lenght-error-freq");
85 mROFErrRepIntervalMS = fr <= 0. ? -1 :
long(fr * 1e3);
86 mNThreads = std::max(1, ic.
options().
get<
int>(
"nthreads"));
87 mDecoder->setNThreads(mNThreads);
88 mUnmutExtraLanes = ic.
options().
get<
bool>(
"unmute-extra-lanes");
89 mVerbosity = ic.
options().
get<
int>(
"decoder-verbosity");
90 auto dmpSz = ic.
options().
get<
int>(
"stop-raw-data-dumps-after-size");
92 mMaxRawDumpsSize = size_t(dmpSz) * 1024 * 1024;
94 mDumpOnError = ic.
options().
get<
int>(
"raw-data-dumps");
95 if (mDumpOnError < 0) {
96 mDumpOnError = -mDumpOnError;
97 mDumpFrom1stPipeline =
true;
100 throw std::runtime_error(fmt::format(
"unknown raw data dump level {} requested", mDumpOnError));
102 auto dumpDir = ic.
options().
get<std::string>(
"raw-data-dumps-directory");
104 throw std::runtime_error(fmt::format(
"directory {} for raw data dumps does not exist", dumpDir));
106 mDecoder->setAlwaysParseTrigger(ic.
options().
get<
bool>(
"always-parse-trigger"));
107 mDecoder->setAllowEmptyROFs(ic.
options().
get<
bool>(
"allow-empty-rofs"));
108 mDecoder->setRawDumpDirectory(dumpDir);
109 mDecoder->setFillCalibData(mDoCalibData);
110 mDecoder->setVerifyDecoder(mVerifyDecoder);
111 bool ignoreRampUp = !ic.
options().
get<
bool>(
"accept-rof-rampup-data");
112 mDecoder->setSkipRampUpData(ignoreRampUp);
113 }
catch (
const std::exception& e) {
114 LOG(error) <<
"exception was thrown in decoder configuration: " << e.what();
117 LOG(error) <<
"non-std::exception was thrown in decoder configuration";
122 mClusterer = std::make_unique<Clusterer>();
123 mClusterer->setNChips(Mapping::getNChips());
131 updateTimeDependentParams(pc);
132 static bool firstCall =
true;
140 mDecoder->setVerbosity(mDecoder->getInstanceID() == 0 ? mVerbosity : (mUnmutExtraLanes ? mVerbosity : -1));
141 mAllowReporting &= (mDecoder->getInstanceID() == 0) || mUnmutExtraLanes;
145 double timeCPU0 = mTimer.CpuTime(), timeReal0 = mTimer.RealTime();
147 auto orig = Mapping::getOrigin();
148 std::vector<o2::itsmft::CompClusterExt> clusCompVec;
149 std::vector<o2::itsmft::ROFRecord> clusROFVec;
150 std::vector<unsigned char> clusPattVec;
152 std::vector<Digit> digVec;
153 std::vector<GBTCalibData> calVec;
154 std::vector<ROFRecord> digROFVec;
155 auto& chipStatus = pc.
outputs().
make<std::vector<char>>(
Output{orig,
"CHIPSSTATUS", 0}, (size_t)Mapping::getNChips());
158 mDecoder->startNewTF(pc.
inputs());
160 digVec.reserve(mEstNDig);
161 digROFVec.reserve(mEstNROF);
164 clusCompVec.reserve(mEstNClus);
165 clusROFVec.reserve(mEstNROF);
166 clusPattVec.reserve(mEstNClusPatt);
169 calVec.reserve(mEstNCalib);
172 mDecoder->setDecodeNextAuto(
false);
174 int nTriggersProcessed = mDecoder->getNROFsProcessed();
175 static long lastErrReportTS = 0;
176 while (mDecoder->decodeNextTrigger() >= 0) {
177 if ((!lastIR.isDummy() && lastIR >= mDecoder->getInteractionRecord()) || firstIR > mDecoder->getInteractionRecord()) {
178 const int MaxErrLog = 2;
179 static int errLocCount = 0;
180 if (errLocCount++ < MaxErrLog) {
181 LOGP(warn,
"Impossible ROF IR {}, previous was {}, TF 1st IR was {}, discarding in decoding", mDecoder->getInteractionRecord().asString(), lastIR.asString(), firstIR.asString());
183 nTriggersProcessed = 0x7fffffff;
186 lastIR = mDecoder->getInteractionRecord();
187 mDecoder->fillChipsStatus(chipStatus);
188 if (mDoDigits || mClusterer->getMaxROFDepthToSquash()) {
189 mDecoder->fillDecodedDigits(digVec, digROFVec);
191 mDecoder->fillCalibData(calVec);
194 if (mDoClusters && !mClusterer->getMaxROFDepthToSquash()) {
195 mClusterer->process(mNThreads, *mDecoder.get(), &clusCompVec, mDoPatterns ? &clusPattVec :
nullptr, &clusROFVec);
198 nTriggersProcessed = mDecoder->getNROFsProcessed() - nTriggersProcessed - 1;
202 if ((expectedTFSize != nTriggersProcessed) && mROFErrRepIntervalMS > 0 && mTFCounter > 1 && nTriggersProcessed > 0) {
203 long currTS = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
204 if (currTS - lastErrReportTS > mROFErrRepIntervalMS) {
205 LOGP(error,
"Inconsistent number of ROF per TF. From parameters: {} from readout: {} (muting further reporting for {} ms)", expectedTFSize, nTriggersProcessed, mROFErrRepIntervalMS);
206 lastErrReportTS = currTS;
209 if (mDoClusters && mClusterer->getMaxROFDepthToSquash()) {
220 mClusterer->process(mNThreads, reader, &clusCompVec, mDoPatterns ? &clusPattVec :
nullptr, &clusROFVec);
222 }
catch (
const std::exception& e) {
223 static size_t nErr = 0;
225 if (++nErr < maxWarn) {
226 LOGP(alarm,
"EXCEPTION {} in raw decoder, abandoning TF decoding {}", e.what(), nErr == maxWarn ?
"(will mute further warnings)" :
"");
232 mEstNDig = std::max(mEstNDig,
size_t(digVec.size() * 1.2));
233 mEstNROF = std::max(mEstNROF,
size_t(digROFVec.size() * 1.2));
236 mEstNCalib = std::max(mEstNCalib,
size_t(calVec.size() * 1.2));
244 mEstNClus = std::max(mEstNClus,
size_t(clusCompVec.size() * 1.2));
245 mEstNClusPatt = std::max(mEstNClusPatt,
size_t(clusPattVec.size() * 1.2));
246 mEstNROF = std::max(mEstNROF,
size_t(clusROFVec.size() * 1.2));
248 auto& linkErrors = pc.
outputs().
make<std::vector<GBTLinkDecodingStat>>(
Output{orig,
"LinkErrors", 0});
249 auto& decErrors = pc.
outputs().
make<std::vector<ChipError>>(
Output{orig,
"ChipErrors", 0});
250 auto& errMessages = pc.
outputs().
make<std::vector<ErrorMessage>>(
Output{orig,
"ErrorInfo", 0});
251 mDecoder->collectDecodingErrors(linkErrors, decErrors, errMessages);
258 if (mRawDumpedSize > mMaxRawDumpsSize && mMaxRawDumpsSize > 0) {
259 LOGP(info,
"Max total dumped size {} MB exceeded allowed limit, disabling further dumping", mRawDumpedSize / (1024 * 1024));
265 LOG(
debug) << mSelfName <<
" Built " << clusCompVec.size() <<
" clusters in " << clusROFVec.size() <<
" ROFs";
268 LOG(
debug) << mSelfName <<
" Decoded " << digVec.size() <<
" Digits in " << digROFVec.size() <<
" ROFs";
273 LOG(
debug) << mSelfName <<
" Total time for TF " << tfID <<
'(' << mTFCounter <<
") : CPU: " << mTimer.CpuTime() - timeCPU0 <<
" Real: " << mTimer.RealTime() - timeReal0;
381 std::vector<OutputSpec> outputs;
384 outputs.emplace_back(inp.
origin,
"DIGITS", 0, Lifetime::Timeframe);
385 outputs.emplace_back(inp.
origin,
"DIGITSROF", 0, Lifetime::Timeframe);
387 outputs.emplace_back(inp.
origin,
"GBTCALIB", 0, Lifetime::Timeframe);
391 outputs.emplace_back(inp.
origin,
"COMPCLUSTERS", 0, Lifetime::Timeframe);
392 outputs.emplace_back(inp.
origin,
"CLUSTERSROF", 0, Lifetime::Timeframe);
396 outputs.emplace_back(inp.
origin,
"PATTERNS", 0, Lifetime::Timeframe);
398 outputs.emplace_back(inp.
origin,
"PHYSTRIG", 0, Lifetime::Timeframe);
400 outputs.emplace_back(inp.
origin,
"LinkErrors", 0, Lifetime::Timeframe);
401 outputs.emplace_back(inp.
origin,
"ChipErrors", 0, Lifetime::Timeframe);
402 outputs.emplace_back(inp.
origin,
"ErrorInfo", 0, Lifetime::Timeframe);
403 outputs.emplace_back(inp.
origin,
"CHIPSSTATUS", 0, Lifetime::Timeframe);
407 inputs.emplace_back(
"stfDist",
"FLP",
"DISTSUBTIMEFRAME", 0, o2::framework::Lifetime::Timeframe);
409 inputs.emplace_back(
"noise", inp.
origin,
"NOISEMAP", 0, Lifetime::Condition,
411 inputs.emplace_back(
"alppar", inp.
origin,
"ALPIDEPARAM", 0, Lifetime::Condition,
ccdbParamSpec(fmt::format(
"{}/Config/AlpideParam", inp.
origin.
as<std::string>())));
413 inputs.emplace_back(
"cldict", inp.
origin,
"CLUSDICT", 0, Lifetime::Condition,
ccdbParamSpec(fmt::format(
"{}/Calib/ClusterDictionary", inp.
origin.
as<std::string>())));
414 inputs.emplace_back(
"cluspar", inp.
origin,
"CLUSPARAM", 0, Lifetime::Condition,
ccdbParamSpec(fmt::format(
"{}/Config/ClustererParam", inp.
origin.
as<std::string>())));
417 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(
false,
432 {
"nthreads", VariantType::Int, 1, {
"Number of decoding/clustering threads"}},
433 {
"decoder-verbosity", VariantType::Int, 0, {
"Verbosity level (-1: silent, 0: errors, 1: headers, 2: data, 3: raw data dump) of 1st lane"}},
434 {
"always-parse-trigger", VariantType::Bool,
false, {
"parse trigger word even if flags continuation of old trigger"}},
435 {
"raw-data-dumps", VariantType::Int,
int(
GBTLink::RawDataDumps::DUMP_NONE), {
"Raw data dumps on error (0: none, 1: HBF for link, 2: whole TF for all links. If negative, dump only on from 1st pipeline."}},
436 {
"raw-data-dumps-directory", VariantType::String,
"", {
"Destination directory for the raw data dumps"}},
437 {
"stop-raw-data-dumps-after-size", VariantType::Int, 1024, {
"Stop dumping once this size in MB is accumulated. 0: no limit"}},
438 {
"unmute-extra-lanes", VariantType::Bool,
false, {
"allow extra lanes to be as verbose as 1st one"}},
439 {
"allow-empty-rofs", VariantType::Bool,
false, {
"record ROFs w/o any hit"}},
440 {
"ignore-noise-map", VariantType::Bool,
false, {
"do not mask pixels flagged in the noise map"}},
441 {
"accept-rof-rampup-data", VariantType::Bool,
false, {
"do not discard data during ROF ramp up"}},
442 {
"rof-lenght-error-freq", VariantType::Float, 60.f, {
"do not report ROF lenght error more frequently than this value, disable if negative"}},
443 {
"ignore-cluster-dictionary", VariantType::Bool,
false, {
"do not use cluster dictionary, always store explicit patterns"}}}};
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.