Project
Loading...
Searching...
No Matches
SACDecoder.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <chrono>
13#include <cassert>
14#include <fstream>
15#include <string_view>
16#if (defined(WITH_OPENMP) || defined(_OPENMP))
17#include <omp.h>
18#endif
19
20#include "Framework/Logger.h"
21
23
24using HighResClock = std::chrono::high_resolution_clock;
25using namespace o2::tpc::sac;
26
27void DecodedData::setData(const size_t pos, uint32_t time, const DecodedDataFE& decdata, const int feid)
28{
29 data[pos].time = time;
30 auto& currents = data[pos].currents;
31 std::copy(decdata.currents.begin(), decdata.currents.end(), &currents[0] + feid * ChannelsPerFE);
32 if (fes[pos].test(feid)) {
33 LOGP(warning, "FE {} already set at time {}, position {}", feid, time, pos);
34 }
35 fes[pos].set(feid);
36}
37
38//______________________________________________________________________________
39bool Decoder::process(const char* data, size_t size)
40{
41 const auto startTime = HighResClock::now();
42 assert(size == sizeof(sac::packet));
43 auto& sac = *(sac::packet*)data;
44 const auto instance = sac.getInstance();
45 if (instance >= Instances) {
46 return true;
47 }
48
49 if (mDebugLevel & (uint32_t)DebugFlags::PacketInfo) {
50 printPacketInfo(sac);
51 }
52
53 const auto packetInstance = sac.header.pktCount;
54 const auto packetFE = sac.data.pktNumber;
55 const bool isOK = sac.check();
56 const auto dataWords = sac.getDataWords();
57 const auto feIndex = sac.getFEIndex();
58 auto& lastPacketInstance = mPktCountInstance[instance];
59 auto& lastPacketFE = mPktCountFEs[feIndex];
60
61 // check packet counters are increasing by one
62 //
63 if (lastPacketInstance && (uint16_t(packetInstance) != uint16_t(lastPacketInstance + 1))) { // convert to uint16_t to respect size of sac.header.pktCount
64 LOGP(error, "Packet for instance {} missing, last packet {}, this packet {}", instance, lastPacketInstance, packetInstance);
65 }
66
67 if (lastPacketFE && (packetFE != (lastPacketFE + 1))) {
68 LOGP(error, "Packet for frontend {} missing, last packet {}, this packet {}", feIndex, lastPacketFE, packetFE);
69 }
70 lastPacketInstance = packetInstance;
71 lastPacketFE = packetFE;
72
73 if (isOK) {
74 auto& dataStrings = mDataStrings[feIndex];
75 dataStrings.insert(dataStrings.end(), dataWords.begin(), dataWords.end());
76 } else {
77 LOGP(error, "Problem in SAC data found, header check: {}, data check: {}", sac.header.check(), sac.data.check());
78 }
79
80 if (mDebugLevel & (uint32_t)DebugFlags::TimingInfo) {
81 auto endTime = HighResClock::now();
82 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
83 LOGP(detail, "Time to process data of size {}: {} s", size, elapsed_seconds.count());
84 }
85
86 ++mCollectedDataPackets;
87 return isOK;
88}
89
90int Decoder::decodeChannels(DecodedDataFE& sacs, size_t& carry, int feid)
91{
92 const auto& data = mDataStrings[feid];
93 const size_t dataSize = data.size();
94 const size_t next = std::min(size_t(8 * 8), dataSize - carry);
95 const size_t start = carry;
96 while (carry < dataSize) {
97 if (carry + 8 >= dataSize) {
98 return 0;
99 }
100 if (data[carry] >= '0' && data[carry] <= '7') {
101 const uint32_t channel = data[carry] - '0';
102 ++carry;
103 uint32_t value = 0;
104 for (int i = 0; i < 6; ++i) {
105 const auto c = data[carry];
106 uint32_t nibble = 0;
107 if ((c >= '0') && (c <= '9')) {
108 nibble = c - '0';
109 } else if ((c >= 'A') && (c <= 'F')) {
110 nibble = c - 'A' + 10;
111 } else {
112 LOGP(warning, "Problem decoding data value for FE {}, channel {} at position {} / {}, no valid hex charakter, dump: {}\n",
113 feid, channel, carry, dataSize, std::string_view(&data[start], next));
114 return -1;
115 }
116 value <<= 4;
117 value |= (nibble & 0xF);
118 ++carry;
119 }
120 int32_t valueSigned = value & 0x00FFFFFF;
121 // negative value?
122 if ((valueSigned >> 23) & 1) {
123 valueSigned |= 0xff000000;
124 }
125 sacs.currents[channel] = valueSigned;
126
127 if (data[carry] != '\n') {
128 LOGP(warning, "Problem decoding data value for FE {}, channel {} at position {} / {}, CR expected, dump: {}\n",
129 feid, channel, carry, dataSize, std::string_view(&data[start], next));
130 return -1;
131 }
132 ++carry;
133 } else {
134 return 1;
135 }
136 }
137 return 1;
138}
139
140uint32_t Decoder::decodeTimeStamp(const char* data)
141{
142 int carry = 0;
143 uint32_t value = 0;
144 for (int i = 0; i < 8; ++i) {
145 const auto c = data[carry];
146 uint32_t nibble = 0;
147 if ((c >= '0') && (c <= '9')) {
148 nibble = c - '0';
149 } else if ((c >= 'A') && (c <= 'F')) {
150 nibble = c - 'A' + 10;
151 } else {
152 LOGP(error, "unexpected '{}' in time stamp", data[carry]);
153 break;
154 }
155 value <<= 4;
156 value |= (nibble & 0xF);
157 ++carry;
158 }
159
160 return value;
161}
162
163void Decoder::decode(int feid)
164{
165 const auto startTime = HighResClock::now();
166 auto& data = mDataStrings[feid];
167 DecodedDataFE decdata;
168 DecodedDataFE decAdditional;
169 bool aligned{false};
170 bool syncLost{false};
171
172 size_t carry = 0;
173 size_t deletePosition = 0;
174 const size_t dataSize = data.size();
175
176 // fmt::print("================ Processing feid {:2} with size {} =================\n", feid, data.size());
177 while (carry < dataSize) {
178 if (!aligned) {
179 // check for re-aligning sequence
180 if ((carry == 0) && (data[0] == '\n') && (data[1] == 's')) {
181 carry += 2;
182 } else {
183 while (data[carry] != '\n') {
184 if (carry >= dataSize) {
185 break;
186 }
187 ++carry;
188 }
189 ++carry;
190 }
191 aligned = true;
192 }
193 // fmt::print("Checking position {} / {}, {}\n", carry, dataSize, std::string_view(&data[carry], std::min(size_t(20), dataSize - carry)));
194
195 if (data[carry] >= '0' && data[carry] <= '7') {
196 const auto status = decodeChannels(decdata, carry, feid);
197 if (status == 0) {
198 break;
199 } else if (status == -1) {
200 if (mReAlignType != ReAlignType::None) {
201 LOGP(warn, "trying to re-align data stream\n");
202 aligned = false;
203 syncLost = true;
204 } else {
205 LOGP(error, "stopping decoding\n");
206 break;
207 }
208 }
209 } else if (data[carry] == 'S') {
210 if (carry + 11 >= dataSize) {
211 break;
212 }
213 const auto streamStart = carry;
214 // time stamp comes after channel data
215 ++carry;
216 uint32_t timeStamp = decodeTimeStamp(&data[carry]);
217 decdata.timeStamp = timeStamp;
218 decAdditional.timeStamp = timeStamp;
219
220 carry += 8;
221 if (data[carry] != '\n' || data[carry + 1] != 's') {
222 LOGP(warning, "Problem decoding time stamp for FE ({}) at position {} / {}, dump: {}\n",
223 feid, carry - 8, dataSize, std::string_view(&data[carry - 8], std::min(size_t(20), dataSize - 8 - carry)));
224 break; // TODO: makes sense?
225 } else {
226 deletePosition = carry; // keep \ns to align in next iteration
227 carry += 2;
228
229#pragma omp critical
230 {
231 if (mDebugStream && (mDebugLevel & (uint32_t)DebugFlags::StreamSingleFE)) {
232 (*mDebugStream) << "d"
233 << "data=" << decdata
234 << "feid=" << feid
235 << "tscount=" << mTSCountFEs[feid]
236 << "\n";
237 }
238 ++mTSCountFEs[feid].first;
239
240 // copy decoded data to output
241 const auto refTime = timeStamp / SampleDistance;
242 auto& currentsTime = mDecodedData.data;
243 const auto nSamples = currentsTime.size();
244 auto firstRefTime = (nSamples > 0) ? currentsTime[0].time : refTime;
245 auto& refCount = mTSCountFEs[feid].second;
246 // NOTE: use (refTime > 1) instead of (refTime > 0), since in some cases the packet with TS 0 is missing
247 if ((refCount == 0) && (refTime > 1)) {
248 LOGP(detail, "Skipping initial data packet {} with time stamp {} for FE {}",
249 mTSCountFEs[feid].first, timeStamp, feid);
250 } else {
251 if (refTime < firstRefTime) {
252 // LOGP(info, "FE {}: {} < {}, adding {} DataPoint(s)", feid, refTime, firstRefTime, firstRefTime - refTime);
253 mDecodedData.insertFront(firstRefTime - refTime);
254 firstRefTime = refTime;
255 } else if (nSamples < refTime - firstRefTime + 1) {
256 // LOGP(info, "FE {}: refTime {}, firstRefTime {}, resize from {} to {}", feid, refTime, firstRefTime, currentsTime.size(), refTime - firstRefTime + 1);
257 mDecodedData.resize(refTime - firstRefTime + 1);
258 }
259 // LOGP(info, "FE {}: insert refTime {} at pos {}, with firstRefTime {}", feid, refTime, refTime - firstRefTime, firstRefTime);
260 mDecodedData.setData(refTime - firstRefTime, refTime, decdata, feid);
261
262 if (refCount != refTime) {
263 // NOTE: be graceful in case TS 0 is missing and avoid furhter warnings
264 if (!((refCount == 0) && (refTime == 1))) {
265 LOGP(warning, "Unexpected time stamp in FE {}. Count {} != TS {} ({}), dump: {}", feid, refCount, refTime, timeStamp, std::string_view(&data[streamStart], std::min(size_t(20), dataSize - streamStart - carry)));
266 }
267 if (((refCount == 0) && (refTime == 1)) || ((mReAlignType == ReAlignType::AlignAndFillMissing) && syncLost)) {
268 while (refCount < refTime) {
269 mDecodedData.setData(refCount - firstRefTime, refCount, DecodedDataFE(), feid);
270 if ((refCount == 0) && (refTime == 1)) {
271 LOGP(info, "Adding dummy data for FE {}, TS {}", feid, refCount);
272 } else {
273 LOGP(warning, "Adding dummy data for FE {}, TS {}", feid, refCount);
274 }
275 ++refCount;
276 }
277 syncLost = false;
278 }
279 }
280 ++refCount;
281 }
282 }
283 }
284
285 decdata.reset();
286 } else if (const auto pos = mDecodeAdditional.find(data[carry]); (pos != std::string::npos) && mDebugStream) {
287 // in case of debug stream output, decode additionally configured data streams
288 const auto streamStart = carry;
289 const char streamType = data[carry];
290 const char endMarker = streamType + 32;
291 ++carry;
292
293 if (!decodeChannels(decAdditional, carry, feid)) {
294 break;
295 }
296
297 if (data[carry] != endMarker) {
298 LOGP(warning, "Problem decoding additional stream '{}' values for FE ({}) at position {} / {}, dump: {}",
299 streamType, feid, carry, dataSize, std::string_view(&data[streamStart], std::min(size_t(20), dataSize - streamStart - carry)));
300 } else {
301 const char treeName[2] = {streamType, '\0'};
302 (*mDebugStream) << treeName
303 << "data=" << decAdditional
304 << "feid=" << feid
305 << "\n";
306 }
307
308 decAdditional.reset();
309 ++carry;
310 } else if (AllowedAdditionalStreams.find(data[carry]) != std::string_view::npos) {
311 // skip stream if not configured or no debug stream
312 const char streamType = data[carry];
313 const char endMarker = streamType + 32;
314 while (data[carry] != endMarker) {
315 if (carry >= dataSize) {
316 break;
317 }
318 ++carry;
319 }
320 ++carry;
321 } else if (data[carry] >= 'a' && data[carry] <= 'z') {
322 if (mTSCountFEs[feid].first == 0) {
323 // in case we still haven't processed the first packet we simply skip end sequence termination and continue
324 LOGP(info, "Skipping {} for FE {}, time stamp packet count {}, ref count {}",
325 data[carry], feid, mTSCountFEs[feid].first, mTSCountFEs[feid].second);
326 ++carry;
327 } else {
328 if (mReAlignType != ReAlignType::None) {
329 LOGP(warning, "Skipping {} for FE {}, trying to re-align data stream, time stamp packet count {}, ref count {}",
330 data[carry], feid, mTSCountFEs[feid].first, mTSCountFEs[feid].second);
331 aligned = false;
332 syncLost = true;
333 } else {
334 LOGP(error, "Skipping {} for FE {}, might lead to decoding problems, time stamp packet count {}, ref count {}",
335 data[carry], feid, mTSCountFEs[feid].first, mTSCountFEs[feid].second);
336 ++carry;
337 }
338 }
339 decdata.reset();
340 } else {
341 if (mReAlignType != ReAlignType::None) {
342 LOGP(warn, "Can't interpret position for FE {}, {} / {}, {}, trying to re-align data stream, time stamp packet count {}, ref count {}",
343 feid, carry, dataSize, std::string_view(&data[carry - 8], std::min(size_t(20), dataSize - 8 - carry)), mTSCountFEs[feid].first, mTSCountFEs[feid].second);
344 aligned = false;
345 syncLost = true;
346 } else {
347 LOGP(error, "Can't interpret position for FE {}, {} / {}, {}, stopping decoding, time stamp packet count {}, ref count {}",
348 feid, carry, dataSize, std::string_view(&data[carry - 8], std::min(size_t(20), dataSize - 8 - carry)), mTSCountFEs[feid].first, mTSCountFEs[feid].second);
349 break;
350 }
351 }
352 }
353
354 // Remove already decoded data
355 data.erase(data.begin(), data.begin() + deletePosition);
356 // LOGP(info, "removing {} characters from stream. Old size {}, new size {}", deletePosition, dataSize, data.size());
357
358 if (mDebugLevel & (uint32_t)DebugFlags::TimingInfo) {
359 auto endTime = HighResClock::now();
360 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
361 LOGP(detail, "Time to decode feid {}: {} s", feid, elapsed_seconds.count());
362 }
363}
364
366{
367 const auto startTime = HighResClock::now();
368
369#pragma omp parallel for num_threads(sNThreads)
370 for (size_t feid = 0; feid < NumberFEs; ++feid) {
371 decode(feid);
372 }
373
374 if (mDebugLevel & (uint32_t)DebugFlags::TimingInfo) {
375 auto endTime = HighResClock::now();
376 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
377 LOGP(detail, "Time to decode all feids {} s, {} s per packet ({})", elapsed_seconds.count(), elapsed_seconds.count() / mCollectedDataPackets, mCollectedDataPackets);
378 }
379}
380
381void Decoder::streamDecodedData(bool streamAll)
382{
383 if (mDebugStream && (mDebugLevel & (uint32_t)DebugFlags::StreamFinalData)) {
384 const size_t nDecodedData = (streamAll) ? mDecodedData.data.size() : mDecodedData.getNGoodEntries();
385 LOGP(info, "streamDecodedData (streamAll {}): {} / {}", streamAll, nDecodedData, mDecodedData.data.size());
386 auto refTime = mDecodedData.referenceTime;
387 for (size_t ientry = 0; ientry < nDecodedData; ++ientry) {
388 auto& currentData = mDecodedData.data[ientry];
389 auto fes = mDecodedData.fes[ientry].to_ulong();
390 auto nfes = mDecodedData.fes[ientry].count();
391 (*mDebugStream) << "c"
392 << "refTime=" << refTime
393 << "values=" << currentData
394 << "fes=" << fes
395 << "nfes=" << nfes
396 << "\n";
397 }
398 }
399}
400
402{
403 LOGP(info, "Finalize sac::Decoder with {} good / {} remaining entries",
404 mDecodedData.getNGoodEntries(), mDecodedData.data.size());
405
406 if (mDebugLevel & (uint32_t)DebugFlags::DumpFullStream) {
407 dumpStreams();
408 }
409
410 runDecoding();
411 streamDecodedData(true);
412
413 if (mDebugStream) {
414 mDebugStream->Close();
415 mDebugStream.reset();
416 }
417}
418
420{
422 if (mDebugLevel & (uint32_t)DebugFlags::ProcessingInfo) {
423 auto& data = mDecodedData.data;
424 const auto posGood = mDecodedData.getNGoodEntries();
425 LOGP(info, "Clearing data of size {}, firstTS {}, lastTS {}",
426 posGood, (data.size() > 0) ? data.front().time : -1, (posGood > 0) ? data[posGood - 1].time : 0);
427 }
428 mDecodedData.clearGoodData();
429}
430
431void Decoder::printPacketInfo(const sac::packet& sac)
432{
433 const auto& header = sac.header;
434 const auto& sacc = sac.data;
435
436 LOGP(info, "{:>4} {:>4} {:>8} {:>8} -- {:>4} {:>4} {:>8} {:>8} {:>10} -- {:>4}\n", //
437 "vers", //
438 "inst", //
439 "bc", //
440 "pktCnt", //
441 "feid", //
442 "size", //
443 "pktNum", //
444 "time", //
445 "crc32", //
446 "ok" //
447 );
448
449 LOGP(info, "{:>4} {:>4} {:>8} {:>8} -- {:>4} {:>4} {:>8} {:>8} {:>#10x} -- {:>4b}\n", //
450 header.version, //
451 header.instance, //
452 header.bunchCrossing, //
453 header.pktCount, //
454 sacc.feid, //
455 sacc.pktSize, //
456 sacc.pktNumber, //
457 sacc.timeStamp, //
458 sacc.crc32, //
459 sacc.check() //
460 );
461}
462
463void Decoder::dumpStreams()
464{
465 const std::string outNameBase(mDebugOutputName.substr(0, mDebugOutputName.size() - 5));
466 for (size_t feid = 0; feid < NumberFEs; ++feid) {
467 std::string outName = outNameBase;
468 outName += fmt::format(".feid_{}.stream.txt", feid);
469 std::ofstream fout(outName.data());
470 const auto& data = mDataStrings[feid];
471 fout << std::string_view(&data[0], data.size());
472 }
473}
std::chrono::high_resolution_clock HighResClock
int16_t time
Definition RawEventData.h:4
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
Decoding of integrated analogue currents.
@ TimingInfo
Print timing information.
@ DumpFullStream
Dump the data character streams.
@ StreamSingleFE
Stream debug output for each single FE.
@ ProcessingInfo
Print some processing info.
@ PacketInfo
Print packet information.
@ StreamFinalData
Stream debug output for the final decoded data.
bool process(const char *data, size_t size)
static constexpr uint32_t SampleDistance
Number of samples between time data stamps.
Definition SACDecoder.h:135
void streamDecodedData(bool streamAll=false)
static constexpr std::string_view AllowedAdditionalStreams
Allowed additional data streams that can be decoded with debug stream enabled.
Definition SACDecoder.h:134
@ None
Do not try to re-align the data stream.
@ AlignAndFillMissing
Try re-alignment and fill missing packets with 0s.
GLsizeiptr size
Definition glcorearb.h:659
GLenum GLsizei dataSize
Definition glcorearb.h:3994
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLboolean * data
Definition glcorearb.h:298
GLuint start
Definition glcorearb.h:469
constexpr size_t NumberFEs
Total number of frontends to process.
Definition SACDecoder.h:41
constexpr uint32_t ChannelsPerFE
Channels per front-end card. One channel is one stack.
Definition SACDecoder.h:39
constexpr size_t Instances
Number of instances to process.
Definition SACDecoder.h:40
FIXME: do not use data model tables.
Decoded data of one FE.
Definition SACDecoder.h:44
std::array< int32_t, 8 > currents
Definition SACDecoder.h:46
void insertFront(const size_t entries)
insert entries at the front
Definition SACDecoder.h:119
double referenceTime
reference time when sampling clock was started
Definition SACDecoder.h:74
void setData(const size_t pos, uint32_t time, const DecodedDataFE &decdata, const int feid)
copy decoded data from single FE and mark FE as received
size_t getNGoodEntries() const
Number of good entries, for which all FEs were decoded.
Definition SACDecoder.h:85
std::vector< std::bitset< NumberFEs > > fes
bitmask of decoded FEs
Definition SACDecoder.h:76
void clearGoodData()
clear entries where all FEs were already decoded
Definition SACDecoder.h:104
void resize(size_t newSize)
Definition SACDecoder.h:78
std::vector< DataPoint > data
decoded data
Definition SACDecoder.h:75
dataDef data
Definition SAC.h:97
headerDef header
Definition SAC.h:96