11#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12#define O2_FRAMEWORK_DATASPECVIEWS_H_
14#include <fairmq/FwdDecls.h>
15#include <fairmq/Message.h>
30 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
35 while (mi <
r.size()) {
36 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
38 throw std::runtime_error(
"Not a DataHeader");
40 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
41 count += header->splitPayloadParts;
42 mi += header->splitPayloadParts + 1;
44 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
45 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
55 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
60 while (mi <
r.size()) {
61 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
62 auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(
r[mi]->GetData());
63 auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(
r[mi]->GetData());
64 if (!header && !sih && !dih) {
65 throw std::runtime_error(
"Header information not found");
72 }
else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
74 mi += header->splitPayloadParts + 1;
76 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
77 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
89 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
94 while (mi <
r.size()) {
95 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
97 throw std::runtime_error(
"Not a DataHeader");
100 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
102 count += header->splitPayloadParts;
104 return {mi, mi + 1 + diff};
106 mi += header->splitPayloadParts + 1;
107 }
else if (header->splitPayloadParts > 1 && header->splitPayloadIndex != header->splitPayloadParts) {
110 if (diff < header->splitPayloadParts) {
111 return {mi + 2 * diff, mi + 2 * diff + 1};
113 count += header->splitPayloadParts;
114 mi += 2 * header->splitPayloadParts;
124 throw std::runtime_error(
"Payload not found");
141 template <
typename R>
142 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
146 auto* header = o2::header::get<o2::header::DataHeader*>(
r[hIdx]->GetData());
148 throw std::runtime_error(
"Not a DataHeader");
150 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
157 size_t nextHIdx = hIdx + header->splitPayloadParts + 1;
158 return {nextHIdx, nextHIdx + 1};
161 return {hIdx + 2, hIdx + 3};
169 template <
typename R>
170 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
175 while (mi <
r.size()) {
176 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
178 throw std::runtime_error(
"Not a DataHeader");
180 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
182 return {mi, mi + 1 + self.
subPart};
185 mi += header->splitPayloadParts + 1;
188 return {mi, mi + self.
subPart + 1};
194 throw std::runtime_error(
"Payload not found");
201 template <
typename R>
202 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
213 template <
typename R>
214 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
225 template <
typename R>
226 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
232 while (mi <
r.size()) {
233 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
235 throw std::runtime_error(
"Not a DataHeader");
237 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
241 return header->splitPayloadParts;
245 mi += header->splitPayloadParts + 1;
251 auto pairs = header->splitPayloadParts ? header->splitPayloadParts : 1;
252 if (self.
n <
count + pairs) {
265 template <
typename R>
266 requires requires(
R r) {
requires std::ranges::random_access_range<
decltype(
r.sets)>; }
269 return std::span(
r.sets[self.
slot.
index *
r.inputsPerSlot]);
275 template <
typename R>
276 requires std::ranges::random_access_range<R>
Defining ITS Vertex explicitly as messageable.
friend size_t operator|(R &&r, count_parts self)
friend size_t operator|(R &&r, count_payloads self)
friend DataRefIndices operator|(R &&r, get_dataref_indices self)
friend DataRefIndices operator|(R &&r, get_next_pair self)
friend size_t operator|(R &&r, get_num_payloads self)
friend DataRefIndices operator|(R &&r, get_pair self)
friend auto & operator|(R &&r, get_payload self)