Project
Loading...
Searching...
No Matches
test_ForwardInputs.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
13#include "Headers/DataHeader.h"
20#include "Framework/Signpost.h"
23#include "Headers/Stack.h"
25#include <fairmq/TransportFactory.h>
26#include <fairmq/Channel.h>
27#include <vector>
28
30using namespace o2::framework;
31
32TEST_CASE("ForwardInputsEmpty")
33{
35 dh.dataDescription = "CLUSTERS";
36 dh.dataOrigin = "TPC";
37 dh.subSpecification = 0;
38 dh.splitPayloadIndex = 0;
39 dh.splitPayloadParts = 1;
40
42 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
43
44 bool consume = true;
45 bool copyByDefault = true;
47
48 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {1}};
49 std::vector<MessageSet> currentSetOfInputs;
50 TimesliceSlot slot{0};
51
52 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
53 REQUIRE(result.empty());
54}
55
56TEST_CASE("ForwardInputsSingleMessageSingleRoute")
57{
59 dh.dataOrigin = "TST";
60 dh.dataDescription = "A";
61 dh.subSpecification = 0;
62 dh.splitPayloadIndex = 0;
63 dh.splitPayloadParts = 1;
64
66 std::vector<fair::mq::Channel> channels{
67 fair::mq::Channel("from_A_to_B")};
68
69 bool consume = true;
70 bool copyByDefault = true;
72 std::vector<ForwardRoute> routes{ForwardRoute{
73 .timeslice = 0,
74 .maxTimeslices = 1,
75 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
76 .channel = "from_A_to_B",
77 .policy = nullptr,
78 }};
79
80 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
81 for (auto& channel : channels) {
82 if (channel.GetName() == channelName) {
83 return channel;
84 }
85 }
86 throw std::runtime_error("Channel not found");
87 };
88
89 proxy.bind({}, {}, routes, findChannelByName, nullptr);
90
91 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
92 std::vector<MessageSet> currentSetOfInputs;
93 MessageSet messageSet;
94
95 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
96 fair::mq::MessagePtr payload(transport->CreateMessage());
97 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
98 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
99 messageSet.add(PartRef{std::move(header), std::move(payload)});
100 REQUIRE(messageSet.size() == 1);
101 currentSetOfInputs.emplace_back(std::move(messageSet));
102
103 TimesliceSlot slot{0};
104
105 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
106 REQUIRE(result.size() == 1); // One route
107 REQUIRE(result[0].Size() == 2); // Two messages for that route
108}
109
110TEST_CASE("ForwardInputsSingleMessageSingleRouteNoConsume")
111{
113 dh.dataOrigin = "TST";
114 dh.dataDescription = "A";
115 dh.subSpecification = 0;
116 dh.splitPayloadIndex = 0;
117 dh.splitPayloadParts = 1;
118
120 std::vector<fair::mq::Channel> channels{
121 fair::mq::Channel("from_A_to_B")};
122
123 bool copyByDefault = false;
124 FairMQDeviceProxy proxy;
125 std::vector<ForwardRoute> routes{ForwardRoute{
126 .timeslice = 0,
127 .maxTimeslices = 1,
128 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
129 .channel = "from_A_to_B",
130 .policy = nullptr,
131 }};
132
133 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
134 for (auto& channel : channels) {
135 if (channel.GetName() == channelName) {
136 return channel;
137 }
138 }
139 throw std::runtime_error("Channel not found");
140 };
141
142 proxy.bind({}, {}, routes, findChannelByName, nullptr);
143
144 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
145 std::vector<MessageSet> currentSetOfInputs;
146 MessageSet messageSet;
147
148 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
149 fair::mq::MessagePtr payload(nullptr);
150 REQUIRE(payload.get() == nullptr);
151 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
152 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
153 messageSet.add(PartRef{std::move(header), std::move(payload)});
154 REQUIRE(messageSet.size() == 1);
155 currentSetOfInputs.emplace_back(std::move(messageSet));
156
157 TimesliceSlot slot{0};
158
159 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, true);
160 REQUIRE(result.size() == 1);
161 REQUIRE(result[0].Size() == 0); // Because there is a nullptr, we do not forward this as it was already consumed.
162}
163
164TEST_CASE("ForwardInputsSingleMessageSingleRouteAtEOS")
165{
167 dh.dataOrigin = "TST";
168 dh.dataDescription = "A";
169 dh.subSpecification = 0;
170 dh.splitPayloadIndex = 0;
171 dh.splitPayloadParts = 1;
172
174
176
177 std::vector<fair::mq::Channel> channels{
178 fair::mq::Channel("from_A_to_B")};
179
180 bool consume = true;
181 bool copyByDefault = true;
182 FairMQDeviceProxy proxy;
183 std::vector<ForwardRoute> routes{ForwardRoute{
184 .timeslice = 0,
185 .maxTimeslices = 1,
186 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
187 .channel = "from_A_to_B",
188 .policy = nullptr,
189 }};
190
191 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
192 for (auto& channel : channels) {
193 if (channel.GetName() == channelName) {
194 return channel;
195 }
196 }
197 throw std::runtime_error("Channel not found");
198 };
199
200 proxy.bind({}, {}, routes, findChannelByName, nullptr);
201
202 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
203 std::vector<MessageSet> currentSetOfInputs;
204 MessageSet messageSet;
205
206 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
207 fair::mq::MessagePtr payload(transport->CreateMessage());
208 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
209 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, sih});
210 REQUIRE(o2::header::get<SourceInfoHeader*>(header->GetData()));
211 messageSet.add(PartRef{std::move(header), std::move(payload)});
212 REQUIRE(messageSet.size() == 1);
213 currentSetOfInputs.emplace_back(std::move(messageSet));
214
215 TimesliceSlot slot{0};
216
217 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
218 REQUIRE(result.size() == 1); // One route
219 REQUIRE(result[0].Size() == 0); // FIXME: this is an actual error. It should be 2
220 // Correct behavior below:
221 // REQUIRE(result[0].Size() == 2);
222 // REQUIRE(o2::header::get<SourceInfoHeader*>(result[0].At(0)->GetData()) == nullptr);
223}
224
225TEST_CASE("ForwardInputsSingleMessageSingleRouteWithOldestPossible")
226{
228 dh.dataOrigin = "TST";
229 dh.dataDescription = "A";
230 dh.subSpecification = 0;
231 dh.splitPayloadIndex = 0;
232 dh.splitPayloadParts = 1;
233
235
237
238 std::vector<fair::mq::Channel> channels{
239 fair::mq::Channel("from_A_to_B")};
240
241 bool consume = true;
242 bool copyByDefault = true;
243 FairMQDeviceProxy proxy;
244 std::vector<ForwardRoute> routes{ForwardRoute{
245 .timeslice = 0,
246 .maxTimeslices = 1,
247 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
248 .channel = "from_A_to_B",
249 .policy = nullptr,
250 }};
251
252 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
253 for (auto& channel : channels) {
254 if (channel.GetName() == channelName) {
255 return channel;
256 }
257 }
258 throw std::runtime_error("Channel not found");
259 };
260
261 proxy.bind({}, {}, routes, findChannelByName, nullptr);
262
263 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
264 std::vector<MessageSet> currentSetOfInputs;
265 MessageSet messageSet;
266
267 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
268 fair::mq::MessagePtr payload(transport->CreateMessage());
269 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
270 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph, dih});
271 REQUIRE(o2::header::get<DomainInfoHeader*>(header->GetData()));
272 messageSet.add(PartRef{std::move(header), std::move(payload)});
273 REQUIRE(messageSet.size() == 1);
274 currentSetOfInputs.emplace_back(std::move(messageSet));
275
276 TimesliceSlot slot{0};
277
278 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
279 REQUIRE(result.size() == 1); // One route
280 REQUIRE(result[0].Size() == 0); // FIXME: this is actually wrong
281 // FIXME: actually correct behavior below
282 // REQUIRE(result[0].Size() == 2); // Two messages
283 // REQUIRE(o2::header::get<DomainInfoHeader*>(result[0].At(0)->GetData()) == nullptr); // it should not have the end of stream
284}
285
286TEST_CASE("ForwardInputsSingleMessageMultipleRoutes")
287{
289 dh.dataOrigin = "TST";
290 dh.dataDescription = "A";
291 dh.subSpecification = 0;
292 dh.splitPayloadIndex = 0;
293 dh.splitPayloadParts = 1;
294
296
297 std::vector<fair::mq::Channel> channels{
298 fair::mq::Channel("from_A_to_B"),
299 fair::mq::Channel("from_A_to_C"),
300 };
301
302 bool consume = true;
303 bool copyByDefault = true;
304 FairMQDeviceProxy proxy;
305 std::vector<ForwardRoute> routes{
307 .timeslice = 0,
308 .maxTimeslices = 1,
309 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
310 .channel = "from_A_to_B",
311 .policy = nullptr,
312 },
314 .timeslice = 0,
315 .maxTimeslices = 1,
316 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
317 .channel = "from_A_to_C",
318 .policy = nullptr,
319 }};
320
321 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
322 for (auto& channel : channels) {
323 if (channel.GetName() == channelName) {
324 return channel;
325 }
326 }
327 throw std::runtime_error("Channel not found");
328 };
329
330 proxy.bind({}, {}, routes, findChannelByName, nullptr);
331
332 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
333 std::vector<MessageSet> currentSetOfInputs;
334 MessageSet messageSet;
335
336 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
337 fair::mq::MessagePtr payload(transport->CreateMessage());
338 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
339 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
340 messageSet.add(PartRef{std::move(header), std::move(payload)});
341 REQUIRE(messageSet.size() == 1);
342 currentSetOfInputs.emplace_back(std::move(messageSet));
343
344 TimesliceSlot slot{0};
345
346 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
347 REQUIRE(result.size() == 2); // Two routes
348 REQUIRE(result[0].Size() == 2); // Two messages per route
349 REQUIRE(result[1].Size() == 0); // Only the first DPL matched channel matters
350}
351
352TEST_CASE("ForwardInputsSingleMessageMultipleRoutesExternals")
353{
355 dh.dataOrigin = "TST";
356 dh.dataDescription = "A";
357 dh.subSpecification = 0;
358 dh.splitPayloadIndex = 0;
359 dh.splitPayloadParts = 1;
360
362
363 std::vector<fair::mq::Channel> channels{
364 fair::mq::Channel("external"),
365 fair::mq::Channel("from_A_to_C"),
366 };
367
368 bool consume = true;
369 bool copyByDefault = true;
370 FairMQDeviceProxy proxy;
371 std::vector<ForwardRoute> routes{
373 .timeslice = 0,
374 .maxTimeslices = 1,
375 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
376 .channel = "external",
377 .policy = nullptr,
378 },
380 .timeslice = 0,
381 .maxTimeslices = 1,
382 .matcher = {"binding2", ConcreteDataMatcher{"TST", "A", 0}},
383 .channel = "from_A_to_C",
384 .policy = nullptr,
385 }};
386
387 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
388 for (auto& channel : channels) {
389 if (channel.GetName() == channelName) {
390 return channel;
391 }
392 }
393 throw std::runtime_error("Channel not found");
394 };
395
396 proxy.bind({}, {}, routes, findChannelByName, nullptr);
397
398 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
399 std::vector<MessageSet> currentSetOfInputs;
400 MessageSet messageSet;
401
402 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
403 fair::mq::MessagePtr payload(transport->CreateMessage());
404 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
405 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
406 messageSet.add(PartRef{std::move(header), std::move(payload)});
407 REQUIRE(messageSet.size() == 1);
408 currentSetOfInputs.emplace_back(std::move(messageSet));
409
410 TimesliceSlot slot{0};
411
412 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
413 REQUIRE(result.size() == 2); // Two routes
414 REQUIRE(result[0].Size() == 2); // With external matching channels, we need to copy and then forward
415 REQUIRE(result[1].Size() == 2); //
416}
417
418TEST_CASE("ForwardInputsMultiMessageMultipleRoutes")
419{
421 dh1.dataOrigin = "TST";
422 dh1.dataDescription = "A";
423 dh1.subSpecification = 0;
424 dh1.splitPayloadIndex = 0;
425 dh1.splitPayloadParts = 1;
426
428 dh2.dataOrigin = "TST";
429 dh2.dataDescription = "B";
430 dh2.subSpecification = 0;
431 dh2.splitPayloadIndex = 0;
432 dh2.splitPayloadParts = 1;
433
435
436 std::vector<fair::mq::Channel> channels{
437 fair::mq::Channel("from_A_to_B"),
438 fair::mq::Channel("from_A_to_C"),
439 };
440
441 bool consume = true;
442 bool copyByDefault = true;
443 FairMQDeviceProxy proxy;
444 std::vector<ForwardRoute> routes{
446 .timeslice = 0,
447 .maxTimeslices = 1,
448 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
449 .channel = "from_A_to_B",
450 .policy = nullptr,
451 },
453 .timeslice = 0,
454 .maxTimeslices = 1,
455 .matcher = {"binding2", ConcreteDataMatcher{"TST", "B", 0}},
456 .channel = "from_A_to_C",
457 .policy = nullptr,
458 }};
459
460 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
461 for (auto& channel : channels) {
462 if (channel.GetName() == channelName) {
463 return channel;
464 }
465 }
466 throw std::runtime_error("Channel not found");
467 };
468
469 proxy.bind({}, {}, routes, findChannelByName, nullptr);
470
471 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
472 std::vector<MessageSet> currentSetOfInputs;
473
474 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
475 fair::mq::MessagePtr payload1(transport->CreateMessage());
476 fair::mq::MessagePtr payload2(transport->CreateMessage());
477 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
478 auto header1 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh1, dph});
479 MessageSet messageSet1;
480 messageSet1.add(PartRef{std::move(header1), std::move(payload1)});
481 REQUIRE(messageSet1.size() == 1);
482
483 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
484 MessageSet messageSet2;
485 messageSet2.add(PartRef{std::move(header2), std::move(payload2)});
486 REQUIRE(messageSet2.size() == 1);
487 currentSetOfInputs.emplace_back(std::move(messageSet1));
488 currentSetOfInputs.emplace_back(std::move(messageSet2));
489 REQUIRE(currentSetOfInputs.size() == 2);
490
491 TimesliceSlot slot{0};
492
493 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
494 REQUIRE(result.size() == 2); // Two routes
495 REQUIRE(result[0].Size() == 2); //
496 REQUIRE(result[1].Size() == 2); //
497}
498
499TEST_CASE("ForwardInputsSingleMessageMultipleRoutesOnlyOneMatches")
500{
502 dh.dataOrigin = "TST";
503 dh.dataDescription = "A";
504 dh.subSpecification = 0;
505 dh.splitPayloadIndex = 0;
506 dh.splitPayloadParts = 1;
507
509
510 std::vector<fair::mq::Channel> channels{
511 fair::mq::Channel("from_A_to_B"),
512 fair::mq::Channel("from_A_to_C"),
513 };
514
515 bool consume = true;
516 bool copyByDefault = true;
517 FairMQDeviceProxy proxy;
518 std::vector<ForwardRoute> routes{
520 .timeslice = 0,
521 .maxTimeslices = 1,
522 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
523 .channel = "from_A_to_B",
524 .policy = nullptr,
525 },
527 .timeslice = 0,
528 .maxTimeslices = 1,
529 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
530 .channel = "from_A_to_C",
531 .policy = nullptr,
532 }};
533
534 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
535 for (auto& channel : channels) {
536 if (channel.GetName() == channelName) {
537 return channel;
538 }
539 }
540 throw std::runtime_error("Channel not found");
541 };
542
543 proxy.bind({}, {}, routes, findChannelByName, nullptr);
544
545 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
546 std::vector<MessageSet> currentSetOfInputs;
547 MessageSet messageSet;
548
549 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
550 fair::mq::MessagePtr payload(transport->CreateMessage());
551 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
552 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
553 messageSet.add(PartRef{std::move(header), std::move(payload)});
554 REQUIRE(messageSet.size() == 1);
555 currentSetOfInputs.emplace_back(std::move(messageSet));
556
557 TimesliceSlot slot{0};
558
559 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
560 REQUIRE(result.size() == 2); // Two routes
561 REQUIRE(result[0].Size() == 0); // Two messages per route
562 REQUIRE(result[1].Size() == 2); // Two messages per route
563}
564
565TEST_CASE("ForwardInputsSplitPayload")
566{
568 dh.dataOrigin = "TST";
569 dh.dataDescription = "A";
570 dh.subSpecification = 0;
571 dh.splitPayloadIndex = 0;
572 dh.splitPayloadParts = 2;
573
575 dh2.dataOrigin = "TST";
576 dh2.dataDescription = "B";
577 dh2.subSpecification = 0;
578 dh2.splitPayloadIndex = 0;
579 dh2.splitPayloadParts = 1;
580
582
583 std::vector<fair::mq::Channel> channels{
584 fair::mq::Channel("from_A_to_B"),
585 fair::mq::Channel("from_A_to_C"),
586 };
587
588 bool consume = true;
589 bool copyByDefault = true;
590 FairMQDeviceProxy proxy;
591 std::vector<ForwardRoute> routes{
593 .timeslice = 0,
594 .maxTimeslices = 1,
595 .matcher = {"binding", ConcreteDataMatcher{"TST", "B", 0}},
596 .channel = "from_A_to_B",
597 .policy = nullptr,
598 },
600 .timeslice = 0,
601 .maxTimeslices = 1,
602 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
603 .channel = "from_A_to_C",
604 .policy = nullptr,
605 }};
606
607 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
608 for (auto& channel : channels) {
609 if (channel.GetName() == channelName) {
610 return channel;
611 }
612 }
613 throw std::runtime_error("Channel not found");
614 };
615
616 proxy.bind({}, {}, routes, findChannelByName, nullptr);
617
618 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
619 std::vector<MessageSet> currentSetOfInputs;
620 MessageSet messageSet;
621
622 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
623 fair::mq::MessagePtr payload1(transport->CreateMessage());
624 fair::mq::MessagePtr payload2(transport->CreateMessage());
625 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
626 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
627 std::vector<std::unique_ptr<fair::mq::Message>> messages;
628 messages.push_back(std::move(header));
629 messages.push_back(std::move(payload1));
630 messages.push_back(std::move(payload2));
631 auto fillMessages = [&messages](size_t t) -> fair::mq::MessagePtr {
632 return std::move(messages[t]);
633 };
634 messageSet.add(fillMessages, 3);
635 auto header2 = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh2, dph});
636 PartRef part{std::move(header2), transport->CreateMessage()};
637 messageSet.add(std::move(part));
638
639 REQUIRE(messageSet.size() == 2);
640 currentSetOfInputs.emplace_back(std::move(messageSet));
641
642 TimesliceSlot slot{0};
643
644 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
645 REQUIRE(result.size() == 2); // Two routes
646 CHECK(result[0].Size() == 2); // No messages on this route
647 CHECK(result[1].Size() == 5); // FIXME: Multipart matching has side effects also for the elements
648 // CHECK(result[1].Size() == 3); // FIXME: the correct forwarding is that only the multipart goes to the same route
649}
650
651TEST_CASE("ForwardInputEOSSingleRoute")
652{
654
655 std::vector<fair::mq::Channel> channels{
656 fair::mq::Channel("from_A_to_B")};
657
658 bool consume = true;
659 bool copyByDefault = true;
660 FairMQDeviceProxy proxy;
661 std::vector<ForwardRoute> routes{ForwardRoute{
662 .timeslice = 0,
663 .maxTimeslices = 1,
664 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
665 .channel = "from_A_to_B",
666 .policy = nullptr,
667 }};
668
669 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
670 for (auto& channel : channels) {
671 if (channel.GetName() == channelName) {
672 return channel;
673 }
674 }
675 throw std::runtime_error("Channel not found");
676 };
677
678 proxy.bind({}, {}, routes, findChannelByName, nullptr);
679
680 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
681 std::vector<MessageSet> currentSetOfInputs;
682 MessageSet messageSet;
683
684 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
685 fair::mq::MessagePtr payload(transport->CreateMessage());
686 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
687 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, sih});
688 messageSet.add(PartRef{std::move(header), std::move(payload)});
689 REQUIRE(messageSet.size() == 1);
690 currentSetOfInputs.emplace_back(std::move(messageSet));
691
692 TimesliceSlot slot{0};
693
694 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
695 REQUIRE(result.size() == 1); // One route
696 REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
697}
698
699TEST_CASE("ForwardInputOldestPossibleSingleRoute")
700{
702
703 std::vector<fair::mq::Channel> channels{
704 fair::mq::Channel("from_A_to_B")};
705
706 bool consume = true;
707 bool copyByDefault = true;
708 FairMQDeviceProxy proxy;
709 std::vector<ForwardRoute> routes{ForwardRoute{
710 .timeslice = 0,
711 .maxTimeslices = 1,
712 .matcher = {"binding", ConcreteDataMatcher{"TST", "A", 0}},
713 .channel = "from_A_to_B",
714 .policy = nullptr,
715 }};
716
717 auto findChannelByName = [&channels](std::string const& channelName) -> fair::mq::Channel& {
718 for (auto& channel : channels) {
719 if (channel.GetName() == channelName) {
720 return channel;
721 }
722 }
723 throw std::runtime_error("Channel not found");
724 };
725
726 proxy.bind({}, {}, routes, findChannelByName, nullptr);
727
728 TimesliceIndex::OldestOutputInfo oldestTimeslice{.timeslice = {0}};
729 std::vector<MessageSet> currentSetOfInputs;
730 MessageSet messageSet;
731
732 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
733 fair::mq::MessagePtr payload(transport->CreateMessage());
734 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
735 auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dih});
736 messageSet.add(PartRef{std::move(header), std::move(payload)});
737 REQUIRE(messageSet.size() == 1);
738 currentSetOfInputs.emplace_back(std::move(messageSet));
739
740 TimesliceSlot slot{0};
741
742 auto result = o2::framework::DataProcessingHelpers::routeForwardedMessages(proxy, slot, currentSetOfInputs, oldestTimeslice, copyByDefault, consume);
743 REQUIRE(result.size() == 1); // One route
744 REQUIRE(result[0].Size() == 0); // Oldest possible timeframe should not be forwarded
745}
std::vector< OutputRoute > routes
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
Defining PrimaryVertex explicitly as messageable.
TEST_CASE("test_prepareArguments")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::vector< fair::mq::Parts > routeForwardedMessages(FairMQDeviceProxy &proxy, TimesliceSlot slot, std::vector< MessageSet > &currentSetOfInputs, TimesliceIndex::OldestOutputInfo oldestTimeslice, bool copy, bool consume)
Helper to route messages for forwarding.
a BaseHeader with domain information from the source
size_t size() const
get number of in-flight O2 messages
Definition MessageSet.h:86
void add(PartRef &&ref)
Definition MessageSet.h:123
Reference to an inflight part.
Definition PartRef.h:24
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:619
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:647
DataDescription dataDescription
Definition DataHeader.h:637
SubSpecificationType subSpecification
Definition DataHeader.h:657
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:662
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
std::vector< ChannelData > channels