16#if (defined(WITH_OPENMP) || defined(_OPENMP))
30 auto& currents =
data[
pos].currents;
33 LOGP(warning,
"FE {} already set at time {}, position {}", feid,
time,
pos);
41 const auto startTime = HighResClock::now();
44 const auto instance = sac.getInstance();
53 const auto packetInstance = sac.header.pktCount;
54 const auto packetFE = sac.data.pktNumber;
55 const bool isOK = sac.check();
56 const auto dataWords = sac.getDataWords();
57 const auto feIndex = sac.getFEIndex();
58 auto& lastPacketInstance = mPktCountInstance[instance];
59 auto& lastPacketFE = mPktCountFEs[feIndex];
63 if (lastPacketInstance && (uint16_t(packetInstance) != uint16_t(lastPacketInstance + 1))) {
64 LOGP(error,
"Packet for instance {} missing, last packet {}, this packet {}", instance, lastPacketInstance, packetInstance);
67 if (lastPacketFE && (packetFE != (lastPacketFE + 1))) {
68 LOGP(error,
"Packet for frontend {} missing, last packet {}, this packet {}", feIndex, lastPacketFE, packetFE);
70 lastPacketInstance = packetInstance;
71 lastPacketFE = packetFE;
74 auto& dataStrings = mDataStrings[feIndex];
75 dataStrings.insert(dataStrings.end(), dataWords.begin(), dataWords.end());
77 LOGP(error,
"Problem in SAC data found, header check: {}, data check: {}", sac.header.check(), sac.data.check());
81 auto endTime = HighResClock::now();
82 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
83 LOGP(detail,
"Time to process data of size {}: {} s",
size, elapsed_seconds.count());
86 ++mCollectedDataPackets;
90int Decoder::decodeChannels(
DecodedDataFE& sacs,
size_t& carry,
int feid)
92 const auto&
data = mDataStrings[feid];
94 const size_t next = std::min(
size_t(8 * 8),
dataSize - carry);
95 const size_t start = carry;
100 if (
data[carry] >=
'0' &&
data[carry] <=
'7') {
101 const uint32_t channel =
data[carry] -
'0';
104 for (
int i = 0;
i < 6; ++
i) {
105 const auto c =
data[carry];
107 if ((
c >=
'0') && (
c <=
'9')) {
109 }
else if ((
c >=
'A') && (
c <=
'F')) {
110 nibble =
c -
'A' + 10;
112 LOGP(warning,
"Problem decoding data value for FE {}, channel {} at position {} / {}, no valid hex charakter, dump: {}\n",
117 value |= (nibble & 0xF);
120 int32_t valueSigned =
value & 0x00FFFFFF;
122 if ((valueSigned >> 23) & 1) {
123 valueSigned |= 0xff000000;
125 sacs.
currents[channel] = valueSigned;
127 if (
data[carry] !=
'\n') {
128 LOGP(warning,
"Problem decoding data value for FE {}, channel {} at position {} / {}, CR expected, dump: {}\n",
140uint32_t Decoder::decodeTimeStamp(
const char*
data)
144 for (
int i = 0;
i < 8; ++
i) {
145 const auto c =
data[carry];
147 if ((
c >=
'0') && (
c <=
'9')) {
149 }
else if ((
c >=
'A') && (
c <=
'F')) {
150 nibble =
c -
'A' + 10;
152 LOGP(error,
"unexpected '{}' in time stamp",
data[carry]);
156 value |= (nibble & 0xF);
163void Decoder::decode(
int feid)
165 const auto startTime = HighResClock::now();
166 auto&
data = mDataStrings[feid];
170 bool syncLost{
false};
173 size_t deletePosition = 0;
180 if ((carry == 0) && (
data[0] ==
'\n') && (
data[1] ==
's')) {
183 while (
data[carry] !=
'\n') {
195 if (
data[carry] >=
'0' &&
data[carry] <=
'7') {
196 const auto status = decodeChannels(decdata, carry, feid);
199 }
else if (status == -1) {
201 LOGP(warn,
"trying to re-align data stream\n");
205 LOGP(error,
"stopping decoding\n");
209 }
else if (
data[carry] ==
'S') {
213 const auto streamStart = carry;
216 uint32_t timeStamp = decodeTimeStamp(&
data[carry]);
221 if (
data[carry] !=
'\n' ||
data[carry + 1] !=
's') {
222 LOGP(warning,
"Problem decoding time stamp for FE ({}) at position {} / {}, dump: {}\n",
223 feid, carry - 8,
dataSize, std::string_view(&
data[carry - 8], std::min(
size_t(20),
dataSize - 8 - carry)));
226 deletePosition = carry;
232 (*mDebugStream) <<
"d"
233 <<
"data=" << decdata
235 <<
"tscount=" << mTSCountFEs[feid]
238 ++mTSCountFEs[feid].first;
242 auto& currentsTime = mDecodedData.
data;
243 const auto nSamples = currentsTime.size();
244 auto firstRefTime = (nSamples > 0) ? currentsTime[0].
time : refTime;
245 auto& refCount = mTSCountFEs[feid].second;
247 if ((refCount == 0) && (refTime > 1)) {
248 LOGP(detail,
"Skipping initial data packet {} with time stamp {} for FE {}",
249 mTSCountFEs[feid].
first, timeStamp, feid);
251 if (refTime < firstRefTime) {
254 firstRefTime = refTime;
255 }
else if (nSamples < refTime - firstRefTime + 1) {
257 mDecodedData.
resize(refTime - firstRefTime + 1);
260 mDecodedData.
setData(refTime - firstRefTime, refTime, decdata, feid);
262 if (refCount != refTime) {
264 if (!((refCount == 0) && (refTime == 1))) {
265 LOGP(warning,
"Unexpected time stamp in FE {}. Count {} != TS {} ({}), dump: {}", feid, refCount, refTime, timeStamp, std::string_view(&
data[streamStart], std::min(
size_t(20),
dataSize - streamStart - carry)));
268 while (refCount < refTime) {
270 if ((refCount == 0) && (refTime == 1)) {
271 LOGP(info,
"Adding dummy data for FE {}, TS {}", feid, refCount);
273 LOGP(warning,
"Adding dummy data for FE {}, TS {}", feid, refCount);
286 }
else if (
const auto pos = mDecodeAdditional.find(
data[carry]); (
pos != std::string::npos) && mDebugStream) {
288 const auto streamStart = carry;
289 const char streamType =
data[carry];
290 const char endMarker = streamType + 32;
293 if (!decodeChannels(decAdditional, carry, feid)) {
297 if (
data[carry] != endMarker) {
298 LOGP(warning,
"Problem decoding additional stream '{}' values for FE ({}) at position {} / {}, dump: {}",
299 streamType, feid, carry,
dataSize, std::string_view(&
data[streamStart], std::min(
size_t(20),
dataSize - streamStart - carry)));
301 const char treeName[2] = {streamType,
'\0'};
302 (*mDebugStream) << treeName
303 <<
"data=" << decAdditional
308 decAdditional.
reset();
312 const char streamType =
data[carry];
313 const char endMarker = streamType + 32;
314 while (
data[carry] != endMarker) {
321 }
else if (
data[carry] >=
'a' &&
data[carry] <=
'z') {
322 if (mTSCountFEs[feid].
first == 0) {
324 LOGP(info,
"Skipping {} for FE {}, time stamp packet count {}, ref count {}",
325 data[carry], feid, mTSCountFEs[feid].
first, mTSCountFEs[feid].second);
329 LOGP(warning,
"Skipping {} for FE {}, trying to re-align data stream, time stamp packet count {}, ref count {}",
330 data[carry], feid, mTSCountFEs[feid].
first, mTSCountFEs[feid].second);
334 LOGP(error,
"Skipping {} for FE {}, might lead to decoding problems, time stamp packet count {}, ref count {}",
335 data[carry], feid, mTSCountFEs[feid].
first, mTSCountFEs[feid].second);
342 LOGP(warn,
"Can't interpret position for FE {}, {} / {}, {}, trying to re-align data stream, time stamp packet count {}, ref count {}",
343 feid, carry,
dataSize, std::string_view(&
data[carry - 8], std::min(
size_t(20),
dataSize - 8 - carry)), mTSCountFEs[feid].
first, mTSCountFEs[feid].second);
347 LOGP(error,
"Can't interpret position for FE {}, {} / {}, {}, stopping decoding, time stamp packet count {}, ref count {}",
348 feid, carry,
dataSize, std::string_view(&
data[carry - 8], std::min(
size_t(20),
dataSize - 8 - carry)), mTSCountFEs[feid].
first, mTSCountFEs[feid].second);
355 data.erase(
data.begin(),
data.begin() + deletePosition);
359 auto endTime = HighResClock::now();
360 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
361 LOGP(detail,
"Time to decode feid {}: {} s", feid, elapsed_seconds.count());
367 const auto startTime = HighResClock::now();
369#pragma omp parallel for num_threads(sNThreads)
370 for (
size_t feid = 0; feid <
NumberFEs; ++feid) {
375 auto endTime = HighResClock::now();
376 auto elapsed_seconds = std::chrono::duration_cast<std::chrono::duration<double>>(endTime - startTime);
377 LOGP(detail,
"Time to decode all feids {} s, {} s per packet ({})", elapsed_seconds.count(), elapsed_seconds.count() / mCollectedDataPackets, mCollectedDataPackets);
384 const size_t nDecodedData = (streamAll) ? mDecodedData.
data.size() : mDecodedData.
getNGoodEntries();
385 LOGP(info,
"streamDecodedData (streamAll {}): {} / {}", streamAll, nDecodedData, mDecodedData.
data.size());
387 for (
size_t ientry = 0; ientry < nDecodedData; ++ientry) {
388 auto& currentData = mDecodedData.
data[ientry];
389 auto fes = mDecodedData.
fes[ientry].to_ulong();
390 auto nfes = mDecodedData.
fes[ientry].count();
391 (*mDebugStream) <<
"c"
392 <<
"refTime=" << refTime
393 <<
"values=" << currentData
403 LOGP(info,
"Finalize sac::Decoder with {} good / {} remaining entries",
414 mDebugStream->Close();
415 mDebugStream.reset();
425 LOGP(info,
"Clearing data of size {}, firstTS {}, lastTS {}",
426 posGood, (
data.size() > 0) ?
data.front().time : -1, (posGood > 0) ?
data[posGood - 1].
time : 0);
431void Decoder::printPacketInfo(
const sac::packet& sac)
433 const auto& header = sac.
header;
434 const auto& sacc = sac.
data;
436 LOGP(info,
"{:>4} {:>4} {:>8} {:>8} -- {:>4} {:>4} {:>8} {:>8} {:>10} -- {:>4}\n",
449 LOGP(info,
"{:>4} {:>4} {:>8} {:>8} -- {:>4} {:>4} {:>8} {:>8} {:>#10x} -- {:>4b}\n",
452 header.bunchCrossing,
463void Decoder::dumpStreams()
465 const std::string outNameBase(mDebugOutputName.substr(0, mDebugOutputName.size() - 5));
466 for (
size_t feid = 0; feid <
NumberFEs; ++feid) {
467 std::string outName = outNameBase;
468 outName += fmt::format(
".feid_{}.stream.txt", feid);
469 std::ofstream fout(outName.data());
470 const auto&
data = mDataStrings[feid];
471 fout << std::string_view(&
data[0],
data.size());
std::chrono::high_resolution_clock HighResClock
Decoding of integrated analogue currents.
@ TimingInfo
Print timing information.
@ DumpFullStream
Dump the data character streams.
@ StreamSingleFE
Stream debug output for each single FE.
@ ProcessingInfo
Print some processing info.
@ PacketInfo
Print packe information.
@ StreamFinalData
Stream debug output for each single FE.
bool process(const char *data, size_t size)
static constexpr uint32_t SampleDistance
Number of samples between time data stamps.
void streamDecodedData(bool streamAll=false)
static constexpr std::string_view AllowedAdditionalStreams
Allowed additional data streams that can be decoded with debug stream enabled.
@ None
Print packe information.
@ AlignAndFillMissing
Try re-alignment and fill missing packets with 0s.
GLsizei const GLfloat * value
constexpr size_t NumberFEs
Total number of frontends to process.
constexpr uint32_t ChannelsPerFE
Channels per front-end card. One channel is one stack.
constexpr size_t Instances
Number of instances to process.
FIXME: do not use data model tables.
std::array< int32_t, 8 > currents
void insertFront(const size_t entries)
insert entries at the front
double referenceTime
reference time when sampling clock was started
void setData(const size_t pos, uint32_t time, const DecodedDataFE &decdata, const int feid)
copy decoded data from single FE and mark FE as received
size_t getNGoodEntries() const
Number of good entries, for which all FEs were decoded.
std::vector< std::bitset< NumberFEs > > fes
bitmask of decoded FEs
void clearGoodData()
clear entries where all FEs were already decoded
void resize(size_t newSize)
std::vector< DataPoint > data
decoded data