62 std::vector<fair::mq::Channel>
channels{
63 fair::mq::Channel(
"from_A_to_B")};
66 bool copyByDefault =
true;
72 .channel =
"from_A_to_B",
76 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
78 if (channel.GetName() == channelName) {
82 throw std::runtime_error(
"Channel not found");
85 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
87 std::vector<MessageSet> currentSetOfInputs;
90 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
91 fair::mq::MessagePtr payload(transport->CreateMessage());
92 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
94 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
95 REQUIRE(messageSet.
size() == 1);
96 currentSetOfInputs.emplace_back(std::move(messageSet));
99 REQUIRE(
result.size() == 1);
103TEST_CASE(
"ForwardInputsSingleMessageSingleRouteNoConsume")
113 std::vector<fair::mq::Channel>
channels{
114 fair::mq::Channel(
"from_A_to_B")};
116 bool copyByDefault =
false;
122 .channel =
"from_A_to_B",
126 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
128 if (channel.GetName() == channelName) {
132 throw std::runtime_error(
"Channel not found");
135 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
137 std::vector<MessageSet> currentSetOfInputs;
140 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
141 fair::mq::MessagePtr payload(
nullptr);
142 REQUIRE(payload.get() ==
nullptr);
143 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
145 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
146 REQUIRE(messageSet.
size() == 1);
147 currentSetOfInputs.emplace_back(std::move(messageSet));
150 REQUIRE(
result.size() == 1);
167 std::vector<fair::mq::Channel>
channels{
168 fair::mq::Channel(
"from_A_to_B")};
171 bool copyByDefault =
true;
177 .channel =
"from_A_to_B",
181 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
183 if (channel.GetName() == channelName) {
187 throw std::runtime_error(
"Channel not found");
190 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
192 std::vector<MessageSet> currentSetOfInputs;
195 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
196 fair::mq::MessagePtr payload(transport->CreateMessage());
197 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
199 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
200 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
201 REQUIRE(messageSet.
size() == 1);
202 currentSetOfInputs.emplace_back(std::move(messageSet));
205 REQUIRE(
result.size() == 1);
212TEST_CASE(
"ForwardInputsSingleMessageSingleRouteWithOldestPossible")
225 std::vector<fair::mq::Channel>
channels{
226 fair::mq::Channel(
"from_A_to_B")};
229 bool copyByDefault =
true;
235 .channel =
"from_A_to_B",
239 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
241 if (channel.GetName() == channelName) {
245 throw std::runtime_error(
"Channel not found");
248 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
250 std::vector<MessageSet> currentSetOfInputs;
253 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
254 fair::mq::MessagePtr payload(transport->CreateMessage());
255 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
257 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
258 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
259 REQUIRE(messageSet.
size() == 1);
260 currentSetOfInputs.emplace_back(std::move(messageSet));
263 REQUIRE(
result.size() == 1);
281 std::vector<fair::mq::Channel>
channels{
282 fair::mq::Channel(
"from_A_to_B"),
283 fair::mq::Channel(
"from_A_to_C"),
287 bool copyByDefault =
true;
289 std::vector<ForwardRoute>
routes{
294 .channel =
"from_A_to_B",
301 .channel =
"from_A_to_C",
305 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
307 if (channel.GetName() == channelName) {
311 throw std::runtime_error(
"Channel not found");
314 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
316 std::vector<MessageSet> currentSetOfInputs;
319 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
320 fair::mq::MessagePtr payload(transport->CreateMessage());
321 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
323 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
324 REQUIRE(messageSet.
size() == 1);
325 currentSetOfInputs.emplace_back(std::move(messageSet));
328 REQUIRE(
result.size() == 2);
333TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesExternals")
344 std::vector<fair::mq::Channel>
channels{
345 fair::mq::Channel(
"external"),
346 fair::mq::Channel(
"from_A_to_C"),
350 bool copyByDefault =
true;
352 std::vector<ForwardRoute>
routes{
357 .channel =
"external",
364 .channel =
"from_A_to_C",
368 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
370 if (channel.GetName() == channelName) {
374 throw std::runtime_error(
"Channel not found");
377 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
379 std::vector<MessageSet> currentSetOfInputs;
382 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
383 fair::mq::MessagePtr payload(transport->CreateMessage());
384 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
386 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
387 REQUIRE(messageSet.
size() == 1);
388 currentSetOfInputs.emplace_back(std::move(messageSet));
391 REQUIRE(
result.size() == 2);
414 std::vector<fair::mq::Channel>
channels{
415 fair::mq::Channel(
"from_A_to_B"),
416 fair::mq::Channel(
"from_A_to_C"),
420 bool copyByDefault =
true;
422 std::vector<ForwardRoute>
routes{
427 .channel =
"from_A_to_B",
434 .channel =
"from_A_to_C",
438 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
440 if (channel.GetName() == channelName) {
444 throw std::runtime_error(
"Channel not found");
447 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
449 std::vector<MessageSet> currentSetOfInputs;
451 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
452 fair::mq::MessagePtr payload1(transport->CreateMessage());
453 fair::mq::MessagePtr payload2(transport->CreateMessage());
454 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
457 messageSet1.
add(
PartRef{std::move(header1), std::move(payload1)});
458 REQUIRE(messageSet1.size() == 1);
462 messageSet2.
add(
PartRef{std::move(header2), std::move(payload2)});
463 REQUIRE(messageSet2.size() == 1);
464 currentSetOfInputs.emplace_back(std::move(messageSet1));
465 currentSetOfInputs.emplace_back(std::move(messageSet2));
466 REQUIRE(currentSetOfInputs.size() == 2);
469 REQUIRE(
result.size() == 2);
474TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
485 std::vector<fair::mq::Channel>
channels{
486 fair::mq::Channel(
"from_A_to_B"),
487 fair::mq::Channel(
"from_A_to_C"),
491 bool copyByDefault =
true;
493 std::vector<ForwardRoute>
routes{
498 .channel =
"from_A_to_B",
505 .channel =
"from_A_to_C",
509 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
511 if (channel.GetName() == channelName) {
515 throw std::runtime_error(
"Channel not found");
518 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
520 std::vector<MessageSet> currentSetOfInputs;
523 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
524 fair::mq::MessagePtr payload(transport->CreateMessage());
525 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
527 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
528 REQUIRE(messageSet.
size() == 1);
529 currentSetOfInputs.emplace_back(std::move(messageSet));
532 REQUIRE(
result.size() == 2);
555 std::vector<fair::mq::Channel>
channels{
556 fair::mq::Channel(
"from_A_to_B"),
557 fair::mq::Channel(
"from_A_to_C"),
561 bool copyByDefault =
true;
563 std::vector<ForwardRoute>
routes{
568 .channel =
"from_A_to_B",
575 .channel =
"from_A_to_C",
579 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
581 if (channel.GetName() == channelName) {
585 throw std::runtime_error(
"Channel not found");
588 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
590 std::vector<MessageSet> currentSetOfInputs;
593 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
594 fair::mq::MessagePtr payload1(transport->CreateMessage());
595 fair::mq::MessagePtr payload2(transport->CreateMessage());
596 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
598 std::vector<std::unique_ptr<fair::mq::Message>> messages;
599 messages.push_back(std::move(header));
600 messages.push_back(std::move(payload1));
601 messages.push_back(std::move(payload2));
602 auto fillMessages = [&messages](
size_t t) -> fair::mq::MessagePtr {
603 return std::move(messages[t]);
605 messageSet.
add(fillMessages, 3);
607 PartRef part{std::move(header2), transport->CreateMessage()};
608 messageSet.
add(std::move(part));
610 REQUIRE(messageSet.
size() == 2);
611 currentSetOfInputs.emplace_back(std::move(messageSet));
614 REQUIRE(
result.size() == 2);
637 std::vector<fair::mq::Channel>
channels{
638 fair::mq::Channel(
"from_A_to_B"),
639 fair::mq::Channel(
"from_A_to_C"),
643 bool copyByDefault =
true;
645 std::vector<ForwardRoute>
routes{
650 .channel =
"from_A_to_B",
657 .channel =
"from_A_to_C",
661 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
663 if (channel.GetName() == channelName) {
667 throw std::runtime_error(
"Channel not found");
670 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
672 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
673 fair::mq::MessagePtr payload1(transport->CreateMessage());
674 fair::mq::MessagePtr payload2(transport->CreateMessage());
675 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
677 std::vector<std::unique_ptr<fair::mq::Message>> messages;
678 messages.push_back(std::move(header));
679 messages.push_back(std::move(payload1));
680 messages.push_back(std::move(payload2));
682 messages.push_back(std::move(header2));
683 messages.push_back(transport->CreateMessage());
685 std::vector<fair::mq::Parts>
result(2);
686 auto span = std::span(messages);
688 REQUIRE(
result.size() == 2);
697 std::vector<fair::mq::Channel>
channels{
698 fair::mq::Channel(
"from_A_to_B")};
701 bool copyByDefault =
true;
707 .channel =
"from_A_to_B",
711 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
713 if (channel.GetName() == channelName) {
717 throw std::runtime_error(
"Channel not found");
720 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
722 std::vector<MessageSet> currentSetOfInputs;
725 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
726 fair::mq::MessagePtr payload(transport->CreateMessage());
727 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
729 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
730 REQUIRE(messageSet.
size() == 1);
731 currentSetOfInputs.emplace_back(std::move(messageSet));
734 REQUIRE(
result.size() == 1);
742 std::vector<fair::mq::Channel>
channels{
743 fair::mq::Channel(
"from_A_to_B")};
746 bool copyByDefault =
true;
752 .channel =
"from_A_to_B",
756 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
758 if (channel.GetName() == channelName) {
762 throw std::runtime_error(
"Channel not found");
765 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
767 std::vector<MessageSet> currentSetOfInputs;
770 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
771 fair::mq::MessagePtr payload(transport->CreateMessage());
772 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
774 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
775 REQUIRE(messageSet.
size() == 1);
776 currentSetOfInputs.emplace_back(std::move(messageSet));
779 REQUIRE(
result.size() == 1);
#define O2_DECLARE_DYNAMIC_LOG(name)
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
static void routeForwardedMessages(FairMQDeviceProxy &proxy, std::span< fair::mq::MessagePtr > &currentSetOfInputs, std::vector< fair::mq::Parts > &forwardedParts, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source