Project
Loading...
Searching...
No Matches
test_DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
14#include "Headers/DataHeader.h"
15#include "Headers/Stack.h"
24#include "../src/DataRelayerHelpers.h"
28#include <Monitoring/Monitoring.h>
29#include <fairmq/TransportFactory.h>
30#include <fairmq/Channel.h>
34#include <array>
35#include <vector>
36#include <uv.h>
37
38using Monitoring = o2::monitoring::Monitoring;
39using namespace o2::framework;
43
44TEST_CASE("DataRelayer")
45{
46 ServiceRegistry registry;
47 ServiceRegistryRef ref{registry};
48 Monitoring monitoring;
49 const DriverConfig driverConfig{
50 .batch = false,
51 };
57 TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
58 int quickUpdateInterval = 1;
59 using MetricSpec = DataProcessingStats::MetricSpec;
60 std::vector<MetricSpec> specs{
61 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
62 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
63 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
64 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
65
66 for (auto& spec : specs) {
67 stats.registerMetric(spec);
68 }
69
71 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
72 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
73 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
74 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
75 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&state));
76 // A simple test where an input is provided
77 // and the subsequent InputRecord is immediately requested.
78 SECTION("TestNoWait")
79 {
80 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
81
82 std::vector<InputRoute> inputs = {
83 InputRoute{spec, 0, "Fake", 0}};
84
85 std::vector<ForwardRoute> forwards;
86 std::vector<InputChannelInfo> infos{1};
87 TimesliceIndex index{1, infos};
88 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
89
91 DataRelayer relayer(policy, inputs, index, {registry}, -1);
92 relayer.setPipelineLength(4);
93
94 // Let's create a dummy O2 Message with two headers in the stack:
95 // - DataHeader matching the one provided in the input
96 DataHeader dh;
97 dh.dataDescription = "CLUSTERS";
98 dh.dataOrigin = "TPC";
99 dh.subSpecification = 0;
100 dh.splitPayloadIndex = 0;
101 dh.splitPayloadParts = 1;
102
103 DataProcessingHeader dph{0, 1};
104 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
105 std::array<fair::mq::MessagePtr, 2> messages;
106 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
107 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
108 messages[1] = transport->CreateMessage(1000);
109 fair::mq::MessagePtr& header = messages[0];
110 fair::mq::MessagePtr& payload = messages[1];
111 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
112 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
113 std::vector<RecordAction> ready;
114 relayer.getReadyToProcess(ready);
115 REQUIRE(ready.size() == 1);
116 REQUIRE(ready[0].slot.index == 0);
117 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
118 REQUIRE(header.get() == nullptr);
119 REQUIRE(payload.get() == nullptr);
120 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
121 // one MessageSet with one PartRef with header and payload
122 REQUIRE(result.size() == 1);
123 REQUIRE((result.at(0) | count_parts{}) == 1);
124 }
125
126 //
127 SECTION("TestNoWaitMatcher")
128 {
130 auto specs = o2::framework::select("clusters:TPC/CLUSTERS");
131
132 std::vector<InputRoute> inputs = {
133 InputRoute{specs[0], 0, "Fake", 0}};
134
135 std::vector<ForwardRoute> forwards;
136 std::vector<InputChannelInfo> infos{1};
137 TimesliceIndex index{1, infos};
138 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
139
141 DataRelayer relayer(policy, inputs, index, {registry}, -1);
142 relayer.setPipelineLength(4);
143
144 // Let's create a dummy O2 Message with two headers in the stack:
145 // - DataHeader matching the one provided in the input
146 DataHeader dh;
147 dh.dataDescription = "CLUSTERS";
148 dh.dataOrigin = "TPC";
149 dh.subSpecification = 0;
150 dh.splitPayloadIndex = 0;
151 dh.splitPayloadParts = 1;
152
153 DataProcessingHeader dph{0, 1};
154 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
155 std::array<fair::mq::MessagePtr, 2> messages;
156 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
157 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
158 messages[1] = transport->CreateMessage(1000);
159 fair::mq::MessagePtr& header = messages[0];
160 fair::mq::MessagePtr& payload = messages[1];
161 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
162 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
163 std::vector<RecordAction> ready;
164 relayer.getReadyToProcess(ready);
165 REQUIRE(ready.size() == 1);
166 REQUIRE(ready[0].slot.index == 0);
167 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
168 REQUIRE(header.get() == nullptr);
169 REQUIRE(payload.get() == nullptr);
170 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
171 // one MessageSet with one PartRef with header and payload
172 REQUIRE(result.size() == 1);
173 REQUIRE((result.at(0) | count_parts{}) == 1);
174 }
175
176 // This test a more complicated set of inputs, and verifies that data is
177 // correctly relayed before being processed.
178 SECTION("TestRelay")
179 {
181 InputSpec spec1{
182 "clusters",
183 "TPC",
184 "CLUSTERS",
185 };
186 InputSpec spec2{
187 "clusters_its",
188 "ITS",
189 "CLUSTERS",
190 };
191
192 std::vector<InputRoute> inputs = {
193 InputRoute{spec1, 0, "Fake1", 0},
194 InputRoute{spec2, 1, "Fake2", 0}};
195
196 std::vector<ForwardRoute> forwards;
197
198 std::vector<InputChannelInfo> infos{1};
199 TimesliceIndex index{1, infos};
200 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
201
203 DataRelayer relayer(policy, inputs, index, {registry}, -1);
204 relayer.setPipelineLength(4);
205
206 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
207 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
208
209 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
210 std::array<fair::mq::MessagePtr, 2> messages;
211 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
212 messages[1] = transport->CreateMessage(1000);
213 fair::mq::MessagePtr& header = messages[0];
214 fair::mq::MessagePtr& payload = messages[1];
215 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
216 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
217 REQUIRE(header.get() == nullptr);
218 REQUIRE(payload.get() == nullptr);
219 };
220
221 // Let's create a dummy O2 Message with two headers in the stack:
222 // - DataHeader matching the one provided in the input
223 DataHeader dh1;
224 dh1.dataDescription = "CLUSTERS";
225 dh1.dataOrigin = "TPC";
226 dh1.subSpecification = 0;
227 dh1.splitPayloadIndex = 0;
228 dh1.splitPayloadParts = 1;
229
230 // Let's create the second O2 Message:
231 DataHeader dh2;
232 dh2.dataDescription = "CLUSTERS";
233 dh2.dataOrigin = "ITS";
234 dh2.subSpecification = 0;
235 dh2.splitPayloadIndex = 0;
236 dh2.splitPayloadParts = 1;
237
238 createMessage(dh1, 0);
239 std::vector<RecordAction> ready;
240 relayer.getReadyToProcess(ready);
241 REQUIRE(ready.size() == 0);
242
243 createMessage(dh2, 0);
244 ready.clear();
245 relayer.getReadyToProcess(ready);
246 REQUIRE(ready.size() == 1);
247 REQUIRE(ready[0].slot.index == 0);
248 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
249
250 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
251 // two MessageSets, each with one PartRef
252 REQUIRE(result.size() == 2);
253 REQUIRE((result.at(0) | count_parts{}) == 1);
254 REQUIRE((result.at(1) | count_parts{}) == 1);
255 }
256
257 // This test a more complicated set of inputs, and verifies that data is
258 // correctly relayed before being processed.
259 SECTION("TestRelayBug")
260 {
262 InputSpec spec1{
263 "clusters",
264 "TPC",
265 "CLUSTERS",
266 };
267 InputSpec spec2{
268 "clusters_its",
269 "ITS",
270 "CLUSTERS",
271 };
272
273 std::vector<InputRoute> inputs = {
274 InputRoute{spec1, 0, "Fake1", 0},
275 InputRoute{spec2, 1, "Fake2", 0}};
276
277 std::vector<ForwardRoute> forwards;
278
279 std::vector<InputChannelInfo> infos{1};
280 TimesliceIndex index{1, infos};
281 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
282
284 DataRelayer relayer(policy, inputs, index, {registry}, -1);
285 relayer.setPipelineLength(3);
286
287 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
288 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
289
290 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
291 std::array<fair::mq::MessagePtr, 2> messages;
292 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
293 messages[1] = transport->CreateMessage(1000);
294 fair::mq::MessagePtr& header = messages[0];
295 fair::mq::MessagePtr& payload = messages[1];
296 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
297 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
298 REQUIRE(header.get() == nullptr);
299 REQUIRE(payload.get() == nullptr);
300 };
301
302 // Let's create a dummy O2 Message with two headers in the stack:
303 // - DataHeader matching the one provided in the input
304 DataHeader dh1;
305 dh1.dataDescription = "CLUSTERS";
306 dh1.dataOrigin = "TPC";
307 dh1.subSpecification = 0;
308 dh1.splitPayloadIndex = 0;
309 dh1.splitPayloadParts = 1;
310
311 // Let's create the second O2 Message:
312 DataHeader dh2;
313 dh2.dataDescription = "CLUSTERS";
314 dh2.dataOrigin = "ITS";
315 dh2.subSpecification = 0;
316 dh2.splitPayloadIndex = 0;
317 dh2.splitPayloadParts = 1;
318
319 // Let's create the second O2 Message:
320 DataHeader dh3;
321 dh3.dataDescription = "CLUSTERS";
322 dh3.dataOrigin = "FOO";
323 dh3.subSpecification = 0;
324 dh3.splitPayloadIndex = 0;
325 dh3.splitPayloadParts = 1;
326
328 createMessage(dh1, 0);
329 std::vector<RecordAction> ready;
330 relayer.getReadyToProcess(ready);
331 REQUIRE(ready.size() == 0);
332 createMessage(dh1, 1);
333 ready.clear();
334 relayer.getReadyToProcess(ready);
335 REQUIRE(ready.size() == 0);
336 createMessage(dh2, 0);
337 ready.clear();
338 relayer.getReadyToProcess(ready);
339 REQUIRE(ready.size() == 1);
340 REQUIRE(ready[0].slot.index == 0);
341 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
342 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
343 createMessage(dh2, 1);
344 ready.clear();
345 relayer.getReadyToProcess(ready);
346 REQUIRE(ready.size() == 1);
347 REQUIRE(ready[0].slot.index == 1);
348 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
349 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
350 }
351
352 // This tests a simple cache pruning, where a single input is shifted out of
353 // the cache.
354 SECTION("TestCache")
355 {
357 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
358
359 std::vector<InputRoute> inputs = {
360 InputRoute{spec, 0, "Fake", 0}};
361 std::vector<ForwardRoute> forwards;
362
364 std::vector<InputChannelInfo> infos{1};
365 TimesliceIndex index{1, infos};
366 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
367 DataRelayer relayer(policy, inputs, index, {registry}, -1);
368 // Only two messages to fill the cache.
369 relayer.setPipelineLength(2);
370
371 // Let's create a dummy O2 Message with two headers in the stack:
372 // - DataHeader matching the one provided in the input
373 DataHeader dh;
374 dh.dataDescription = "CLUSTERS";
375 dh.dataOrigin = "TPC";
376 dh.subSpecification = 0;
377 dh.splitPayloadIndex = 0;
378 dh.splitPayloadParts = 1;
379
380 DataProcessingHeader dph{0, 1};
381 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
382 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
383 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](auto const& h) {
384 std::array<fair::mq::MessagePtr, 2> messages;
385 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
386 messages[1] = transport->CreateMessage(1000);
387 fair::mq::MessagePtr& header = messages[0];
388 fair::mq::MessagePtr& payload = messages[1];
389 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
390 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
391 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() == nullptr));
392 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() == nullptr));
393 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() != nullptr));
394 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() != nullptr));
395 };
396
397 // This fills the cache, and then empties it.
398 createMessage(DataProcessingHeader{0, 1});
399 createMessage(DataProcessingHeader{1, 1});
400 std::vector<RecordAction> ready;
401 relayer.getReadyToProcess(ready);
402 REQUIRE(ready.size() == 2);
403 REQUIRE(ready[0].slot.index == 1);
404 REQUIRE(ready[1].slot.index == 0);
405 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
406 REQUIRE(ready[1].op == CompletionPolicy::CompletionOp::Consume);
407 for (size_t i = 0; i < ready.size(); ++i) {
408 auto result = relayer.consumeAllInputsForTimeslice(ready[i].slot);
409 }
410
411 // This fills the cache and makes 2 obsolete.
412 createMessage(DataProcessingHeader{2, 1});
413 createMessage(DataProcessingHeader{3, 1});
414 createMessage(DataProcessingHeader{4, 1});
415 ready.clear();
416 relayer.getReadyToProcess(ready);
417 REQUIRE(ready.size() == 2);
418
419 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
420 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
421 // One for the header, one for the payload
422 REQUIRE(result1.size() == 1);
423 REQUIRE(result2.size() == 1);
424 }
425
426 // This the any policy. Even when there are two inputs, given the any policy
427 // it will run immediately.
428 SECTION("TestPolicies")
429 {
431 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
432 InputSpec spec2{"tracks", "TPC", "TRACKS"};
433
434 std::vector<InputRoute> inputs = {
435 InputRoute{spec1, 0, "Fake1", 0},
436 InputRoute{spec2, 1, "Fake2", 0},
437 };
438
439 std::vector<ForwardRoute> forwards;
440 std::vector<InputChannelInfo> infos{1};
441 TimesliceIndex index{1, infos};
442 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
443
445 DataRelayer relayer(policy, inputs, index, {registry}, -1);
446 // Only two messages to fill the cache.
447 relayer.setPipelineLength(2);
448
449 // Let's create a dummy O2 Message with two headers in the stack:
450 // - DataHeader matching the one provided in the input
451 DataHeader dh1;
452 dh1.dataDescription = "CLUSTERS";
453 dh1.dataOrigin = "TPC";
454 dh1.subSpecification = 0;
455 dh1.splitPayloadIndex = 0;
456 dh1.splitPayloadParts = 1;
457
458 DataHeader dh2;
459 dh2.dataDescription = "TRACKS";
460 dh2.dataOrigin = "TPC";
461 dh2.subSpecification = 0;
462 dh2.splitPayloadIndex = 0;
463 dh2.splitPayloadParts = 1;
464
465 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
466 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
467 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
468 std::array<fair::mq::MessagePtr, 2> messages;
469 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
470 messages[1] = transport->CreateMessage(1000);
471 fair::mq::MessagePtr& header = messages[0];
472 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
473 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
474 };
475
476 // This fills the cache, and then empties it.
477 createMessage(dh1, DataProcessingHeader{0, 1});
478 std::vector<RecordAction> ready1;
479 relayer.getReadyToProcess(ready1);
480 REQUIRE(ready1.size() == 1);
481 REQUIRE(ready1[0].slot.index == 0);
482 REQUIRE(ready1[0].op == CompletionPolicy::CompletionOp::Process);
483
484 createMessage(dh1, DataProcessingHeader{1, 1});
485 std::vector<RecordAction> ready2;
486 relayer.getReadyToProcess(ready2);
487 REQUIRE(ready2.size() == 1);
488 REQUIRE(ready2[0].slot.index == 1);
489 REQUIRE(ready2[0].op == CompletionPolicy::CompletionOp::Process);
490
491 createMessage(dh2, DataProcessingHeader{1, 1});
492 std::vector<RecordAction> ready3;
493 relayer.getReadyToProcess(ready3);
494 REQUIRE(ready3.size() == 1);
495 REQUIRE(ready3[0].slot.index == 1);
496 REQUIRE(ready3[0].op == CompletionPolicy::CompletionOp::Consume);
497 }
498
500 SECTION("TestClear")
501 {
503 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
504 InputSpec spec2{"tracks", "TPC", "TRACKS"};
505
506 std::vector<InputRoute> inputs = {
507 InputRoute{spec1, 0, "Fake1", 0},
508 InputRoute{spec2, 1, "Fake2", 0},
509 };
510
511 std::vector<ForwardRoute> forwards;
512 std::vector<InputChannelInfo> infos{1};
513 TimesliceIndex index{1, infos};
514 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
515
517 DataRelayer relayer(policy, inputs, index, {registry}, -1);
518 // Only two messages to fill the cache.
519 relayer.setPipelineLength(3);
520
521 // Let's create a dummy O2 Message with two headers in the stack:
522 // - DataHeader matching the one provided in the input
523 DataHeader dh1;
524 dh1.dataDescription = "CLUSTERS";
525 dh1.dataOrigin = "TPC";
526 dh1.subSpecification = 0;
527 dh1.splitPayloadIndex = 0;
528 dh1.splitPayloadParts = 1;
529
530 DataHeader dh2;
531 dh2.dataDescription = "TRACKS";
532 dh2.dataOrigin = "TPC";
533 dh2.subSpecification = 0;
534 dh2.splitPayloadIndex = 0;
535 dh2.splitPayloadParts = 1;
536
537 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
538 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
539 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
540 std::array<fair::mq::MessagePtr, 2> messages;
541 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
542 messages[1] = transport->CreateMessage(1000);
543 fair::mq::MessagePtr& header = messages[0];
544 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
545 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
546 };
547
548 // This fills the cache, and then empties it.
549 createMessage(dh1, DataProcessingHeader{0, 1});
550 createMessage(dh1, DataProcessingHeader{1, 1});
551 createMessage(dh2, DataProcessingHeader{1, 1});
552 relayer.clear();
553 std::vector<RecordAction> ready;
554 relayer.getReadyToProcess(ready);
555 REQUIRE(ready.size() == 0);
556 }
557
559 SECTION("TestTooMany")
560 {
562 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
563 InputSpec spec2{"tracks", "TPC", "TRACKS"};
564
565 std::vector<InputRoute> inputs = {
566 InputRoute{spec1, 0, "Fake1", 0},
567 InputRoute{spec2, 1, "Fake2", 0},
568 };
569
570 std::vector<ForwardRoute> forwards;
571 std::vector<InputChannelInfo> infos{1};
572 TimesliceIndex index{1, infos};
573 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
574
576 DataRelayer relayer(policy, inputs, index, {registry}, -1);
577 // Only two messages to fill the cache.
578 relayer.setPipelineLength(1);
579
580 // Let's create a dummy O2 Message with two headers in the stack:
581 // - DataHeader matching the one provided in the input
582 DataHeader dh1;
583 dh1.dataDescription = "CLUSTERS";
584 dh1.dataOrigin = "TPC";
585 dh1.subSpecification = 0;
586 dh1.splitPayloadIndex = 0;
587 dh1.splitPayloadParts = 1;
588
589 DataHeader dh2;
590 dh2.dataDescription = "TRACKS";
591 dh2.dataOrigin = "TPC";
592 dh2.subSpecification = 0;
593 dh2.splitPayloadIndex = 0;
594 dh2.splitPayloadParts = 1;
595
596 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
597 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
598
599 std::array<fair::mq::MessagePtr, 4> messages;
600 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
601 messages[1] = transport->CreateMessage(1000);
602 fair::mq::MessagePtr& header = messages[0];
603 fair::mq::MessagePtr& payload = messages[1];
604 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
605 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
606 REQUIRE(header.get() == nullptr);
607 REQUIRE(payload.get() == nullptr);
608 // This fills the cache, and then waits.
609 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
610 messages[3] = transport->CreateMessage(1000);
611 fair::mq::MessagePtr& header2 = messages[2];
612 fair::mq::MessagePtr& payload2 = messages[3];
613 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
614 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
615 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
616 REQUIRE(header2.get() != nullptr);
617 REQUIRE(payload2.get() != nullptr);
618 }
619
620 SECTION("SplitParts")
621 {
623 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
624 InputSpec spec2{"its", "ITS", "CLUSTERS"};
625
626 std::vector<InputRoute> inputs = {
627 InputRoute{spec1, 0, "Fake1", 0},
628 InputRoute{spec2, 0, "Fake2", 0},
629 };
630
631 std::vector<ForwardRoute> forwards;
632 std::vector<InputChannelInfo> infos{1};
633 TimesliceIndex index{1, infos};
634 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
635
637 DataRelayer relayer(policy, inputs, index, {registry}, -1);
638 // Only two messages to fill the cache.
639 relayer.setPipelineLength(1);
640
641 // Let's create a dummy O2 Message with two headers in the stack:
642 // - DataHeader matching the one provided in the input
643 DataHeader dh1;
644 dh1.dataDescription = "CLUSTERS";
645 dh1.dataOrigin = "TPC";
646 dh1.subSpecification = 0;
647 dh1.splitPayloadIndex = 0;
648 dh1.splitPayloadParts = 1;
649
650 DataHeader dh2;
651 dh2.dataDescription = "TRACKS";
652 dh2.dataOrigin = "TPC";
653 dh2.subSpecification = 0;
654 dh2.splitPayloadIndex = 0;
655 dh2.splitPayloadParts = 1;
656
657 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
658 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
659
660 std::array<fair::mq::MessagePtr, 6> messages;
661 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
662 messages[1] = transport->CreateMessage(1000);
663 fair::mq::MessagePtr& header = messages[0];
664 fair::mq::MessagePtr& payload = messages[1];
665 DataRelayer::InputInfo fakeInfo{0, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
666 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
667 REQUIRE(header.get() == nullptr);
668 REQUIRE(payload.get() == nullptr);
669 // This fills the cache, and then waits.
670 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
671 messages[3] = transport->CreateMessage(1000);
672 fair::mq::MessagePtr& header2 = messages[2];
673 fair::mq::MessagePtr& payload2 = messages[3];
674 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
675 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo, 2);
676 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
677 CHECK(action.timeslice.value == 1);
678 REQUIRE(header2.get() != nullptr);
679 REQUIRE(payload2.get() != nullptr);
680 // This fills the cache, and then waits.
681 messages[4] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
682 messages[5] = transport->CreateMessage(1000);
683 DataRelayer::InputInfo fakeInfo3{4, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
684 relayer.relay(header2->GetData(), &messages[4], fakeInfo3, 2);
685 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
686 CHECK(action.timeslice.value == 1);
687 REQUIRE(header2.get() != nullptr);
688 REQUIRE(payload2.get() != nullptr);
689 }
690
691 SECTION("SplitPayloadPairs")
692 {
694 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
695
696 std::vector<InputRoute> inputs = {
697 InputRoute{spec1, 0, "Fake1", 0},
698 };
699
700 std::vector<ForwardRoute> forwards;
701 std::vector<InputChannelInfo> infos{1};
702 TimesliceIndex index{1, infos};
703 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
704
706 DataRelayer relayer(policy, inputs, index, {registry}, -1);
707 relayer.setPipelineLength(4);
708
709 DataHeader dh{"CLUSTERS", "TPC", 0};
710
711 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
712 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
713 size_t timeslice = 0;
714
715 const int nSplitParts = 100;
716 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
717 splitParts.reserve(2 * nSplitParts);
718
719 for (size_t i = 0; i < nSplitParts; ++i) {
720 dh.splitPayloadIndex = i;
721 dh.splitPayloadParts = nSplitParts;
722
723 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
724 fair::mq::MessagePtr payload = transport->CreateMessage(100);
725
726 splitParts.emplace_back(std::move(header));
727 splitParts.emplace_back(std::move(payload));
728 }
729 REQUIRE(splitParts.size() == 2 * nSplitParts);
730
731 DataRelayer::InputInfo fakeInfo{0, splitParts.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
732 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
733 std::vector<RecordAction> ready;
734 relayer.getReadyToProcess(ready);
735 REQUIRE(ready.size() == 1);
736 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
737 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
738 // we have one input route and thus one message set containing pairs for all
739 // payloads
740 REQUIRE(messageSet.size() == 1);
741 REQUIRE((messageSet[0] | count_parts{}) == nSplitParts);
742 REQUIRE((messageSet[0] | get_num_payloads{0}) == 1);
743 }
744
745 SECTION("SplitPayloadSequence")
746 {
748 InputSpec spec1{"clusters", "TST", "COUNTER"};
749
750 std::vector<InputRoute> inputs = {
751 InputRoute{spec1, 0, "Fake1", 0},
752 };
753
754 std::vector<ForwardRoute> forwards;
755 std::vector<InputChannelInfo> infos{1};
756 TimesliceIndex index{1, infos};
757 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
758
760 DataRelayer relayer(policy, inputs, index, {registry}, -1);
761 relayer.setPipelineLength(4);
762
763 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
764 size_t timeslice = 0;
765
766 std::vector<size_t> sequenceSize;
767 size_t nTotalPayloads = 0;
768
769 auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
770 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
771 std::vector<std::unique_ptr<fair::mq::Message>> messages;
772 messages.reserve(nPayloads + 1);
773 DataHeader dh{"COUNTER", "TST", 0};
774
775 // one header with index set to the number of split parts indicates sequence
776 // of payloads without additional headers
777 dh.splitPayloadIndex = nPayloads;
778 dh.splitPayloadParts = nPayloads;
779 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
780 messages.emplace_back(std::move(header));
781
782 for (size_t i = 0; i < nPayloads; ++i) {
783 messages.emplace_back(transport->CreateMessage(100));
784 *(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
785 ++nTotalPayloads;
786 }
787 REQUIRE(messages.size() == nPayloads + 1);
788 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
789 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
790 sequenceSize.emplace_back(nPayloads);
791 };
792 createSequence(100);
793 createSequence(1);
794 createSequence(42);
795
796 std::vector<RecordAction> ready;
797 relayer.getReadyToProcess(ready);
798 REQUIRE(ready.size() == 1);
799 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
800 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
801 // we have one input route
802 REQUIRE(messageSet.size() == 1);
803 // one message set containing number of added sequences of messages
804 REQUIRE((messageSet[0] | count_parts{}) == sequenceSize.size());
805 size_t counter = 0;
806 for (size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) {
807 REQUIRE((messageSet[0] | get_num_payloads{seqid}) == sequenceSize[seqid]);
808 for (size_t pi = 0; pi < (messageSet[0] | get_num_payloads{seqid}); ++pi) {
809 REQUIRE((messageSet[0] | get_payload{seqid, pi}));
810 auto const* data = (messageSet[0] | get_payload{seqid, pi})->GetData();
811 REQUIRE(*(reinterpret_cast<size_t const*>(data)) == counter);
812 ++counter;
813 }
814 }
815 }
816
817 SECTION("ProcessDanglingInputs")
818 {
819 InputSpec spec{"condition", "TST", "COND"};
820 std::vector<InputRoute> inputs = {
821 InputRoute{spec, 0, "from_source_to_self", 0}};
822
823 std::vector<InputChannelInfo> infos{1};
824 TimesliceIndex index{1, infos};
825 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
826
827 // Bind a fake input channel so FairMQDeviceProxy::getInputChannelIndex works
828 FairMQDeviceProxy proxy;
829 std::vector<fair::mq::Channel> channels{fair::mq::Channel("from_source_to_self")};
830 auto findChannel = [&channels](std::string const& name) -> fair::mq::Channel& {
831 for (auto& ch : channels) {
832 if (ch.GetName() == name) {
833 return ch;
834 }
835 }
836 throw std::runtime_error("Channel not found: " + name);
837 };
838 proxy.bind({}, inputs, {}, findChannel, [] { return false; });
839 ref.registerService(ServiceRegistryHelpers::handleForService<FairMQDeviceProxy>(&proxy));
840
842 DataRelayer relayer(policy, inputs, index, {registry}, -1);
843 relayer.setPipelineLength(4);
844
845 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
846 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
847
848 DataHeader dh{"COND", "TST", 0};
849 dh.splitPayloadParts = 1;
850 dh.splitPayloadIndex = 0;
851 DataProcessingHeader dph{0, 1};
852
853 ExpirationHandler handler;
854 handler.name = "test-condition";
855 handler.routeIndex = RouteIndex{0};
856 handler.lifetime = Lifetime::Condition;
857
858 // Creator: claim an empty slot and assign timeslice 0 to it
859 handler.creator = [](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot {
860 auto& index = services.get<TimesliceIndex>();
861 for (size_t si = 0; si < index.size(); si++) {
862 TimesliceSlot slot{si};
863 if (!index.isValid(slot)) {
864 index.associate(TimesliceId{0}, slot);
865 (void)index.setOldestPossibleInput({1}, channelIndex);
866 return slot;
867 }
868 }
870 };
871
872 // Checker: always trigger expiration
873 handler.checker = LifetimeHelpers::expireAlways();
874
875 // Handler: materialise a dummy header+payload into the PartRef
876 handler.handler = [&transport, &channelAlloc, &dh, &dph](ServiceRegistryRef, PartRef& ref, data_matcher::VariableContext&) {
877 ref.header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
878 ref.payload = transport->CreateMessage(4);
879 };
880
881 std::vector<ExpirationHandler> handlers{handler};
882 auto activity = relayer.processDanglingInputs(handlers, {registry}, true);
883
884 REQUIRE(activity.newSlots == 1);
885 REQUIRE(activity.expiredSlots == 1);
886
887 // The materialised data should now be ready to consume
888 std::vector<RecordAction> ready;
889 relayer.getReadyToProcess(ready);
890 REQUIRE(ready.size() == 1);
891 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
892
893 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
894 REQUIRE(result.size() == 1);
895 REQUIRE((result.at(0) | count_parts{}) == 1);
896 }
897
898 SECTION("ProcessDanglingInputsSkipsWhenDataPresent")
899 {
900 // processDanglingInputs must not overwrite a slot that already has data.
901 // This is guarded by the (part.messages | get_header{0}) != nullptr check.
902 InputSpec spec{"condition", "TST", "COND"};
903 std::vector<InputRoute> inputs = {
904 InputRoute{spec, 0, "from_source_to_self", 0}};
905
906 std::vector<InputChannelInfo> infos{1};
907 TimesliceIndex index{1, infos};
908 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
909
910 FairMQDeviceProxy proxy;
911 std::vector<fair::mq::Channel> channels{fair::mq::Channel("from_source_to_self")};
912 auto findChannel = [&channels](std::string const& name) -> fair::mq::Channel& {
913 for (auto& ch : channels) {
914 if (ch.GetName() == name) {
915 return ch;
916 }
917 }
918 throw std::runtime_error("Channel not found: " + name);
919 };
920 proxy.bind({}, inputs, {}, findChannel, [] { return false; });
921 ref.registerService(ServiceRegistryHelpers::handleForService<FairMQDeviceProxy>(&proxy));
922
924 DataRelayer relayer(policy, inputs, index, {registry}, -1);
925 relayer.setPipelineLength(4);
926
927 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
928 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
929
930 DataHeader dh{"COND", "TST", 0};
931 dh.splitPayloadParts = 1;
932 dh.splitPayloadIndex = 0;
933 DataProcessingHeader dph{0, 1};
934
935 // Build an expiration handler that always tries to expire
936 ExpirationHandler handler;
937 handler.name = "test-condition";
938 handler.routeIndex = RouteIndex{0};
939 handler.lifetime = Lifetime::Condition;
940 handler.creator = [](ServiceRegistryRef services, ChannelIndex channelIndex) -> TimesliceSlot {
941 auto& index = services.get<TimesliceIndex>();
942 for (size_t si = 0; si < index.size(); si++) {
943 TimesliceSlot slot{si};
944 if (!index.isValid(slot)) {
945 index.associate(TimesliceId{0}, slot);
946 (void)index.setOldestPossibleInput({1}, channelIndex);
947 return slot;
948 }
949 }
951 };
953 int handlerCallCount = 0;
954 handler.handler = [&transport, &channelAlloc, &dh, &dph, &handlerCallCount](ServiceRegistryRef, PartRef& ref, data_matcher::VariableContext&) {
955 ref.header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
956 ref.payload = transport->CreateMessage(4);
957 handlerCallCount++;
958 };
959 std::vector<ExpirationHandler> handlers{handler};
960
961 // First call: slot is empty, so the handler fires and materialises data
962 auto activity1 = relayer.processDanglingInputs(handlers, {registry}, true);
963 REQUIRE(activity1.expiredSlots == 1);
964 REQUIRE(handlerCallCount == 1);
965
966 // Second call: slot already has data — the handler must NOT fire again
967 auto activity2 = relayer.processDanglingInputs(handlers, {registry}, false);
968 REQUIRE(activity2.expiredSlots == 0);
969 REQUIRE(handlerCallCount == 1); // handler was not called a second time
970 }
971}
benchmark::State & state
int16_t time
Definition RawEventData.h:4
int32_t i
uint32_t op
uint32_t res
Definition RawData.h:0
o2::monitoring::Monitoring Monitoring
Class for time synchronization of RawReader instances.
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
void bind(std::vector< OutputRoute > const &outputs, std::vector< InputRoute > const &inputs, std::vector< ForwardRoute > const &forwards, std::function< fair::mq::Channel &(std::string const &)> bindChannelByName, std::function< bool(void)> newStateRequestedCallback)
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint index
Definition glcorearb.h:781
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean * data
Definition glcorearb.h:298
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
GLint ref
Definition glcorearb.h:291
GLuint * states
Definition glcorearb.h:4932
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
std::vector< InputSpec > select(char const *matcher="")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int INVALID
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
Helper struct to hold statistics about the data processing happening.
Running state information of a given device.
Definition DeviceState.h:34
bool batch
Whether the driver was started in batch mode or not.
static ExpirationHandler::Checker expireAlways()
Reference to an inflight part.
Definition PartRef.h:24
static constexpr uint64_t INVALID
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33
TEST_CASE("DataRelayer")
std::vector< ChannelData > channels