Project
Loading...
Searching...
No Matches
SubTimeFrameFileReader.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
// Adapted with minimal changes from Gvozden Nescovic's code to read sTF files created by DataDistribution
13
16#include "Framework/Logger.h"
20#include <fairmq/Device.h>
21#include <fairmq/Message.h>
22#include <fairmq/Parts.h>
23#include <mutex>
24
25#if __linux__
26#include <sys/mman.h>
27#endif
28
29// uncomment this to check breakdown of TF building timing
30//#define _RUN_TIMING_MEASUREMENT_
31
32#ifdef _RUN_TIMING_MEASUREMENT_
33#include "TStopwatch.h"
34#endif
35
36namespace o2
37{
38namespace rawdd
39{
41using namespace o2::header;
42namespace o2f = o2::framework;
43
47
48SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF)
49 : mFileName(pFileName), mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF)
50{
51 mFileMap.open(mFileName);
52 if (!mFileMap.is_open()) {
53 LOG(error) << "Failed to open TF file for reading (mmap).";
54 return;
55 }
56 mFileSize = mFileMap.size();
57 mFileMapOffset = 0;
58
59 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
60 mDetOrigMap[DetID::getDataOrigin(id)] = detMask[id];
61 }
62
63#if __linux__
64 madvise((void*)mFileMap.data(), mFileMap.size(), MADV_HUGEPAGE | MADV_SEQUENTIAL | MADV_DONTDUMP);
65#endif
66}
67
69{
70 if (!mFileMap.is_open()) {
71#if __linux__
72 madvise((void*)mFileMap.data(), mFileMap.size(), MADV_DONTNEED);
73#endif
74 mFileMap.close();
75 }
76}
77
78std::size_t SubTimeFrameFileReader::getHeaderStackSize() // throws ios_base::failure
79{
80 // Expect valid Stack in the file.
81 // First Header must be DataHeader. The size is unknown since there are multiple versions.
82 // Each header in the stack extends BaseHeader
83
84 // Read first the base header then the rest of the extended header. Keep going until the next flag is set.
85 // reset the file pointer to the original incoming position, so the complete Stack can be read in
86
87 bool readNextHeader = true;
88 std::size_t lStackSize = 0;
89 DataHeader lBaseHdr; // Use DataHeader since the BaseHeader has no default contructor.
90
91 const auto lFilePosStart = position();
92
93 const auto cMaxHeaders = 16; /* make sure we don't loop forever */
94 auto lNumHeaders = 0;
95 while (readNextHeader && (++lNumHeaders <= cMaxHeaders)) {
96 // read BaseHeader only!
97 const auto lBaseHdrPos = position();
98 if (!read_advance(&lBaseHdr, sizeof(BaseHeader))) {
99 return 0;
100 }
101
102 // go back, and read the whole O2 header (Base+Derived)
103 set_position(lBaseHdrPos);
104 if (!ignore_nbytes(lBaseHdr.size())) {
105 return 0;
106 }
107
108 lStackSize += lBaseHdr.size();
109 readNextHeader = (lBaseHdr.next() != nullptr);
110 }
111 // reset the file pointer
112 set_position(lFilePosStart);
113
114 if (lNumHeaders >= cMaxHeaders) {
115 LOGP(error, "FileReader: Reached max number of headers allowed: {}.", cMaxHeaders);
116 return 0;
117 }
118 LOGP(debug, "getHeaderStackSize, pos = {}, size = {}", lFilePosStart, lStackSize);
119 return lStackSize;
120}
121
122Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize)
123{
124 const auto lStackSize = getHeaderStackSize();
125 pOrigsize = lStackSize;
126
127 if (lStackSize < sizeof(BaseHeader)) {
128 // error in the stream
129 pOrigsize = 0;
130 return Stack{};
131 }
132
133 std::byte* lStackMem = reinterpret_cast<std::byte*>(peek());
134 if (!ignore_nbytes(lStackSize)) {
135 // error in the stream
136 pOrigsize = 0;
137 return Stack{};
138 }
139
140 // This must handle different versions of DataHeader
141 // check if DataHeader needs an upgrade by looking at the version number
142 const BaseHeader* lBaseOfDH = BaseHeader::get(lStackMem);
143 if (!lBaseOfDH) {
144 return Stack{};
145 }
146
147 if (lBaseOfDH->headerVersion < DataHeader::sVersion) {
148 DataHeader lNewDh;
149
150 // Write over the new DataHeader. We need to update some of the BaseHeader values.
151 assert(sizeof(DataHeader) > lBaseOfDH->size()); // current DataHeader must be larger
152 std::memcpy(&lNewDh, (void*)lBaseOfDH->data(), lBaseOfDH->size());
153
154 // make sure to bump the version in the BaseHeader.
155 // TODO: Is there a better way?
156 lNewDh.headerSize = sizeof(DataHeader);
158
159 if (lBaseOfDH->headerVersion == 1 || lBaseOfDH->headerVersion == 2) {
160 /* nothing to do for the upgrade */
161 } else {
162 LOGP(error, "FileReader: DataHeader v{} read from file is not upgraded to the current version {}",
164 LOGP(error, "Try using a newer version of DataDistribution or file a BUG");
165 }
166
167 if (lBaseOfDH->size() == lStackSize) {
168 return Stack(lNewDh);
169 } else {
170 assert(lBaseOfDH->size() < lStackSize);
171
172 return Stack(
173 lNewDh,
174 Stack(lStackMem + lBaseOfDH->size()));
175 }
176 }
177
178 return Stack(lStackMem);
179}
180
181const std::string SubTimeFrameFileReader::describeHeader(const o2::header::DataHeader& hd, bool full) const
182{
183 std::string res = fmt::format("{}", o2f::DataSpecUtils::describe(o2::framework::OutputSpec{hd.dataOrigin, hd.dataDescription, hd.subSpecification}));
184 if (full) {
185 res += fmt::format(" part:{}/{} sz:{} TF:{} Orb:{} Run:{}", hd.splitPayloadIndex, hd.splitPayloadParts, hd.payloadSize, hd.tfCounter, hd.firstTForbit, hd.runNumber);
186 }
187 return res;
188}
189
// State carried over between read() calls: read() uses these as fall-back values
// and refreshes them from the data it reads.
std::uint32_t sRunNumber{0};    // TODO: add id to files metadata
std::uint32_t sFirstTForbit{0}; // TODO: add id to files metadata
std::uint64_t sCreationTime{0};
std::mutex stfMtx; // held by read() while updating sRunNumber / sFirstTForbit
194
195std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes, const std::string& rawChannel, size_t slice)
196{
197 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
198 auto& msgMap = *messagesPerRoute.get();
199 assert(device);
200 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
201 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](const o2::header::DataHeader* h, size_t tslice) -> const std::string& {
202 if (!rawChannel.empty()) {
203 return rawChannel;
204 }
205 auto& chFromMap = channelsMap[*h];
206 if (chFromMap.first.empty() && !chFromMap.second) { // search for channel which is enountered for the 1st time
207 chFromMap.second = true; // flag that it was already checked
208 for (auto& oroute : outputRoutes) {
209 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
210 if (o2f::DataSpecUtils::match(oroute.matcher, h->dataOrigin, h->dataDescription, h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
211 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
212 chFromMap.first = oroute.channel;
213 break;
214 }
215 }
216 }
217 return chFromMap.first;
218 };
219
220 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl, const std::string& fairMQChannel) {
221 fair::mq::Parts* parts = nullptr;
222 parts = msgMap[fairMQChannel].get(); // fair::mq::Parts*
223 if (!parts) {
224 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
225 parts = msgMap[fairMQChannel].get();
226 }
227 parts->AddPart(std::move(hd));
228 parts->AddPart(std::move(pl));
229 };
230
231 // record current position
232 const auto lTfStartPosition = position();
233
234 if (lTfStartPosition == size() || !mFileMap.is_open() || eof()) {
235 return nullptr;
236 }
237 auto tfID = slice;
238 uint32_t runNumberFallBack = sRunNumber;
239 uint32_t firstTForbitFallBack = sFirstTForbit;
240 uint64_t creationFallBack = sCreationTime;
241 bool negativeOrbitNotified = false, noRunNumberNotified = false, creation0Notified = false;
242 std::size_t lMetaHdrStackSize = 0;
243 const DataHeader* lStfMetaDataHdr = nullptr;
244 SubTimeFrameFileMeta lStfFileMeta;
245
246 auto printStack = [tfID](const o2::header::Stack& st) {
247 auto dph = o2::header::get<o2f::DataProcessingHeader*>(st.data());
248 auto dh = o2::header::get<o2::header::DataHeader*>(st.data());
249 LOGP(info, "TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
250 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
251 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
252 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
253 };
254
255 // Read DataHeader + SubTimeFrameFileMeta
256 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
257 if (lMetaHdrStackSize == 0) {
258 LOG(error) << "Failed to read the TF file header. The file might be corrupted.";
259 mFileMap.close();
260 return nullptr;
261 }
262 lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
263 if (mVerbosity > 0) {
264 LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta));
265 }
266 if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
267 return nullptr;
268 }
269 if (mVerbosity > 0) {
270 LOGP(info, "TFMeta : {}", lStfFileMeta.info());
271 }
272 if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
273 if (!creation0Notified) {
274 creation0Notified = true;
275 LOGP(warn, "Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
276 }
277 lStfFileMeta.mWriteTimeMs = creationFallBack;
278 } else {
279 sCreationTime = lStfFileMeta.mWriteTimeMs;
280 }
281
282 // verify we're actually reading the correct data in
283 if (!(SubTimeFrameFileMeta::getDataHeader().dataDescription == lStfMetaDataHdr->dataDescription)) {
284 LOGP(warning, "Reading bad data: SubTimeFrame META header");
285 mFileMap.close();
286 return nullptr;
287 }
288
289 // prepare to read the TF data
290 const auto lStfSizeInFile = lStfFileMeta.mStfSizeInFile;
291 if (lStfSizeInFile == (sizeof(DataHeader) + sizeof(SubTimeFrameFileMeta))) {
292 LOGP(warning, "Reading an empty TF from file. Only meta information present");
293 mFileMap.close();
294 return nullptr;
295 }
296
297 // check there's enough data in the file
298 if ((lTfStartPosition + lStfSizeInFile) > this->size()) {
299 LOGP(warning, "Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (this->size() - lTfStartPosition));
300 mFileMap.close();
301 return nullptr;
302 }
303
304 // Index
305 std::size_t lStfIndexHdrStackSize = 0;
306 const DataHeader* lStfIndexHdr = nullptr;
307
308 // Read DataHeader + SubTimeFrameFileMeta
309 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
310 if (lStfIndexHdrStackSize == 0) {
311 mFileMap.close();
312 return nullptr;
313 }
314 lStfIndexHdr = o2::header::DataHeader::Get(lStfIndexHdrStack.first());
315 if (!lStfIndexHdr) {
316 LOG(error) << "Failed to read the TF index structure. The file might be corrupted.";
317 return nullptr;
318 }
319
320 if (!ignore_nbytes(lStfIndexHdr->payloadSize)) {
321 return nullptr;
322 }
323#ifdef _RUN_TIMING_MEASUREMENT_
324 TStopwatch readSW, findChanSW, msgSW, addPartSW;
325 findChanSW.Stop();
326 msgSW.Stop();
327 addPartSW.Stop();
328#endif
329 // Remaining data size of the TF:
330 // total size in file - meta (hdr+struct) - index (hdr + payload)
331 const auto lStfDataSize = lStfSizeInFile - (lMetaHdrStackSize + sizeof(SubTimeFrameFileMeta)) - (lStfIndexHdrStackSize + lStfIndexHdr->payloadSize);
332
333 std::int64_t lLeftToRead = lStfDataSize;
334 STFHeader stfHeader{tfID, -1u, -1u};
335 DataHeader prevHeader;
336 // read <hdrStack + data> pairs
337 while (lLeftToRead > 0) {
338 // allocate and read the Headers
339 std::size_t lDataHeaderStackSize = 0;
340 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
341 if (lDataHeaderStackSize == 0) {
342 mFileMap.close();
343 return nullptr;
344 }
345 const DataHeader* lDataHeader = o2::header::DataHeader::Get(lDataHeaderStack.first());
346 if (!lDataHeader) {
347 LOG(error) << "Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
348 mFileMap.close();
349 return nullptr;
350 }
351 DataHeader locDataHeader(*lDataHeader);
352
353 if (mRepaireHeaders) {
354 if (locDataHeader == prevHeader) {
355 if (prevHeader.tfCounter == locDataHeader.tfCounter && (prevHeader.splitPayloadIndex + 1) != locDataHeader.splitPayloadIndex) {
356 if (mVerbosity > 3) {
357 LOGP(warn, "Repairing wrong part index for {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
358 }
359 locDataHeader.splitPayloadIndex = (++prevHeader.splitPayloadIndex) % prevHeader.splitPayloadParts;
360 }
361 } else { // new header
362 if (locDataHeader.splitPayloadIndex != 0) {
363 if (mVerbosity > 2) {
364 LOGP(warn, "Repairing wrong part index for new {} to {}", describeHeader(locDataHeader, true), (prevHeader.splitPayloadIndex + 1) % prevHeader.splitPayloadParts);
365 }
366 locDataHeader.splitPayloadIndex = 0;
367 }
368 }
369 prevHeader = locDataHeader;
370 }
371 // sanity check
372 if (int(locDataHeader.firstTForbit) == -1) {
373 if (!negativeOrbitNotified) {
374 LOGP(warn, "Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.tfCounter, locDataHeader.runNumber, firstTForbitFallBack);
375 negativeOrbitNotified = true;
376 }
377 locDataHeader.firstTForbit = firstTForbitFallBack;
378 }
379 if (locDataHeader.runNumber == 0) {
380 if (!noRunNumberNotified) {
381 LOGP(warn, "runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.tfCounter, runNumberFallBack);
382 noRunNumberNotified = true;
383 }
384 locDataHeader.runNumber = runNumberFallBack;
385 }
386 const std::uint64_t lDataSize = locDataHeader.payloadSize;
387
388 if (locDataHeader.dataOrigin == o2::header::gDataOriginFLP && locDataHeader.dataDescription == o2::header::gDataDescriptionDISTSTF && mRejectDistSTF) {
389 if (mVerbosity > 0) {
390 LOGP(warn, "Ignoring stored {}", describeHeader(locDataHeader));
391 }
392 if (!ignore_nbytes(lDataSize)) {
393 return nullptr;
394 }
395 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
396 continue;
397 }
398 o2::header::Stack headerStack{locDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
399 if (stfHeader.runNumber == -1) {
400 stfHeader.id = locDataHeader.tfCounter;
401 stfHeader.runNumber = locDataHeader.runNumber;
402 stfHeader.firstOrbit = locDataHeader.firstTForbit;
403 std::lock_guard<std::mutex> lock(stfMtx);
404 sRunNumber = stfHeader.runNumber;
405 sFirstTForbit = stfHeader.firstOrbit;
406 }
407 // do we accept these data?
408 auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin);
409 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it
410 if (!ignore_nbytes(lDataSize)) {
411 return nullptr;
412 }
413 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
414 continue;
415 }
416#ifdef _RUN_TIMING_MEASUREMENT_
417 findChanSW.Start(false);
418#endif
419 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
420#ifdef _RUN_TIMING_MEASUREMENT_
421 findChanSW.Stop();
422#endif
423 if (fmqChannel.empty()) { // no output channel
424 if (!ignore_nbytes(lDataSize)) {
425 return nullptr;
426 }
427 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
428 continue;
429 //mFileMap.close();
430 //return nullptr;
431 }
432 // read the data
433
434 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
435#ifdef _RUN_TIMING_MEASUREMENT_
436 msgSW.Start(false);
437#endif
438 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
439 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
440#ifdef _RUN_TIMING_MEASUREMENT_
441 msgSW.Stop();
442#endif
443 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
444 LOGP(debug, "read data, pos = {}, size = {} leftToRead {}", position(), lDataSize, lLeftToRead);
445
446 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
447 return nullptr;
448 }
449 if (mVerbosity > 0) {
450 if (mVerbosity > 1 || locDataHeader.splitPayloadIndex == 0) {
451 printStack(headerStack);
452 if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && mVerbosity > 2) {
453 o2::raw::RDHUtils::printRDH(lDataMsg->GetData());
454 }
455 }
456 }
457#ifdef _RUN_TIMING_MEASUREMENT_
458 addPartSW.Start(false);
459#endif
460 if (mVerbosity > 2) {
461 LOGP(info, "addPart {} to {} | HdrSize:{} DataSize:{}", describeHeader(locDataHeader, true), fmqChannel, lHdrStackMsg->GetSize(), lDataMsg->GetSize());
462 }
463 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
464#ifdef _RUN_TIMING_MEASUREMENT_
465 addPartSW.Stop();
466#endif
467 // update the counter
468 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
469 }
470
471 if (lLeftToRead < 0) {
472 LOG(error) << "FileRead: Read more data than it is indicated in the META header!";
473 return nullptr;
474 }
475 // add TF acknowledge part
476 // in case of empty TF fall-back to previous runNumber and fistTForbit
477 if (stfHeader.runNumber == -1u) {
478 stfHeader.runNumber = runNumberFallBack;
479 stfHeader.firstOrbit = firstTForbitFallBack;
480 LOGP(info, "Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
481 }
482
483 unsigned stfSS[2] = {0, 0xccdb};
484 for (int iss = 0; iss < (mSup0xccdb ? 1 : 2); iss++) {
487 stfDistDataHeader.firstTForbit = stfHeader.firstOrbit;
488 stfDistDataHeader.runNumber = stfHeader.runNumber;
489 stfDistDataHeader.tfCounter = stfHeader.id;
490 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
491 if (!fmqChannel.empty()) { // no output channel
492 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
493 o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
494 if (mVerbosity > 0) {
495 printStack(headerStackSTF);
496 }
497 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
498 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.payloadSize, fair::mq::Alignment{64});
499 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
500 memcpy(plMessageSTF->GetData(), &stfHeader, sizeof(STFHeader));
501#ifdef _RUN_TIMING_MEASUREMENT_
502 addPartSW.Start(false);
503#endif
504 if (mVerbosity > 2) {
505 LOGP(info, "addPart forced {} to {} | HdrSize:{} DataSize:{}", describeHeader(stfDistDataHeader, true), fmqChannel, hdMessageSTF->GetSize(), plMessageSTF->GetSize());
506 }
507 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
508#ifdef _RUN_TIMING_MEASUREMENT_
509 addPartSW.Stop();
510#endif
511 }
512 }
513
514#ifdef _RUN_TIMING_MEASUREMENT_
515 readSW.Stop();
516 LOG(info) << "TF creation time: CPU: " << readSW.CpuTime() << " Wall: " << readSW.RealTime() << " s";
517 LOG(info) << "AddPart Timer CPU: " << addPartSW.CpuTime() << " Wall: " << addPartSW.RealTime() << " s";
518 LOG(info) << "CreMsg Timer CPU: " << msgSW.CpuTime() << " Wall: " << msgSW.RealTime() << " s";
519 LOG(info) << "FndChan Timer CPU: " << findChanSW.CpuTime() << " Wall: " << findChanSW.RealTime() << " s";
520#endif
521 return messagesPerRoute;
522}
523
524} // namespace rawdd
525} // namespace o2
std::ostringstream debug
uint32_t res
Definition RawData.h:0
benchmark::State & st
o2::header::Stack Stack
Class for time synchronization of RawReader instances.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID First
Definition DetID.h:95
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:93
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:148
std::uint64_t position() const
Tell the current position of the file.
bool eof() const
Is the stream position at EOF.
std::unique_ptr< MessagesPerRoute > read(fair::mq::Device *device, const std::vector< o2f::OutputRoute > &outputRoutes, const std::string &rawChannel, size_t slice)
Read a single TF from the file.
std::uint64_t size() const
Tell the size of the file.
void set_position(std::uint64_t pPos)
Set the current position of the file.
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:605
o2::header::DataHeader DataHeader
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
std::uint64_t sCreationTime
std::uint32_t sRunNumber
std::uint32_t sFirstTForbit
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
const std::byte * data() const noexcept
Definition DataHeader.h:422
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
const BaseHeader * next() const noexcept
get the next header if any (const version)
Definition DataHeader.h:425
uint32_t headerVersion
version of the entire header, set by the derived header
Definition DataHeader.h:382
constexpr uint32_t size() const noexcept
Definition DataHeader.h:421
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
TFCounterType tfCounter
Definition DataHeader.h:681
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:653
TForbitType firstTForbit
Definition DataHeader.h:676
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
PayloadSizeType payloadSize
Definition DataHeader.h:668
RunNumberType runNumber
Definition DataHeader.h:686
static constexpr uint32_t sVersion
Definition DataHeader.h:631
static const DataHeader * Get(const BaseHeader *baseHeader)
Definition DataHeader.h:745
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
const BaseHeader * first() const
Definition Stack.h:59
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static bool checkRDH(const RDHv4 &rdh, bool verbose=true, bool checkZeros=false)
Definition RDHUtils.cxx:133
static const o2::header::DataHeader getDataHeader()
const std::string info() const
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"