62 std::vector<fair::mq::Channel>
channels{
63 fair::mq::Channel(
"from_A_to_B")};
66 bool copyByDefault =
true;
72 .channel =
"from_A_to_B",
76 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
78 if (channel.GetName() == channelName) {
82 throw std::runtime_error(
"Channel not found");
85 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
87 std::vector<MessageSet> currentSetOfInputs;
90 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
91 fair::mq::MessagePtr payload(transport->CreateMessage());
92 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
94 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
95 REQUIRE(messageSet.
size() == 1);
96 currentSetOfInputs.emplace_back(std::move(messageSet));
99 REQUIRE(
result.size() == 1);
103TEST_CASE(
"ForwardInputsSingleMessageSingleRouteNoConsume")
113 std::vector<fair::mq::Channel>
channels{
114 fair::mq::Channel(
"from_A_to_B")};
116 bool copyByDefault =
false;
122 .channel =
"from_A_to_B",
126 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
128 if (channel.GetName() == channelName) {
132 throw std::runtime_error(
"Channel not found");
135 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
137 std::vector<MessageSet> currentSetOfInputs;
140 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
141 fair::mq::MessagePtr payload(
nullptr);
142 REQUIRE(payload.get() ==
nullptr);
143 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
145 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
146 REQUIRE(messageSet.
size() == 1);
147 currentSetOfInputs.emplace_back(std::move(messageSet));
150 REQUIRE(
result.size() == 1);
167 std::vector<fair::mq::Channel>
channels{
168 fair::mq::Channel(
"from_A_to_B")};
171 bool copyByDefault =
true;
177 .channel =
"from_A_to_B",
181 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
183 if (channel.GetName() == channelName) {
187 throw std::runtime_error(
"Channel not found");
190 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
192 std::vector<MessageSet> currentSetOfInputs;
195 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
196 fair::mq::MessagePtr payload(transport->CreateMessage());
197 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
199 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
200 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
201 REQUIRE(messageSet.
size() == 1);
202 currentSetOfInputs.emplace_back(std::move(messageSet));
205 REQUIRE(
result.size() == 1);
212TEST_CASE(
"ForwardInputsSingleMessageSingleRouteWithOldestPossible")
225 std::vector<fair::mq::Channel>
channels{
226 fair::mq::Channel(
"from_A_to_B")};
229 bool copyByDefault =
true;
235 .channel =
"from_A_to_B",
239 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
241 if (channel.GetName() == channelName) {
245 throw std::runtime_error(
"Channel not found");
248 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
250 std::vector<MessageSet> currentSetOfInputs;
253 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
254 fair::mq::MessagePtr payload(transport->CreateMessage());
255 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
257 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
258 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
259 REQUIRE(messageSet.
size() == 1);
260 currentSetOfInputs.emplace_back(std::move(messageSet));
263 REQUIRE(
result.size() == 1);
281 std::vector<fair::mq::Channel>
channels{
282 fair::mq::Channel(
"from_A_to_B"),
283 fair::mq::Channel(
"from_A_to_C"),
287 bool copyByDefault =
true;
289 std::vector<ForwardRoute>
routes{
294 .channel =
"from_A_to_B",
301 .channel =
"from_A_to_C",
305 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
307 if (channel.GetName() == channelName) {
311 throw std::runtime_error(
"Channel not found");
314 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
316 std::vector<MessageSet> currentSetOfInputs;
319 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
320 fair::mq::MessagePtr payload(transport->CreateMessage());
321 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
323 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
324 REQUIRE(messageSet.
size() == 1);
325 currentSetOfInputs.emplace_back(std::move(messageSet));
328 REQUIRE(
result.size() == 2);
333TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesExternals")
344 std::vector<fair::mq::Channel>
channels{
345 fair::mq::Channel(
"external"),
346 fair::mq::Channel(
"from_A_to_C"),
350 bool copyByDefault =
true;
352 std::vector<ForwardRoute>
routes{
357 .channel =
"external",
364 .channel =
"from_A_to_C",
368 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
370 if (channel.GetName() == channelName) {
374 throw std::runtime_error(
"Channel not found");
377 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
379 std::vector<MessageSet> currentSetOfInputs;
382 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
383 fair::mq::MessagePtr payload(transport->CreateMessage());
384 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
386 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
387 REQUIRE(messageSet.
size() == 1);
388 currentSetOfInputs.emplace_back(std::move(messageSet));
391 REQUIRE(
result.size() == 2);
414 std::vector<fair::mq::Channel>
channels{
415 fair::mq::Channel(
"from_A_to_B"),
416 fair::mq::Channel(
"from_A_to_C"),
420 bool copyByDefault =
true;
422 std::vector<ForwardRoute>
routes{
427 .channel =
"from_A_to_B",
434 .channel =
"from_A_to_C",
438 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
440 if (channel.GetName() == channelName) {
444 throw std::runtime_error(
"Channel not found");
447 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
449 std::vector<MessageSet> currentSetOfInputs;
451 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
452 fair::mq::MessagePtr payload1(transport->CreateMessage());
453 fair::mq::MessagePtr payload2(transport->CreateMessage());
454 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
457 messageSet1.
add(
PartRef{std::move(header1), std::move(payload1)});
458 REQUIRE(messageSet1.size() == 1);
462 messageSet2.
add(
PartRef{std::move(header2), std::move(payload2)});
463 REQUIRE(messageSet2.size() == 1);
464 currentSetOfInputs.emplace_back(std::move(messageSet1));
465 currentSetOfInputs.emplace_back(std::move(messageSet2));
466 REQUIRE(currentSetOfInputs.size() == 2);
469 REQUIRE(
result.size() == 2);
474TEST_CASE(
"ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
485 std::vector<fair::mq::Channel>
channels{
486 fair::mq::Channel(
"from_A_to_B"),
487 fair::mq::Channel(
"from_A_to_C"),
491 bool copyByDefault =
true;
493 std::vector<ForwardRoute>
routes{
498 .channel =
"from_A_to_B",
505 .channel =
"from_A_to_C",
509 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
511 if (channel.GetName() == channelName) {
515 throw std::runtime_error(
"Channel not found");
518 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
520 std::vector<MessageSet> currentSetOfInputs;
523 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
524 fair::mq::MessagePtr payload(transport->CreateMessage());
525 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
527 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
528 REQUIRE(messageSet.
size() == 1);
529 currentSetOfInputs.emplace_back(std::move(messageSet));
532 REQUIRE(
result.size() == 2);
555 std::vector<fair::mq::Channel>
channels{
556 fair::mq::Channel(
"from_A_to_B"),
557 fair::mq::Channel(
"from_A_to_C"),
561 bool copyByDefault =
true;
563 std::vector<ForwardRoute>
routes{
568 .channel =
"from_A_to_B",
575 .channel =
"from_A_to_C",
579 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
581 if (channel.GetName() == channelName) {
585 throw std::runtime_error(
"Channel not found");
588 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
590 std::vector<MessageSet> currentSetOfInputs;
593 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
594 fair::mq::MessagePtr payload1(transport->CreateMessage());
595 fair::mq::MessagePtr payload2(transport->CreateMessage());
596 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
598 std::vector<std::unique_ptr<fair::mq::Message>> messages;
599 messages.push_back(std::move(header));
600 messages.push_back(std::move(payload1));
601 messages.push_back(std::move(payload2));
602 auto fillMessages = [&messages](
size_t t) -> fair::mq::MessagePtr {
603 return std::move(messages[t]);
605 messageSet.
add(fillMessages, 3);
607 PartRef part{std::move(header2), transport->CreateMessage()};
608 messageSet.
add(std::move(part));
610 REQUIRE(messageSet.
size() == 2);
611 currentSetOfInputs.emplace_back(std::move(messageSet));
614 REQUIRE(
result.size() == 2);
623 std::vector<fair::mq::Channel>
channels{
624 fair::mq::Channel(
"from_A_to_B")};
627 bool copyByDefault =
true;
633 .channel =
"from_A_to_B",
637 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
639 if (channel.GetName() == channelName) {
643 throw std::runtime_error(
"Channel not found");
646 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
648 std::vector<MessageSet> currentSetOfInputs;
651 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
652 fair::mq::MessagePtr payload(transport->CreateMessage());
653 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
655 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
656 REQUIRE(messageSet.
size() == 1);
657 currentSetOfInputs.emplace_back(std::move(messageSet));
660 REQUIRE(
result.size() == 1);
668 std::vector<fair::mq::Channel>
channels{
669 fair::mq::Channel(
"from_A_to_B")};
672 bool copyByDefault =
true;
678 .channel =
"from_A_to_B",
682 auto findChannelByName = [&
channels](std::string
const& channelName) -> fair::mq::Channel& {
684 if (channel.GetName() == channelName) {
688 throw std::runtime_error(
"Channel not found");
691 proxy.
bind({}, {},
routes, findChannelByName,
nullptr);
693 std::vector<MessageSet> currentSetOfInputs;
696 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
697 fair::mq::MessagePtr payload(transport->CreateMessage());
698 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
700 messageSet.
add(
PartRef{std::move(header), std::move(payload)});
701 REQUIRE(messageSet.
size() == 1);
702 currentSetOfInputs.emplace_back(std::move(messageSet));
705 REQUIRE(
result.size() == 1);
#define O2_DECLARE_DYNAMIC_LOG(name)
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
a BaseHeader with domain information from the source