Project
Loading...
Searching...
No Matches
pedestal-decoding-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <random>
13#include <iostream>
14#include <fstream>
15#include <stdexcept>
16#include <array>
17#include <functional>
18#include <chrono>
19
24#include "Framework/Lifetime.h"
25#include "Framework/Output.h"
26#include "Framework/Task.h"
31
32#include "Headers/RDHAny.h"
34
38
42
46
48
50
// NOTE(review): presumably an upper bound on SOLAR board identifiers
// (100 * 8 = 800); no use of it is visible in this file - confirm before removing.
static constexpr size_t SOLAR_ID_MAX{100 * 8};
52
53namespace o2
54{
55namespace mch
56{
57namespace raw
58{
59
60using namespace o2;
61using namespace o2::framework;
62using namespace o2::mch::mapping;
64
/// Read a whole text file into a string.
///
/// The file is read line by line and every line is terminated by a single
/// '\n' in the result, so the returned text ends with a newline as soon as the
/// file holds at least one line, even if its last line has no terminator.
///
/// @param filename path of the file to read (taken by const reference so that
///                 const strings and temporaries are accepted as well)
/// @return the file content; empty if the file cannot be opened or is empty
static std::string readFileContent(const std::string& filename)
{
  std::string content;
  std::ifstream in(filename);
  for (std::string line; std::getline(in, line);) {
    content += line;
    content += '\n';
  }
  return content;
}
76
77static bool isValidDeID(int deId)
78{
79 for (auto id : o2::mch::constants::deIdsForAllMCH) {
80 if (id == deId) {
81 return true;
82 }
83 }
84
85 return false;
86}
87
// Patches, in place, the RDH located at the start of `rdhBuffer` when its FEE id is zero.
//
// The FEE id is then rebuilt from other header fields as
//   (low byte of the CRU id field) * 2 + endpoint id + (bits 8-15 of the CRU id field).
// Note that the header is written to although the span is declared const
// (see the const_cast below).
//
// rdhBuffer: bytes of one page; the first byte must be the start of the RDH
// verbose:   when true, prints the running number of pages seen by this function
88static void patchPage(gsl::span<const std::byte> rdhBuffer, bool verbose)
89{
// counts the calls to this function over the whole program lifetime
90 static int sNrdhs = 0;
91 auto& rdhAny = *reinterpret_cast<RDH*>(const_cast<std::byte*>(&(rdhBuffer[0])));
92 sNrdhs++;
93
94 auto existingFeeId = o2::raw::RDHUtils::getFEEID(rdhAny);
95 if (existingFeeId == 0) {
96 // early versions of raw data did not set the feeId
97 // which we need to select the right decoder
// low byte of the CRU id field
98 auto cruId = o2::raw::RDHUtils::getCRUID(rdhAny) & 0xFF;
// bits 8-15 of the same field, added unchanged to the FEE id
99 auto flags = o2::raw::RDHUtils::getCRUID(rdhAny) & 0xFF00;
100 auto endpoint = o2::raw::RDHUtils::getEndPointID(rdhAny);
101 uint32_t feeId = cruId * 2 + endpoint + flags;
102 o2::raw::RDHUtils::setFEEID(rdhAny, feeId);
103 }
104
105 if (verbose) {
106 std::cout << "RDH number " << sNrdhs << "--\n";
108 }
109};
110
111//=======================
112// Data decoder
114{
115 public:
116 PedestalsTask(std::string spec) : mInputSpec(spec) {}
117
// Sets up the electronics-to-detector mapping.
//
// filename: path of a custom mapping file; when it is not empty, the file
//           content is loaded into ElectronicMapperString::sFecMap.
// NOTE(review): the statements that create the mapper object itself are not
// visible in this listing - check them against the repository version.
118 void initElec2DetMapper(std::string filename)
119 {
120 LOG(info) << "[initElec2DetMapper] filename=" << filename;
121 if (filename.empty()) {
123 } else {
// a missing or unreadable file yields an empty map string (see readFileContent)
124 ElectronicMapperString::sFecMap = readFileContent(filename);
126 }
127 };
128
130 {
131 LOG(info) << "[initFee2SolarMapper] filename=" << filename;
132 if (filename.empty()) {
134 } else {
135 ElectronicMapperString::sCruMap = readFileContent(filename);
137 }
138 };
139
140 //_________________________________________________________________________________________________
142 {
143 mDebug = ic.options().get<bool>("mch-debug");
144 mLoggingInterval = ic.options().get<int>("logging-interval") * 1000;
145
146 auto mapCRUfile = ic.options().get<std::string>("cru-map");
147 auto mapFECfile = ic.options().get<std::string>("fec-map");
148 initFee2SolarMapper(mMapCRUfile);
149 initElec2DetMapper(mMapFECfile);
150 auto stop = [this]() {
151 if (mTFcount > 0) {
152 LOG(info) << "time spent for decoding (ms): min=" << mTimeDecoderMin->count() << ", max="
153 << mTimeDecoderMax->count() << ", mean=" << mTimeDecoder.count() / mTFcount;
154 }
155 };
156 ic.services().get<CallbackService>().set<CallbackService::Id::Stop>(stop);
157 ic.services().get<CallbackService>().set<CallbackService::Id::Reset>([this]() { reset(); });
158 }
159
160 //_________________________________________________________________________________________________
161 void reset()
162 {
163 mDigits.clear();
164 mHBPackets.clear();
165 mErrors.clear();
166 }
167
168 //_________________________________________________________________________________________________
169 void decodePage(gsl::span<const std::byte> page)
170 {
171 static int Nrdhs = 0;
172 size_t ndigits{0};
173
174 uint32_t orbit;
175
176 auto tStart = std::chrono::high_resolution_clock::now();
177
178 auto heartBeatHandler = [&](DsElecId dsElecId, uint8_t chip, uint32_t bunchCrossing) {
179 auto ds = dsElecId.elinkId();
180 auto solar = dsElecId.solarId();
181
182 if (mDebug) {
183 auto s = asString(dsElecId);
184 LOGP(info, "HeartBeat: {}-CHIP{}", s, chip);
185 }
186
187 mHBPackets.emplace_back(solar, ds, chip, bunchCrossing);
188 };
189
190 auto channelHandler = [&](DsElecId dsElecId, uint8_t channel, o2::mch::raw::SampaCluster sc) {
191 auto solarId = dsElecId.solarId();
192 auto dsId = dsElecId.elinkId();
193 if (mDebug) {
194 auto s = asString(dsElecId);
195 LOGP(info, "Digit: {}-CH{}", s, (int)channel);
196 }
197
198 mDigits.emplace_back(o2::mch::calibration::PedestalDigit(solarId, dsId, channel, sc.bunchCrossing, 0, sc.samples));
199 ++ndigits;
200 };
201
202 auto errorHandler = [&](DsElecId dsElecId, int8_t chip, uint32_t error) {
203 auto solarId = dsElecId.solarId();
204 auto dsId = dsElecId.elinkId();
205
206 mErrors.emplace_back(o2::mch::DecoderError(solarId, dsId, chip, error));
207 };
208
209 patchPage(page, mDebug);
210
211 if (mDebug) {
212 auto& rdhAny = *reinterpret_cast<RDH*>(const_cast<std::byte*>(&(page[0])));
213 Nrdhs += 1;
214 LOGP(info, "{}--", Nrdhs);
216 }
217
218 if (!mDecoder) {
219 DecodedDataHandlers handlers;
220 handlers.sampaChannelHandler = channelHandler;
221 handlers.sampaHeartBeatHandler = heartBeatHandler;
222 handlers.sampaErrorHandler = errorHandler;
223 mDecoder = mFee2Solar ? o2::mch::raw::createPageDecoder(page, handlers, mFee2Solar)
224 : o2::mch::raw::createPageDecoder(page, handlers);
225 }
226 try {
227 mDecoder(page);
228 } catch (std::exception& e) {
229 LOGP(error, "{}", e.what());
230 }
231 }
232
233 //_________________________________________________________________________________________________
234 void decodeBuffer(gsl::span<const std::byte> buf)
235 {
236 if (mDebug) {
237 LOGP(info, "\n\n============================\nStart of new buffer");
238 }
239 size_t bufSize = buf.size();
240 size_t pageStart = 0;
241 while (bufSize > pageStart) {
242 RDH* rdh = reinterpret_cast<RDH*>(const_cast<std::byte*>(&(buf[pageStart])));
243 auto rdhVersion = o2::raw::RDHUtils::getVersion(rdh);
244 auto rdhHeaderSize = o2::raw::RDHUtils::getHeaderSize(rdh);
245 if (rdhHeaderSize != 64) {
246 break;
247 }
248 auto pageSize = o2::raw::RDHUtils::getOffsetToNext(rdh);
249
250 gsl::span<const std::byte> page(reinterpret_cast<const std::byte*>(rdh), pageSize);
251 decodePage(page);
252
253 pageStart += pageSize;
254 }
255 }
256
257 //_________________________________________________________________________________________________
258 // the decodeTF() function processes the messages generated by the (sub)TimeFrame builder
260 {
261 // get the input buffer
262 auto& inputs = pc.inputs();
263 std::vector<InputSpec> filter = {{"check", ConcreteDataTypeMatcher{o2::header::gDataOriginMCH, "RAWDATA"}, Lifetime::Timeframe}};
264 DPLRawParser parser(inputs, filter);
265
266 auto tStart = std::chrono::high_resolution_clock::now();
267 size_t totPayloadSize = 0;
268 for (auto it = parser.begin(), end = parser.end(); it != end; ++it) {
269 auto const* raw = it.raw();
270 if (!raw) {
271 continue;
272 }
273 size_t payloadSize = it.size();
274 totPayloadSize += payloadSize;
275
276 gsl::span<const std::byte> buffer(reinterpret_cast<const std::byte*>(raw), sizeof(RDH) + payloadSize);
278 }
279 auto tEnd = std::chrono::high_resolution_clock::now();
280
281 if (totPayloadSize > 0) {
282 std::chrono::duration<double, std::milli> elapsed = tEnd - tStart;
283 mTimeDecoder += elapsed;
284 if (!mTimeDecoderMin || (elapsed < mTimeDecoderMin)) {
285 mTimeDecoderMin = elapsed;
286 }
287 if (!mTimeDecoderMax || (elapsed > mTimeDecoderMax)) {
288 mTimeDecoderMax = elapsed;
289 }
290 mTFcount += 1;
291 }
292 }
293
294 //_________________________________________________________________________________________________
295 // the decodeReadout() function processes the messages generated by o2-mch-cru-page-reader-workflow
297 {
298 // Note: DPL allows extracting the span directly from the input,
299 // which would make this function obsolete
300 auto const* raw = input.payload;
301 size_t payloadSize = DataRefUtils::getPayloadSize(input);
302
303 gsl::span<const std::byte> buffer(reinterpret_cast<const std::byte*>(raw), payloadSize);
305 }
306
307 //_________________________________________________________________________________________________
308 void logStats()
309 {
310 static auto loggerStart = std::chrono::high_resolution_clock::now();
311 static auto loggerEnd = loggerStart;
312 static uint64_t nDigits = 0;
313 static uint64_t nTF = 0;
314
315 if (mLoggingInterval == 0) {
316 return;
317 }
318
319 nDigits += mDigits.size();
320 nTF += 1;
321
322 loggerEnd = std::chrono::high_resolution_clock::now();
323 std::chrono::duration<double, std::milli> loggerElapsed = loggerEnd - loggerStart;
324 if (loggerElapsed.count() > mLoggingInterval) {
325 LOGP(info, "Processed {} digits in {} time frames", nDigits, nTF);
326 nDigits = 0;
327 nTF = 0;
328 loggerStart = std::chrono::high_resolution_clock::now();
329 }
330 }
331
332 //_________________________________________________________________________________________________
334 {
335 auto createBuffer = [&](auto& vec, size_t& size) {
336 size = vec.empty() ? 0 : sizeof(*(vec.begin())) * vec.size();
337 char* buf = nullptr;
338 if (size > 0) {
339 buf = (char*)malloc(size);
340 if (buf) {
341 char* p = buf;
342 size_t sizeofElement = sizeof(*(vec.begin()));
343 for (auto& element : vec) {
344 memcpy(p, &element, sizeofElement);
345 p += sizeofElement;
346 }
347 }
348 }
349 return buf;
350 };
351
352 reset();
353 for (auto&& input : pc.inputs()) {
354 if (input.spec->binding == "TF") {
355 decodeTF(pc);
356 }
357 if (input.spec->binding == "readout") {
358 decodeReadout(input);
359 }
360 }
361
362 size_t digitsSize;
363 char* digitsBuffer = createBuffer(mDigits, digitsSize);
364
365 size_t hbSize;
366 char* hbBuffer = createBuffer(mHBPackets, hbSize);
367
368 size_t errorsSize;
369 char* errorsBuffer = createBuffer(mErrors, errorsSize);
370
371 // create the output message
372 auto freefct = [](void* data, void*) { free(data); };
373 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "PDIGITS", 0}, digitsBuffer, digitsSize, freefct, nullptr);
374 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "HBPACKETS", 0}, hbBuffer, hbSize, freefct, nullptr);
375 pc.outputs().adoptChunk(Output{header::gDataOriginMCH, "ERRORS", 0}, errorsBuffer, errorsSize, freefct, nullptr);
376
377 logStats();
378 }
379
380 private:
// NOTE(review): no use of this handler is visible in this listing
382 SampaChannelHandler mChannelHandler;
// results of the current run() call; emptied by reset()
383 std::vector<o2::mch::calibration::PedestalDigit> mDigits;
384 std::vector<o2::mch::HeartBeatPacket> mHBPackets;
385 std::vector<o2::mch::DecoderError> mErrors;
386
// mapping functions, null until set up
387 Elec2DetMapper mElec2Det{nullptr};
// when set, passed to createPageDecoder() in decodePage()
388 FeeLink2SolarMapper mFee2Solar{nullptr};
// file names handed by init() to initFee2SolarMapper() / initElec2DetMapper()
389 std::string mMapCRUfile;
390 std::string mMapFECfile;
391
// input selection string received by the constructor
392 std::string mInputSpec;
// set from the "mch-debug" option; enables verbose logging
393 bool mDebug = {false};
// "logging-interval" option converted to milliseconds; 0 disables logStats()
394 int mLoggingInterval = {0};
395
// decoding time statistics in milliseconds: sum, minimum and maximum over
// the time frames counted in mTFcount
396 std::chrono::duration<double, std::milli> mTimeDecoder{};
397 std::optional<std::chrono::duration<double, std::milli>> mTimeDecoderMin{};
398 std::optional<std::chrono::duration<double, std::milli>> mTimeDecoderMax{};
399 size_t mTFcount{0};
400};
401
402} // namespace raw
403} // namespace mch
404} // end namespace o2
405
406using namespace o2::framework;
407
408const char* specName = "mch-pedestal-decoder";
409
410void customize(std::vector<ConfigParamSpec>& workflowOptions)
411{
412 workflowOptions.push_back(ConfigParamSpec{"input-spec", VariantType::String, "TF:MCH/RAWDATA", {"selection string for the input data"}});
413 workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
414}
415
417
418using namespace o2;
419using namespace o2::framework;
420
421//_________________________________________________________________________________________________
423{
424 return DataProcessorSpec{
425 specName,
426 o2::framework::select(inputSpec.c_str()),
427 Outputs{OutputSpec{header::gDataOriginMCH, "PDIGITS", 0, Lifetime::Timeframe},
428 OutputSpec{header::gDataOriginMCH, "HBPACKETS", 0, Lifetime::Timeframe},
429 OutputSpec{header::gDataOriginMCH, "ERRORS", 0, Lifetime::Timeframe}},
430 AlgorithmSpec{adaptFromTask<o2::mch::raw::PedestalsTask>(inputSpec)},
431 Options{{"mch-debug", VariantType::Bool, false, {"enable verbose output"}},
432 {"logging-interval", VariantType::Int, 0, {"time interval in seconds between logging messages (set to zero to disable)"}},
433 {"noise-threshold", VariantType::Float, (float)2.0, {"maximum acceptable noise value"}},
434 {"pedestal-threshold", VariantType::Float, (float)150, {"maximum acceptable pedestal value"}},
435 {"cru-map", VariantType::String, "", {"custom CRU mapping"}},
436 {"fec-map", VariantType::String, "", {"custom FEC mapping"}}}};
437}
438
440{
441 auto inputSpec = config.options().get<std::string>("input-spec");
442 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
443
444 WorkflowSpec specs;
445
447 specs.push_back(producer);
448
449 return specs;
450}
A raw page parser for DPL input.
uint64_t orbit
Definition RawEventData.h:6
Definition of the RAW Data Header.
uint8_t endpoint
Definition RawData.h:0
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
The parser handles transparently input in the format of raw pages.
const_iterator end() const
const_iterator begin() const
void adoptChunk(const Output &, char *, size_t, fair::mq::FreeFn *, void *)
ServiceRegistryRef services()
Definition InitContext.h:34
ConfigParamRegistry const & options()
Definition InitContext.h:33
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
MCH decoder error implementation.
"Fat" digit for pedestal data.
constexpr uint8_t elinkId() const
Definition DsElecId.h:44
constexpr uint16_t solarId() const
solarId is an identifier that uniquely identify a solar board
Definition DsElecId.h:50
void initElec2DetMapper(std::string filename)
void init(framework::InitContext &ic)
void decodeTF(framework::ProcessingContext &pc)
void run(framework::ProcessingContext &pc)
void decodeReadout(const o2::framework::DataRef &input)
void decodeBuffer(gsl::span< const std::byte > buf)
void initFee2SolarMapper(std::string filename)
void decodePage(gsl::span< const std::byte > page)
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
GLuint GLuint end
Definition glcorearb.h:469
GLuint GLsizei bufSize
Definition glcorearb.h:790
GLboolean * data
Definition glcorearb.h:298
GLbitfield flags
Definition glcorearb.h:1570
GLint GLint GLint GLint GLint GLint GLint GLbitfield GLenum filter
Definition glcorearb.h:1308
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
constexpr o2::header::DataOrigin gDataOriginMCH
Definition DataHeader.h:571
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > select(char const *matcher="")
std::vector< OutputSpec > Outputs
std::function< std::optional< DsDetId >(DsElecId)> createElec2DetMapper< ElectronicMapperGenerated >(uint64_t)
FeeLink2SolarMapper createFeeLink2SolarMapper< ElectronicMapperString >()
std::function< std::optional< DsDetId >(DsElecId)> Elec2DetMapper
Definition Mapper.h:42
std::function< std::optional< uint16_t >(FeeLinkId)> createFeeLink2SolarMapper< ElectronicMapperGenerated >()
std::function< void(Page buffer)> PageDecoder
Definition PageDecoder.h:28
Elec2DetMapper createElec2DetMapper< ElectronicMapperString >(uint64_t)
std::function< void(DsElecId dsId, DualSampaChannelId channel, SampaCluster)> SampaChannelHandler
std::function< std::optional< uint16_t >(FeeLinkId id)> FeeLink2SolarMapper
From (feeId,linkId) to solarId.
Definition Mapper.h:52
PageDecoder createPageDecoder(RawBuffer rdhBuffer, DecodedDataHandlers decodedDataHandlers)
will be called for each decoded Sampa packet and in case of decoding errors
std::string asString(const SampaCluster &sc)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
std::string filename()
const char * specName
WorkflowSpec defineDataProcessing(const ConfigContext &config)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
o2::framework::DataProcessorSpec getMCHPedestalDecodingSpec(const char *specName, std::string inputSpec)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
const char * payload
Definition DataRef.h:28
SampaHeartBeatHandler sampaHeartBeatHandler
Piece of data for one Sampa channel.
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static void setFEEID(RDHv4 &rdh, uint16_t v)
Definition RDHUtils.h:146
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:58
std::vector< std::byte > createBuffer(gsl::span< std::string > data, uint32_t orbit=12345, uint16_t bc=678)
o2::mch::DsIndex ds
std::vector< o2::ctf::BufferType > vec
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"