Project
Loading...
Searching...
No Matches
RawFileWriter.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
15
16#include <iomanip>
17#include <iostream>
18#include <sstream>
19#include <functional>
20#include <cassert>
25#include "Framework/Logger.h"
26#include <filesystem>
27
28using namespace o2::raw;
30
31//_____________________________________________________________________
36
37//_____________________________________________________________________
39{
40 // finalize all links
41 if (mFName2File.empty()) {
42 return;
43 }
44 if (mCachingStage) {
45 fillFromCache();
46 }
47 if (mDoLazinessCheck) {
48 IR newIR = mDetLazyCheck.ir;
49 mDetLazyCheck.completeLinks(this, ++newIR); // make sure that all links for previously called IR got their addData call
50 mDoLazinessCheck = false;
51 }
52
53 if (!mFirstIRAdded.isDummy()) { // flushing and completing the last HBF makes sense only if data was added.
54 auto irmax = getIRMax();
55 // for CRU detectors link.updateIR and hence the irmax points on the last IR with data + 1 orbit
56 if (isCRUDetector()) {
57 irmax.orbit -= 1;
58 }
59 for (auto& lnk : mSSpec2Link) {
60 lnk.second.close(irmax);
61 lnk.second.print();
62 }
63 }
64 //
65 // close all files
66 for (auto& flh : mFName2File) {
67 LOG(info) << "Closing output file " << flh.first;
68 fclose(flh.second.handler);
69 flh.second.handler = nullptr;
70 }
71 mFName2File.clear();
72 if (mDetLazyCheck.completeCount) {
73 LOG(warning) << "RawFileWriter forced " << mDetLazyCheck.completeCount << " dummy addData calls in "
74 << mDetLazyCheck.irSeen << " IRs for links which did not receive data";
75 }
76 mTimer.Stop();
77 mTimer.Print();
78}
79
80//_____________________________________________________________________
81void RawFileWriter::fillFromCache()
82{
83 LOG(info) << "Filling links from cached trees";
84 mCachingStage = false;
85 for (const auto& cache : mCacheMap) {
86 for (const auto& entry : cache.second) {
87 auto& link = getLinkWithSubSpec(entry.first);
88 link.cacheTree->GetEntry(entry.second);
89 if (mDoLazinessCheck) {
90 mDetLazyCheck.completeLinks(this, cache.first); // make sure that all links for previously called IR got their addData call
91 mDetLazyCheck.acknowledge(link.subspec, cache.first, link.cacheBuffer.preformatted, link.cacheBuffer.trigger, link.cacheBuffer.detField);
92 }
93 link.addData(cache.first, link.cacheBuffer.payload, link.cacheBuffer.preformatted, link.cacheBuffer.trigger, link.cacheBuffer.detField);
94 }
95 }
96 mCacheFile->cd();
97 for (auto& linkEntry : mSSpec2Link) {
98 if (linkEntry.second.cacheTree) {
99 linkEntry.second.cacheTree->Write();
100 linkEntry.second.cacheTree.reset(nullptr);
101 }
102 }
103 std::string cacheName{mCacheFile->GetName()};
104 mCacheFile->Close();
105 mCacheFile.reset(nullptr);
106 unlink(cacheName.c_str());
107}
108
109//_____________________________________________________________________
110RawFileWriter::LinkData::LinkData(const LinkData& src) : rdhCopy(src.rdhCopy), updateIR(src.updateIR), lastRDHoffset(src.lastRDHoffset), startOfRun(src.startOfRun), packetCounter(src.packetCounter), pageCnt(src.pageCnt), subspec(src.subspec), nTFWritten(src.nTFWritten), nRDHWritten(src.nRDHWritten), nBytesWritten(src.nBytesWritten), fileName(src.fileName), buffer(src.buffer), writer(src.writer)
111{
112}
113
114//_____________________________________________________________________
116{
117 if (this != &src) {
118 rdhCopy = src.rdhCopy;
119 updateIR = src.updateIR;
120 lastRDHoffset = src.lastRDHoffset;
121 startOfRun = src.startOfRun;
122 packetCounter = src.packetCounter;
123 pageCnt = src.pageCnt;
124 subspec = src.subspec;
125 nTFWritten = src.nTFWritten;
126 nRDHWritten = src.nRDHWritten;
127 nBytesWritten = src.nBytesWritten;
128 fileName = src.fileName;
129 buffer = src.buffer;
130 writer = src.writer;
131 }
132 return *this;
133}
134
135//_____________________________________________________________________
136RawFileWriter::LinkData& RawFileWriter::registerLink(uint16_t fee, uint16_t cru, uint8_t link, uint8_t endpoint, std::string_view outFileNameV)
137{
138 // register the GBT link and its output file
139 std::string outFileName{outFileNameV};
140 auto sspec = RDHUtils::getSubSpec(cru, link, endpoint, fee);
141 auto& linkData = mSSpec2Link[sspec];
142 auto& file = mFName2File[std::string(outFileName)];
143 if (!file.handler) {
144 if (!(file.handler = fopen(outFileName.c_str(), "wb"))) { // if file does not exist, create it
145 LOG(error) << "Failed to open output file " << outFileName;
146 throw std::runtime_error(std::string("cannot open link output file ") + outFileName);
147 }
148 }
149 if (!linkData.fileName.empty()) { // this link was already declared and associated with a file
150 if (linkData.fileName == outFileName) {
151 LOGF(info, "Link 0x%ux was already declared with same output, do nothing", sspec);
152 return linkData;
153 } else {
154 LOGF(error, "Link 0x%ux was already connected to different output file %s", sspec, linkData.fileName);
155 throw std::runtime_error("redifinition of the link output file");
156 }
157 }
158 linkData.fileName = outFileName;
159 linkData.subspec = sspec;
160 RDHUtils::setVersion(linkData.rdhCopy, mUseRDHVersion);
161 if (mUseRDHVersion > 6) {
162 RDHUtils::setDataFormat(linkData.rdhCopy, mUseRDHDataFormat);
163 }
164 RDHUtils::setFEEID(linkData.rdhCopy, fee);
165 RDHUtils::setCRUID(linkData.rdhCopy, cru);
166 RDHUtils::setLinkID(linkData.rdhCopy, link);
167 RDHUtils::setEndPointID(linkData.rdhCopy, endpoint);
168 if (mUseRDHVersion >= 6) {
169 RDHUtils::setSourceID(linkData.rdhCopy, o2::header::DAQID::O2toDAQ(mOrigin));
170 }
171 linkData.writer = this;
172 linkData.updateIR = mHBFUtils.obligatorySOR ? mHBFUtils.getFirstIR() : mHBFUtils.getFirstSampledTFIR();
173 linkData.buffer.reserve(mSuperPageSize);
174 RDHUtils::printRDH(linkData.rdhCopy);
175 LOGF(info, "Registered %s with output to %s", linkData.describe(), outFileName);
176 return linkData;
177}
178
179//_____________________________________________________________________
180void RawFileWriter::addData(uint16_t feeid, uint16_t cru, uint8_t lnk, uint8_t endpoint, const IR& ir, const gsl::span<char> data, bool preformatted, uint32_t trigger, uint32_t detField)
181{
182 // add payload to relevant links
183 auto sspec = RDHUtils::getSubSpec(cru, lnk, endpoint, feeid);
184 auto& link = getLinkWithSubSpec(sspec);
185 if (mVerbosity > 10) {
186 LOGP(info, "addData for {} on IR BCid:{} Orbit: {}, payload: {}, preformatted: {}, trigger: {}, detField: {}", link.describe(), ir.bc, ir.orbit, data.size(), preformatted, trigger, detField);
187 }
188 if (isCRUDetector() && mUseRDHDataFormat == 0 && (data.size() % RDHUtils::GBTWord128)) {
189 LOG(error) << "provided payload size " << data.size() << " is not multiple of GBT word size";
190 throw std::runtime_error("payload size is not mutiple of GBT word size");
191 }
192 if (ir < mHBFUtils.getFirstSampledTFIR()) {
193 LOG(warning) << "provided " << ir << " precedes first sampled TF " << mHBFUtils.getFirstSampledTFIR() << " | discarding data for " << link.describe();
194 return;
195 }
196 if (link.discardData || ir.orbit - mHBFUtils.orbitFirst >= mHBFUtils.maxNOrbits) {
197 if (!link.discardData) {
198 link.discardData = true;
199 LOG(info) << "Orbit " << ir.orbit << ": max. allowed orbit " << mHBFUtils.orbitFirst + mHBFUtils.maxNOrbits - 1 << " exceeded, " << link.describe() << " will discard further data";
200 }
201 return;
202 }
203 if (ir < mFirstIRAdded) {
204 mHBFUtils.checkConsistency(); // done only once
205 mFirstIRAdded = ir;
206 }
207 if (mDoLazinessCheck && !mCachingStage) {
208 mDetLazyCheck.completeLinks(this, ir); // make sure that all links for previously called IR got their addData call
209 mDetLazyCheck.acknowledge(sspec, ir, preformatted, trigger, detField);
210 }
211 link.addData(ir, data, preformatted, trigger, detField);
212}
213
214//_____________________________________________________________________
216{
217 mSuperPageSize = nbytes < 16 * RDHUtils::MAXCRUPage ? RDHUtils::MAXCRUPage : nbytes;
218 assert((mSuperPageSize % RDHUtils::MAXCRUPage) == 0); // make sure it is multiple of 8KB
219}
220
221//_____________________________________________________________________
223{
224 // highest IR seen so far
225 IR irmax{0, 0};
226 for (auto& lnk : mSSpec2Link) {
227 if (irmax < lnk.second.updateIR) {
228 irmax = lnk.second.updateIR;
229 }
230 }
231 return irmax;
232}
233
234//_____________________________________________________________________
236{
237 auto lnkIt = mSSpec2Link.find(ss);
238 if (lnkIt == mSSpec2Link.end()) {
239 LOGF(error, "The link for SubSpec=0x%u was not registered", ss);
240 throw std::runtime_error("data for non-registered GBT link supplied");
241 }
242 return lnkIt->second;
243}
244
245//_____________________________________________________________________
246void RawFileWriter::writeConfFile(std::string_view origin, std::string_view description, std::string_view cfgname, bool fullPath) const
247{
248 // write configuration file for generated data
249 std::ofstream cfgfile;
250 cfgfile.open(cfgname.data());
251 // this is good for the global settings only, problematic for concatenation
252 cfgfile << "#[defaults]" << std::endl;
253 cfgfile << "#dataOrigin = " << origin << std::endl;
254 cfgfile << "#dataDescription = " << description << std::endl;
255 cfgfile << "#readoutCard = " << (isCRUDetector() ? "CRU" : "RORC") << std::endl;
256 for (int i = 0; i < getNOutputFiles(); i++) {
257 cfgfile << std::endl
258 << "[input-" << mOrigin.str << '-' << i << "]" << std::endl;
259 cfgfile << "dataOrigin = " << origin << std::endl;
260 cfgfile << "dataDescription = " << description << std::endl;
261 cfgfile << "readoutCard = " << (isCRUDetector() ? "CRU" : "RORC") << std::endl;
262 cfgfile << "filePath = " << (fullPath ? o2::utils::Str::getFullPath(getOutputFileName(i)) : getOutputFileName(i)) << std::endl;
263 }
264 cfgfile.close();
265}
266
267//___________________________________________________________________________________
269{
270 // impose preliminary caching of data to the tree, used in case of async. data input
271 if (!mFirstIRAdded.isDummy()) {
272 throw std::runtime_error("caching must be requested before feeding the data");
273 }
274 mCachingStage = true;
275 if (mCacheFile) {
276 return; // already done
277 }
278 auto cachename = o2::utils::Str::concat_string("_rawWriter_cache_", mOrigin.str, ::getpid(), ".root");
279 mCacheFile.reset(TFile::Open(cachename.c_str(), "recreate"));
280 LOG(info) << "Switched caching ON";
281}
282
283//===================================================================================
284
285//___________________________________________________________________________________
286void RawFileWriter::LinkData::cacheData(const IR& ir, const gsl::span<char> data, bool preformatted, uint32_t trigger, uint32_t detField)
287{
288 // cache data to temporary tree
289 std::lock_guard<std::mutex> lock(writer->mCacheFileMtx);
290 if (!cacheTree) {
291 writer->mCacheFile->cd();
292 cacheTree = std::make_unique<TTree>(o2::utils::Str::concat_string("lnk", std::to_string(subspec)).c_str(), "cache");
293 cacheTree->Branch("cache", &cacheBuffer);
294 }
295 cacheBuffer.preformatted = preformatted;
296 cacheBuffer.trigger = trigger;
297 cacheBuffer.detField = detField;
298 cacheBuffer.payload.resize(data.size());
299 if (!data.empty()) {
300 memcpy(cacheBuffer.payload.data(), data.data(), data.size());
301 }
302 writer->mCacheMap[ir].emplace_back(subspec, cacheTree->GetEntries());
303 cacheTree->Fill();
304 return;
305}
306
307//___________________________________________________________________________________
308void RawFileWriter::LinkData::addData(const IR& ir, const gsl::span<char> data, bool preformatted, uint32_t trigger, uint32_t detField)
309{
310 // add payload corresponding to IR, locking access to this method
311 std::lock_guard<std::mutex> lock(mtx);
312 addDataInternal(ir, data, preformatted, trigger, detField);
313}
314
315//___________________________________________________________________________________
// Add the payload `data` for interaction record `ir` to this link (no locking is done here).
//  ir           : interaction record the payload belongs to
//  data         : payload bytes; an empty payload only updates the RDH flags
//  preformatted : if true, the payload is passed unsplit to addPreformattedCRUPage
//  trigger      : bits OR-ed into the trigger type of the last RDH (ignored if 0)
//  detField     : value written to the detector field of the last RDH (ignored if 0)
//  checkEmpty   : if true and ir >= updateIR, fillEmptyHBHs(ir, true) is called before the payload is added
// While the writer is in the caching stage the call is fully diverted to cacheData.
// Throws std::runtime_error if the carry-over callback returns an unusable size.
316void RawFileWriter::LinkData::addDataInternal(const IR& ir, const gsl::span<char> data, bool preformatted, uint32_t trigger, uint32_t detField, bool checkEmpty)
317{
318 // add payload corresponding to IR
319 LOG(debug) << "Adding " << data.size() << " bytes in IR " << ir << " to " << describe() << " checkEmpty=" << checkEmpty;
320 if (writer->mCachingStage) {
321 cacheData(ir, data, preformatted, trigger, detField);
322 return;
323 }
 // data of a TF later than the first one, without obligatory SOR: not the start of run anymore
324 if (startOfRun && ((writer->mHBFUtils.getFirstIRofTF(ir) > writer->mHBFUtils.getFirstIR()) && !writer->mHBFUtils.obligatorySOR)) {
325 startOfRun = false;
326 }
327
328 if (startOfRun && writer->isRORCDetector()) { // in RORC mode we write separate RDH with SOX in the very beginning of the run
329 writer->mHBFUtils.updateRDH<RDHAny>(rdhCopy, writer->mHBFUtils.getFirstIR(), false);
330 RDHUtils::setTriggerType(rdhCopy, 0);
331 openHBFPage(rdhCopy); // open new HBF just to report the SOX
332 // closeHBFPage();
333 }
334
335 int dataSize = data.size();
336 if (ir >= updateIR && checkEmpty) { // new IR exceeds or equal IR of next HBF to open, insert missed HBFs if needed
337 fillEmptyHBHs(ir, true);
338 }
339 // we are guaranteed to be under the valid RDH + possibly some data
340
341 if (trigger) {
342 auto& rdh = *getLastRDH();
343 RDHUtils::setTriggerType(rdh, RDHUtils::getTriggerType(rdh) | trigger);
344 }
345 if (detField) {
346 auto& rdh = *getLastRDH();
347 RDHUtils::setDetectorField(rdh, detField);
348 }
349
 // the RDH flags above are applied even for an empty payload
350 if (!dataSize) {
351 return;
352 }
353 if (preformatted) { // in case detectors wants to add new CRU page of predefined size
354 addPreformattedCRUPage(data);
355 return;
356 }
357
358 // if we are at the beginning of the page, detector may want to add some header
359 if (isNewPage() && writer->newRDHFunc) {
360 std::vector<char> newPageHeader;
361 writer->newRDHFunc(getLastRDH(), false, newPageHeader);
362 pushBack(newPageHeader.data(), newPageHeader.size());
363 }
364
365 const char* ptr = &data[0];
366 // in case particular detector CRU pages need to be self-consistent, when carrying-over
367 // large payload to new CRU page we may need to write optional trailer and header before
368 // and after the new RDH.
369 bool carryOver = false, wasSplit = false, lastSplitPart = false;
370 int splitID = 0;
371 std::vector<char> carryOverHeader;
 // every iteration either writes a part of the payload or opens a new page
372 while (dataSize > 0) {
373
374 if (carryOver) { // check if there is carry-over header to write in the buffer
375 addHBFPage(); // start new CRU page, if needed, the completed superpage is flushed
376 if (writer->newRDHFunc) {
377 std::vector<char> newPageHeader;
378 writer->newRDHFunc(getLastRDH(), false, newPageHeader);
379 pushBack(newPageHeader.data(), newPageHeader.size());
380 }
381
382 // for sure after the carryOver we have space on the CRU page, no need to check
383 LOG(debug) << "Adding carryOverHeader " << carryOverHeader.size()
384 << " bytes in IR " << ir << " to " << describe();
385 pushBack(carryOverHeader.data(), carryOverHeader.size());
386 carryOverHeader.clear();
387 carryOver = false;
388 }
 // usable space is limited both by the current CRU page and by the superpage
389 int sizeLeftSupPage = writer->mSuperPageSize - buffer.size();
390 int sizeLeftCRUPage = RDHUtils::MAXCRUPage - (int(buffer.size()) - lastRDHoffset);
391 int sizeLeft = sizeLeftCRUPage < sizeLeftSupPage ? sizeLeftCRUPage : sizeLeftSupPage;
392 if (!sizeLeft || (sizeLeft < writer->mAlignmentSize)) { // this page is just over, open a new one
393 addHBFPage(); // start new CRU page, if needed, the completed superpage is flushed
394 if (writer->newRDHFunc) {
395 std::vector<char> newPageHeader;
396 writer->newRDHFunc(getLastRDH(), false, newPageHeader);
397 pushBack(newPageHeader.data(), newPageHeader.size());
398 }
399 continue;
400 }
401
 // remaining payload fits: it still goes through the carry-over path if the payload was split before
 // and the writer asks to apply the carry-over treatment to the last page as well
402 if (dataSize <= sizeLeft) {
403 if (wasSplit && writer->mApplyCarryOverToLastPage) {
404 lastSplitPart = true;
405 carryOver = true;
406 }
407 } else {
408 carryOver = true;
409 wasSplit = true;
410 }
411
412 if (!carryOver) { // add all remaining data
413 LOG(debug) << "Adding payload " << dataSize << " bytes in IR " << ir << " (carryover=" << carryOver << " ) to " << describe();
414 pushBack(ptr, dataSize);
415 dataSize = 0;
416 } else { // need to carryOver payload, determine 1st wsize bytes to write starting from ptr
417 if (sizeLeft > dataSize) {
418 sizeLeft = dataSize;
419 }
420 int sizeActual = sizeLeft;
421 std::vector<char> carryOverTrailer;
 // the callback returns the number of payload bytes to consume for this page and may
 // provide a trailer for this page and a header for the next one
422 if (writer->carryOverFunc) {
423 sizeActual = writer->carryOverFunc(&rdhCopy, data, ptr, sizeLeft, splitID++, carryOverTrailer, carryOverHeader);
424 }
425 LOG(debug) << "Adding carry-over " << splitID - 1 << " fitted payload " << sizeActual << " bytes in IR " << ir << " to " << describe();
426 if (sizeActual < 0 || (!lastSplitPart && (sizeActual + carryOverTrailer.size() > sizeLeft))) {
427 throw std::runtime_error(std::string("wrong carry-over data size provided by carryOverMethod") + std::to_string(sizeActual));
428 }
429 // if there is carry-over trailer at the very last chunk, it must overwrite existing trailer
430 int trailerOffset = 0;
431 if (lastSplitPart) {
432 trailerOffset = carryOverTrailer.size();
433 if (sizeActual - trailerOffset < 0) {
434 throw std::runtime_error("trailer size of last split chunk cannot exceed actual size as it overwrites the existing trailer");
435 }
436 }
437 pushBack(ptr, sizeActual - trailerOffset); // write payload fitting to this page
 // the full sizeActual is consumed from the input, including the bytes replaced by the trailer
438 dataSize -= sizeActual;
439 ptr += sizeActual;
440 LOG(debug) << "Adding carryOverTrailer " << carryOverTrailer.size() << " bytes in IR " << ir << " to " << describe();
441 pushBack(carryOverTrailer.data(), carryOverTrailer.size());
442 }
443 }
444}
445
446//___________________________________________________________________________________
448{
449 // add preformatted CRU page w/o any attempt of splitting
450
451 // we are guaranteed to have a page with RDH open
452 int sizeLeftSupPage = writer->mSuperPageSize - buffer.size();
453 if (sizeLeftSupPage < data.size()) { // we are not allowed to split this payload
454 flushSuperPage(true); // flush all but the last added RDH
455 }
456 if (data.size() > RDHUtils::MAXCRUPage - sizeof(RDHAny)) {
457 LOG(error) << "Preformatted payload size of " << data.size() << " bytes for " << describe()
458 << " exceeds max. size " << RDHUtils::MAXCRUPage - sizeof(RDHAny);
459 throw std::runtime_error("preformatted payload exceeds max size");
460 }
461 if (int(buffer.size()) - lastRDHoffset > sizeof(RDHAny)) { // we must start from empty page
462 addHBFPage(); // start new CRU page
463 }
464 pushBack(&data[0], data.size());
465}
466
467//___________________________________________________________________________________
469{
471
472 if (lastRDHoffset < 0) {
473 return; // no page was open
474 }
475 // finalize last RDH
476 auto& lastRDH = *getLastRDH();
477 int psize = getCurrentPageSize(); // set the size for the previous header RDH
478
479 if (writer->mAlignmentSize && psize % writer->mAlignmentSize != 0) { // need to pad to align to needed size
480 std::vector<char> padding(writer->mAlignmentSize - psize % writer->mAlignmentSize, writer->mAlignmentPaddingFiller);
481 pushBack(padding.data(), padding.size());
482 psize += padding.size();
483 }
484 RDHUtils::setOffsetToNext(lastRDH, psize);
485 RDHUtils::setMemorySize(lastRDH, psize);
486
487 rdhCopy = lastRDH;
488 bool add = true;
489 if (stop && !writer->mAddSeparateHBFStopPage) {
490 if (writer->isRDHStopUsed()) {
491 RDHUtils::setStop(lastRDH, stop);
492 }
493 add = false;
494 }
495 if (writer->mVerbosity > 2) {
496 RDHUtils::printRDH(lastRDH);
497 }
498 if (add) { // if we are in stopping HBF and new page is needed, add it
499 // check if the superpage reached the size where it has to be flushed
500 int left = writer->mSuperPageSize - buffer.size();
501 if (left <= MarginToFlush) {
502 flushSuperPage();
503 }
504 RDHUtils::setPacketCounter(rdhCopy, packetCounter++);
505 RDHUtils::setPageCounter(rdhCopy, pageCnt++);
506 RDHUtils::setStop(rdhCopy, stop);
507 std::vector<char> userData;
508 int sz = sizeof(RDHAny);
509 if (stop) {
510 if (writer->newRDHFunc) { // detector may want to write something in closing page
511 writer->newRDHFunc(&rdhCopy, psize == sizeof(RDHAny), userData);
512 sz += userData.size();
513 }
514 if (writer->mAlignmentSize && sz % writer->mAlignmentSize != 0) { // need to pad to align to needed size
515 sz += writer->mAlignmentSize - sz % writer->mAlignmentSize;
516 userData.resize(sz - sizeof(RDHAny), writer->mAlignmentPaddingFiller);
517 }
518 }
519 RDHUtils::setOffsetToNext(rdhCopy, sz);
520 RDHUtils::setMemorySize(rdhCopy, sz);
521 lastRDHoffset = pushBack(rdhCopy); // entry of the new RDH
522 if (!userData.empty()) {
523 pushBack(userData.data(), userData.size());
524 }
525 }
526 if (stop) {
527 if (RDHUtils::getTriggerType(rdhCopy) & o2::trigger::TF) {
528 nTFWritten++;
529 }
530 if (writer->mVerbosity > 2 && add) {
531 RDHUtils::printRDH(rdhCopy);
532 }
533 lastRDHoffset = -1; // after closing, the previous RDH is not valid anymore
534 startOfRun = false; // signal that we are definitely not in the beginning of the run
535 }
536 //
537}
538
539//___________________________________________________________________________________
541{
542 // close the HBF page, if it is empty and detector has a special treatment of empty pages
543 // invoke detector callback method
544 if (lastRDHoffset < 0) {
545 return; // no page was open
546 }
547 bool emptyPage = getCurrentPageSize() == sizeof(RDHAny);
548 if (emptyPage && writer->emptyHBFFunc) { // we are closing an empty page, does detector want to add something?
549 std::vector<char> emtyHBFFiller; // working space for optional empty HBF filler
550 const auto rdh = getLastRDH();
551 writer->emptyHBFFunc(rdh, emtyHBFFiller);
552 if (!emtyHBFFiller.empty()) {
553 auto ir = RDHUtils::getTriggerIR(rdh);
554 LOG(debug) << "Adding empty HBF filler of size " << emtyHBFFiller.size() << " for " << describe();
555 addDataInternal(ir, emtyHBFFiller, false, 0, 0, false); // add filler w/o new check for empty HBF
556 }
557 }
558 addHBFPage(true);
559}
560
561//___________________________________________________________________________________
562void RawFileWriter::LinkData::openHBFPage(const RDHAny& rdhn, uint32_t trigger)
563{
565 bool forceNewPage = false;
566 // for RORC detectors the TF flag is absent, instead the 1st trigger after the start of TF will define the 1st be interpreted as 1st TF
567 if ((RDHUtils::getTriggerType(rdhn) & o2::trigger::TF) ||
568 (writer->isRORCDetector() &&
569 (updateIR == writer->mHBFUtils.getFirstIR() || writer->mHBFUtils.getTF(updateIR - 1) < writer->mHBFUtils.getTF(RDHUtils::getTriggerIR(rdhn))))) {
570 if (writer->mVerbosity > -10) {
571 LOGF(info, "Starting new TF for link FEEId 0x%04x", RDHUtils::getFEEID(rdhn));
572 }
573 if (writer->mStartTFOnNewSPage && nTFWritten) { // don't flush if 1st TF
574 forceNewPage = true;
575 }
576 }
577 int left = writer->mSuperPageSize - buffer.size();
578 if (forceNewPage || left <= MarginToFlush) {
579 flushSuperPage();
580 }
581 pageCnt = 0;
582 lastRDHoffset = pushBack(rdhn);
583 auto& newrdh = *getLastRDH(); // dress new RDH with correct counters
584 RDHUtils::setPacketCounter(newrdh, packetCounter++);
585 RDHUtils::setPageCounter(newrdh, pageCnt++);
586 RDHUtils::setStop(newrdh, 0);
587 RDHUtils::setMemorySize(newrdh, sizeof(RDHAny));
588 RDHUtils::setOffsetToNext(newrdh, sizeof(RDHAny));
589
590 if (startOfRun && writer->isReadOutModeSet()) {
591 auto trg = RDHUtils::getTriggerType(newrdh) | (writer->isContinuousReadout() ? o2::trigger::SOC : o2::trigger::SOT);
592 RDHUtils::setTriggerType(newrdh, trg);
593 }
594 rdhCopy = newrdh;
595}
596
597//___________________________________________________________________________________
599{
600 // write link superpage data to file (if requested, only up to the last page)
601 size_t pgSize = (lastRDHoffset < 0 || !keepLastPage) ? buffer.size() : lastRDHoffset;
602 if (writer->mVerbosity) {
603 LOGF(info, "Flushing super page of %u bytes for %s", pgSize, describe());
604 }
605 writer->mFName2File.find(fileName)->second.write(buffer.data(), pgSize);
606 auto toMove = buffer.size() - pgSize;
607 if (toMove) { // is there something left in the buffer, move it to the beginning of the buffer
608 if (toMove > pgSize) {
609 memcpy(buffer.data(), &buffer[pgSize], toMove);
610 } else {
611 memmove(buffer.data(), &buffer[pgSize], toMove);
612 }
613 buffer.resize(toMove);
614 lastRDHoffset -= pgSize;
615 } else {
616 buffer.clear();
617 lastRDHoffset = -1;
618 }
619}
620
621//___________________________________________________________________________________
623{
624 // finalize link data
625 // close open HBF, write empty HBFs until the end of the TF corresponding to irfin and detach from the stream
626 if (writer->mFName2File.find(fileName) == writer->mFName2File.end()) {
627 return; // already closed
628 }
629 if (writer->isCRUDetector()) { // finalize last TF
630 int tf = writer->mHBFUtils.getTF(irf);
631 auto finalIR = writer->mHBFUtils.getIRTF(tf + 1) - 1; // last IR of the current TF
632 fillEmptyHBHs(finalIR, false);
633 }
634 closeHBFPage(); // close last HBF
635 flushSuperPage();
636}
637
638//___________________________________________________________________________________
639void RawFileWriter::LinkData::fillEmptyHBHs(const IR& ir, bool dataAdded)
640{
641 // fill HBFs from last processed one to requested ir
642
643 if (writer->isCRUDetector()) {
644 std::vector<o2::InteractionRecord> irw;
645 if (!writer->mHBFUtils.fillHBIRvector(irw, updateIR, ir)) {
646 return;
647 }
648 for (const auto& irdummy : irw) {
649 if (writer->mDontFillEmptyHBF &&
650 writer->mHBFUtils.getTFandHBinTF(irdummy).second != 0 &&
651 (!dataAdded || irdummy.orbit < ir.orbit)) {
652 // even if requested, we skip empty HBF filling only if
653 // 1) we are not at the new TF start
654 // 2) method was called from addData and the current IR orbit is the one for which it was called (then it is not empty HB/trigger!)
655 continue;
656 }
657 if (writer->mVerbosity > 2) {
658 LOG(info) << "Adding HBF " << irdummy << " for " << describe();
659 }
660 closeHBFPage(); // close current HBF: add RDH with stop and update counters
661 RDHUtils::setTriggerType(rdhCopy, 0); // reset to avoid any detector specific flags in the dummy HBFs
662 writer->mHBFUtils.updateRDH<RDHAny>(rdhCopy, irdummy, writer->isCRUDetector()); // update HBF orbit/bc and trigger flags
663 openHBFPage(rdhCopy); // open new HBF
664 }
665 updateIR = irw.back() + o2::constants::lhc::LHCMaxBunches; // new HBF will be generated at >= this IR
666 } else { // RORC detector
667 if (writer->mVerbosity > 2) {
668 LOG(info) << "Adding HBF " << ir << " for " << describe();
669 }
670 closeHBFPage(); // close current HBF: add RDH with stop and update counters
671 RDHUtils::setTriggerType(rdhCopy, 0); // reset to avoid any detector specific flags in the dummy HBFs
672 writer->mHBFUtils.updateRDH<RDHAny>(rdhCopy, ir, false); // update HBF orbit/bc and trigger flags
673 openHBFPage(rdhCopy); // open new HBF
674 updateIR = ir + 1; // new Trigger in RORC detector will be generated at >= this IR
675 }
676}
677
678//____________________________________________
680{
681 std::stringstream ss;
682 ss << "Link SubSpec=0x" << std::hex << std::setw(8) << std::setfill('0') << subspec << std::dec
683 << '(' << std::setw(3) << int(RDHUtils::getCRUID(rdhCopy)) << ':' << std::setw(2) << int(RDHUtils::getLinkID(rdhCopy)) << ':'
684 << int(RDHUtils::getEndPointID(rdhCopy)) << ") feeID=0x" << std::hex << std::setw(4) << std::setfill('0') << RDHUtils::getFEEID(rdhCopy);
685 return ss.str();
686}
687
688//____________________________________________
690{
691 LOGF(info, "Summary for %s : NTF: %u NRDH: %u Nbytes: %u", describe(), nTFWritten, nRDHWritten, nBytesWritten);
692}
693
694//____________________________________________
695size_t RawFileWriter::LinkData::pushBack(const char* ptr, size_t sz, bool keepLastOnFlash)
696{
697 if (!sz) {
698 return buffer.size();
699 }
700 nBytesWritten += sz;
701 // do we have a space one this superpage?
702 if ((writer->mSuperPageSize - int(buffer.size())) < 0) { // need to flush
703 flushSuperPage(keepLastOnFlash);
704 }
705 auto offs = expandBufferBy(sz);
706 memmove(&buffer[offs], ptr, sz);
707 return offs;
708}
709
710//================================================
711
712//____________________________________________
713void RawFileWriter::OutputFile::write(const char* data, size_t sz)
714{
715 std::lock_guard<std::mutex> lock(fileMtx);
716 fwrite(data, 1, sz, handler); // flush to file
717}
718
719//____________________________________________
720void RawFileWriter::DetLazinessCheck::acknowledge(LinkSubSpec_t s, const IR& _ir, bool _preformatted, uint32_t _trigger, uint32_t _detField)
721{
722 if (_ir != ir) { // unseen IR arrived
723 ir = _ir;
724 irSeen++;
725 preformatted = _preformatted;
726 trigger = _trigger;
727 detField = _detField;
728 }
729 linksDone[s] = true;
730}
731
732//____________________________________________
734{
735 if (wr->mSSpec2Link.size() == linksDone.size() || ir == _ir || ir.isDummy()) { // nothing to do
736 return;
737 }
738 for (auto& it : wr->mSSpec2Link) {
739 auto res = linksDone.find(it.first);
740 if (res == linksDone.end()) {
741 if (wr->mVerbosity > 10) {
742 LOGP(info, "Complete {} for IR BCid:{} Orbit: {}", it.second.describe(), ir.bc, ir.orbit);
743 }
744 completeCount++;
745 it.second.addData(ir, gsl::span<char>{}, preformatted, trigger, detField);
746 }
747 }
748 clear();
749}
750
751void o2::raw::assertOutputDirectory(std::string_view outDirName)
752{
753 if (!std::filesystem::exists(outDirName)) {
754#if defined(__clang__)
755 // clang `create_directories` implementation is misbehaving and can
756 // return false even if the directory is actually successfully created
757 // so we work around that "feature" by not checking the
758 // return value at all but using a second call to `exists`
759 std::filesystem::create_directories(outDirName);
760 if (!std::filesystem::exists(outDirName)) {
761 LOG(fatal) << "could not create output directory " << outDirName;
762 }
763#else
764 if (!std::filesystem::create_directories(outDirName)) {
765 LOG(fatal) << "could not create output directory " << outDirName;
766 }
767#endif
768 }
769}
Definition of the 32 Central Trigger System (CTS) Trigger Types defined in https://twiki....
uint16_t padding
int32_t i
Definition of the Names Generator class.
uint8_t endpoint
Definition RawData.h:0
uint32_t res
Definition RawData.h:0
Utility class to write detectors data to (multiple) raw data file(s) respecting CRU format.
TBranch * ptr
std::ostringstream debug
static constexpr ID O2toDAQ(o2::header::DataOrigin o2orig)
Definition DAQID.h:70
std::string getOutputFileName(int i) const
void setSuperPageSize(int nbytes)
void addData(uint16_t feeid, uint16_t cru, uint8_t lnk, uint8_t endpoint, const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
LinkData & registerLink(uint16_t fee, uint16_t cru, uint8_t link, uint8_t endpoint, std::string_view outFileName)
void writeConfFile(std::string_view origin="FLP", std::string_view description="RAWDATA", std::string_view cfgname="raw.cfg", bool fullPath=true) const
IR getIRMax() const
get highest IR seen so far
o2::header::RDHAny RDHAny
LinkData & getLinkWithSubSpec(LinkSubSpec_t ss)
GLenum src
Definition glcorearb.h:1767
GLuint buffer
Definition glcorearb.h:655
GLuint entry
Definition glcorearb.h:5735
GLenum GLsizei dataSize
Definition glcorearb.h:3994
GLboolean * data
Definition glcorearb.h:298
constexpr int LHCMaxBunches
void assertOutputDirectory(std::string_view outDirName)
uint32_t LinkSubSpec_t
Definition RDHUtils.h:34
constexpr uint32_t SOT
Definition Triggers.h:33
constexpr uint32_t TF
Definition Triggers.h:37
constexpr uint32_t SOC
Definition Triggers.h:35
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::mutex mtx
std::unique_ptr< GPUReconstructionTimeframe > tf
uint32_t orbit
LHC orbit.
uint16_t bc
bunch crossing ID of interaction
bool obligatorySOR
in mc->raw always start from run 1st TF to set the SOR
Definition HBFUtils.h:139
void checkConsistency() const
Definition HBFUtils.cxx:53
uint32_t maxNOrbits
max number of orbits to accept, used in digit->raw conversion
Definition HBFUtils.h:143
IR getFirstIR() const
Definition HBFUtils.h:47
IR getFirstSampledTFIR() const
get TF and HB (abs) for this IR
Definition HBFUtils.h:74
uint32_t orbitFirst
orbit of 1st TF of the run
Definition HBFUtils.h:140
static constexpr int MAXCRUPage
Definition RDHUtils.h:53
static LinkSubSpec_t getSubSpec(uint16_t cru, uint8_t link, uint8_t endpoint, uint16_t feeId, o2::header::DAQID::ID srcid=o2::header::DAQID::INVALID)
Definition RDHUtils.h:685
static void setLinkID(H &rdh, uint8_t v, NOTPTR(H))
Definition RDHUtils.h:256
static void setDataFormat(RDHv7 &rdh, uint8_t s)
Definition RDHUtils.h:493
static void setDetectorField(H &rdh, uint32_t v, NOTPTR(H))
Definition RDHUtils.h:582
static void setTriggerType(H &rdh, uint32_t v, NOTPTR(H))
Definition RDHUtils.h:523
static constexpr int GBTWord128
Definition RDHUtils.h:52
static void setPageCounter(H &rdh, uint16_t v, NOTPTR(H))
Definition RDHUtils.h:557
static void setMemorySize(H &rdh, uint16_t v, NOTPTR(H))
Definition RDHUtils.h:240
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static void setVersion(H &rdh, uint8_t v, NOTPTR(H))
Definition RDHUtils.h:88
static void setPacketCounter(H &rdh, uint8_t v, NOTPTR(H))
Definition RDHUtils.h:272
static void setSourceID(RDHv7 &rdh, uint8_t s)
Definition RDHUtils.h:202
static void setEndPointID(H &rdh, uint8_t v, NOTPTR(H))
Definition RDHUtils.h:304
static void setFEEID(RDHv4 &rdh, uint16_t v)
Definition RDHUtils.h:146
static void setOffsetToNext(H &rdh, uint16_t v, NOTPTR(H))
Definition RDHUtils.h:224
static void setStop(H &rdh, uint8_t v, NOTPTR(H))
Definition RDHUtils.h:644
static void setCRUID(H &rdh, uint16_t v, NOTPTR(H))
Definition RDHUtils.h:288
void acknowledge(LinkSubSpec_t s, const IR &_ir, bool _preformatted, uint32_t _trigger, uint32_t _detField)
void completeLinks(RawFileWriter *wr, const IR &_ir)
LinkData & operator=(const LinkData &src)
void addPreformattedCRUPage(const gsl::span< char > data)
void addData(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
size_t pushBack(const char *ptr, size_t sz, bool keepLastOnFlash=true)
append to the end of the buffer and return the point where appended to
void flushSuperPage(bool keepLastPage=false)
void fillEmptyHBHs(const IR &ir, bool dataAdded)
void addDataInternal(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0, bool checkEmpty=true)
void cacheData(const IR &ir, const gsl::span< char > data, bool preformatted, uint32_t trigger=0, uint32_t detField=0)
void openHBFPage(const RDHAny &rdh, uint32_t trigger=0)
void write(const char *data, size_t size)
static std::string concat_string(Ts const &... ts)
static std::string getFullPath(const std::string_view p)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
o2::InteractionRecord ir(0, 0)
vec clear()