// Metric specs for four processing-stats counters. Each spec is keyed by a
// ProcessingStatsId value (narrowed to short) and all of them share the same
// minPublishInterval, taken from quickUpdateInterval.
53 int quickUpdateInterval = 1;
55 std::vector<MetricSpec> specs{
56 MetricSpec{.
name =
"malformed_inputs", .metricId =
static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
57 MetricSpec{.name =
"dropped_computations", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name =
"dropped_incoming_messages", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
59 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
// Register every spec with the stats object.
61 for (
auto& spec : specs) {
62 stats.registerMetric(spec);
// Publish the locally owned monitoring, stats, states, driver configuration and
// device state objects through `ref`. Each one is wrapped in a handle created by
// ServiceRegistryHelpers::handleForService<T>() from the object's address.
// DriverConfig is registered under its const-qualified type.
66 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
67 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
68 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&
states));
69 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
70 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&
state));
// Single-input scenario: one InputSpec built from "clusters", "TPC", "CLUSTERS".
75 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
77 std::vector<InputRoute> inputs = {
80 std::vector<ForwardRoute> forwards;
81 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
83 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
// Messages come from a "zeromq" transport factory.
99 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// Two-part message: element 0 is referred to as the header, element 1 as the
// payload (created with CreateMessage(1000)).
100 std::array<fair::mq::MessagePtr, 2> messages;
101 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
103 messages[1] = transport->CreateMessage(1000);
104 fair::mq::MessagePtr& header = messages[0];
105 fair::mq::MessagePtr& payload = messages[1];
// Hand both parts to the relayer in one call.
107 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
// Exactly one record must be ready: slot 0, with the Consume completion op.
108 std::vector<RecordAction> ready;
109 relayer.getReadyToProcess(ready);
110 REQUIRE(ready.size() == 1);
111 REQUIRE(ready[0].slot.index == 0);
112 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Both message pointers are required to be null after relay().
// NOTE(review): presumably because the relayer took the messages over - confirm
// against DataRelayer::relay.
113 REQUIRE(header.get() ==
nullptr);
114 REQUIRE(payload.get() ==
nullptr);
// Consuming the ready slot must return one entry whose size() is 1.
115 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
117 REQUIRE(
result.size() == 1);
118 REQUIRE(
result.at(0).size() == 1);
// Section "TestNoWaitMatcher": relays a single header/payload pair and checks
// that it shows up as one consumable record.
122 SECTION(
"TestNoWaitMatcher")
127 std::vector<InputRoute> inputs = {
130 std::vector<ForwardRoute> forwards;
131 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
133 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
// Messages come from a "zeromq" transport factory.
149 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// Two-part message: element 0 is referred to as the header, element 1 as the
// payload (created with CreateMessage(1000)).
150 std::array<fair::mq::MessagePtr, 2> messages;
151 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
153 messages[1] = transport->CreateMessage(1000);
154 fair::mq::MessagePtr& header = messages[0];
155 fair::mq::MessagePtr& payload = messages[1];
// Hand both parts to the relayer in one call.
157 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
// Exactly one record must be ready: slot 0, with the Consume completion op.
158 std::vector<RecordAction> ready;
159 relayer.getReadyToProcess(ready);
160 REQUIRE(ready.size() == 1);
161 REQUIRE(ready[0].slot.index == 0);
162 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Both message pointers are required to be null after relay().
163 REQUIRE(header.get() ==
nullptr);
164 REQUIRE(payload.get() ==
nullptr);
// Consuming the ready slot must return one entry whose size() is 1.
165 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
167 REQUIRE(
result.size() == 1);
168 REQUIRE(
result.at(0).size() == 1);
// Scenario in which a record only becomes ready after two different headers
// (dh1 and dh2) have been relayed.
187 std::vector<InputRoute> inputs = {
191 std::vector<ForwardRoute> forwards;
193 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
195 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
201 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
202 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Helper taking a DataHeader and a `time` value: it relays a header/payload
// pair and requires both message pointers to be null afterwards.
// NOTE(review): how `dh` and `time` feed into the header message is not visible
// in this excerpt.
204 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
205 std::array<fair::mq::MessagePtr, 2> messages;
207 messages[1] = transport->CreateMessage(1000);
208 fair::mq::MessagePtr& header = messages[0];
209 fair::mq::MessagePtr& payload = messages[1];
211 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
212 REQUIRE(header.get() ==
nullptr);
213 REQUIRE(payload.get() ==
nullptr);
// With only dh1 relayed (time 0) nothing is ready yet.
233 createMessage(dh1, 0);
234 std::vector<RecordAction> ready;
235 relayer.getReadyToProcess(ready);
236 REQUIRE(ready.size() == 0);
// Once dh2 is relayed with the same time value, one record at slot 0 is ready
// with the Consume completion op.
238 createMessage(dh2, 0);
240 relayer.getReadyToProcess(ready);
241 REQUIRE(ready.size() == 1);
242 REQUIRE(ready[0].slot.index == 0);
243 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Consuming the slot must return two entries, each with size() 1.
245 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
247 REQUIRE(
result.size() == 2);
248 REQUIRE(
result.at(0).size() == 1);
249 REQUIRE(
result.at(1).size() == 1);
// Section "TestRelayBug": relays dh1 for two time values before dh2 arrives and
// checks that each time value becomes ready in its own slot as dh2 is relayed.
254 SECTION(
"TestRelayBug")
268 std::vector<InputRoute> inputs = {
272 std::vector<ForwardRoute> forwards;
274 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
276 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
282 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
283 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Helper taking a DataHeader and a `time` value: it relays a header/payload
// pair and requires both message pointers to be null afterwards.
285 auto createMessage = [&transport, &channelAlloc, &relayer](
DataHeader& dh,
size_t time) {
286 std::array<fair::mq::MessagePtr, 2> messages;
288 messages[1] = transport->CreateMessage(1000);
289 fair::mq::MessagePtr& header = messages[0];
290 fair::mq::MessagePtr& payload = messages[1];
292 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
293 REQUIRE(header.get() ==
nullptr);
294 REQUIRE(payload.get() ==
nullptr);
// dh1 at time 0: nothing ready.
323 createMessage(dh1, 0);
324 std::vector<RecordAction> ready;
325 relayer.getReadyToProcess(ready);
326 REQUIRE(ready.size() == 0);
// dh1 at time 1: still nothing ready.
327 createMessage(dh1, 1);
329 relayer.getReadyToProcess(ready);
330 REQUIRE(ready.size() == 0);
// dh2 at time 0: one record ready in slot 0 (Consume); consume it.
331 createMessage(dh2, 0);
333 relayer.getReadyToProcess(ready);
334 REQUIRE(ready.size() == 1);
335 REQUIRE(ready[0].slot.index == 0);
336 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
337 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// dh2 at time 1: one record ready in slot 1 (Consume); consume it.
338 createMessage(dh2, 1);
340 relayer.getReadyToProcess(ready);
341 REQUIRE(ready.size() == 1);
342 REQUIRE(ready[0].slot.index == 1);
343 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
344 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// Scenario with one InputSpec ("clusters", "TPC", "CLUSTERS") in which two
// slots are expected to become ready at the same time.
352 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
354 std::vector<InputRoute> inputs = {
356 std::vector<ForwardRoute> forwards;
359 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
361 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
376 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
377 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Helper that relays a header/payload pair and checks the pointers against the
// relay outcome. Each REQUIRE is an implication written as `!A || B`:
//   WillRelay     => header and payload pointers are null
//   Backpressured => header and payload pointers are still non-null
378 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](
auto const&
h) {
379 std::array<fair::mq::MessagePtr, 2> messages;
381 messages[1] = transport->CreateMessage(1000);
382 fair::mq::MessagePtr& header = messages[0];
383 fair::mq::MessagePtr& payload = messages[1];
385 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
386 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() ==
nullptr));
387 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() ==
nullptr));
388 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() !=
nullptr));
389 REQUIRE((
res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() !=
nullptr));
// Two records must be ready, reported as slot 1 first and slot 0 second, both
// with the Consume completion op.
395 std::vector<RecordAction> ready;
396 relayer.getReadyToProcess(ready);
397 REQUIRE(ready.size() == 2);
398 REQUIRE(ready[0].slot.index == 1);
399 REQUIRE(ready[1].slot.index == 0);
400 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
401 REQUIRE(ready[1].
op == CompletionPolicy::CompletionOp::Consume);
// Consume every ready slot in the reported order.
402 for (
size_t i = 0;
i < ready.size(); ++
i) {
403 auto result = relayer.consumeAllInputsForTimeslice(ready[
i].slot);
// Second round: again two ready records; each consumed result has size() 1.
411 relayer.getReadyToProcess(ready);
412 REQUIRE(ready.size() == 2);
414 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
415 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
417 REQUIRE(result1.size() == 1);
418 REQUIRE(result2.size() == 1);
// Section "TestPolicies": two InputSpecs ("clusters"/"TPC"/"CLUSTERS" and
// "tracks"/"TPC"/"TRACKS"). It checks which completion op is reported for
// the ready record after each relay step.
423 SECTION(
"TestPolicies")
426 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
427 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
429 std::vector<InputRoute> inputs = {
434 std::vector<ForwardRoute> forwards;
435 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
437 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
460 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
461 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Helper that relays a header/payload pair and returns the result of
// relayer.relay() to the caller.
462 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
463 std::array<fair::mq::MessagePtr, 2> messages;
465 messages[1] = transport->CreateMessage(1000);
466 fair::mq::MessagePtr& header = messages[0];
468 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
// First check: one ready record in slot 0 with the Process op.
473 std::vector<RecordAction> ready1;
474 relayer.getReadyToProcess(ready1);
475 REQUIRE(ready1.size() == 1);
476 REQUIRE(ready1[0].slot.index == 0);
477 REQUIRE(ready1[0].
op == CompletionPolicy::CompletionOp::Process);
// Second check: one ready record in slot 1 with the Process op.
480 std::vector<RecordAction> ready2;
481 relayer.getReadyToProcess(ready2);
482 REQUIRE(ready2.size() == 1);
483 REQUIRE(ready2[0].slot.index == 1);
484 REQUIRE(ready2[0].
op == CompletionPolicy::CompletionOp::Process);
// Third check: slot 1 is reported again, this time with the Consume op.
487 std::vector<RecordAction> ready3;
488 relayer.getReadyToProcess(ready3);
489 REQUIRE(ready3.size() == 1);
490 REQUIRE(ready3[0].slot.index == 1);
491 REQUIRE(ready3[0].
op == CompletionPolicy::CompletionOp::Consume);
// Scenario with two InputSpecs ("clusters"/"TPC"/"CLUSTERS" and
// "tracks"/"TPC"/"TRACKS") that ends by requiring that no record is ready.
498 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
499 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
501 std::vector<InputRoute> inputs = {
506 std::vector<ForwardRoute> forwards;
507 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
509 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
532 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
533 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Helper that relays a header/payload pair and returns the result of
// relayer.relay() to the caller.
534 auto createMessage = [&transport, &channelAlloc, &relayer](
auto const& dh,
auto const&
h) {
535 std::array<fair::mq::MessagePtr, 2> messages;
537 messages[1] = transport->CreateMessage(1000);
538 fair::mq::MessagePtr& header = messages[0];
540 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
// Final expectation of this scenario: the relayer reports nothing ready.
548 std::vector<RecordAction> ready;
549 relayer.getReadyToProcess(ready);
550 REQUIRE(ready.size() == 0);
// Section "TestTooMany": relays two header/payload pairs and expects the second
// one to be refused with a Backpressured result.
554 SECTION(
"TestTooMany")
557 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
558 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
560 std::vector<InputRoute> inputs = {
565 std::vector<ForwardRoute> forwards;
566 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
568 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
591 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
592 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Four messages = two header/payload pairs: [0],[1] and [2],[3].
594 std::array<fair::mq::MessagePtr, 4> messages;
596 messages[1] = transport->CreateMessage(1000);
597 fair::mq::MessagePtr& header = messages[0];
598 fair::mq::MessagePtr& payload = messages[1];
// First pair: relayed with a message count of 2; both pointers must be null
// afterwards.
600 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
601 REQUIRE(header.get() ==
nullptr);
602 REQUIRE(payload.get() ==
nullptr);
605 messages[3] = transport->CreateMessage(1000);
606 fair::mq::MessagePtr& header2 = messages[2];
607 fair::mq::MessagePtr& payload2 = messages[3];
// Second pair: relay() must answer Backpressured and leave both pointers
// non-null.
609 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
610 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
611 REQUIRE(header2.get() !=
nullptr);
612 REQUIRE(payload2.get() !=
nullptr);
// Section "SplitParts": two InputSpecs ("clusters"/"TPC"/"CLUSTERS" and
// "its"/"ITS"/"CLUSTERS") and three header/payload pairs held in one array of
// six messages.
615 SECTION(
"SplitParts")
618 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
619 InputSpec spec2{
"its",
"ITS",
"CLUSTERS"};
621 std::vector<InputRoute> inputs = {
626 std::vector<ForwardRoute> forwards;
627 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
629 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
652 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
653 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
// Six messages = three header/payload pairs: [0],[1], [2],[3] and [4],[5].
655 std::array<fair::mq::MessagePtr, 6> messages;
657 messages[1] = transport->CreateMessage(1000);
658 fair::mq::MessagePtr& header = messages[0];
659 fair::mq::MessagePtr& payload = messages[1];
// First pair: relayed with a message count of 2; both pointers must be null
// afterwards.
661 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
662 REQUIRE(header.get() ==
nullptr);
663 REQUIRE(payload.get() ==
nullptr);
666 messages[3] = transport->CreateMessage(1000);
667 fair::mq::MessagePtr& header2 = messages[2];
668 fair::mq::MessagePtr& payload2 = messages[3];
// Second pair: relay() must answer Backpressured with timeslice value 1 and
// leave both pointers non-null. The timeslice check uses CHECK, not REQUIRE.
670 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo, 2);
671 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
672 CHECK(action.timeslice.value == 1);
673 REQUIRE(header2.get() !=
nullptr);
674 REQUIRE(payload2.get() !=
nullptr);
677 messages[5] = transport->CreateMessage(1000);
// NOTE(review): the return value of this third relay() call is discarded, and
// it is given header2->GetData() although the message array starts at
// &messages[4]. The assertions below therefore re-test `action`, header2 and
// payload2 as left by the previous relay() call; they do not observe the
// outcome of this call. Confirm whether `action = relayer.relay(...)` and the
// third pair's own header/payload were intended.
679 relayer.relay(header2->GetData(), &messages[4], fakeInfo3, 2);
680 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
681 CHECK(action.timeslice.value == 1);
682 REQUIRE(header2.get() !=
nullptr);
683 REQUIRE(payload2.get() !=
nullptr);
// Section "SplitPayloadPairs": builds nSplitParts header/payload pairs, relays
// them in a single call and checks how they are grouped when consumed.
686 SECTION(
"SplitPayloadPairs")
689 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
691 std::vector<InputRoute> inputs = {
695 std::vector<ForwardRoute> forwards;
696 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
698 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
706 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
707 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
708 size_t timeslice = 0;
// 100 pairs -> room for 200 messages (header + payload per pair).
710 const int nSplitParts = 100;
711 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
712 splitParts.reserve(2 * nSplitParts);
// Each iteration stamps the header with its index and the total part count,
// then appends the header followed by a payload created with CreateMessage(100).
714 for (
size_t i = 0;
i < nSplitParts; ++
i) {
715 dh.splitPayloadIndex =
i;
716 dh.splitPayloadParts = nSplitParts;
719 fair::mq::MessagePtr payload = transport->CreateMessage(100);
721 splitParts.emplace_back(std::move(header));
722 splitParts.emplace_back(std::move(payload));
724 REQUIRE(splitParts.size() == 2 * nSplitParts);
// Relay all pairs at once; exactly one record must become ready with Consume.
727 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
728 std::vector<RecordAction> ready;
729 relayer.getReadyToProcess(ready);
730 REQUIRE(ready.size() == 1);
731 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
732 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// One entry, whose size() equals nSplitParts and whose first part reports a
// single payload.
735 REQUIRE(messageSet.size() == 1);
736 REQUIRE(messageSet[0].
size() == nSplitParts);
737 REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1);
// Section "SplitPayloadSequence": a single InputSpec ("clusters"/"TST"/"COUNTER").
// Sets up the bookkeeping used to build and later verify payload sequences.
740 SECTION(
"SplitPayloadSequence")
743 InputSpec spec1{
"clusters",
"TST",
"COUNTER"};
745 std::vector<InputRoute> inputs = {
749 std::vector<ForwardRoute> forwards;
750 std::vector<InputChannelInfo> infos{1};
// Make the timeslice index available through the service registry.
752 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&
index));
758 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
759 size_t timeslice = 0;
// sequenceSize records the payload count of each relayed sequence;
// nTotalPayloads is the value written into each created payload.
761 std::vector<size_t> sequenceSize;
762 size_t nTotalPayloads = 0;
// Helper that builds one header message followed by nPayloads payload messages
// and relays the whole sequence in a single call.
//  - nPayloads: number of payload messages that follow the header; also stored
//    in dh.splitPayloadParts and passed to relay() as its last argument.
// Each payload is created with CreateMessage(100) and the current value of
// nTotalPayloads is written at its start as a size_t. The payload count is
// appended to sequenceSize after relaying.
// The capture list takes `timeslice` by reference: the original token was the
// mis-encoded `×lice` (`&times` decoded as an HTML entity), not valid C++.
764 auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](
size_t nPayloads) ->
void {
765 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
766 std::vector<std::unique_ptr<fair::mq::Message>> messages;
// One header plus nPayloads payloads.
767 messages.reserve(nPayloads + 1);
773 dh.splitPayloadParts = nPayloads;
775 messages.emplace_back(std::move(header));
777 for (
size_t i = 0;
i < nPayloads; ++
i) {
778 messages.emplace_back(transport->CreateMessage(100));
// Stamp the payload so it can be identified when the sequence is consumed.
779 *(
reinterpret_cast<size_t*
>(messages.back()->GetData())) = nTotalPayloads;
782 REQUIRE(messages.size() == nPayloads + 1);
784 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
785 sequenceSize.emplace_back(nPayloads);
// Verification: exactly one record must be ready, with the Consume op.
791 std::vector<RecordAction> ready;
792 relayer.getReadyToProcess(ready);
793 REQUIRE(ready.size() == 1);
794 REQUIRE(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
795 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// One entry, holding as many parts as sequences were recorded in sequenceSize.
797 REQUIRE(messageSet.size() == 1);
799 REQUIRE(messageSet[0].
size() == sequenceSize.size());
// For every sequence: the payload count must match the recorded size, every
// payload must be present, and its leading size_t must equal `counter`
// (declared outside the lines visible here).
// NOTE(review): `auto seqid = 0` deduces int and is compared against the
// size_t returned by sequenceSize.size(), a signed/unsigned comparison. `pi`
// has the same shape; the return type of getNumberOfPayloads() is not visible here.
801 for (
auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
802 REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
803 for (
auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804 REQUIRE(messageSet[0].payload(seqid, pi));
805 auto const*
data = messageSet[0].payload(seqid, pi)->GetData();
806 REQUIRE(*(
reinterpret_cast<size_t const*
>(
data)) ==
counter);