54 int quickUpdateInterval = 1;
56 std::vector<MetricSpec> specs{
57 MetricSpec{.
name =
"malformed_inputs", .metricId =
static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name =
"dropped_computations", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
59 MetricSpec{.name =
"dropped_incoming_messages", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
60 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
62 for (
auto& spec : specs) {
63 stats.registerMetric(spec);
67 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
68 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
69 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&
states));
70 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
71 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&
state));
76 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
78 std::vector<InputRoute> inputs = {
81 std::vector<ForwardRoute> forwards;
82 std::vector<InputChannelInfo> infos{1};
84 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
100 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
101 std::array<fair::mq::MessagePtr, 2> messages;
102 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
104 messages[1] = transport->CreateMessage(1000);
105 fair::mq::MessagePtr& header = messages[0];
106 fair::mq::MessagePtr& payload = messages[1];
108 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
109 std::vector<RecordAction> ready;
110 relayer.getReadyToProcess(ready);
111 REQUIRE(ready.size() == 1);
112 REQUIRE(ready[0].slot.index == 0);
113 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
114 REQUIRE(header.get() ==
nullptr);
115 REQUIRE(payload.get() ==
nullptr);
116 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
118 REQUIRE(
result.size() == 1);
119 REQUIRE(
result.at(0).size() == 1);
123 SECTION(
"TestNoWaitMatcher")
128 std::vector<InputRoute> inputs = {
131 std::vector<ForwardRoute> forwards;
132 std::vector<InputChannelInfo> infos{1};
134 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
150 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
151 std::array<fair::mq::MessagePtr, 2> messages;
152 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
154 messages[1] = transport->CreateMessage(1000);
155 fair::mq::MessagePtr& header = messages[0];
156 fair::mq::MessagePtr& payload = messages[1];
158 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
159 std::vector<RecordAction> ready;
160 relayer.getReadyToProcess(ready);
161 REQUIRE(ready.size() == 1);
162 REQUIRE(ready[0].slot.index == 0);
163 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
164 REQUIRE(header.get() ==
nullptr);
165 REQUIRE(payload.get() ==
nullptr);
166 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
168 REQUIRE(
result.size() == 1);
169 REQUIRE(
result.at(0).size() == 1);
188 std::vector<InputRoute> inputs = {
192 std::vector<ForwardRoute> forwards;
194 std::vector<InputChannelInfo> infos{1};
196 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
202 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
203 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
205 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
206 std::array<fair::mq::MessagePtr, 2> messages;
208 messages[1] = transport->CreateMessage(1000);
209 fair::mq::MessagePtr& header = messages[0];
210 fair::mq::MessagePtr& payload = messages[1];
212 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
213 REQUIRE(header.get() ==
nullptr);
214 REQUIRE(payload.get() ==
nullptr);
234 createMessage(dh1, 0);
235 std::vector<RecordAction> ready;
236 relayer.getReadyToProcess(ready);
237 REQUIRE(ready.size() == 0);
239 createMessage(dh2, 0);
241 relayer.getReadyToProcess(ready);
242 REQUIRE(ready.size() == 1);
243 REQUIRE(ready[0].slot.index == 0);
244 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
246 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
248 REQUIRE(
result.size() == 2);
249 REQUIRE(
result.at(0).size() == 1);
250 REQUIRE(
result.at(1).size() == 1);
255 SECTION(
"TestRelayBug")
269 std::vector<InputRoute> inputs = {
273 std::vector<ForwardRoute> forwards;
275 std::vector<InputChannelInfo> infos{1};
277 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
283 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
284 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
286 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
287 std::array<fair::mq::MessagePtr, 2> messages;
289 messages[1] = transport->CreateMessage(1000);
290 fair::mq::MessagePtr& header = messages[0];
291 fair::mq::MessagePtr& payload = messages[1];
293 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
294 REQUIRE(header.get() ==
nullptr);
295 REQUIRE(payload.get() ==
nullptr);
324 createMessage(dh1, 0);
325 std::vector<RecordAction> ready;
326 relayer.getReadyToProcess(ready);
327 REQUIRE(ready.size() == 0);
328 createMessage(dh1, 1);
330 relayer.getReadyToProcess(ready);
331 REQUIRE(ready.size() == 0);
332 createMessage(dh2, 0);
334 relayer.getReadyToProcess(ready);
335 REQUIRE(ready.size() == 1);
336 REQUIRE(ready[0].slot.index == 0);
337 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
338 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
339 createMessage(dh2, 1);
341 relayer.getReadyToProcess(ready);
342 REQUIRE(ready.size() == 1);
343 REQUIRE(ready[0].slot.index == 1);
344 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
345 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
353 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
355 std::vector<InputRoute> inputs = {
357 std::vector<ForwardRoute> forwards;
360 std::vector<InputChannelInfo> infos{1};
362 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
377 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
378 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
379 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](
auto const&
h) {
380 std::array<fair::mq::MessagePtr, 2> messages;
382 messages[1] = transport->CreateMessage(1000);
383 fair::mq::MessagePtr& header = messages[0];
384 fair::mq::MessagePtr& payload = messages[1];
386 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
387 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() ==
nullptr));
388 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() ==
nullptr));
389 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() !=
nullptr));
390 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() !=
nullptr));
396 std::vector<RecordAction> ready;
397 relayer.getReadyToProcess(ready);
398 REQUIRE(ready.size() == 2);
399 REQUIRE(ready[0].slot.index == 1);
400 REQUIRE(ready[1].slot.index == 0);
401 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
402 REQUIRE(ready[1].
op == CompletionPolicy::CompletionOp::Consume);
403 for (
size_t i = 0;
i < ready.size(); ++
i) {
404 auto result = relayer.consumeAllInputsForTimeslice(ready[
i].slot);
412 relayer.getReadyToProcess(ready);
413 REQUIRE(ready.size() == 2);
415 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
416 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
418 REQUIRE(result1.size() == 1);
419 REQUIRE(result2.size() == 1);
424 SECTION(
"TestPolicies")
427 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
428 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
430 std::vector<InputRoute> inputs = {
435 std::vector<ForwardRoute> forwards;
436 std::vector<InputChannelInfo> infos{1};
438 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
461 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
462 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
463 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
464 std::array<fair::mq::MessagePtr, 2> messages;
466 messages[1] = transport->CreateMessage(1000);
467 fair::mq::MessagePtr& header = messages[0];
469 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
474 std::vector<RecordAction> ready1;
475 relayer.getReadyToProcess(ready1);
476 REQUIRE(ready1.size() == 1);
477 REQUIRE(ready1[0].slot.index == 0);
478 REQUIRE(ready1[0].
op == CompletionPolicy::CompletionOp::Process);
481 std::vector<RecordAction> ready2;
482 relayer.getReadyToProcess(ready2);
483 REQUIRE(ready2.size() == 1);
484 REQUIRE(ready2[0].slot.index == 1);
485 REQUIRE(ready2[0].
op == CompletionPolicy::CompletionOp::Process);
488 std::vector<RecordAction> ready3;
489 relayer.getReadyToProcess(ready3);
490 REQUIRE(ready3.size() == 1);
491 REQUIRE(ready3[0].slot.index == 1);
492 REQUIRE(ready3[0].
op == CompletionPolicy::CompletionOp::Consume);
499 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
500 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
502 std::vector<InputRoute> inputs = {
507 std::vector<ForwardRoute> forwards;
508 std::vector<InputChannelInfo> infos{1};
510 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
533 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
534 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
535 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
536 std::array<fair::mq::MessagePtr, 2> messages;
538 messages[1] = transport->CreateMessage(1000);
539 fair::mq::MessagePtr& header = messages[0];
541 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
549 std::vector<RecordAction> ready;
550 relayer.getReadyToProcess(ready);
551 REQUIRE(ready.size() == 0);
555 SECTION(
"TestTooMany")
558 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
559 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
561 std::vector<InputRoute> inputs = {
566 std::vector<ForwardRoute> forwards;
567 std::vector<InputChannelInfo> infos{1};
569 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
592 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
593 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
595 std::array<fair::mq::MessagePtr, 4> messages;
597 messages[1] = transport->CreateMessage(1000);
598 fair::mq::MessagePtr& header = messages[0];
599 fair::mq::MessagePtr& payload = messages[1];
601 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
602 REQUIRE(header.get() ==
nullptr);
603 REQUIRE(payload.get() ==
nullptr);
606 messages[3] = transport->CreateMessage(1000);
607 fair::mq::MessagePtr& header2 = messages[2];
608 fair::mq::MessagePtr& payload2 = messages[3];
610 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
611 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
612 REQUIRE(header2.get() !=
nullptr);
613 REQUIRE(payload2.get() !=
nullptr);
616 SECTION(
"SplitParts")
619 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
620 InputSpec spec2{
"its",
"ITS",
"CLUSTERS"};
622 std::vector<InputRoute> inputs = {
627 std::vector<ForwardRoute> forwards;
628 std::vector<InputChannelInfo> infos{1};
630 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
653 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
654 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
656 std::array<fair::mq::MessagePtr, 6> messages;
658 messages[1] = transport->CreateMessage(1000);
659 fair::mq::MessagePtr& header = messages[0];
660 fair::mq::MessagePtr& payload = messages[1];
662 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
663 REQUIRE(header.get() ==
nullptr);
664 REQUIRE(payload.get() ==
nullptr);
667 messages[3] = transport->CreateMessage(1000);
668 fair::mq::MessagePtr& header2 = messages[2];
669 fair::mq::MessagePtr& payload2 = messages[3];
671 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo, 2);
672 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
673 CHECK(action.timeslice.value == 1);
674 REQUIRE(header2.get() !=
nullptr);
675 REQUIRE(payload2.get() !=
nullptr);
678 messages[5] = transport->CreateMessage(1000);
680 relayer.relay(header2->GetData(), &messages[4], fakeInfo3, 2);
681 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
682 CHECK(action.timeslice.value == 1);
683 REQUIRE(header2.get() !=
nullptr);
684 REQUIRE(payload2.get() !=
nullptr);
687 SECTION(
"SplitPayloadPairs")
690 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
692 std::vector<InputRoute> inputs = {
696 std::vector<ForwardRoute> forwards;
697 std::vector<InputChannelInfo> infos{1};
699 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
707 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
708 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
709 size_t timeslice = 0;
711 const int nSplitParts = 100;
712 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
713 splitParts.reserve(2 * nSplitParts);
715 for (
size_t i = 0;
i < nSplitParts; ++
i) {
716 dh.splitPayloadIndex =
i;
717 dh.splitPayloadParts = nSplitParts;
720 fair::mq::MessagePtr payload = transport->CreateMessage(100);
722 splitParts.emplace_back(std::move(header));
723 splitParts.emplace_back(std::move(payload));
725 REQUIRE(splitParts.size() == 2 * nSplitParts);
728 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
729 std::vector<RecordAction> ready;
730 relayer.getReadyToProcess(ready);
731 REQUIRE(ready.size() == 1);
732 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
733 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
736 REQUIRE(messageSet.size() == 1);
737 REQUIRE(messageSet[0].
size() == nSplitParts);
738 REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1);
741 SECTION(
"SplitPayloadSequence")
744 InputSpec spec1{
"clusters",
"TST",
"COUNTER"};
746 std::vector<InputRoute> inputs = {
750 std::vector<ForwardRoute> forwards;
751 std::vector<InputChannelInfo> infos{1};
753 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
759 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
760 size_t timeslice = 0;
762 std::vector<size_t> sequenceSize;
763 size_t nTotalPayloads = 0;
765 auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](
size_t nPayloads) ->
void {
766 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
767 std::vector<std::unique_ptr<fair::mq::Message>> messages;
768 messages.reserve(nPayloads + 1);
774 dh.splitPayloadParts = nPayloads;
776 messages.emplace_back(std::move(header));
778 for (
size_t i = 0;
i < nPayloads; ++
i) {
779 messages.emplace_back(transport->CreateMessage(100));
780 *(
reinterpret_cast<size_t*
>(messages.back()->GetData())) = nTotalPayloads;
783 REQUIRE(messages.size() == nPayloads + 1);
785 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
786 sequenceSize.emplace_back(nPayloads);
792 std::vector<RecordAction> ready;
793 relayer.getReadyToProcess(ready);
794 REQUIRE(ready.size() == 1);
795 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
796 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
798 REQUIRE(messageSet.size() == 1);
800 REQUIRE(messageSet[0].
size() == sequenceSize.size());
802 for (
auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
803 REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
804 for (
auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
805 REQUIRE(messageSet[0].payload(seqid, pi));
806 auto const*
data = messageSet[0].payload(seqid, pi)->GetData();
807 REQUIRE(*(
reinterpret_cast<size_t const*
>(
data)) ==
counter);