Project
Loading...
Searching...
No Matches
DPLRawParser.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_UTILS_DPLRAWPARSER_H
12#define FRAMEWORK_UTILS_DPLRAWPARSER_H
13
18
19#include "DPLUtils/RawParser.h"
21#include "Framework/DataRef.h"
23#include "Framework/Logger.h"
25#include "Headers/DataHeader.h"
26#include <utility> // std::declval
27
28namespace o2::framework
29{
30
66template <bool BOUNDS_CHECKS = true>
68{
69 public:
72
73 DPLRawParser() = delete;
74 DPLRawParser(InputRecord& inputs, std::vector<InputSpec> filterSpecs = {}, fair::Severity sev = fair::Severity::alarm) : mInputs(inputs), mFilterSpecs(filterSpecs), mSeverity(sev) {}
75
76 void setMaxFailureMessages(size_t n) { mMaxFailureMessages = n; }
77 void setExtFailureCounter(size_t* cnt) { mExtFailureCounter = cnt; }
79
80 // this is a dummy default buffer used to initialize the RawParser in the iterator
81 // constructor
83
90 template <typename T>
92 {
93 public:
94 using iterator_category = std::forward_iterator_tag;
96 using value_type = T;
97 using reference = T&;
98 using pointer = T*;
99 // the iterator over the input channels
100 using input_iterator = decltype(std::declval<InputRecord>().begin());
101 // the parser type
103
104 Iterator() = delete;
105
106 Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector<InputSpec> const& filterSpecs, fair::Severity sev = fair::Severity::alarm, size_t maxErrMsg = -1, size_t* cntErrMsg = nullptr)
107 : mParent(parent), mInputIterator(it), mEnd(end), mPartIterator(mInputIterator.begin()), mParser(std::make_unique<parser_type>(reinterpret_cast<const char*>(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev)
108 {
109 mParser.reset();
110 next();
111 }
112
113 ~Iterator() = default;
114
115 // prefix increment
117 {
118 next();
119 return *this;
120 }
121 // postfix increment
122 self_type operator++(int /*unused*/)
123 {
124 self_type copy(*this);
125 operator++();
126 return copy;
127 }
128 // return reference
130 {
131 return *mCurrent;
132 }
133 // comparison
134 bool operator==(const self_type& other) const
135 {
136 bool result = mInputIterator == other.mInputIterator;
137 result = result && mPartIterator == other.mPartIterator;
138 if (mParser != nullptr && other.mParser != nullptr) {
139 result = result && mCurrent == other.mCurrent;
140 }
141 return result;
142 }
143
144 bool operator!=(const self_type& rh) const
145 {
146 return not operator==(rh);
147 }
148
151 {
152 if (mInputIterator != mEnd) {
153 return DataRefUtils::getHeader<o2::header::DataHeader*>(*mPartIterator);
154 }
155 return nullptr;
156 }
157
160 {
161 if (mInputIterator != mEnd) {
162 return DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(*mPartIterator);
163 }
164 return nullptr;
165 }
166
168 buffer_type const* raw() const
169 {
170 return mCurrent.raw();
171 }
172
174 buffer_type const* data() const
175 {
176 return mCurrent.data();
177 }
178
180 size_t offset() const
181 {
182 return mCurrent.offset();
183 }
184
186 size_t size() const
187 {
188 return mCurrent.size();
189 }
190
192 size_t sizeTotal() const
193 {
194 return mCurrent.sizeTotal();
195 }
196
199 template <typename U>
200 U const* get_if() const
201 {
202 return mCurrent.template get_if<U>();
203 }
204
205 friend std::ostream& operator<<(std::ostream& os, self_type const& it)
206 {
207 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
208 os << it.mCurrent;
209 }
210 return os;
211 }
212
213 // helper wrapper to control the format and content of the stream output
214 template <raw_parser::FormatSpec FmtCtrl>
215 struct Fmt {
216 static constexpr raw_parser::FormatSpec format_control = FmtCtrl;
217 Fmt(self_type const& _it) : it{_it} {}
218 self_type const& it;
219 };
220
221 template <raw_parser::FormatSpec FmtCtrl>
222 friend std::ostream& operator<<(std::ostream& os, Fmt<FmtCtrl> const& fmt)
223 {
224 auto const& it = fmt.it;
225 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
226 if constexpr (FmtCtrl == raw_parser::FormatSpec::Info) {
227 // TODO: need to propagate the format spec also on the RawParser object
228 // for now this operation prints the RDH version info and the table header
229 os << *it.mParser;
230 } else {
231 os << it;
232 }
233 }
234 return os;
235 }
236
237 private:
238 // the iterator over the parts in one channel
239 using part_iterator = typename input_iterator::const_iterator;
240 // the iterator over the over the parser pages
241 using parser_iterator = typename parser_type::const_iterator;
242
243 bool next()
244 {
245 auto logFailure = [this](const std::string& msg, const std::runtime_error& e) {
246 if (!this->mExtFailureCounter || (*this->mExtFailureCounter)++ < this->mMaxFailureMessages) {
247 if (this->mSeverity == fair::Severity::alarm) {
248 LOG(alarm) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
249 } else if (this->mSeverity == fair::Severity::warn) {
250 LOG(warn) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
251 } else if (this->mSeverity == fair::Severity::fatal) {
252 LOG(fatal) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
253 } else if (this->mSeverity == fair::Severity::info) {
254 LOG(info) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
255 } else {
256 LOG(debug) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
257 }
258 }
259 };
260
261 while (mInputIterator != mEnd) {
262 bool isInitial = mParser == nullptr;
263 while (mPartIterator != mInputIterator.end()) {
264 // first increment on the parser level
265 if (mParser && mCurrent != mParser->end() && ++mCurrent != mParser->end()) {
266 // we have an active parser and there is still data at the incremented iterator
267 return true;
268 }
269 // now increment on the level of one input
270 mParser.reset();
271 if (!isInitial && (mPartIterator == mInputIterator.end() || ++mPartIterator == mInputIterator.end())) {
272 // no more parts, go to next input
273 break;
274 }
275 isInitial = false;
276 // check filter rules
277 if (mFilterSpecs.size() > 0) {
278 bool isSelected = false;
279 for (auto const& spec : mFilterSpecs) {
280 if ((isSelected = DataRefUtils::match(*mPartIterator, spec)) == true) {
281 break;
282 }
283 }
284 if (!isSelected) {
285 continue;
286 }
287 }
288 gsl::span<const char> raw;
289 try {
290 raw = mParent.get<gsl::span<char>>(*mPartIterator);
291 } catch (const std::runtime_error& e) {
292 logFailure("failed to read data from ", e);
293 }
294 if (raw.size() == 0) {
295 continue;
296 }
297
298 try {
299 mParser = std::make_unique<parser_type>(raw.data(), raw.size());
300 } catch (const std::runtime_error& e) {
301 logFailure("can not create raw parser from ", e);
302 }
303
304 if (mParser != nullptr) {
305 mCurrent = mParser->begin();
306 return true;
307 }
308 } // end loop over parts on one input
309 ++mInputIterator;
310 mPartIterator = mInputIterator.begin();
311 } // end loop over inputs
312 return false;
313 }
314
315 InputRecord& mParent;
316 input_iterator mInputIterator;
317 input_iterator mEnd;
318 part_iterator mPartIterator;
319 std::unique_ptr<parser_type> mParser;
320 parser_iterator mCurrent;
321 std::vector<InputSpec> const& mFilterSpecs;
322 size_t mMaxFailureMessages = -1;
323 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
324 fair::Severity mSeverity = fair::Severity::alarm;
325 };
326
328
330 {
331 return const_iterator(mInputs, mInputs.begin(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
332 }
333
335 {
336 return const_iterator(mInputs, mInputs.end(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
337 }
338
341 using RDHInfo = typename o2::framework::DPLRawParser<BOUNDS_CHECKS>::const_iterator::template Fmt<raw_parser::FormatSpec::Info>;
342
343 private:
344 InputRecord& mInputs;
345 std::vector<InputSpec> mFilterSpecs;
346 size_t mMaxFailureMessages = -1;
347 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
348 fair::Severity mSeverity = fair::Severity::alarm;
349};
350
351} // namespace o2::framework
352
353#endif //FRAMEWORK_UTILS_DPLRAWPARSER_H
Generic parser for consecutive raw pages.
std::ostringstream debug
decltype(std::declval< InputRecord >().begin()) input_iterator
std::forward_iterator_tag iterator_category
o2::header::DataHeader const * o2DataHeader() const
get DataHeader of the current input message
size_t sizeTotal() const
get size of header + payload at current position
buffer_type const * raw() const
get pointer to raw block at current position, rdh starts here
friend std::ostream & operator<<(std::ostream &os, self_type const &it)
bool operator!=(const self_type &rh) const
buffer_type const * data() const
get pointer to payload at current position
o2::framework::DataProcessingHeader const * o2DataProcessingHeader() const
get DataProcessingHeader of the current input message
size_t offset() const
offset of payload at current position
Iterator(InputRecord &parent, input_iterator it, input_iterator end, std::vector< InputSpec > const &filterSpecs, fair::Severity sev=fair::Severity::alarm, size_t maxErrMsg=-1, size_t *cntErrMsg=nullptr)
size_t size() const
get size of payload at current position
friend std::ostream & operator<<(std::ostream &os, Fmt< FmtCtrl > const &fmt)
bool operator==(const self_type &other) const
The parser handles transparently input in the format of raw pages.
static constexpr o2::header::RAWDataHeader initializer
const_iterator end() const
Iterator< DataRef const > const_iterator
static void setCheckIncompleteHBF(bool v)
typename rawparser_type::buffer_type buffer_type
void setMaxFailureMessages(size_t n)
typename o2::framework::DPLRawParser< BOUNDS_CHECKS >::const_iterator::template Fmt< raw_parser::FormatSpec::Info > RDHInfo
void setExtFailureCounter(size_t *cnt)
DPLRawParser(InputRecord &inputs, std::vector< InputSpec > filterSpecs={}, fair::Severity sev=fair::Severity::alarm)
const_iterator begin() const
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
const_iterator begin() const
decltype(auto) get(R binding, int part=0) const
const_iterator end() const
unsigned char buffer_type
Definition RawParser.h:468
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
FormatSpec
specifier for printout
Definition RawParser.h:42
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RAWDataHeaderV7 RAWDataHeader
bool isSelected(const ExtendedTrack &track)
Defining DataPointCompositeObject explicitly as copiable.
static constexpr raw_parser::FormatSpec format_control
static bool match(DataRef const &ref, const char *binding)
the main header struct
Definition DataHeader.h:618
uint32_t memorySize
bit 64 to 79: offset to next packet in memory
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153