Project
Loading...
Searching...
No Matches
SubTimeFrameFileReader.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// Adapthed with minimal changes from Gvozden Nescovic code to read sTFs files created by DataDistribution
13
16#include "Framework/Logger.h"
20#include <fairmq/Device.h>
21#include <fairmq/Message.h>
22#include <fairmq/Parts.h>
23#include <mutex>
24
25#if __linux__
26#include <sys/mman.h>
27#endif
28
29// uncomment this to check breakdown of TF building timing
30//#define _RUN_TIMING_MEASUREMENT_
31
32#ifdef _RUN_TIMING_MEASUREMENT_
33#include "TStopwatch.h"
34#endif
35
36namespace o2
37{
38namespace rawdd
39{
41using namespace o2::header;
42namespace o2f = o2::framework;
43
47
48SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF)
49 : mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF)
50{
51 mFile.reset(BinFileOp::open(pFileName));
52 if (!mFile || !mFile->isGood()) {
53 return;
54 }
55
56 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
57 mDetOrigMap[DetID::getDataOrigin(id)] = detMask[id];
58 }
59}
60
61Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize)
62{
63 // Expect valid Stack in the file.
64 // First Header must be DataHeader. The size is unknown since there are multiple versions.
65 // Each header in the stack extends BaseHeader
66
67 // Read first the base header then the rest of the extended header. Keep going until the next flag is set.
68 // reset the file pointer to the original incoming position, so the complete Stack can be read in
69 bool readNextHeader = true;
70 std::size_t bufsz = 0, lStackSize = 0;
71 std::byte* lStackMem = nullptr;
72 pOrigsize = 0;
73 const auto lFilePosStart = mFile->position();
74 const int cMaxHeaders = 16; // make sure we don't loop forever
75 int lNumHeaders = 0;
76 while (readNextHeader && (++lNumHeaders <= cMaxHeaders)) {
77 if ((lStackSize + sizeof(BaseHeader)) >= bufsz && !(lStackMem = reinterpret_cast<std::byte*>(mFile->bufferize((bufsz += BinFileOp::KBYTE))))) {
78 LOGP(error, "Could not bufferize {} bytes to read the headers stack", bufsz);
79 return Stack{};
80 }
81 const auto& lBaseHdr = *reinterpret_cast<BaseHeader*>(lStackMem + lStackSize);
82 lStackSize += lBaseHdr.size();
83 readNextHeader = (lBaseHdr.next() != nullptr);
84 }
85 if (lNumHeaders >= cMaxHeaders) {
86 LOGP(error, "Reached max number of headers allowed: {}.", cMaxHeaders);
87 return Stack{};
88 }
89 if (lStackSize < sizeof(BaseHeader)) {
90 LOGP(error, "Stack size {} is smaller than BaseHeader size {}", lStackSize, sizeof(BaseHeader));
91 return Stack{};
92 }
93
94 // This must handle different versions of DataHeader, check if DataHeader needs an upgrade by looking at the version number
95 const BaseHeader* lBaseOfDH = BaseHeader::get(lStackMem);
96 if (!lBaseOfDH) {
97 LOGP(error, "Failed to extract the DataHeader from the buffer, position in file {}", mFile->position());
98 return Stack{};
99 }
100
101 pOrigsize = lStackSize;
102 mFile->set_position(lFilePosStart + lStackSize);
103 if (lBaseOfDH->headerVersion < DataHeader::sVersion) {
104 DataHeader lNewDh;
105
106 // Write over the new DataHeader. We need to update some of the BaseHeader values.
107 assert(sizeof(DataHeader) > lBaseOfDH->size()); // current DataHeader must be larger
108 std::memcpy(&lNewDh, (void*)lBaseOfDH->data(), lBaseOfDH->size());
109
110 // make sure to bump the version in the BaseHeader. TODO: Is there a better way?
111 lNewDh.headerSize = sizeof(DataHeader);
113
114 if (lBaseOfDH->headerVersion == 1 || lBaseOfDH->headerVersion == 2) {
115 // nothing to do for the upgrade
116 } else {
117 LOGP(error, "DataHeader v{} read from file is not upgraded to the current version {}",
119 LOGP(error, "Try using a newer version of DataDistribution or file a BUG");
120 }
121
122 if (lBaseOfDH->size() == lStackSize) {
123 return Stack(lNewDh);
124 } else {
125 assert(lBaseOfDH->size() < lStackSize);
126 return Stack(lNewDh, Stack(lStackMem + lBaseOfDH->size()));
127 }
128 }
129
130 return Stack(lStackMem);
131}
132
133const std::string SubTimeFrameFileReader::describeHeader(const o2::header::DataHeader& hd, bool full) const
134{
135 std::string res = fmt::format("{}", o2f::DataSpecUtils::describe(o2::framework::OutputSpec{hd.dataOrigin, hd.dataDescription, hd.subSpecification}));
136 if (full) {
137 res += fmt::format(" part:{}/{} sz:{} TF:{} Orb:{} Run:{}", hd.splitPayloadIndex, hd.splitPayloadParts, hd.payloadSize, hd.tfCounter, hd.firstTForbit, hd.runNumber);
138 }
139 return res;
140}
141
142std::uint32_t sRunNumber = 0; // TODO: add id to files metadata
143std::uint32_t sFirstTForbit = 0; // TODO: add id to files metadata
144std::uint64_t sCreationTime = 0;
145std::mutex stfMtx;
146
147std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice)
148{
149 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
150 auto& msgMap = *messagesPerRoute.get();
151 assert(device);
152 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
153 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](const o2::header::DataHeader* h, size_t tslice) -> const std::string& {
154 if (!rawChannel.empty()) {
155 return rawChannel;
156 }
157 auto& chFromMap = channelsMap[*h];
158 if (chFromMap.first.empty() && !chFromMap.second) { // search for channel which is enountered for the 1st time
159 chFromMap.second = true; // flag that it was already checked
160 for (auto& oroute : outputRoutes) {
161 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
162 if (o2f::DataSpecUtils::match(oroute.matcher, h->dataOrigin, h->dataDescription, h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
163 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
164 chFromMap.first = oroute.channel;
165 break;
166 }
167 }
168 }
169 return chFromMap.first;
170 };
171
172 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl, const std::string& fairMQChannel) {
173 fair::mq::Parts* parts = nullptr;
174 parts = msgMap[fairMQChannel].get(); // fair::mq::Parts*
175 if (!parts) {
176 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
177 parts = msgMap[fairMQChannel].get();
178 }
179 parts->AddPart(std::move(hd));
180 parts->AddPart(std::move(pl));
181 };
182
183 // record current position
184 const auto lTfStartPosition = mFile->position();
185
186 if (lTfStartPosition == mFile->size() || !mFile || !mFile->isGood() || mFile->eof()) {
187 return nullptr;
188 }
189 auto tfID = slice;
190 uint32_t runNumberFallBack = sRunNumber;
191 uint32_t firstTForbitFallBack = sFirstTForbit;
192 uint64_t creationFallBack = sCreationTime;
193 bool negativeOrbitNotified = false, noRunNumberNotified = false, creation0Notified = false;
194 std::size_t lMetaHdrStackSize = 0;
195 const DataHeader* lStfMetaDataHdr = nullptr;
196 SubTimeFrameFileMeta lStfFileMeta;
197
198 auto printStack = [tfID](const o2::header::Stack& st) {
199 auto dph = o2::header::get<o2f::DataProcessingHeader*>(st.data());
200 auto dh = o2::header::get<o2::header::DataHeader*>(st.data());
201 LOGP(info, "TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
202 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
203 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
204 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
205 };
206
207 // Read DataHeader + SubTimeFrameFileMeta
208 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
209 if (lMetaHdrStackSize == 0) {
210 LOG(error) << "Failed to read the TF file header. The file might be corrupted.";
211 mFile.reset(nullptr);
212 return nullptr;
213 }
214 lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
215 if (mVerbosity > 0) {
216 LOGP(info, "read filemeta, pos = {}, size = {}", mFile->position(), sizeof(SubTimeFrameFileMeta));
217 }
218 if (!mFile->read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
219 return nullptr;
220 }
221 if (mVerbosity > 0) {
222 LOGP(info, "TFMeta : {}", lStfFileMeta.info());
223 }
224 if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
225 if (!creation0Notified) {
226 creation0Notified = true;
227 LOGP(warn, "Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
228 }
229 lStfFileMeta.mWriteTimeMs = creationFallBack;
230 } else {
231 sCreationTime = lStfFileMeta.mWriteTimeMs;
232 }
233
234 // verify we're actually reading the correct data in
235 if (!(SubTimeFrameFileMeta::getDataHeader().dataDescription == lStfMetaDataHdr->dataDescription)) {
236 LOGP(warning, "Reading bad data: SubTimeFrame META header");
237 mFile.reset(nullptr);
238 return nullptr;
239 }
240
241 // prepare to read the TF data
242 const auto lStfSizeInFile = lStfFileMeta.mStfSizeInFile;
243 if (lStfSizeInFile == (sizeof(DataHeader) + sizeof(SubTimeFrameFileMeta))) {
244 LOGP(warning, "Reading an empty TF from file. Only meta information present");
245 mFile.reset(nullptr);
246 return nullptr;
247 }
248
249 // check there's enough data in the file
250 if ((lTfStartPosition + lStfSizeInFile) > mFile->size()) {
251 LOGP(warning, "Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (mFile->size() - lTfStartPosition));
252 mFile.reset(nullptr);
253 return nullptr;
254 }
255
256 // Index
257 std::size_t lStfIndexHdrStackSize = 0;
258 const DataHeader* lStfIndexHdr = nullptr;
259
260 // Read DataHeader + SubTimeFrameFileMeta
261 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
262 if (lStfIndexHdrStackSize == 0) {
263 mFile.reset(nullptr);
264 return nullptr;
265 }
266 lStfIndexHdr = o2::header::DataHeader::Get(lStfIndexHdrStack.first());
267 if (!lStfIndexHdr) {
268 LOG(error) << "Failed to read the TF index structure. The file might be corrupted.";
269 return nullptr;
270 }
271
272 if (!mFile->ignore_nbytes(lStfIndexHdr->payloadSize)) {
273 return nullptr;
274 }
275#ifdef _RUN_TIMING_MEASUREMENT_
276 TStopwatch readSW, findChanSW, msgSW, addPartSW;
277 findChanSW.Stop();
278 msgSW.Stop();
279 addPartSW.Stop();
280#endif
281 // Remaining data size of the TF:
282 // total size in file - meta (hdr+struct) - index (hdr + payload)
283 const auto lStfDataSize = lStfSizeInFile - (lMetaHdrStackSize + sizeof(SubTimeFrameFileMeta)) - (lStfIndexHdrStackSize + lStfIndexHdr->payloadSize);
284
285 std::int64_t lLeftToRead = lStfDataSize;
286 STFHeader stfHeader{tfID, -1u, -1u};
287 DataHeader prevHeader;
288 // read <hdrStack + data> pairs
289 while (lLeftToRead > 0) {
290 // allocate and read the Headers
291 std::size_t lDataHeaderStackSize = 0;
292 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
293 if (lDataHeaderStackSize == 0) {
294 mFile.reset(nullptr);
295 return nullptr;
296 }
297 const DataHeader* lDataHeader = o2::header::DataHeader::Get(lDataHeaderStack.first());
298 if (!lDataHeader) {
299 LOG(error) << "Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
300 mFile.reset(nullptr);
301 return nullptr;
302 }
303 DataHeader locDataHeader(*lDataHeader);
304
305 if (mRepaireHeaders) {
306 if (locDataHeader == prevHeader) {
307 if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) {
308 if (mVerbosity > 3) {
309 LOGP(warn, "Repairing wrong part index for {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
310 }
311 locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts;
312 }
313 } else { // new header
314 if (locDataHeader.splitPayloadIndex != 0) {
315 if (mVerbosity > 2) {
316 LOGP(warn, "Repairing wrong part index for new {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
317 }
318 locDataHeader.splitPayloadIndex = 0;
319 }
320 }
321 prevHeader = locDataHeader;
322 }
323 // sanity check
324 if (int(locDataHeader.firstTForbit) == -1) {
325 if (!negativeOrbitNotified) {
326 LOGP(warn, "Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.tfCounter, locDataHeader.runNumber, firstTForbitFallBack);
327 negativeOrbitNotified = true;
328 }
329 locDataHeader.firstTForbit = firstTForbitFallBack;
330 }
331 if (locDataHeader.runNumber == 0) {
332 if (!noRunNumberNotified) {
333 LOGP(warn, "runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.tfCounter, runNumberFallBack);
334 noRunNumberNotified = true;
335 }
336 locDataHeader.runNumber = runNumberFallBack;
337 }
338 const std::uint64_t lDataSize = locDataHeader.payloadSize;
339
340 if (locDataHeader.dataOrigin == o2::header::gDataOriginFLP && locDataHeader.dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) {
341 if (mVerbosity > 0) {
342 LOGP(warn, "Ignoring stored {}", describeHeader(locDataHeader));
343 }
344 if (!mFile->ignore_nbytes(lDataSize)) {
345 return nullptr;
346 }
347 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
348 continue;
349 }
350 o2::header::Stack headerStack{locDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
351 if (stfHeader.runNumber == -1) {
352 stfHeader.id = locDataHeader.tfCounter;
353 stfHeader.runNumber = locDataHeader.runNumber;
354 stfHeader.firstOrbit = locDataHeader.firstTForbit;
355 std::lock_guard<std::mutex> lock(stfMtx);
356 sRunNumber = stfHeader.runNumber;
357 sFirstTForbit = stfHeader.firstOrbit;
358 }
359 // do we accept these data?
360 auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin);
361 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it
362 if (!mFile->ignore_nbytes(lDataSize)) {
363 return nullptr;
364 }
365 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
366 continue;
367 }
368#ifdef _RUN_TIMING_MEASUREMENT_
369 findChanSW.Start(false);
370#endif
371 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
372#ifdef _RUN_TIMING_MEASUREMENT_
373 findChanSW.Stop();
374#endif
375 if (fmqChannel.empty()) { // no output channel
376 if (!mFile->ignore_nbytes(lDataSize)) {
377 return nullptr;
378 }
379 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
380 continue;
381 //mFileMap.close();
382 //return nullptr;
383 }
384 // read the data
385
386 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
387#ifdef _RUN_TIMING_MEASUREMENT_
388 msgSW.Start(false);
389#endif
390 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
391 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
392#ifdef _RUN_TIMING_MEASUREMENT_
393 msgSW.Stop();
394#endif
395 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
396 LOGP(debug, "read data, pos = {}, size = {} leftToRead {}", mFile->position(), lDataSize, lLeftToRead);
397
398 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
399 return nullptr;
400 }
401 if (mVerbosity > 0) {
402 if (mVerbosity > 1 || locDataHeader.splitPayloadIndex == 0) {
403 printStack(headerStack);
404 if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && mVerbosity > 2) {
405 o2::raw::RDHUtils::printRDH(lDataMsg->GetData());
406 }
407 }
408 }
409#ifdef _RUN_TIMING_MEASUREMENT_
410 addPartSW.Start(false);
411#endif
412 if (mVerbosity > 2) {
413 LOGP(info, "addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader, true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize());
414 }
415 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
416#ifdef _RUN_TIMING_MEASUREMENT_
417 addPartSW.Stop();
418#endif
419 // update the counter
420 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
421 }
422
423 if (lLeftToRead < 0) {
424 LOG(error) << "FileRead: Read more data than it is indicated in the META header!";
425 return nullptr;
426 }
427 // add TF acknowledge part
428 // in case of empty TF fall-back to previous runNumber and fistTForbit
429 if (stfHeader.runNumber == -1u) {
430 stfHeader.runNumber = runNumberFallBack;
431 stfHeader.firstOrbit = firstTForbitFallBack;
432 LOGP(info, "Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
433 }
434
435 unsigned stfSS[2] = {0, 0xccdb};
436 for (int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
439 stfDistDataHeader.firstTForbit = stfHeader.firstOrbit;
440 stfDistDataHeader.runNumber = stfHeader.runNumber;
441 stfDistDataHeader.tfCounter = stfHeader.id;
442 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
443 if (!fmqChannel.empty()) { // no output channel
444 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
445 o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
446 if (mVerbosity > 0) {
447 printStack(headerStackSTF);
448 }
449 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
450 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.payloadSize, fair::mq::Alignment{64});
451 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
452 memcpy(plMessageSTF->GetData(), &stfHeader, sizeof(STFHeader));
453#ifdef _RUN_TIMING_MEASUREMENT_
454 addPartSW.Start(false);
455#endif
456 if (mVerbosity > 2) {
457 LOGP(info, "addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader, true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize());
458 }
459 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
460#ifdef _RUN_TIMING_MEASUREMENT_
461 addPartSW.Stop();
462#endif
463 }
464 }
465
466#ifdef _RUN_TIMING_MEASUREMENT_
467 readSW.Stop();
468 LOG(info) << "TF creation time: CPU: " << readSW.CpuTime() << " Wall: " << readSW.RealTime() << " s";
469 LOG(info) << "AddPart Timer CPU: " << addPartSW.CpuTime() << " Wall: " << addPartSW.RealTime() << " s";
470 LOG(info) << "CreMsg Timer CPU: " << msgSW.CpuTime() << " Wall: " << msgSW.RealTime() << " s";
471 LOG(info) << "FndChan Timer CPU: " << findChanSW.CpuTime() << " Wall: " << findChanSW.RealTime() << " s";
472#endif
473 return messagesPerRoute;
474}
475
476} // namespace rawdd
477} // namespace o2
std::ostringstream debug
uint32_t res
Definition RawData.h:0
benchmark::State & st
o2::header::Stack Stack
Class for time synchronization of RawReader instances.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID First
Definition DetID.h:95
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
static constexpr size_t KBYTE
Definition BinFileOp.h:28
static BinFileOp * open(const std::string &name)
Definition BinFileOp.cxx:24
std::unique_ptr< MessagesPerRoute > read(fair::mq::Device *device, const std::vector< o2f::OutputRoute > &outputRoutes, const std::string &rawChannel, size_t slice)
Read a single TF from the file.
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
o2::header::DataHeader DataHeader
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
std::uint64_t sCreationTime
std::uint32_t sRunNumber
std::uint32_t sFirstTForbit
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
const std::byte * data() const noexcept
Definition DataHeader.h:422
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
uint32_t headerVersion
version of the entire header, set by the derived header
Definition DataHeader.h:382
constexpr uint32_t size() const noexcept
Definition DataHeader.h:421
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
TFCounterType tfCounter
Definition DataHeader.h:681
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:653
TForbitType firstTForbit
Definition DataHeader.h:676
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
PayloadSizeType payloadSize
Definition DataHeader.h:668
RunNumberType runNumber
Definition DataHeader.h:686
static constexpr uint32_t sVersion
Definition DataHeader.h:631
static const DataHeader * Get(const BaseHeader *baseHeader)
Definition DataHeader.h:745
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
const BaseHeader * first() const
Definition Stack.h:59
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static bool checkRDH(const RDHv4 &rdh, bool verbose=true, bool checkZeros=false)
Definition RDHUtils.cxx:133
static const o2::header::DataHeader getDataHeader()
const std::string info() const
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"