Project
Loading...
Searching...
No Matches
DPLRawParser.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_UTILS_DPLRAWPARSER_H
12#define FRAMEWORK_UTILS_DPLRAWPARSER_H
13
18
19#include "DPLUtils/RawParser.h"
21#include "Framework/DataRef.h"
23#include "Framework/Logger.h"
25#include "Headers/DataHeader.h"
26#include <utility> // std::declval
27
28namespace o2::framework
29{
30
66template <bool BOUNDS_CHECKS = true>
68{
69 public:
72
// A parser can only be created for an existing InputRecord: no default construction.
73 DPLRawParser() = delete;
// Bind the parser to `inputs` (kept by reference) and store a copy of the
// filter specs and the log severity `sev`. These settings are handed on to
// every iterator created by begin()/end(). An empty `filterSpecs` (the
// default) disables filtering in the iterator.
74 DPLRawParser(InputRecord& inputs, std::vector<InputSpec> filterSpecs = {}, fair::Severity sev = fair::Severity::alarm) : mInputs(inputs), mFilterSpecs(filterSpecs), mSeverity(sev) {}
75
// Set the threshold `n` the external failure counter is compared against
// before an iterator logs a failure. It only has an effect when an external
// counter has been registered; without one every failure is logged.
76 void setMaxFailureMessages(size_t n) { mMaxFailureMessages = n; }
// Register an external, optionally provided counter used to throttle error
// messages. Only the pointer is stored; iterators increment the pointee on
// each failure they consider for logging. Pass nullptr to disable throttling.
77 void setExtFailureCounter(size_t* cnt) { mExtFailureCounter = cnt; }
80
81 // this is a dummy default buffer used to initialize the RawParser in the iterator
82 // constructor
84
91 template <typename T>
93 {
94 public:
95 using iterator_category = std::forward_iterator_tag;
97 using value_type = T;
98 using reference = T&;
99 using pointer = T*;
100 // the iterator over the input channels
101 using input_iterator = decltype(std::declval<InputRecord>().begin());
102 // the parser type
104
105 Iterator() = delete;
106
// Create an iterator over the raw pages of `parent`, starting at input `it`
// and stopping at input `end`.
//  - filterSpecs: kept by reference, so the vector must outlive the iterator;
//    if non-empty, only parts matching one of the specs are parsed
//  - sev: severity used when reporting read/parse failures
//  - maxErrMsg / cntErrMsg: throttling threshold and optional external
//    counter; the default -1 converts to the largest size_t value
// The parser over the dummy `initializer` buffer is created only so that
// mCurrent can be initialized from a parser iterator.
107 Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector<InputSpec> const& filterSpecs, fair::Severity sev = fair::Severity::alarm, size_t maxErrMsg = -1, size_t* cntErrMsg = nullptr)
108 : mParent(parent), mInputIterator(it), mEnd(end), mCurrentRange(it.parts()), mPartIterator(mCurrentRange.begin()), mParser(std::make_unique<parser_type>(reinterpret_cast<const char*>(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev)
109 {
// drop the dummy parser again; a null parser marks the initial state for next()
110 mParser.reset();
// position on the first available raw page (or run to the end if there is none)
111 next();
112 }
113
114 // prefix increment: advance to the next raw page via next() and return this iterator
116 {
117 next();
118 return *this;
119 }
120 // postfix increment: advance this iterator and return a copy of its previous state
// NOTE(review): the class has a std::unique_ptr member (mParser) and a
// reference member, so it presumably is not copy-constructible and this
// overload would fail to compile once instantiated - confirm.
121 self_type operator++(int /*unused*/)
122 {
123 self_type copy(*this);
124 operator++();
125 return copy;
126 }
127 // return reference: dereferences the parser-level iterator mCurrent
// (the caller has to make sure the iterator is not at the end position)
129 {
130 return *mCurrent;
131 }
132 // comparison: two iterators are equal if they are at the same input and the
// same part; the page position inside the parser is compared as well, but
// only when both iterators currently have an active parser
133 bool operator==(const self_type& other) const
134 {
135 bool result = mInputIterator == other.mInputIterator;
136 result = result && mPartIterator == other.mPartIterator;
// without a parser on either side mCurrent carries no meaningful position
137 if (mParser != nullptr && other.mParser != nullptr) {
138 result = result && mCurrent == other.mCurrent;
139 }
140 return result;
141 }
142
// Inequality, defined as the negation of operator==.
143 bool operator!=(const self_type& rh) const
144 {
145 return not operator==(rh);
146 }
147
// o2DataHeader(): get the DataHeader of the current input message, taken
// from the part the iterator currently points to.
// Returns nullptr once the input iterator has reached the end.
150 {
151 if (mInputIterator != mEnd) {
152 return DataRefUtils::getHeader<o2::header::DataHeader*>(*mPartIterator);
153 }
154 return nullptr;
155 }
156
// o2DataProcessingHeader(): get the DataProcessingHeader of the current
// input message, taken from the part the iterator currently points to.
// Returns nullptr once the input iterator has reached the end.
159 {
160 if (mInputIterator != mEnd) {
161 return DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(*mPartIterator);
162 }
163 return nullptr;
164 }
165
// Get pointer to the raw block at the current position; the RDH starts here.
// Forwards to the parser-level iterator mCurrent.
167 buffer_type const* raw() const
168 {
169 return mCurrent.raw();
170 }
171
// Get pointer to the payload at the current position.
// Forwards to the parser-level iterator mCurrent.
173 buffer_type const* data() const
174 {
175 return mCurrent.data();
176 }
177
// Offset of the payload at the current position.
// Forwards to the parser-level iterator mCurrent.
179 size_t offset() const
180 {
181 return mCurrent.offset();
182 }
183
// Get size of the payload at the current position.
// Forwards to the parser-level iterator mCurrent.
185 size_t size() const
186 {
187 return mCurrent.size();
188 }
189
// Get size of header + payload at the current position.
// Forwards to the parser-level iterator mCurrent.
191 size_t sizeTotal() const
192 {
193 return mCurrent.sizeTotal();
194 }
195
// Typed access to the current position: forwards to mCurrent.get_if<U>().
// NOTE(review): presumably yields nullptr when the header at the current
// position is not of type U - confirm against the RawParser iterator.
198 template <typename U>
199 U const* get_if() const
200 {
201 return mCurrent.template get_if<U>();
202 }
203
// Stream the current parser position (it.mCurrent) to `os`.
// Nothing is written when the iterator is at the end of the inputs, at the
// end of the parts of the current input, or has no active parser.
204 friend std::ostream& operator<<(std::ostream& os, self_type const& it)
205 {
206 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mCurrentRange.end() && it.mParser != nullptr) {
207 os << it.mCurrent;
208 }
209 return os;
210 }
211
212 // helper wrapper to control the format and content of the stream output:
// it carries the format specifier as template argument and keeps a reference
// to the iterator, so the wrapper must not outlive the iterator it wraps
213 template <raw_parser::FormatSpec FmtCtrl>
214 struct Fmt {
215 static constexpr raw_parser::FormatSpec format_control = FmtCtrl;
216 Fmt(self_type const& _it) : it{_it} {}
217 self_type const& it;
218 };
219
// Stream output controlled by the format wrapper Fmt<FmtCtrl>.
// With FormatSpec::Info the parser object itself is streamed, for any other
// specifier the call falls back to the plain iterator output. Nothing is
// written when the wrapped iterator is at an end position or has no parser.
220 template <raw_parser::FormatSpec FmtCtrl>
221 friend std::ostream& operator<<(std::ostream& os, Fmt<FmtCtrl> const& fmt)
222 {
223 auto const& it = fmt.it;
224 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mCurrentRange.end() && it.mParser != nullptr) {
225 if constexpr (FmtCtrl == raw_parser::FormatSpec::Info) {
226 // TODO: need to propagate the format spec also on the RawParser object
227 // for now this operation prints the RDH version info and the table header
228 os << *it.mParser;
229 } else {
230 os << it;
231 }
232 }
233 return os;
234 }
235
236 private:
237 // the range over parts in one slot — must be stored as a member so the
238 // part_iterator (which holds a pointer into it) does not dangle
239 using part_range = InputRecord::PartRange;
241 // the iterator over the parser pages
242 using parser_iterator = typename parser_type::const_iterator;
243
// Advance the iterator to the next raw page.
// The iteration has three nested levels: pages inside the active parser,
// parts of the current input, and the inputs themselves. Parts which do not
// match the filter specs, are empty, cannot be read or cannot be parsed are
// skipped.
// Returns true if the iterator is positioned on a page, false if all inputs
// are exhausted.
244 bool next()
245 {
// Report a failure for the current input: message, binding of the input and
// exception text, at the configured severity. If an external counter is set
// it is incremented on every call and the message is only emitted while its
// previous value is below mMaxFailureMessages; without a counter every
// failure is logged.
// NOTE(review): the severity is dispatched through an if-chain, presumably
// because LOG needs the severity as a literal token - confirm.
246 auto logFailure = [this](const std::string& msg, const std::runtime_error& e) {
247 if (!this->mExtFailureCounter || (*this->mExtFailureCounter)++ < this->mMaxFailureMessages) {
248 if (this->mSeverity == fair::Severity::alarm) {
249 LOG(alarm) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
250 } else if (this->mSeverity == fair::Severity::warn) {
251 LOG(warn) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
252 } else if (this->mSeverity == fair::Severity::fatal) {
253 LOG(fatal) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
254 } else if (this->mSeverity == fair::Severity::critical) {
255 LOG(critical) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
256 } else if (this->mSeverity == fair::Severity::error) {
257 LOG(error) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
258 } else if (this->mSeverity == fair::Severity::info) {
259 LOG(info) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
260 } else {
// any other severity value is reported at debug level
261 LOG(debug) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
262 }
263 }
264 };
265
266 while (mInputIterator != mEnd) {
// no active parser means the part mPartIterator points to has not been
// looked at yet (fresh iterator or fresh input), so it must not be skipped
267 bool isInitial = mParser == nullptr;
268 while (mPartIterator != mCurrentRange.end()) {
269 // first increment on the parser level
270 if (mParser && mCurrent != mParser->end() && ++mCurrent != mParser->end()) {
271 // we have an active parser and there is still data at the incremented iterator
272 return true;
273 }
274 // now increment on the level of one input
275 mParser.reset();
276 if (!isInitial && ++mPartIterator == mCurrentRange.end()) {
277 // no more parts, go to next input
278 break;
279 }
// from now on every further pass of this loop has to advance the part iterator
280 isInitial = false;
281 // check filter rules
282 if (mFilterSpecs.size() > 0) {
283 bool isSelected = false;
284 for (auto const& spec : mFilterSpecs) {
285 if ((isSelected = DataRefUtils::match(*mPartIterator, spec)) == true) {
286 break;
287 }
288 }
289 if (!isSelected) {
290 continue;
291 }
292 }
293 gsl::span<const char> raw;
294 try {
295 raw = mParent.get<gsl::span<char>>(*mPartIterator);
296 } catch (const std::runtime_error& e) {
// raw stays empty, so the part is skipped by the size check below
297 logFailure("failed to read data from ", e);
298 }
299 if (raw.size() == 0) {
300 continue;
301 }
302
303 try {
304 mParser = std::make_unique<parser_type>(raw.data(), raw.size());
305 } catch (const std::runtime_error& e) {
// mParser stays null and the loop continues with the next part
306 logFailure("can not create raw parser from ", e);
307 }
308
309 if (mParser != nullptr) {
310 mCurrent = mParser->begin();
311 return true;
312 }
313 } // end loop over parts on one input
// NOTE(review): parts() is also called when the increment has just reached
// mEnd - confirm that this is valid for the end iterator of the InputRecord
314 ++mInputIterator;
315 mCurrentRange = mInputIterator.parts();
316 mPartIterator = mCurrentRange.begin();
317 } // end loop over inputs
318 return false;
319 }
320
321 InputRecord& mParent;
322 input_iterator mInputIterator;
323 input_iterator mEnd;
324 part_range mCurrentRange; // declared before mPartIterator — initialized first
325 part_iterator mPartIterator;
326 std::unique_ptr<parser_type> mParser;
327 parser_iterator mCurrent;
328 std::vector<InputSpec> const& mFilterSpecs;
329 size_t mMaxFailureMessages = -1;
330 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
331 fair::Severity mSeverity = fair::Severity::alarm;
332 };
333
335
// begin(): create an iterator starting at the first input of the record.
// The iterator constructor already advances to the first available raw page.
// Filter specs, severity and failure throttling settings of this parser are
// passed on to the iterator.
337 {
338 return const_iterator(mInputs, mInputs.begin(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
339 }
340
// end(): create the end marker, an iterator constructed with the end of the
// inputs as both its current and its end position.
342 {
343 return const_iterator(mInputs, mInputs.end(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
344 }
345
// Format wrapper selecting FormatSpec::Info: streaming an RDHInfo built from
// an iterator writes the parser-level information (RDH version info and the
// table header) instead of the current page.
348 using RDHInfo = typename o2::framework::DPLRawParser<BOUNDS_CHECKS>::const_iterator::template Fmt<raw_parser::FormatSpec::Info>;
349
350 private:
351 InputRecord& mInputs;
352 std::vector<InputSpec> mFilterSpecs;
353 size_t mMaxFailureMessages = -1;
354 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
355 fair::Severity mSeverity = fair::Severity::alarm;
356};
357
358} // namespace o2::framework
359
360#endif // FRAMEWORK_UTILS_DPLRAWPARSER_H
std::ostringstream debug
Generic parser for consecutive raw pages.
decltype(std::declval< InputRecord >().begin()) input_iterator
std::forward_iterator_tag iterator_category
o2::header::DataHeader const * o2DataHeader() const
get DataHeader of the current input message
size_t sizeTotal() const
get size of header + payload at current position
buffer_type const * raw() const
get pointer to raw block at current position, rdh starts here
friend std::ostream & operator<<(std::ostream &os, self_type const &it)
bool operator!=(const self_type &rh) const
buffer_type const * data() const
get pointer to payload at current position
o2::framework::DataProcessingHeader const * o2DataProcessingHeader() const
get DataProcessingHeader of the current input message
size_t offset() const
offset of payload at current position
Iterator(InputRecord &parent, input_iterator it, input_iterator end, std::vector< InputSpec > const &filterSpecs, fair::Severity sev=fair::Severity::alarm, size_t maxErrMsg=-1, size_t *cntErrMsg=nullptr)
size_t size() const
get size of payload at current position
friend std::ostream & operator<<(std::ostream &os, Fmt< FmtCtrl > const &fmt)
bool operator==(const self_type &other) const
The parser handles transparently input in the format of raw pages.
static constexpr o2::header::RAWDataHeader initializer
const_iterator end() const
Iterator< DataRef const > const_iterator
static void setErrorMode(int v)
static void setCheckIncompleteHBF(bool v)
typename rawparser_type::buffer_type buffer_type
void setMaxFailureMessages(size_t n)
typename o2::framework::DPLRawParser< BOUNDS_CHECKS >::const_iterator::template Fmt< raw_parser::FormatSpec::Info > RDHInfo
void setExtFailureCounter(size_t *cnt)
DPLRawParser(InputRecord &inputs, std::vector< InputSpec > filterSpecs={}, fair::Severity sev=fair::Severity::alarm)
const_iterator begin() const
The input API of the Data Processing Layer. This class holds the inputs which are valid for processing...
const_iterator begin() const
decltype(auto) get(R binding, int part=0) const
const_iterator end() const
unsigned char buffer_type
Definition RawParser.h:468
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
static void setErrorMode(int v)
Definition RawParser.h:652
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
FormatSpec
specifier for printout
Definition RawParser.h:42
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RAWDataHeaderV7 RAWDataHeader
bool isSelected(const ExtendedTrack &track)
static constexpr raw_parser::FormatSpec format_control
static bool match(DataRef const &ref, const char *binding)
A range over the parts of a single slot that sets ref.spec on each DataRef.
InputSpan::Iterator< PartRange, const DataRef > end() const
InputSpan::Iterator< PartRange, const DataRef > begin() const
the main header struct
Definition DataHeader.h:620
uint32_t memorySize
bit 64 to 79: offset to next packet in memory
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153