Project
Loading...
Searching...
No Matches
DPLRawParser.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_UTILS_DPLRAWPARSER_H
12#define FRAMEWORK_UTILS_DPLRAWPARSER_H
13
18
19#include "DPLUtils/RawParser.h"
21#include "Framework/DataRef.h"
23#include "Framework/Logger.h"
25#include "Headers/DataHeader.h"
26#include <utility> // std::declval
27
28namespace o2::framework
29{
30
66template <bool BOUNDS_CHECKS = true>
68{
69 public:
72
73 DPLRawParser() = delete;
74 DPLRawParser(InputRecord& inputs, std::vector<InputSpec> filterSpecs = {}, fair::Severity sev = fair::Severity::alarm) : mInputs(inputs), mFilterSpecs(filterSpecs), mSeverity(sev) {}
75
76 void setMaxFailureMessages(size_t n) { mMaxFailureMessages = n; }
77 void setExtFailureCounter(size_t* cnt) { mExtFailureCounter = cnt; }
80
81 // this is a dummy default buffer used to initialize the RawParser in the iterator
82 // constructor
84
91 template <typename T>
93 {
94 public:
95 using iterator_category = std::forward_iterator_tag;
97 using value_type = T;
98 using reference = T&;
99 using pointer = T*;
100 // the iterator over the input channels
101 using input_iterator = decltype(std::declval<InputRecord>().begin());
102 // the parser type
104
105 Iterator() = delete;
106
// Create an iterator starting at input position `it` and advance it to the first
// position for which a raw parser can be created (see next()).
// - parent:      input record used to extract the payload of the parts
// - it, end:     current and end position of the iteration over the inputs
// - filterSpecs: stored by reference; if not empty, only matching parts are parsed
// - sev:         severity of the log message on read/parse failures
// - maxErrMsg:   limit compared against *cntErrMsg before a failure is logged
// - cntErrMsg:   optional external failure counter, nullptr logs every failure
// mParser is first created on the dummy `initializer` buffer so that mCurrent
// can be initialized from mParser->begin() (presumably the parser iterator has
// no default constructor — TODO confirm).
107 Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector<InputSpec> const& filterSpecs, fair::Severity sev = fair::Severity::alarm, size_t maxErrMsg = -1, size_t* cntErrMsg = nullptr)
108 : mParent(parent), mInputIterator(it), mEnd(end), mPartIterator(mInputIterator.begin()), mParser(std::make_unique<parser_type>(reinterpret_cast<const char*>(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev)
109 {
// drop the dummy parser again, next() takes a null parser as the initial state
110 mParser.reset();
// move to the first valid page, or to the end position if there is none
111 next();
112 }
113
114 ~Iterator() = default;
115
116 // prefix increment
118 {
119 next();
120 return *this;
121 }
122 // postfix increment
123 self_type operator++(int /*unused*/)
124 {
125 self_type copy(*this);
126 operator++();
127 return copy;
128 }
129 // return reference
131 {
132 return *mCurrent;
133 }
134 // comparison
135 bool operator==(const self_type& other) const
136 {
137 bool result = mInputIterator == other.mInputIterator;
138 result = result && mPartIterator == other.mPartIterator;
139 if (mParser != nullptr && other.mParser != nullptr) {
140 result = result && mCurrent == other.mCurrent;
141 }
142 return result;
143 }
144
145 bool operator!=(const self_type& rh) const
146 {
147 return not operator==(rh);
148 }
149
152 {
153 if (mInputIterator != mEnd) {
154 return DataRefUtils::getHeader<o2::header::DataHeader*>(*mPartIterator);
155 }
156 return nullptr;
157 }
158
161 {
162 if (mInputIterator != mEnd) {
163 return DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(*mPartIterator);
164 }
165 return nullptr;
166 }
167
169 buffer_type const* raw() const
170 {
171 return mCurrent.raw();
172 }
173
175 buffer_type const* data() const
176 {
177 return mCurrent.data();
178 }
179
181 size_t offset() const
182 {
183 return mCurrent.offset();
184 }
185
187 size_t size() const
188 {
189 return mCurrent.size();
190 }
191
193 size_t sizeTotal() const
194 {
195 return mCurrent.sizeTotal();
196 }
197
200 template <typename U>
201 U const* get_if() const
202 {
203 return mCurrent.template get_if<U>();
204 }
205
206 friend std::ostream& operator<<(std::ostream& os, self_type const& it)
207 {
208 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
209 os << it.mCurrent;
210 }
211 return os;
212 }
213
214 // helper wrapper to control the format and content of the stream output
215 template <raw_parser::FormatSpec FmtCtrl>
216 struct Fmt {
217 static constexpr raw_parser::FormatSpec format_control = FmtCtrl;
218 Fmt(self_type const& _it) : it{_it} {}
219 self_type const& it;
220 };
221
222 template <raw_parser::FormatSpec FmtCtrl>
223 friend std::ostream& operator<<(std::ostream& os, Fmt<FmtCtrl> const& fmt)
224 {
225 auto const& it = fmt.it;
226 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
227 if constexpr (FmtCtrl == raw_parser::FormatSpec::Info) {
228 // TODO: need to propagate the format spec also on the RawParser object
229 // for now this operation prints the RDH version info and the table header
230 os << *it.mParser;
231 } else {
232 os << it;
233 }
234 }
235 return os;
236 }
237
238 private:
239 // the iterator over the parts in one channel
240 using part_iterator = typename input_iterator::const_iterator;
241 // the iterator over the parser pages
242 using parser_iterator = typename parser_type::const_iterator;
243
// Advance to the next position: first within the active parser, then to the
// next part of the current input, then to the next input.
// Returns true if the iterator is at a valid parser position afterwards, and
// false when all inputs are exhausted (mInputIterator == mEnd).
244 bool next()
245 {
// Helper logging `msg`, the binding of the current input and the exception text
// at the configured severity (any severity not listed is logged as debug).
// If an external counter is set, it is incremented on every call and the message
// is only logged while its previous value is below mMaxFailureMessages; without
// an external counter every failure is logged.
246 auto logFailure = [this](const std::string& msg, const std::runtime_error& e) {
247 if (!this->mExtFailureCounter || (*this->mExtFailureCounter)++ < this->mMaxFailureMessages) {
248 if (this->mSeverity == fair::Severity::alarm) {
249 LOG(alarm) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
250 } else if (this->mSeverity == fair::Severity::warn) {
251 LOG(warn) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
252 } else if (this->mSeverity == fair::Severity::fatal) {
253 LOG(fatal) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
254 } else if (this->mSeverity == fair::Severity::critical) {
255 LOG(critical) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
256 } else if (this->mSeverity == fair::Severity::error) {
257 LOG(error) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
258 } else if (this->mSeverity == fair::Severity::info) {
259 LOG(info) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
260 } else {
261 LOG(debug) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
262 }
263 }
264 };
265
266 while (mInputIterator != mEnd) {
// no active parser when entering an input: the part mPartIterator points to has
// not been looked at yet and must not be skipped by the increment below
267 bool isInitial = mParser == nullptr;
268 while (mPartIterator != mInputIterator.end()) {
269 // first increment on the parser level
270 if (mParser && mCurrent != mParser->end() && ++mCurrent != mParser->end()) {
271 // we have an active parser and there is still data at the incremented iterator
272 return true;
273 }
274 // now increment on the level of one input
275 mParser.reset();
276 if (!isInitial && (mPartIterator == mInputIterator.end() || ++mPartIterator == mInputIterator.end())) {
277 // no more parts, go to next input
278 break;
279 }
280 isInitial = false;
281 // check filter rules
// with a non-empty filter list, parts not matching any of the specs are skipped
282 if (mFilterSpecs.size() > 0) {
283 bool isSelected = false;
284 for (auto const& spec : mFilterSpecs) {
285 if ((isSelected = DataRefUtils::match(*mPartIterator, spec)) == true) {
286 break;
287 }
288 }
289 if (!isSelected) {
290 continue;
291 }
292 }
// extract the payload of the part; on failure the span stays empty and the part
// is skipped by the size check below
293 gsl::span<const char> raw;
294 try {
295 raw = mParent.get<gsl::span<char>>(*mPartIterator);
296 } catch (const std::runtime_error& e) {
297 logFailure("failed to read data from ", e);
298 }
299 if (raw.size() == 0) {
300 continue;
301 }
302
// create the parser for this part; on failure mParser stays null and the loop
// continues with the next part
303 try {
304 mParser = std::make_unique<parser_type>(raw.data(), raw.size());
305 } catch (const std::runtime_error& e) {
306 logFailure("can not create raw parser from ", e);
307 }
308
309 if (mParser != nullptr) {
310 mCurrent = mParser->begin();
311 return true;
312 }
313 } // end loop over parts on one input
// all parts of this input are done, continue with the first part of the next one
314 ++mInputIterator;
315 mPartIterator = mInputIterator.begin();
316 } // end loop over inputs
317 return false;
318 }
319
320 InputRecord& mParent;
321 input_iterator mInputIterator;
322 input_iterator mEnd;
323 part_iterator mPartIterator;
324 std::unique_ptr<parser_type> mParser;
325 parser_iterator mCurrent;
326 std::vector<InputSpec> const& mFilterSpecs;
327 size_t mMaxFailureMessages = -1;
328 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
329 fair::Severity mSeverity = fair::Severity::alarm;
330 };
331
333
335 {
336 return const_iterator(mInputs, mInputs.begin(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
337 }
338
340 {
341 return const_iterator(mInputs, mInputs.end(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
342 }
343
346 using RDHInfo = typename o2::framework::DPLRawParser<BOUNDS_CHECKS>::const_iterator::template Fmt<raw_parser::FormatSpec::Info>;
347
348 private:
349 InputRecord& mInputs;
350 std::vector<InputSpec> mFilterSpecs;
351 size_t mMaxFailureMessages = -1;
352 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
353 fair::Severity mSeverity = fair::Severity::alarm;
354};
355
356} // namespace o2::framework
357
358#endif //FRAMEWORK_UTILS_DPLRAWPARSER_H
Generic parser for consecutive raw pages.
std::ostringstream debug
decltype(std::declval< InputRecord >().begin()) input_iterator
std::forward_iterator_tag iterator_category
o2::header::DataHeader const * o2DataHeader() const
get DataHeader of the current input message
size_t sizeTotal() const
get size of header + payload at current position
buffer_type const * raw() const
get pointer to raw block at current position, rdh starts here
friend std::ostream & operator<<(std::ostream &os, self_type const &it)
bool operator!=(const self_type &rh) const
buffer_type const * data() const
get pointer to payload at current position
o2::framework::DataProcessingHeader const * o2DataProcessingHeader() const
get DataProcessingHeader of the current input message
size_t offset() const
offset of payload at current position
Iterator(InputRecord &parent, input_iterator it, input_iterator end, std::vector< InputSpec > const &filterSpecs, fair::Severity sev=fair::Severity::alarm, size_t maxErrMsg=-1, size_t *cntErrMsg=nullptr)
size_t size() const
get size of payload at current position
friend std::ostream & operator<<(std::ostream &os, Fmt< FmtCtrl > const &fmt)
bool operator==(const self_type &other) const
The parser handles transparently input in the format of raw pages.
static constexpr o2::header::RAWDataHeader initializer
const_iterator end() const
Iterator< DataRef const > const_iterator
static void setErrorMode(int v)
static void setCheckIncompleteHBF(bool v)
typename rawparser_type::buffer_type buffer_type
void setMaxFailureMessages(size_t n)
typename o2::framework::DPLRawParser< BOUNDS_CHECKS >::const_iterator::template Fmt< raw_parser::FormatSpec::Info > RDHInfo
void setExtFailureCounter(size_t *cnt)
DPLRawParser(InputRecord &inputs, std::vector< InputSpec > filterSpecs={}, fair::Severity sev=fair::Severity::alarm)
const_iterator begin() const
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
const_iterator begin() const
decltype(auto) get(R binding, int part=0) const
const_iterator end() const
unsigned char buffer_type
Definition RawParser.h:468
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
static void setErrorMode(int v)
Definition RawParser.h:652
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
FormatSpec
specifier for printout
Definition RawParser.h:42
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RAWDataHeaderV7 RAWDataHeader
bool isSelected(const ExtendedTrack &track)
Defining DataPointCompositeObject explicitly as copiable.
static constexpr raw_parser::FormatSpec format_control
static bool match(DataRef const &ref, const char *binding)
the main header struct
Definition DataHeader.h:618
uint32_t memorySize
bit 64 to 79: offset to next packet in memory
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153