Project
Loading...
Searching...
No Matches
RawFileWriter.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef DETECTOR_BASE_RAWFILEWRITER_H
13#define DETECTOR_BASE_RAWFILEWRITER_H
14
18
19#include <gsl/span>
20#include <unordered_map>
21#include <vector>
22#include <map>
23#include <string>
24#include <string_view>
25#include <functional>
26#include <mutex>
27
28#include <Rtypes.h>
29#include <TTree.h>
30#include <TStopwatch.h>
32#include "Headers/DataHeader.h"
33#include "Headers/DAQID.h"
36
37namespace o2
38{
39namespace raw
40{
41
43{
44
45 public:
48 using CarryOverCallBack = std::function<int(const RDHAny* rdh, const gsl::span<char> data,
49 const char* ptr, int size, int splitID,
50 std::vector<char>& trailer, std::vector<char>& header)>;
51 using EmptyPageCallBack = std::function<void(const RDHAny* rdh, std::vector<char>& emptyHBF)>;
52 using NewRDHCallBack = std::function<void(const RDHAny* rdh, bool prevEmpty, std::vector<char>& filler)>;
53
56 struct OutputFile {
57 FILE* handler = nullptr;
58 std::mutex fileMtx;
59 OutputFile() = default;
62 {
63 if (this != &src) {
65 }
66 return *this;
67 }
68 void write(const char* data, size_t size);
69 };
71 struct PayloadCache {
72 bool preformatted = false;
73 uint32_t trigger = 0;
74 uint32_t detField = 0;
75 std::vector<char> payload;
77 };
78
81 struct LinkData {
82 static constexpr int MarginToFlush = 10 * sizeof(RDHAny); // flush superpage if free space left <= this margin
83 RDHAny rdhCopy; // RDH with the running info of the last RDH seen
84 IR updateIR; // IR at which new HBF needs to be created
85 int lastRDHoffset = -1; // position of last RDH in the link buffer
86 bool startOfRun = true; // to signal if this is the 1st HBF of the run or not
87 uint8_t packetCounter = 0; // running counter
88 uint16_t pageCnt = 0; // running counter
89 LinkSubSpec_t subspec = 0; // subspec according to DataDistribution
90 bool discardData = false; // discard data if true (e.g. desired max IR reached)
91 //
92 size_t nTFWritten = 0; // number of TFs written
93 size_t nRDHWritten = 0; // number of RDHs written
94 size_t nBytesWritten = 0; // number of bytes written
95 //
96 std::string fileName{}; // file name associated with this link
97 std::vector<char> buffer; // buffer to accumulate superpage data
98 RawFileWriter* writer = nullptr; // pointer on the parent writer
99
100 PayloadCache cacheBuffer; // used for caching in case of async. data input
101 std::unique_ptr<TTree> cacheTree; // tree to store the cache
102
103 std::mutex mtx;
104
105 LinkData() = default;
106 ~LinkData() = default;
107 LinkData(const LinkData& src); // due to the mutex...
108 LinkData& operator=(const LinkData& src); // due to the mutex...
109 void close(const IR& ir);
110 void print() const;
111 void addData(const IR& ir, const gsl::span<char> data, bool preformatted = false, uint32_t trigger = 0, uint32_t detField = 0);
112 RDHAny* getLastRDH() { return lastRDHoffset < 0 ? nullptr : reinterpret_cast<RDHAny*>(&buffer[lastRDHoffset]); }
113 int getCurrentPageSize() const { return lastRDHoffset < 0 ? -1 : int(buffer.size()) - lastRDHoffset; }
114 // check if we are at the beginning of new page
115 bool isNewPage() const { return getCurrentPageSize() == sizeof(RDHAny); }
116 std::string describe() const;
117
118 protected:
119 void addDataInternal(const IR& ir, const gsl::span<char> data, bool preformatted = false, uint32_t trigger = 0, uint32_t detField = 0, bool checkEmpty = true);
120 void openHBFPage(const RDHAny& rdh, uint32_t trigger = 0);
121 void addHBFPage(bool stop = false);
122 void closeHBFPage();
123 void flushSuperPage(bool keepLastPage = false);
124 void fillEmptyHBHs(const IR& ir, bool dataAdded);
125 void addPreformattedCRUPage(const gsl::span<char> data);
126 void cacheData(const IR& ir, const gsl::span<char> data, bool preformatted, uint32_t trigger = 0, uint32_t detField = 0);
127
129 size_t expandBufferBy(size_t by)
130 {
131 auto offs = buffer.size();
132 buffer.resize(offs + by);
133 return offs;
134 }
135
137 size_t pushBack(const char* ptr, size_t sz, bool keepLastOnFlash = true);
138
140 size_t pushBack(const RDHAny& rdh)
141 {
142 nRDHWritten++;
143 return pushBack(reinterpret_cast<const char*>(&rdh), sizeof(RDHAny), false);
144 }
145 };
146 //=====================================================================================
147 // If addData was called with given IR for at least 1 link, then it should be called for all links, even it with empty payload
148 // This structure will check if detector has dared to do this
151 bool preformatted = false;
152 uint32_t trigger = 0;
153 uint32_t detField = 0;
154 size_t irSeen = 0;
155 size_t completeCount = 0;
156 std::unordered_map<LinkSubSpec_t, bool> linksDone; // links for which addData was called
157 void acknowledge(LinkSubSpec_t s, const IR& _ir, bool _preformatted, uint32_t _trigger, uint32_t _detField);
158 void completeLinks(RawFileWriter* wr, const IR& _ir);
159 void clear()
160 {
161 linksDone.clear();
162 ir.clear();
163 }
164 };
165
166 //=====================================================================================
167
168 RawFileWriter(o2::header::DataOrigin origin = o2::header::gDataOriginInvalid, bool cru = true) : mOrigin(origin)
169 {
170 if (!cru) {
172 }
173 }
175 void useCaching();
176 void doLazinessCheck(bool v) { mDoLazinessCheck = v; }
177 void writeConfFile(std::string_view origin = "FLP", std::string_view description = "RAWDATA", std::string_view cfgname = "raw.cfg", bool fullPath = true) const;
178 void close();
179
180 LinkData& registerLink(uint16_t fee, uint16_t cru, uint8_t link, uint8_t endpoint, std::string_view outFileName);
181
182 template <typename H>
183 LinkData& registerLink(const H& rdh, std::string_view outFileName)
184 {
185 RDHAny::sanityCheckLoose<H>();
186 auto& linkData = registerLink(RDHUtils::getFEEID(rdh), RDHUtils::getCRUID(rdh), RDHUtils::getLinkID(rdh), RDHUtils::getEndPointID(rdh), outFileName);
187 RDHUtils::setDetectorField(linkData.rdhCopy, RDHUtils::getDetectorField(rdh));
188 return linkData;
189 }
190
192 {
193 mOrigin = origin;
194 }
195
196 o2::header::DataOrigin getOrigin() const { return mOrigin; }
197
198 LinkData& getLinkWithSubSpec(LinkSubSpec_t ss);
199
200 template <typename H>
202 {
203 RDHAny::sanityCheckLoose<H>();
204 return mSSpec2Link[RDHUtils::getSubSpec(RDHUtils::getCRUID(rdh), RDHUtils::getLinkID(rdh), RDHUtils::getEndPointID(rdh), RDHUtils::getFEEID(rdh))];
205 }
206
207 void addData(uint16_t feeid, uint16_t cru, uint8_t lnk, uint8_t endpoint, const IR& ir,
208 const gsl::span<char> data, bool preformatted = false, uint32_t trigger = 0, uint32_t detField = 0);
209
210 template <typename H>
211 void addData(const H& rdh, const IR& ir, const gsl::span<char> data, bool preformatted = false, uint32_t trigger = 0)
212 {
213 RDHAny::sanityCheckLoose<H>();
214 addData(RDHUtils::getFEEID(rdh), RDHUtils::getCRUID(rdh), RDHUtils::getLinkID(rdh), RDHUtils::getEndPointID(rdh), ir, data, trigger);
215 }
216
217 void setContinuousReadout() { mROMode = Continuous; }
219 {
220 mROMode = Triggered;
221 setDontFillEmptyHBF(!mCRUDetector);
222 }
224 {
225 if (v) {
227 } else {
229 }
230 }
231
232 bool isContinuousReadout() const { return mROMode == Continuous; }
233 bool isTriggeredReadout() const { return mROMode == Triggered; }
234 bool isReadOutModeSet() const { return mROMode != NotSet; }
235 bool isLinkRegistered(LinkSubSpec_t ss) const { return mSSpec2Link.find(ss) != mSSpec2Link.end(); }
236
237 void setVerbosity(int v) { mVerbosity = v; }
238 int getVerbosity() const { return mVerbosity; }
239
240 int getNOutputFiles() const { return mFName2File.size(); }
241 std::string getOutputFileName(int i) const
242 {
243 if (i >= getNOutputFiles()) {
244 return "";
245 }
246 auto it = mFName2File.begin();
247 while (i--) {
248 it++;
249 }
250 return it->first;
251 }
252
253 OutputFile& getOutputFileForLink(const LinkData& lnk) { return mFName2File[lnk.fileName]; }
254
255 int getSuperPageSize() const { return mSuperPageSize; }
256 void setSuperPageSize(int nbytes);
257
259 IR getIRMax() const;
260
261 const HBFUtils& getHBFUtils() const { return mHBFUtils; }
262
263 template <class T>
264 void setCarryOverCallBack(const T* t)
265 {
266 carryOverFunc = [=](const RDHAny* rdh, const gsl::span<char> data, const char* ptr, int size, int splitID,
267 std::vector<char>& trailer, std::vector<char>& header) -> int {
268 return t->carryOverMethod(rdh, data, ptr, size, splitID, trailer, header);
269 };
270 }
271
272 template <class T>
273 void setEmptyPageCallBack(const T* t)
274 {
275 emptyHBFFunc = [=](const RDHAny* rdh, std::vector<char>& toAdd) {
276 t->emptyHBFMethod(rdh, toAdd);
277 };
278 }
279
280 template <class T>
281 void setNewRDHCallBack(const T* t)
282 {
283 newRDHFunc = [=](const RDHAny* rdh, bool prevEmpty, std::vector<char>& toAdd) {
284 t->newRDHMethod(rdh, prevEmpty, toAdd);
285 };
286 }
287
288 // This is a placeholder for the function responsible to split large payload to pieces
289 // fitting 8kB CRU pages.
290 // The RawFileWriter receives from the encoder the payload to format according to the CRU format
291 // In case this payload size does not fit into the CRU page (this may happen even if it is
292 // less than 8KB, since it might be added to already partially populated CRU page of the HBF)
293 // it will write on the page only part of the payload and carry over the rest on extra page(s).
294 // By default the RawFileWriter will simply chunk payload as is considers necessary, but some
295 // detectors want their CRU pages to be self-consistent and in case of payload splitting they
296 // add in the end of page to be closed and beginning of the new page to be opened
297 // (right after the RDH) detector-specific trailer and header respectively.
298 //
299 // The role of this method is to suggest to writer how to split the payload:
300 // If this method was set to the RawFileWriter using
301 // RawFileWriter::setCarryOverCallBack(pointer_on_the_converter_class);
302 // then the RawFileWriter will call it before splitting.
303 //
304 // It provides to the carryOverMethod method the following info:
305 // rdh : RDH of the CRU page being written
306 // data : original payload received by the RawFileWriter
307 // ptr : pointer on the data in the payload which was not yet added to the link CRU pages
308 // maxSize : maximum size (multiple of 16 bytes) of the bloc starting at ptr which it can
309 // accomodate at the current CRU page (i.e. what it would write by default)
310 // splitID : number of times this payload was already split, 0 at 1st call
311 // trailer : what it wants to add in the end of the CRU page where the data starting from ptr
312 // will be added. The trailer is supplied as an empy vector, which carryOverMethod
313 // may populate, but its size must be multiple of 16 bytes.
314 // header : what it wants to add right after the RDH of the new CRU page before the rest of
315 // the payload (starting at ptr+actualSize) will be written
316 //
317 // The method mast return actual size of the bloc which can be written (<=maxSize).
318 // If this method populates the trailer, it must ensure that it returns the actual size such that
319 // actualSize + trailer.size() <= maxSize
320 // In case returned actualSize == 0, current CRU page will be closed w/o adding anything, and new
321 // query of this method will be done on the new CRU page
322
323 int carryOverMethod(const RDHAny*, const gsl::span<char> data, const char* ptr, int maxSize, int splitID,
324 std::vector<char>& trailer, std::vector<char>& header) const
325 {
326 return maxSize; // do nothing
327 }
328
329 // This is a placeholder for the optional callback function to provide a detector-specific filler between
330 // the 1st RDH and closing RDH of the empty HBF
331 //
332 // It provides to the emptyHBFMethod method the following info:
333 // rdh : RDH of the CRU page opening empty RDH
334 // toAdd : a vector (supplied empty) to be filled to a size multipe of 16 bytes
335 //
336 void emptyHBFMethod(const RDHAny* rdh, std::vector<char>& toAdd) const
337 {
338 }
339
340 // This is a placeholder for the optional callback function to provide a detector-specific filler to be added right
341 // after the page starting by RDH (might be with RDH.stop=1 !) for the normal data filling (!! not automatic open/close RDHs for empty pages)
342 //
343 // It provides to the newRDHMethod method the following info:
344 // rdh : RDH of the CRU page to be opened
345 // prevEmpty : true is previous RDH page did not receive any data
346 // toAdd : a vector (supplied empty) to be filled to a size multipe of 16 bytes
347 //
348 void newRDHMethod(const RDHAny* rdh, bool prevEmpty, std::vector<char>& toAdd) const
349 {
350 }
351
352 int getUsedRDHVersion() const { return mUseRDHVersion; }
353 void useRDHVersion(int v)
354 {
355 assert(v >= RDHUtils::getVersion<o2::header::RDHLowest>() && v <= RDHUtils::getVersion<o2::header::RDHHighest>());
356 mUseRDHVersion = v;
357 }
358
359 unsigned char getUsedRDHDataFormat() const { return mUseRDHDataFormat; }
360 unsigned char getAlignmentSize() const { return mAlignmentSize; }
361 unsigned char getAlignmentPaddingFiller() const { return mAlignmentPaddingFiller; }
362
363 void useRDHDataFormat(unsigned char v) { mUseRDHDataFormat = v; }
364 void setAlignmentSize(unsigned char v) { mAlignmentSize = v; }
365 void setAlignmentPaddingFiller(unsigned char v) { mAlignmentPaddingFiller = v; }
366
367 bool getDontFillEmptyHBF() const { return mDontFillEmptyHBF; }
368 void setDontFillEmptyHBF(bool v) { mDontFillEmptyHBF = v; }
369
370 bool getAddSeparateHBFStopPage() const { return mAddSeparateHBFStopPage; }
371 void setAddSeparateHBFStopPage(bool v) { mAddSeparateHBFStopPage = v; }
372
374 {
375 mCRUDetector = false;
377 setUseRDHStop(false);
378 }
379
380 void setUseRDHStop(bool v = true)
381 {
382 mUseRDHStop = v;
383 if (!v) {
385 }
386 }
387
388 void setApplyCarryOverToLastPage(bool v) { mApplyCarryOverToLastPage = v; }
389
390 bool isRORCDetector() const { return !mCRUDetector; }
391 bool isCRUDetector() const { return mCRUDetector; }
392 bool isRDHStopUsed() const { return mUseRDHStop; }
393 bool isCarryOverToLastPageApplied() const { return mApplyCarryOverToLastPage; }
394
395 private:
396 void fillFromCache();
397
398 enum RoMode_t { NotSet,
399 Continuous,
400 Triggered };
401
402 const HBFUtils& mHBFUtils = HBFUtils::Instance();
403 std::unordered_map<LinkSubSpec_t, LinkData> mSSpec2Link; // mapping from subSpec to link
404 std::unordered_map<std::string, OutputFile> mFName2File; // mapping from filenames to actual files
405
406 CarryOverCallBack carryOverFunc = nullptr; // default call back for large payload splitting (does nothing)
407 EmptyPageCallBack emptyHBFFunc = nullptr; // default call back for empty HBF (does nothing)
408 NewRDHCallBack newRDHFunc = nullptr; // default call back for new page opening (does nothing)
409 // options
410 int mVerbosity = 0;
412 int mUseRDHVersion = RDHUtils::getVersion<o2::header::RAWDataHeader>(); // by default, use default version
413 unsigned char mUseRDHDataFormat = 0; // by default use padding
414 unsigned char mAlignmentSize = 0; // apply alignment to the CRU page size
415 unsigned char mAlignmentPaddingFiller = 0xff; // using this filler
416 int mSuperPageSize = 1024 * 1024; // super page size
417 bool mStartTFOnNewSPage = true; // every TF must start on a new SPage
418 bool mDontFillEmptyHBF = false; // skipp adding empty HBFs (uness it must have TF flag)
419 bool mAddSeparateHBFStopPage = true; // HBF stop is added on a separate CRU page
420 bool mUseRDHStop = true; // detector uses STOP in RDH
421 bool mCRUDetector = true; // Detector readout via CRU ( RORC if false)
422 bool mApplyCarryOverToLastPage = false; // call CarryOver method also for last chunk and overwrite modified trailer
423
424 //>> caching --------------
425 bool mCachingStage = false; // signal that current data should be cached
426 std::mutex mCacheFileMtx;
427 std::unique_ptr<TFile> mCacheFile; // file for caching
428 using CacheEntry = std::vector<std::pair<LinkSubSpec_t, size_t>>;
429 std::map<IR, CacheEntry> mCacheMap;
430 //<< caching -------------
431
432 TStopwatch mTimer;
433 RoMode_t mROMode = NotSet;
434 IR mFirstIRAdded; // 1st IR seen
435 DetLazinessCheck mDetLazyCheck{};
436 bool mDoLazinessCheck = true;
437
438 ClassDefNV(RawFileWriter, 1);
439};
440
446void assertOutputDirectory(std::string_view outDirName);
447
448} // namespace raw
449} // namespace o2
450
451#endif //DETECTOR_BASE_RAWFILEWRITER_H
int32_t i
Definition of the RAW Data Header.
uint8_t endpoint
Definition RawData.h:0
TBranch * ptr
void setCarryOverCallBack(const T *t)
std::string getOutputFileName(int i) const
void setAlignmentSize(unsigned char v)
int carryOverMethod(const RDHAny *, const gsl::span< char > data, const char *ptr, int maxSize, int splitID, std::vector< char > &trailer, std::vector< char > &header) const
void useRDHDataFormat(unsigned char v)
unsigned char getUsedRDHDataFormat() const
bool getDontFillEmptyHBF() const
void setSuperPageSize(int nbytes)
bool getAddSeparateHBFStopPage() const
void setAlignmentPaddingFiller(unsigned char v)
OutputFile & getOutputFileForLink(const LinkData &lnk)
bool isCarryOverToLastPageApplied() const
void setDontFillEmptyHBF(bool v)
std::function< void(const RDHAny *rdh, std::vector< char > &emptyHBF)> EmptyPageCallBack
int getUsedRDHVersion() const
void addData(uint16_t feeid, uint16_t cru, uint8_t lnk, uint8_t endpoint, const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
void addData(const H &rdh, const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0)
bool isReadOutModeSet() const
LinkData & registerLink(uint16_t fee, uint16_t cru, uint8_t link, uint8_t endpoint, std::string_view outFileName)
std::function< int(const RDHAny *rdh, const gsl::span< char > data, const char *ptr, int size, int splitID, std::vector< char > &trailer, std::vector< char > &header)> CarryOverCallBack
o2::header::DataOrigin getOrigin() const
void setEmptyPageCallBack(const T *t)
RawFileWriter(o2::header::DataOrigin origin=o2::header::gDataOriginInvalid, bool cru=true)
void writeConfFile(std::string_view origin="FLP", std::string_view description="RAWDATA", std::string_view cfgname="raw.cfg", bool fullPath=true) const
bool isRORCDetector() const
IR getIRMax() const
get highest IR seen so far
void setAddSeparateHBFStopPage(bool v)
bool isContinuousReadout() const
void setContinuousReadout(bool v)
void doLazinessCheck(bool v)
o2::header::RDHAny RDHAny
const HBFUtils & getHBFUtils() const
unsigned char getAlignmentSize() const
LinkData & getLinkWithSubSpec(const H &rdh)
void setNewRDHCallBack(const T *t)
std::function< void(const RDHAny *rdh, bool prevEmpty, std::vector< char > &filler)> NewRDHCallBack
LinkData & registerLink(const H &rdh, std::string_view outFileName)
void setOrigin(o2::header::DataOrigin origin)
bool isLinkRegistered(LinkSubSpec_t ss) const
void setUseRDHStop(bool v=true)
void newRDHMethod(const RDHAny *rdh, bool prevEmpty, std::vector< char > &toAdd) const
void emptyHBFMethod(const RDHAny *rdh, std::vector< char > &toAdd) const
LinkData & getLinkWithSubSpec(LinkSubSpec_t ss)
bool isTriggeredReadout() const
unsigned char getAlignmentPaddingFiller() const
void setApplyCarryOverToLastPage(bool v)
GLenum src
Definition glcorearb.h:1767
GLuint buffer
Definition glcorearb.h:655
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLboolean * data
Definition glcorearb.h:298
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
constexpr o2::header::DataOrigin gDataOriginInvalid
Definition DataHeader.h:561
void assertOutputDirectory(std::string_view outDirName)
uint32_t LinkSubSpec_t
Definition RDHUtils.h:34
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static LinkSubSpec_t getSubSpec(uint16_t cru, uint8_t link, uint8_t endpoint, uint16_t feeId, o2::header::DAQID::ID srcid=o2::header::DAQID::INVALID)
Definition RDHUtils.h:685
static void setDetectorField(H &rdh, uint32_t v, NOTPTR(H))
Definition RDHUtils.h:582
void acknowledge(LinkSubSpec_t s, const IR &_ir, bool _preformatted, uint32_t _trigger, uint32_t _detField)
std::unordered_map< LinkSubSpec_t, bool > linksDone
void completeLinks(RawFileWriter *wr, const IR &_ir)
LinkData & operator=(const LinkData &src)
void addPreformattedCRUPage(const gsl::span< char > data)
size_t expandBufferBy(size_t by)
expand buffer by positive increment and return old size
void addData(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0)
static constexpr int MarginToFlush
size_t pushBack(const RDHAny &rdh)
add RDH to buffer. In case this requires flushing of the superpage, do not keep the previous page
size_t pushBack(const char *ptr, size_t sz, bool keepLastOnFlash=true)
append to the end of the buffer and return the point where appended to
void flushSuperPage(bool keepLastPage=false)
void fillEmptyHBHs(const IR &ir, bool dataAdded)
void addDataInternal(const IR &ir, const gsl::span< char > data, bool preformatted=false, uint32_t trigger=0, uint32_t detField=0, bool checkEmpty=true)
std::unique_ptr< TTree > cacheTree
void cacheData(const IR &ir, const gsl::span< char > data, bool preformatted, uint32_t trigger=0, uint32_t detField=0)
void openHBFPage(const RDHAny &rdh, uint32_t trigger=0)
void write(const char *data, size_t size)
OutputFile(const OutputFile &src)
OutputFile & operator=(const OutputFile &src)
=====================================================================================
o2::InteractionRecord ir(0, 0)