62 std::vector<fair::mq::Channel>
channels{
63 fair::mq::Channel(
"from_A_to_B")};
66 bool copyByDefault =
true;
72 .channel =
"from_A_to_B",
76 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
78 if (channel.GetName() == channelName) {
82 throw std::runtime_error(
"Channel not found");
85 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
87 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
88 std::vector<fair::mq::MessagePtr> messageSet;
90 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
91 fair::mq::MessagePtr payload(transport->CreateMessage());
92 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
94 messageSet.emplace_back(std::move(header));
95 messageSet.emplace_back(std::move(payload));
97 currentSetOfInputs.emplace_back(std::move(messageSet));
100 REQUIRE(
result.size() == 1);
104TEST_CASE(
"ForwardInputsSingleMessageSingleRouteNoConsume")
114 std::vector<fair::mq::Channel>
channels{
115 fair::mq::Channel(
"from_A_to_B")};
117 bool copyByDefault =
false;
123 .channel =
"from_A_to_B",
127 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
129 if (channel.GetName() == channelName) {
133 throw std::runtime_error(
"Channel not found");
136 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
138 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
139 std::vector<fair::mq::MessagePtr> messageSet;
141 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
142 fair::mq::MessagePtr payload(
nullptr);
143 REQUIRE(payload.get() ==
nullptr);
144 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
146 messageSet.emplace_back(std::move(header));
147 messageSet.emplace_back(std::move(payload));
149 currentSetOfInputs.emplace_back(std::move(messageSet));
152 REQUIRE(
result.size() == 1);
169 std::vector<fair::mq::Channel>
channels{
170 fair::mq::Channel(
"from_A_to_B")};
173 bool copyByDefault =
true;
179 .channel =
"from_A_to_B",
183 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
185 if (channel.GetName() == channelName) {
189 throw std::runtime_error(
"Channel not found");
192 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
194 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
195 std::vector<fair::mq::MessagePtr> messageSet;
197 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
198 fair::mq::MessagePtr payload(transport->CreateMessage());
199 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
201 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
202 messageSet.emplace_back(std::move(header));
203 messageSet.emplace_back(std::move(payload));
205 currentSetOfInputs.emplace_back(std::move(messageSet));
208 REQUIRE(
result.size() == 1);
215TEST_CASE(
"ForwardInputsSingleMessageSingleRouteWithOldestPossible")
228 std::vector<fair::mq::Channel>
channels{
229 fair::mq::Channel(
"from_A_to_B")};
232 bool copyByDefault =
true;
238 .channel =
"from_A_to_B",
242 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
244 if (channel.GetName() == channelName) {
248 throw std::runtime_error(
"Channel not found");
251 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
253 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
254 std::vector<fair::mq::MessagePtr> messageSet;
256 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
257 fair::mq::MessagePtr payload(transport->CreateMessage());
258 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
260 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
261 messageSet.emplace_back(std::move(header));
262 messageSet.emplace_back(std::move(payload));
264 currentSetOfInputs.emplace_back(std::move(messageSet));
267 REQUIRE(
result.size() == 1);
285 std::vector<fair::mq::Channel>
channels{
286 fair::mq::Channel(
"from_A_to_B"),
287 fair::mq::Channel(
"from_A_to_C"),
291 bool copyByDefault =
true;
293 std::vector<ForwardRoute>
routes{
298 .channel =
"from_A_to_B",
305 .channel =
"from_A_to_C",
309 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
311 if (channel.GetName() == channelName) {
315 throw std::runtime_error(
"Channel not found");
318 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
320 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
321 std::vector<fair::mq::MessagePtr> messageSet;
323 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
324 fair::mq::MessagePtr payload(transport->CreateMessage());
325 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
327 messageSet.emplace_back(std::move(header));
328 messageSet.emplace_back(std::move(payload));
330 currentSetOfInputs.emplace_back(std::move(messageSet));
333 REQUIRE(
result.size() == 2);
338TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesExternals")
349 std::vector<fair::mq::Channel>
channels{
350 fair::mq::Channel(
"external"),
351 fair::mq::Channel(
"from_A_to_C"),
355 bool copyByDefault =
true;
357 std::vector<ForwardRoute>
routes{
362 .channel =
"external",
369 .channel =
"from_A_to_C",
373 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
375 if (channel.GetName() == channelName) {
379 throw std::runtime_error(
"Channel not found");
382 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
384 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
385 std::vector<fair::mq::MessagePtr> messageSet;
387 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
388 fair::mq::MessagePtr payload(transport->CreateMessage());
389 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
391 messageSet.emplace_back(std::move(header));
392 messageSet.emplace_back(std::move(payload));
394 currentSetOfInputs.emplace_back(std::move(messageSet));
397 REQUIRE(
result.size() == 2);
420 std::vector<fair::mq::Channel>
channels{
421 fair::mq::Channel(
"from_A_to_B"),
422 fair::mq::Channel(
"from_A_to_C"),
426 bool copyByDefault =
true;
428 std::vector<ForwardRoute>
routes{
433 .channel =
"from_A_to_B",
440 .channel =
"from_A_to_C",
444 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
446 if (channel.GetName() == channelName) {
450 throw std::runtime_error(
"Channel not found");
453 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
455 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
457 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
458 fair::mq::MessagePtr payload1(transport->CreateMessage());
459 fair::mq::MessagePtr payload2(transport->CreateMessage());
460 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
462 std::vector<fair::mq::MessagePtr> messageSet1;
463 messageSet1.emplace_back(std::move(header1));
464 messageSet1.emplace_back(std::move(payload1));
468 std::vector<fair::mq::MessagePtr> messageSet2;
469 messageSet2.emplace_back(std::move(header2));
470 messageSet2.emplace_back(std::move(payload2));
472 currentSetOfInputs.emplace_back(std::move(messageSet1));
473 currentSetOfInputs.emplace_back(std::move(messageSet2));
474 REQUIRE(currentSetOfInputs.size() == 2);
477 REQUIRE(
result.size() == 2);
482TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
493 std::vector<fair::mq::Channel>
channels{
494 fair::mq::Channel(
"from_A_to_B"),
495 fair::mq::Channel(
"from_A_to_C"),
499 bool copyByDefault =
true;
501 std::vector<ForwardRoute>
routes{
506 .channel =
"from_A_to_B",
513 .channel =
"from_A_to_C",
517 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
519 if (channel.GetName() == channelName) {
523 throw std::runtime_error(
"Channel not found");
526 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
528 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
529 std::vector<fair::mq::MessagePtr> messageSet;
531 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
532 fair::mq::MessagePtr payload(transport->CreateMessage());
533 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
535 messageSet.emplace_back(std::move(header));
536 messageSet.emplace_back(std::move(payload));
538 currentSetOfInputs.emplace_back(std::move(messageSet));
541 REQUIRE(
result.size() == 2);
564 std::vector<fair::mq::Channel>
channels{
565 fair::mq::Channel(
"from_A_to_B"),
566 fair::mq::Channel(
"from_A_to_C"),
570 bool copyByDefault =
true;
572 std::vector<ForwardRoute>
routes{
577 .channel =
"from_A_to_B",
584 .channel =
"from_A_to_C",
588 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
590 if (channel.GetName() == channelName) {
594 throw std::runtime_error(
"Channel not found");
597 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
599 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
600 std::vector<fair::mq::MessagePtr> messageSet;
602 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
603 fair::mq::MessagePtr payload1(transport->CreateMessage());
604 fair::mq::MessagePtr payload2(transport->CreateMessage());
605 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
607 std::vector<std::unique_ptr<fair::mq::Message>> messages;
608 messages.push_back(std::move(header));
609 messages.push_back(std::move(payload1));
610 messages.push_back(std::move(payload2));
611 auto fillMessages = [&messages](
size_t t) -> fair::mq::MessagePtr {
612 return std::move(messages[t]);
614 for (
size_t i = 0;
i < 3; ++
i) {
615 messageSet.emplace_back(fillMessages(
i));
618 messageSet.emplace_back(std::move(header2));
619 messageSet.emplace_back(transport->CreateMessage());
622 currentSetOfInputs.emplace_back(std::move(messageSet));
625 REQUIRE(
result.size() == 2);
648 std::vector<fair::mq::Channel>
channels{
649 fair::mq::Channel(
"from_A_to_B"),
650 fair::mq::Channel(
"from_A_to_C"),
654 bool copyByDefault =
true;
656 std::vector<ForwardRoute>
routes{
661 .channel =
"from_A_to_B",
668 .channel =
"from_A_to_C",
672 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
674 if (channel.GetName() == channelName) {
678 throw std::runtime_error(
"Channel not found");
681 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
683 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
684 fair::mq::MessagePtr payload1(transport->CreateMessage());
685 fair::mq::MessagePtr payload2(transport->CreateMessage());
686 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
688 std::vector<std::unique_ptr<fair::mq::Message>> messages;
689 messages.push_back(std::move(header));
690 messages.push_back(std::move(payload1));
691 messages.push_back(std::move(payload2));
693 messages.push_back(std::move(header2));
694 messages.push_back(transport->CreateMessage());
696 std::vector<fair::mq::Parts>
result(2);
697 auto span = std::span(messages);
699 REQUIRE(
result.size() == 2);
708 std::vector<fair::mq::Channel>
channels{
709 fair::mq::Channel(
"from_A_to_B")};
712 bool copyByDefault =
true;
718 .channel =
"from_A_to_B",
722 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
724 if (channel.GetName() == channelName) {
728 throw std::runtime_error(
"Channel not found");
731 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
733 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
734 std::vector<fair::mq::MessagePtr> messageSet;
736 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
737 fair::mq::MessagePtr payload(transport->CreateMessage());
738 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
740 messageSet.emplace_back(std::move(header));
741 messageSet.emplace_back(std::move(payload));
743 currentSetOfInputs.emplace_back(std::move(messageSet));
746 REQUIRE(
result.size() == 1);
754 std::vector<fair::mq::Channel>
channels{
755 fair::mq::Channel(
"from_A_to_B")};
758 bool copyByDefault =
true;
764 .channel =
"from_A_to_B",
768 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
770 if (channel.GetName() == channelName) {
774 throw std::runtime_error(
"Channel not found");
777 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
779 std::vector<std::vector<fair::mq::MessagePtr>> currentSetOfInputs;
780 std::vector<fair::mq::MessagePtr> messageSet;
782 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
783 fair::mq::MessagePtr payload(transport->CreateMessage());
784 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
786 messageSet.emplace_back(std::move(header));
787 messageSet.emplace_back(std::move(payload));
789 currentSetOfInputs.emplace_back(std::move(messageSet));
792 REQUIRE(
result.size() == 1);
#define O2_DECLARE_DYNAMIC_LOG(name)
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< std::vector< fair::mq::MessagePtr > > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source