Project
Loading...
Searching...
No Matches
test_ForwardInputs.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
13#include "Headers/DataHeader.h"
18#include "Framework/Signpost.h"
21#include "Headers/Stack.h"
23#include <fairmq/TransportFactory.h>
24#include <fairmq/Channel.h>
25#include <vector>
26
28using namespace o2::framework;
29
// Routing an empty set of inputs is expected to yield an empty result.
// NOTE(review): `dh` and `proxy` are used but their declarations are not visible in this
// listing (numbered lines 32, 39 and 44 are absent) - confirm against the original file.
30TEST_CASE("ForwardInputsEmpty")
31{
33 dh.dataDescription = "CLUSTERS";
34 dh.dataOrigin = "TPC";
35 dh.subSpecification = 0;
36 dh.splitPayloadIndex = 0;
37 dh.splitPayloadParts = 1;
38
40 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
41
42 bool consume = true;
43 bool copyByDefault = true;
45
// No message set is ever added: the input collection stays empty on purpose.
46 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
47
48 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
49 REQUIRE(result.empty());
50}
51
// One TST/A header + payload pair and a single forward route matching TST/A/0:
// the pair is expected to end up on that route.
// NOTE(review): `dh`, `dph` and `proxy` are used but their declarations are not visible in
// this listing (numbered lines 54, 61 and 67 are absent) - confirm against the original file.
52TEST_CASE("ForwardInputsSingleMessageSingleRoute")
53{
55 dh.dataOrigin = "TST";
56 dh.dataDescription = "A";
57 dh.subSpecification = 0;
58 dh.splitPayloadIndex = 0;
59 dh.splitPayloadParts = 1;
60
62 std::vector<fair::mq::Channel> channels{
63 fair::mq::Channel("from_A_to_B")};
64
65 bool consume = true;
66 bool copyByDefault = true;
68 std::vector<ForwardRoute> routes{ForwardRoute{
69 .timeslice = 0,
70 .maxTimeslices = 1,
71 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
72 .channel = "from_A_to_B",
73 .policy = nullptr,
74 }};
75
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
76 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
77 for (auto& channel : channels) {
78 if (channel.GetName() == channelName) {
79 return channel;
80 }
81 }
82 throw std::runtime_error("Channel not found");
83 };
84
// Only forward routes are bound; output and input route lists are left empty.
85 proxy.bind({}, {}, routes, findChannelByName, nullptr);
86
87 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
88 std::vector<fair::mq::MessagePtr> messageSet;
89
90 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
91 fair::mq::MessagePtr payload(transport->CreateMessage());
92 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
93 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
// The message set is laid out as header followed by its payload.
94 messageSet.emplace_back(std::move(header));
95 messageSet.emplace_back(std::move(payload));
96 REQUIRE((messageSet | count_parts{}) == 1);
97 currentSetOfInputs.emplace_back(std::move(messageSet));
98
99 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
100 REQUIRE(result.size() == 1); // One route
101 REQUIRE(result[0].Size() == 2); // Two messages for that route
102}
103
// The payload slot of the message set holds a null MessagePtr and copyByDefault is false:
// nothing is expected to be forwarded on the single matching route.
// NOTE(review): `dh` and `dph` are used but their declarations are not visible in this
// listing (numbered lines 106 and 113 are absent) - confirm against the original file.
104TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
105{
107 dh.dataOrigin = "TST";
108 dh.dataDescription = "A";
109 dh.subSpecification = 0;
110 dh.splitPayloadIndex = 0;
111 dh.splitPayloadParts = 1;
112
114 std::vector<fair::mq::Channel> channels{
115 fair::mq::Channel("from_A_to_B")};
116
117 bool copyByDefault = false;
118 FairMQDeviceProxy proxy;
119 std::vector<ForwardRoute> routes{ForwardRoute{
120 .timeslice = 0,
121 .maxTimeslices = 1,
122 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
123 .channel = "from_A_to_B",
124 .policy = nullptr,
125 }};
126
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
127 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
128 for (auto& channel : channels) {
129 if (channel.GetName() == channelName) {
130 return channel;
131 }
132 }
133 throw std::runtime_error("Channel not found");
134 };
135
136 proxy.bind({}, {}, routes, findChannelByName, nullptr);
137
138 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
139 std::vector<fair::mq::MessagePtr> messageSet;
140
141 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
// Deliberately null payload, standing in for a message that was already consumed.
142 fair::mq::MessagePtr payload(nullptr);
143 REQUIRE(payload.get() == nullptr);
144 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
145 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
146 messageSet.emplace_back(std::move(header));
147 messageSet.emplace_back(std::move(payload));
148 REQUIRE((messageSet | count_parts{}) == 1);
149 currentSetOfInputs.emplace_back(std::move(messageSet));
150
// Note: the `consume` argument is passed as a literal `true` here, despite the test name.
151 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true);
152 REQUIRE(result.size() == 1);
153 REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed.
154}
155
// A TST/A message whose header stack additionally carries `sih` (checked below to be
// retrievable as a SourceInfoHeader). The test pins the current behaviour - nothing is
// forwarded - which the FIXME at the end flags as incorrect.
// NOTE(review): `dh`, `dph` and `sih` are used but their declarations are not visible in
// this listing (numbered lines 158, 165 and 167 are absent) - confirm against the original file.
156TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
157{
159 dh.dataOrigin = "TST";
160 dh.dataDescription = "A";
161 dh.subSpecification = 0;
162 dh.splitPayloadIndex = 0;
163 dh.splitPayloadParts = 1;
164
166
168
169 std::vector<fair::mq::Channel> channels{
170 fair::mq::Channel("from_A_to_B")};
171
172 bool consume = true;
173 bool copyByDefault = true;
174 FairMQDeviceProxy proxy;
175 std::vector<ForwardRoute> routes{ForwardRoute{
176 .timeslice = 0,
177 .maxTimeslices = 1,
178 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
179 .channel = "from_A_to_B",
180 .policy = nullptr,
181 }};
182
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
183 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
184 for (auto& channel : channels) {
185 if (channel.GetName() == channelName) {
186 return channel;
187 }
188 }
189 throw std::runtime_error("Channel not found");
190 };
191
192 proxy.bind({}, {}, routes, findChannelByName, nullptr);
193
194 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
195 std::vector<fair::mq::MessagePtr> messageSet;
196
197 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
198 fair::mq::MessagePtr payload(transport->CreateMessage());
199 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
200 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
// Sanity check: the SourceInfoHeader really is part of the serialized header stack.
201 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
202 messageSet.emplace_back(std::move(header));
203 messageSet.emplace_back(std::move(payload));
204 REQUIRE((messageSet | count_parts{}) == 1);
205 currentSetOfInputs.emplace_back(std::move(messageSet));
206
207 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
208 REQUIRE(result.size() == 1); // One route
209 REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen.
210 // Correct behavior below:
211 // REQUIRE(result[0].Size() == 2);
212 // REQUIRE(o2::header::get<SourceInfoHeader*>(result[0].At(0)->GetData()) == nullptr);
213}
214
// A TST/A message whose header stack additionally carries `dih` (checked below to be
// retrievable as a DomainInfoHeader). The test pins the current behaviour - nothing is
// forwarded - which the FIXME at the end flags as incorrect.
// NOTE(review): `dh`, `dph` and `dih` are used but their declarations are not visible in
// this listing (numbered lines 217, 224 and 226 are absent) - confirm against the original file.
215TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
216{
218 dh.dataOrigin = "TST";
219 dh.dataDescription = "A";
220 dh.subSpecification = 0;
221 dh.splitPayloadIndex = 0;
222 dh.splitPayloadParts = 1;
223
225
227
228 std::vector<fair::mq::Channel> channels{
229 fair::mq::Channel("from_A_to_B")};
230
231 bool consume = true;
232 bool copyByDefault = true;
233 FairMQDeviceProxy proxy;
234 std::vector<ForwardRoute> routes{ForwardRoute{
235 .timeslice = 0,
236 .maxTimeslices = 1,
237 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
238 .channel = "from_A_to_B",
239 .policy = nullptr,
240 }};
241
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
242 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
243 for (auto& channel : channels) {
244 if (channel.GetName() == channelName) {
245 return channel;
246 }
247 }
248 throw std::runtime_error("Channel not found");
249 };
250
251 proxy.bind({}, {}, routes, findChannelByName, nullptr);
252
253 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
254 std::vector<fair::mq::MessagePtr> messageSet;
255
256 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
257 fair::mq::MessagePtr payload(transport->CreateMessage());
258 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
259 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih});
// Sanity check: the DomainInfoHeader really is part of the serialized header stack.
260 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
261 messageSet.emplace_back(std::move(header));
262 messageSet.emplace_back(std::move(payload));
263 REQUIRE((messageSet | count_parts{}) == 1);
264 currentSetOfInputs.emplace_back(std::move(messageSet));
265
266 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
267 REQUIRE(result.size() == 1); // One route
268 REQUIRE(result[0].Size() == 0); // FIXME: this pins the current (wrong) behaviour; see the intended checks below
269 // FIXME: intended behaviour, to be enabled once the routing is fixed:
270 // REQUIRE(result[0].Size() == 2); // Two messages
271 // REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // the forwarded header should not carry the DomainInfoHeader
272}
273
// One TST/A header + payload pair and two forward routes that both match TST/A/0
// (channels "from_A_to_B" and "from_A_to_C"): only the first route is expected to receive it.
// NOTE(review): `dh` and `dph` are used but their declarations are not visible, and the
// opening lines of the two route initializers are missing (numbered lines 276, 283, 294
// and 301 are absent from this listing) - confirm against the original file.
274TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
275{
277 dh.dataOrigin = "TST";
278 dh.dataDescription = "A";
279 dh.subSpecification = 0;
280 dh.splitPayloadIndex = 0;
281 dh.splitPayloadParts = 1;
282
284
285 std::vector<fair::mq::Channel> channels{
286 fair::mq::Channel("from_A_to_B"),
287 fair::mq::Channel("from_A_to_C"),
288 };
289
290 bool consume = true;
291 bool copyByDefault = true;
292 FairMQDeviceProxy proxy;
293 std::vector<ForwardRoute> routes{
295 .timeslice = 0,
296 .maxTimeslices = 1,
297 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
298 .channel = "from_A_to_B",
299 .policy = nullptr,
300 },
302 .timeslice = 0,
303 .maxTimeslices = 1,
304 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
305 .channel = "from_A_to_C",
306 .policy = nullptr,
307 }};
308
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
309 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
310 for (auto& channel : channels) {
311 if (channel.GetName() == channelName) {
312 return channel;
313 }
314 }
315 throw std::runtime_error("Channel not found");
316 };
317
318 proxy.bind({}, {}, routes, findChannelByName, nullptr);
319
320 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
321 std::vector<fair::mq::MessagePtr> messageSet;
322
323 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
324 fair::mq::MessagePtr payload(transport->CreateMessage());
325 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
326 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
327 messageSet.emplace_back(std::move(header));
328 messageSet.emplace_back(std::move(payload));
329 REQUIRE((messageSet | count_parts{}) == 1);
330 currentSetOfInputs.emplace_back(std::move(messageSet));
331
332 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
333 REQUIRE(result.size() == 2); // Two routes
334 REQUIRE(result[0].Size() == 2); // Header + payload on the first matching route
335 REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters
336}
337
// Same input as the two-matching-routes case, but the first route targets a channel named
// "external": here both routes are expected to receive the header + payload pair.
// NOTE(review): `dh` and `dph` are used but their declarations are not visible, and the
// opening lines of the two route initializers are missing (numbered lines 340, 347, 358
// and 365 are absent from this listing) - confirm against the original file.
338TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
339{
341 dh.dataOrigin = "TST";
342 dh.dataDescription = "A";
343 dh.subSpecification = 0;
344 dh.splitPayloadIndex = 0;
345 dh.splitPayloadParts = 1;
346
348
349 std::vector<fair::mq::Channel> channels{
350 fair::mq::Channel("external"),
351 fair::mq::Channel("from_A_to_C"),
352 };
353
354 bool consume = true;
355 bool copyByDefault = true;
356 FairMQDeviceProxy proxy;
357 std::vector<ForwardRoute> routes{
359 .timeslice = 0,
360 .maxTimeslices = 1,
361 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
362 .channel = "external",
363 .policy = nullptr,
364 },
366 .timeslice = 0,
367 .maxTimeslices = 1,
368 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
369 .channel = "from_A_to_C",
370 .policy = nullptr,
371 }};
372
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
373 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
374 for (auto& channel : channels) {
375 if (channel.GetName() == channelName) {
376 return channel;
377 }
378 }
379 throw std::runtime_error("Channel not found");
380 };
381
382 proxy.bind({}, {}, routes, findChannelByName, nullptr);
383
384 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
385 std::vector<fair::mq::MessagePtr> messageSet;
386
387 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
388 fair::mq::MessagePtr payload(transport->CreateMessage());
389 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
390 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
391 messageSet.emplace_back(std::move(header));
392 messageSet.emplace_back(std::move(payload));
393 REQUIRE((messageSet | count_parts{}) == 1);
394 currentSetOfInputs.emplace_back(std::move(messageSet));
395
396 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
397 REQUIRE(result.size() == 2); // Two routes
398 REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward
399 REQUIRE(result[1].Size() == 2); // The second route gets its own header + payload as well
400}
401
// Two separate message sets (TST/A and TST/B) and two routes, one matching each:
// each route is expected to receive exactly its own header + payload pair.
// NOTE(review): `dh1`, `dh2` and `dph` are used but their declarations are not visible, and
// the opening lines of the two route initializers are missing (numbered lines 404, 411, 418,
// 429 and 436 are absent from this listing) - confirm against the original file.
402TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
403{
405 dh1.dataOrigin = "TST";
406 dh1.dataDescription = "A";
407 dh1.subSpecification = 0;
408 dh1.splitPayloadIndex = 0;
409 dh1.splitPayloadParts = 1;
410
412 dh2.dataOrigin = "TST";
413 dh2.dataDescription = "B";
414 dh2.subSpecification = 0;
415 dh2.splitPayloadIndex = 0;
416 dh2.splitPayloadParts = 1;
417
419
420 std::vector<fair::mq::Channel> channels{
421 fair::mq::Channel("from_A_to_B"),
422 fair::mq::Channel("from_A_to_C"),
423 };
424
425 bool consume = true;
426 bool copyByDefault = true;
427 FairMQDeviceProxy proxy;
428 std::vector<ForwardRoute> routes{
430 .timeslice = 0,
431 .maxTimeslices = 1,
432 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
433 .channel = "from_A_to_B",
434 .policy = nullptr,
435 },
437 .timeslice = 0,
438 .maxTimeslices = 1,
439 .matcher = {"binding2", ConcreteDataMatcher{"TST", "B", 0}},
440 .channel = "from_A_to_C",
441 .policy = nullptr,
442 }};
443
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
444 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
445 for (auto& channel : channels) {
446 if (channel.GetName() == channelName) {
447 return channel;
448 }
449 }
450 throw std::runtime_error("Channel not found");
451 };
452
453 proxy.bind({}, {}, routes, findChannelByName, nullptr);
454
455 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
456
457 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
458 fair::mq::MessagePtr payload1(transport->CreateMessage());
459 fair::mq::MessagePtr payload2(transport->CreateMessage());
460 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// First message set: TST/A header + payload.
461 auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
462 std::vector<fair::mq::MessagePtr> messageSet1;
463 messageSet1.emplace_back(std::move(header1));
464 messageSet1.emplace_back(std::move(payload1));
465 REQUIRE((messageSet1 | count_parts{}) == 1);
466
// Second message set: TST/B header + payload.
467 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
468 std::vector<fair::mq::MessagePtr> messageSet2;
469 messageSet2.emplace_back(std::move(header2));
470 messageSet2.emplace_back(std::move(payload2));
471 REQUIRE((messageSet2 | count_parts{}) == 1);
472 currentSetOfInputs.emplace_back(std::move(messageSet1));
473 currentSetOfInputs.emplace_back(std::move(messageSet2));
474 REQUIRE(currentSetOfInputs.size() == 2);
475
476 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
477 REQUIRE(result.size() == 2); // Two routes
478 REQUIRE(result[0].Size() == 2); // Header + payload for the route matching TST/A
479 REQUIRE(result[1].Size() == 2); // Header + payload for the route matching TST/B
480}
481
// One TST/A header + payload pair and two routes, of which only the second matches TST/A
// (the first matches TST/B): the pair is expected on the second route only.
// NOTE(review): `dh` and `dph` are used but their declarations are not visible, and the
// opening lines of the two route initializers are missing (numbered lines 484, 491, 502
// and 509 are absent from this listing) - confirm against the original file.
482TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
483{
485 dh.dataOrigin = "TST";
486 dh.dataDescription = "A";
487 dh.subSpecification = 0;
488 dh.splitPayloadIndex = 0;
489 dh.splitPayloadParts = 1;
490
492
493 std::vector<fair::mq::Channel> channels{
494 fair::mq::Channel("from_A_to_B"),
495 fair::mq::Channel("from_A_to_C"),
496 };
497
498 bool consume = true;
499 bool copyByDefault = true;
500 FairMQDeviceProxy proxy;
501 std::vector<ForwardRoute> routes{
503 .timeslice = 0,
504 .maxTimeslices = 1,
505 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
506 .channel = "from_A_to_B",
507 .policy = nullptr,
508 },
510 .timeslice = 0,
511 .maxTimeslices = 1,
512 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
513 .channel = "from_A_to_C",
514 .policy = nullptr,
515 }};
516
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
517 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
518 for (auto& channel : channels) {
519 if (channel.GetName() == channelName) {
520 return channel;
521 }
522 }
523 throw std::runtime_error("Channel not found");
524 };
525
526 proxy.bind({}, {}, routes, findChannelByName, nullptr);
527
528 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
529 std::vector<fair::mq::MessagePtr> messageSet;
530
531 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
532 fair::mq::MessagePtr payload(transport->CreateMessage());
533 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
534 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
535 messageSet.emplace_back(std::move(header));
536 messageSet.emplace_back(std::move(payload));
537 REQUIRE((messageSet | count_parts{}) == 1);
538 currentSetOfInputs.emplace_back(std::move(messageSet));
539
540 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
541 REQUIRE(result.size() == 2); // Two routes
542 REQUIRE(result[0].Size() == 0); // First route matches TST/B only: nothing forwarded
543 REQUIRE(result[1].Size() == 2); // Second route matches TST/A: header + payload
544}
545
// A single message set containing a TST/A header followed by two payloads
// (splitPayloadIndex == splitPayloadParts == 2) and then a TST/B header + payload.
// The TST/B pair is expected on the first route, the three TST/A messages on the second.
// NOTE(review): `dh`, `dh2` and `dph` are used but their declarations are not visible, and
// the opening lines of the two route initializers are missing (numbered lines 548, 555, 562,
// 573 and 580 are absent from this listing) - confirm against the original file.
546TEST_CASE("ForwardInputsSplitPayload")
547{
549 dh.dataOrigin = "TST";
550 dh.dataDescription = "A";
551 dh.subSpecification = 0;
552 dh.splitPayloadIndex = 2;
553 dh.splitPayloadParts = 2;
554
556 dh2.dataOrigin = "TST";
557 dh2.dataDescription = "B";
558 dh2.subSpecification = 0;
559 dh2.splitPayloadIndex = 0;
560 dh2.splitPayloadParts = 1;
561
563
564 std::vector<fair::mq::Channel> channels{
565 fair::mq::Channel("from_A_to_B"),
566 fair::mq::Channel("from_A_to_C"),
567 };
568
569 bool consume = true;
570 bool copyByDefault = true;
571 FairMQDeviceProxy proxy;
572 std::vector<ForwardRoute> routes{
574 .timeslice = 0,
575 .maxTimeslices = 1,
576 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
577 .channel = "from_A_to_B",
578 .policy = nullptr,
579 },
581 .timeslice = 0,
582 .maxTimeslices = 1,
583 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
584 .channel = "from_A_to_C",
585 .policy = nullptr,
586 }};
587
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
588 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
589 for (auto& channel : channels) {
590 if (channel.GetName() == channelName) {
591 return channel;
592 }
593 }
594 throw std::runtime_error("Channel not found");
595 };
596
597 proxy.bind({}, {}, routes, findChannelByName, nullptr);
598
599 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
600 std::vector<fair::mq::MessagePtr> messageSet;
601
602 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
603 fair::mq::MessagePtr payload1(transport->CreateMessage());
604 fair::mq::MessagePtr payload2(transport->CreateMessage());
605 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
606 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
// Stage the TST/A header and its two payloads, then move them one by one into the message set.
607 std::vector<std::unique_ptr<fair::mq::Message>> messages;
608 messages.push_back(std::move(header));
609 messages.push_back(std::move(payload1));
610 messages.push_back(std::move(payload2));
611 auto fillMessages = [&messages](size_t t) -> fair::mq::MessagePtr {
612 return std::move(messages[t]);
613 };
614 for (size_t i = 0; i < 3; ++i) {
615 messageSet.emplace_back(fillMessages(i));
616 }
// Append the TST/B header with a single payload to the same message set.
617 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
618 messageSet.emplace_back(std::move(header2));
619 messageSet.emplace_back(transport->CreateMessage());
620
621 REQUIRE((messageSet | count_parts{}) == 2);
622 currentSetOfInputs.emplace_back(std::move(messageSet));
623
624 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
625 REQUIRE(result.size() == 2); // Two routes
626 CHECK(result[0].Size() == 2); // First route matches TST/B: header + payload
627 CHECK(result[1].Size() == 3); // Second route matches TST/A: header + two split payloads
628}
629
// Same messages as the split-payload case, but passed as one flat vector (via std::span)
// to routeForwardedMessages, which fills a caller-provided vector of Parts.
// NOTE(review): `dh`, `dh2` and `dph` are used but their declarations are not visible, and
// the opening lines of the two route initializers are missing (numbered lines 632, 639, 646,
// 657 and 664 are absent from this listing) - confirm against the original file.
630TEST_CASE("ForwardInputsSplitPayloadNoMessageSet")
631{
633 dh.dataOrigin = "TST";
634 dh.dataDescription = "A";
635 dh.subSpecification = 0;
636 dh.splitPayloadIndex = 2;
637 dh.splitPayloadParts = 2;
638
640 dh2.dataOrigin = "TST";
641 dh2.dataDescription = "B";
642 dh2.subSpecification = 0;
643 dh2.splitPayloadIndex = 0;
644 dh2.splitPayloadParts = 1;
645
647
648 std::vector<fair::mq::Channel> channels{
649 fair::mq::Channel("from_A_to_B"),
650 fair::mq::Channel("from_A_to_C"),
651 };
652
653 bool consume = true;
654 bool copyByDefault = true;
655 FairMQDeviceProxy proxy;
656 std::vector<ForwardRoute> routes{
658 .timeslice = 0,
659 .maxTimeslices = 1,
660 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
661 .channel = "from_A_to_B",
662 .policy = nullptr,
663 },
665 .timeslice = 0,
666 .maxTimeslices = 1,
667 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
668 .channel = "from_A_to_C",
669 .policy = nullptr,
670 }};
671
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
672 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
673 for (auto& channel : channels) {
674 if (channel.GetName() == channelName) {
675 return channel;
676 }
677 }
678 throw std::runtime_error("Channel not found");
679 };
680
681 proxy.bind({}, {}, routes, findChannelByName, nullptr);
682
683 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
684 fair::mq::MessagePtr payload1(transport->CreateMessage());
685 fair::mq::MessagePtr payload2(transport->CreateMessage());
686 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
687 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
// Flat layout: TST/A header, two payloads, then TST/B header and one payload.
688 std::vector<std::unique_ptr<fair::mq::Message>> messages;
689 messages.push_back(std::move(header));
690 messages.push_back(std::move(payload1));
691 messages.push_back(std::move(payload2));
692 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
693 messages.push_back(std::move(header2));
694 messages.push_back(transport->CreateMessage());
695
// The output is pre-sized by the caller with one Parts entry per route.
696 std::vector<fair::mq::Parts> result(2);
697 auto span = std::span(messages);
698 o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, span, result, copyByDefault, consume);
699 REQUIRE(result.size() == 2); // Two routes
700 CHECK(result[0].Size() == 2); // First route matches TST/B: header + payload
701 CHECK(result[1].Size() == 3); // Second route matches TST/A: header + two split payloads
702}
703
// A message whose header stack is built from `sih` alone (no DataHeader):
// nothing is expected to be forwarded on the single route.
// NOTE(review): `sih` is used but its declaration is not visible in this listing (numbered
// line 706 is absent); presumably a SourceInfoHeader - confirm against the original file.
704TEST_CASE("ForwardInputEOSSingleRoute")
705{
707
708 std::vector<fair::mq::Channel> channels{
709 fair::mq::Channel("from_A_to_B")};
710
711 bool consume = true;
712 bool copyByDefault = true;
713 FairMQDeviceProxy proxy;
714 std::vector<ForwardRoute> routes{ForwardRoute{
715 .timeslice = 0,
716 .maxTimeslices = 1,
717 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
718 .channel = "from_A_to_B",
719 .policy = nullptr,
720 }};
721
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
722 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
723 for (auto& channel : channels) {
724 if (channel.GetName() == channelName) {
725 return channel;
726 }
727 }
728 throw std::runtime_error("Channel not found");
729 };
730
731 proxy.bind({}, {}, routes, findChannelByName, nullptr);
732
733 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
734 std::vector<fair::mq::MessagePtr> messageSet;
735
736 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
737 fair::mq::MessagePtr payload(transport->CreateMessage());
738 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
739 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
740 messageSet.emplace_back(std::move(header));
741 messageSet.emplace_back(std::move(payload));
742 REQUIRE((messageSet | count_parts{}) == 1);
743 currentSetOfInputs.emplace_back(std::move(messageSet));
744
745 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
746 REQUIRE(result.size() == 1); // One route
747 REQUIRE(result[0].Size() == 0); // The end-of-stream message should not be forwarded
748}
749
// A message whose header stack is built from `dih` alone (no DataHeader):
// nothing is expected to be forwarded on the single route.
// NOTE(review): `dih` is used but its declaration is not visible in this listing (numbered
// line 752 is absent); presumably a DomainInfoHeader - confirm against the original file.
750TEST_CASE("ForwardInputOldestPossibleSingleRoute")
751{
753
754 std::vector<fair::mq::Channel> channels{
755 fair::mq::Channel("from_A_to_B")};
756
757 bool consume = true;
758 bool copyByDefault = true;
759 FairMQDeviceProxy proxy;
760 std::vector<ForwardRoute> routes{ForwardRoute{
761 .timeslice = 0,
762 .maxTimeslices = 1,
763 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
764 .channel = "from_A_to_B",
765 .policy = nullptr,
766 }};
767
// Channel lookup handed to proxy.bind(); throws if a route names a channel not in `channels`.
768 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
769 for (auto& channel : channels) {
770 if (channel.GetName() == channelName) {
771 return channel;
772 }
773 }
774 throw std::runtime_error("Channel not found");
775 };
776
777 proxy.bind({}, {}, routes, findChannelByName, nullptr);
778
779 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
780 std::vector<fair::mq::MessagePtr> messageSet;
781
782 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
783 fair::mq::MessagePtr payload(transport->CreateMessage());
784 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
785 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
786 messageSet.emplace_back(std::move(header));
787 messageSet.emplace_back(std::move(payload));
788 REQUIRE((messageSet | count_parts{}) == 1);
789 currentSetOfInputs.emplace_back(std::move(messageSet));
790
791 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
792 REQUIRE(result.size() == 1); // One route
793 REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
794}
std::vector< OutputRoute > routes
int32_t i
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
TEST_CASE("test_prepareArguments")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< std::vector< fair::mq::MessagePtr > > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
std::vector< ChannelData > channels