11#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12#define O2_FRAMEWORK_DATASPECVIEWS_H_
14#include <fairmq/FwdDecls.h>
15#include <fairmq/Message.h>
// NOTE(review): this excerpt is missing lines (the template head and signature,
// the guard around the throw, the `else`, and the closing braces). The comments
// below describe only the statements that are visible.
// Constrains R to a sized, random-access range; the loop below indexes it with
// r[mi] and bounds it with r.size().
27 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// Visit the range by message index until mi reaches r.size().
32 while (mi <
r.size()) {
// Look up a DataHeader in the data of message mi.
33 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
// Raised with "Not a DataHeader". The condition guarding it is not visible in
// this excerpt -- presumably a null check on `header`; TODO confirm.
35 throw std::runtime_error(
"Not a DataHeader");
// Branch taken when splitPayloadParts > 1 and splitPayloadIndex equals it:
// add splitPayloadParts to the count and step over splitPayloadParts + 1
// messages (presumably one header followed by that many payloads -- TODO confirm).
37 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
38 count += header->splitPayloadParts;
39 mi += header->splitPayloadParts + 1;
// Remaining case (the `else` line is not visible): add splitPayloadParts, or 1
// when that field is 0, and step over two messages per counted entry
// (presumably header/payload pairs -- TODO confirm).
41 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
42 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
// NOTE(review): this excerpt is missing lines (the template head and signature,
// the branch that precedes the `else if`, the count update inside the
// `else if`, the final `else`, and the closing braces). The comments below
// describe only the statements that are visible.
// Constrains R to a sized, random-access range; the loop below indexes it with
// r[mi] and bounds it with r.size().
52 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// Visit the range by message index until mi reaches r.size().
57 while (mi <
r.size()) {
// Three independent lookups on the data of message mi: a DataHeader, a
// SourceInfoHeader and a DomainInfoHeader.
58 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
59 auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(
r[mi]->GetData());
60 auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(
r[mi]->GetData());
// Reject the message only when none of the three lookups produced a result.
61 if (!header && !sih && !dih) {
62 throw std::runtime_error(
"Header information not found");
// NOTE(review): the branch closed by the brace below is not visible. The
// `else if` dereferences `header`, which the check above allows to be null when
// `sih` or `dih` is set -- presumably the hidden branch handles that case;
// TODO confirm.
69 }
else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
// Step over splitPayloadParts + 1 messages; how the count changes in this
// branch is not visible in this excerpt.
71 mi += header->splitPayloadParts + 1;
// Remaining case (the `else` line is not visible): add splitPayloadParts to the
// count and step over two messages per part, or two messages when the field is 0.
// NOTE(review): with splitPayloadParts == 0 the count is unchanged although the
// index still advances by 2 -- confirm that counting nothing here is intended.
73 count += header->splitPayloadParts;
74 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
// NOTE(review): this excerpt is missing lines (the template head and signature,
// the guard around the first throw, the computation of `diff`, the conditions
// guarding both returns, the `else`, and the closing braces). The comments
// below describe only the statements that are visible.
// Constrains R to a sized, random-access range; the loop below indexes it with
// r[mi] and bounds it with r.size().
89 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// Visit the range by message index until mi reaches r.size().
94 while (mi <
r.size()) {
// Look up a DataHeader in the data of message mi.
95 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
// Raised with "Not a DataHeader". The condition guarding it is not visible in
// this excerpt -- presumably a null check on `header`; TODO confirm.
97 throw std::runtime_error(
"Not a DataHeader");
// Branch taken when splitPayloadParts > 1 and splitPayloadIndex equals it.
100 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
101 count += header->splitPayloadParts;
// Result pair: mi as first index and mi + 1 + diff as second. `diff` and the
// condition under which this return is reached are defined on lines not
// visible here.
103 return {mi, mi + 1 + diff};
// Otherwise step over splitPayloadParts + 1 messages.
105 mi += header->splitPayloadParts + 1;
// Remaining case (the `else` line is not visible): count splitPayloadParts, or
// 1 when that field is 0.
107 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
// Result pair for this case: the second index is two messages per `diff` past
// mi, plus one. The guarding condition is not visible here.
109 return {mi, mi + 2 * diff + 1};
// Otherwise step over two messages per part, or two messages when the field is 0.
111 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
// Raised with "Payload not found"; presumably reached once the loop finishes
// without returning -- the loop's closing brace is not visible, TODO confirm.
114 throw std::runtime_error(
"Payload not found");
// NOTE(review): this excerpt is missing lines (the signature, the guard around
// the first throw, the conditions guarding both returns, the `else`, and the
// closing braces). The comments below describe only the statements that are
// visible.
// Template over a range type R, constrained to a sized, random-access range;
// the loop below indexes it with r[mi] and bounds it with r.size().
122 template <
typename R>
123 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// Visit the range by message index until mi reaches r.size().
128 while (mi <
r.size()) {
// Look up a DataHeader in the data of message mi.
129 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
// Raised with "Not a DataHeader". The condition guarding it is not visible in
// this excerpt -- presumably a null check on `header`; TODO confirm.
131 throw std::runtime_error(
"Not a DataHeader");
// Branch taken when splitPayloadParts > 1 and splitPayloadIndex equals it.
133 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
// Result pair: mi as first index and mi + 1 + self.subPart as second. The
// condition under which this return is reached is not visible here.
135 return {mi, mi + 1 + self.
subPart};
// Otherwise step over splitPayloadParts + 1 messages.
138 mi += header->splitPayloadParts + 1;
// Result pair for the remaining case (the `else` line is not visible): the
// second index is two messages per self.subPart past mi, plus one.
141 return {mi, mi + 2 * self.
subPart + 1};
// Otherwise step over two messages per part, or two messages when the field is 0.
144 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
// Raised with "Payload not found"; presumably reached once the loop finishes
// without returning -- the loop's closing brace is not visible, TODO confirm.
147 throw std::runtime_error(
"Payload not found");
154 template <
typename R>
155 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
166 template <
typename R>
167 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// NOTE(review): this excerpt is missing lines (the signature, the guard around
// the throw, whatever separates the two identical `if` conditions, the `else`,
// any trailing return or throw, and the closing braces). The comments below
// describe only the statements that are visible.
// Template over a range type R, constrained to a sized, random-access range;
// the loop below indexes it with r[mi] and bounds it with r.size().
177 template <
typename R>
178 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
// Visit the range by message index until mi reaches r.size().
183 while (mi <
r.size()) {
// Look up a DataHeader in the data of message mi.
184 auto* header = o2::header::get<o2::header::DataHeader*>(
r[mi]->GetData());
// Raised with "Not a DataHeader". The condition guarding it is not visible in
// this excerpt -- presumably a null check on `header`; TODO confirm.
186 throw std::runtime_error(
"Not a DataHeader");
// When splitPayloadParts > 1 and splitPayloadIndex equals it, the result is
// splitPayloadParts. This test presumably sits inside a check, not visible
// here, that the current entry is the one being asked about -- TODO confirm.
189 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
190 return header->splitPayloadParts;
// Same condition again, this time used to advance: step over
// splitPayloadParts + 1 messages.
195 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
197 mi += header->splitPayloadParts + 1;
// Remaining case (the `else` line is not visible): step over two messages per
// part, or two messages when the field is 0.
200 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
// NOTE(review): the function signature and braces are not visible in this
// excerpt. The comments below describe only the lines that are visible.
// Template over a type R that exposes a `sets` member.
216 template <
typename R>
// NOTE(review): inside a requires-expression, a bare `concept<T>;` is a simple
// requirement: it only checks that the expression is well-formed, not that the
// concept is satisfied. As written this constraint holds for any R with a
// `sets` member; enforcing random access would need a nested
// `requires std::ranges::random_access_range<...>;`.
217 requires requires(
R r) { std::ranges::random_access_range<
decltype(
r.sets)>; }
// Returns a std::span constructed from the single element of r.sets at
// position slot.index * inputsPerSlot; that element presumably is itself a
// contiguous range -- TODO confirm against the element type.
220 return std::span(
r.sets[self.
slot.
index *
r.inputsPerSlot]);
226 template <
typename R>
227 requires std::ranges::random_access_range<R>
Defining PrimaryVertex explicitly as messageable.
std::span< MessageSet > sets
friend size_t operator|(R &&r, count_parts self)
friend size_t operator|(R &&r, count_payloads self)
friend DataRefIndices operator|(R &&r, get_dataref_indices self)
friend size_t operator|(R &&r, get_num_payloads self)
friend DataRefIndices operator|(R &&r, get_pair self)
friend fair::mq::MessagePtr & operator|(R &&r, get_payload self)