Project
Loading...
Searching...
No Matches
test_ForwardInputs.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
13#include "Headers/DataHeader.h"
18#include "Framework/Signpost.h"
21#include "Headers/Stack.h"
23#include <fairmq/TransportFactory.h>
24#include <fairmq/Channel.h>
25#include <vector>
26
28using namespace o2::framework;
29
// Routes an empty list of inputs and asserts that nothing is returned.
// NOTE(review): `dh` and `proxy` are used below but their declarations are not
// visible in this listing (embedded source lines 32, 39 and 44 are absent);
// presumably they were dropped when the page was extracted — confirm against
// the real file.
30TEST_CASE("ForwardInputsEmpty")
31{
33 dh.dataDescription = "CLUSTERS";
34 dh.dataOrigin = "TPC";
35 dh.subSpecification = 0;
36 dh.splitPayloadIndex = 0;
37 dh.splitPayloadParts = 1;
38
40 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
41
42 bool consume = true;
43 bool copyByDefault = true;
45
46 std::vector<MessageSet> currentSetOfInputs;
47
// No MessageSet is added to currentSetOfInputs and no bind() call is visible,
// so the returned vector is expected to be empty.
48 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
49 REQUIRE(result.empty());
50}
51
// One header/payload pair described as TST/A/0 is routed through a proxy bound
// to a single forward route matching TST/A/0 on channel "from_A_to_B".
// NOTE(review): the declarations of `dh`, `dph` and `proxy` are not visible in
// this listing (embedded source lines 54, 61 and 67 are absent).
52TEST_CASE("ForwardInputsSingleMessageSingleRoute")
53{
55 dh.dataOrigin = "TST";
56 dh.dataDescription = "A";
57 dh.subSpecification = 0;
58 dh.splitPayloadIndex = 0;
59 dh.splitPayloadParts = 1;
60
62 std::vector<fair::mq::Channel> channels{
63 fair::mq::Channel("from_A_to_B")};
64
65 bool consume = true;
66 bool copyByDefault = true;
68 std::vector<ForwardRoute> routes{ForwardRoute{
69 .timeslice = 0,
70 .maxTimeslices = 1,
71 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
72 .channel = "from_A_to_B",
73 .policy = nullptr,
74 }};
75
// Channel lookup handed to bind(): linear search by name, throws if absent.
76 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
77 for (auto& channel : channels) {
78 if (channel.GetName() == channelName) {
79 return channel;
80 }
81 }
82 throw std::runtime_error("Channel not found");
83 };
84
// Only forward routes are bound; outputs and inputs are left empty.
85 proxy.bind({}, {}, routes, findChannelByName, nullptr);
86
87 std::vector<MessageSet> currentSetOfInputs;
88 MessageSet messageSet;
89
// Build the header stack (dh + dph) and an empty payload on the zeromq transport.
90 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
91 fair::mq::MessagePtr payload(transport->CreateMessage());
92 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
93 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
94 messageSet.add(PartRef{std::move(header), std::move(payload)});
95 REQUIRE(messageSet.size() == 1);
96 currentSetOfInputs.emplace_back(std::move(messageSet));
97
98 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
99 REQUIRE(result.size() == 1); // One route
100 REQUIRE(result[0].Size() == 2); // Two messages for that route
101}
102
// A part whose payload pointer is null is routed with copyByDefault == false;
// the single route is expected to receive no messages.
// NOTE(review): despite the "NoConsume" name, the literal `true` is passed as
// the `consume` argument of routeForwardedMessageSet below — confirm intended.
// NOTE(review): the declarations of `dh` and `dph` are not visible in this
// listing (embedded source lines 105 and 112 are absent).
103TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
104{
106 dh.dataOrigin = "TST";
107 dh.dataDescription = "A";
108 dh.subSpecification = 0;
109 dh.splitPayloadIndex = 0;
110 dh.splitPayloadParts = 1;
111
113 std::vector<fair::mq::Channel> channels{
114 fair::mq::Channel("from_A_to_B")};
115
116 bool copyByDefault = false;
117 FairMQDeviceProxy proxy;
118 std::vector<ForwardRoute> routes{ForwardRoute{
119 .timeslice = 0,
120 .maxTimeslices = 1,
121 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
122 .channel = "from_A_to_B",
123 .policy = nullptr,
124 }};
125
// Channel lookup handed to bind(): linear search by name, throws if absent.
126 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
127 for (auto& channel : channels) {
128 if (channel.GetName() == channelName) {
129 return channel;
130 }
131 }
132 throw std::runtime_error("Channel not found");
133 };
134
135 proxy.bind({}, {}, routes, findChannelByName, nullptr);
136
137 std::vector<MessageSet> currentSetOfInputs;
138 MessageSet messageSet;
139
140 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
// The payload is deliberately a null pointer (see the trailing comment on the
// final assertion for the rationale).
141 fair::mq::MessagePtr payload(nullptr);
142 REQUIRE(payload.get() == nullptr);
143 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
144 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
145 messageSet.add(PartRef{std::move(header), std::move(payload)});
146 REQUIRE(messageSet.size() == 1);
147 currentSetOfInputs.emplace_back(std::move(messageSet));
148
149 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, true);
150 REQUIRE(result.size() == 1);
151 REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed.
152}
153
// A TST/A/0 message whose header stack additionally carries `sih` (looked up
// below as a SourceInfoHeader) is routed through a single matching route.
// The test pins the current result (nothing forwarded), which the FIXME at the
// end flags as incorrect.
// NOTE(review): the declarations of `dh`, `dph` and `sih` are not visible in
// this listing (embedded source lines 156, 163 and 165 are absent).
154TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
155{
157 dh.dataOrigin = "TST";
158 dh.dataDescription = "A";
159 dh.subSpecification = 0;
160 dh.splitPayloadIndex = 0;
161 dh.splitPayloadParts = 1;
162
164
166
167 std::vector<fair::mq::Channel> channels{
168 fair::mq::Channel("from_A_to_B")};
169
170 bool consume = true;
171 bool copyByDefault = true;
172 FairMQDeviceProxy proxy;
173 std::vector<ForwardRoute> routes{ForwardRoute{
174 .timeslice = 0,
175 .maxTimeslices = 1,
176 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
177 .channel = "from_A_to_B",
178 .policy = nullptr,
179 }};
180
// Channel lookup handed to bind(): linear search by name, throws if absent.
181 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
182 for (auto& channel : channels) {
183 if (channel.GetName() == channelName) {
184 return channel;
185 }
186 }
187 throw std::runtime_error("Channel not found");
188 };
189
190 proxy.bind({}, {}, routes, findChannelByName, nullptr);
191
192 std::vector<MessageSet> currentSetOfInputs;
193 MessageSet messageSet;
194
195 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
196 fair::mq::MessagePtr payload(transport->CreateMessage());
197 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Header stack is dh + dph + sih; the REQUIRE verifies the SourceInfoHeader
// can be found in the serialized stack before routing.
198 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
199 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
200 messageSet.add(PartRef{std::move(header), std::move(payload)});
201 REQUIRE(messageSet.size() == 1);
202 currentSetOfInputs.emplace_back(std::move(messageSet));
203
204 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
205 REQUIRE(result.size() == 1); // One route
206 REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2. However it cannot really happen.
207 // Correct behavior below:
208 // REQUIRE(result[0].Size() == 2);
209 // REQUIRE(o2::header::get<SourceInfoHeader*>(result[0].At(0)->GetData()) == nullptr);
210}
211
// A TST/A/0 message whose header stack additionally carries `dih` (looked up
// below as a DomainInfoHeader) is routed through a single matching route.
// The test pins the current result (nothing forwarded), which the FIXME at the
// end flags as incorrect.
// NOTE(review): the declarations of `dh`, `dph` and `dih` are not visible in
// this listing (embedded source lines 214, 221 and 223 are absent).
212TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
213{
215 dh.dataOrigin = "TST";
216 dh.dataDescription = "A";
217 dh.subSpecification = 0;
218 dh.splitPayloadIndex = 0;
219 dh.splitPayloadParts = 1;
220
222
224
225 std::vector<fair::mq::Channel> channels{
226 fair::mq::Channel("from_A_to_B")};
227
228 bool consume = true;
229 bool copyByDefault = true;
230 FairMQDeviceProxy proxy;
231 std::vector<ForwardRoute> routes{ForwardRoute{
232 .timeslice = 0,
233 .maxTimeslices = 1,
234 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
235 .channel = "from_A_to_B",
236 .policy = nullptr,
237 }};
238
// Channel lookup handed to bind(): linear search by name, throws if absent.
239 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
240 for (auto& channel : channels) {
241 if (channel.GetName() == channelName) {
242 return channel;
243 }
244 }
245 throw std::runtime_error("Channel not found");
246 };
247
248 proxy.bind({}, {}, routes, findChannelByName, nullptr);
249
250 std::vector<MessageSet> currentSetOfInputs;
251 MessageSet messageSet;
252
253 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
254 fair::mq::MessagePtr payload(transport->CreateMessage());
255 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Header stack is dh + dph + dih; the REQUIRE verifies the DomainInfoHeader
// can be found in the serialized stack before routing.
256 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih});
257 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
258 messageSet.add(PartRef{std::move(header), std::move(payload)});
259 REQUIRE(messageSet.size() == 1);
260 currentSetOfInputs.emplace_back(std::move(messageSet));
261
262 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
263 REQUIRE(result.size() == 1); // One route
264 REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong
265 // FIXME: actually correct behavior below
266 // REQUIRE(result[0].Size() == 2); // Two messages
267 // REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // the forwarded header should not carry the DomainInfoHeader
268}
269
// One TST/A/0 message is routed through a proxy with two forward routes that
// both match TST/A/0 ("from_A_to_B" and "from_A_to_C"); only the first route
// is expected to receive it.
// NOTE(review): the declarations of `dh` and `dph`, and the `ForwardRoute{`
// openers of the two route initializers, are not visible in this listing
// (embedded source lines 272, 279, 290 and 297 are absent).
270TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
271{
273 dh.dataOrigin = "TST";
274 dh.dataDescription = "A";
275 dh.subSpecification = 0;
276 dh.splitPayloadIndex = 0;
277 dh.splitPayloadParts = 1;
278
280
281 std::vector<fair::mq::Channel> channels{
282 fair::mq::Channel("from_A_to_B"),
283 fair::mq::Channel("from_A_to_C"),
284 };
285
286 bool consume = true;
287 bool copyByDefault = true;
288 FairMQDeviceProxy proxy;
289 std::vector<ForwardRoute> routes{
291 .timeslice = 0,
292 .maxTimeslices = 1,
293 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
294 .channel = "from_A_to_B",
295 .policy = nullptr,
296 },
298 .timeslice = 0,
299 .maxTimeslices = 1,
300 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
301 .channel = "from_A_to_C",
302 .policy = nullptr,
303 }};
304
// Channel lookup handed to bind(): linear search by name, throws if absent.
305 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
306 for (auto& channel : channels) {
307 if (channel.GetName() == channelName) {
308 return channel;
309 }
310 }
311 throw std::runtime_error("Channel not found");
312 };
313
314 proxy.bind({}, {}, routes, findChannelByName, nullptr);
315
316 std::vector<MessageSet> currentSetOfInputs;
317 MessageSet messageSet;
318
319 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
320 fair::mq::MessagePtr payload(transport->CreateMessage());
321 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
322 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
323 messageSet.add(PartRef{std::move(header), std::move(payload)});
324 REQUIRE(messageSet.size() == 1);
325 currentSetOfInputs.emplace_back(std::move(messageSet));
326
327 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
328 REQUIRE(result.size() == 2); // Two routes
329 REQUIRE(result[0].Size() == 2); // Two messages on the first matching route
330 REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters
331}
332
// Same setup as the two-matching-routes case, except that the first route's
// channel is named "external"; here both routes are expected to receive the
// two messages.
// NOTE(review): the declarations of `dh` and `dph`, and the `ForwardRoute{`
// openers of the two route initializers, are not visible in this listing
// (embedded source lines 335, 342, 353 and 360 are absent).
333TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
334{
336 dh.dataOrigin = "TST";
337 dh.dataDescription = "A";
338 dh.subSpecification = 0;
339 dh.splitPayloadIndex = 0;
340 dh.splitPayloadParts = 1;
341
343
344 std::vector<fair::mq::Channel> channels{
345 fair::mq::Channel("external"),
346 fair::mq::Channel("from_A_to_C"),
347 };
348
349 bool consume = true;
350 bool copyByDefault = true;
351 FairMQDeviceProxy proxy;
352 std::vector<ForwardRoute> routes{
354 .timeslice = 0,
355 .maxTimeslices = 1,
356 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
357 .channel = "external",
358 .policy = nullptr,
359 },
361 .timeslice = 0,
362 .maxTimeslices = 1,
363 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
364 .channel = "from_A_to_C",
365 .policy = nullptr,
366 }};
367
// Channel lookup handed to bind(): linear search by name, throws if absent.
368 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
369 for (auto& channel : channels) {
370 if (channel.GetName() == channelName) {
371 return channel;
372 }
373 }
374 throw std::runtime_error("Channel not found");
375 };
376
377 proxy.bind({}, {}, routes, findChannelByName, nullptr);
378
379 std::vector<MessageSet> currentSetOfInputs;
380 MessageSet messageSet;
381
382 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
383 fair::mq::MessagePtr payload(transport->CreateMessage());
384 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
385 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
386 messageSet.add(PartRef{std::move(header), std::move(payload)});
387 REQUIRE(messageSet.size() == 1);
388 currentSetOfInputs.emplace_back(std::move(messageSet));
389
390 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
391 REQUIRE(result.size() == 2); // Two routes
392 REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward
393 REQUIRE(result[1].Size() == 2); // The second matching route also gets two messages
394}
395
// Two message sets (TST/A/0 and TST/B/0) are routed through two forward
// routes, one matching each description; each route is expected to receive
// two messages.
// NOTE(review): the declarations of `dh1`, `dh2` and `dph`, and the
// `ForwardRoute{` openers of the two route initializers, are not visible in
// this listing (embedded source lines 398, 405, 412, 423 and 430 are absent).
396TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
397{
399 dh1.dataOrigin = "TST";
400 dh1.dataDescription = "A";
401 dh1.subSpecification = 0;
402 dh1.splitPayloadIndex = 0;
403 dh1.splitPayloadParts = 1;
404
406 dh2.dataOrigin = "TST";
407 dh2.dataDescription = "B";
408 dh2.subSpecification = 0;
409 dh2.splitPayloadIndex = 0;
410 dh2.splitPayloadParts = 1;
411
413
414 std::vector<fair::mq::Channel> channels{
415 fair::mq::Channel("from_A_to_B"),
416 fair::mq::Channel("from_A_to_C"),
417 };
418
419 bool consume = true;
420 bool copyByDefault = true;
421 FairMQDeviceProxy proxy;
422 std::vector<ForwardRoute> routes{
424 .timeslice = 0,
425 .maxTimeslices = 1,
426 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
427 .channel = "from_A_to_B",
428 .policy = nullptr,
429 },
431 .timeslice = 0,
432 .maxTimeslices = 1,
433 .matcher = {"binding2", ConcreteDataMatcher{"TST", "B", 0}},
434 .channel = "from_A_to_C",
435 .policy = nullptr,
436 }};
437
// Channel lookup handed to bind(): linear search by name, throws if absent.
438 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
439 for (auto& channel : channels) {
440 if (channel.GetName() == channelName) {
441 return channel;
442 }
443 }
444 throw std::runtime_error("Channel not found");
445 };
446
447 proxy.bind({}, {}, routes, findChannelByName, nullptr);
448
449 std::vector<MessageSet> currentSetOfInputs;
450
451 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
452 fair::mq::MessagePtr payload1(transport->CreateMessage());
453 fair::mq::MessagePtr payload2(transport->CreateMessage());
454 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// First message set: header built from dh1 (TST/A) plus payload1.
455 auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
456 MessageSet messageSet1;
457 messageSet1.add(PartRef{std::move(header1), std::move(payload1)});
458 REQUIRE(messageSet1.size() == 1);
459
// Second message set: header built from dh2 (TST/B) plus payload2.
460 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
461 MessageSet messageSet2;
462 messageSet2.add(PartRef{std::move(header2), std::move(payload2)});
463 REQUIRE(messageSet2.size() == 1);
464 currentSetOfInputs.emplace_back(std::move(messageSet1));
465 currentSetOfInputs.emplace_back(std::move(messageSet2));
466 REQUIRE(currentSetOfInputs.size() == 2);
467
468 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
469 REQUIRE(result.size() == 2); // Two routes
470 REQUIRE(result[0].Size() == 2); // Two messages on the TST/A route
471 REQUIRE(result[1].Size() == 2); // Two messages on the TST/B route
472}
473
// One TST/A/0 message is routed through two forward routes of which only the
// second (TST/A/0 on "from_A_to_C") matches; the first route matches TST/B/0
// and is expected to stay empty.
// NOTE(review): the declarations of `dh` and `dph`, and the `ForwardRoute{`
// openers of the two route initializers, are not visible in this listing
// (embedded source lines 476, 483, 494 and 501 are absent).
474TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
475{
477 dh.dataOrigin = "TST";
478 dh.dataDescription = "A";
479 dh.subSpecification = 0;
480 dh.splitPayloadIndex = 0;
481 dh.splitPayloadParts = 1;
482
484
485 std::vector<fair::mq::Channel> channels{
486 fair::mq::Channel("from_A_to_B"),
487 fair::mq::Channel("from_A_to_C"),
488 };
489
490 bool consume = true;
491 bool copyByDefault = true;
492 FairMQDeviceProxy proxy;
493 std::vector<ForwardRoute> routes{
495 .timeslice = 0,
496 .maxTimeslices = 1,
497 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
498 .channel = "from_A_to_B",
499 .policy = nullptr,
500 },
502 .timeslice = 0,
503 .maxTimeslices = 1,
504 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
505 .channel = "from_A_to_C",
506 .policy = nullptr,
507 }};
508
// Channel lookup handed to bind(): linear search by name, throws if absent.
509 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
510 for (auto& channel : channels) {
511 if (channel.GetName() == channelName) {
512 return channel;
513 }
514 }
515 throw std::runtime_error("Channel not found");
516 };
517
518 proxy.bind({}, {}, routes, findChannelByName, nullptr);
519
520 std::vector<MessageSet> currentSetOfInputs;
521 MessageSet messageSet;
522
523 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
524 fair::mq::MessagePtr payload(transport->CreateMessage());
525 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
526 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
527 messageSet.add(PartRef{std::move(header), std::move(payload)});
528 REQUIRE(messageSet.size() == 1);
529 currentSetOfInputs.emplace_back(std::move(messageSet));
530
531 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
532 REQUIRE(result.size() == 2); // Two routes
533 REQUIRE(result[0].Size() == 0); // No messages: the first route matches TST/B, not TST/A
534 REQUIRE(result[1].Size() == 2); // Two messages on the matching TST/A route
535}
536
// A single MessageSet holding (a) one TST/A header followed by two payloads and
// (b) one TST/B header/payload pair is routed through two routes: the first
// matches TST/B, the second matches TST/A.
// NOTE(review): the declarations of `dh`, `dh2` and `dph`, and the
// `ForwardRoute{` openers of the two route initializers, are not visible in
// this listing (embedded source lines 539, 546, 553, 564 and 571 are absent).
537TEST_CASE("ForwardInputsSplitPayload")
538{
540 dh.dataOrigin = "TST";
541 dh.dataDescription = "A";
542 dh.subSpecification = 0;
// NOTE(review): index == parts == 2 presumably marks "one header followed by
// two payloads" — confirm against the DataHeader documentation.
543 dh.splitPayloadIndex = 2;
544 dh.splitPayloadParts = 2;
545
547 dh2.dataOrigin = "TST";
548 dh2.dataDescription = "B";
549 dh2.subSpecification = 0;
550 dh2.splitPayloadIndex = 0;
551 dh2.splitPayloadParts = 1;
552
554
555 std::vector<fair::mq::Channel> channels{
556 fair::mq::Channel("from_A_to_B"),
557 fair::mq::Channel("from_A_to_C"),
558 };
559
560 bool consume = true;
561 bool copyByDefault = true;
562 FairMQDeviceProxy proxy;
563 std::vector<ForwardRoute> routes{
565 .timeslice = 0,
566 .maxTimeslices = 1,
567 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
568 .channel = "from_A_to_B",
569 .policy = nullptr,
570 },
572 .timeslice = 0,
573 .maxTimeslices = 1,
574 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
575 .channel = "from_A_to_C",
576 .policy = nullptr,
577 }};
578
// Channel lookup handed to bind(): linear search by name, throws if absent.
579 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
580 for (auto& channel : channels) {
581 if (channel.GetName() == channelName) {
582 return channel;
583 }
584 }
585 throw std::runtime_error("Channel not found");
586 };
587
588 proxy.bind({}, {}, routes, findChannelByName, nullptr);
589
590 std::vector<MessageSet> currentSetOfInputs;
591 MessageSet messageSet;
592
593 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
594 fair::mq::MessagePtr payload1(transport->CreateMessage());
595 fair::mq::MessagePtr payload2(transport->CreateMessage());
596 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
597 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
// Stage the TST/A header and its two payloads so they can be handed to
// MessageSet::add through the index-based callback below.
598 std::vector<std::unique_ptr<fair::mq::Message>> messages;
599 messages.push_back(std::move(header));
600 messages.push_back(std::move(payload1));
601 messages.push_back(std::move(payload2));
602 auto fillMessages = [&messages](size_t t) -> fair::mq::MessagePtr {
603 return std::move(messages[t]);
604 };
605 messageSet.add(fillMessages, 3);
// Second entry of the same set: a regular TST/B header/payload pair.
606 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
607 PartRef part{std::move(header2), transport->CreateMessage()};
608 messageSet.add(std::move(part));
609
610 REQUIRE(messageSet.size() == 2);
611 currentSetOfInputs.emplace_back(std::move(messageSet));
612
613 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
614 REQUIRE(result.size() == 2); // Two routes
615 CHECK(result[0].Size() == 2); // TST/B route: header + payload
616 CHECK(result[1].Size() == 3); // TST/A route: header + two split payloads
617}
618
// A message whose header stack contains only `sih` (no DataHeader) is routed
// through a single TST/A/0 route; nothing is expected to be forwarded.
// NOTE(review): the declaration of `sih` is not visible in this listing
// (embedded source line 621 is absent); given the test name it is presumably
// the end-of-stream SourceInfoHeader — confirm against the real file.
619TEST_CASE("ForwardInputEOSSingleRoute")
620{
622
623 std::vector<fair::mq::Channel> channels{
624 fair::mq::Channel("from_A_to_B")};
625
626 bool consume = true;
627 bool copyByDefault = true;
628 FairMQDeviceProxy proxy;
629 std::vector<ForwardRoute> routes{ForwardRoute{
630 .timeslice = 0,
631 .maxTimeslices = 1,
632 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
633 .channel = "from_A_to_B",
634 .policy = nullptr,
635 }};
636
// Channel lookup handed to bind(): linear search by name, throws if absent.
637 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
638 for (auto& channel : channels) {
639 if (channel.GetName() == channelName) {
640 return channel;
641 }
642 }
643 throw std::runtime_error("Channel not found");
644 };
645
646 proxy.bind({}, {}, routes, findChannelByName, nullptr);
647
648 std::vector<MessageSet> currentSetOfInputs;
649 MessageSet messageSet;
650
651 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
652 fair::mq::MessagePtr payload(transport->CreateMessage());
653 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
654 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
655 messageSet.add(PartRef{std::move(header), std::move(payload)});
656 REQUIRE(messageSet.size() == 1);
657 currentSetOfInputs.emplace_back(std::move(messageSet));
658
659 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
660 REQUIRE(result.size() == 1); // One route
661 REQUIRE(result[0].Size() == 0); // An end-of-stream-only message should not be forwarded
662}
663
// A message whose header stack contains only `dih` (no DataHeader) is routed
// through a single TST/A/0 route; nothing is expected to be forwarded.
// NOTE(review): the declaration of `dih` is not visible in this listing
// (embedded source line 666 is absent); given the test name it is presumably
// the DomainInfoHeader carrying the oldest possible timeframe — confirm
// against the real file.
664TEST_CASE("ForwardInputOldestPossibleSingleRoute")
665{
667
668 std::vector<fair::mq::Channel> channels{
669 fair::mq::Channel("from_A_to_B")};
670
671 bool consume = true;
672 bool copyByDefault = true;
673 FairMQDeviceProxy proxy;
674 std::vector<ForwardRoute> routes{ForwardRoute{
675 .timeslice = 0,
676 .maxTimeslices = 1,
677 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
678 .channel = "from_A_to_B",
679 .policy = nullptr,
680 }};
681
// Channel lookup handed to bind(): linear search by name, throws if absent.
682 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
683 for (auto& channel : channels) {
684 if (channel.GetName() == channelName) {
685 return channel;
686 }
687 }
688 throw std::runtime_error("Channel not found");
689 };
690
691 proxy.bind({}, {}, routes, findChannelByName, nullptr);
692
693 std::vector<MessageSet> currentSetOfInputs;
694 MessageSet messageSet;
695
696 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
697 fair::mq::MessagePtr payload(transport->CreateMessage());
698 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
699 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
700 messageSet.add(PartRef{std::move(header), std::move(payload)});
701 REQUIRE(messageSet.size() == 1);
702 currentSetOfInputs.emplace_back(std::move(messageSet));
703
704 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessageSet(proxy, currentSetOfInputs, copyByDefault, consume);
705 REQUIRE(result.size() == 1); // One route
706 REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
707}
std::vector< OutputRoute > routes
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
Defining PrimaryVertex explicitly as messageable.
TEST_CASE("test_prepareArguments")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::vector< fair::mq::Parts > routeForwardedMessageSet(FairMQDeviceProxy &proxy, std::vector< MessageSet > &currentSetOfInputs, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source
size_t size() const
get number of in-flight O2 messages
Definition MessageSet.h:86
void add(PartRef &&ref)
Definition MessageSet.h:123
Reference to an inflight part.
Definition PartRef.h:24
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:619
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:647
DataDescription dataDescription
Definition DataHeader.h:637
SubSpecificationType subSpecification
Definition DataHeader.h:657
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:662
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
std::vector< ChannelData > channels