26 std::vector<fair::mq::MessagePtr> messages;
29 dh.splitPayloadIndex = 0;
31 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
32 fair::mq::MessagePtr payload(transport->CreateMessage());
33 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
35 std::unique_ptr<fair::mq::Message> msg2(
nullptr);
36 std::vector<fair::mq::MessagePtr> ptrs;
37 ptrs.emplace_back(std::move(header));
38 ptrs.emplace_back(std::move(msg2));
39 for (
size_t i = 0;
i < 2; ++
i) {
40 messages.emplace_back(std::move(ptrs[
i]));
43 REQUIRE(messages.size() == 2);
47 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
48 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
49 CHECK_THROWS((messages |
get_pair{1}));
56 std::vector<fair::mq::MessagePtr> ptrs;
59 dh.splitPayloadIndex = 0;
61 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
62 fair::mq::MessagePtr payload(transport->CreateMessage());
63 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
65 std::unique_ptr<fair::mq::Message> msg2(
nullptr);
66 ptrs.emplace_back(std::move(header));
67 ptrs.emplace_back(std::move(msg2));
68 std::vector<fair::mq::MessagePtr> messages;
69 for (
size_t i = 0;
i < 2; ++
i) {
70 messages.emplace_back(std::move(ptrs[
i]));
73 REQUIRE(messages.size() == 2);
77 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
78 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
79 CHECK_THROWS((messages |
get_pair{1}));
86 std::vector<fair::mq::MessagePtr> ptrs;
89 dh.splitPayloadIndex = 2;
91 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
92 fair::mq::MessagePtr payload(transport->CreateMessage());
93 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
95 std::unique_ptr<fair::mq::Message> msg2(
nullptr);
96 std::unique_ptr<fair::mq::Message> msg3(
nullptr);
97 ptrs.emplace_back(std::move(header));
98 ptrs.emplace_back(std::move(msg2));
99 ptrs.emplace_back(std::move(msg3));
100 std::vector<fair::mq::MessagePtr> messages;
101 for (
size_t i = 0;
i < 3; ++
i) {
102 messages.emplace_back(std::move(ptrs[
i]));
105 REQUIRE(messages.size() == 3);
111 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
112 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
113 REQUIRE((messages |
get_pair{1}).headerIdx == 0);
114 REQUIRE((messages |
get_pair{1}).payloadIdx == 2);
115 CHECK_THROWS((messages |
get_pair{2}));
122 std::unique_ptr<fair::mq::Message>
msg(
nullptr);
123 std::unique_ptr<fair::mq::Message> msg2(
nullptr);
125 std::vector<fair::mq::MessagePtr> messages;
126 messages.emplace_back(std::move(
ref.header));
127 messages.emplace_back(std::move(
ref.payload));
129 REQUIRE(messages.size() == 2);
136 dh1.splitPayloadIndex = 0;
139 dh2.splitPayloadIndex = 0;
142 dh3.splitPayloadIndex = 2;
144 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
145 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
150 std::vector<fair::mq::MessagePtr> messages;
152 messages.emplace_back(std::move(header1));
153 messages.emplace_back(std::unique_ptr<fair::mq::Message>(
nullptr));
155 messages.emplace_back(std::move(header2));
156 messages.emplace_back(std::unique_ptr<fair::mq::Message>(
nullptr));
158 messages.emplace_back(std::move(header3));
159 messages.emplace_back(std::unique_ptr<fair::mq::Message>(
nullptr));
160 messages.emplace_back(std::unique_ptr<fair::mq::Message>(
nullptr));
162 REQUIRE(messages.size() == 7);
173 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
174 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
175 REQUIRE((messages |
get_pair{1}).headerIdx == 2);
176 REQUIRE((messages |
get_pair{1}).payloadIdx == 3);
177 REQUIRE((messages |
get_pair{2}).headerIdx == 4);
178 REQUIRE((messages |
get_pair{2}).payloadIdx == 5);
179 REQUIRE((messages |
get_pair{3}).headerIdx == 4);
180 REQUIRE((messages |
get_pair{3}).payloadIdx == 6);
193 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
194 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
196 std::vector<fair::mq::MessagePtr> messages;
199 for (
size_t part = 0; part < 2; ++part) {
202 dh.dataOrigin =
"TPC";
203 dh.subSpecification = part;
204 dh.splitPayloadParts = 1;
205 dh.splitPayloadIndex = 0;
207 messages.emplace_back(transport->CreateMessage(100 + part * 100));
210 REQUIRE(messages.size() == 4);
214 REQUIRE(hdr0.get() !=
nullptr);
215 auto* dh0 = o2::header::get<o2::header::DataHeader*>(hdr0->GetData());
216 REQUIRE(dh0 !=
nullptr);
217 REQUIRE(dh0->subSpecification == 0);
219 REQUIRE(pl0.get() !=
nullptr);
220 REQUIRE(pl0->GetSize() == 100);
224 REQUIRE(hdr1.get() !=
nullptr);
225 auto* dh1 = o2::header::get<o2::header::DataHeader*>(hdr1->GetData());
226 REQUIRE(dh1 !=
nullptr);
227 REQUIRE(dh1->subSpecification == 1);
229 REQUIRE(pl1.get() !=
nullptr);
230 REQUIRE(pl1->GetSize() == 200);
234 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
235 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
236 REQUIRE((messages |
get_pair{1}).headerIdx == 2);
237 REQUIRE((messages |
get_pair{1}).payloadIdx == 3);
248 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
249 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
251 std::vector<fair::mq::MessagePtr> messages;
257 dh.dataOrigin =
"TPC";
258 dh.subSpecification = 0;
259 dh.splitPayloadParts = 1;
260 dh.splitPayloadIndex = 0;
262 messages.emplace_back(transport->CreateMessage(100));
269 dh.dataOrigin =
"TPC";
270 dh.subSpecification = 1;
271 dh.splitPayloadParts = 3;
272 dh.splitPayloadIndex = 3;
274 messages.emplace_back(transport->CreateMessage(200));
275 messages.emplace_back(transport->CreateMessage(300));
276 messages.emplace_back(transport->CreateMessage(400));
280 REQUIRE(messages.size() == 6);
284 REQUIRE(hdr0.get() !=
nullptr);
285 auto* dh0 = o2::header::get<o2::header::DataHeader*>(hdr0->GetData());
286 REQUIRE(dh0->subSpecification == 0);
288 REQUIRE(pl0.get() !=
nullptr);
289 REQUIRE(pl0->GetSize() == 100);
293 REQUIRE(hdr1.get() !=
nullptr);
294 auto* dh1 = o2::header::get<o2::header::DataHeader*>(hdr1->GetData());
295 REQUIRE(dh1->subSpecification == 1);
298 REQUIRE(pl1_0.get() !=
nullptr);
299 REQUIRE(pl1_0->GetSize() == 200);
302 REQUIRE(pl1_1.get() !=
nullptr);
303 REQUIRE(pl1_1->GetSize() == 300);
306 REQUIRE(pl1_2.get() !=
nullptr);
307 REQUIRE(pl1_2->GetSize() == 400);
313 REQUIRE((messages |
get_pair{0}).headerIdx == 0);
314 REQUIRE((messages |
get_pair{0}).payloadIdx == 1);
315 REQUIRE((messages |
get_pair{1}).headerIdx == 2);
316 REQUIRE((messages |
get_pair{1}).payloadIdx == 3);
317 REQUIRE((messages |
get_pair{2}).headerIdx == 2);
318 REQUIRE((messages |
get_pair{2}).payloadIdx == 4);
319 REQUIRE((messages |
get_pair{3}).headerIdx == 2);
320 REQUIRE((messages |
get_pair{3}).payloadIdx == 5);
329 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
330 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
332 std::vector<fair::mq::MessagePtr> messages;
334 for (
size_t i = 0;
i < 3; ++
i) {
337 dh.dataOrigin =
"TPC";
338 dh.subSpecification = 0;
339 dh.splitPayloadParts = 3;
340 dh.splitPayloadIndex =
i;
342 messages.emplace_back(transport->CreateMessage(100 * (
i + 1)));
345 REQUIRE(messages.size() == 6);
350 for (
size_t i = 0;
i < 3; ++
i) {
352 REQUIRE(hdr.get() !=
nullptr);
353 auto* dh = o2::header::get<o2::header::DataHeader*>(hdr->GetData());
354 REQUIRE(dh !=
nullptr);
355 REQUIRE(dh->splitPayloadIndex ==
i);
358 REQUIRE(pl.get() !=
nullptr);
359 REQUIRE(pl->GetSize() == 100 * (
i + 1));
362 for (
size_t i = 0;
i < 3; ++
i) {
364 REQUIRE(
indices.headerIdx == 2 *
i);
365 REQUIRE(
indices.payloadIdx == 2 *
i + 1);
368 for (
size_t i = 0;
i < 3; ++
i) {
370 REQUIRE(
indices.headerIdx == 2 *
i);
371 REQUIRE(
indices.payloadIdx == 2 *
i + 1);
374 for (
size_t i = 0;
i < 3; ++
i) {