Project
Loading...
Searching...
No Matches
DPLRawParser.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_UTILS_DPLRAWPARSER_H
12#define FRAMEWORK_UTILS_DPLRAWPARSER_H
13
18
19#include "DPLUtils/RawParser.h"
21#include "Framework/DataRef.h"
23#include "Framework/Logger.h"
25#include "Headers/DataHeader.h"
26#include <utility> // std::declval
27
28namespace o2::framework
29{
30
66template <bool BOUNDS_CHECKS = true>
68{
69 public:
72
/// Default construction is disabled: a parser always operates on an InputRecord.
73 DPLRawParser() = delete;
/// Create a parser over the raw pages found in the given DPL inputs.
/// @param inputs      input record to iterate over; stored by reference
/// @param filterSpecs optional filter list; if non-empty, only parts matching at least one spec are parsed
/// @param sev         severity used by the iterators when logging read/parse failures
// filterSpecs is a by-value sink parameter: move it into the member instead of copying the vector a second time
74 DPLRawParser(InputRecord& inputs, std::vector<InputSpec> filterSpecs = {}, fair::Severity sev = fair::Severity::alarm) : mInputs(inputs), mFilterSpecs(std::move(filterSpecs)), mSeverity(sev) {}
75
/// Set the maximum number of failure messages to be logged by iterators created afterwards.
/// Note: the limit is only compared against the external failure counter, so it has no
/// effect unless a counter has been provided via setExtFailureCounter().
76 void setMaxFailureMessages(size_t n) { mMaxFailureMessages = n; }
/// Provide an external counter used to throttle failure messages. Only the pointer is stored;
/// the iterators post-increment the pointed-to value each time a failure is about to be logged.
77 void setExtFailureCounter(size_t* cnt) { mExtFailureCounter = cnt; }
79
80 // this is a dummy default buffer used to initialize the RawParser in the iterator
81 // constructor
83
90 template <typename T>
92 {
93 public:
94 using iterator_category = std::forward_iterator_tag;
96 using value_type = T;
97 using reference = T&;
98 using pointer = T*;
99 // the iterator over the input channels
100 using input_iterator = decltype(std::declval<InputRecord>().begin());
101 // the parser type
103
104 Iterator() = delete;
105
/// Construct an iterator and position it on the first available raw page.
/// @param parent      input record used to extract the payload of each part
/// @param it          input to start from
/// @param end         end of the input range
/// @param filterSpecs filter list; stored by reference, so the vector must outlive the iterator
/// @param sev         severity used when logging failures
/// @param maxErrMsg   maximum number of failure messages (only applied together with cntErrMsg)
/// @param cntErrMsg   optional external counter used to throttle failure messages
106 Iterator(InputRecord& parent, input_iterator it, input_iterator end, std::vector<InputSpec> const& filterSpecs, fair::Severity sev = fair::Severity::alarm, size_t maxErrMsg = -1, size_t* cntErrMsg = nullptr)
107 : mParent(parent), mInputIterator(it), mEnd(end), mPartIterator(mInputIterator.begin()), mParser(std::make_unique<parser_type>(reinterpret_cast<const char*>(&initializer), sizeof(initializer))), mCurrent(mParser->begin()), mFilterSpecs(filterSpecs), mMaxFailureMessages(maxErrMsg), mExtFailureCounter(cntErrMsg), mSeverity(sev)
108 {
// the parser built on the dummy `initializer` buffer only exists so that mCurrent can be
// initialized in the member-initializer list; drop it and let next() search for real data
109 mParser.reset();
110 next();
111 }
112
113 ~Iterator() = default;
114
115 // prefix increment
117 {
118 next();
119 return *this;
120 }
121 // postfix increment
/// Postfix increment: advance to the next raw page and return a copy of the previous state.
// NOTE(review): self_type owns a std::unique_ptr member (mParser) and no user-defined copy
// constructor is visible in this file, so the copy below presumably fails to compile once
// this operator is actually instantiated - confirm, and prefer the prefix form.
122 self_type operator++(int /*unused*/)
123 {
124 self_type copy(*this);
125 operator++();
126 return copy;
127 }
128 // return reference
130 {
131 return *mCurrent;
132 }
133 // comparison
/// Two iterators are equal if they point to the same input and the same part, and - only
/// when both have an active parser - to the same position within the parser.
/// An iterator without a parser is therefore compared on the input/part level only.
134 bool operator==(const self_type& other) const
135 {
136 bool result = mInputIterator == other.mInputIterator;
137 result = result && mPartIterator == other.mPartIterator;
// the page position is only meaningful while a parser is attached on both sides
138 if (mParser != nullptr && other.mParser != nullptr) {
139 result = result && mCurrent == other.mCurrent;
140 }
141 return result;
142 }
143
/// Inequality, defined as the negation of operator==.
144 bool operator!=(const self_type& rh) const
145 {
146 return !(*this == rh);
147 }
148
151 {
152 if (mInputIterator != mEnd) {
153 return DataRefUtils::getHeader<o2::header::DataHeader*>(*mPartIterator);
154 }
155 return nullptr;
156 }
157
160 {
161 if (mInputIterator != mEnd) {
162 return DataRefUtils::getHeader<o2::framework::DataProcessingHeader*>(*mPartIterator);
163 }
164 return nullptr;
165 }
166
/// get pointer to raw block at current position, rdh starts here
168 buffer_type const* raw() const
169 {
170 return mCurrent.raw();
171 }
172
/// get pointer to payload at current position
174 buffer_type const* data() const
175 {
176 return mCurrent.data();
177 }
178
/// offset of payload at current position
180 size_t offset() const
181 {
182 return mCurrent.offset();
183 }
184
/// get size of payload at current position
186 size_t size() const
187 {
188 return mCurrent.size();
189 }
190
/// get size of header + payload at current position
192 size_t sizeTotal() const
193 {
194 return mCurrent.sizeTotal();
195 }
196
/// Typed access to the header at the current position; forwards to the get_if<U>() of the
/// underlying parser iterator.
// NOTE(review): presumably returns nullptr when the current header is not of type U -
// confirm against RawParser.h
199 template <typename U>
200 U const* get_if() const
201 {
202 return mCurrent.template get_if<U>();
203 }
204
/// Stream the current parser position. Nothing is written if the iterator is at the end of
/// the inputs, at the end of the parts of the current input, or has no active parser.
205 friend std::ostream& operator<<(std::ostream& os, self_type const& it)
206 {
207 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
208 os << it.mCurrent;
209 }
210 return os;
211 }
212
213 // helper wrapper to control the format and content of the stream output
/// Thin wrapper binding an iterator to a compile-time format specifier, so that the matching
/// operator<< overload can select what to print. Only a reference to the iterator is kept,
/// hence the wrapper must not outlive it.
214 template <raw_parser::FormatSpec FmtCtrl>
215 struct Fmt {
216 static constexpr raw_parser::FormatSpec format_control = FmtCtrl;
217 Fmt(self_type const& _it) : it{_it} {}
218 self_type const& it;
219 };
220
/// Stream an iterator wrapped in Fmt<FmtCtrl>. With FormatSpec::Info the parser object itself
/// is streamed, for any other specifier the current position is streamed. Nothing is written
/// if the iterator does not point to valid data (same conditions as the plain operator<<).
221 template <raw_parser::FormatSpec FmtCtrl>
222 friend std::ostream& operator<<(std::ostream& os, Fmt<FmtCtrl> const& fmt)
223 {
224 auto const& it = fmt.it;
225 if (it.mInputIterator != it.mEnd && it.mPartIterator != it.mInputIterator.end() && it.mParser != nullptr) {
226 if constexpr (FmtCtrl == raw_parser::FormatSpec::Info) {
227 // TODO: need to propagate the format spec also on the RawParser object
228 // for now this operation prints the RDH version info and the table header
229 os << *it.mParser;
230 } else {
231 os << it;
232 }
233 }
234 return os;
235 }
236
237 private:
238 // the iterator over the parts in one channel
239 using part_iterator = typename input_iterator::const_iterator;
240 // the iterator over the parser pages
241 using parser_iterator = typename parser_type::const_iterator;
242
/// Advance the iterator to the next raw page.
/// The increment is tried on three levels, innermost first: the next page of the active
/// parser, the next part of the current input, the next input. Parts rejected by the filter
/// specs, parts with an empty or unreadable payload and parts for which no parser can be
/// created are skipped.
/// @return true if the iterator is positioned on a page, false if all inputs are exhausted
243 bool next()
244 {
// Log a failure with the configured severity. Throttling only applies if an external
// counter is set; the counter is post-incremented on every call, also for suppressed messages.
245 auto logFailure = [this](const std::string& msg, const std::runtime_error& e) {
246 if (!this->mExtFailureCounter || (*this->mExtFailureCounter)++ < this->mMaxFailureMessages) {
247 if (this->mSeverity == fair::Severity::alarm) {
248 LOG(alarm) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
249 } else if (this->mSeverity == fair::Severity::warn) {
250 LOG(warn) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
251 } else if (this->mSeverity == fair::Severity::fatal) {
252 LOG(fatal) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
253 } else if (this->mSeverity == fair::Severity::critical) {
254 LOG(critical) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
255 } else if (this->mSeverity == fair::Severity::error) {
256 LOG(error) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
257 } else if (this->mSeverity == fair::Severity::info) {
258 LOG(info) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
259 } else {
260 LOG(debug) << msg << (*this->mInputIterator).spec->binding << " : " << e.what();
261 }
262 }
263 };
264
265 while (mInputIterator != mEnd) {
// without an active parser the part mPartIterator points to has not been looked at yet,
// so the part-level increment below must be suppressed for the first pass
266 bool isInitial = mParser == nullptr;
267 while (mPartIterator != mInputIterator.end()) {
268 // first increment on the parser level
269 if (mParser && mCurrent != mParser->end() && ++mCurrent != mParser->end()) {
270 // we have an active parser and there is still data at the incremented iterator
271 return true;
272 }
273 // now increment on the level of one input
274 mParser.reset();
275 if (!isInitial && (mPartIterator == mInputIterator.end() || ++mPartIterator == mInputIterator.end())) {
276 // no more parts, go to next input
277 break;
278 }
279 isInitial = false;
280 // check filter rules
281 if (mFilterSpecs.size() > 0) {
282 bool isSelected = false;
283 for (auto const& spec : mFilterSpecs) {
284 if ((isSelected = DataRefUtils::match(*mPartIterator, spec)) == true) {
285 break;
286 }
287 }
288 if (!isSelected) {
289 continue;
290 }
291 }
292 gsl::span<const char> raw;
293 try {
294 raw = mParent.get<gsl::span<char>>(*mPartIterator);
295 } catch (const std::runtime_error& e) {
296 logFailure("failed to read data from ", e);
297 }
// raw keeps its default (empty) state if get() threw, so failed reads are skipped here too
298 if (raw.size() == 0) {
299 continue;
300 }
301
302 try {
303 mParser = std::make_unique<parser_type>(raw.data(), raw.size());
304 } catch (const std::runtime_error& e) {
305 logFailure("can not create raw parser from ", e);
306 }
307
// if the parser could not be created mParser is still null and the loop moves on to the next part
308 if (mParser != nullptr) {
309 mCurrent = mParser->begin();
310 return true;
311 }
312 } // end loop over parts on one input
313 ++mInputIterator;
314 mPartIterator = mInputIterator.begin();
315 } // end loop over inputs
316 return false;
317 }
318
// input record used in next() to extract the payload of the current part
319 InputRecord& mParent;
// current input, and the end of the input range
320 input_iterator mInputIterator;
321 input_iterator mEnd;
// current part within the current input
322 part_iterator mPartIterator;
// parser over the payload of the current part; null while no part is being parsed
323 std::unique_ptr<parser_type> mParser;
// current page position within mParser
324 parser_iterator mCurrent;
// filter list, held by reference: the referenced vector must outlive the iterator
325 std::vector<InputSpec> const& mFilterSpecs;
// limit for failure messages; only applied when mExtFailureCounter is set
326 size_t mMaxFailureMessages = -1;
327 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
// severity used when logging failures in next()
328 fair::Severity mSeverity = fair::Severity::alarm;
329 };
330
332
334 {
335 return const_iterator(mInputs, mInputs.begin(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
336 }
337
339 {
340 return const_iterator(mInputs, mInputs.end(), mInputs.end(), mFilterSpecs, mSeverity, mMaxFailureMessages, mExtFailureCounter);
341 }
342
345 using RDHInfo = typename o2::framework::DPLRawParser<BOUNDS_CHECKS>::const_iterator::template Fmt<raw_parser::FormatSpec::Info>;
346
347 private:
348 InputRecord& mInputs;
349 std::vector<InputSpec> mFilterSpecs;
350 size_t mMaxFailureMessages = -1;
351 size_t* mExtFailureCounter = nullptr; // external optionally provided counter to throttle error messages
352 fair::Severity mSeverity = fair::Severity::alarm;
353};
354
355} // namespace o2::framework
356
357#endif //FRAMEWORK_UTILS_DPLRAWPARSER_H
Generic parser for consecutive raw pages.
std::ostringstream debug
decltype(std::declval< InputRecord >().begin()) input_iterator
std::forward_iterator_tag iterator_category
o2::header::DataHeader const * o2DataHeader() const
get DataHeader of the current input message
size_t sizeTotal() const
get size of header + payload at current position
buffer_type const * raw() const
get pointer to raw block at current position, rdh starts here
friend std::ostream & operator<<(std::ostream &os, self_type const &it)
bool operator!=(const self_type &rh) const
buffer_type const * data() const
get pointer to payload at current position
o2::framework::DataProcessingHeader const * o2DataProcessingHeader() const
get DataProcessingHeader of the current input message
size_t offset() const
offset of payload at current position
Iterator(InputRecord &parent, input_iterator it, input_iterator end, std::vector< InputSpec > const &filterSpecs, fair::Severity sev=fair::Severity::alarm, size_t maxErrMsg=-1, size_t *cntErrMsg=nullptr)
size_t size() const
get size of payload at current position
friend std::ostream & operator<<(std::ostream &os, Fmt< FmtCtrl > const &fmt)
bool operator==(const self_type &other) const
The parser handles transparently input in the format of raw pages.
static constexpr o2::header::RAWDataHeader initializer
const_iterator end() const
Iterator< DataRef const > const_iterator
static void setCheckIncompleteHBF(bool v)
typename rawparser_type::buffer_type buffer_type
void setMaxFailureMessages(size_t n)
typename o2::framework::DPLRawParser< BOUNDS_CHECKS >::const_iterator::template Fmt< raw_parser::FormatSpec::Info > RDHInfo
void setExtFailureCounter(size_t *cnt)
DPLRawParser(InputRecord &inputs, std::vector< InputSpec > filterSpecs={}, fair::Severity sev=fair::Severity::alarm)
const_iterator begin() const
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
const_iterator begin() const
decltype(auto) get(R binding, int part=0) const
const_iterator end() const
unsigned char buffer_type
Definition RawParser.h:468
static void setCheckIncompleteHBF(bool v)
Definition RawParser.h:647
GLdouble n
Definition glcorearb.h:1982
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint GLuint end
Definition glcorearb.h:469
const GLdouble * v
Definition glcorearb.h:832
FormatSpec
specifier for printout
Definition RawParser.h:42
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RAWDataHeaderV7 RAWDataHeader
bool isSelected(const ExtendedTrack &track)
Defining DataPointCompositeObject explicitly as copiable.
static constexpr raw_parser::FormatSpec format_control
static bool match(DataRef const &ref, const char *binding)
the main header struct
Definition DataHeader.h:618
uint32_t memorySize
bit 64 to 79: offset to next packet in memory
VectorOfTObjectPtrs other
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153