1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
16#include <vector>
40namespace o2
42namespace itsmft
45using namespace o2::framework;
// Constructor. Copies the processing switches (clusters, patterns, digits, calibration data,
// reporting, decoder verification) and the input spec string from `inp`, and keeps the
// GRP/geometry request `gr` in mGGCCDBRequest.
// NOTE(review): the braces delimiting the constructor body are not visible in this listing.
48template <class Mapping>
49STFDecoder<Mapping>::STFDecoder(const STFDecoderInp& inp, std::shared_ptr<o2::base::GRPGeomRequest> gr)
50 : mDoClusters(inp.doClusters), mDoPatterns(inp.doPatterns), mDoDigits(inp.doDigits), mDoCalibData(inp.doCalib), mAllowReporting(inp.allowReporting), mVerifyDecoder(inp.verifyDecoder), mInputSpec(inp.inputSpec), mGGCCDBRequest(gr)
// mSelfName is used as a prefix in the log messages of this class; it is built from the
// mapping name and the literal "STFDecoder"
52 mSelfName = o2::utils::Str::concat_string(Mapping::getName(), "STFDecoder");
// stop and reset the timer so that it does not run before the first processed TF
53 mTimer.Stop();
54 mTimer.Reset();
// Task initialisation.
// NOTE(review): the signature line and the opening brace are missing from this listing; the body
// reads device options through `ic` (presumably an InitContext — confirm against the header).
// Steps visible here:
//  1. create the raw pixel decoder and set its data origin/description from mInputSpec;
//  2. read the device options and configure the decoder accordingly;
//  3. create the clusterer if clusterization was requested.
// Exceptions raised in steps 1 and 2 are logged and rethrown.
58template <class Mapping>
62 try {
63 mDecoder = std::make_unique<RawPixelDecoder<Mapping>>();
// Split the input spec on ':', then on '/', then on '?': v1[0] is used as data origin and v2[0] as
// data description.
// NOTE(review): v0[1] and v1[1] are accessed without a size check — this assumes a spec with at
// least one ':' and one '/' separator; verify against the callers building inputSpec.
64 auto v0 = o2::utils::Str::tokenize(mInputSpec, ':');
65 auto v1 = o2::utils::Str::tokenize(v0[1], '/');
66 auto v2 = o2::utils::Str::tokenize(v1[1], '?');
67 header::DataOrigin dataOrig;
// NOTE(review): the declaration of `dataDesc` is on a line that is not visible in this listing
69 dataOrig.runtimeInit(v1[0].c_str());
70 dataDesc.runtimeInit(v2[0].c_str());
71 mDecoder->setUserDataOrigin(dataOrig);
72 mDecoder->setUserDataDescription(dataDesc);
73 mDecoder->init(); // is this no-op?
74 } catch (const std::exception& e) {
75 LOG(error) << "exception was thrown in decoder creation: " << e.what();
76 throw;
77 } catch (...) {
78 LOG(error) << "non-std::exception was thrown in decoder creation";
79 throw;
80 }
// both flags are the negation of the corresponding "ignore-..." option
81 mApplyNoiseMap = !ic.options().get<bool>("ignore-noise-map");
82 mUseClusterDictionary = !ic.options().get<bool>("ignore-cluster-dictionary");
83 try {
// minimum interval between two "inconsistent number of ROFs" reports: the option value is scaled
// by 1e3 and stored in ms; a non-positive value stores -1, which disables the report in the
// per-TF processing
84 float fr = ic.options().get<float>("rof-lenght-error-freq");
85 mROFErrRepIntervalMS = fr <= 0. ? -1 : long(fr * 1e3);
// at least one thread is always used
86 mNThreads = std::max(1, ic.options().get<int>("nthreads"));
87 mDecoder->setNThreads(mNThreads);
88 mUnmutExtraLanes = ic.options().get<bool>("unmute-extra-lanes");
89 mVerbosity = ic.options().get<int>("decoder-verbosity");
// the dump size limit is given in MB and stored in bytes; a non-positive option value leaves
// mMaxRawDumpsSize untouched
90 auto dmpSz = ic.options().get<int>("stop-raw-data-dumps-after-size");
91 if (dmpSz > 0) {
92 mMaxRawDumpsSize = size_t(dmpSz) * 1024 * 1024;
93 }
// a negative dump level requests the same level in absolute value, but restricted to the 1st pipeline
94 mDumpOnError = ic.options().get<int>("raw-data-dumps");
95 if (mDumpOnError < 0) {
96 mDumpOnError = -mDumpOnError;
97 mDumpFrom1stPipeline = true;
98 }
99 if (mDumpOnError >= int(GBTLink::RawDataDumps::DUMP_NTYPES)) {
100 throw std::runtime_error(fmt::format("unknown raw data dump level {} requested", mDumpOnError));
101 }
// a non-empty dump directory must exist, but only if dumping is enabled at all
102 auto dumpDir = ic.options().get<std::string>("raw-data-dumps-directory");
103 if (mDumpOnError != int(GBTLink::RawDataDumps::DUMP_NONE) && (!dumpDir.empty() && !o2::utils::Str::pathIsDirectory(dumpDir))) {
104 throw std::runtime_error(fmt::format("directory {} for raw data dumps does not exist", dumpDir));
105 }
106 mDecoder->setAlwaysParseTrigger(ic.options().get<bool>("always-parse-trigger"));
107 mDecoder->setAllowEmptyROFs(ic.options().get<bool>("allow-empty-rofs"));
108 mDecoder->setRawDumpDirectory(dumpDir);
109 mDecoder->setFillCalibData(mDoCalibData);
110 mDecoder->setVerifyDecoder(mVerifyDecoder);
// ramp-up data is skipped unless explicitly accepted via the option
111 bool ignoreRampUp = !ic.options().get<bool>("accept-rof-rampup-data");
112 mDecoder->setSkipRampUpData(ignoreRampUp);
113 } catch (const std::exception& e) {
114 LOG(error) << "exception was thrown in decoder configuration: " << e.what();
115 throw;
116 } catch (...) {
117 LOG(error) << "non-std::exception was thrown in decoder configuration";
118 throw;
119 }
// the clusterer exists only when clusterization is requested; mClusterer stays empty otherwise
121 if (mDoClusters) {
122 mClusterer = std::make_unique<Clusterer>();
123 mClusterer->setNChips(Mapping::getNChips());
124 }
// Per-timeframe processing.
// NOTE(review): the signature line and the opening brace are missing from this listing; the body
// works on a processing context `pc`. Several expressions of the form `<T>()` have lost their
// leading part in this listing and are reproduced as they appear.
// Decodes all triggers (ROFs) of the current TF, optionally collects digits / calibration data and
// runs the clusterer, then publishes the requested outputs together with the decoding error
// containers, the external triggers ("PHYSTRIG") and the per-chip status vector.
128template <class Mapping>
131 updateTimeDependentParams(pc);
// function-local static: distinguishes the very first call from later ones
132 static bool firstCall = true;
133 if (!firstCall &&<o2::framework::TimingInfo>().globalRunNumberChanged) { // reset at the beginning of the new run
134 reset();
135 }
136 if (firstCall) {
137 firstCall = false;
138 mDecoder->setInstanceID(<const o2::framework::DeviceSpec>().inputTimesliceId);
139 mDecoder->setNInstances(<const o2::framework::DeviceSpec>().maxInputTimeslices);
// instance 0 keeps the requested verbosity; other instances get -1 unless extra lanes are unmuted
140 mDecoder->setVerbosity(mDecoder->getInstanceID() == 0 ? mVerbosity : (mUnmutExtraLanes ? mVerbosity : -1));
141 mAllowReporting &= (mDecoder->getInstanceID() == 0) || mUnmutExtraLanes;
142 }
144 int nSlots = pc.inputs().getNofParts(0);
// remember the accumulated timer values to report the time spent on this TF at the end
145 double timeCPU0 = mTimer.CpuTime(), timeReal0 = mTimer.RealTime();
146 mTimer.Start(false);
147 auto orig = Mapping::getOrigin();
148 std::vector<o2::itsmft::CompClusterExt> clusCompVec;
149 std::vector<o2::itsmft::ROFRecord> clusROFVec;
150 std::vector<unsigned char> clusPattVec;
152 std::vector<Digit> digVec;
153 std::vector<GBTCalibData> calVec;
154 std::vector<ROFRecord> digROFVec;
// the chip status output is created directly in the output message, one entry per chip
155 auto& chipStatus = pc.outputs().make<std::vector<char>>(Output{orig, "CHIPSSTATUS", 0}, (size_t)Mapping::getNChips());
157 try {
158 mDecoder->startNewTF(pc.inputs());
// pre-allocate the containers using the size estimates updated after every TF
159 if (mDoDigits) {
160 digVec.reserve(mEstNDig);
161 digROFVec.reserve(mEstNROF);
162 }
163 if (mDoClusters) {
164 clusCompVec.reserve(mEstNClus);
165 clusROFVec.reserve(mEstNROF);
166 clusPattVec.reserve(mEstNClusPatt);
167 }
168 if (mDoCalibData) {
169 calVec.reserve(mEstNCalib);
170 }
172 mDecoder->setDecodeNextAuto(false);
173 o2::InteractionRecord lastIR{}, firstIR{0,<o2::framework::TimingInfo>().firstTForbit};
// decoder ROF counter before this TF; the difference after the loop gives the ROFs of this TF
174 int nTriggersProcessed = mDecoder->getNROFsProcessed();
175 static long lastErrReportTS = 0;
176 while (mDecoder->decodeNextTrigger() >= 0) {
// discard a ROF whose IR is not later than the previously accepted one or precedes the TF start
177 if ((!lastIR.isDummy() && lastIR >= mDecoder->getInteractionRecord()) || firstIR > mDecoder->getInteractionRecord()) {
178 const int MaxErrLog = 2;
179 static int errLocCount = 0;
180 if (errLocCount++ < MaxErrLog) {
181 LOGP(warn, "Impossible ROF IR {}, previous was {}, TF 1st IR was {}, discarding in decoding", mDecoder->getInteractionRecord().asString(), lastIR.asString(), firstIR.asString());
182 }
// with this large value the difference computed after the loop becomes negative, so the
// ROF-count consistency report below (which requires a positive count) is not issued
183 nTriggersProcessed = 0x7fffffff; // to account for a problem with event
184 continue;
185 }
186 lastIR = mDecoder->getInteractionRecord();
187 mDecoder->fillChipsStatus(chipStatus);
// mClusterer is created only when clusterization is requested, hence it must be checked before
// being queried: without the check, running with neither digits nor clusters dereferenced null
188 if (mDoDigits || (mClusterer && mClusterer->getMaxROFDepthToSquash())) { // call before clusterization, since the latter will hide the digits
189 mDecoder->fillDecodedDigits(digVec, digROFVec); // lot of copying involved
190 if (mDoCalibData) {
191 mDecoder->fillCalibData(calVec);
192 }
193 }
// without squashing the clusterer runs directly on the decoder, trigger by trigger
194 if (mDoClusters && !mClusterer->getMaxROFDepthToSquash()) { // !!! THREADS !!!
195 mClusterer->process(mNThreads, *mDecoder.get(), &clusCompVec, mDoPatterns ? &clusPattVec : nullptr, &clusROFVec);
196 }
197 }
198 nTriggersProcessed = mDecoder->getNROFsProcessed() - nTriggersProcessed - 1;
200 const auto& alpParams = o2::itsmft::DPLAlpideParam<Mapping::getDetID()>::Instance();
201 int expectedTFSize = static_cast<int>(o2::constants::lhc::LHCMaxBunches * o2::base::GRPGeomHelper::instance().getGRPECS()->getNHBFPerTF() / alpParams.roFrameLengthInBC); // 3564*32 / ROF Length in BS = number of ROFs per TF
// report a mismatch at most once per mROFErrRepIntervalMS; never for the first two TFs
202 if ((expectedTFSize != nTriggersProcessed) && mROFErrRepIntervalMS > 0 && mTFCounter > 1 && nTriggersProcessed > 0) {
203 long currTS = std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()).time_since_epoch().count();
204 if (currTS - lastErrReportTS > mROFErrRepIntervalMS) {
205 LOGP(error, "Inconsistent number of ROF per TF. From parameters: {} from readout: {} (muting further reporting for {} ms)", expectedTFSize, nTriggersProcessed, mROFErrRepIntervalMS);
206 lastErrReportTS = currTS;
207 }
208 }
209 if (mDoClusters && mClusterer->getMaxROFDepthToSquash()) {
210 // Digits squashing require to run on a batch of digits and uses a digit reader, cannot (?) run with decoder
211 // - Setup decoder for running on a batch of digits
// NOTE(review): the declaration of `reader` is on a line that is not visible in this listing
213 reader.setSquashingDepth(mClusterer->getMaxROFDepthToSquash());
214 reader.setSquashingDist(mClusterer->getMaxRowColDiffToMask()); // Sharing same parameter/logic with masking
215 reader.setMaxBCSeparationToSquash(mClusterer->getMaxBCSeparationToSquash());
216 reader.setDigits(digVec);
217 reader.setROFRecords(digROFVec);
218 reader.init();
220 mClusterer->process(mNThreads, reader, &clusCompVec, mDoPatterns ? &clusPattVec : nullptr, &clusROFVec);
221 }
222 } catch (const std::exception& e) {
// a decoding exception abandons the rest of this TF; whatever was collected so far is still sent
223 static size_t nErr = 0;
// NOTE(review): the declaration of `maxWarn` is on a line that is not visible in this listing.
// The comparison is inclusive so that the maxWarn-th report, the one carrying the
// "(will mute further warnings)" note, is actually printed before muting.
225 if (++nErr <= maxWarn) {
226 LOGP(alarm, "EXCEPTION {} in raw decoder, abandoning TF decoding {}", e.what(), nErr == maxWarn ? "(will mute further warnings)" : "");
227 }
228 }
// publish the digits and update the reservation estimates: 1.2 x the current size, never shrinking
229 if (mDoDigits) {
230 pc.outputs().snapshot(Output{orig, "DIGITS", 0}, digVec);
231 pc.outputs().snapshot(Output{orig, "DIGITSROF", 0}, digROFVec);
232 mEstNDig = std::max(mEstNDig, size_t(digVec.size() * 1.2));
233 mEstNROF = std::max(mEstNROF, size_t(digROFVec.size() * 1.2));
234 if (mDoCalibData) {
235 pc.outputs().snapshot(Output{orig, "GBTCALIB", 0}, calVec);
236 mEstNCalib = std::max(mEstNCalib, size_t(calVec.size() * 1.2));
237 }
238 }
240 if (mDoClusters) { // we are not obliged to create vectors which are not requested, but other devices might not know the options of this one
241 pc.outputs().snapshot(Output{orig, "COMPCLUSTERS", 0}, clusCompVec);
242 pc.outputs().snapshot(Output{orig, "PATTERNS", 0}, clusPattVec);
243 pc.outputs().snapshot(Output{orig, "CLUSTERSROF", 0}, clusROFVec);
244 mEstNClus = std::max(mEstNClus, size_t(clusCompVec.size() * 1.2));
245 mEstNClusPatt = std::max(mEstNClusPatt, size_t(clusPattVec.size() * 1.2));
246 mEstNROF = std::max(mEstNROF, size_t(clusROFVec.size() * 1.2));
247 }
// the error containers are always created, then filled by the decoder
248 auto& linkErrors = pc.outputs().make<std::vector<GBTLinkDecodingStat>>(Output{orig, "LinkErrors", 0});
249 auto& decErrors = pc.outputs().make<std::vector<ChipError>>(Output{orig, "ChipErrors", 0});
250 auto& errMessages = pc.outputs().make<std::vector<ErrorMessage>>(Output{orig, "ErrorInfo", 0});
251 mDecoder->collectDecodingErrors(linkErrors, decErrors, errMessages);
253 pc.outputs().snapshot(Output{orig, "PHYSTRIG", 0}, mDecoder->getExternalTriggers());
// raw data dumps: produced only if enabled and, when restricted to the 1st pipeline, only by
// the device with inputTimesliceId == 0
255 if (mDumpOnError != int(GBTLink::RawDataDumps::DUMP_NONE) &&
256 (!mDumpFrom1stPipeline ||<const o2::framework::DeviceSpec>().inputTimesliceId == 0)) {
257 mRawDumpedSize += mDecoder->produceRawDataDumps(mDumpOnError,<o2::framework::TimingInfo>());
258 if (mRawDumpedSize > mMaxRawDumpsSize && mMaxRawDumpsSize > 0) {
259 LOGP(info, "Max total dumped size {} MB exceeded allowed limit, disabling further dumping", mRawDumpedSize / (1024 * 1024));
// NOTE(review): the statement following this message is not visible in this listing
261 }
262 }
264 if (mDoClusters) {
265 LOG(debug) << mSelfName << " Built " << clusCompVec.size() << " clusters in " << clusROFVec.size() << " ROFs";
266 }
267 if (mDoDigits) {
268 LOG(debug) << mSelfName << " Decoded " << digVec.size() << " Digits in " << digROFVec.size() << " ROFs";
269 }
270 mTimer.Stop();
271 auto tfID =<o2::framework::TimingInfo>().tfCounter;
// per-TF time = accumulated timer value now minus the value stored at the start of this call
273 LOG(debug) << mSelfName << " Total time for TF " << tfID << '(' << mTFCounter << ") : CPU: " << mTimer.CpuTime() - timeCPU0 << " Real: " << mTimer.RealTime() - timeReal0;
274 mTFCounter++;
// Prints the end-of-processing statistics once.
// NOTE(review): the signature line and the opening brace are missing from this listing.
// The mFinalizeDone flag makes repeated calls a no-op until it is cleared again.
278template <class Mapping>
281 if (mFinalizeDone) {
282 return;
283 }
284 mFinalizeDone = true;
285 LOGF(info, "%s statistics:", mSelfName);
// the number of slots is reported as the timer start counter minus one
286 LOGF(info, "%s Total STF decoding%s timing (w/o disk IO): Cpu: %.3e Real: %.3e s in %d slots", mSelfName,
287 mDoClusters ? "/clustering" : "", mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
// the decoder report is printed only if reporting is allowed for this device
288 if (mDecoder && mAllowReporting) {
289 mDecoder->printReport();
290 }
291 if (mClusterer) {
292 mClusterer->print();
293 }
// Refreshes the run-dependent settings. When the global run number has changed, the condition
// objects (noise map, Alpide parameters and, with clusterization enabled, clusterer parameters)
// are requested from the inputs and the clusterer masking / squashing settings are recomputed.
// NOTE(review): the braces of the function body and at least two statements are on lines that
// are not visible in this listing; the condition at the start of the `if` has lost its leading part.
297template <class Mapping>
298void STFDecoder<Mapping>::updateTimeDependentParams(ProcessingContext& pc)
300 // we call these methods just to trigger finaliseCCDB callback
302 if (<o2::framework::TimingInfo>().globalRunNumberChanged) { // this params need to be queried only in the beginning of the run
303 pc.inputs().get<o2::itsmft::NoiseMap*>("noise");
304 pc.inputs().get<o2::itsmft::DPLAlpideParam<Mapping::getDetID()>*>("alppar");
305 const auto& alpParams = DPLAlpideParam<Mapping::getDetID()>::Instance();
306 alpParams.printKeyValues();
307 if (mDoClusters) {
308 mClusterer->setContinuousReadOut(o2::base::GRPGeomHelper::instance().getGRPECS()->isDetContinuousReadOut(Mapping::getDetID()));
310 pc.inputs().get<o2::itsmft::ClustererParam<Mapping::getDetID()>*>("cluspar");
311 // settings for the fired pixel overflow masking
312 const auto& clParams = ClustererParam<Mapping::getDetID()>::Instance();
// masking and squashing biases are mutually exclusive; the message names both parameters
// (it previously printed "maxBCDiffToMaskBias" twice although the second value is the squash bias)
313 if (clParams.maxBCDiffToMaskBias > 0 && clParams.maxBCDiffToSquashBias > 0) {
314 LOGP(fatal, "maxBCDiffToMaskBias = {} and maxBCDiffToSquashBias = {} cannot be set at the same time. Either set masking or squashing with a BCDiff > 0", clParams.maxBCDiffToMaskBias, clParams.maxBCDiffToSquashBias);
315 }
316 clParams.printKeyValues();
// masking window in BC = mask bias + ROF length (continuous: roFrameLengthInBC,
// triggered: roFrameLengthTrig divided by the bunch spacing in ns)
317 auto nbc = clParams.maxBCDiffToMaskBias;
318 nbc += mClusterer->isContinuousReadOut() ? alpParams.roFrameLengthInBC : (alpParams.roFrameLengthTrig / o2::constants::lhc::LHCBunchSpacingNS);
319 mClusterer->setMaxBCSeparationToMask(nbc);
320 mClusterer->setMaxRowColDiffToMask(clParams.maxRowColDiffToMask);
321 // Squasher
322 int rofBC = mClusterer->isContinuousReadOut() ? alpParams.roFrameLengthInBC : (alpParams.roFrameLengthTrig / o2::constants::lhc::LHCBunchSpacingNS); // ROF length in BC
323 mClusterer->setMaxBCSeparationToSquash(rofBC + clParams.maxBCDiffToSquashBias);
324 int nROFsToSquash = 0; // squashing disabled if no reset due to maxSOTMUS>0.
325 if (clParams.maxSOTMUS > 0 && rofBC > 0) {
326 nROFsToSquash = 2 + int(clParams.maxSOTMUS / (rofBC * o2::constants::lhc::LHCBunchSpacingMUS)); // use squashing
327 }
// the squashing depth is applied only when a positive squash bias is configured, otherwise it is 0
328 mClusterer->setMaxROFDepthToSquash(clParams.maxBCDiffToSquashBias > 0 ? nROFsToSquash : 0);
329 mClusterer->print();
330 }
331 }
// Callback reacting to an updated condition object `obj` identified by `matcher`.
// NOTE(review): the signature line and the opening brace are missing from this listing.
// The object is first offered to the GRPGeomHelper; if it is not consumed there, it is matched
// against the noise map, the cluster dictionary and the Alpide parameters of this detector.
335template <class Mapping>
338 if (o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
339 return;
340 }
341 if (matcher == ConcreteDataMatcher(Mapping::getOrigin(), "NOISEMAP", 0)) {
342 LOG(info) << Mapping::getName() << " noise map updated" << (!mApplyNoiseMap ? " but masking is disabled" : "");
// NOTE(review): the statement inside this branch is not visible in this listing
343 if (mApplyNoiseMap) {
345 }
346 return;
347 }
348 if (matcher == ConcreteDataMatcher(Mapping::getOrigin(), "CLUSDICT", 0)) {
349 LOG(info) << Mapping::getName() << " cluster dictionary updated" << (!mUseClusterDictionary ? " but its using is disabled" : "");
// the dictionary is handed to the clusterer only if its use was not disabled by option
350 if (mUseClusterDictionary) {
351 mClusterer->setDictionary((const TopologyDictionary*)obj);
352 }
353 return;
354 }
355 // Note: strictly speaking, for Configurable params we don't need finaliseCCDB check, the singletons are updated at the CCDB fetcher level
356 if (matcher == ConcreteDataMatcher(Mapping::getOrigin(), "ALPIDEPARAM", 0)) {
357 LOG(info) << "Alpide param updated";
358 return;
359 }
// Brings the task back to its start-of-run state: clears the "finalize done" flag and the TF
// counter, resets the timer and resets the decoder and the clusterer if they exist.
// NOTE(review): the signature line and the braces of the body are missing from this listing.
363template <class Mapping>
366 // reset for the new run
367 mFinalizeDone = false;
368 mTFCounter = 0;
369 mTimer.Reset();
370 if (mDecoder) {
371 mDecoder->reset();
372 }
373 if (mClusterer) {
374 mClusterer->reset();
375 }
// Builds the DataProcessorSpec of the decoder device from the settings in `inp`.
// NOTE(review): the signature line and the braces of the body are missing from this listing, and
// the expressions providing the CCDB path prefix (`<std::string>()`) have lost their leading part.
// The outputs declared here mirror what the per-TF processing publishes: digits and clusters are
// optional, while the trigger, error and chip status outputs are always declared.
381 std::vector<OutputSpec> outputs;
382 auto inputs = o2::framework::select(inp.inputSpec.c_str());
383 if (inp.doDigits) {
384 outputs.emplace_back(inp.origin, "DIGITS", 0, Lifetime::Timeframe);
385 outputs.emplace_back(inp.origin, "DIGITSROF", 0, Lifetime::Timeframe);
// calibration data is declared only together with the digits
386 if (inp.doCalib) {
387 outputs.emplace_back(inp.origin, "GBTCALIB", 0, Lifetime::Timeframe);
388 }
389 }
390 if (inp.doClusters) {
391 outputs.emplace_back(inp.origin, "COMPCLUSTERS", 0, Lifetime::Timeframe);
392 outputs.emplace_back(inp.origin, "CLUSTERSROF", 0, Lifetime::Timeframe);
393 // in principle, we don't need to open this input if we don't need to send real data,
394 // but other devices expecting it do not know about options of this device: problem?
395 // if (doClusters && doPatterns)
396 outputs.emplace_back(inp.origin, "PATTERNS", 0, Lifetime::Timeframe);
397 }
398 outputs.emplace_back(inp.origin, "PHYSTRIG", 0, Lifetime::Timeframe);
400 outputs.emplace_back(inp.origin, "LinkErrors", 0, Lifetime::Timeframe);
401 outputs.emplace_back(inp.origin, "ChipErrors", 0, Lifetime::Timeframe);
402 outputs.emplace_back(inp.origin, "ErrorInfo", 0, Lifetime::Timeframe);
403 outputs.emplace_back(inp.origin, "CHIPSSTATUS", 0, Lifetime::Timeframe);
405 if (inp.askSTFDist) {
406 // request the input FLP/DISTSUBTIMEFRAME/0 that is _guaranteed_ to be present, even if none of our raw data is present.
407 inputs.emplace_back("stfDist", "FLP", "DISTSUBTIMEFRAME", 0, o2::framework::Lifetime::Timeframe);
408 }
// condition inputs: noise map and Alpide parameters always, cluster dictionary and clusterer
// parameters only when clusterization is enabled
409 inputs.emplace_back("noise", inp.origin, "NOISEMAP", 0, Lifetime::Condition,
410 o2::framework::ccdbParamSpec(fmt::format("{}/Calib/NoiseMap",<std::string>())));
411 inputs.emplace_back("alppar", inp.origin, "ALPIDEPARAM", 0, Lifetime::Condition, ccdbParamSpec(fmt::format("{}/Config/AlpideParam",<std::string>())));
412 if (inp.doClusters) {
413 inputs.emplace_back("cldict", inp.origin, "CLUSDICT", 0, Lifetime::Condition, ccdbParamSpec(fmt::format("{}/Calib/ClusterDictionary",<std::string>())));
414 inputs.emplace_back("cluspar", inp.origin, "CLUSPARAM", 0, Lifetime::Condition, ccdbParamSpec(fmt::format("{}/Config/ClustererParam",<std::string>())));
415 }
// GRP/geometry request: only the GRPECS object is asked for.
// NOTE(review): one constructor argument is on a line that is not visible in this listing.
417 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
418 true, // GRPECS=true
419 false, // GRPLHCIF
420 false, // GRPMagField
421 false, // askMatLUT
423 inputs,
424 true); // query only once all objects except mag.field
426 return DataProcessorSpec{
427 inp.deviceName,
428 inputs,
429 outputs,
// the ITS origin selects the ITS chip mapping, any other origin selects the MFT one
430 inp.origin == o2::header::gDataOriginITS ? AlgorithmSpec{adaptFromTask<STFDecoder<ChipMappingITS>>(inp, ggRequest)} : AlgorithmSpec{adaptFromTask<STFDecoder<ChipMappingMFT>>(inp, ggRequest)},
// device options; the option keys are read back by name during task initialisation
431 Options{
432 {"nthreads", VariantType::Int, 1, {"Number of decoding/clustering threads"}},
433 {"decoder-verbosity", VariantType::Int, 0, {"Verbosity level (-1: silent, 0: errors, 1: headers, 2: data, 3: raw data dump) of 1st lane"}},
434 {"always-parse-trigger", VariantType::Bool, false, {"parse trigger word even if flags continuation of old trigger"}},
435 {"raw-data-dumps", VariantType::Int, int(GBTLink::RawDataDumps::DUMP_NONE), {"Raw data dumps on error (0: none, 1: HBF for link, 2: whole TF for all links. If negative, dump only on from 1st pipeline."}},
436 {"raw-data-dumps-directory", VariantType::String, "", {"Destination directory for the raw data dumps"}},
437 {"stop-raw-data-dumps-after-size", VariantType::Int, 1024, {"Stop dumping once this size in MB is accumulated. 0: no limit"}},
438 {"unmute-extra-lanes", VariantType::Bool, false, {"allow extra lanes to be as verbose as 1st one"}},
439 {"allow-empty-rofs", VariantType::Bool, false, {"record ROFs w/o any hit"}},
440 {"ignore-noise-map", VariantType::Bool, false, {"do not mask pixels flagged in the noise map"}},
441 {"accept-rof-rampup-data", VariantType::Bool, false, {"do not discard data during ROF ramp up"}},
442 {"rof-lenght-error-freq", VariantType::Float, 60.f, {"do not report ROF lenght error more frequently than this value, disable if negative"}},
443 {"ignore-cluster-dictionary", VariantType::Bool, false, {"do not use cluster dictionary, always store explicit patterns"}}}};
446} // namespace itsmft
447} // namespace o2
