41 if (mFName2File.empty()) {
47 if (mDoLazinessCheck) {
48 IR newIR = mDetLazyCheck.
ir;
50 mDoLazinessCheck =
false;
59 for (
auto& lnk : mSSpec2Link) {
60 lnk.second.close(irmax);
66 for (
auto& flh : mFName2File) {
67 LOG(info) <<
"Closing output file " << flh.first;
68 fclose(flh.second.handler);
69 flh.second.handler =
nullptr;
73 LOG(warning) <<
"RawFileWriter forced " << mDetLazyCheck.
completeCount <<
" dummy addData calls in "
74 << mDetLazyCheck.
irSeen <<
" IRs for links which did not receive data";
81void RawFileWriter::fillFromCache()
83 LOG(info) <<
"Filling links from cached trees";
84 mCachingStage =
false;
85 for (
const auto& cache : mCacheMap) {
86 for (
const auto&
entry : cache.second) {
88 link.cacheTree->GetEntry(
entry.second);
89 if (mDoLazinessCheck) {
91 mDetLazyCheck.
acknowledge(link.subspec, cache.first, link.cacheBuffer.preformatted, link.cacheBuffer.trigger, link.cacheBuffer.detField);
93 link.addData(cache.first, link.cacheBuffer.payload, link.cacheBuffer.preformatted, link.cacheBuffer.trigger, link.cacheBuffer.detField);
97 for (
auto& linkEntry : mSSpec2Link) {
98 if (linkEntry.second.cacheTree) {
99 linkEntry.second.cacheTree->Write();
100 linkEntry.second.cacheTree.reset(
nullptr);
103 std::string cacheName{mCacheFile->GetName()};
105 mCacheFile.reset(
nullptr);
106 unlink(cacheName.c_str());
110RawFileWriter::LinkData::LinkData(
const LinkData&
src) : rdhCopy(
src.rdhCopy), updateIR(
src.updateIR), lastRDHoffset(
src.lastRDHoffset), startOfRun(
src.startOfRun), packetCounter(
src.packetCounter), pageCnt(
src.pageCnt), subspec(
src.subspec), nTFWritten(
src.nTFWritten), nRDHWritten(
src.nRDHWritten), nBytesWritten(
src.nBytesWritten), fileName(
src.fileName),
buffer(
src.
buffer), writer(
src.writer)
119 updateIR =
src.updateIR;
120 lastRDHoffset =
src.lastRDHoffset;
121 startOfRun =
src.startOfRun;
122 packetCounter =
src.packetCounter;
123 pageCnt =
src.pageCnt;
124 subspec =
src.subspec;
125 nTFWritten =
src.nTFWritten;
126 nRDHWritten =
src.nRDHWritten;
127 nBytesWritten =
src.nBytesWritten;
128 fileName =
src.fileName;
139 std::string outFileName{outFileNameV};
141 auto& linkData = mSSpec2Link[sspec];
142 auto&
file = mFName2File[std::string(outFileName)];
144 if (!(
file.handler = fopen(outFileName.c_str(),
"wb"))) {
145 LOG(error) <<
"Failed to open output file " << outFileName;
146 throw std::runtime_error(std::string(
"cannot open link output file ") + outFileName);
149 if (!linkData.fileName.empty()) {
150 if (linkData.fileName == outFileName) {
151 LOGF(info,
"Link 0x%ux was already declared with same output, do nothing", sspec);
154 LOGF(error,
"Link 0x%ux was already connected to different output file %s", sspec, linkData.fileName);
155 throw std::runtime_error(
"redifinition of the link output file");
158 linkData.fileName = outFileName;
159 linkData.subspec = sspec;
161 if (mUseRDHVersion > 6) {
168 if (mUseRDHVersion >= 6) {
171 linkData.writer =
this;
173 linkData.buffer.reserve(mSuperPageSize);
175 LOGF(info,
"Registered %s with output to %s", linkData.describe(), outFileName);
185 if (mVerbosity > 10) {
186 LOGP(info,
"addData for {} on IR BCid:{} Orbit: {}, payload: {}, preformatted: {}, trigger: {}, detField: {}", link.describe(),
ir.
bc,
ir.
orbit,
data.size(), preformatted, trigger, detField);
189 LOG(error) <<
"provided payload size " <<
data.size() <<
" is not multiple of GBT word size";
190 throw std::runtime_error(
"payload size is not mutiple of GBT word size");
193 LOG(warning) <<
"provided " <<
ir <<
" precedes first sampled TF " << mHBFUtils.
getFirstSampledTFIR() <<
" | discarding data for " << link.describe();
197 if (!link.discardData) {
198 link.discardData =
true;
199 LOG(info) <<
"Orbit " <<
ir.
orbit <<
": max. allowed orbit " << mHBFUtils.
orbitFirst + mHBFUtils.
maxNOrbits - 1 <<
" exceeded, " << link.describe() <<
" will discard further data";
203 if (
ir < mFirstIRAdded) {
207 if (mDoLazinessCheck && !mCachingStage) {
209 mDetLazyCheck.
acknowledge(sspec,
ir, preformatted, trigger, detField);
211 link.addData(
ir,
data, preformatted, trigger, detField);
226 for (
auto& lnk : mSSpec2Link) {
227 if (irmax < lnk.second.updateIR) {
228 irmax = lnk.second.updateIR;
237 auto lnkIt = mSSpec2Link.find(ss);
238 if (lnkIt == mSSpec2Link.end()) {
239 LOGF(error,
"The link for SubSpec=0x%u was not registered", ss);
240 throw std::runtime_error(
"data for non-registered GBT link supplied");
242 return lnkIt->second;
249 std::ofstream cfgfile;
250 cfgfile.open(cfgname.data());
252 cfgfile <<
"#[defaults]" << std::endl;
253 cfgfile <<
"#dataOrigin = " << origin << std::endl;
254 cfgfile <<
"#dataDescription = " << description << std::endl;
255 cfgfile <<
"#readoutCard = " << (
isCRUDetector() ?
"CRU" :
"RORC") << std::endl;
258 <<
"[input-" << mOrigin.
str <<
'-' <<
i <<
"]" << std::endl;
259 cfgfile <<
"dataOrigin = " << origin << std::endl;
260 cfgfile <<
"dataDescription = " << description << std::endl;
261 cfgfile <<
"readoutCard = " << (
isCRUDetector() ?
"CRU" :
"RORC") << std::endl;
271 if (!mFirstIRAdded.
isDummy()) {
272 throw std::runtime_error(
"caching must be requested before feeding the data");
274 mCachingStage =
true;
279 mCacheFile.reset(TFile::Open(cachename.c_str(),
"recreate"));
280 LOG(info) <<
"Switched caching ON";
289 std::lock_guard<std::mutex> lock(writer->mCacheFileMtx);
291 writer->mCacheFile->cd();
293 cacheTree->Branch(
"cache", &cacheBuffer);
295 cacheBuffer.preformatted = preformatted;
296 cacheBuffer.trigger = trigger;
297 cacheBuffer.detField = detField;
298 cacheBuffer.payload.resize(
data.size());
300 memcpy(cacheBuffer.payload.data(),
data.data(),
data.size());
302 writer->mCacheMap[
ir].emplace_back(subspec, cacheTree->GetEntries());
311 std::lock_guard<std::mutex> lock(
mtx);
312 addDataInternal(
ir,
data, preformatted, trigger, detField);
319 LOG(
debug) <<
"Adding " <<
data.size() <<
" bytes in IR " <<
ir <<
" to " << describe() <<
" checkEmpty=" << checkEmpty;
320 if (writer->mCachingStage) {
321 cacheData(
ir,
data, preformatted, trigger, detField);
324 if (startOfRun && ((writer->mHBFUtils.getFirstIRofTF(
ir) > writer->mHBFUtils.getFirstIR()) && !writer->mHBFUtils.obligatorySOR)) {
328 if (startOfRun && writer->isRORCDetector()) {
329 writer->mHBFUtils.updateRDH<
RDHAny>(rdhCopy, writer->mHBFUtils.getFirstIR(),
false);
331 openHBFPage(rdhCopy);
336 if (
ir >= updateIR && checkEmpty) {
337 fillEmptyHBHs(
ir,
true);
342 auto& rdh = *getLastRDH();
346 auto& rdh = *getLastRDH();
354 addPreformattedCRUPage(
data);
359 if (isNewPage() && writer->newRDHFunc) {
360 std::vector<char> newPageHeader;
361 writer->newRDHFunc(getLastRDH(),
false, newPageHeader);
362 pushBack(newPageHeader.data(), newPageHeader.size());
369 bool carryOver =
false, wasSplit =
false, lastSplitPart =
false;
371 std::vector<char> carryOverHeader;
376 if (writer->newRDHFunc) {
377 std::vector<char> newPageHeader;
378 writer->newRDHFunc(getLastRDH(),
false, newPageHeader);
379 pushBack(newPageHeader.data(), newPageHeader.size());
383 LOG(
debug) <<
"Adding carryOverHeader " << carryOverHeader.size()
384 <<
" bytes in IR " <<
ir <<
" to " << describe();
385 pushBack(carryOverHeader.data(), carryOverHeader.size());
386 carryOverHeader.clear();
389 int sizeLeftSupPage = writer->mSuperPageSize -
buffer.size();
391 int sizeLeft = sizeLeftCRUPage < sizeLeftSupPage ? sizeLeftCRUPage : sizeLeftSupPage;
392 if (!sizeLeft || (sizeLeft < writer->mAlignmentSize)) {
394 if (writer->newRDHFunc) {
395 std::vector<char> newPageHeader;
396 writer->newRDHFunc(getLastRDH(),
false, newPageHeader);
397 pushBack(newPageHeader.data(), newPageHeader.size());
403 if (wasSplit && writer->mApplyCarryOverToLastPage) {
404 lastSplitPart =
true;
413 LOG(
debug) <<
"Adding payload " <<
dataSize <<
" bytes in IR " <<
ir <<
" (carryover=" << carryOver <<
" ) to " << describe();
420 int sizeActual = sizeLeft;
421 std::vector<char> carryOverTrailer;
422 if (writer->carryOverFunc) {
423 sizeActual = writer->carryOverFunc(&rdhCopy,
data,
ptr, sizeLeft, splitID++, carryOverTrailer, carryOverHeader);
425 LOG(
debug) <<
"Adding carry-over " << splitID - 1 <<
" fitted payload " << sizeActual <<
" bytes in IR " <<
ir <<
" to " << describe();
426 if (sizeActual < 0 || (!lastSplitPart && (sizeActual + carryOverTrailer.size() > sizeLeft))) {
427 throw std::runtime_error(std::string(
"wrong carry-over data size provided by carryOverMethod") +
std::to_string(sizeActual));
430 int trailerOffset = 0;
432 trailerOffset = carryOverTrailer.size();
433 if (sizeActual - trailerOffset < 0) {
434 throw std::runtime_error(
"trailer size of last split chunk cannot exceed actual size as it overwrites the existing trailer");
437 pushBack(
ptr, sizeActual - trailerOffset);
440 LOG(
debug) <<
"Adding carryOverTrailer " << carryOverTrailer.size() <<
" bytes in IR " <<
ir <<
" to " << describe();
441 pushBack(carryOverTrailer.data(), carryOverTrailer.size());
452 int sizeLeftSupPage = writer->mSuperPageSize -
buffer.size();
453 if (sizeLeftSupPage <
data.size()) {
454 flushSuperPage(
true);
457 LOG(error) <<
"Preformatted payload size of " <<
data.size() <<
" bytes for " << describe()
459 throw std::runtime_error(
"preformatted payload exceeds max size");
461 if (
int(
buffer.size()) - lastRDHoffset >
sizeof(
RDHAny)) {
472 if (lastRDHoffset < 0) {
476 auto& lastRDH = *getLastRDH();
477 int psize = getCurrentPageSize();
479 if (writer->mAlignmentSize && psize % writer->mAlignmentSize != 0) {
480 std::vector<char>
padding(writer->mAlignmentSize - psize % writer->mAlignmentSize, writer->mAlignmentPaddingFiller);
489 if (stop && !writer->mAddSeparateHBFStopPage) {
490 if (writer->isRDHStopUsed()) {
495 if (writer->mVerbosity > 2) {
500 int left = writer->mSuperPageSize -
buffer.size();
501 if (
left <= MarginToFlush) {
507 std::vector<char> userData;
510 if (writer->newRDHFunc) {
511 writer->newRDHFunc(&rdhCopy, psize ==
sizeof(
RDHAny), userData);
512 sz += userData.size();
514 if (writer->mAlignmentSize && sz % writer->mAlignmentSize != 0) {
515 sz += writer->mAlignmentSize - sz % writer->mAlignmentSize;
516 userData.resize(sz -
sizeof(
RDHAny), writer->mAlignmentPaddingFiller);
521 lastRDHoffset = pushBack(rdhCopy);
522 if (!userData.empty()) {
523 pushBack(userData.data(), userData.size());
530 if (writer->mVerbosity > 2 && add) {
544 if (lastRDHoffset < 0) {
547 bool emptyPage = getCurrentPageSize() ==
sizeof(
RDHAny);
548 if (emptyPage && writer->emptyHBFFunc) {
549 std::vector<char> emtyHBFFiller;
550 const auto rdh = getLastRDH();
551 writer->emptyHBFFunc(rdh, emtyHBFFiller);
552 if (!emtyHBFFiller.empty()) {
553 auto ir = RDHUtils::getTriggerIR(rdh);
554 LOG(
debug) <<
"Adding empty HBF filler of size " << emtyHBFFiller.size() <<
" for " << describe();
555 addDataInternal(
ir, emtyHBFFiller,
false, 0, 0,
false);
565 bool forceNewPage =
false;
568 (writer->isRORCDetector() &&
569 (updateIR == writer->mHBFUtils.getFirstIR() || writer->mHBFUtils.getTF(updateIR - 1) < writer->mHBFUtils.getTF(RDHUtils::getTriggerIR(rdhn))))) {
570 if (writer->mVerbosity > -10) {
571 LOGF(info,
"Starting new TF for link FEEId 0x%04x", RDHUtils::getFEEID(rdhn));
573 if (writer->mStartTFOnNewSPage && nTFWritten) {
577 int left = writer->mSuperPageSize -
buffer.size();
578 if (forceNewPage ||
left <= MarginToFlush) {
582 lastRDHoffset = pushBack(rdhn);
583 auto& newrdh = *getLastRDH();
590 if (startOfRun && writer->isReadOutModeSet()) {
601 size_t pgSize = (lastRDHoffset < 0 || !keepLastPage) ?
buffer.size() : lastRDHoffset;
602 if (writer->mVerbosity) {
603 LOGF(info,
"Flushing super page of %u bytes for %s", pgSize, describe());
605 writer->mFName2File.find(fileName)->second.write(
buffer.data(), pgSize);
606 auto toMove =
buffer.size() - pgSize;
608 if (toMove > pgSize) {
614 lastRDHoffset -= pgSize;
626 if (writer->mFName2File.find(fileName) == writer->mFName2File.end()) {
629 if (writer->isCRUDetector()) {
630 int tf = writer->mHBFUtils.getTF(irf);
631 auto finalIR = writer->mHBFUtils.getIRTF(
tf + 1) - 1;
632 fillEmptyHBHs(finalIR,
false);
643 if (writer->isCRUDetector()) {
644 std::vector<o2::InteractionRecord> irw;
645 if (!writer->mHBFUtils.fillHBIRvector(irw, updateIR,
ir)) {
648 for (
const auto& irdummy : irw) {
649 if (writer->mDontFillEmptyHBF &&
650 writer->mHBFUtils.getTFandHBinTF(irdummy).second != 0 &&
651 (!dataAdded || irdummy.orbit <
ir.
orbit)) {
657 if (writer->mVerbosity > 2) {
658 LOG(info) <<
"Adding HBF " << irdummy <<
" for " << describe();
662 writer->mHBFUtils.updateRDH<
RDHAny>(rdhCopy, irdummy, writer->isCRUDetector());
663 openHBFPage(rdhCopy);
667 if (writer->mVerbosity > 2) {
668 LOG(info) <<
"Adding HBF " <<
ir <<
" for " << describe();
672 writer->mHBFUtils.updateRDH<
RDHAny>(rdhCopy,
ir,
false);
673 openHBFPage(rdhCopy);
681 std::stringstream ss;
682 ss <<
"Link SubSpec=0x" << std::hex << std::setw(8) << std::setfill(
'0') << subspec << std::dec
683 <<
'(' << std::setw(3) <<
int(RDHUtils::getCRUID(rdhCopy)) <<
':' << std::setw(2) <<
int(RDHUtils::getLinkID(rdhCopy)) <<
':'
684 <<
int(RDHUtils::getEndPointID(rdhCopy)) <<
") feeID=0x" << std::hex << std::setw(4) << std::setfill(
'0') << RDHUtils::getFEEID(rdhCopy);
691 LOGF(info,
"Summary for %s : NTF: %u NRDH: %u Nbytes: %u", describe(), nTFWritten, nRDHWritten, nBytesWritten);
702 if ((writer->mSuperPageSize -
int(
buffer.size())) < 0) {
703 flushSuperPage(keepLastOnFlash);
705 auto offs = expandBufferBy(sz);
715 std::lock_guard<std::mutex> lock(fileMtx);
716 fwrite(
data, 1, sz, handler);
725 preformatted = _preformatted;
727 detField = _detField;
735 if (wr->mSSpec2Link.size() == linksDone.size() ||
ir == _ir ||
ir.
isDummy()) {
738 for (
auto& it : wr->mSSpec2Link) {
739 auto res = linksDone.find(it.first);
740 if (
res == linksDone.end()) {
741 if (wr->mVerbosity > 10) {
742 LOGP(info,
"Complete {} for IR BCid:{} Orbit: {}", it.second.describe(),
ir.
bc,
ir.
orbit);
745 it.second.addData(
ir, gsl::span<char>{}, preformatted, trigger, detField);
753 if (!std::filesystem::exists(outDirName)) {
754#if defined(__clang__)
759 std::filesystem::create_directories(outDirName);
760 if (!std::filesystem::exists(outDirName)) {
761 LOG(fatal) <<
"could not create output directory " << outDirName;
764 if (!std::filesystem::create_directories(outDirName)) {
765 LOG(fatal) <<
"could not create output directory " << outDirName;
Definition of the 32 Central Trigger System (CTS) Trigger Types defined in https://twiki....
Definition of the Names Generator class.
Utility class to write detectors data to (multiple) raw data file(s) respecting CRU format.
std::string getOutputFileName(int i) const
void setSuperPageSize(int nbytes)
bool isCRUDetector() const
void addData(uint16_t feeid, uint16_t cru, uint8_t lnk, uint8_t endpoint, const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
LinkData & registerLink(uint16_t fee, uint16_t cru, uint8_t link, uint8_t endpoint, std::string_view outFileName)
int getNOutputFiles() const
void writeConfFile(std::string_view origin="FLP", std::string_view description="RAWDATA", std::string_view cfgname="raw.cfg", bool fullPath=true) const
IR getIRMax() const
get highest IR seen so far
o2::header::RDHAny RDHAny
LinkData & getLinkWithSubSpec(LinkSubSpec_t ss)
constexpr int LHCMaxBunches
void assertOutputDirectory(std::string_view outDirName)
std::string to_string(gsl::span< T, Size > span)
std::unique_ptr< GPUReconstructionTimeframe > tf
uint16_t bc
bunch crossing ID of interaction
bool obligatorySOR
in mc->raw always start from run 1st TF to set the SOR
void checkConsistency() const
uint32_t maxNOrbits
max number of orbits to accept, used in digit->raw conversion
IR getFirstSampledTFIR() const
get TF and HB (abs) for this IR
uint32_t orbitFirst
orbit of 1st TF of the run
static constexpr int MAXCRUPage
static LinkSubSpec_t getSubSpec(uint16_t cru, uint8_t link, uint8_t endpoint, uint16_t feeId, o2::header::DAQID::ID srcid=o2::header::DAQID::INVALID)
static void setLinkID(H &rdh, uint8_t v, NOTPTR(H))
static void setDataFormat(RDHv7 &rdh, uint8_t s)
static void setDetectorField(H &rdh, uint32_t v, NOTPTR(H))
static void setTriggerType(H &rdh, uint32_t v, NOTPTR(H))
static constexpr int GBTWord128
static void setPageCounter(H &rdh, uint16_t v, NOTPTR(H))
static void setMemorySize(H &rdh, uint16_t v, NOTPTR(H))
static void printRDH(const RDHv4 &rdh)
static void setVersion(H &rdh, uint8_t v, NOTPTR(H))
static void setPacketCounter(H &rdh, uint8_t v, NOTPTR(H))
static void setSourceID(RDHv7 &rdh, uint8_t s)
static void setEndPointID(H &rdh, uint8_t v, NOTPTR(H))
static void setFEEID(RDHv4 &rdh, uint16_t v)
static void setOffsetToNext(H &rdh, uint16_t v, NOTPTR(H))
static void setStop(H &rdh, uint8_t v, NOTPTR(H))
static void setCRUID(H &rdh, uint16_t v, NOTPTR(H))
void acknowledge(LinkSubSpec_t s, const IR &_ir, bool _preformatted, uint32_t _trigger, uint32_t _detField)
void completeLinks(RawFileWriter *wr, const IR &_ir)
LinkData & operator=(const LinkData &src)
std::string describe() const
void addPreformattedCRUPage(const gsl::span< char > data)
void addData(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
size_t pushBack(const char *ptr, size_t sz, bool keepLastOnFlash=true)
append to the end of the buffer and return the point where appended to
void flushSuperPage(bool keepLastPage=false)
void fillEmptyHBHs(const IR &ir, bool dataAdded)
void addHBFPage(bool stop=false)
void addDataInternal(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0, bool checkEmpty=true)
void cacheData(const IR &ir, const gsl::span< char > data, bool preformatted, uint32_t trigger=0, uint32_t detField=0)
void openHBFPage(const RDHAny &rdh, uint32_t trigger=0)
void write(const char *data, size_t size)
static std::string concat_string(Ts const &... ts)
static std::string getFullPath(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir(0, 0)