Project
Loading...
Searching...
No Matches
DataModelViews.h
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#ifndef O2_FRAMEWORK_DATASPECVIEWS_H_
12#define O2_FRAMEWORK_DATASPECVIEWS_H_
13
14#include <fairmq/FwdDecls.h>
15#include <fairmq/Message.h>
16#include "DomainInfoHeader.h"
17#include "SourceInfoHeader.h"
18#include "Headers/DataHeader.h"
19#include <ranges>
20
21namespace o2::framework
22{
23
25 // ends the pipeline, returns the container
26 template <typename R>
27 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
28 friend size_t operator|(R&& r, count_payloads self)
29 {
30 size_t count = 0;
31 size_t mi = 0;
32 while (mi < r.size()) {
33 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
34 if (!header) {
35 throw std::runtime_error("Not a DataHeader");
36 }
37 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
38 count += header->splitPayloadParts;
39 mi += header->splitPayloadParts + 1;
40 } else {
41 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
42 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
43 }
44 }
45 return count;
46 }
47};
48
50 // ends the pipeline, returns the number of parts
51 template <typename R>
52 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
53 friend size_t operator|(R&& r, count_parts self)
54 {
55 size_t count = 0;
56 size_t mi = 0;
57 while (mi < r.size()) {
58 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
59 auto* sih = o2::header::get<o2::framework::SourceInfoHeader*>(r[mi]->GetData());
60 auto* dih = o2::header::get<o2::framework::DomainInfoHeader*>(r[mi]->GetData());
61 if (!header && !sih && !dih) {
62 throw std::runtime_error("Header information not found");
63 }
64 // We skip oldest possible timeframe / end of stream and not consider it
65 // as actual parts.
66 if (dih || sih) {
67 count += 1;
68 mi += 2;
69 } else if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
70 count += 1;
71 mi += header->splitPayloadParts + 1;
72 } else {
73 count += header->splitPayloadParts;
74 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
75 }
76 }
77 return count;
78 }
79};
80
82 size_t headerIdx;
83 size_t payloadIdx;
84};
85
86struct get_pair {
87 size_t pairId;
88 template <typename R>
89 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
91 {
92 size_t count = 0;
93 size_t mi = 0;
94 while (mi < r.size()) {
95 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
96 if (!header) {
97 throw std::runtime_error("Not a DataHeader");
98 }
99 size_t diff = self.pairId - count;
100 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
101 count += header->splitPayloadParts;
102 if (self.pairId < count) {
103 return {mi, mi + 1 + diff};
104 }
105 mi += header->splitPayloadParts + 1;
106 } else {
107 count += header->splitPayloadParts ? header->splitPayloadParts : 1;
108 if (self.pairId < count) {
109 return {mi, mi + 2 * diff + 1};
110 }
111 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
112 }
113 }
114 throw std::runtime_error("Payload not found");
115 }
116};
117
119 size_t part;
120 size_t subPart;
121 // ends the pipeline, returns the number of parts
122 template <typename R>
123 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
125 {
126 size_t count = 0;
127 size_t mi = 0;
128 while (mi < r.size()) {
129 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
130 if (!header) {
131 throw std::runtime_error("Not a DataHeader");
132 }
133 if (header->splitPayloadParts > 1 && header->splitPayloadIndex == header->splitPayloadParts) {
134 if (self.part == count) {
135 return {mi, mi + 1 + self.subPart};
136 }
137 count += 1;
138 mi += header->splitPayloadParts + 1;
139 } else {
140 if (self.part == count) {
141 return {mi, mi + 2 * self.subPart + 1};
142 }
143 count += 1;
144 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
145 }
146 }
147 throw std::runtime_error("Payload not found");
148 }
149};
150
152 size_t id;
153 // ends the pipeline, returns the number of parts
154 template <typename R>
155 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
156 friend fair::mq::MessagePtr& operator|(R&& r, get_header self)
157 {
158 return r[(r | get_dataref_indices{self.id, 0}).headerIdx];
159 }
160};
161
163 size_t part;
164 size_t subPart;
165 // ends the pipeline, returns the number of parts
166 template <typename R>
167 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
168 friend fair::mq::MessagePtr& operator|(R&& r, get_payload self)
169 {
170 return r[(r | get_dataref_indices{self.part, self.subPart}).payloadIdx];
171 }
172};
173
175 size_t id;
176 // ends the pipeline, returns the number of parts
177 template <typename R>
178 requires std::ranges::random_access_range<R> && std::ranges::sized_range<R>
179 friend size_t operator|(R&& r, get_num_payloads self)
180 {
181 size_t count = 0;
182 size_t mi = 0;
183 while (mi < r.size()) {
184 auto* header = o2::header::get<o2::header::DataHeader*>(r[mi]->GetData());
185 if (!header) {
186 throw std::runtime_error("Not a DataHeader");
187 }
188 if (self.id == count) {
189 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
190 return header->splitPayloadParts;
191 } else {
192 return 1;
193 }
194 }
195 if (header->splitPayloadParts > 1 && (header->splitPayloadIndex == header->splitPayloadParts)) {
196 count += 1;
197 mi += header->splitPayloadParts + 1;
198 } else {
199 count += 1;
200 mi += header->splitPayloadParts ? 2 * header->splitPayloadParts : 2;
201 }
202 }
203 return 0;
204 }
205};
206
207struct MessageSet;
208
210 std::span<MessageSet> sets;
211 size_t inputsPerSlot = 0;
212};
213
216 template <typename R>
217 requires requires(R r) { std::ranges::random_access_range<decltype(r.sets)>; }
218 friend std::span<o2::framework::MessageSet> operator|(R&& r, inputs_for_slot self)
219 {
220 return std::span(r.sets[self.slot.index * r.inputsPerSlot]);
221 }
222};
223
225 size_t inputIdx;
226 template <typename R>
227 requires std::ranges::random_access_range<R>
228 friend std::span<fair::mq::MessagePtr> operator|(R&& r, messages_for_input self)
229 {
230 return r[self.inputIdx].messages;
231 }
232};
233
234// FIXME: we should use special index classes in place of size_t
235// FIXME: we need something to substitute a range in the store with another
236
237} // namespace o2::framework
238
239#endif // O2_FRAMEWORK_DATASPECVIEWS_H_
GLint GLsizei count
Definition glcorearb.h:399
GLboolean r
Definition glcorearb.h:1233
Defining PrimaryVertex explicitly as messageable.
std::span< MessageSet > sets
friend size_t operator|(R &&r, count_parts self)
friend size_t operator|(R &&r, count_payloads self)
friend DataRefIndices operator|(R &&r, get_dataref_indices self)
friend fair::mq::MessagePtr & operator|(R &&r, get_header self)
friend size_t operator|(R &&r, get_num_payloads self)
friend DataRefIndices operator|(R &&r, get_pair self)
friend fair::mq::MessagePtr & operator|(R &&r, get_payload self)
friend std::span< o2::framework::MessageSet > operator|(R &&r, inputs_for_slot self)
friend std::span< fair::mq::MessagePtr > operator|(R &&r, messages_for_input self)