Project
Loading...
Searching...
No Matches
RawToDigitConverterSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <string>
12#include <fairlogger/Logger.h>
29#include "CPVBase/Geometry.h"
32
33using namespace o2::cpv::reco_workflow;
36
// RawToDigitConverterSpec::init -- one-time task configuration.
// NOTE(review): the signature line (source line 37) is absent from this listing; the member index at the
// end of the page lists it as `void init(framework::InitContext &ctx) final`.
// Records the start time, resets the decoder-error rate counter and the mute flag, then logs the
// configured flags. For pedestal runs gain calibration and bad-channel rejection are forced off
// and the function returns early.
38{
39 mStartTime = std::chrono::system_clock::now();
40 mDecoderErrorsPerMinute = 0;
41 mIsMuteDecoderErrors = false;
42
43 LOG(debug) << "Initializing RawToDigitConverterSpec...";
44 // Pedestal flag true/false
45 LOG(info) << "Pedestal run: " << (mIsPedestalData ? "YES" : "NO");
46 if (mIsPedestalData) { // no calibration for pedestal runs needed
47 mIsUsingGainCalibration = false;
48 mIsUsingBadMap = false;
49 LOG(info) << "CCDB is not used. Task configuration is done.";
50 return; // all other flags are irrelevant in this case
51 }
52
53 // is the use-gain-calibration flag set?
54 LOG(info) << "Gain calibration is " << (mIsUsingGainCalibration ? "ON" : "OFF");
55
56 // is the use-bad-channel-map flag set?
57 LOG(info) << "Bad channel rejection is " << (mIsUsingBadMap ? "ON" : "OFF");
58
59 LOG(info) << "Task configuration is done.";
60}
61
// RawToDigitConverterSpec::finaliseCCDB -- reacts to an updated condition object.
// NOTE(review): the signature line (source line 62) is absent from this listing; the member index at the
// end of the page lists it as `void finaliseCCDB(framework::ConcreteDataMatcher &matcher, void *obj) final`.
// Only the CTP/Trig_Offset/0 matcher is handled here; any other matcher falls through without action.
63{
64 if (matcher == framework::ConcreteDataMatcher("CTP", "Trig_Offset", 0)) {
65 LOG(info) << "RawToDigitConverterSpec::finaliseCCDB() : CTP/Config/TriggerOffsets updated.";
// NOTE(review): `par` is declared on source line 66, which is missing from this listing;
// presumably it is the o2::ctp::TriggerOffsetsParam instance -- confirm against the real file.
67 par.printKeyValues();
68 return;
69 }
70}
71
// Fetches the condition objects needed for decoding from the processing context and caches
// pointers to them in the members mPedestals, mBadMap and mGains.
// The body runs only on the first call: it is guarded by a function-local static flag, which is
// shared by every instance of the class in the process.
// ctx: processing context whose inputs() are queried for "trigoffset", "peds", "badmap" and "gains".
72void RawToDigitConverterSpec::updateTimeDependentParams(framework::ProcessingContext& ctx)
73{
74 static bool updateOnlyOnce = false;
75 if (!updateOnlyOnce) {
// The returned object is discarded; presumably the request itself is what makes the trigger-offset
// parameters available (see finaliseCCDB) -- TODO confirm.
76 ctx.inputs().get<o2::ctp::TriggerOffsetsParam*>("trigoffset");
77
78 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::Pedestals*>("peds"))> pedPtr{};
79 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::BadChannelMap*>("badmap"))> badMapPtr{};
80 // std::decay_t<decltype(ctx.inputs().get<o2::cpv::CalibParams*>("gains"))> gainsPtr{};
81
// NOTE(review): in each branch below the raw pointer taken with .get() is stored in a member while
// the local handle it came from goes out of scope at the closing brace. Whether the pointee stays
// valid afterwards depends on the ownership semantics of what inputs().get<T*>() returns -- confirm.
// Pedestals are needed for every non-pedestal run, independent of the other two flags.
82 if (!mIsPedestalData) {
83 auto pedPtr = ctx.inputs().get<o2::cpv::Pedestals*>("peds");
84 mPedestals = pedPtr.get();
85 }
86
87 if (mIsUsingBadMap) {
88 auto badMapPtr = ctx.inputs().get<o2::cpv::BadChannelMap*>("badmap");
89 mBadMap = badMapPtr.get();
90 }
91
92 if (mIsUsingGainCalibration) {
93 auto gainsPtr = ctx.inputs().get<o2::cpv::CalibParams*>("gains");
94 mGains = gainsPtr.get();
95 }
96 updateOnlyOnce = true;
97 }
98}
99
// RawToDigitConverterSpec::run -- decodes the CPV raw pages of one timeframe into digits.
// NOTE(review): the signature line (source line 100) is absent from this listing; the member index at the
// end of the page lists it as `void run(framework::ProcessingContext &ctx) final`.
// Outline:
//   1. refresh calibration pointers and the error-muting timers;
//   2. if a 0xDEADBEEF input with empty payload is present, publish empty outputs and return;
//   3. walk all CPV/RAWDATA inputs page by page, decode them and collect digits per interaction record;
//   4. publish CPV/DIGITS, CPV/DIGITTRIGREC and CPV/RAWHWERRORS (all with subspec 0).
101{
102 updateTimeDependentParams(ctx);
103 // check timers if we need mute/unmute error reporting
104 auto now = std::chrono::system_clock::now();
105 if (mIsMuteDecoderErrors) { // check if 10-minutes muting period passed
106 if (((now - mTimeWhenMuted) / std::chrono::minutes(1)) >= 10) {
107 mIsMuteDecoderErrors = false; // unmute
108 if (mDecoderErrorsCounterWhenMuted) {
109 LOG(error) << "RawToDigitConverterSpec::run() : " << mDecoderErrorsCounterWhenMuted << " errors happened while it was muted ((";
110 }
111 mDecoderErrorsCounterWhenMuted = 0;
112 }
113 }
// A new whole minute since mStartTime has begun: restart the per-minute error count.
114 if (((now - mStartTime) / std::chrono::minutes(1)) > mMinutesPassed) {
115 mMinutesPassed = (now - mStartTime) / std::chrono::minutes(1);
116 LOG(debug) << "minutes passed: " << mMinutesPassed;
117 mDecoderErrorsPerMinute = 0;
118 }
119
120 // Cache digits from bunch crossings as the component reads timeframes from many links consecutively
121 std::map<o2::InteractionRecord, std::shared_ptr<std::vector<o2::cpv::Digit>>> digitBuffer; // Internal digit buffer
122 mOutputHWErrors.clear();
123
124 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
125 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
126 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
127 std::vector<o2::framework::InputSpec> dummy{o2::framework::InputSpec{"dummy", o2::framework::ConcreteDataMatcher{"CPV", o2::header::gDataDescriptionRawData, 0xDEADBEEF}}};
128 for (const auto& ref : framework::InputRecordWalker(ctx.inputs(), dummy)) {
129 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
// NOTE(review): `payloadSize` (source line 130) and `maxWarn` (source line 132) are declared on lines
// that are missing from this listing; their exact definitions must be checked in the real file.
131 if (payloadSize == 0) { // send empty output
// Warn only for the first maxWarn consecutive occurrences; the last warning announces that reporting stops.
133 if (++contDeadBeef <= maxWarn) {
134 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
135 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->firstTForbit, payloadSize,
136 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
137 }
138 mOutputDigits.clear();
139 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
140 mOutputTriggerRecords.clear();
141 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
142 mOutputHWErrors.clear();
143 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
144 return; // empty TF, nothing to process
145 }
146 }
147 contDeadBeef = 0; // if good data, reset the counter
148
149 bool skipTF = false; // skip TF due to fatal error?
150
151 std::vector<o2::framework::InputSpec> rawFilter{
152 {"RAWDATA", o2::framework::ConcreteDataTypeMatcher{"CPV", "RAWDATA"}, o2::framework::Lifetime::Timeframe},
153 };
154 for (const auto& rawData : framework::InputRecordWalker(ctx.inputs(), rawFilter)) {
155 o2::cpv::RawReaderMemory rawreader(o2::framework::DataRefUtils::as<const char>(rawData));
156 // loop over all the DMA pages
157 while (rawreader.hasNext()) {
158 try {
159 rawreader.next();
160 } catch (RawErrorType_t e) {
161 if (!mIsMuteDecoderErrors) {
162 LOG(error) << "Raw decoding error " << (int)e;
163 }
164 // add error list
165 // RawErrorType_t is defined in O2/Detectors/CPV/reconstruction/include/CPVReconstruction/RawReaderMemory.h
166 // RawDecoderError(short c, short d, short g, short p, RawErrorType_t e)
167 mOutputHWErrors.emplace_back(-1, 0, 0, 0, e); // Put general errors to non-existing ccId -1
168 // if problem in header, abandon this page
169 if (e == RawErrorType_t::kRDH_DECODING) { // fatal error -> skip whole TF
170 LOG(error) << "RDH decoding error. Skipping this TF";
171 skipTF = true;
172 break;
173 }
174 if (e == RawErrorType_t::kPAGE_NOTFOUND || // nothing left to read -> skip to next HBF
175 e == RawErrorType_t::kNOT_CPV_RDH || // not cpv rdh -> skip to next HBF
176 e == RawErrorType_t::kOFFSET_TO_NEXT_IS_0) { // offset to next package is 0 -> do not know how to read next -> skip to next HBF
177 break;
178 }
179 // if problem in payload, try to continue
180 continue;
181 }
182 auto& rdh = rawreader.getRawHeader();
183 auto triggerOrbit = o2::raw::RDHUtils::getTriggerOrbit(rdh);
184 auto mod = o2::raw::RDHUtils::getLinkID(rdh) + 2; // link=0,1,2 -> mod=2,3,4
185 // for now all modules are written to one LinkID
186 if (mod > o2::cpv::Geometry::kNMod || mod < 2) { // only 3 correct modules:2,3,4
187 if (!mIsMuteDecoderErrors) {
188 LOG(error) << "RDH linkId corresponds to module " << mod << " which does not exist";
189 }
190 mOutputHWErrors.emplace_back(-1, mod, 0, 0, kRDH_INVALID); // Add non-existing modules to non-existing ccId -1 and dilogic = mod
191 continue;
192 }
193 o2::cpv::RawDecoder decoder(rawreader);
194 if (mIsMuteDecoderErrors) {
195 decoder.muteErrors();
196 }
197 RawErrorType_t err = decoder.decode();
// Only errors flagged with ccId == -1 count towards the per-minute rate that triggers muting.
198 int decoderErrors = 0;
199 for (auto errs : decoder.getErrors()) {
200 if (errs.ccId == -1) { // error related to wrong data format
201 decoderErrors++;
202 }
203 }
204 mDecoderErrorsPerMinute += decoderErrors;
205 // LOG(debug) << "RawDecoder found " << decoderErrors << " raw format errors";
206 // LOG(debug) << "Now I have " << mDecoderErrorsPerMinute << " errors for current minute";
// While muted, every decoder error (not just ccId == -1) is tallied for the summary printed on unmute.
207 if (mIsMuteDecoderErrors) {
208 mDecoderErrorsCounterWhenMuted += decoder.getErrors().size();
209 } else {
210 if (mDecoderErrorsPerMinute > 10) { // mute error reporting for 10 minutes
211 LOG(warning) << "> 10 raw decoder error messages per minute, muting it for 10 minutes";
212 mIsMuteDecoderErrors = true;
213 mTimeWhenMuted = std::chrono::system_clock::now();
214 }
215 }
216
217 if (!(err == kOK || err == kOK_NO_PAYLOAD)) {
218 // TODO handle severe errors
219 // TODO: probably careful conversion of decoder errors to Fitter errors?
220 mOutputHWErrors.emplace_back(-1, mod, 0, 0, err); // assign general RDH errors to non-existing ccId -1 and dilogic = mod
221 }
222
223 std::shared_ptr<std::vector<o2::cpv::Digit>> currentDigitContainer;
224 auto digilets = decoder.getDigits();
// NOTE(review): this `continue` also skips the error-forwarding loop at the end of the page body, so
// the entries of decoder.getErrors() are not copied to mOutputHWErrors for pages without digits
// (only the `err` return code above is recorded) -- confirm this is intended.
225 if (digilets.empty()) { // no digits -> continue to next pages
226 continue;
227 }
228 o2::InteractionRecord currentIR(0, triggerOrbit); //(bc, orbit)
229 // Loop over all the BCs
230 for (auto itBCRecords : decoder.getBCRecords()) {
231 currentIR.bc = itBCRecords.bc;
// firstDigit..lastDigit is used as an inclusive index range into digilets.
232 for (unsigned int iDig = itBCRecords.firstDigit; iDig <= itBCRecords.lastDigit; iDig++) {
233 auto adch = digilets[iDig];
// Look up (or create) the digit container of this interaction record.
234 auto found = digitBuffer.find(currentIR);
235 if (found == digitBuffer.end()) {
236 currentDigitContainer = std::make_shared<std::vector<o2::cpv::Digit>>();
237 digitBuffer[currentIR] = currentDigitContainer;
238 } else {
239 currentDigitContainer = found->second;
240 }
241
// Split the raw word into its Address and Charge fields.
242 AddressCharge ac = {adch};
243 unsigned short absId = ac.Address;
244 // if we deal with non-pedestal data?
245 if (!mIsPedestalData) { // not a pedestal data
246 // test bad map
247 if (mIsUsingBadMap) {
248 if (!mBadMap->isChannelGood(absId)) {
249 continue; // skip bad channel
250 }
251 }
// Pedestal is always subtracted; the gain factor is applied only when gain calibration is on.
252 float amp = 0;
253 if (mIsUsingGainCalibration) { // calibrate amplitude
254 amp = mGains->getGain(absId) * (ac.Charge - mPedestals->getPedestal(absId));
255 } else { // no gain calibration needed
256 amp = ac.Charge - mPedestals->getPedestal(absId);
257 }
258 if (amp > 0) { // emplace new digit
259 currentDigitContainer->emplace_back(absId, amp, -1);
260 }
261 } else { // pedestal data, no calibration needed.
262 currentDigitContainer->emplace_back(absId, (float)ac.Charge, -1);
263 }
264 }
265 }
266 // Check and send list of hwErrors
267 for (auto& er : decoder.getErrors()) {
268 mOutputHWErrors.push_back(er);
269 }
270 }
271 // if corrupted raw data is present in this TF -> skip it
272 if (skipTF) {
273 // Send no digits
274 mOutputDigits.clear();
275 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
276 mOutputTriggerRecords.clear();
277 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
278 // Send errors
279 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
280 return;
281 }
282 }
283
284 // Loop over BCs, sort digits with increasing digit ID and write to output containers
285 mOutputDigits.clear();
286 mOutputTriggerRecords.clear();
287 const auto tfOrbitFirst = ctx.services().get<o2::framework::TimingInfo>().firstTForbit;
288 const auto& ctpOffsets = o2::ctp::TriggerOffsetsParam::Instance();
289 for (auto [bc, digits] : digitBuffer) {
290 if (bc.differenceInBC({0, tfOrbitFirst}) < ctpOffsets.LM_L0) {
291 continue; // discard this BC: it lies fewer than LM_L0 BCs after the first TF orbit, so subtracting the offset below would move it before the TF start
292 }
293 int prevDigitSize = mOutputDigits.size();
294 if (digits->size()) {
295 // Sort digits according to digit ID
296 std::sort(digits->begin(), digits->end(), [](o2::cpv::Digit& lhs, o2::cpv::Digit& rhs) { return lhs.getAbsId() < rhs.getAbsId(); });
297
298 for (auto digit : *digits) {
299 mOutputDigits.push_back(digit);
300 }
301 }
302 // subtract ctp L0-LM offset here
303 mOutputTriggerRecords.emplace_back(bc - ctpOffsets.LM_L0, prevDigitSize, mOutputDigits.size() - prevDigitSize);
304 }
305 digitBuffer.clear();
306
// NOTE(review): the last literal has no leading space, so the record count and "trigger" run together in the log.
307 LOG(info) << "[CPVRawToDigitConverter - run] Sending " << mOutputDigits.size() << " digits in " << mOutputTriggerRecords.size() << "trigger records.";
308 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITS", 0}, mOutputDigits);
309 ctx.outputs().snapshot(o2::framework::Output{"CPV", "DIGITTRIGREC", 0}, mOutputTriggerRecords);
310 ctx.outputs().snapshot(o2::framework::Output{"CPV", "RAWHWERRORS", 0}, mOutputHWErrors);
311}
312//_____________________________________________________________________________
// Builds the DataProcessorSpec named "CPVRawToDigitConverterSpec".
// askDISTSTF:         also subscribe to FLP/DISTSUBTIMEFRAME/0.
// isPedestal:         pedestal run; no CPV calibration inputs are requested.
// useBadChannelMap:   request CPV/Calib/BadChannelMap (ignored when isPedestal is true).
// useGainCalibration: request CPV/Calib/Gains (ignored when isPedestal is true).
// Returns the spec with outputs CPV/DIGITS/0, CPV/DIGITTRIGREC/0 and CPV/RAWHWERRORS/0.
313o2::framework::DataProcessorSpec o2::cpv::reco_workflow::getRawToDigitConverterSpec(bool askDISTSTF, bool isPedestal, bool useBadChannelMap, bool useGainCalibration)
314{
315 std::vector<o2::framework::InputSpec> inputs;
316 inputs.emplace_back("RAWDATA", o2::framework::ConcreteDataTypeMatcher{"CPV", "RAWDATA"}, o2::framework::Lifetime::Timeframe);
317 // receive at least 1 guaranteed input (which will allow to acknowledge the TF)
318 if (askDISTSTF) {
319 inputs.emplace_back("STFDist", "FLP", "DISTSUBTIMEFRAME", 0, o2::framework::Lifetime::Timeframe);
320 }
// Calibration objects are requested as Condition inputs; pedestals are requested for every non-pedestal run.
321 if (!isPedestal) {
322 inputs.emplace_back("peds", "CPV", "CPV_Pedestals", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/Pedestals"));
323 if (useBadChannelMap) {
324 inputs.emplace_back("badmap", "CPV", "CPV_BadMap", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/BadChannelMap"));
325 }
326 if (useGainCalibration) {
327 inputs.emplace_back("gains", "CPV", "CPV_Gains", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CPV/Calib/Gains"));
328 }
329 }
330 // for BC correction
331 inputs.emplace_back("trigoffset", "CTP", "Trig_Offset", 0, o2::framework::Lifetime::Condition, o2::framework::ccdbParamSpec("CTP/Config/TriggerOffsets"));
332
333 std::vector<o2::framework::OutputSpec> outputs;
334 outputs.emplace_back("CPV", "DIGITS", 0, o2::framework::Lifetime::Timeframe);
335 outputs.emplace_back("CPV", "DIGITTRIGREC", 0, o2::framework::Lifetime::Timeframe);
336 outputs.emplace_back("CPV", "RAWHWERRORS", 0, o2::framework::Lifetime::Timeframe);
337 // note that for cpv we always have stream #0 (i.e. CPV/DIGITS/0)
338
339 return o2::framework::DataProcessorSpec{"CPVRawToDigitConverterSpec",
340 inputs, // o2::framework::select("A:CPV/RAWDATA"),
341 outputs,
342 o2::framework::adaptFromTask<o2::cpv::reco_workflow::RawToDigitConverterSpec>(isPedestal, useBadChannelMap, useGainCalibration),
// NOTE(review): source line 343, which closes this return statement, is missing from the listing.
344}
uint64_t bc
Definition RawEventData.h:5
A helper class to iterate over all parts of all input routes.
Definition of the Names Generator class.
std::ostringstream debug
CCDB container for bad (masked) channels in CPV.
bool isChannelGood(unsigned short channelID) const
Get the status of a certain cell.
float getGain(unsigned short cellID) const
Get High Gain energy calibration coefficients.
Definition CalibParams.h:49
static constexpr short kNMod
Definition Geometry.h:38
short getPedestal(short cellID) const
Get pedestal.
Definition Pedestals.h:49
Decoder of the ALTRO data in the raw page.
Definition RawDecoder.h:68
const std::vector< uint32_t > & getDigits() const
Get the reference to the digits container.
Definition RawDecoder.h:89
const std::vector< o2::cpv::RawDecoderError > & getErrors() const
Get the reference to the list of decoding errors.
Definition RawDecoder.h:97
RawErrorType_t decode()
Decode the raw cpv payload stream.
void muteErrors()
mute error reporting
Definition RawDecoder.h:100
const std::vector< o2::cpv::BCRecord > & getBCRecords() const
Get the reference to the BC records.
Definition RawDecoder.h:93
Reader for raw data produced by the Readout application in in-memory format.
RawErrorType_t next()
Read next payload from the stream.
bool hasNext() const
check if more pages are available in the raw file
const o2::header::RDHAny & getRawHeader() const
access to the raw header of the current page
void finaliseCCDB(framework::ConcreteDataMatcher &matcher, void *obj) final
void init(framework::InitContext &ctx) final
Initializing the RawToDigitConverterSpec.
void run(framework::ProcessingContext &ctx) final
Run conversion of raw data to cells.
void snapshot(const Output &spec, T const &object)
A helper class to iterate over all parts of all input routes.
decltype(auto) get(R binding, int part=0) const
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
o2::framework::DataProcessorSpec getRawToDigitConverterSpec(bool askDISTSTF=true, bool isPedestal=false, bool useBadChannelMap=true, bool useGainCalibration=true)
Creating DataProcessorSpec for the CPV Digit Converter Spec.
@ kOK
NoError.
@ kOFFSET_TO_NEXT_IS_0
@ kOK_NO_PAYLOAD
No payload per ddl (not error)
Lifetime
Possible Lifetime of objects being exchanged by the DPL.
Definition Lifetime.h:18
std::vector< ConfigParamSpec > ccdbParamSpec(std::string const &path, int runDependent, std::vector< CCDBMetadata > metadata={}, int qrate=0)
std::vector< ConfigParamSpec > Options
uint16_t bc
bunch crossing ID of interaction
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Digit > digits
uint32_t Charge
Bits 18 - 32 : charge.
Definition RawDecoder.h:44
uint32_t Address
Bits 0 - 17 : Address.
Definition RawDecoder.h:43