Project
Loading...
Searching...
No Matches
test_MessageSet.cxx
Go to the documentation of this file.
// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
11
#include <fairmq/Message.h>
#include <fairmq/TransportFactory.h>
// NOTE(review): added for the DataProcessingHeader objects (`dph`) built by the
// tests; header path assumed - confirm against the repository.
#include "Framework/DataProcessingHeader.h"
#include "Framework/PartRef.h"
#include "Headers/Stack.h"
#include "Headers/DataHeader.h"
#include <catch_amalgamated.hpp>

using namespace o2::framework;
23
24TEST_CASE("MessageSet")
25{
26 std::vector<fair::mq::MessagePtr> messages;
28 dh.splitPayloadParts = 0;
29 dh.splitPayloadIndex = 0;
31 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
32 fair::mq::MessagePtr payload(transport->CreateMessage());
33 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
34 fair::mq::MessagePtr header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
35 std::unique_ptr<fair::mq::Message> msg2(nullptr);
36 std::vector<fair::mq::MessagePtr> ptrs;
37 ptrs.emplace_back(std::move(header));
38 ptrs.emplace_back(std::move(msg2));
39 for (size_t i = 0; i < 2; ++i) {
40 messages.emplace_back(std::move(ptrs[i]));
41 }
42
43 REQUIRE(messages.size() == 2);
44 REQUIRE((messages | count_payloads{}) == 1);
45 REQUIRE((messages | get_dataref_indices{0, 0}).headerIdx == 0);
46 REQUIRE((messages | get_dataref_indices{0, 0}).payloadIdx == 1);
47 REQUIRE((messages | get_pair{0}).headerIdx == 0);
48 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
49 CHECK_THROWS((messages | get_pair{1}));
50 REQUIRE((messages | get_num_payloads{0}) == 1);
51 REQUIRE((messages | count_parts{}) == 1);
52}
53
54TEST_CASE("MessageSetWithFunction")
55{
56 std::vector<fair::mq::MessagePtr> ptrs;
58 dh.splitPayloadParts = 0;
59 dh.splitPayloadIndex = 0;
61 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
62 fair::mq::MessagePtr payload(transport->CreateMessage());
63 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
64 fair::mq::MessagePtr header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
65 std::unique_ptr<fair::mq::Message> msg2(nullptr);
66 ptrs.emplace_back(std::move(header));
67 ptrs.emplace_back(std::move(msg2));
68 std::vector<fair::mq::MessagePtr> messages;
69 for (size_t i = 0; i < 2; ++i) {
70 messages.emplace_back(std::move(ptrs[i]));
71 }
72
73 REQUIRE(messages.size() == 2);
74 REQUIRE((messages | count_payloads{}) == 1);
75 REQUIRE((messages | get_dataref_indices{0, 0}).headerIdx == 0);
76 REQUIRE((messages | get_dataref_indices{0, 0}).payloadIdx == 1);
77 REQUIRE((messages | get_pair{0}).headerIdx == 0);
78 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
79 CHECK_THROWS((messages | get_pair{1}));
80 REQUIRE((messages | get_num_payloads{0}) == 1);
81 REQUIRE((messages | count_parts{}) == 1);
82}
83
84TEST_CASE("MessageSetWithMultipart")
85{
86 std::vector<fair::mq::MessagePtr> ptrs;
88 dh.splitPayloadParts = 2;
89 dh.splitPayloadIndex = 2;
91 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
92 fair::mq::MessagePtr payload(transport->CreateMessage());
93 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
94 fair::mq::MessagePtr header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
95 std::unique_ptr<fair::mq::Message> msg2(nullptr);
96 std::unique_ptr<fair::mq::Message> msg3(nullptr);
97 ptrs.emplace_back(std::move(header));
98 ptrs.emplace_back(std::move(msg2));
99 ptrs.emplace_back(std::move(msg3));
100 std::vector<fair::mq::MessagePtr> messages;
101 for (size_t i = 0; i < 3; ++i) {
102 messages.emplace_back(std::move(ptrs[i]));
103 }
104
105 REQUIRE(messages.size() == 3);
106 REQUIRE((messages | count_payloads{}) == 2);
107 REQUIRE((messages | get_dataref_indices{0, 0}).headerIdx == 0);
108 REQUIRE((messages | get_dataref_indices{0, 0}).payloadIdx == 1);
109 REQUIRE((messages | get_dataref_indices{0, 1}).headerIdx == 0);
110 REQUIRE((messages | get_dataref_indices{0, 1}).payloadIdx == 2);
111 REQUIRE((messages | get_pair{0}).headerIdx == 0);
112 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
113 REQUIRE((messages | get_pair{1}).headerIdx == 0);
114 REQUIRE((messages | get_pair{1}).payloadIdx == 2);
115 CHECK_THROWS((messages | get_pair{2}));
116 REQUIRE((messages | get_num_payloads{0}) == 2);
117 REQUIRE((messages | count_parts{}) == 1);
118}
119
120TEST_CASE("MessageSetAddPartRef")
121{
122 std::unique_ptr<fair::mq::Message> msg(nullptr);
123 std::unique_ptr<fair::mq::Message> msg2(nullptr);
124 PartRef ref{std::move(msg), std::move(msg2)};
125 std::vector<fair::mq::MessagePtr> messages;
126 messages.emplace_back(std::move(ref.header));
127 messages.emplace_back(std::move(ref.payload));
128
129 REQUIRE(messages.size() == 2);
130}
131
132TEST_CASE("MessageSetAddMultiple")
133{
135 dh1.splitPayloadParts = 0;
136 dh1.splitPayloadIndex = 0;
138 dh2.splitPayloadParts = 1;
139 dh2.splitPayloadIndex = 0;
141 dh3.splitPayloadParts = 2;
142 dh3.splitPayloadIndex = 2;
144 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
145 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
146 fair::mq::MessagePtr header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
147 fair::mq::MessagePtr header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
148 fair::mq::MessagePtr header3 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh3, dph});
149
150 std::vector<fair::mq::MessagePtr> messages;
151 // part 0: dh1 (splitPayloadParts=0) — standard pair
152 messages.emplace_back(std::move(header1));
153 messages.emplace_back(std::unique_ptr<fair::mq::Message>(nullptr));
154 // part 1: dh2 (splitPayloadParts=1) — traditional split, one pair
155 messages.emplace_back(std::move(header2));
156 messages.emplace_back(std::unique_ptr<fair::mq::Message>(nullptr));
157 // part 2: dh3 (splitPayloadParts=2, splitPayloadIndex=2) — multi-payload, two payloads
158 messages.emplace_back(std::move(header3));
159 messages.emplace_back(std::unique_ptr<fair::mq::Message>(nullptr));
160 messages.emplace_back(std::unique_ptr<fair::mq::Message>(nullptr));
161
162 REQUIRE(messages.size() == 7);
163
164 REQUIRE((messages | count_payloads{}) == 4);
165 REQUIRE((messages | get_dataref_indices{0, 0}).headerIdx == 0);
166 REQUIRE((messages | get_dataref_indices{0, 0}).payloadIdx == 1);
167 REQUIRE((messages | get_dataref_indices{1, 0}).headerIdx == 2);
168 REQUIRE((messages | get_dataref_indices{1, 0}).payloadIdx == 3);
169 REQUIRE((messages | get_dataref_indices{2, 0}).headerIdx == 4);
170 REQUIRE((messages | get_dataref_indices{2, 0}).payloadIdx == 5);
171 REQUIRE((messages | get_dataref_indices{2, 1}).headerIdx == 4);
172 REQUIRE((messages | get_dataref_indices{2, 1}).payloadIdx == 6);
173 REQUIRE((messages | get_pair{0}).headerIdx == 0);
174 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
175 REQUIRE((messages | get_pair{1}).headerIdx == 2);
176 REQUIRE((messages | get_pair{1}).payloadIdx == 3);
177 REQUIRE((messages | get_pair{2}).headerIdx == 4);
178 REQUIRE((messages | get_pair{2}).payloadIdx == 5);
179 REQUIRE((messages | get_pair{3}).headerIdx == 4);
180 REQUIRE((messages | get_pair{3}).payloadIdx == 6);
181 REQUIRE((messages | get_num_payloads{0}) == 1);
182 REQUIRE((messages | get_num_payloads{1}) == 1);
183 REQUIRE((messages | get_num_payloads{2}) == 2);
184 REQUIRE((messages | count_parts{}) == 3);
185 REQUIRE((messages | count_payloads{}) == 4);
186}
187
188TEST_CASE("GetHeaderPayloadOperators")
189{
190 // Validates that get_header{part} / get_payload{part, 0} pipe operators
191 // correctly return the right messages, including access to parts at index > 0.
193 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
194 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
195
196 std::vector<fair::mq::MessagePtr> messages;
197
198 // Add two separate header-payload pairs
199 for (size_t part = 0; part < 2; ++part) {
201 dh.dataDescription = "CLUSTERS";
202 dh.dataOrigin = "TPC";
203 dh.subSpecification = part;
204 dh.splitPayloadParts = 1;
205 dh.splitPayloadIndex = 0;
206 messages.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}));
207 messages.emplace_back(transport->CreateMessage(100 + part * 100));
208 }
209
210 REQUIRE(messages.size() == 4);
211
212 // Validate part 0
213 auto& hdr0 = messages | get_header{0};
214 REQUIRE(hdr0.get() != nullptr);
215 auto* dh0 = o2::header::get<o2::header::DataHeader*>(hdr0->GetData());
216 REQUIRE(dh0 != nullptr);
217 REQUIRE(dh0->subSpecification == 0);
218 auto& pl0 = messages | get_payload{0, 0};
219 REQUIRE(pl0.get() != nullptr);
220 REQUIRE(pl0->GetSize() == 100);
221
222 // Validate part 1
223 auto& hdr1 = messages | get_header{1};
224 REQUIRE(hdr1.get() != nullptr);
225 auto* dh1 = o2::header::get<o2::header::DataHeader*>(hdr1->GetData());
226 REQUIRE(dh1 != nullptr);
227 REQUIRE(dh1->subSpecification == 1);
228 auto& pl1 = messages | get_payload{1, 0};
229 REQUIRE(pl1.get() != nullptr);
230 REQUIRE(pl1->GetSize() == 200);
231
232 REQUIRE((messages | count_parts{}) == 2);
233 REQUIRE((messages | count_payloads{}) == 2);
234 REQUIRE((messages | get_pair{0}).headerIdx == 0);
235 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
236 REQUIRE((messages | get_pair{1}).headerIdx == 2);
237 REQUIRE((messages | get_pair{1}).payloadIdx == 3);
238}
239
240TEST_CASE("GetHeaderPayloadMultiPayload")
241{
242 // Validates get_header{part} / get_payload{part, subpart} where both
243 // part and subpart can be non-zero.
244 // Layout:
245 // part 0: standard (1 header + 1 payload) → splitPayloadParts=1
246 // part 1: multi-payload (1 header + 3 payloads) → splitPayloadParts=3, splitPayloadIndex=3
248 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
249 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
250
251 std::vector<fair::mq::MessagePtr> messages;
252
253 // Part 0: standard header-payload pair
254 {
256 dh.dataDescription = "CLUSTERS";
257 dh.dataOrigin = "TPC";
258 dh.subSpecification = 0;
259 dh.splitPayloadParts = 1;
260 dh.splitPayloadIndex = 0;
261 messages.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}));
262 messages.emplace_back(transport->CreateMessage(100));
263 }
264
265 // Part 1: one header with 3 payloads (splitPayloadIndex == splitPayloadParts)
266 {
268 dh.dataDescription = "TRACKS";
269 dh.dataOrigin = "TPC";
270 dh.subSpecification = 1;
271 dh.splitPayloadParts = 3;
272 dh.splitPayloadIndex = 3;
273 messages.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}));
274 messages.emplace_back(transport->CreateMessage(200));
275 messages.emplace_back(transport->CreateMessage(300));
276 messages.emplace_back(transport->CreateMessage(400));
277 }
278
279 // messages: [hdr0, pl0, hdr1, pl1_0, pl1_1, pl1_2]
280 REQUIRE(messages.size() == 6);
281
282 // Part 0
283 auto& hdr0 = messages | get_header{0};
284 REQUIRE(hdr0.get() != nullptr);
285 auto* dh0 = o2::header::get<o2::header::DataHeader*>(hdr0->GetData());
286 REQUIRE(dh0->subSpecification == 0);
287 auto& pl0 = messages | get_payload{0, 0};
288 REQUIRE(pl0.get() != nullptr);
289 REQUIRE(pl0->GetSize() == 100);
290
291 // Part 1: multi-payload header
292 auto& hdr1 = messages | get_header{1};
293 REQUIRE(hdr1.get() != nullptr);
294 auto* dh1 = o2::header::get<o2::header::DataHeader*>(hdr1->GetData());
295 REQUIRE(dh1->subSpecification == 1);
296
297 auto& pl1_0 = messages | get_payload{1, 0};
298 REQUIRE(pl1_0.get() != nullptr);
299 REQUIRE(pl1_0->GetSize() == 200);
300
301 auto& pl1_1 = messages | get_payload{1, 1};
302 REQUIRE(pl1_1.get() != nullptr);
303 REQUIRE(pl1_1->GetSize() == 300);
304
305 auto& pl1_2 = messages | get_payload{1, 2};
306 REQUIRE(pl1_2.get() != nullptr);
307 REQUIRE(pl1_2->GetSize() == 400);
308
309 REQUIRE((messages | get_num_payloads{0}) == 1);
310 REQUIRE((messages | get_num_payloads{1}) == 3);
311 REQUIRE((messages | count_parts{}) == 2);
312 REQUIRE((messages | count_payloads{}) == 4);
313 REQUIRE((messages | get_pair{0}).headerIdx == 0);
314 REQUIRE((messages | get_pair{0}).payloadIdx == 1);
315 REQUIRE((messages | get_pair{1}).headerIdx == 2);
316 REQUIRE((messages | get_pair{1}).payloadIdx == 3);
317 REQUIRE((messages | get_pair{2}).headerIdx == 2);
318 REQUIRE((messages | get_pair{2}).payloadIdx == 4);
319 REQUIRE((messages | get_pair{3}).headerIdx == 2);
320 REQUIRE((messages | get_pair{3}).payloadIdx == 5);
321}
322
323TEST_CASE("TraditionalSplitParts")
324{
325 // Validates operators with traditional split parts layout:
326 // 3 (header, payload) pairs where splitPayloadParts=3, splitPayloadIndex=0,1,2
327 // Memory layout: [hdr0, pl0, hdr1, pl1, hdr2, pl2]
329 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
330 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
331
332 std::vector<fair::mq::MessagePtr> messages;
333
334 for (size_t i = 0; i < 3; ++i) {
336 dh.dataDescription = "CLUSTERS";
337 dh.dataOrigin = "TPC";
338 dh.subSpecification = 0;
339 dh.splitPayloadParts = 3;
340 dh.splitPayloadIndex = i;
341 messages.emplace_back(o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph}));
342 messages.emplace_back(transport->CreateMessage(100 * (i + 1)));
343 }
344
345 REQUIRE(messages.size() == 6);
346
347 REQUIRE((messages | count_payloads{}) == 3);
348 REQUIRE((messages | count_parts{}) == 3);
349
350 for (size_t i = 0; i < 3; ++i) {
351 auto& hdr = messages | get_header{i};
352 REQUIRE(hdr.get() != nullptr);
353 auto* dh = o2::header::get<o2::header::DataHeader*>(hdr->GetData());
354 REQUIRE(dh != nullptr);
355 REQUIRE(dh->splitPayloadIndex == i);
356
357 auto& pl = messages | get_payload{i, 0};
358 REQUIRE(pl.get() != nullptr);
359 REQUIRE(pl->GetSize() == 100 * (i + 1));
360 }
361
362 for (size_t i = 0; i < 3; ++i) {
363 auto indices = messages | get_dataref_indices{i, 0};
364 REQUIRE(indices.headerIdx == 2 * i);
365 REQUIRE(indices.payloadIdx == 2 * i + 1);
366 }
367
368 for (size_t i = 0; i < 3; ++i) {
369 auto indices = messages | get_pair{i};
370 REQUIRE(indices.headerIdx == 2 * i);
371 REQUIRE(indices.payloadIdx == 2 * i + 1);
372 }
373
374 for (size_t i = 0; i < 3; ++i) {
375 REQUIRE((messages | get_num_payloads{i}) == 1);
376 }
377 REQUIRE((messages | count_parts{}) == 3);
378 REQUIRE((messages | count_payloads{}) == 3);
379}
int32_t i
GLsizei GLenum const void * indices
Definition glcorearb.h:400
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
TEST_CASE("test_prepareArguments")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
Reference to an inflight part.
Definition PartRef.h:24
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
DataDescription dataDescription
Definition DataHeader.h:638
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
uint64_t const void const *restrict const msg
Definition x9.h:153