Project
Loading...
Searching...
No Matches
RawToDigitConverterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <string>
12#include <fairlogger/Logger.h>
29#include "CPVBase/Geometry.h"
32
33using namespace o2::cpv::reco_workflow;
36
38{
39 mStartTime = std::chrono::system_clock::now();
40 mDecoderErrorsPerMinute = 0;
41 mIsMuteDecoderErrors = false;
42
43 LOG(debug) << "Initializing RawToDigitConverterSpec...";
44 // Pedestal flag true/false
45 LOG(info) << "Pedestal run: " << (mIsPedestalData ? "YES" : "NO");
46 if (mIsPedestalData) { // no calibration for pedestal runs needed
47 mIsUsingGainCalibration = false;
48 mIsUsingBadMap = false;
49 LOG(info) << "CCDB is not used. Task configuration is done.";
50 return; // all other flags are irrelevant in this case
51 }
52
53 // is use-gain-calibration flag setted?
54 LOG(info) << "Gain calibration is " << (mIsUsingGainCalibration ? "ON" : "OFF");
55
56 // is use-bad-channel-map flag setted?
57 LOG(info) << "Bad channel rejection is " << (mIsUsingBadMap ? "ON" : "OFF");
58
59 LOG(info) << "Task configuration is done.";
60}
61
63{
64 if (matcher == framework::ConcreteDataMatcher("CTP", "Trig_Offset", 0)) {
65 LOG(info) << "RawToDigitConverterSpec::finaliseCCDB() : CTP/Config/TriggerOffsets updated.";
67 par.printKeyValues();
68 return;
69 }
70}
71
72void RawToDigitConverterSpec::updateTimeDependentParams(framework::ProcessingContext& ctx)
73{
74 static bool updateOnlyOnce = false;
75 if (!updateOnlyOnce) {
76 ctx.inputs().get<o2::ctp::TriggerOffsetsParam*>("trigoffset");
77
78 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::Pedestals*>("peds"))> pedPtr{};
79 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::BadChannelMap*>("badmap"))> badMapPtr{};
80 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::CalibParams*>("gains"))> gainsPtr{};
81
82 if (!mIsPedestalData) {
83 auto pedPtr = ctx.inputs().get<o2::cpv::Pedestals*>("peds");
84 mPedestals = pedPtr.get();
85 }
86
87 if (mIsUsingBadMap) {
88 auto badMapPtr = ctx.inputs().get<o2::cpv::BadChannelMap*>("badmap");
89 mBadMap = badMapPtr.get();
90 }
91
92 if (mIsUsingGainCalibration) {
93 auto gainsPtr = ctx.inputs().get<o2::cpv::CalibParams*>("gains");
94 mGains = gainsPtr.get();
95 }
96 updateOnlyOnce = true;
97 }
98}
99
101{
102 updateTimeDependentParams(ctx);
103 // check timers if we need mute/unmute error reporting
104 auto now = std::chrono::system_clock::now();
105 if (mIsMuteDecoderErrors) { // check if 10-minutes muting period passed
106 if (((now - mTimeWhenMuted) / std::chrono::minutes(1)) >= 10) {
107 mIsMuteDecoderErrors = false; // unmute
108 if (mDecoderErrorsCounterWhenMuted) {
109 LOG(error) << "RawToDigitConverterSpec::run() : " << mDecoderErrorsCounterWhenMuted << " errors happened while it was muted ((";
110 }
111 mDecoderErrorsCounterWhenMuted = 0;
112 }
113 }
114 if (((now - mStartTime) / std::chrono::minutes(1)) > mMinutesPassed) {
115 mMinutesPassed = (now - mStartTime) / std::chrono::minutes(1);
116 LOG(debug) << "minutes passed: " << mMinutesPassed;
117 mDecoderErrorsPerMinute = 0;
118 }
119
120 // Cache digits from bunch crossings as the component reads timeframes from many links consecutively
121 std::map<o2::InteractionRecord, std::shared_ptr<std::vector<o2::cpv::Digit>>> digitBuffer; // Internal digit buffer
122 mOutputHWErrors.clear();
123
124 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
125 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
126 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
127 std::vector<o2::framework::InputSpec> dummy{o2::framework::InputSpec{"dummy", o2::framework::ConcreteDataMatcher{"CPV", o2::header::gDataDescriptionRawData, 0xDEADBEEF}}};
128 for (const auto& ref : framework::InputRecordWalker(ctx.inputs(), dummy)) {
129 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
131 if (payloadSize == 0) { // send empty output
133 if (++contDeadBeef <= maxWarn) {
134 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
135 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
136 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
137 }
138 mOutputDigits.clear();
139 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
140 mOutputTriggerRecords.clear();
141 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
142 mOutputHWErrors.clear();
143 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
144 return; // empty TF, nothing to process
145 }
146 }
147 contDeadBeef = 0; // if good data, reset the counter
148
149 bool skipTF = false; // skip TF due to fatal error?
150
151 std::vector<o2::framework::InputSpec> rawFilter{
152 {"RAWDATA", o2::framework::ConcreteDataTypeMatcher{"CPV", "RAWDATA"}, o2::framework::Lifetime::Timeframe},
153 };
154 for (const auto& rawData : framework::InputRecordWalker(ctx.inputs(), rawFilter)) {
155 o2::cpv::RawReaderMemory rawreader(o2::framework::DataRefUtils::as<const char>(rawData));
156 // loop over all the DMA pages
157 while (rawreader.hasNext()) {
158 try {
159 rawreader.next();
160 } catch (RawErrorType_t e) {
161 if (!mIsMuteDecoderErrors) {
162 LOG(error) << "Raw decoding error " << (int)e;
163 }
164 // add error list
165 // RawErrorType_t is defined in O2/Detectors/CPV/reconstruction/include/CPVReconstruction/RawReaderMemory.h
166 // RawDecoderError(short c, short d, short g, short p, RawErrorType_t e)
167 mOutputHWErrors.emplace_back(-1, 0, 0, 0, e); // Put general errors to non-existing ccId -1
168 // if problem in header, abandon this page
169 if (e == RawErrorType_t::kRDH_DECODING) { // fatal error -> skip whole TF
170 LOG(error) << "RDH decoding error. Skipping this TF";
171 skipTF = true;
172 break;
173 }
174 if (e == RawErrorType_t::kPAGE_NOTFOUND || // nothing left to read -> skip to next HBF
175 e == RawErrorType_t::kNOT_CPV_RDH || // not cpv rdh -> skip to next HBF
176 e == RawErrorType_t::kOFFSET_TO_NEXT_IS_0) { // offset to next package is 0 -> do not know how to read next -> skip to next HBF
177 break;
178 }
179 // if problem in payload, try to continue
180 continue;
181 }
182 auto& rdh = rawreader.getRawHeader();
183 auto triggerOrbit = o2::raw::RDHUtils::getTriggerOrbit(rdh);
184 auto mod = o2::raw::RDHUtils::getLinkID(rdh) + 2; // link=0,1,2 -> mod=2,3,4
185 // for now all modules are written to one LinkID
186 if (mod > o2::cpv::Geometry::kNMod || mod < 2) { // only 3 correct modules:2,3,4
187 if (!mIsMuteDecoderErrors) {
188 LOG(error) << "RDH linkId corresponds to module " << mod << " which does not exist";
189 }
190 mOutputHWErrors.emplace_back(-1, mod, 0, 0, kRDH_INVALID); // Add non-existing modules to non-existing ccId -1 and dilogic = mod
191 continue;
192 }
193 o2::cpv::RawDecoder decoder(rawreader);
194 if (mIsMuteDecoderErrors) {
195 decoder.muteErrors();
196 }
197 RawErrorType_t err = decoder.decode();
198 int decoderErrors = 0;
199 for (auto errs : decoder.getErrors()) {
200 if (errs.ccId == -1) { // error related to wrong data format
201 decoderErrors++;
202 }
203 }
204 mDecoderErrorsPerMinute += decoderErrors;
205 // LOG(debug) << "RawDecoder found " << decoderErrors << " raw format errors";
206 // LOG(debug) << "Now I have " << mDecoderErrorsPerMinute << " errors for current minute";
207 if (mIsMuteDecoderErrors) {
208 mDecoderErrorsCounterWhenMuted += decoder.getErrors().size();
209 } else {
210 if (mDecoderErrorsPerMinute > 10) { // mute error reporting for 10 minutes
211 LOG(warning) << "> 10 raw decoder error messages per minute, muting it for 10 minutes";
212 mIsMuteDecoderErrors = true;
213 mTimeWhenMuted = std::chrono::system_clock::now();
214 }
215 }
216
217 if (!(err == kOK || err == kOK_NO_PAYLOAD)) {
218 // TODO handle severe errors
219 // TODO: probably careful conversion of decoder errors to Fitter errors?
220 mOutputHWErrors.emplace_back(-1, mod, 0, 0, err); // assign general RDH errors to non-existing ccId -1 and dilogic = mod
221 }
222
223 std::shared_ptr<std::vector<o2::cpv::Digit>> currentDigitContainer;
224 auto digilets = decoder.getDigits();
225 if (digilets.empty()) { // no digits -> continue to next pages
226 continue;
227 }
228 o2::InteractionRecord currentIR(0, triggerOrbit); //(bc, orbit)
229 // Loop over all the BCs
230 for (auto itBCRecords : decoder.getBCRecords()) {
231 currentIR.bc = itBCRecords.bc;
232 for (unsigned int iDig = itBCRecords.firstDigit; iDig <= itBCRecords.lastDigit; iDig++) {
233 auto adch = digilets[iDig];
234 auto found = digitBuffer.find(currentIR);
235 if (found == digitBuffer.end()) {
236 currentDigitContainer = std::make_shared<std::vector<o2::cpv::Digit>>();
237 digitBuffer[currentIR] = currentDigitContainer;
238 } else {
239 currentDigitContainer = found->second;
240 }
241
242 AddressCharge ac = {adch};
243 unsigned short absId = ac.Address;
244 // if we deal with non-pedestal data?
245 if (!mIsPedestalData) { // not a pedestal data
246 // test bad map
247 if (mIsUsingBadMap) {
248 if (!mBadMap->isChannelGood(absId)) {
249 continue; // skip bad channel
250 }
251 }
252 float amp = 0;
253 if (mIsUsingGainCalibration) { // calibrate amplitude
254 amp = mGains->getGain(absId) * (ac.Charge - mPedestals->getPedestal(absId));
255 } else { // no gain calibration needed
256 amp = ac.Charge - mPedestals->getPedestal(absId);
257 }
258 if (amp > 0) { // emplace new digit
259 currentDigitContainer->emplace_back(absId, amp, -1);
260 }
261 } else { // pedestal data, no calibration needed.
262 currentDigitContainer->emplace_back(absId, (float)ac.Charge, -1);
263 }
264 }
265 }
266 // Check and send list of hwErrors
267 for (auto& er : decoder.getErrors()) {
268 mOutputHWErrors.push_back(er);
269 }
270 }
271 // if corrupted raw data is present in this TF -> skip it
272 if (skipTF) {
273 // Send no digits
274 mOutputDigits.clear();
275 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
276 mOutputTriggerRecords.clear();
277 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
278 // Send errors
279 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
280 return;
281 }
282 }
283
284 // Loop over BCs, sort digits with increasing digit ID and write to output containers
285 mOutputDigits.clear();
286 mOutputTriggerRecords.clear();
287 const auto tfOrbitFirst = ctx.services().get<o2::framework::TimingInfo>().firstTForbit;
288 const auto& ctpOffsets = o2::ctp::TriggerOffsetsParam::Instance();
289 for (auto [bc, digits] : digitBuffer) {
290 if (bc.differenceInBC({0, tfOrbitFirst}) < ctpOffsets.LM_L0) {
291 continue; // discard this BC as it is actually out of
292 }
293 int prevDigitSize = mOutputDigits.size();
294 if (digits->size()) {
295 // Sort digits according to digit ID
296 std::sort(digits->begin(), digits->end(), [](o2::cpv::Digit& lhs, o2::cpv::Digit& rhs) { return lhs.getAbsId() < rhs.getAbsId(); });
297
298 for (auto digit : *digits) {
299 mOutputDigits.push_back(digit);
300 }
301 }
302 // subtract ctp L0-LM offset here
303 mOutputTriggerRecords.emplace_back(bc - ctpOffsets.LM_L0, prevDigitSize, mOutputDigits.size() - prevDigitSize);
304 }
305 digitBuffer.clear();
306
307 LOG(info) << "[CPVRawToDigitConverter - run] Sending " << mOutputDigits.size() << " digits in " << mOutputTriggerRecords.size() << "trigger records.";
308 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
309 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
310 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
311}
312//_____________________________________________________________________________
313o2::framework::DataProcessorSpec o2::cpv::reco_workflow::getRawToDigitConverterSpec(bool askDISTSTF, bool isPedestal, bool useBadChannelMap, bool useGainCalibration)
314{
315 std::vector<o2::framework::InputSpec> inputs;
316 inputs.emplace_back("RAWDATA", o2::framework::ConcreteDataTypeMatcher{"CPV", "RAWDATA"}, o2::framework::Lifetime::Timeframe);
317 // receive at least 1 guaranteed input (which will allow to acknowledge the TF)
318 if (askDISTSTF) {
319 inputs.emplace_back("STFDist", "FLP", "DISTSUBTIMEFRAME", 0, o2::framework::Lifetime::Timeframe);
320 }
321 if (!isPedestal) {
322 inputs.emplace_back("peds", "CPV", "CPV_Pedestals", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/Pedestals"));
323 if (useBadChannelMap) {
324 inputs.emplace_back("badmap", "CPV", "CPV_BadMap", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/BadChannelMap"));
325 }
326 if (useGainCalibration) {
327 inputs.emplace_back("gains", "CPV", "CPV_Gains", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/Gains"));
328 }
329 }
330 // for BC correction
331 inputs.emplace_back("trigoffset", "CTP", "Trig_Offset", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CTP/Config/TriggerOffsets"));
332
333 std::vector<o2::framework::OutputSpec> outputs;
334 outputs.emplace_back("CPV", "DIGITS", 0, o2::framework::Lifetime::Timeframe);
335 outputs.emplace_back("CPV", "DIGITTRIGREC", 0, o2::framework::Lifetime::Timeframe);
336 outputs.emplace_back("CPV", "RAWHWERRORS", 0, o2::framework::Lifetime::Timeframe);
337 // note that for cpv we always have stream #0 (i.e. CPV/DIGITS/0)
338
339 return o2::framework::DataProcessorSpec{"CPVRawToDigitConverterSpec",
340 inputs, // o2::framework::select("A:CPV/RAWDATA"),
341 outputs,
342 o2::framework::adaptFromTask<o2::cpv::reco_workflow::RawToDigitConverterSpec>(isPedestal, useBadChannelMap, useGainCalibration),
344}
std::ostringstream debug
uint64_t bc
Definition RawEventData.h:5
A helper class to iteratate over all parts of all input routes.
Definition of the Names Generator class.
CCDB container for bad (masked) channels in CPV.
bool isChannelGood(unsigned short channelID) const
Get the status of a certain cell.
float getGain(unsigned short cellID) const
Get High Gain energy calibration coefficients.
Definition CalibParams.h:49
static constexpr short kNMod
Definition Geometry.h:38
short getPedestal(short cellID) const
Get pedestal.
Definition Pedestals.h:49
Decoder of the ALTRO data in the raw page.
Definition RawDecoder.h:68
const std::vector< uint32_t > & getDigits() const
Get the reference to the digits container.
Definition RawDecoder.h:89
const std::vector< o2::cpv::RawDecoderError > & getErrors() const
Get the reference to the list of decoding errors.
Definition RawDecoder.h:97
RawErrorType_t decode()
Decode the raw cpv payload stream.
void muteErrors()
mute error reporting
Definition RawDecoder.h:100
const std::vector< o2::cpv::BCRecord > & getBCRecords() const
Get the reference to the BC records.
Definition RawDecoder.h:93
Reader for raw data produced by the Readout application in in-memory format.
RawErrorType_t next()
Read next payload from the stream.
bool hasNext() const
check if more pages are available in the raw file
const o2::header::RDHAny & getRawHeader() const
access to the raw header of the current page
void finaliseCCDB(framework::ConcreteDataMatcher &matcher, void *obj) final
void init(framework::InitContext &ctx) final
Initializing the RawToDigitConverterSpec.
void run(framework::ProcessingContext &ctx) final
Run conversion of raw data to cells.
void snapshot(const Output &spec, T const &object)
A helper class to iteratate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:598
o2::framework::DataProcessorSpec getRawToDigitConverterSpec(bool askDISTSTF=true, bool isPedestal=false, bool useBadChannelMap=true, bool useGainCalibration=true)
Creating DataProcessorSpec for the CPV Digit Converter Spec.
@ kOK
NoError.
@ kOFFSET_TO_NEXT_IS_0
@ kOK_NO_PAYLOAD
No payload per ddl (not error)
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
uint16_t bc
bunch crossing ID of interaction
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Digit > digits
uint32_t Charge
Bits 18 - 32 : charge.
Definition RawDecoder.h:44
uint32_t Address
Bits 0 - 17 : Address.
Definition RawDecoder.h:43