Project
Loading...
Searching...
No Matches
CompressedDecodingTask.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
20
27#include "Framework/Logger.h"
33#include "TOFBase/Utils.h"
34
35using namespace o2::framework;
36
37namespace o2
38{
39namespace tof
40{
41
43
45{
46 LOG(debug) << "CompressedDecoding init";
47
48 if (mNorbitsPerTF == -1) {
50 }
51
52 mMaskNoise = ic.options().get<bool>("mask-noise");
53 mNoiseRate = ic.options().get<int>("noise-counts");
54 mRowFilter = ic.options().get<bool>("row-filter");
55
56 if (mMaskNoise) {
57 mDecoder.maskNoiseRate(mNoiseRate);
58 } else {
59 mDecoder.maskNoiseRate(-mNoiseRate); // negative means -> flag but not filter
60 }
61
62 auto finishFunction = [this]() {
63 LOG(debug) << "CompressedDecoding finish";
64 };
65 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(finishFunction);
66 mTimer.Stop();
67 mTimer.Reset();
68}
69
71{
72 mHasToBePosted = false;
73 mDecoder.fillWindows();
74 mDecoder.fillDiagnosticFrequency();
75
76 // send output message
77 std::vector<o2::tof::Digit>* alldigits = mDecoder.getDigitPerTimeFrame();
78 std::vector<o2::tof::ReadoutWindowData>* row = mDecoder.getReadoutWindowData();
79 if (mRowFilter) {
81 }
82
83 ReadoutWindowData* last = nullptr;
85 int lastval = 0;
86 if (!row->empty()) {
87 last = &row->back();
88 lastval = last->first() + last->size();
89 lastIR = last->mFirstIR;
90 }
91
92 /*
93 int nwindowperTF = o2::tof::Utils::getNOrbitInTF() * 3;
94 while (row->size() < nwindowperTF) {
95 // complete timeframe with empty readout windows
96 auto& dummy = row->emplace_back(lastval, 0);
97 dummy.mFirstIR = lastIR;
98 }
99 while (row->size() > nwindowperTF) {
100 // remove extra readout windows after a check they are empty
101 row->pop_back();
102 }
103*/
104
105 int n_tof_window = row->size();
106 int n_orbits = n_tof_window / 3;
107 int digit_size = alldigits->size();
108
109 // LOG(info) << "TOF: N tof window decoded = " << n_tof_window << "(orbits = " << n_orbits << ") with " << digit_size << " digits";
110
111 // add digits in the output snapshot
112 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0}, *alldigits);
113 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0}, *row);
114
115 std::vector<uint8_t>& patterns = mDecoder.getPatterns();
116
117 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "PATTERNS", 0}, patterns);
118
119 std::vector<uint64_t>& errors = mDecoder.getErrors();
120 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ERRORS", 0}, errors);
121
122 DigitHeader& digitH = mDecoder.getDigitHeader();
123 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITHEADER", 0}, digitH);
124
125 auto diagnosticFrequency = mDecoder.getDiagnosticFrequency();
126 diagnosticFrequency.setTimeStamp(mCreationTime / 1000);
127 // add TFIDInfo
130 diagnosticFrequency.setTFIDInfo(tfinfo);
131
132 //diagnosticFrequency.print();
133 pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIAFREQ", 0}, diagnosticFrequency);
134
135 mDecoder.clear();
136
137 mNTF++;
138 mNCrateOpenTF = 0;
139 mNCrateCloseTF = 0;
140}
141
143{
144 mTimer.Start(false);
145
146 int norbits = mNorbitsPerTF;
147 if (norbits == -1) {
150 mNorbitsPerTF = norbits;
151 }
152 mDecoder.setNOrbitInTF(norbits);
153
154 static bool isFirstCall = true;
155 if (isFirstCall) {
156 isFirstCall = false;
157 LOG(info) << "N orbits per TF set to " << norbits;
158 } else {
159 LOG(debug) << "N orbits per TF set to " << norbits;
160 }
161
162 mCreationTime = std::chrono::high_resolution_clock::now().time_since_epoch().count() / 1000000;
163
164 //RS set the 1st orbit of the TF from the O2 header, relying on rdhHandler is not good (in fact, the RDH might be eliminated in the derived data)
165 mInitOrbit = pc.services().get<o2::framework::TimingInfo>().firstTForbit;
166 if (!mConetMode) {
167 mDecoder.setFirstIR({0, mInitOrbit});
168 }
169
170 decodeTF(pc);
171
172 if (!mConetMode) {
173 mHasToBePosted = true;
174 } else if (mNCrateOpenTF == mNCrateCloseTF) {
175 mHasToBePosted = true;
176 }
177
178 if (mHasToBePosted) {
179 postData(pc);
180 }
181 mTimer.Stop();
182}
183
185{
186 LOGF(debug, "TOF CompressedDecoding total timing: Cpu: %.3e Real: %.3e s in %d slots",
187 mTimer.CpuTime(), mTimer.RealTime(), mTimer.Counter() - 1);
188}
189
191{
192 auto& inputs = pc.inputs();
193 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
194
195 // if we see requested data type input with 0xDEADBEEF subspec and 0 payload this means that the "delayed message"
196 // mechanism created it in absence of real data from upstream. Processor should send empty output to not block the workflow
197 if (!mLocalCmp) {
198 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
199 std::vector<InputSpec> dummy{InputSpec{"dummy", ConcreteDataMatcher{"TOF", mDataDesc, 0xDEADBEEF}}};
200 for (const auto& ref : InputRecordWalker(inputs, dummy)) {
201 const auto dh = o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
202 auto payloadSize = DataRefUtils::getPayloadSize(ref);
203 if (payloadSize == 0) {
205 if (++contDeadBeef <= maxWarn) {
206 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
207 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, tinfo.tfCounter, tinfo.firstTForbit, payloadSize,
208 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
209 }
210 return;
211 }
212 }
213 contDeadBeef = 0; // if good data, reset the counter
214 }
217 std::vector<InputSpec> sel{InputSpec{"filter", ConcreteDataTypeMatcher{"TOF", mDataDesc.str}}};
218 for (const auto& ref : InputRecordWalker(pc.inputs(), sel)) {
219 // for (auto const& ref : o2::framework::InputRecordWalker(pc.inputs())) {
220 // for (auto iit = pc.inputs().begin(), iend = pc.inputs().end(); iit != iend; ++iit) {
221 // if (!iit.isValid()) {
222 // continue;
223 // }
224
226 // for (auto const& ref : iit) {
227 const auto* headerIn = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
228 auto payloadIn = ref.payload;
229 auto payloadInSize = DataRefUtils::getPayloadSize(ref);
230
231 if (payloadInSize < 1) {
232 continue;
233 }
234
235 // LOG(info) << "payloadInSize = " << payloadInSize;
236
240 // }
241 }
242}
243
244void CompressedDecodingTask::headerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit)
245{
246 if (mConetMode) {
247 if (mNCrateOpenTF == 0) {
248 mInitOrbit = crateOrbit->orbitID;
249 mDecoder.setFirstIR({0, mInitOrbit});
250 }
251
252 LOG(debug) << "Crate found" << crateHeader->drmID;
253 mNCrateOpenTF++;
254 }
255}
256void CompressedDecodingTask::trailerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit,
257 const CrateTrailer_t* crateTrailer, const Diagnostic_t* diagnostics,
258 const Error_t* errors)
259{
260 if (mConetMode) {
261 LOG(debug) << "Crate closed " << crateHeader->drmID;
262 mNCrateCloseTF++;
263 }
264
265 if (mCurrentOrbit > 0) {
266 mDecoder.addCrateHeaderData(mCurrentOrbit, crateHeader->drmID, crateHeader->bunchID, crateTrailer->eventCounter);
267 } else {
268 mDecoder.addCrateHeaderData(crateOrbit->orbitID, crateHeader->drmID, crateHeader->bunchID, crateTrailer->eventCounter);
269 }
270
271 // Diagnostics used to fill digit patterns
272 auto numberOfDiagnostics = crateTrailer->numberOfDiagnostics;
273 auto numberOfErrors = crateTrailer->numberOfErrors;
274 for (int i = 0; i < numberOfDiagnostics; i++) {
275 const uint32_t* val = reinterpret_cast<const uint32_t*>(&(diagnostics[i]));
276 if (mCurrentOrbit > 0) {
277 mDecoder.addPattern(*val, crateHeader->drmID, mCurrentOrbit, crateHeader->bunchID); // take orbit from crateHeader instead of crateOrbit (it can be wrong, check also for digits!)
278 } else {
279 mDecoder.addPattern(*val, crateHeader->drmID, crateOrbit->orbitID, crateHeader->bunchID);
280 }
281
282 /*
283 int islot = (*val & 15);
284 if (islot == 1) {
285 if (o2::tof::diagnostic::DRM_HEADER_MISSING & *val) {
286 printf("DRM_HEADER_MISSING\n");
287 }
288 if (o2::tof::diagnostic::DRM_TRAILER_MISSING & *val) {
289 printf("DRM_TRAILER_MISSING\n");
290 }
291 if (o2::tof::diagnostic::DRM_FEEID_MISMATCH & *val) {
292 printf("DRM_FEEID_MISMATCH\n");
293 }
294 if (o2::tof::diagnostic::DRM_ORBIT_MISMATCH & *val) {
295 printf("DRM_ORBIT_MISMATCH\n");
296 }
297 if (o2::tof::diagnostic::DRM_CRC_MISMATCH & *val) {
298 printf("DRM_CRC_MISMATCH\n");
299 }
300 if (o2::tof::diagnostic::DRM_ENAPARTMASK_DIFFER & *val) {
301 printf("DRM_ENAPARTMASK_DIFFER\n");
302 }
303 if (o2::tof::diagnostic::DRM_CLOCKSTATUS_WRONG & *val) {
304 printf("DRM_CLOCKSTATUS_WRONG\n");
305 }
306 if (o2::tof::diagnostic::DRM_FAULTSLOTMASK_NOTZERO & *val) {
307 printf("DRM_FAULTSLOTMASK_NOTZERO\n");
308 }
309 if (o2::tof::diagnostic::DRM_READOUTTIMEOUT_NOTZERO & *val) {
310 printf("DRM_READOUTTIMEOUT_NOTZERO\n");
311 }
312 if (o2::tof::diagnostic::DRM_EVENTWORDS_MISMATCH & *val) {
313 printf("DRM_EVENTWORDS_MISMATCH\n");
314 }
315 if (o2::tof::diagnostic::DRM_MAXDIAGNOSTIC_BIT & *val) {
316 printf("DRM_MAXDIAGNOSTIC_BIT\n");
317 }
318 } else if (islot == 2) {
319 if (o2::tof::diagnostic::LTM_HEADER_MISSING & *val) {
320 printf("LTM_HEADER_MISSING\n");
321 }
322 if (o2::tof::diagnostic::LTM_TRAILER_MISSING & *val) {
323 printf("LTM_TRAILER_MISSING\n");
324 }
325 if (o2::tof::diagnostic::LTM_HEADER_UNEXPECTED & *val) {
326 printf("LTM_HEADER_UNEXPECTED\n");
327 }
328 if (o2::tof::diagnostic::LTM_MAXDIAGNOSTIC_BIT & *val) {
329 printf("LTM_MAXDIAGNOSTIC_BIT\n");
330 }
331 } else if (islot < 13) {
332 if (o2::tof::diagnostic::TRM_HEADER_MISSING & *val) {
333 printf("TRM_HEADER_MISSING\n");
334 }
335 if (o2::tof::diagnostic::TRM_TRAILER_MISSING & *val) {
336 printf("TRM_TRAILER_MISSING\n");
337 }
338 if (o2::tof::diagnostic::TRM_CRC_MISMATCH & *val) {
339 printf("TRM_CRC_MISMATCH\n");
340 }
341 if (o2::tof::diagnostic::TRM_HEADER_UNEXPECTED & *val) {
342 printf("TRM_HEADER_UNEXPECTED\n");
343 }
344 if (o2::tof::diagnostic::TRM_EVENTCNT_MISMATCH & *val) {
345 printf("TRM_EVENTCNT_MISMATCH\n");
346 }
347 if (o2::tof::diagnostic::TRM_EMPTYBIT_NOTZERO & *val) {
348 printf("TRM_EMPTYBIT_NOTZERO\n");
349 }
350 if (o2::tof::diagnostic::TRM_LBIT_NOTZERO & *val) {
351 printf("TRM_LBIT_NOTZERO\n");
352 }
353 if (o2::tof::diagnostic::TRM_FAULTSLOTBIT_NOTZERO & *val) {
354 printf("TRM_FAULTSLOTBIT_NOTZERO\n");
355 }
356 if (o2::tof::diagnostic::TRM_EVENTWORDS_MISMATCH & *val) {
357 printf("TRM_EVENTWORDS_MISMATCH\n");
358 }
359 if (o2::tof::diagnostic::TRM_DIAGNOSTIC_SPARE1 & *val) {
360 printf("TRM_DIAGNOSTIC_SPARE1\n");
361 }
362 if (o2::tof::diagnostic::TRM_DIAGNOSTIC_SPARE2 & *val) {
363 printf("TRM_DIAGNOSTIC_SPARE2\n");
364 }
365 if (o2::tof::diagnostic::TRM_DIAGNOSTIC_SPARE3 & *val) {
366 printf("TRM_DIAGNOSTIC_SPARE3\n");
367 }
368 if (o2::tof::diagnostic::TRM_MAXDIAGNOSTIC_BIT & *val) {
369 printf("TRM_MAXDIAGNOSTIC_BIT\n");
370 }
371
372 if (o2::tof::diagnostic::TRMCHAIN_HEADER_MISSING & *val) {
373 printf("TRMCHAIN_HEADER_MISSING\n");
374 }
375 if (o2::tof::diagnostic::TRMCHAIN_TRAILER_MISSING & *val) {
376 printf("TRMCHAIN_TRAILER_MISSING\n");
377 }
378 if (o2::tof::diagnostic::TRMCHAIN_STATUS_NOTZERO & *val) {
379 printf("TRMCHAIN_STATUS_NOTZERO\n");
380 }
381 if (o2::tof::diagnostic::TRMCHAIN_EVENTCNT_MISMATCH & *val) {
382 printf("TRMCHAIN_EVENTCNT_MISMATCH\n");
383 }
384 if (o2::tof::diagnostic::TRMCHAIN_TDCERROR_DETECTED & *val) {
385 printf("TRMCHAIN_TDCERROR_DETECTED\n");
386 }
387 if (o2::tof::diagnostic::TRMCHAIN_BUNCHCNT_MISMATCH & *val) {
388 printf("TRMCHAIN_BUNCHCNT_MISMATCH\n");
389 }
390 if (o2::tof::diagnostic::TRMCHAIN_DIAGNOSTIC_SPARE1 & *val) {
391 printf("TRMCHAIN_DIAGNOSTIC_SPARE1\n");
392 }
393 if (o2::tof::diagnostic::TRMCHAIN_DIAGNOSTIC_SPARE2 & *val) {
394 printf("TRMCHAIN_DIAGNOSTIC_SPARE2\n");
395 }
396 if (o2::tof::diagnostic::TRMCHAIN_MAXDIAGNOSTIC_BIT & *val) {
397 printf("TRMCHAIN_MAXDIAGNOSTIC_BIT\n");
398 }
399 }
400 printf("------\n");
401 */
402 }
403
404 for (int i = 0; i < numberOfErrors; i++) {
405 const uint32_t* val = reinterpret_cast<const uint32_t*>(&(errors[i]));
406 mDecoder.addError(*val, crateHeader->drmID);
407 }
408}
409
410void CompressedDecodingTask::rdhHandler(const o2::header::RAWDataHeader* rdh)
411{
412 const auto& rdhr = *rdh;
413 // set first orbtìt here (to be check in future), please not remove this!!!
414 mCurrentOrbit = RDHUtils::getHeartBeatOrbit(rdhr);
415
416 // rdh close
417 if (RDHUtils::getStop(rdhr) && RDHUtils::getHeartBeatOrbit(rdhr) == o2::tof::Utils::getNOrbitInTF() - 1 + mInitOrbit) {
418 mNCrateCloseTF++;
419 return;
420 }
421
422 // rdh open
423 if ((RDHUtils::getPageCounter(rdhr) == 0) && (RDHUtils::getTriggerType(rdhr) & o2::trigger::TF)) {
424 mNCrateOpenTF++;
425 }
426};
427
428void CompressedDecodingTask::frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit,
429 const FrameHeader_t* frameHeader, const PackedHit_t* packedHits)
430{
431 for (int i = 0; i < frameHeader->numberOfHits; ++i) {
432 auto packedHit = packedHits + i;
433 if (mCurrentOrbit > 0) {
434 mDecoder.InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, mCurrentOrbit, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot);
435 } else {
436 mDecoder.InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, crateOrbit->orbitID, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot);
437 }
438 }
439};
440
441DataProcessorSpec getCompressedDecodingSpec(const std::string& inputDesc, bool conet, bool askDISTSTF, int norbitPerTF, bool localCmp)
442{
443 std::vector<InputSpec> inputs;
444 if (askDISTSTF) {
445 inputs.emplace_back("stdDist", "FLP", "DISTSUBTIMEFRAME", 0, Lifetime::Timeframe);
446 }
447
448 std::shared_ptr<o2::base::GRPGeomRequest> ccdbRequest;
449 if (norbitPerTF == -1) {
450 ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
451 norbitPerTF == -1, // GRPECS=true for nHBF per TF
452 false, // GRPLHCIF
453 false, // GRPMagField
454 false, // askMatLUT
456 inputs);
457 }
458
459 // inputs.emplace_back(std::string("x:TOF/" + inputDesc).c_str(), 0, Lifetime::Optional);
461 dataDesc.runtimeInit(inputDesc.c_str());
462 inputs.emplace_back("x", ConcreteDataTypeMatcher{o2::header::gDataOriginTOF, dataDesc}, Lifetime::Timeframe);
463 std::vector<OutputSpec> outputs;
464 outputs.emplace_back(o2::header::gDataOriginTOF, "DIGITHEADER", 0, Lifetime::Timeframe);
465 outputs.emplace_back(o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe);
466 outputs.emplace_back(o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe);
467 outputs.emplace_back(o2::header::gDataOriginTOF, "PATTERNS", 0, Lifetime::Timeframe);
468 outputs.emplace_back(o2::header::gDataOriginTOF, "ERRORS", 0, Lifetime::Timeframe);
469 outputs.emplace_back(o2::header::gDataOriginTOF, "DIAFREQ", 0, Lifetime::Timeframe);
470
471 return DataProcessorSpec{
472 "tof-compressed-decoder",
473 inputs,
474 // select(std::string("x:TOF/" + inputDesc).c_str()),
475 outputs,
476 AlgorithmSpec{adaptFromTask<CompressedDecodingTask>(conet, dataDesc, ccdbRequest, norbitPerTF, localCmp)},
477 Options{
478 {"row-filter", VariantType::Bool, false, {"Filter empty row"}},
479 {"mask-noise", VariantType::Bool, false, {"Flag to mask noisy digits"}},
480 {"noise-counts", VariantType::Int, 11, {"Counts in a single (TF) payload"}}}};
481}
482
483} // namespace tof
484} // namespace o2
TOF compressed data decoding task.
int32_t i
Header of the General Run Parameters object.
A helper class to iteratate over all parts of all input routes.
Definition of the RAW Data Header.
std::ostringstream debug
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
void snapshot(const Output &spec, T const &object)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iteratate over all parts of all input routes.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void postData(ProcessingContext &pc)
void endOfStream(EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void setTimeStamp(int val)
Definition Diagnostic.h:91
static uint32_t getNOrbitInTF()
Definition Utils.h:72
void addCrateHeaderData(unsigned long orbit, int crate, int32_t bc, uint32_t eventCounter)
std::vector< uint8_t > & getPatterns()
Diagnostic & getDiagnosticFrequency()
void addPattern(const uint32_t val, int icrate, int orbit, int bc)
std::vector< Digit > * getDigitPerTimeFrame()
DigitHeader & getDigitHeader()
void setNOrbitInTF(uint32_t norb)
void maskNoiseRate(int val)
void setFirstIR(const o2::InteractionRecord &ir)
std::vector< ReadoutWindowData > * getReadoutWindowDataFiltered()
std::vector< ReadoutWindowData > * getReadoutWindowData()
std::vector< uint64_t > & getErrors()
Definition Decoder.h:75
void InsertDigit(int icrate, int itrm, int itdc, int ichain, int channel, uint32_t orbit, uint16_t bunchid, int time_ext, int tdc, int tot)
Definition Decoder.cxx:126
void addError(const uint32_t val, int icrate)
Definition Decoder.h:76
GLuint GLfloat * val
Definition glcorearb.h:1582
constexpr o2::header::DataOrigin gDataOriginTOF
Definition DataHeader.h:575
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
framework::DataProcessorSpec getCompressedDecodingSpec(const std::string &inputDesc, bool conet=false, bool askDISTSTF=true, int norbitPerTF=-1, bool localCmp=false)
constexpr uint32_t TF
Definition Triggers.h:37
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static void fillTFIDInfo(o2::framework::ProcessingContext &pc, o2::dataformats::TFIDInfo &ti)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< int > row
int norbits