Project
Loading...
Searching...
No Matches
DataDecoderSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
18
19#include <random>
20#include <iostream>
21#include <fstream>
22#include <chrono>
23#include <stdexcept>
24#include <array>
25#include <functional>
26
32#include "Framework/Lifetime.h"
33#include "Framework/Output.h"
34#include "Framework/Task.h"
37
44
54
55//#define MCH_RAW_DATADECODER_DEBUG_DIGIT_TIME 1
56
57namespace o2
58{
59namespace mch
60{
61namespace raw
62{
63
64using namespace o2;
65using namespace o2::framework;
66using namespace o2::mch::mapping;
68
69//=======================
70// Data decoder
72{
73 public:
74 DataDecoderTask(std::string spec,
75 std::shared_ptr<base::GRPGeomRequest> request) : mInputSpec(spec),
76 mCcdbRequest(request) {}
77
78 //_________________________________________________________________________________________________
80 {
81 SampaChannelHandler channelHandler;
82 RdhHandler rdhHandler;
83
84 auto ds2manu = ic.options().get<bool>("ds2manu");
85 mDebug = ic.options().get<bool>("mch-debug");
86 mCheckROFs = ic.options().get<bool>("check-rofs");
87 mDummyROFs = ic.options().get<bool>("dummy-rofs");
88 auto mapCRUfile = ic.options().get<std::string>("cru-map");
89 auto mapFECfile = ic.options().get<std::string>("fec-map");
90 auto useDummyElecMap = ic.options().get<bool>("dummy-elecmap");
91 mErrorLogFrequency = ic.options().get<int>("error-log-frequency");
92 auto timeRecoModeString = ic.options().get<std::string>("time-reco-mode");
93
95 if (timeRecoModeString == "hbpackets") {
97 } else if (timeRecoModeString == "bcreset") {
99 }
100
101 mDecoder = new DataDecoder(channelHandler, rdhHandler, mapCRUfile, mapFECfile, ds2manu, mDebug,
102 useDummyElecMap, timeRecoMode);
103
104 if (mCcdbRequest) {
106 }
107
108 auto stop = [this]() {
109 LOG(info) << "mch-data-decoder: decoding duration = " << mTimeDecoding.count() * 1000 / mTFcount << " us / TF";
110 LOG(info) << "mch-data-decoder: ROF finder duration = " << mTimeROFFinder.count() * 1000 / mTFcount << " us / TF";
111 };
112 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(stop);
113 }
114
115 void finaliseCCDB(ConcreteDataMatcher& matcher, void* obj)
116 {
117 if (mCcdbRequest) {
120 LOGP(info, "Setting number of orbits per TF to {}", n);
121 mDecoder->setOrbitsInTF(n);
122 }
123 }
124 //_________________________________________________________________________________________________
125 // the decodeTF() function processes the messages generated by the (sub)TimeFrame builder
127 {
128 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
129 mFirstTForbit = tinfo.firstTForbit;
130
131 mDecoder->setFirstOrbitInTF(mFirstTForbit);
132
133 if (mDebug) {
134 LOG(info) << "[DataDecoderSpec::run] first TF orbit is " << mFirstTForbit;
135 }
136
137 // get the input buffer
138 auto& inputs = pc.inputs();
139 DPLRawParser parser(inputs, o2::framework::select(mInputSpec.c_str()));
140 bool abort{false};
141 for (auto it = parser.begin(), end = parser.end(); it != end && abort == false; ++it) {
142 auto const* raw = it.raw();
143 if (!raw) {
144 continue;
145 }
146 size_t payloadSize = it.size();
147
148 gsl::span<const std::byte> buffer(reinterpret_cast<const std::byte*>(raw), sizeof(RDH) + payloadSize);
149 bool ok = mDecoder->decodeBuffer(buffer);
150 if (!ok) {
151 LOG(alarm) << "critical decoding error : aborting this TF decoding\n";
152 abort = true;
153 }
154 }
155 }
156
157 //_________________________________________________________________________________________________
158 // the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
160 {
161 static int nFrame = 1;
162 // get the input buffer
163 if (input.spec->binding != "readout") {
164 return;
165 }
166
167 auto const* raw = input.payload;
168 // size of payload
169 size_t payloadSize = DataRefUtils::getPayloadSize(input);
170
171 if (mDebug) {
172 std::cout << nFrame << " payloadSize=" << payloadSize << std::endl;
173 }
174 nFrame += 1;
175 if (payloadSize == 0) {
176 return;
177 }
178
179 const RDH* rdh = reinterpret_cast<const RDH*>(raw);
180 mFirstTForbit = o2::raw::RDHUtils::getHeartBeatOrbit(rdh);
181 mDecoder->setFirstOrbitInTF(mFirstTForbit);
182
183 gsl::span<const std::byte> buffer(reinterpret_cast<const std::byte*>(raw), payloadSize);
184 mDecoder->decodeBuffer(buffer);
185 }
186
188 {
189 std::vector<Digit> digits;
190 std::vector<ROFRecord> rofs;
191 std::vector<OrbitInfo> orbits;
192 std::vector<DecoderError> errors;
193 std::vector<HeartBeatPacket> hbpackets;
194 output.snapshot(Output{header::gDataOriginMCH, "DIGITS", 0}, digits);
195 output.snapshot(Output{header::gDataOriginMCH, "DIGITROFS", 0}, rofs);
196 output.snapshot(Output{header::gDataOriginMCH, "ORBITS", 0}, orbits);
197 output.snapshot(Output{header::gDataOriginMCH, "ERRORS", 0}, errors);
198 output.snapshot(Output{header::gDataOriginMCH, "HBPACKETS", 0}, hbpackets);
199 }
200
202 {
207 constexpr auto origin = header::gDataOriginMCH;
208 const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
209 static size_t contDeadBeef = 0; // number of times 0xDEADBEEF was seen continuously
211 for (const auto& ref : o2::framework::InputRecordWalker(pc.inputs(), {dummy})) {
212 auto payloadSize = DataRefUtils::getPayloadSize(ref);
213 if (payloadSize == 0) {
214 const auto* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
216 if (++contDeadBeef <= maxWarn) {
217 LOGP(alarm, "Found input [{}/{}/{:#x}] TF#{} 1st_orbit:{} Payload {} : assuming no payload for all links in this TF{}",
218 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, tinfo.tfCounter, tinfo.firstTForbit, payloadSize,
219 contDeadBeef == maxWarn ? fmt::format(". {} such inputs in row received, stopping reporting", contDeadBeef) : "");
220 }
221 return true;
222 }
223 }
224 contDeadBeef = 0; // if good data, reset the counter
225 return false;
226 }
227
228 //_________________________________________________________________________________________________
230 {
231 if (isDroppedTF(pc)) {
233 return;
234 }
235
236 if (mCcdbRequest) {
238 }
239
240 static int32_t deltaMax = 0;
241
242 auto createBuffer = [&](auto& vec, size_t& size) {
243 size = vec.empty() ? 0 : sizeof(*(vec.begin())) * vec.size();
244 char* buf = nullptr;
245 if (size > 0) {
246 buf = (char*)malloc(size);
247 if (buf) {
248 char* p = buf;
249 size_t sizeofElement = sizeof(*(vec.begin()));
250 for (auto& element : vec) {
251 memcpy(p, &element, sizeofElement);
252 p += sizeofElement;
253 }
254 }
255 }
256 return buf;
257 };
258
259 auto tStart = std::chrono::high_resolution_clock::now();
260 mDecoder->reset();
261 for (auto&& input : pc.inputs()) {
262 if (input.spec->binding == "readout") {
263 decodeReadout(input);
264 }
265 if (input.spec->binding == "TF") {
266 decodeTF(pc);
267 }
268 }
269 mDecoder->computeDigitsTime();
270 auto tEnd = std::chrono::high_resolution_clock::now();
271 mTimeDecoding += tEnd - tStart;
272
273 auto& digits = mDecoder->getDigits();
274 auto& orbits = mDecoder->getOrbits();
275 auto& errors = mDecoder->getErrors();
276 auto& hbpackets = mDecoder->getHBPackets();
277
278#ifdef MCH_RAW_DATADECODER_DEBUG_DIGIT_TIME
280 int32_t bcMax = mNHBPerTF * 3564 - 1;
281 for (auto d : digits) {
282 if (d.getTime() < 0 || d.getTime() > bcMax) {
283 int32_t delta = d.getTime() - bcMax;
284 if (delta > deltaMax) {
285 deltaMax = delta;
286 std::cout << "Max digit time exceeded: TF ORBIT " << mDecoder->getFirstOrbitInTF().value() << " DE# " << d.getDetID() << " PadId " << d.getPadID() << " ADC " << d.getADC()
287 << " time " << d.getTime() << " (" << bcMax << ", delta=" << delta << ")" << std::endl;
288 }
289 }
290 }
291#endif
292
293 tStart = std::chrono::high_resolution_clock::now();
294 ROFFinder rofFinder(digits, mFirstTForbit);
295 rofFinder.process(mDummyROFs);
296 tEnd = std::chrono::high_resolution_clock::now();
297 mTimeROFFinder += tEnd - tStart;
298
299 if (mDebug) {
300 rofFinder.dumpOutputDigits();
301 rofFinder.dumpOutputROFs();
302 }
303
304 if (mCheckROFs) {
305 rofFinder.isRofTimeMonotonic();
306 rofFinder.isDigitsTimeAligned();
307 }
308
309 // send the output buffer via DPL
310 size_t digitsSize, rofsSize, orbitsSize, errorsSize, hbSize;
311 char* digitsBuffer = rofFinder.saveDigitsToBuffer(digitsSize);
312 char* rofsBuffer = rofFinder.saveROFRsToBuffer(rofsSize);
313 char* orbitsBuffer = createBuffer(orbits, orbitsSize);
314 char* errorsBuffer = createBuffer(errors, errorsSize);
315 char* hbBuffer = createBuffer(hbpackets, hbSize);
316
317 if (mDebug) {
318 LOGP(info, "digitsSize {} rofsSize {} orbitsSize {} errorsSize {}", digitsSize, rofsSize, orbitsSize, errorsSize);
319 }
320
321 // create the output message
322 auto freefct = [](void* data, void*) { free(data); };
323 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "DIGITS", 0}, digitsBuffer, digitsSize, freefct, nullptr);
324 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "DIGITROFS", 0}, rofsBuffer, rofsSize, freefct, nullptr);
325 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "ORBITS", 0}, orbitsBuffer, orbitsSize, freefct, nullptr);
326 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "ERRORS", 0}, errorsBuffer, errorsSize, freefct, nullptr);
327 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "HBPACKETS", 0}, hbBuffer, hbSize, freefct, nullptr);
328
329 mTFcount += 1;
330 if (mErrorLogFrequency) {
331 if (mTFcount == 1 || mTFcount % mErrorLogFrequency == 0) {
332 mDecoder->logErrorMap(mTFcount);
333 }
334 }
335 }
336
337 private:
338 std::string mInputSpec;
339 bool mDebug = {false};
340 bool mCheckROFs = {false};
341 bool mDummyROFs = {false};
342 uint32_t mFirstTForbit{0};
343 DataDecoder* mDecoder = {nullptr};
344
345 uint32_t mTFcount{0};
346 uint32_t mErrorLogFrequency;
347
348 std::chrono::duration<double, std::milli> mTimeDecoding{};
349 std::chrono::duration<double, std::milli> mTimeROFFinder{};
350 std::shared_ptr<base::GRPGeomRequest> mCcdbRequest;
351};
352
353//_________________________________________________________________________________________________
355 bool askSTFDist)
356{
357 auto inputs = o2::framework::select(inputSpec.c_str());
358 if (askSTFDist) {
359 // request the input FLP/DISTSUBTIMEFRAME/0 that is _guaranteed_
360 // to be present, even if none of our raw data is present.
361 inputs.emplace_back("stfDist", "FLP", "DISTSUBTIMEFRAME", 0, o2::framework::Lifetime::Timeframe);
362 }
363 auto ccdbRequest = std::make_shared<o2::base::GRPGeomRequest>(false, // orbitResetTime
364 true, // GRPECS=true
365 false, // GRPLHCIF
366 false, // GRPMagField
367 false, // askMatLUT
369 inputs);
370 o2::mch::raw::DataDecoderTask task(inputSpec, ccdbRequest);
371 return DataProcessorSpec{
372 specName,
373 inputs,
374 Outputs{OutputSpec{header::gDataOriginMCH, "DIGITS", 0, Lifetime::Timeframe},
375 OutputSpec{header::gDataOriginMCH, "DIGITROFS", 0, Lifetime::Timeframe},
376 OutputSpec{header::gDataOriginMCH, "ORBITS", 0, Lifetime::Timeframe},
377 OutputSpec{header::gDataOriginMCH, "HBPACKETS", 0, Lifetime::Timeframe},
378 OutputSpec{header::gDataOriginMCH, "ERRORS", 0, Lifetime::Timeframe}},
379 AlgorithmSpec{adaptFromTask<DataDecoderTask>(std::move(task))},
380 Options{{"mch-debug", VariantType::Bool, false, {"enable verbose output"}},
381 {"cru-map", VariantType::String, "", {"custom CRU mapping"}},
382 {"fec-map", VariantType::String, "", {"custom FEC mapping"}},
383 {"dummy-elecmap", VariantType::Bool, false, {"use dummy electronic mapping (for debug, temporary)"}},
384 {"ds2manu", VariantType::Bool, false, {"convert channel numbering from Run3 to Run1-2 order"}},
385 {"time-reco-mode", VariantType::String, "bcreset", {"digit time reconstruction method [hbpackets, bcreset]"}},
386 {"check-rofs", VariantType::Bool, false, {"perform consistency checks on the output ROFs"}},
387 {"dummy-rofs", VariantType::Bool, false, {"disable the ROFs finding algorithm"}},
388 {"error-log-frequency", VariantType::Int, 6000, {"log the error map at this frequency (in TF unit) (first TF is always logged, unless frequency is zero)"}}}};
389}
390
391} // namespace raw
392} // namespace mch
393} // end namespace o2
A raw page parser for DPL input.
Definition of the decoder for the MCH data.
Helper for geometry and GRP related CCDB requests.
A helper class to iterate over all parts of all input routes.
Header to collect LHC related constants.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
Definition of the RAW Data Header.
Class to group the fired pads according to their time stamp.
int ds2manu(int deId, int ch)
const char * specName
void checkUpdates(o2::framework::ProcessingContext &pc)
bool finaliseCCDB(o2::framework::ConcreteDataMatcher &matcher, void *obj)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
A helper class to iterate over all parts of all input routes.
DataRef getFirstValid(bool throwOnFailure=false) const
Get the ref of the first valid input. If requested, throw an error if none is found.
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void decodeReadout(const o2::framework::DataRef &input)
void run(framework::ProcessingContext &pc)
DataDecoderTask(std::string spec, std::shared_ptr< base::GRPGeomRequest > request)
void finaliseCCDB(ConcreteDataMatcher &matcher, void *obj)
void decodeTF(framework::ProcessingContext &pc)
bool isDroppedTF(framework::ProcessingContext &pc)
void init(framework::InitContext &ic)
void sendEmptyOutput(framework::DataAllocator &output)
void logErrorMap(int tfcount) const
send all messages from our error map to the infologger
void setOrbitsInTF(uint32_t nofOrbitsPerTF)
const std::vector< o2::mch::HeartBeatPacket > & getHBPackets() const
Get the list of heart-beat packets that have been found in the current TimeFrame.
const std::vector< o2::mch::DecoderError > & getErrors() const
Get the list of decoding errors that have been found in the current TimeFrame.
void setFirstOrbitInTF(uint32_t orbit)
const RawDigitVector & getDigits() const
Get the vector of digits that have been decoded in the current TimeFrame.
const std::unordered_set< OrbitInfo, OrbitInfoHash > & getOrbits() const
Get the list of orbits that have been found in the current TimeFrame for each CRU link.
bool decodeBuffer(gsl::span< const std::byte > buf)
char * saveROFRsToBuffer(size_t &bufSize)
char * saveDigitsToBuffer(size_t &bufSize)
void process(bool dummyROFs=false)
Definition ROFFinder.cxx:51
GLdouble n
Definition glcorearb.h:1982
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLboolean * data
Definition glcorearb.h:298
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr o2::header::DataOrigin gDataOriginMCH
Definition DataHeader.h:571
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
constexpr int LHCMaxBunches
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< ConfigParamSpec > Options
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
o2::framework::DataProcessorSpec getDecodingSpec(const char *specName="mch-data-decoder", std::string inputSpec="TF:MCH/RAWDATA", bool askDISTSTF=false)
std::function< void(o2::header::RDHAny *)> RdhHandler
Definition DataDecoder.h:41
std::function< void(DsElecId dsId, DualSampaChannelId channel, SampaCluster)> SampaChannelHandler
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
const char * payload
Definition DataRef.h:28
const InputSpec * spec
Definition DataRef.h:26
std::string binding
A mnemonic name for the input spec.
Definition InputSpec.h:66
const bool useDummyElecMap
std::vector< std::byte > createBuffer(gsl::span< std::string > data, uint32_t orbit=12345, uint16_t bc=678)
std::vector< o2::ctf::BufferType > vec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::vector< Digit > digits