Project
Loading...
Searching...
No Matches
SubTimeFrameFileReader.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
// Adapted with minimal changes from Gvozden Neskovic's code to read sTF files created by DataDistribution
13
16#include "Framework/Logger.h"
20#include <fairmq/Device.h>
21#include <fairmq/Message.h>
22#include <fairmq/Parts.h>
23#include <mutex>
24
25#if __linux__
26#include <sys/mman.h>
27#endif
28
29// uncomment this to check breakdown of TF building timing
30//#define _RUN_TIMING_MEASUREMENT_
31
32#ifdef _RUN_TIMING_MEASUREMENT_
33#include "TStopwatch.h"
34#endif
35
36namespace o2
37{
38namespace rawdd
39{
41using namespace o2::header;
42namespace o2f = o2::framework;
43
47
49 : mFileName(pFileName)
50{
51 mFileMap.open(mFileName);
52 if (!mFileMap.is_open()) {
53 LOG(error) << "Failed to open TF file for reading (mmap).";
54 return;
55 }
56 mFileSize = mFileMap.size();
57 mFileMapOffset = 0;
58
59 for (DetID::ID id = DetID::First; id <= DetID::Last; id++) {
60 mDetOrigMap[DetID::getDataOrigin(id)] = detMask[id];
61 }
62
63#if __linux__
64 madvise((void*)mFileMap.data(), mFileMap.size(), MADV_HUGEPAGE | MADV_SEQUENTIAL | MADV_DONTDUMP);
65#endif
66}
67
69{
70 if (!mFileMap.is_open()) {
71#if __linux__
72 madvise((void*)mFileMap.data(), mFileMap.size(), MADV_DONTNEED);
73#endif
74 mFileMap.close();
75 }
76}
77
78std::size_t SubTimeFrameFileReader::getHeaderStackSize() // throws ios_base::failure
79{
80 // Expect valid Stack in the file.
81 // First Header must be DataHeader. The size is unknown since there are multiple versions.
82 // Each header in the stack extends BaseHeader
83
84 // Read first the base header then the rest of the extended header. Keep going until the next flag is set.
85 // reset the file pointer to the original incoming position, so the complete Stack can be read in
86
87 bool readNextHeader = true;
88 std::size_t lStackSize = 0;
89 DataHeader lBaseHdr; // Use DataHeader since the BaseHeader has no default contructor.
90
91 const auto lFilePosStart = position();
92
93 const auto cMaxHeaders = 16; /* make sure we don't loop forever */
94 auto lNumHeaders = 0;
95 while (readNextHeader && (++lNumHeaders <= cMaxHeaders)) {
96 // read BaseHeader only!
97 const auto lBaseHdrPos = position();
98 if (!read_advance(&lBaseHdr, sizeof(BaseHeader))) {
99 return 0;
100 }
101
102 // go back, and read the whole O2 header (Base+Derived)
103 set_position(lBaseHdrPos);
104 if (!ignore_nbytes(lBaseHdr.size())) {
105 return 0;
106 }
107
108 lStackSize += lBaseHdr.size();
109 readNextHeader = (lBaseHdr.next() != nullptr);
110 }
111 // reset the file pointer
112 set_position(lFilePosStart);
113
114 if (lNumHeaders >= cMaxHeaders) {
115 LOGP(error, "FileReader: Reached max number of headers allowed: {}.", cMaxHeaders);
116 return 0;
117 }
118
119 return lStackSize;
120}
121
122Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize)
123{
124 const auto lStackSize = getHeaderStackSize();
125 pOrigsize = lStackSize;
126
127 if (lStackSize < sizeof(BaseHeader)) {
128 // error in the stream
129 pOrigsize = 0;
130 return Stack{};
131 }
132
133 std::byte* lStackMem = reinterpret_cast<std::byte*>(peek());
134 if (!ignore_nbytes(lStackSize)) {
135 // error in the stream
136 pOrigsize = 0;
137 return Stack{};
138 }
139
140 // This must handle different versions of DataHeader
141 // check if DataHeader needs an upgrade by looking at the version number
142 const BaseHeader* lBaseOfDH = BaseHeader::get(lStackMem);
143 if (!lBaseOfDH) {
144 return Stack{};
145 }
146
147 if (lBaseOfDH->headerVersion < DataHeader::sVersion) {
148 DataHeader lNewDh;
149
150 // Write over the new DataHeader. We need to update some of the BaseHeader values.
151 assert(sizeof(DataHeader) > lBaseOfDH->size()); // current DataHeader must be larger
152 std::memcpy(&lNewDh, (void*)lBaseOfDH->data(), lBaseOfDH->size());
153
154 // make sure to bump the version in the BaseHeader.
155 // TODO: Is there a better way?
156 lNewDh.headerSize = sizeof(DataHeader);
158
159 if (lBaseOfDH->headerVersion == 1 || lBaseOfDH->headerVersion == 2) {
160 /* nothing to do for the upgrade */
161 } else {
162 LOGP(error, "FileReader: DataHeader v{} read from file is not upgraded to the current version {}",
164 LOGP(error, "Try using a newer version of DataDistribution or file a BUG");
165 }
166
167 if (lBaseOfDH->size() == lStackSize) {
168 return Stack(lNewDh);
169 } else {
170 assert(lBaseOfDH->size() < lStackSize);
171
172 return Stack(
173 lNewDh,
174 Stack(lStackMem + lBaseOfDH->size()));
175 }
176 }
177
178 return Stack(lStackMem);
179}
180
// Fallback values shared by all reader instances: the run number, first TF orbit and creation
// time seen most recently in the data. read() substitutes them when a TF lacks valid values.
// NOTE(review): read() writes sRunNumber/sFirstTForbit while holding stfMtx, but reads all three
// (and writes sCreationTime) without it — confirm read() is never called concurrently.
std::uint32_t sRunNumber{0};    // TODO: add id to files metadata
std::uint32_t sFirstTForbit{0}; // TODO: add id to files metadata
std::uint64_t sCreationTime{0};
std::mutex stfMtx; // guards the updates of sRunNumber / sFirstTForbit in read()
185
/// Read the next TF from the memory-mapped file and convert it into FairMQ messages.
///
/// Every <header stack, payload> pair of the TF is forwarded as a (header, payload) message
/// pair, appended to the fair::mq::Parts of the channel chosen for it. The forwarded header is
/// a new Stack made of the file's DataHeader (with an invalid firstTForbit / runNumber replaced
/// by fallback values) and DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}; any other
/// header of the file stack is not forwarded. After the data, an STFHeader part is added for
/// each entry of stfSS ({0, 0xccdb}, only the first one when sup0xccdb is set) for which an
/// output channel is found.
///
/// @param device       device whose channels provide the message transports (asserted non-null)
/// @param outputRoutes routes matched against each DataHeader and the timeslice when rawChannel is empty
/// @param rawChannel   if not empty, every part is sent to this channel
/// @param slice        timeslice: used as TF id in the DataProcessingHeader and for route selection
/// @param sup0xccdb    if true, only the STFHeader part for stfSS[0] is considered
/// @param verbosity    >0: print the headers of parts with splitPayloadIndex 0 and of the STFHeader parts;
///                     >1: of all parts; >2: additionally print the RDH of payloads passing checkRDH
/// @return messages grouped by channel name, or nullptr at end of file, when the file map is not open,
///         or on a read/format error (some of these error paths also close the file map)
186std::unique_ptr<MessagesPerRoute> SubTimeFrameFileReader::read(fair::mq::Device* device, const std::vector<o2f::OutputRoute>& outputRoutes,
187 const std::string& rawChannel, size_t slice, bool sup0xccdb, int verbosity)
188{
189 std::unique_ptr<MessagesPerRoute> messagesPerRoute = std::make_unique<MessagesPerRoute>();
190 auto& msgMap = *messagesPerRoute.get();
191 assert(device);
// Per DataHeader value: {output channel ("" = no matching route), search already done}.
192 std::unordered_map<o2::header::DataHeader, std::pair<std::string, bool>> channelsMap;
// Channel for header h at timeslice tslice: rawChannel if set, otherwise the channel of the first
// route whose matcher accepts h and whose timeslice equals tslice % maxTimeslices. The outcome
// (including "no route") is cached in channelsMap, so the route search runs once per header value;
// the returned reference points into channelsMap (or to rawChannel).
193 auto findOutputChannel = [&outputRoutes, &rawChannel, &channelsMap](const o2::header::DataHeader* h, size_t tslice) -> const std::string& {
194 if (!rawChannel.empty()) {
195 return rawChannel;
196 }
197 auto& chFromMap = channelsMap[*h];
198 if (chFromMap.first.empty() && !chFromMap.second) { // search for channel which is encountered for the 1st time
199 chFromMap.second = true; // flag that it was already checked
200 for (auto& oroute : outputRoutes) {
201 LOG(debug) << "comparing with matcher to route " << oroute.matcher << " TSlice:" << oroute.timeslice;
202 if (o2f::DataSpecUtils::match(oroute.matcher, h->dataOrigin, h->dataDescription, h->subSpecification) && ((tslice % oroute.maxTimeslices) == oroute.timeslice)) {
203 LOG(debug) << "picking the route:" << o2f::DataSpecUtils::describe(oroute.matcher) << " channel " << oroute.channel;
204 chFromMap.first = oroute.channel;
205 break;
206 }
207 }
208 }
209 return chFromMap.first;
210 };
211
// Append a (header, payload) message pair to the Parts of fairMQChannel, creating the Parts on first use.
212 auto addPart = [&msgMap](fair::mq::MessagePtr hd, fair::mq::MessagePtr pl, const std::string& fairMQChannel) {
213 fair::mq::Parts* parts = nullptr;
214 parts = msgMap[fairMQChannel].get(); // fair::mq::Parts*
215 if (!parts) {
216 msgMap[fairMQChannel] = std::make_unique<fair::mq::Parts>();
217 parts = msgMap[fairMQChannel].get();
218 }
219 parts->AddPart(std::move(hd));
220 parts->AddPart(std::move(pl));
221 };
222
223 // record current position
224 const auto lTfStartPosition = position();
225
// nothing to read: end of file, or the file map is closed (failed open or an earlier error)
226 if (lTfStartPosition == size() || !mFileMap.is_open() || eof()) {
227 return nullptr;
228 }
229 auto tfID = slice;
// Fallbacks (shared between readers) from previously read TFs, used when this TF lacks valid values.
// NOTE(review): these globals are read here (and sCreationTime is written below) without holding
// stfMtx, which only guards the later writes of sRunNumber/sFirstTForbit — confirm no concurrent use.
230 uint32_t runNumberFallBack = sRunNumber;
231 uint32_t firstTForbitFallBack = sFirstTForbit;
232 uint64_t creationFallBack = sCreationTime;
// warn only once per TF about each kind of patched value
233 bool negativeOrbitNotified = false, noRunNumberNotified = false, creation0Notified = false;
234 std::size_t lMetaHdrStackSize = 0;
235 const DataHeader* lStfMetaDataHdr = nullptr;
236 SubTimeFrameFileMeta lStfFileMeta;
237
// Log the DataHeader (and the DataProcessingHeader, if present) of a header stack.
238 auto printStack = [tfID](const o2::header::Stack& st) {
239 auto dph = o2::header::get<o2f::DataProcessingHeader*>(st.data());
240 auto dh = o2::header::get<o2::header::DataHeader*>(st.data());
241 LOGP(info, "TF#{} Header for {}/{}/{} @ tfCounter {} run {} | {} of {} size {}, TForbit {} | DPH: {}/{}/{}", tfID,
242 dh->dataOrigin.str, dh->dataDescription.str, dh->subSpecification, dh->tfCounter, dh->runNumber,
243 dh->splitPayloadIndex, dh->splitPayloadParts, dh->payloadSize, dh->firstTForbit,
244 dph ? dph->startTime : 0, dph ? dph->duration : 0, dph ? dph->creation : 0);
245 };
246
247 // Read DataHeader + SubTimeFrameFileMeta
248 auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize);
249 if (lMetaHdrStackSize == 0) {
250 LOG(error) << "Failed to read the TF file header. The file might be corrupted.";
251 mFileMap.close();
252 return nullptr;
253 }
254 lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first());
255 if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) {
256 return nullptr;
257 }
// A write time of 0 is replaced by the last known creation time (if any); otherwise the value read
// becomes the new fallback for later TFs.
258 if (lStfFileMeta.mWriteTimeMs == 0 && creationFallBack != 0) {
259 if (!creation0Notified) {
260 creation0Notified = true;
261 LOGP(warn, "Creation time 0 for timeSlice:{}, redefine to {}", tfID, creationFallBack);
262 }
263 lStfFileMeta.mWriteTimeMs = creationFallBack;
264 } else {
265 sCreationTime = lStfFileMeta.mWriteTimeMs;
266 }
267
268 // verify we're actually reading the correct data in
// NOTE(review): unlike the index and data headers below, lStfMetaDataHdr is not checked for nullptr here.
269 if (!(SubTimeFrameFileMeta::getDataHeader().dataDescription == lStfMetaDataHdr->dataDescription)) {
270 LOGP(warning, "Reading bad data: SubTimeFrame META header");
271 mFileMap.close();
272 return nullptr;
273 }
274
275 // prepare to read the TF data
276 const auto lStfSizeInFile = lStfFileMeta.mStfSizeInFile;
// NOTE(review): an empty TF closes the file map, so no later TF of this file can be read; the size
// check also uses sizeof(DataHeader) rather than lMetaHdrStackSize — confirm both are intended.
277 if (lStfSizeInFile == (sizeof(DataHeader) + sizeof(SubTimeFrameFileMeta))) {
278 LOGP(warning, "Reading an empty TF from file. Only meta information present");
279 mFileMap.close();
280 return nullptr;
281 }
282
283 // check there's enough data in the file
284 if ((lTfStartPosition + lStfSizeInFile) > this->size()) {
285 LOGP(warning, "Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (this->size() - lTfStartPosition));
286 mFileMap.close();
287 return nullptr;
288 }
289
290 // Index
291 std::size_t lStfIndexHdrStackSize = 0;
292 const DataHeader* lStfIndexHdr = nullptr;
293
294 // Read the header stack of the TF index; the index payload itself is skipped below
295 auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize);
296 if (lStfIndexHdrStackSize == 0) {
297 mFileMap.close();
298 return nullptr;
299 }
300 lStfIndexHdr = o2::header::DataHeader::Get(lStfIndexHdrStack.first());
301 if (!lStfIndexHdr) {
302 LOG(error) << "Failed to read the TF index structure. The file might be corrupted.";
303 return nullptr;
304 }
305
306 if (!ignore_nbytes(lStfIndexHdr->payloadSize)) {
307 return nullptr;
308 }
309#ifdef _RUN_TIMING_MEASUREMENT_
310 TStopwatch readSW, findChanSW, msgSW, addPartSW;
311 findChanSW.Stop();
312 msgSW.Stop();
313 addPartSW.Stop();
314#endif
315 // Remaining data size of the TF:
316 // total size in file - meta (hdr+struct) - index (hdr + payload)
317 const auto lStfDataSize = lStfSizeInFile - (lMetaHdrStackSize + sizeof(SubTimeFrameFileMeta)) - (lStfIndexHdrStackSize + lStfIndexHdr->payloadSize);
318
319 std::int64_t lLeftToRead = lStfDataSize;
// -1u marks runNumber / firstOrbit as "not set yet" (presumably fields {id, runNumber, firstOrbit})
320 STFHeader stfHeader{tfID, -1u, -1u};
321 // read <hdrStack + data> pairs
322 while (lLeftToRead > 0) {
323
324 // allocate and read the Headers
325 std::size_t lDataHeaderStackSize = 0;
326 Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize);
327 if (lDataHeaderStackSize == 0) {
328 mFileMap.close();
329 return nullptr;
330 }
331 const DataHeader* lDataHeader = o2::header::DataHeader::Get(lDataHeaderStack.first());
332 if (!lDataHeader) {
333 LOG(error) << "Failed to read the TF HBF DataHeader structure. The file might be corrupted.";
334 mFileMap.close();
335 return nullptr;
336 }
// work on a copy so that an invalid firstTForbit (-1) or runNumber (0) can be replaced by the fallbacks
337 DataHeader locDataHeader(*lDataHeader);
338 // sanity check
339 if (int(locDataHeader.firstTForbit) == -1) {
340 if (!negativeOrbitNotified) {
341 LOGP(warn, "Negative orbit for timeSlice:{} tfCounter:{} runNumber:{}, redefine to {}", tfID, locDataHeader.tfCounter, locDataHeader.runNumber, firstTForbitFallBack);
342 negativeOrbitNotified = true;
343 }
344 locDataHeader.firstTForbit = firstTForbitFallBack;
345 }
346 if (locDataHeader.runNumber == 0) {
347 if (!noRunNumberNotified) {
348 LOGP(warn, "runNumber is 0 for timeSlice:{} tfCounter:{}, redefine to {}", tfID, locDataHeader.tfCounter, runNumberFallBack);
349 noRunNumberNotified = true;
350 }
351 locDataHeader.runNumber = runNumberFallBack;
352 }
// only the (patched) DataHeader plus a DataProcessingHeader is forwarded; other headers read from the file are dropped
353 o2::header::Stack headerStack{locDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
// the first data header of the TF defines the STFHeader and the shared fallbacks for later TFs
354 if (stfHeader.runNumber == -1) {
355 stfHeader.id = locDataHeader.tfCounter;
356 stfHeader.runNumber = locDataHeader.runNumber;
357 stfHeader.firstOrbit = locDataHeader.firstTForbit;
358 std::lock_guard<std::mutex> lock(stfMtx);
359 sRunNumber = stfHeader.runNumber;
360 sFirstTForbit = stfHeader.firstOrbit;
361 }
362
363 const std::uint64_t lDataSize = locDataHeader.payloadSize;
364 // do we accept these data?
// (origins absent from mDetOrigMap, i.e. not detector data, are always accepted)
365 auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin);
366 if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it
367 if (!ignore_nbytes(lDataSize)) {
368 return nullptr;
369 }
370 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
371 continue;
372 }
373#ifdef _RUN_TIMING_MEASUREMENT_
374 findChanSW.Start(false);
375#endif
376 const auto& fmqChannel = findOutputChannel(&locDataHeader, tfID);
377#ifdef _RUN_TIMING_MEASUREMENT_
378 findChanSW.Stop();
379#endif
380 if (fmqChannel.empty()) { // no output channel
// no route wants these data: skip the payload but keep reading the TF
381 if (!ignore_nbytes(lDataSize)) {
382 return nullptr;
383 }
384 lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter
385 continue;
386 //mFileMap.close();
387 //return nullptr;
388 }
389 // read the data
390
391 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
392#ifdef _RUN_TIMING_MEASUREMENT_
393 msgSW.Start(false);
394#endif
// 64-byte aligned header and payload messages; the payload is read from the file directly into its message
395 auto lHdrStackMsg = fmqFactory->CreateMessage(headerStack.size(), fair::mq::Alignment{64});
396 auto lDataMsg = fmqFactory->CreateMessage(lDataSize, fair::mq::Alignment{64});
397#ifdef _RUN_TIMING_MEASUREMENT_
398 msgSW.Stop();
399#endif
400 memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size());
401
402 if (!read_advance(lDataMsg->GetData(), lDataSize)) {
403 return nullptr;
404 }
405 if (verbosity > 0) {
406 if (verbosity > 1 || locDataHeader.splitPayloadIndex == 0) {
407 printStack(headerStack);
408 if (o2::raw::RDHUtils::checkRDH(lDataMsg->GetData()) && verbosity > 2) {
409 o2::raw::RDHUtils::printRDH(lDataMsg->GetData());
410 }
411 }
412 }
413#ifdef _RUN_TIMING_MEASUREMENT_
414 addPartSW.Start(false);
415#endif
416 addPart(std::move(lHdrStackMsg), std::move(lDataMsg), fmqChannel);
417#ifdef _RUN_TIMING_MEASUREMENT_
418 addPartSW.Stop();
419#endif
420 // update the counter
421 lLeftToRead -= (lDataHeaderStackSize + lDataSize);
422 }
423
// consumed more bytes than announced in the META header: the sizes in the file are inconsistent
424 if (lLeftToRead < 0) {
425 LOG(error) << "FileRead: Read more data than it is indicated in the META header!";
426 return nullptr;
427 }
428 // add TF acknowledge part
429 // in case of empty TF fall-back to previous runNumber and firstTForbit
430 if (stfHeader.runNumber == -1u) {
431 stfHeader.runNumber = runNumberFallBack;
432 stfHeader.firstOrbit = firstTForbitFallBack;
433 LOGP(info, "Empty TF#{}, fallback to previous runNumber:{} firstTForbit:{}", tfID, stfHeader.runNumber, stfHeader.firstOrbit);
434 }
435
// One STFHeader part per entry of stfSS; only stfSS[0] when sup0xccdb is set.
// (stfSS[iss] is presumably the subspecification of stfDistDataHeader — TODO confirm.)
436 unsigned stfSS[2] = {0, 0xccdb};
437 for (int iss = 0; iss < (sup0xccdb ? 1 : 2); iss++) {
440 stfDistDataHeader.firstTForbit = stfHeader.firstOrbit;
441 stfDistDataHeader.runNumber = stfHeader.runNumber;
442 stfDistDataHeader.tfCounter = stfHeader.id;
443 const auto fmqChannel = findOutputChannel(&stfDistDataHeader, tfID);
444 if (!fmqChannel.empty()) { // send only if an output route accepts this header
445 auto fmqFactory = device->GetChannel(fmqChannel, 0).Transport();
446 o2::header::Stack headerStackSTF{stfDistDataHeader, o2f::DataProcessingHeader{tfID, 1, lStfFileMeta.mWriteTimeMs}};
447 if (verbosity > 0) {
448 printStack(headerStackSTF);
449 }
// NOTE(review): the payload message has stfDistDataHeader.payloadSize bytes while sizeof(STFHeader)
// bytes are copied into it; this assumes payloadSize >= sizeof(STFHeader).
450 auto hdMessageSTF = fmqFactory->CreateMessage(headerStackSTF.size(), fair::mq::Alignment{64});
451 auto plMessageSTF = fmqFactory->CreateMessage(stfDistDataHeader.payloadSize, fair::mq::Alignment{64});
452 memcpy(hdMessageSTF->GetData(), headerStackSTF.data(), headerStackSTF.size());
453 memcpy(plMessageSTF->GetData(), &stfHeader, sizeof(STFHeader));
454#ifdef _RUN_TIMING_MEASUREMENT_
455 addPartSW.Start(false);
456#endif
457 addPart(std::move(hdMessageSTF), std::move(plMessageSTF), fmqChannel);
458#ifdef _RUN_TIMING_MEASUREMENT_
459 addPartSW.Stop();
460#endif
461 }
462 }
463
464#ifdef _RUN_TIMING_MEASUREMENT_
465 readSW.Stop();
466 LOG(info) << "TF creation time: CPU: " << readSW.CpuTime() << " Wall: " << readSW.RealTime() << " s";
467 LOG(info) << "AddPart Timer CPU: " << addPartSW.CpuTime() << " Wall: " << addPartSW.RealTime() << " s";
468 LOG(info) << "CreMsg Timer CPU: " << msgSW.CpuTime() << " Wall: " << msgSW.RealTime() << " s";
469 LOG(info) << "FndChan Timer CPU: " << findChanSW.CpuTime() << " Wall: " << findChanSW.RealTime() << " s";
470#endif
471 return messagesPerRoute;
472}
473
474} // namespace rawdd
475} // namespace o2
#define verbosity
std::ostringstream debug
benchmark::State & st
o2::header::Stack Stack
Class for time synchronization of RawReader instances.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
static constexpr ID First
Definition DetID.h:94
static constexpr ID Last
if extra detectors added, update this !!!
Definition DetID.h:92
static constexpr o2h::DataOrigin getDataOrigin(ID id)
Definition DetID.h:147
std::uint64_t position() const
Tell the current position of the file.
bool eof() const
Is the stream position at EOF.
std::unique_ptr< MessagesPerRoute > read(fair::mq::Device *device, const std::vector< o2f::OutputRoute > &outputRoutes, const std::string &rawChannel, size_t slice, bool sup0xccdb, int verbosity)
Read a single TF from the file.
std::uint64_t size() const
Tell the size of the file.
void set_position(std::uint64_t pPos)
Set the current position of the file.
GLuint id
Definition glcorearb.h:650
constexpr o2::header::DataOrigin gDataOriginFLP
Definition DataHeader.h:562
constexpr o2::header::DataDescription gDataDescriptionDISTSTF
Definition DataHeader.h:603
o2::header::DataHeader DataHeader
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
O2 data header classes and API, v0.1.
Definition DetID.h:49
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
std::uint64_t sCreationTime
std::uint32_t sRunNumber
std::uint32_t sFirstTForbit
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
const std::byte * data() const noexcept
Definition DataHeader.h:422
static const BaseHeader * get(const std::byte *b, size_t=0)
access header in buffer
Definition DataHeader.h:405
const BaseHeader * next() const noexcept
get the next header if any (const version)
Definition DataHeader.h:425
uint32_t headerVersion
version of the entire header, set by the derived header
Definition DataHeader.h:382
constexpr uint32_t size() const noexcept
Definition DataHeader.h:421
the main header struct
Definition DataHeader.h:618
TFCounterType tfCounter
Definition DataHeader.h:679
SerializationMethod payloadSerializationMethod
Definition DataHeader.h:651
TForbitType firstTForbit
Definition DataHeader.h:674
DataDescription dataDescription
Definition DataHeader.h:636
PayloadSizeType payloadSize
Definition DataHeader.h:666
RunNumberType runNumber
Definition DataHeader.h:684
static constexpr uint32_t sVersion
Definition DataHeader.h:629
static const DataHeader * Get(const BaseHeader *baseHeader)
Definition DataHeader.h:743
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
const BaseHeader * first() const
Definition Stack.h:61
static void printRDH(const RDHv4 &rdh)
Definition RDHUtils.cxx:26
static bool checkRDH(const RDHv4 &rdh, bool verbose=true, bool checkZeros=false)
Definition RDHUtils.cxx:133
static const o2::header::DataHeader getDataHeader()
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"