Project
Loading...
Searching...
No Matches
RawFileReader.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#include <algorithm>
17#include <cstring>
18#include <iostream>
19#include <iomanip>
20#include <memory>
21#include <sstream>
22#include <iostream>
24#include "Headers/DAQID.h"
28#include "Framework/Logger.h"
29
30#include <Common/Configuration.h>
31#include <TStopwatch.h>
32#include <fcntl.h>
33
34using namespace o2::raw;
35namespace o2h = o2::header;
36
37//====================== methods of LinkBlock ========================
38//____________________________________________
39void RawFileReader::LinkBlock::print(const std::string& pref) const
40{
41 LOGF(info, "%sfile:%3d offs:%10zu size:%8d newSP:%d newTF:%d newHB:%d endHB:%d | Orbit %u TF %u",
44}
45
46//====================== methods of LinkData ========================
47
48//____________________________________________
50{
51 std::stringstream ss;
52 ss << "Link " << origin.as<std::string>() << '/' << description.as<std::string>() << "/0x"
53 << std::hex << std::setw(8) << std::setfill('0') << subspec
54 << " RO: " << (continuousRO ? "Cont" : "Trig");
55 return ss.str();
56}
57
58//____________________________________________
59void RawFileReader::LinkData::print(bool verbose, const std::string& pref) const
60{
61 LOGF(info, "%s %s FEE:0x%04x CRU:%4d Lnk:%3d EP:%d RDHv%d Src:%s | SPages:%4d Pages:%6d TFs:%6d with %6d HBF in %4d blocks (%d err)",
62 pref, describe(), int(RDHUtils::getFEEID(rdhl)), int(RDHUtils::getCRUID(rdhl)), int(RDHUtils::getLinkID(rdhl)),
63 int(RDHUtils::getEndPointID(rdhl)), int(RDHUtils::getVersion(rdhl)),
64 RDHUtils::getVersion(rdhl) > 5 ? o2h::DAQID::DAQtoO2(RDHUtils::getSourceID(rdhl)).str : "N/A",
65 nSPages, nCRUPages, nTimeFrames, nHBFrames, int(blocks.size()), nErrors);
66 if (verbose) {
67 for (int i = 0; i < int(blocks.size()); i++) {
68 std::stringstream counts;
69 counts << '#' << std::setw(5) << i << " | ";
70 blocks[i].print(counts.str());
71 }
72 }
73}
74
75//____________________________________________
76size_t RawFileReader::LinkData::getNextTFSuperPagesStat(std::vector<RawFileReader::PartStat>& parts) const
77{
78 // get stat. of superpages for this link in this TF. We treat as a start of a superpage the discontinuity in the link data, new TF
79 // or continuous data exceeding a threshold (e.g. 1MB)
80 if (nextBlock2Read >= 0) { // negative nextBlock2Read signals absence of data
81 int sz = 0, nSP = 0, ibl = nextBlock2Read, nbl = blocks.size(), nblPart = 0;
82 parts.clear();
83 while (ibl < nbl && (blocks[ibl].tfID == blocks[nextBlock2Read].tfID)) {
84 if (ibl > nextBlock2Read && (blocks[ibl].testFlag(LinkBlock::StartSP) ||
85 (sz + blocks[ibl].size) > reader->mNominalSPageSize ||
86 (blocks[ibl - 1].offset + blocks[ibl - 1].size) < blocks[ibl].offset)) { // new superpage
87 parts.emplace_back(RawFileReader::PartStat{sz, nblPart});
88 sz = 0;
89 nblPart = 0;
90 }
91 sz += blocks[ibl].size;
92 nblPart++;
93 ibl++;
94 }
95 if (sz) {
96 parts.emplace_back(RawFileReader::PartStat{sz, nblPart});
97 }
98 }
99 return parts.size();
100}
101
102//____________________________________________
104{
105 // estimate the memory size of the next HBF to read
106 // The blocks are guaranteed to not cover more than 1 HB
107 size_t sz = 0;
108 if (nextBlock2Read >= 0) { // negative nextBlock2Read signals absence of data
109 int ibl = nextBlock2Read, nbl = blocks.size();
110 while (ibl < nbl && (blocks[ibl].ir == blocks[nextBlock2Read].ir)) {
111 sz += blocks[ibl].size;
112 ibl++;
113 }
114 }
115 return sz;
116}
117
118//____________________________________________
120{
121 // read data of the next complete HB, buffer of getNextHBFSize() must be allocated in advance
122 size_t sz = 0;
123 if (nextBlock2Read < 0) { // negative nextBlock2Read signals absence of data
124 return sz;
125 }
126 int ibl = nextBlock2Read, nbl = blocks.size();
127 bool error = false;
128 while (ibl < nbl) {
129 auto& blc = blocks[ibl];
130 if (blc.ir != blocks[nextBlock2Read].ir) {
131 break;
132 }
133 ibl++;
134 if (blc.dataCache) {
135 memcpy(buff + sz, blc.dataCache.get(), blc.size);
136 } else {
137 auto fl = reader->mFiles[blc.fileID];
138 if (fseek(fl, blc.offset, SEEK_SET) || fread(buff + sz, 1, blc.size, fl) != blc.size) {
139 LOGF(error, "Failed to read for the %s a bloc:", describe());
140 blc.print();
141 error = true;
142 } else if (reader->mCacheData) { // need to fill the cache at 1st reading
143 blc.dataCache = std::make_unique<char[]>(blc.size);
144 memcpy(blc.dataCache.get(), buff + sz, blc.size); // will be used at next reading
145 }
146 }
147 sz += blc.size;
148 }
149 nextBlock2Read = ibl;
150 return error ? 0 : sz; // in case of the error we ignore the data
151}
152
153//____________________________________________
155{
156 // skip next complete HB
157 size_t sz = 0;
158 if (nextBlock2Read < 0) { // negative nextBlock2Read signals absence of data
159 return sz;
160 }
161 int ibl = nextBlock2Read, nbl = blocks.size();
162 while (ibl < nbl) {
163 const auto& blc = blocks[ibl];
164 if (blc.ir.orbit != blocks[nextBlock2Read].ir.orbit) {
165 break;
166 }
167 ibl++;
168 sz += blc.size;
169 }
170 nextBlock2Read = ibl;
171 return sz;
172}
173
174//____________________________________________
176{
177 // estimate the memory size of the next TF to read
178 // (assuming nextBlock2Read is at the start of the TF)
179 size_t sz = 0;
180 if (nextBlock2Read >= 0) { // negative nextBlock2Read signals absence of data
181 int ibl = nextBlock2Read, nbl = blocks.size();
182 while (ibl < nbl && (blocks[ibl].tfID == blocks[nextBlock2Read].tfID)) {
183 sz += blocks[ibl].size;
184 ibl++;
185 }
186 }
187 return sz;
188}
189
190//_____________________________________________________________________
192{
193 // read next complete TF, buffer of getNextTFSize() must be allocated in advance
194 size_t sz = 0;
195 if (nextBlock2Read < 0) { // negative nextBlock2Read signals absence of data
196 return sz;
197 }
198 int ibl0 = nextBlock2Read, nbl = blocks.size();
199 bool error = false;
200 while (nextBlock2Read < nbl && (blocks[nextBlock2Read].tfID == blocks[ibl0].tfID)) { // nextBlock2Read is incremented by the readNextHBF!
201 auto szb = readNextHBF(buff + sz);
202 if (!szb) {
203 error = true;
204 }
205 sz += szb;
206 }
207 return error ? 0 : sz; // in case of the error we ignore the data
208}
209
210//_____________________________________________________________________
212{
213 // skip next complete TF
214 size_t sz = 0;
215 if (nextBlock2Read < 0) { // negative nextBlock2Read signals absence of data
216 return sz;
217 }
218 int ibl0 = nextBlock2Read, nbl = blocks.size();
219 bool error = false;
220 while (nextBlock2Read < nbl && (blocks[nextBlock2Read].tfID == blocks[ibl0].tfID)) { // nextBlock2Read is incremented by the readNextHBF!
221 auto szb = skipNextHBF();
222 if (!szb) {
223 error = true;
224 }
225 sz += szb;
226 }
227 return error ? 0 : sz; // in case of the error we ignore the data
228}
229
230//_____________________________________________________________________
232{
233 // go to given TF
234 if (tf < tfStartBlock.size()) {
235 nextBlock2Read = tfStartBlock[tf].first;
236 } else {
237 LOG(warning) << "No TF " << tf << " for " << describe();
238 nextBlock2Read = -1;
239 return false;
240 }
241 return true;
242}
243
244//____________________________________________
246{
247 // estimate number of HBFs left in the TF
248 int ibl = nextBlock2Read, nbl = blocks.size(), nHB = 0;
249 if (nextBlock2Read >= 0) { // negative nextBlock2Read signals absence of data
250 while (ibl < nbl && (blocks[ibl].tfID == blocks[nextBlock2Read].tfID)) {
251 if (blocks[ibl].testFlag(LinkBlock::StartHB)) {
252 nHB++;
253 }
254 ibl++;
255 }
256 }
257 return nHB;
258}
259
260//____________________________________________
262{
263 // read data of the next complete HB, buffer of getNextHBFSize() must be allocated in advance
264 size_t sz = 0;
265 if (nextBlock2Read < 0) { // negative nextBlock2Read signals absence of data
266 return sz;
267 }
268 int ibl = nextBlock2Read, nbl = blocks.size();
269 auto tfID = blocks[nextBlock2Read].tfID;
270 bool error = false;
271 if (pstat) { // info is provided, use it derictly
272 sz = pstat->size;
273 ibl += pstat->nBlocks;
274 } else { // need to calculate blocks to read
275 while (ibl < nbl) {
276 auto& blc = blocks[ibl];
277 if (ibl > nextBlock2Read && (blc.tfID != blocks[nextBlock2Read].tfID ||
278 blc.testFlag(LinkBlock::StartSP) ||
279 (sz + blc.size) > reader->mNominalSPageSize ||
280 blocks[ibl - 1].offset + blocks[ibl - 1].size < blc.offset)) { // new superpage or TF
281 break;
282 }
283 ibl++;
284 sz += blc.size;
285 }
286 }
287 if (sz) {
288 if (reader->mCacheData && blocks[nextBlock2Read].dataCache) {
289 memcpy(buff, blocks[nextBlock2Read].dataCache.get(), sz);
290 } else {
291 auto fl = reader->mFiles[blocks[nextBlock2Read].fileID];
292 if (fseek(fl, blocks[nextBlock2Read].offset, SEEK_SET) || fread(buff, 1, sz, fl) != sz) {
293 LOGF(error, "Failed to read for the %s a bloc:", describe());
294 blocks[nextBlock2Read].print();
295 error = true;
296 } else if (reader->mCacheData) { // cache after 1st reading
297 blocks[nextBlock2Read].dataCache = std::make_unique<char[]>(sz);
298 memcpy(blocks[nextBlock2Read].dataCache.get(), buff, sz);
299 }
300 }
301 }
302 nextBlock2Read = ibl;
303 return error ? 0 : sz; // in case of the error we ignore the data
304}
305
306//____________________________________________
308{
309 // estimate largest super page size
310 size_t szMax = 0, szLast = 0;
311 for (const auto& bl : blocks) {
312 if (bl.testFlag(LinkBlock::StartSP)) { // account previous SPage and start accumulation of the next one
313 if (szLast > szMax) {
314 szMax = szLast;
315 }
316 szLast = 0;
317 }
318 szLast += bl.size;
319 }
320 return szLast > szMax ? szLast : szMax;
321}
322
323//____________________________________________
325{
326 // estimate largest TF
327 size_t szMax = 0, szLast = 0;
328 for (const auto& bl : blocks) {
329 if (bl.testFlag(LinkBlock::StartTF)) { // account previous TF and start accumulation of the next one
330 if (szLast > szMax) {
331 szMax = szLast;
332 }
333 szLast = 0;
334 }
335 szLast += bl.size;
336 }
337 return szLast > szMax ? szLast : szMax;
338}
339
340//_____________________________________________________________________
342{
343 // account RDH in statistics
344 bool ok = true;
345 bool newTF = false, newHB = false;
346 const auto& HBU = HBFUtils::Instance();
347
348 if (RDHUtils::getFEEID(rdh) != RDHUtils::getFEEID(rdhl)) { // make sure links with different FEEID were not assigned same subspec
349 LOGF(error, "Same SubSpec is found for %s with different RDH.feeId", describe());
350 LOGF(error, "old RDH assigned SubSpec=0x%-8d:", subspec);
351 RDHUtils::dumpRDH(rdhl);
352 LOGF(error, "new RDH assigned SubSpec=0x%-8d:", subspec);
354 throw std::runtime_error("Conflicting SubSpecs are provided");
355 ok = false;
356 nErrors++;
357 }
358 auto ir = RDHUtils::getTriggerIR(rdh);
359 auto pageCnt = RDHUtils::getPageCounter(rdh);
360
361 if (pageCnt == 0) {
362 auto triggerType = RDHUtils::getTriggerType(rdh);
363 if (!nCRUPages) { // 1st page, expect SOX
364 if (triggerType & o2::trigger::SOC) {
365 continuousRO = true;
366 irOfSOX = ir;
367 } else if (triggerType & o2::trigger::SOT) {
368 continuousRO = false;
369 irOfSOX = ir;
370 } else {
371 if (reader->mCheckErrors & (0x1 << ErrNoSOX)) {
372 LOG(error) << ErrNames[ErrNoSOX];
373 ok = false;
374 nErrors++;
375 }
376 }
377 if (!irOfSOX.isDummy() && reader->getTFAutodetect() == FirstTFDetection::Pending) {
378 reader->imposeFirstTF(irOfSOX.orbit);
379 }
380 }
381 auto newTFCalc = reader->getTFAutodetect() != FirstTFDetection::Pending && (blocks.empty() || HBU.getTF(blocks.back().ir) != HBU.getTF(ir)); // TF change
382 if (cruDetector) {
383 newTF = (triggerType & o2::trigger::TF);
384 newHB = (triggerType & o2::trigger::HB);
385 if (newTFCalc != newTF && (reader->mCheckErrors & (0x1 << ErrMismatchTF))) {
386 LOG(error) << ErrNames[ErrMismatchTF];
387 ok = false;
388 nErrors++;
389 }
390 if (reader->mPreferCalculatedTFStart) {
391 newTF = newTFCalc;
392 if (newTF) {
393 newHB = true;
394 }
395 }
396 } else {
397 newHB = true; // in RORC detectors treat each trigger as a HBF
398 if (newTFCalc) {
399 newTF = true;
400 // continuousRO = false;
401 }
402 }
403 } else if (reader->mCheckErrors & (0x1 << ErrWrongPageCounterIncrement)) {
404 // check increasing pageCnt
405 if (nCRUPages && (pageCnt != (RDHUtils::getPageCounter(rdhl) + 1))) { // skip for very 1st page
407 << " old=" << int(pageCnt) << " new=" << int(RDHUtils::getPageCounter(rdhl));
408 ok = false;
409 nErrors++;
410 }
411 }
412
413 if (reader->mCheckErrors) {
414 if (nCRUPages) {
415 // check increasing (or wrapping) packetCounter
416 auto packetCounter = RDHUtils::getPacketCounter(rdh);
417 auto packetCounterL = RDHUtils::getPacketCounter(rdhl);
418 if ((packetCounter != ((packetCounterL + 1) & 0xff)) &&
419 (reader->mCheckErrors & (0x1 << ErrWrongPacketCounterIncrement))) { // skip for very 1st page
421 << " new=" << int(packetCounter) << " old=" << int(packetCounterL);
422 ok = false;
423 nErrors++;
424 }
425 // check if number of HBFs in the TF is as expected
426 if (newTF) {
427 if (nHBFinTF != HBFUtils::Instance().getNOrbitsPerTF() &&
428 (reader->mCheckErrors & (0x1 << ErrWrongHBFsPerTF)) && cruDetector) {
429 LOG(error) << ErrNames[ErrWrongHBFsPerTF] << ": "
430 << nHBFinTF << " instead of " << HBFUtils::Instance().getNOrbitsPerTF();
431 ok = false;
432 nErrors++;
433 }
434 nHBFinTF = 0; // reset
435 }
436
437 } else { // make sure data starts with TF and HBF
438 if ((!newTF || !newHB || pageCnt != 0) &&
439 (reader->mCheckErrors & (0x1 << ErrWrongFirstPage) && cruDetector)) {
440 LOG(error) << ErrNames[ErrWrongFirstPage];
441 ok = false;
442 nErrors++;
443 }
444 }
445 }
446 auto stop = RDHUtils::getStop(rdh);
447 auto hbIR = RDHUtils::getHeartBeatIR(rdh), hblIR = RDHUtils::getHeartBeatIR(rdhl);
448 if (newHB) {
449 if (reader->mCheckErrors) {
450 nHBFinTF++;
451 if (stop && (reader->mCheckErrors & (0x1 << ErrHBFStopOnFirstPage))) {
452 LOG(error) << ErrNames[ErrHBFStopOnFirstPage] << " @ HBF#" << nHBFrames;
453 ok = false;
454 nErrors++;
455 }
456 if (openHB && (reader->mCheckErrors & (0x1 << ErrHBFNoStop)) && cruDetector) {
457 LOG(error) << ErrNames[ErrHBFNoStop] << " @ HBF#" << nHBFrames;
458 ok = false;
459 nErrors++;
460 }
461 if ((reader->mCheckErrors & (0x1 << ErrHBFJump)) &&
462 (nCRUPages && // skip this check for the very 1st RDH
463 !(/*hbIR.bc == hblIR.bc &&*/ hbIR.orbit == hblIR.orbit + 1)) &&
464 cruDetector) {
465 LOG(error) << ErrNames[ErrHBFJump] << " @ HBF#" << nHBFrames << " New HB orbit/bc=" << hbIR.orbit << '/' << int(hbIR.bc)
466 << " is not incremented by 1 orbit wrt Old HB orbit/bc=" << hblIR.orbit << '/' << int(hblIR.bc);
467 ok = false;
468 nErrors++;
469 }
470 } // end of check errors
471 openHB = true;
472 nHBFrames++;
473 }
474 if (stop) {
475 openHB = false;
476 }
477
478 if (cruDetector &&
479 ((reader->getTFAutodetect() == FirstTFDetection::Pending && !newTF) ||
480 (reader->getTFAutodetect() == FirstTFDetection::Done && ir.orbit < HBU.orbitFirst))) { // skip data until TF start is seen or orbit is less than determined 1st TF orbit
481 LOG(error) << "skipping RDH w/o newTF flag until TF start is found";
482 ok = false;
483 newTF = newSPage = newHB = false;
484 }
485
486 if (newTF || newSPage || newHB) {
487 if (newTF && reader->getTFAutodetect() == FirstTFDetection::Pending) {
488 if (cruDetector) {
489 reader->imposeFirstTF(hbIR.orbit);
490 } else {
491 throw std::runtime_error("HBFUtil first orbit/bc autodetection cannot be done with first link from CRORC detector");
492 }
493 }
494 int nbl = blocks.size();
495 auto& bl = blocks.emplace_back(reader->mCurrentFileID, reader->mPosInFile);
496 bl.ir = hbIR;
497 bl.tfID = HBU.getTF(hbIR); // nTimeFrames - 1;
498 if (newTF) {
499 tfStartBlock.emplace_back(nbl, bl.tfID);
500 nTimeFrames++;
501 bl.setFlag(LinkBlock::StartTF);
502 if (reader->mCheckErrors & (0x1 << ErrNoSuperPageForTF) && cruDetector) {
503 if (reader->mMultiLinkFile && !newSPage) {
504 LOG(error) << ErrNames[ErrNoSuperPageForTF] << " @ TF#" << nTimeFrames;
505 ok = false;
506 nErrors++;
507 }
508 } // end of check errors
509 }
510
511 if (newSPage) {
512 nSPages++;
513 bl.setFlag(LinkBlock::StartSP);
514 }
515 if (newHB) {
516 bl.setFlag(LinkBlock::StartHB);
517 }
518 }
519 if (blocks.size()) {
520 blocks.back().setFlag(LinkBlock::EndHB, stop); // last processed RDH defines this flag
521 blocks.back().size += RDHUtils::getOffsetToNext(rdh);
522 rdhl = rdh;
523 nCRUPages++;
524 }
525 if (!ok) {
526 LOG(error) << " ^^^Problem(s) was encountered at offset " << reader->mPosInFile << " of file " << reader->mCurrentFileID;
528 } else if (reader->mVerbosity > 1) {
529 if (reader->mVerbosity > 2) {
531 } else {
533 }
534 LOG(info) << "--------------- reader tags: newTF: " << newTF << " newHBF/Trigger: " << newHB << " newSPage: " << newSPage;
535 }
536 return true;
537}
538
539//====================== methods of RawFileReader ========================
540
541//_____________________________________________________________________
542RawFileReader::RawFileReader(const std::string& config, int verbosity, size_t buffSize, const std::string& onlyDet) : mVerbosity(verbosity), mBufferSize(buffSize)
543{
544 if (!config.empty()) {
545 auto inp = parseInput(config, onlyDet, true);
547 }
548}
549
550//_____________________________________________________________________
551int RawFileReader::getLinkLocalID(const RDHAny& rdh, int fileID)
552{
553 // get id of the link subspec. in the parser (create entry if new)
554 auto orig = std::get<0>(mDataSpecs[fileID]);
555 LinkSubSpec_t subspec = RDHUtils::getSubSpec(rdh);
556 LinkSpec_t spec = createSpec(orig, subspec);
557 auto entryMap = mLinkEntries.find(spec);
558 if (entryMap == mLinkEntries.end()) { // need to register a new link
559 int n = mLinkEntries.size();
560 mLinkEntries[spec] = n;
561 auto& lnk = mLinksData.emplace_back(rdh, this);
562 lnk.subspec = subspec;
563 lnk.origin = orig;
564 lnk.description = std::get<1>(mDataSpecs[fileID]);
565 lnk.spec = spec;
566 lnk.cruDetector = std::get<2>(mDataSpecs[fileID]) == CRU;
567 return n;
568 }
569 return entryMap->second;
570}
571
572//_____________________________________________________________________
573bool RawFileReader::preprocessFile(int ifl)
574{
575 // preprocess file, check RDH data, build statistics
576 std::unique_ptr<char[]> buffer = std::make_unique<char[]>(mBufferSize);
577 FILE* fl = mFiles[ifl];
578 mCurrentFileID = ifl;
579 LinkSpec_t specPrev = 0xffffffffffffffff;
580 int lIDPrev = -1;
581 mMultiLinkFile = false;
582 fseek(fl, 0L, SEEK_END);
583 const auto fileSize = ftell(fl);
584 rewind(fl);
585 long int nr = 0;
586 mPosInFile = 0;
587 size_t nRDHread = 0, boffs;
588 bool readMore = true;
589 while (readMore && (nr = fread(buffer.get(), 1, mBufferSize, fl))) {
590 boffs = 0;
591 while (1) {
592 auto& rdh = *reinterpret_cast<RDHUtils::RDHAny*>(&buffer[boffs]);
593 if ((mPosInFile + RDHUtils::getOffsetToNext(rdh)) > fileSize) {
594 LOGP(warning, "File {} truncated current file pos {} + offsetToNext {} > fileSize {}", ifl, mPosInFile, RDHUtils::getOffsetToNext(rdh), fileSize);
595 readMore = false;
596 break;
597 }
598 nRDHread++;
599 LinkSpec_t spec = createSpec(std::get<0>(mDataSpecs[mCurrentFileID]), RDHUtils::getSubSpec(rdh));
600 int lID = lIDPrev;
601 if (spec != specPrev) { // link has changed
602 specPrev = spec;
603 if (lIDPrev != -1) {
604 mMultiLinkFile = true;
605 }
606 lID = getLinkLocalID(rdh, mCurrentFileID);
607 }
608 bool newSPage = lID != lIDPrev;
609 try {
610 mLinksData[lID].preprocessCRUPage(rdh, newSPage);
611 } catch (...) {
612 LOG(error) << "Corrupted data, abandoning processing";
613 mStopProcessing = true;
614 break;
615 }
616
617 if (mLinksData[lID].nTimeFrames && (mLinksData[lID].nTimeFrames - 1 > mMaxTFToRead)) { // limit reached, discard the last read
618 mLinksData[lID].nTimeFrames--;
619 mLinksData[lID].blocks.pop_back();
620 if (mLinksData[lID].nHBFrames > 0) {
621 mLinksData[lID].nHBFrames--;
622 }
623 if (mLinksData[lID].nCRUPages > 0) {
624 mLinksData[lID].nCRUPages--;
625 }
626 lIDPrev = -1; // last block is closed
627 readMore = false;
628 break;
629 }
630 boffs += RDHUtils::getOffsetToNext(rdh);
631 mPosInFile += RDHUtils::getOffsetToNext(rdh);
632 lIDPrev = lID;
633 if (boffs + sizeof(RDHUtils::RDHAny) >= nr) {
634 if (fseek(fl, mPosInFile, SEEK_SET)) {
635 readMore = false;
636 break;
637 }
638 break;
639 }
640 }
641 }
642 LOGF(info, "File %3d : %9li bytes scanned, %6d RDH read for %4d links from %s",
643 mCurrentFileID, mPosInFile, nRDHread, int(mLinkEntries.size()), mFileNames[mCurrentFileID]);
644 return nRDHread > 0;
645}
646
647//_____________________________________________________________________
648void RawFileReader::printStat(bool verbose) const
649{
650 int n = getNLinks();
651 for (int i = 0; i < n; i++) {
652 const auto& link = getLink(i);
653 std::stringstream counts;
654 counts << "Lnk" << std::left << std::setw(4) << i << "| ";
655 link.print(verbose, counts.str());
656 }
657}
658
659//_____________________________________________________________________
661{
662 mLinkEntries.clear();
663 mOrderedIDs.clear();
664 mLinksData.clear();
665 for (auto fl : mFiles) {
666 fclose(fl);
667 }
668 mFiles.clear();
669 mFileNames.clear();
670
671 mCurrentFileID = 0;
672 mMultiLinkFile = false;
673 mInitDone = false;
674}
675
676//_____________________________________________________________________
678{
679 if (mInitDone) {
680 LOG(error) << "Cannot add new files after initialization";
681 return false;
682 }
683 bool ok = true;
684
685 mFileBuffers.push_back(std::make_unique<char[]>(mBufferSize));
686 auto inFile = fopen(sname.c_str(), "rb");
687 if (!inFile) {
688 LOG(error) << "Failed to open input file " << sname;
689 return false;
690 }
691 setvbuf(inFile, mFileBuffers.back().get(), _IOFBF, mBufferSize);
692
693 if (origin == o2h::gDataOriginInvalid) {
694 LOG(error) << "Invalid data origin " << origin.as<std::string>() << " for file " << sname;
695 ok = false;
696 }
697 if (desc == o2h::gDataDescriptionInvalid) {
698 LOG(error) << "Invalid data description " << desc.as<std::string>() << " for file " << sname;
699 ok = false;
700 }
701 if (!ok) {
702 fclose(inFile);
703 return false;
704 }
705 mFileNames.push_back(sname);
706 mFiles.push_back(inFile);
707 mDataSpecs.emplace_back(origin, desc, t);
708 return true;
709}
710
711//_____________________________________________________________________
713{
714 // make initialization, preprocess files and chack for errors if asked
715
716 for (int i = 0; i < NErrorsDefined; i++) {
717 if (mCheckErrors & (0x1 << i)) {
718 LOGF(info, "%s check for /%s/", (mCheckErrors & (0x1 << i)) ? "perform" : "ignore ", ErrNames[i].data());
719 }
720 }
721 if (mMaxTFToRead < 0xffffffff) {
722 LOGF(info, "at most %u TF will be processed", mMaxTFToRead);
723 }
724
725 int nf = mFiles.size();
726 mEmpty = true;
727 for (int i = 0; i < nf; i++) {
728 if (preprocessFile(i)) {
729 mEmpty = false;
730 }
731 }
732 if (mStopProcessing) {
733 LOG(error) << "Abandoning processing due to corrupted data";
734 return false;
735 }
736 mOrderedIDs.resize(mLinksData.size());
737 for (int i = mLinksData.size(); i--;) {
738 mOrderedIDs[i] = i;
739 if (mNTimeFrames < mLinksData[i].nTimeFrames) {
740 mNTimeFrames = mLinksData[i].nTimeFrames;
741 }
742 }
743 std::sort(mOrderedIDs.begin(), mOrderedIDs.end(),
744 [&links = mLinksData](int a, int b) { return links[a].spec < links[b].spec; });
745
746 size_t maxSP = 0, maxTF = 0;
747
748 LOGF(info, "Summary of preprocessing:");
749 for (int i = 0; i < int(mLinksData.size()); i++) {
750 auto& link = getLink(i);
751 auto msp = link.getLargestSuperPage();
752 auto mtf = link.getLargestTF();
753 if (maxSP < msp) {
754 maxSP = msp;
755 }
756 if (maxTF < mtf) {
757 maxTF = mtf;
758 }
759 std::stringstream counts;
760 counts << "Lnk" << std::setw(4) << std::left << i << "| ";
761 link.print(mVerbosity, counts.str());
762 if (msp > mNominalSPageSize) {
763 LOGF(debug, " Attention: largest superpage %zu B exceeds expected %d B",
764 msp, mNominalSPageSize);
765 }
766 // min max orbits
767 if (link.blocks.front().ir.orbit < mOrbitMin) {
768 mOrbitMin = link.blocks.front().ir.orbit;
769 }
770 if (link.blocks.back().ir.orbit > mOrbitMax) {
771 mOrbitMax = link.blocks.back().ir.orbit;
772 }
773 if (link.tfStartBlock.empty() && !link.blocks.empty()) {
774 link.tfStartBlock.emplace_back(0, 0);
775 }
776 if ((mCheckErrors & (0x1 << ErrWrongNumberOfTF)) && (mNTimeFrames != link.nTimeFrames)) {
777 LOGF(error, "%s for %s: %u TFs while %u were seen for other links", ErrNames[ErrWrongNumberOfTF],
778 link.describe(), link.nTimeFrames, mNTimeFrames);
779 }
780 }
781 LOGF(info, "First orbit: %u, Last orbit: %u", mOrbitMin, mOrbitMax);
782 LOGF(info, "Largest super-page: %zu B, largest TF: %zu B", maxSP, maxTF);
783 if (!mCheckErrors) {
784 LOGF(info, "Detailed data format check was disabled");
785 }
786 mInitDone = true;
787
788 return !mEmpty;
789}
790
791//_____________________________________________________________________
793{
794 constexpr int NGoodOrigins = 21;
795 constexpr std::array<o2h::DataOrigin, NGoodOrigins> goodOrigins{
800
801 for (auto orgood : goodOrigins) {
802 if (ors == orgood.as<std::string>()) {
803 return orgood;
804 }
805 }
807}
808
809//_____________________________________________________________________
811{
812 constexpr int NGoodDesc = 5;
813 constexpr std::array<o2h::DataDescription, NGoodDesc> goodDesc{
816
817 for (auto dgood : goodDesc) {
818 if (ors == dgood.as<std::string>()) {
819 return dgood;
820 }
821 }
823}
824
825//_____________________________________________________________________
827{
828 // load from already parsed input
829 for (const auto& entry : inp) {
830 const auto& ordesc = entry.first;
831 const auto& files = entry.second;
832 if (files.empty()) { // these are default origin and decription
833 setDefaultDataOrigin(std::get<0>(ordesc));
834 setDefaultDataDescription(std::get<1>(ordesc));
835 setDefaultReadoutCardType(std::get<2>(ordesc));
836 continue;
837 }
838 for (const auto& fnm : files) { // specific file names
839 if (!addFile(fnm, std::get<0>(ordesc), std::get<1>(ordesc), std::get<2>(ordesc))) {
840 throw std::runtime_error("wrong raw data file path or origin/description");
841 }
842 }
843 }
844}
845
846//_____________________________________________________________________
847RawFileReader::InputsMap RawFileReader::parseInput(const std::string& confUri, const std::string& onlyDet, bool verbose)
848{
849 // read input files from configuration
850 std::map<OrigDescCard, std::vector<std::string>> entries;
851
852 if (confUri.empty()) {
853 throw std::runtime_error("Input configuration file is not provided");
854 }
855 std::string confFile = confUri;
856 ConfigFile cfg;
857 // ConfigFile file requires input as file:///absolute_path or file:local
858 if (confFile.rfind("file:", 0) != 0) {
859 confFile.insert(0, "file:");
860 }
861 try {
862 cfg.load(confFile);
863 } catch (std::string& e) { // unfortunately, the ConfigFile::load throws a string rather than the std::exception
864 throw std::runtime_error(std::string("Failed to parse configuration ") + confFile + " : " + e);
865 }
866 //
867 std::unordered_map<std::string, int> detFilter;
868 auto msk = DetID::getMask(onlyDet);
869 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
870 if (msk[id]) {
871 detFilter[DetID::getName(id)] = 1;
872 }
873 }
874
875 try {
876 std::string origStr, descStr, cardStr, defstr = "defaults";
877 cfg.getOptionalValue<std::string>(defstr + ".dataOrigin", origStr, DEFDataOrigin.as<std::string>());
878 auto defDataOrigin = getDataOrigin(origStr);
879 if (defDataOrigin == o2h::gDataOriginInvalid) {
880 throw std::runtime_error(std::string("Invalid default data origin ") + origStr);
881 }
882 cfg.getOptionalValue<std::string>(defstr + ".dataDescription", descStr, DEFDataDescription.as<std::string>());
883 auto defDataDescription = getDataDescription(descStr);
884 if (defDataDescription == o2h::gDataDescriptionInvalid) {
885 throw std::runtime_error(std::string("Invalid default data description ") + descStr);
886 }
887 auto defCardType = DEFCardType;
888 cfg.getOptionalValue<std::string>(defstr + ".readoutCard", cardStr, std::string{CardNames[DEFCardType]});
889 if (cardStr == CardNames[CRU]) {
890 defCardType = CRU;
891 } else if (cardStr == CardNames[RORC]) {
892 defCardType = RORC;
893 } else {
894 throw std::runtime_error(std::string("Invalid default readout card ") + cardStr);
895 }
896
897 entries[{defDataOrigin, defDataDescription, defCardType}]; // insert
898 LOG(debug) << "Setting default dataOrigin/Description/CardType " << defDataOrigin.as<std::string>() << '/' << defDataDescription.as<std::string>() << '/' << CardNames[defCardType];
899
900 for (auto flsect : ConfigFileBrowser(&cfg, "input-")) {
901 std::string flNameStr, defs{""};
902 cfg.getOptionalValue<std::string>(flsect + ".dataOrigin", origStr, defDataOrigin.as<std::string>());
903 cfg.getOptionalValue<std::string>(flsect + ".dataDescription", descStr, defDataDescription.as<std::string>());
904 cfg.getOptionalValue<std::string>(flsect + ".filePath", flNameStr, defs);
905 cfg.getOptionalValue<std::string>(flsect + ".readoutCard", cardStr, std::string{CardNames[CRU]});
906 if (flNameStr.empty()) {
907 LOG(debug) << "Skipping incomplete input " << flsect;
908 continue;
909 }
910 auto dataOrigin = getDataOrigin(origStr);
911 if (dataOrigin == o2h::gDataOriginInvalid) {
912 throw std::runtime_error(std::string("Invalid data origin ") + origStr + " for " + flsect);
913 }
914 if (!detFilter.empty()) {
915 int& sdet = detFilter[dataOrigin.as<std::string>()];
916 if (sdet < 1) {
917 if (sdet == 0 && verbose) { // print only once
918 LOG(info) << "discarding data of detector " << dataOrigin.as<std::string>();
919 sdet--;
920 }
921 continue;
922 }
923 }
924 auto dataDescription = getDataDescription(descStr);
925 if (dataDescription == o2h::gDataDescriptionInvalid) {
926 throw std::runtime_error(std::string("Invalid data description ") + descStr + " for " + flsect);
927 }
928
929 auto cardType = defCardType;
930 if (cardStr == CardNames[CRU]) {
931 cardType = CRU;
932 } else if (cardStr == CardNames[RORC]) {
933 cardType = RORC;
934 } else {
935 throw std::runtime_error(std::string("Invalid default readout card ") + cardStr + " for " + flsect);
936 }
937 entries[{dataOrigin, dataDescription, cardType}].push_back(flNameStr);
938 LOG(debug) << "adding file " << flNameStr << " to dataOrigin/Description " << dataOrigin.as<std::string>() << '/' << dataDescription.as<std::string>();
939 }
940 } catch (std::string& e) { // to catch exceptions from the parser
941 throw std::runtime_error(std::string("Aborting due to the exception: ") + e);
942 }
943
944 return entries;
945}
946
948{
949 if (mFirstTFAutodetect != FirstTFDetection::Pending) {
950 throw std::runtime_error("reader was not expecting imposing first TF");
951 }
952 auto& hbu = o2::raw::HBFUtils::Instance();
953 o2::raw::HBFUtils::setValue("HBFUtils", "orbitFirst", orbit);
954 LOG(info) << "Imposed data-driven TF start";
955 mFirstTFAutodetect = FirstTFDetection::Done;
956 hbu.printKeyValues();
957}
958
960{
961 std::string opt = ErrCheckDefaults[e] ? "nocheck-" : "check-";
962 return opt + RawFileReader::ErrNamesShort[e].data();
963}
964
966{
967 std::string opt = ErrCheckDefaults[e] ? "ignore /" : "check /";
968 return opt + RawFileReader::ErrNames[e].data() + '/';
969}
#define verbosity
Definition of the 32 Central Trigger System (CTS) Trigger Types defined in https://twiki....
uint64_t orbit
Definition RawEventData.h:6
int32_t i
Reader for (multiple) raw data files.
std::ostringstream debug
static void setValue(std::string const &mainkey, std::string const &subkey, T x)
static constexpr const char * getName(ID id)
names of defined detectors
Definition DetID.h:145
static constexpr ID First
Definition DetID.h:94
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static mask_t getMask(const std::string_view detList)
detector masks from any non-alpha-num delimiter-separated list (empty if NONE is supplied)
Definition DetID.cxx:42
void printStat(bool verbose=false) const
static InputsMap parseInput(const std::string &confUri, const std::string &onlyDet={}, bool verbose=false)
void setDefaultDataDescription(const std::string &desc)
static std::string nochk_expl(ErrTypes e)
void setDefaultDataOrigin(const std::string &orig)
std::map< OrigDescCard, std::vector< std::string > > InputsMap
static constexpr std::string_view CardNames[]
void loadFromInputsMap(const InputsMap &inp)
static constexpr bool ErrCheckDefaults[]
static constexpr std::string_view ErrNamesShort[]
static o2::header::DataDescription getDataDescription(const std::string &ors)
static std::string nochk_opt(ErrTypes e)
const LinkData & getLink(int i) const
void setDefaultReadoutCardType(ReadoutCardType t=CRU)
static o2::header::DataOrigin getDataOrigin(const std::string &ors)
RawFileReader(const std::string &config="", int verbosity=0, size_t buffsize=50 *1024UL, const std::string &onlyDet={})
void imposeFirstTF(uint32_t orbit)
static constexpr std::string_view ErrNames[]
bool addFile(const std::string &sname, o2::header::DataOrigin origin, o2::header::DataDescription desc, ReadoutCardType t=CRU)
GLdouble n
Definition glcorearb.h:1982
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLintptr offset
Definition glcorearb.h:660
GLboolean GLboolean GLboolean GLboolean a
Definition glcorearb.h:1233
constexpr o2::header::DataOrigin gDataOriginPHS
Definition DataHeader.h:574
constexpr o2::header::DataOrigin gDataOriginMFT
Definition DataHeader.h:572
constexpr o2::header::DataOrigin gDataOriginMCH
Definition DataHeader.h:571
constexpr o2::header::DataDescription gDataDescriptionInvalid
Definition DataHeader.h:596
constexpr o2::header::DataOrigin gDataOriginCTP
Definition DataHeader.h:564
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataOrigin gDataOriginTST
Definition DataHeader.h:579
constexpr o2::header::DataDescription gDataDescriptionInfo
Definition DataHeader.h:601
constexpr o2::header::DataOrigin gDataOriginHMP
Definition DataHeader.h:569
constexpr o2::header::DataOrigin gDataOriginTPC
Definition DataHeader.h:576
constexpr o2::header::DataOrigin gDataOriginFOC
Definition DataHeader.h:583
constexpr o2::header::DataOrigin gDataOriginZDC
Definition DataHeader.h:578
constexpr o2::header::DataDescription gDataDescriptionTracks
Definition DataHeader.h:599
constexpr o2::header::DataOrigin gDataOriginInvalid
Definition DataHeader.h:561
constexpr o2::header::DataDescription gDataDescriptionConfig
Definition DataHeader.h:600
constexpr o2::header::DataOrigin gDataOriginMID
Definition DataHeader.h:573
constexpr o2::header::DataOrigin gDataOriginFDD
Definition DataHeader.h:568
constexpr o2::header::DataDescription gDataDescriptionRawData
Definition DataHeader.h:597
constexpr o2::header::DataOrigin gDataOriginFT0
Definition DataHeader.h:566
constexpr o2::header::DataOrigin gDataOriginTRD
Definition DataHeader.h:577
constexpr o2::header::DataOrigin gDataOriginEMC
Definition DataHeader.h:565
constexpr o2::header::DataOrigin gDataOriginTOF
Definition DataHeader.h:575
constexpr o2::header::DataOrigin gDataOriginITS
Definition DataHeader.h:570
constexpr o2::header::DataOrigin gDataOriginCPV
Definition DataHeader.h:563
constexpr o2::header::DataDescription gDataDescriptionClusters
Definition DataHeader.h:598
constexpr o2::header::DataOrigin gDataOriginFV0
Definition DataHeader.h:567
O2 data header classes and API, v0.1.
Definition DetID.h:49
uint32_t LinkSubSpec_t
Definition RDHUtils.h:34
constexpr uint32_t SOT
Definition Triggers.h:33
constexpr uint32_t TF
Definition Triggers.h:37
constexpr uint32_t SOC
Definition Triggers.h:35
constexpr uint32_t HB
Definition Triggers.h:27
std::unique_ptr< GPUReconstructionTimeframe > tf
uint32_t orbit
LHC orbit.
std::enable_if_t< std::is_same< T, std::string >::value==true, T > as() const
get the descriptor as std::string
Definition DataHeader.h:301
int getNOrbitsPerTF() const
get IR corresponding to start of the HBF
Definition HBFUtils.h:49
static LinkSubSpec_t getSubSpec(uint16_t cru, uint8_t link, uint8_t endpoint, uint16_t feeId, o2::header::DAQID::ID srcid=o2::header::DAQID::INVALID)
Definition RDHUtils.h:685
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static void dumpRDH(const H &rdh, NOTPTR(H))
Definition RDHUtils.h:669
static constexpr int getVersion()
get numeric version of the RDH
Definition RDHUtils.h:58
bool testFlag(uint8_t fl) const
void print(const std::string &pref="") const
uint16_t fileID
ir starting the block
bool preprocessCRUPage(const RDHAny &rdh, bool newSPage)
void print(bool verbose=false, const std::string &pref="") const
size_t readNextSuperPage(char *buff, const PartStat *pstat=nullptr)
size_t getNextTFSuperPagesStat(std::vector< PartStat > &parts) const
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir(0, 0)
const std::string str