Project
Loading...
Searching...
No Matches
DataModelViews.h
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12#define O2_FRAMEWORK_DATASPECVIEWS_H_
13
14#include <fairmq/FwdDecls.h>
15#include <fairmq/Message.h>
16#include "DomainInfoHeader.h"
17#include "SourceInfoHeader.h"
18#include "Headers/DataHeader.h"
20#include <ranges>
21#include <span>
22
23namespace o2::framework
24{
25
27 // ends the pipeline, returns the total number of payloads found in the range
28 template <typename R>
29 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
30 friend size_t operator|(R&& r, count_payloads self)
31 {
32 size_t count = 0; // payloads counted so far
33 size_t mi = 0; // index of the message currently inspected as a header
34 while (mi < r.size()) {
35 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
36 if (!header) {
37 throw std::runtime_error("Not a DataHeader");
38 }
39 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
40 count += header->splitPayloadParts;
41 mi += header->splitPayloadParts + 1; // step over the header and its splitPayloadParts payloads
42 } else {
43 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
44 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2; // step over splitPayloadParts (or one) [header, payload] pairs
45 }
46 }
47 return count;
48 }
49};
50
52 // ends the pipeline, returns the number of parts
53 template <typename R>
54 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
55 friend size_t operator|(R&& r, count_parts self)
56 {
57 size_t count = 0; // parts counted so far
58 size_t mi = 0; // index of the message currently inspected as a header
59 while (mi < r.size()) {
60 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
61 auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(r[mi]->GetData());
62 auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(r[mi]->GetData());
63 if (!header && !sih && !dih) {
64 throw std::runtime_error("Header information not found");
65 }
66 // Messages carrying a DomainInfoHeader or SourceInfoHeader (oldest possible timeframe / end of stream)
67 // are stepped over as one [header, payload] pair. NOTE(review): they still add 1 to count - confirm this is intended.
68 if (dih || sih) {
69 count += 1;
70 mi += 2;
71 } else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
72 count += 1; // one header followed by splitPayloadParts payloads counts as a single part
73 mi += header->splitPayloadParts + 1;
74 } else {
75 count += header->splitPayloadParts ? header->splitPayloadParts : 1; // every [header, payload] pair counts as a part
76 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
77 }
78 }
79 return count;
80 }
81};
82
84 size_t headerIdx; // index, within the message range, of the header message
85 size_t payloadIdx; // index, within the message range, of the payload message
86};
87
88struct get_pair { // ends the pipeline, locates the header and payload indices of the payload with zero-based index pairId
89 size_t pairId;
90 template <typename R>
91 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
93 {
94 size_t count = 0; // payloads belonging to the blocks already stepped over
95 size_t mi = 0; // index of the message currently inspected as a header
96 while (mi < r.size()) {
97 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
98 if (!header) {
99 throw std::runtime_error("Not a DataHeader");
100 }
101 size_t diff = self.pairId - count; // offset of the requested payload from the first payload of the current block
102 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
103 // New style: one header followed by splitPayloadParts contiguous payloads.
104 count += header->splitPayloadParts;
105 if (self.pairId < count) {
106 return {mi, mi + 1 + diff};
107 }
108 mi += header->splitPayloadParts + 1;
109 } else if (header->splitPayloadParts > 1 && header->splitPayloadIndex != header->splitPayloadParts) {
110 // Old style multi-part: splitPayloadParts [header, payload] pairs.
111 // We are at the first pair of the block; jump directly.
112 if (diff < header->splitPayloadParts) {
113 return {mi + 2 * diff, mi + 2 * diff + 1};
114 }
115 count += header->splitPayloadParts;
116 mi += 2 * header->splitPayloadParts;
117 } else {
118 // Single [header, payload] pair (splitPayloadParts not greater than 1).
119 if (self.pairId == count) {
120 return {mi, mi + 1};
121 }
122 count += 1;
123 mi += 2;
124 }
125 }
126 throw std::runtime_error("Payload not found"); // pairId is past the last payload in the range
127 }
128};
129
131 size_t part;
132 size_t subPart;
133 // ends the pipeline, returns the header and payload indices for the requested (part, subPart)
134 template <typename R>
135 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
137 {
138 size_t count = 0; // parts stepped over so far
139 size_t mi = 0; // index of the message currently inspected as a header
140 while (mi < r.size()) {
141 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
142 if (!header) {
143 throw std::runtime_error("Not a DataHeader");
144 }
145 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
146 if (self.part == count) {
147 return {mi, mi + 1 + self.subPart}; // NOTE(review): subPart is not checked against splitPayloadParts or r.size()
148 }
149 count += 1;
150 mi += header->splitPayloadParts + 1; // step over the header and its splitPayloadParts payloads
151 } else {
152 if (self.part == count) {
153 return {mi, mi + self.subPart + 1}; // NOTE(review): subPart is not range-checked here either
154 }
155 count += 1;
156 mi += 2; // each [header, payload] pair is treated as its own part
157 }
158 }
159 throw std::runtime_error("Payload not found"); // part is past the last part in the range
160 }
161};
162
164 size_t id;
165 // ends the pipeline, returns a reference to the header message of the part with index id
166 template <typename R>
167 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
168 friend auto& operator|(R&& r, get_header self)
169 {
170 return r[(r | get_dataref_indices{self.id, 0}).headerIdx]; // subPart 0 is passed; only headerIdx of the result is used
171 }
172};
173
175 size_t part;
176 size_t subPart;
177 // ends the pipeline, returns a reference to the payload message identified by (part, subPart)
178 template <typename R>
179 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
180 friend auto& operator|(R&& r, get_payload self)
181 {
182 return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx]; // index lookup is delegated to get_dataref_indices
183 }
184};
185
187 size_t n;
188 // ends the pipeline, returns the number of payloads which are associated
189 // to the multipart n-th sequence of messages found in the range
190 template <typename R>
191 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
192 friend size_t operator|(R&& r, get_num_payloads self)
193 {
194 size_t count = 0;
195 size_t mi = 0;
196 // Walk the range one header at a time until the n-th entry is reached.
197 while (mi < r.size()) {
198 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
199 if (!header) {
200 throw std::runtime_error("Not a DataHeader");
201 }
202 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
203 // This is the case for the new multi payload messages where the number of parts
204 // is as many as the splitPayloadParts number.
205 if (self.n == count) {
206 return header->splitPayloadParts;
207 }
208 // For multipayload we skip all the parts and their associated header
209 count += 1;
210 mi += header->splitPayloadParts + 1;
211 } else {
212 // This is the case of a multipart (header, payload), (header, payload), ...
213 // sequence where we know how many pairs are there.
214 // When splitPayloadParts == 0, it means it is a non-multipart (header, payload)
215 // pair. Each pair has exactly 1 payload.
216 auto pairs = header->splitPayloadParts ? header->splitPayloadParts : 1;
217 if (self.n < count + pairs) {
218 return 1;
219 }
220 count += pairs;
221 mi += 2 * pairs;
222 }
223 }
224 return 0; // n is past the last entry in the range: zero is returned, no exception is thrown
225 }
226};
227
230 template <typename R>
231 requires requires(R r) { requires std::ranges::random_access_range<decltype(r.sets)>; } // only requires that r.sets is a random access range
232 friend auto operator|(R&& r, inputs_for_slot self)
233 {
234 return std::span(r.sets[self.slot.index * r.inputsPerSlot]); // span built from the element of r.sets at slot.index * inputsPerSlot; presumably the slot's first input - TODO confirm
235 }
236};
237
239 size_t inputIdx;
240 template <typename R>
241 requires std::ranges::random_access_range<R>
242 friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
243 {
244 return std::span(r[self.inputIdx]); // span over the messages held by r[inputIdx]; inputIdx is not range-checked
245 }
246};
247
248// FIXME: we should use special index classes in place of size_t
249// FIXME: we need something to substitute a range in the store with another
250
251} // namespace o2::framework
252
253#endif // O2_FRAMEWORK_DATASPECVIEWS_H_
GLint GLsizei count
Definition glcorearb.h:399
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
friend size_t operator|(R &&r, count_parts self)
friend size_t operator|(R &&r, count_payloads self)
friend DataRefIndices operator|(R &&r, get_dataref_indices self)
friend auto & operator|(R &&r, get_header self)
friend size_t operator|(R &&r, get_num_payloads self)
friend DataRefIndices operator|(R &&r, get_pair self)
friend auto & operator|(R &&r, get_payload self)
friend auto operator|(R &&r, inputs_for_slot self)
friend std::span< fair::mq::MessagePtr > operator|(R &&r, messages_for_input self)