58 int quickUpdateInterval = 1;
60 std::vector<MetricSpec> specs{
61 MetricSpec{.
name =
"malformed_inputs", .metricId =
static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
62 MetricSpec{.name =
"dropped_computations", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
63 MetricSpec{.name =
"dropped_incoming_messages", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
64 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
66 for (
auto& spec : specs) {
67 stats.registerMetric(spec);
71 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
72 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
73 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&
states));
74 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
75 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&
state));
80 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
82 std::vector<InputRoute> inputs = {
85 std::vector<ForwardRoute> forwards;
86 std::vector<InputChannelInfo> infos{1};
88 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
104 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
105 std::array<fair::mq::MessagePtr, 2> messages;
106 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
108 messages[1] = transport->CreateMessage(1000);
109 fair::mq::MessagePtr& header = messages[0];
110 fair::mq::MessagePtr& payload = messages[1];
112 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
113 std::vector<RecordAction> ready;
114 relayer.getReadyToProcess(ready);
115 REQUIRE(ready.size() == 1);
116 REQUIRE(ready[0].slot.index == 0);
117 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
118 REQUIRE(header.get() ==
nullptr);
119 REQUIRE(payload.get() ==
nullptr);
120 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
122 REQUIRE(
result.size() == 1);
127 SECTION(
"TestNoWaitMatcher")
132 std::vector<InputRoute> inputs = {
135 std::vector<ForwardRoute> forwards;
136 std::vector<InputChannelInfo> infos{1};
138 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
154 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
155 std::array<fair::mq::MessagePtr, 2> messages;
156 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
158 messages[1] = transport->CreateMessage(1000);
159 fair::mq::MessagePtr& header = messages[0];
160 fair::mq::MessagePtr& payload = messages[1];
162 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
163 std::vector<RecordAction> ready;
164 relayer.getReadyToProcess(ready);
165 REQUIRE(ready.size() == 1);
166 REQUIRE(ready[0].slot.index == 0);
167 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
168 REQUIRE(header.get() ==
nullptr);
169 REQUIRE(payload.get() ==
nullptr);
170 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
172 REQUIRE(
result.size() == 1);
192 std::vector<InputRoute> inputs = {
196 std::vector<ForwardRoute> forwards;
198 std::vector<InputChannelInfo> infos{1};
200 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
206 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
207 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
209 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
210 std::array<fair::mq::MessagePtr, 2> messages;
212 messages[1] = transport->CreateMessage(1000);
213 fair::mq::MessagePtr& header = messages[0];
214 fair::mq::MessagePtr& payload = messages[1];
216 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
217 REQUIRE(header.get() ==
nullptr);
218 REQUIRE(payload.get() ==
nullptr);
238 createMessage(dh1, 0);
239 std::vector<RecordAction> ready;
240 relayer.getReadyToProcess(ready);
241 REQUIRE(ready.size() == 0);
243 createMessage(dh2, 0);
245 relayer.getReadyToProcess(ready);
246 REQUIRE(ready.size() == 1);
247 REQUIRE(ready[0].slot.index == 0);
248 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
250 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
252 REQUIRE(
result.size() == 2);
259 SECTION(
"TestRelayBug")
273 std::vector<InputRoute> inputs = {
277 std::vector<ForwardRoute> forwards;
279 std::vector<InputChannelInfo> infos{1};
281 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
287 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
288 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
290 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
291 std::array<fair::mq::MessagePtr, 2> messages;
293 messages[1] = transport->CreateMessage(1000);
294 fair::mq::MessagePtr& header = messages[0];
295 fair::mq::MessagePtr& payload = messages[1];
297 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
298 REQUIRE(header.get() ==
nullptr);
299 REQUIRE(payload.get() ==
nullptr);
328 createMessage(dh1, 0);
329 std::vector<RecordAction> ready;
330 relayer.getReadyToProcess(ready);
331 REQUIRE(ready.size() == 0);
332 createMessage(dh1, 1);
334 relayer.getReadyToProcess(ready);
335 REQUIRE(ready.size() == 0);
336 createMessage(dh2, 0);
338 relayer.getReadyToProcess(ready);
339 REQUIRE(ready.size() == 1);
340 REQUIRE(ready[0].slot.index == 0);
341 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
342 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
343 createMessage(dh2, 1);
345 relayer.getReadyToProcess(ready);
346 REQUIRE(ready.size() == 1);
347 REQUIRE(ready[0].slot.index == 1);
348 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
349 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
357 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
359 std::vector<InputRoute> inputs = {
361 std::vector<ForwardRoute> forwards;
364 std::vector<InputChannelInfo> infos{1};
366 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
381 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
382 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
383 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](
auto const&
h) {
384 std::array<fair::mq::MessagePtr, 2> messages;
386 messages[1] = transport->CreateMessage(1000);
387 fair::mq::MessagePtr& header = messages[0];
388 fair::mq::MessagePtr& payload = messages[1];
390 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
391 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() ==
nullptr));
392 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() ==
nullptr));
393 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() !=
nullptr));
394 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() !=
nullptr));
400 std::vector<RecordAction> ready;
401 relayer.getReadyToProcess(ready);
402 REQUIRE(ready.size() == 2);
403 REQUIRE(ready[0].slot.index == 1);
404 REQUIRE(ready[1].slot.index == 0);
405 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
406 REQUIRE(ready[1].
op == CompletionPolicy::CompletionOp::Consume);
407 for (
size_t i = 0;
i < ready.size(); ++
i) {
408 auto result = relayer.consumeAllInputsForTimeslice(ready[
i].slot);
416 relayer.getReadyToProcess(ready);
417 REQUIRE(ready.size() == 2);
419 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
420 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
422 REQUIRE(result1.size() == 1);
423 REQUIRE(result2.size() == 1);
428 SECTION(
"TestPolicies")
431 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
432 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
434 std::vector<InputRoute> inputs = {
439 std::vector<ForwardRoute> forwards;
440 std::vector<InputChannelInfo> infos{1};
442 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
465 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
466 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
467 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
468 std::array<fair::mq::MessagePtr, 2> messages;
470 messages[1] = transport->CreateMessage(1000);
471 fair::mq::MessagePtr& header = messages[0];
473 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
478 std::vector<RecordAction> ready1;
479 relayer.getReadyToProcess(ready1);
480 REQUIRE(ready1.size() == 1);
481 REQUIRE(ready1[0].slot.index == 0);
482 REQUIRE(ready1[0].
op == CompletionPolicy::CompletionOp::Process);
485 std::vector<RecordAction> ready2;
486 relayer.getReadyToProcess(ready2);
487 REQUIRE(ready2.size() == 1);
488 REQUIRE(ready2[0].slot.index == 1);
489 REQUIRE(ready2[0].
op == CompletionPolicy::CompletionOp::Process);
492 std::vector<RecordAction> ready3;
493 relayer.getReadyToProcess(ready3);
494 REQUIRE(ready3.size() == 1);
495 REQUIRE(ready3[0].slot.index == 1);
496 REQUIRE(ready3[0].
op == CompletionPolicy::CompletionOp::Consume);
503 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
504 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
506 std::vector<InputRoute> inputs = {
511 std::vector<ForwardRoute> forwards;
512 std::vector<InputChannelInfo> infos{1};
514 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
537 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
538 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
539 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
540 std::array<fair::mq::MessagePtr, 2> messages;
542 messages[1] = transport->CreateMessage(1000);
543 fair::mq::MessagePtr& header = messages[0];
545 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
553 std::vector<RecordAction> ready;
554 relayer.getReadyToProcess(ready);
555 REQUIRE(ready.size() == 0);
559 SECTION(
"TestTooMany")
562 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
563 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
565 std::vector<InputRoute> inputs = {
570 std::vector<ForwardRoute> forwards;
571 std::vector<InputChannelInfo> infos{1};
573 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
596 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
597 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
599 std::array<fair::mq::MessagePtr, 4> messages;
601 messages[1] = transport->CreateMessage(1000);
602 fair::mq::MessagePtr& header = messages[0];
603 fair::mq::MessagePtr& payload = messages[1];
605 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
606 REQUIRE(header.get() ==
nullptr);
607 REQUIRE(payload.get() ==
nullptr);
610 messages[3] = transport->CreateMessage(1000);
611 fair::mq::MessagePtr& header2 = messages[2];
612 fair::mq::MessagePtr& payload2 = messages[3];
614 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
615 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
616 REQUIRE(header2.get() !=
nullptr);
617 REQUIRE(payload2.get() !=
nullptr);
620 SECTION(
"SplitParts")
623 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
624 InputSpec spec2{
"its",
"ITS",
"CLUSTERS"};
626 std::vector<InputRoute> inputs = {
631 std::vector<ForwardRoute> forwards;
632 std::vector<InputChannelInfo> infos{1};
634 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
657 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
658 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
660 std::array<fair::mq::MessagePtr, 6> messages;
662 messages[1] = transport->CreateMessage(1000);
663 fair::mq::MessagePtr& header = messages[0];
664 fair::mq::MessagePtr& payload = messages[1];
666 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
667 REQUIRE(header.get() ==
nullptr);
668 REQUIRE(payload.get() ==
nullptr);
671 messages[3] = transport->CreateMessage(1000);
672 fair::mq::MessagePtr& header2 = messages[2];
673 fair::mq::MessagePtr& payload2 = messages[3];
675 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo, 2);
676 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
677 CHECK(action.timeslice.value == 1);
678 REQUIRE(header2.get() !=
nullptr);
679 REQUIRE(payload2.get() !=
nullptr);
682 messages[5] = transport->CreateMessage(1000);
684 relayer.relay(header2->GetData(), &messages[4], fakeInfo3, 2);
685 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
686 CHECK(action.timeslice.value == 1);
687 REQUIRE(header2.get() !=
nullptr);
688 REQUIRE(payload2.get() !=
nullptr);
691 SECTION(
"SplitPayloadPairs")
694 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
696 std::vector<InputRoute> inputs = {
700 std::vector<ForwardRoute> forwards;
701 std::vector<InputChannelInfo> infos{1};
703 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
711 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
712 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
713 size_t timeslice = 0;
715 const int nSplitParts = 100;
716 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
717 splitParts.reserve(2 * nSplitParts);
719 for (
size_t i = 0;
i < nSplitParts; ++
i) {
720 dh.splitPayloadIndex =
i;
721 dh.splitPayloadParts = nSplitParts;
724 fair::mq::MessagePtr payload = transport->CreateMessage(100);
726 splitParts.emplace_back(std::move(header));
727 splitParts.emplace_back(std::move(payload));
729 REQUIRE(splitParts.size() == 2 * nSplitParts);
732 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
733 std::vector<RecordAction> ready;
734 relayer.getReadyToProcess(ready);
735 REQUIRE(ready.size() == 1);
736 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
737 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
740 REQUIRE(messageSet.size() == 1);
741 REQUIRE((messageSet[0] |
count_parts{}) == nSplitParts);
745 SECTION(
"SplitPayloadSequence")
748 InputSpec spec1{
"clusters",
"TST",
"COUNTER"};
750 std::vector<InputRoute> inputs = {
754 std::vector<ForwardRoute> forwards;
755 std::vector<InputChannelInfo> infos{1};
757 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
763 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
764 size_t timeslice = 0;
766 std::vector<size_t> sequenceSize;
767 size_t nTotalPayloads = 0;
769 auto createSequence = [&nTotalPayloads, ×lice, &sequenceSize, &transport, &relayer](
size_t nPayloads) ->
void {
770 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
771 std::vector<std::unique_ptr<fair::mq::Message>> messages;
772 messages.reserve(nPayloads + 1);
778 dh.splitPayloadParts = nPayloads;
780 messages.emplace_back(std::move(header));
782 for (
size_t i = 0;
i < nPayloads; ++
i) {
783 messages.emplace_back(transport->CreateMessage(100));
784 *(
reinterpret_cast<size_t*
>(messages.back()->GetData())) = nTotalPayloads;
787 REQUIRE(messages.size() == nPayloads + 1);
789 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
790 sequenceSize.emplace_back(nPayloads);
796 std::vector<RecordAction> ready;
797 relayer.getReadyToProcess(ready);
798 REQUIRE(ready.size() == 1);
799 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
800 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
802 REQUIRE(messageSet.size() == 1);
804 REQUIRE((messageSet[0] |
count_parts{}) == sequenceSize.size());
806 for (
size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) {
810 auto const*
data = (messageSet[0] |
get_payload{seqid, pi})->GetData();
811 REQUIRE(*(
reinterpret_cast<size_t const*
>(
data)) ==
counter);
817 SECTION(
"ProcessDanglingInputs")
819 InputSpec spec{
"condition",
"TST",
"COND"};
820 std::vector<InputRoute> inputs = {
821 InputRoute{spec, 0,
"from_source_to_self", 0}};
823 std::vector<InputChannelInfo> infos{1};
825 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
829 std::vector<fair::mq::Channel>
channels{fair::mq::Channel(
"from_source_to_self")};
830 auto findChannel = [&
channels](std::string
const&
name) -> fair::mq::Channel& {
832 if (ch.GetName() ==
name) {
836 throw std::runtime_error(
"Channel not found: " +
name);
838 proxy.
bind({}, inputs, {}, findChannel, [] {
return false; });
839 ref.registerService(ServiceRegistryHelpers::handleForService<FairMQDeviceProxy>(&proxy));
845 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
846 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
850 dh.splitPayloadIndex = 0;
854 handler.
name =
"test-condition";
856 handler.
lifetime = Lifetime::Condition;
861 for (
size_t si = 0; si <
index.size(); si++) {
863 if (!
index.isValid(slot)) {
865 (
void)
index.setOldestPossibleInput({1}, channelIndex);
878 ref.payload = transport->CreateMessage(4);
881 std::vector<ExpirationHandler> handlers{handler};
882 auto activity = relayer.processDanglingInputs(handlers, {registry},
true);
884 REQUIRE(activity.newSlots == 1);
885 REQUIRE(activity.expiredSlots == 1);
888 std::vector<RecordAction> ready;
889 relayer.getReadyToProcess(ready);
890 REQUIRE(ready.size() == 1);
891 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
893 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
894 REQUIRE(
result.size() == 1);
898 SECTION(
"ProcessDanglingInputsSkipsWhenDataPresent")
902 InputSpec spec{
"condition",
"TST",
"COND"};
903 std::vector<InputRoute> inputs = {
904 InputRoute{spec, 0,
"from_source_to_self", 0}};
906 std::vector<InputChannelInfo> infos{1};
908 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
911 std::vector<fair::mq::Channel>
channels{fair::mq::Channel(
"from_source_to_self")};
912 auto findChannel = [&
channels](std::string
const&
name) -> fair::mq::Channel& {
914 if (ch.GetName() ==
name) {
918 throw std::runtime_error(
"Channel not found: " +
name);
920 proxy.
bind({}, inputs, {}, findChannel, [] {
return false; });
921 ref.registerService(ServiceRegistryHelpers::handleForService<FairMQDeviceProxy>(&proxy));
927 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
928 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
932 dh.splitPayloadIndex = 0;
937 handler.
name =
"test-condition";
939 handler.
lifetime = Lifetime::Condition;
942 for (
size_t si = 0; si <
index.size(); si++) {
944 if (!
index.isValid(slot)) {
946 (
void)
index.setOldestPossibleInput({1}, channelIndex);
953 int handlerCallCount = 0;
956 ref.payload = transport->CreateMessage(4);
959 std::vector<ExpirationHandler> handlers{handler};
962 auto activity1 = relayer.processDanglingInputs(handlers, {registry},
true);
963 REQUIRE(activity1.expiredSlots == 1);
964 REQUIRE(handlerCallCount == 1);
967 auto activity2 = relayer.processDanglingInputs(handlers, {registry},
false);
968 REQUIRE(activity2.expiredSlots == 0);
969 REQUIRE(handlerCallCount == 1);