Project
Loading...
Searching...
No Matches
DataModelViews.h
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12#define O2_FRAMEWORK_DATASPECVIEWS_H_
13
14#include <fairmq/FwdDecls.h>
15#include <fairmq/Message.h>
16#include "DomainInfoHeader.h"
17#include "SourceInfoHeader.h"
18#include "Headers/DataHeader.h"
19#include "Framework/DataRef.h"
21#include <ranges>
22#include <span>
23
24namespace o2::framework
25{
26
28 // ends the pipeline, returns the container
29 template <typename R>
30 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
31 friend size_t operator|(R&& r, count_payloads self)
32 {
33 size_t count = 0;
34 size_t mi = 0;
35 while (mi < r.size()) {
36 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
37 if (!header) {
38 throw std::runtime_error("Not a DataHeader");
39 }
40 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
41 count += header->splitPayloadParts;
42 mi += header->splitPayloadParts + 1;
43 } else {
44 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
45 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
46 }
47 }
48 return count;
49 }
50};
51
53 // ends the pipeline, returns the number of parts
54 template <typename R>
55 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
56 friend size_t operator|(R&& r, count_parts self)
57 {
58 size_t count = 0;
59 size_t mi = 0;
60 while (mi < r.size()) {
61 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
62 auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(r[mi]->GetData());
63 auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(r[mi]->GetData());
64 if (!header && !sih && !dih) {
65 throw std::runtime_error("Header information not found");
66 }
67 // We skip oldest possible timeframe / end of stream and not consider it
68 // as actual parts.
69 if (dih || sih) {
70 count += 1;
71 mi += 2;
72 } else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
73 count += 1;
74 mi += header->splitPayloadParts + 1;
75 } else {
76 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
77 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
78 }
79 }
80 return count;
81 }
82};
83
84// DataRefIndices is defined in Framework/DataRef.h
85
86struct get_pair {
87 size_t pairId;
88 template <typename R>
89 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
91 {
92 size_t count = 0;
93 size_t mi = 0;
94 while (mi < r.size()) {
95 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
96 if (!header) {
97 throw std::runtime_error("Not a DataHeader");
98 }
99 size_t diff = self.pairId - count;
100 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
101 // New style: one header followed by splitPayloadParts contiguous payloads.
102 count += header->splitPayloadParts;
103 if (self.pairId < count) {
104 return {mi, mi + 1 + diff};
105 }
106 mi += header->splitPayloadParts + 1;
107 } else if (header->splitPayloadParts > 1 && header->splitPayloadIndex != header->splitPayloadParts) {
108 // Old style multi-part: splitPayloadParts [header, payload] pairs.
109 // We are at the first pair of the block; jump directly.
110 if (diff < header->splitPayloadParts) {
111 return {mi + 2 * diff, mi + 2 * diff + 1};
112 }
113 count += header->splitPayloadParts;
114 mi += 2 * header->splitPayloadParts;
115 } else {
116 // Single [header, payload] pair (splitPayloadParts == 0).
117 if (self.pairId == count) {
118 return {mi, mi + 1};
119 }
120 count += 1;
121 mi += 2;
122 }
123 }
124 throw std::runtime_error("Payload not found");
125 }
126};
127
128// Advance from a DataRefIndices to the next one in O(1), reading only the
129// current header. Intended for use in iterators so that ++ is O(1) rather
130// than the O(n) while-loop that get_pair requires.
131//
132// New-style block (splitPayloadIndex == splitPayloadParts > 1):
133// layout: [header, payload_0, payload_1, ..., payload_{N-1}]
134// advance within block while payloads remain, then jump to the next block.
135//
136// Old-style block (splitPayloadIndex != splitPayloadParts, splitPayloadParts > 1)
137// or single pair (splitPayloadParts == 0):
138// layout: [header, payload] – always advance by two messages.
141 template <typename R>
142 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
144 {
145 size_t hIdx = self.current.headerIdx;
146 auto* header = o2::header::get<o2::header::DataHeader*>(r[hIdx]->GetData());
147 if (!header) {
148 throw std::runtime_error("Not a DataHeader");
149 }
150 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
151 // New-style block: one header followed by splitPayloadParts contiguous payloads.
152 if (self.current.payloadIdx < hIdx + header->splitPayloadParts) {
153 // More sub-payloads remain in this block.
154 return {hIdx, self.current.payloadIdx + 1};
155 }
156 // Last sub-payload consumed; move to the first pair of the next block.
157 size_t nextHIdx = hIdx + header->splitPayloadParts + 1;
158 return {nextHIdx, nextHIdx + 1};
159 }
160 // Old-style [header, payload] pairs or a single pair: advance by two messages.
161 return {hIdx + 2, hIdx + 3};
162 }
163};
164
166 size_t part;
167 size_t subPart;
168 // ends the pipeline, returns the number of parts
169 template <typename R>
170 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
172 {
173 size_t count = 0;
174 size_t mi = 0;
175 while (mi < r.size()) {
176 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
177 if (!header) {
178 throw std::runtime_error("Not a DataHeader");
179 }
180 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
181 if (self.part == count) {
182 return {mi, mi + 1 + self.subPart};
183 }
184 count += 1;
185 mi += header->splitPayloadParts + 1;
186 } else {
187 if (self.part == count) {
188 return {mi, mi + self.subPart + 1};
189 }
190 count += 1;
191 mi += 2;
192 }
193 }
194 throw std::runtime_error("Payload not found");
195 }
196};
197
199 size_t id;
200 // ends the pipeline, returns the number of parts
201 template <typename R>
202 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
203 friend auto& operator|(R&& r, get_header self)
204 {
205 return r[(r | get_dataref_indices{self.id, 0}).headerIdx];
206 }
207};
208
210 size_t part;
211 size_t subPart;
212 // ends the pipeline, returns the number of parts
213 template <typename R>
214 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
215 friend auto& operator|(R&& r, get_payload self)
216 {
217 return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx];
218 }
219};
220
222 size_t n;
223 // ends the pipeline, returns the number of payloads which are associated
224 // to the multipart n-th sequence of messages found in the range
225 template <typename R>
226 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
227 friend size_t operator|(R&& r, get_num_payloads self)
228 {
229 size_t count = 0;
230 size_t mi = 0;
231 // Un
232 while (mi < r.size()) {
233 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
234 if (!header) {
235 throw std::runtime_error("Not a DataHeader");
236 }
237 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
238 // This is the case for the new multi payload messages where the number of parts
239 // is as many as the splitPayloadParts number.
240 if (self.n == count) {
241 return header->splitPayloadParts;
242 }
243 // For multipayload we skip all the parts and their associated header
244 count += 1;
245 mi += header->splitPayloadParts + 1;
246 } else {
247 // This is the case of a multipart (header, payload), (header, payload), ...
248 // sequence where we know how many pairs are there.
249 // When splitPayloadParts == 0, it means it is a non-multipart (header, payload)
250 // pair. Each pair has exactly 1 payload.
251 auto pairs = header->splitPayloadParts ? header->splitPayloadParts : 1;
252 if (self.n < count + pairs) {
253 return 1;
254 }
255 count += pairs;
256 mi += 2 * pairs;
257 }
258 }
259 return 0;
260 }
261};
262
265 template <typename R>
266 requires requires(R r) { requires std::ranges::random_access_range<decltype(r.sets)>; }
267 friend auto operator|(R&& r, inputs_for_slot self)
268 {
269 return std::span(r.sets[self.slot.index * r.inputsPerSlot]);
270 }
271};
272
274 size_t inputIdx;
275 template <typename R>
276 requires std::ranges::random_access_range<R>
277 friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
278 {
279 return std::span(r[self.inputIdx]);
280 }
281};
282
283// FIXME: we should use special index classes in place of size_t
284// FIXME: we need something to substitute a range in the store with another
285
286} // namespace o2::framework
287
288#endif // O2_FRAMEWORK_DATASPECVIEWS_H_
GLint GLsizei count
Definition glcorearb.h:399
GLboolean r
Definition glcorearb.h:1233
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
friend size_t operator|(R &&r, count_parts self)
friend size_t operator|(R &&r, count_payloads self)
friend DataRefIndices operator|(R &&r, get_dataref_indices self)
friend auto & operator|(R &&r, get_header self)
friend DataRefIndices operator|(R &&r, get_next_pair self)
friend size_t operator|(R &&r, get_num_payloads self)
friend DataRefIndices operator|(R &&r, get_pair self)
friend auto & operator|(R &&r, get_payload self)
friend auto operator|(R &&r, inputs_for_slot self)
friend std::span< fair::mq::MessagePtr > operator|(R &&r, messages_for_input self)