66 std::vector<fair::mq::Channel>
channels{
67 fair::mq::Channel(
"from_A_to_B")};
70 bool copyByDefault =
true;
76 .channel =
"from_A_to_B",
80 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
82 if (channel.GetName() == channelName) {
86 throw std::runtime_error(
"Channel not found");
89 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
92 std::vector<MessageSet> currentSetOfInputs;
95 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
96 fair::mq::MessagePtr payload(transport->CreateMessage());
97 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
99 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
100 REQUIRE(messageSet.
size() == 1);
101 currentSetOfInputs.emplace_back(std::move(messageSet));
106 REQUIRE(
result.size() == 1);
110TEST_CASE(
"ForwardInputsSingleMessageSingleRouteNoConsume")
120 std::vector<fair::mq::Channel>
channels{
121 fair::mq::Channel(
"from_A_to_B")};
123 bool copyByDefault =
false;
129 .channel =
"from_A_to_B",
133 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
135 if (channel.GetName() == channelName) {
139 throw std::runtime_error(
"Channel not found");
142 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
145 std::vector<MessageSet> currentSetOfInputs;
148 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
149 fair::mq::MessagePtr payload(
nullptr);
150 REQUIRE(payload.get() ==
nullptr);
151 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
153 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
154 REQUIRE(messageSet.
size() == 1);
155 currentSetOfInputs.emplace_back(std::move(messageSet));
160 REQUIRE(
result.size() == 1);
177 std::vector<fair::mq::Channel>
channels{
178 fair::mq::Channel(
"from_A_to_B")};
181 bool copyByDefault =
true;
187 .channel =
"from_A_to_B",
191 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
193 if (channel.GetName() == channelName) {
197 throw std::runtime_error(
"Channel not found");
200 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
203 std::vector<MessageSet> currentSetOfInputs;
206 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
207 fair::mq::MessagePtr payload(transport->CreateMessage());
208 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
210 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
211 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
212 REQUIRE(messageSet.
size() == 1);
213 currentSetOfInputs.emplace_back(std::move(messageSet));
218 REQUIRE(
result.size() == 1);
225TEST_CASE(
"ForwardInputsSingleMessageSingleRouteWithOldestPossible")
238 std::vector<fair::mq::Channel>
channels{
239 fair::mq::Channel(
"from_A_to_B")};
242 bool copyByDefault =
true;
248 .channel =
"from_A_to_B",
252 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
254 if (channel.GetName() == channelName) {
258 throw std::runtime_error(
"Channel not found");
261 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
264 std::vector<MessageSet> currentSetOfInputs;
267 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
268 fair::mq::MessagePtr payload(transport->CreateMessage());
269 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
271 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
272 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
273 REQUIRE(messageSet.
size() == 1);
274 currentSetOfInputs.emplace_back(std::move(messageSet));
279 REQUIRE(
result.size() == 1);
297 std::vector<fair::mq::Channel>
channels{
298 fair::mq::Channel(
"from_A_to_B"),
299 fair::mq::Channel(
"from_A_to_C"),
303 bool copyByDefault =
true;
305 std::vector<ForwardRoute>
routes{
310 .channel =
"from_A_to_B",
317 .channel =
"from_A_to_C",
321 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
323 if (channel.GetName() == channelName) {
327 throw std::runtime_error(
"Channel not found");
330 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
333 std::vector<MessageSet> currentSetOfInputs;
336 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
337 fair::mq::MessagePtr payload(transport->CreateMessage());
338 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
340 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
341 REQUIRE(messageSet.
size() == 1);
342 currentSetOfInputs.emplace_back(std::move(messageSet));
347 REQUIRE(
result.size() == 2);
352TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesExternals")
363 std::vector<fair::mq::Channel>
channels{
364 fair::mq::Channel(
"external"),
365 fair::mq::Channel(
"from_A_to_C"),
369 bool copyByDefault =
true;
371 std::vector<ForwardRoute>
routes{
376 .channel =
"external",
383 .channel =
"from_A_to_C",
387 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
389 if (channel.GetName() == channelName) {
393 throw std::runtime_error(
"Channel not found");
396 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
399 std::vector<MessageSet> currentSetOfInputs;
402 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
403 fair::mq::MessagePtr payload(transport->CreateMessage());
404 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
406 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
407 REQUIRE(messageSet.
size() == 1);
408 currentSetOfInputs.emplace_back(std::move(messageSet));
413 REQUIRE(
result.size() == 2);
436 std::vector<fair::mq::Channel>
channels{
437 fair::mq::Channel(
"from_A_to_B"),
438 fair::mq::Channel(
"from_A_to_C"),
442 bool copyByDefault =
true;
444 std::vector<ForwardRoute>
routes{
449 .channel =
"from_A_to_B",
456 .channel =
"from_A_to_C",
460 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
462 if (channel.GetName() == channelName) {
466 throw std::runtime_error(
"Channel not found");
469 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
472 std::vector<MessageSet> currentSetOfInputs;
474 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
475 fair::mq::MessagePtr payload1(transport->CreateMessage());
476 fair::mq::MessagePtr payload2(transport->CreateMessage());
477 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
480 messageSet1.
add(
PartRef{std::move(header1), std::move(payload1)});
481 REQUIRE(messageSet1.size() == 1);
485 messageSet2.
add(
PartRef{std::move(header2), std::move(payload2)});
486 REQUIRE(messageSet2.size() == 1);
487 currentSetOfInputs.emplace_back(std::move(messageSet1));
488 currentSetOfInputs.emplace_back(std::move(messageSet2));
489 REQUIRE(currentSetOfInputs.size() == 2);
494 REQUIRE(
result.size() == 2);
499TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
510 std::vector<fair::mq::Channel>
channels{
511 fair::mq::Channel(
"from_A_to_B"),
512 fair::mq::Channel(
"from_A_to_C"),
516 bool copyByDefault =
true;
518 std::vector<ForwardRoute>
routes{
523 .channel =
"from_A_to_B",
530 .channel =
"from_A_to_C",
534 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
536 if (channel.GetName() == channelName) {
540 throw std::runtime_error(
"Channel not found");
543 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
546 std::vector<MessageSet> currentSetOfInputs;
549 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
550 fair::mq::MessagePtr payload(transport->CreateMessage());
551 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
553 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
554 REQUIRE(messageSet.
size() == 1);
555 currentSetOfInputs.emplace_back(std::move(messageSet));
560 REQUIRE(
result.size() == 2);
583 std::vector<fair::mq::Channel>
channels{
584 fair::mq::Channel(
"from_A_to_B"),
585 fair::mq::Channel(
"from_A_to_C"),
589 bool copyByDefault =
true;
591 std::vector<ForwardRoute>
routes{
596 .channel =
"from_A_to_B",
603 .channel =
"from_A_to_C",
607 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
609 if (channel.GetName() == channelName) {
613 throw std::runtime_error(
"Channel not found");
616 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
619 std::vector<MessageSet> currentSetOfInputs;
622 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
623 fair::mq::MessagePtr payload1(transport->CreateMessage());
624 fair::mq::MessagePtr payload2(transport->CreateMessage());
625 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
627 std::vector<std::unique_ptr<fair::mq::Message>> messages;
628 messages.push_back(std::move(header));
629 messages.push_back(std::move(payload1));
630 messages.push_back(std::move(payload2));
631 auto fillMessages = [&messages](
size_t t) -> fair::mq::MessagePtr {
632 return std::move(messages[t]);
634 messageSet.
add(fillMessages, 3);
636 PartRef part{std::move(header2), transport->CreateMessage()};
637 messageSet.
add(std::move(part));
639 REQUIRE(messageSet.
size() == 2);
640 currentSetOfInputs.emplace_back(std::move(messageSet));
645 REQUIRE(
result.size() == 2);
655 std::vector<fair::mq::Channel>
channels{
656 fair::mq::Channel(
"from_A_to_B")};
659 bool copyByDefault =
true;
665 .channel =
"from_A_to_B",
669 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
671 if (channel.GetName() == channelName) {
675 throw std::runtime_error(
"Channel not found");
678 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
681 std::vector<MessageSet> currentSetOfInputs;
684 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
685 fair::mq::MessagePtr payload(transport->CreateMessage());
686 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
688 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
689 REQUIRE(messageSet.
size() == 1);
690 currentSetOfInputs.emplace_back(std::move(messageSet));
695 REQUIRE(
result.size() == 1);
703 std::vector<fair::mq::Channel>
channels{
704 fair::mq::Channel(
"from_A_to_B")};
707 bool copyByDefault =
true;
713 .channel =
"from_A_to_B",
717 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
719 if (channel.GetName() == channelName) {
723 throw std::runtime_error(
"Channel not found");
726 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
729 std::vector<MessageSet> currentSetOfInputs;
732 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
733 fair::mq::MessagePtr payload(transport->CreateMessage());
734 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
736 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
737 REQUIRE(messageSet.
size() == 1);
738 currentSetOfInputs.emplace_back(std::move(messageSet));
743 REQUIRE(
result.size() == 1);
#define O2_DECLARE_DYNAMIC_LOG(name)
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
static std::vector< fair::mq::Parts > routeForwardedMessages(FairMQDeviceProxy &proxy, TimesliceSlot slot, std::vector< MessageSet > &currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source