Project
Loading...
Searching...
No Matches
DPLRawPageSequencer.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef FRAMEWORK_UTILS_DPLRAWPAGESEQUENCER_H
12#define FRAMEWORK_UTILS_DPLRAWPAGESEQUENCER_H
13
18
19#include "DPLUtils/RawParser.h"
20#include "Framework/DataRef.h"
22#include "Framework/Logger.h"
24#include <utility> // std::declval
25
26// Framework does not depend on detectors, but this file is header-only.
27// So we just include the RDH header, and the user must make sure it is
28// available. Otherwise there is no way to properly parse raw data
29// without having access to RDH info
31
32namespace o2::framework
33{
34class InputRecord;
35
69{
70 public:
73
75 DPLRawPageSequencer(InputRecord& inputs, std::vector<InputSpec> filterSpecs = {}) : mInput(inputs, filterSpecs) {}
76
77 template <typename Predicate, typename Inserter, typename Precheck>
78 int operator()(Predicate&& pred, Inserter&& inserter, Precheck preCheck)
79 {
80 return binary(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), std::forward<Precheck>(preCheck));
81 }
82
83 template <typename Predicate, typename Inserter>
84 int operator()(Predicate&& pred, Inserter&& inserter)
85 {
86 return binary(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), [](...) { return true; });
87 }
88
/// Two-argument convenience overload: runs the three-argument binary() with
/// a pre-check that always returns true.
/// @param pred      handed on to the three-argument binary()
/// @param inserter  handed on to the three-argument binary()
/// @return the value returned by the three-argument binary()
template <typename Predicate, typename Inserter>
int binary(Predicate pred, Inserter inserter)
{
  // pre-check that accepts whatever arguments it is given
  auto acceptAll = [](...) { return true; };
  const int status = binary(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), acceptAll);
  return status;
}
94
/// Walk over all parts of mInput and, for each payload, hand groups of
/// consecutive pages to the inserter. Pages are addressed at a fixed stride
/// of rawparser_type::max_size bytes, and the end of each group is located
/// with a binary search driven by the predicate.
/// @param pred      called as pred(const char* a, const char* b) with the start
///                  addresses of two pages; a true result is treated as "both
///                  pages belong to the same group"
/// @param inserter  called as inserter(const char* start, size_t nPages, subSpec)
///                  for every group found
/// @param preCheck  called as preCheck(payload, subSpecification) once per part;
///                  the part is skipped when it returns false
/// @return 0 if every processed part went through the binary search, 1 if at
///         least one part failed the RDH checks and was handed to forwardInternal()
// NOTE(review): the embedded listing numbers jump 99 -> 101 and 106 -> 108, so the
// declaration of `size` and the body of the 0xDEADBEEF branch are not visible here.
95 template <typename Predicate, typename Inserter, typename Precheck>
96 int binary(Predicate pred, Inserter inserter, Precheck preCheck)
97 {
98 int retVal = 0;
99 for (auto const& ref : mInput) {
101 const auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
// parts without a DataHeader are skipped
102 if (dh == nullptr) {
103 continue;
104 }
// empty payloads are skipped; subSpecification 0xDEADBEEF gets extra handling (not visible in this listing)
105 if (size == 0) {
106 if (dh->subSpecification == 0xDEADBEEF) {
108 }
109 continue;
110 }
111 auto const pageSize = rawparser_type::max_size;
// number of pages, rounded up when size is not a multiple of pageSize
112 auto nPages = size / pageSize + (size % pageSize ? 1 : 0);
113 if (!preCheck(ref.payload, dh->subSpecification)) {
114 continue;
115 }
116 // FIXME: automatic type from inserter/predicate?
// NOTE(review): `iterator` is not referenced again in the visible body
117 const char* iterator = ref.payload;
118
// check(left, right): apply the predicate to the start addresses of pages `left` and `right`
119 auto check = [&pred, &pageSize, payload = ref.payload](size_t left, size_t right) -> bool {
120 return pred(payload + left * pageSize, payload + right * pageSize);
121 };
// insert(pos, n, subSpec): hand n pages starting at page index `pos` to the inserter
122 auto insert = [&inserter, &pageSize, payload = ref.payload](size_t pos, size_t n, uint32_t subSpec) -> void {
123 inserter(payload + pos * pageSize, n, subSpec);
124 };
125 // binary search the next different page based on the check predicate
// search(first, n): starting at page `first` with n pages in range, returns a page index;
// the caller below uses it as the start of the next group
126 auto search = [&check](size_t first, size_t n) -> size_t {
127 auto count = n;
128 auto pos = first;
129 while (count > 0) {
130 auto step = count / 2;
131 if (check(first, pos + step)) {
132 // still the same
133 pos += step;
// widen the window again to everything between the new pos and the end of the range
134 count = n - (pos - first);
135 } else {
// the very next page already differs: it is the boundary
136 if (step == 1) {
137 pos += step;
138 break;
139 }
140 count = step;
141 }
142 }
143 return pos;
144 };
145
146 // check if the last block contains a valid RDH, otherwise data is corrupted or 8kb assumption is wrong
// Fallback condition: the first page fails checkRDH, or (for more than one page) the memory
// size read from the first page differs from pageSize or the last page fails checkRDH.
// NOTE(review): std::forward on the by-value parameters casts pred/inserter to rvalues, and
// forwardInternal takes them by value. This runs inside the loop, and pred/inserter are used
// again by later iterations (also through the check/insert lambdas), so a callable whose move
// constructor empties its source would be used in a moved-from state - confirm this is intended.
147 if (!o2::raw::RDHUtils::checkRDH(ref.payload, false) || (nPages > 1 && (o2::raw::RDHUtils::getMemorySize(ref.payload) != pageSize || !o2::raw::RDHUtils::checkRDH(ref.payload + (nPages - 1) * pageSize, false)))) {
148 forwardInternal(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), ref.payload, size, dh);
149 retVal = 1;
150 continue;
151 }
152
// p is the index of the first page of the group currently being built
153 size_t p = 0;
154 do {
155 // insert the full block if the last RDH matches the position
156 if (check(p, nPages - 1)) {
157 insert(p, nPages - p, dh->subSpecification);
158 break;
159 }
// otherwise emit pages [p, q) and continue from q
160 auto q = search(p, nPages - p);
161 insert(p, q - p, dh->subSpecification);
162 p = q;
163 } while (p < nPages);
164 // if payloads are consecutive in memory we could apply this algorithm even over
165 // O2 message boundaries
166 }
167 return retVal;
168 }
169
/// Two-argument convenience overload: runs the three-argument forward() with
/// a pre-check that always returns true.
/// @param pred      handed on to the three-argument forward()
/// @param inserter  handed on to the three-argument forward()
/// @return the value returned by the three-argument forward()
template <typename Predicate, typename Inserter>
int forward(Predicate pred, Inserter inserter)
{
  // pre-check that accepts whatever arguments it is given
  auto acceptAll = [](...) { return true; };
  const int status = forward(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), acceptAll);
  return status;
}
175
/// Walk over all parts of mInput and hand every non-empty payload that passes
/// the pre-check to forwardInternal().
/// @param pred      passed on to forwardInternal()
/// @param inserter  passed on to forwardInternal()
/// @param preCheck  called as preCheck(payload, subSpecification) once per part;
///                  the part is skipped when it returns false
/// @return always 0
// NOTE(review): the embedded listing numbers jump 179 -> 181, so the declaration
// of `size` is not visible here.
176 template <typename Predicate, typename Inserter, typename Precheck>
177 int forward(Predicate pred, Inserter inserter, Precheck preCheck)
178 {
179 for (auto const& ref : mInput) {
// empty payloads are skipped
181 if (size == 0) {
182 continue;
183 }
// NOTE(review): dh is dereferenced below without a nullptr check, whereas the
// three-argument binary() skips parts whose header is nullptr - confirm whether
// a part without a DataHeader can reach this point.
184 auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
185 if (!preCheck(ref.payload, dh->subSpecification)) {
186 continue;
187 }
// NOTE(review): std::forward on the by-value parameters casts pred/inserter to rvalues on
// every iteration, and forwardInternal takes them by value, so from the second part onwards
// a callable whose move constructor empties its source would be used in a moved-from state.
188 forwardInternal(std::forward<Predicate>(pred), std::forward<Inserter>(inserter), ref.payload, size, dh);
189 }
190 return 0;
191 }
192
193 private:
// walker built from the input record and filter specs given to the constructor;
// the range-for loops in binary() and forward() iterate over it
194 InputRecordWalker mInput;
195
/// Sequential fallback: step through the pages produced by the parser
/// iterator one by one, collect runs of pages for which the predicate holds
/// relative to the first page of the run, and hand each run to the inserter.
/// @param pred      called as pred(runStart, current); a false result ends the current run
/// @param inserter  called as inserter(runStart, pageCount, dh->subSpecification) per run
/// @param data      not referenced in the visible lines - presumably the buffer given
///                  to the parser (TODO confirm)
/// @param size      not referenced in the visible lines - presumably the buffer size
///                  given to the parser (TODO confirm)
/// @param dh        header supplying the subSpecification passed to the inserter
// NOTE(review): the embedded listing numbers jump 198 -> 200, so the declaration
// of `parser` is not visible here.
196 template <typename Predicate, typename Inserter>
197 void forwardInternal(Predicate pred, Inserter inserter, const char* data, size_t size, const o2::header::DataHeader* dh)
198 {
// ptr: first page of the run being collected (nullptr = no run open); count: pages in that run
200 const char* ptr = nullptr;
201 int count = 0;
202 for (auto it = parser.begin(); it != parser.end(); it++) {
203 const char* current = reinterpret_cast<const char*>(it.raw());
204 if (ptr == nullptr) {
// no run open: this page starts a new one
205 ptr = current;
206 } else if (pred(ptr, current) == false) {
// predicate rejects the current page: flush the open run and start a new one here
207 if (count) {
208 inserter(ptr, count, dh->subSpecification);
209 }
210 count = 0;
211 ptr = current;
212 }
213 count++;
// a page whose total size differs from max_size closes the run immediately
// (the current page is included in the flushed run)
214 if (it.sizeTotal() != rawparser_type::max_size) {
215 inserter(ptr, count, dh->subSpecification);
216 count = 0;
217 ptr = nullptr;
218 }
219 }
// flush whatever run is still open after the last page
220 if (count) {
221 inserter(ptr, count, dh->subSpecification);
222 }
223 }
224};
225
226} // namespace o2::framework
227
228#endif //FRAMEWORK_UTILS_DPLRAWPAGESEQUENCER_H
int32_t retVal
A helper class to iterate over all parts of all input routes.
uint16_t pos
Definition RawData.h:3
Generic parser for consecutive raw pages.
TBranch * ptr
This utility handles transparently the DPL inputs and triggers a customizable action on sequences of ...
typename rawparser_type::buffer_type buffer_type
int binary(Predicate pred, Inserter inserter)
DPLRawPageSequencer(InputRecord &inputs, std::vector< InputSpec > filterSpecs={})
int operator()(Predicate &&pred, Inserter &&inserter)
int forward(Predicate pred, Inserter inserter, Precheck preCheck)
int binary(Predicate pred, Inserter inserter, Precheck preCheck)
int forward(Predicate pred, Inserter inserter)
int operator()(Predicate &&pred, Inserter &&inserter, Precheck preCheck)
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer. This class holds the inputs which are valid for processing...
unsigned char buffer_type
Definition RawParser.h:468
static size_t const max_size
Definition RawParser.h:469
GLdouble n
Definition glcorearb.h:1982
const GLuint GLenum const void * binary
Definition glcorearb.h:1898
GLint GLsizei count
Definition glcorearb.h:399
GLsizeiptr size
Definition glcorearb.h:659
GLdouble GLdouble right
Definition glcorearb.h:4077
GLint first
Definition glcorearb.h:399
GLint left
Definition glcorearb.h:1979
GLboolean * data
Definition glcorearb.h:298
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void check(const std::vector< std::string > &arguments, const std::vector< ConfigParamSpec > &workflowOptions, const std::vector< DeviceSpec > &deviceSpecs, CheckMatrix &matrix)
bool search(DeviceExecution const &execution, std::string const &option, std::string const &argument)
static o2::header::DataHeader::PayloadSizeType getPayloadSize(const DataRef &ref)
static void warnDeadBeef(const o2::header::DataHeader *dh)
Definition RawParser.cxx:45
the main header struct
Definition DataHeader.h:618
SubSpecificationType subSpecification
Definition DataHeader.h:656
static bool checkRDH(const RDHv4 &rdh, bool verbose=true, bool checkZeros=false)
Definition RDHUtils.cxx:133