Project
Loading...
Searching...
No Matches
test_DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
14#include "Headers/DataHeader.h"
15#include "Headers/Stack.h"
23#include "../src/DataRelayerHelpers.h"
27#include <Monitoring/Monitoring.h>
28#include <fairmq/TransportFactory.h>
29#include <fairmq/MemoryResources.h>
30#include <array>
31#include <vector>
32#include <uv.h>
33
34using Monitoring = o2::monitoring::Monitoring;
35using namespace o2::framework;
39
40TEST_CASE("DataRelayer")
41{
42 ServiceRegistry registry;
43 ServiceRegistryRef ref{registry};
44 Monitoring monitoring;
45 const DriverConfig driverConfig{
46 .batch = false,
47 };
53 TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
54 int quickUpdateInterval = 1;
55 using MetricSpec = DataProcessingStats::MetricSpec;
56 std::vector<MetricSpec> specs{
57 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
59 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
60 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
61
62 for (auto& spec : specs) {
63 stats.registerMetric(spec);
64 }
65
67 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
68 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
69 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
70 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
71 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&state));
72 // A simple test where an input is provided
73 // and the subsequent InputRecord is immediately requested.
74 SECTION("TestNoWait")
75 {
76 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
77
78 std::vector<InputRoute> inputs = {
79 InputRoute{spec, 0, "Fake", 0}};
80
81 std::vector<ForwardRoute> forwards;
82 std::vector<InputChannelInfo> infos{1};
83 TimesliceIndex index{1, infos};
84 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
85
87 DataRelayer relayer(policy, inputs, index, {registry});
88 relayer.setPipelineLength(4);
89
90 // Let's create a dummy O2 Message with two headers in the stack:
91 // - DataHeader matching the one provided in the input
92 DataHeader dh;
93 dh.dataDescription = "CLUSTERS";
94 dh.dataOrigin = "TPC";
95 dh.subSpecification = 0;
96 dh.splitPayloadIndex = 0;
97 dh.splitPayloadParts = 1;
98
99 DataProcessingHeader dph{0, 1};
100 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
101 std::array<fair::mq::MessagePtr, 2> messages;
102 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
103 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
104 messages[1] = transport->CreateMessage(1000);
105 fair::mq::MessagePtr& header = messages[0];
106 fair::mq::MessagePtr& payload = messages[1];
107 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
108 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
109 std::vector<RecordAction> ready;
110 relayer.getReadyToProcess(ready);
111 REQUIRE(ready.size() == 1);
112 REQUIRE(ready[0].slot.index == 0);
113 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
114 REQUIRE(header.get() == nullptr);
115 REQUIRE(payload.get() == nullptr);
116 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
117 // one MessageSet with one PartRef with header and payload
118 REQUIRE(result.size() == 1);
119 REQUIRE(result.at(0).size() == 1);
120 }
121
122 //
123 SECTION("TestNoWaitMatcher")
124 {
126 auto specs = o2::framework::select("clusters:TPC/CLUSTERS");
127
128 std::vector<InputRoute> inputs = {
129 InputRoute{specs[0], 0, "Fake", 0}};
130
131 std::vector<ForwardRoute> forwards;
132 std::vector<InputChannelInfo> infos{1};
133 TimesliceIndex index{1, infos};
134 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
135
137 DataRelayer relayer(policy, inputs, index, {registry});
138 relayer.setPipelineLength(4);
139
140 // Let's create a dummy O2 Message with two headers in the stack:
141 // - DataHeader matching the one provided in the input
142 DataHeader dh;
143 dh.dataDescription = "CLUSTERS";
144 dh.dataOrigin = "TPC";
145 dh.subSpecification = 0;
146 dh.splitPayloadIndex = 0;
147 dh.splitPayloadParts = 1;
148
149 DataProcessingHeader dph{0, 1};
150 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
151 std::array<fair::mq::MessagePtr, 2> messages;
152 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
153 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
154 messages[1] = transport->CreateMessage(1000);
155 fair::mq::MessagePtr& header = messages[0];
156 fair::mq::MessagePtr& payload = messages[1];
157 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
158 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
159 std::vector<RecordAction> ready;
160 relayer.getReadyToProcess(ready);
161 REQUIRE(ready.size() == 1);
162 REQUIRE(ready[0].slot.index == 0);
163 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
164 REQUIRE(header.get() == nullptr);
165 REQUIRE(payload.get() == nullptr);
166 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
167 // one MessageSet with one PartRef with header and payload
168 REQUIRE(result.size() == 1);
169 REQUIRE(result.at(0).size() == 1);
170 }
171
172 // This test a more complicated set of inputs, and verifies that data is
173 // correctly relayed before being processed.
174 SECTION("TestRelay")
175 {
177 InputSpec spec1{
178 "clusters",
179 "TPC",
180 "CLUSTERS",
181 };
182 InputSpec spec2{
183 "clusters_its",
184 "ITS",
185 "CLUSTERS",
186 };
187
188 std::vector<InputRoute> inputs = {
189 InputRoute{spec1, 0, "Fake1", 0},
190 InputRoute{spec2, 1, "Fake2", 0}};
191
192 std::vector<ForwardRoute> forwards;
193
194 std::vector<InputChannelInfo> infos{1};
195 TimesliceIndex index{1, infos};
196 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
197
199 DataRelayer relayer(policy, inputs, index, {registry});
200 relayer.setPipelineLength(4);
201
202 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
203 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
204
205 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
206 std::array<fair::mq::MessagePtr, 2> messages;
207 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
208 messages[1] = transport->CreateMessage(1000);
209 fair::mq::MessagePtr& header = messages[0];
210 fair::mq::MessagePtr& payload = messages[1];
211 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
212 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
213 REQUIRE(header.get() == nullptr);
214 REQUIRE(payload.get() == nullptr);
215 };
216
217 // Let's create a dummy O2 Message with two headers in the stack:
218 // - DataHeader matching the one provided in the input
219 DataHeader dh1;
220 dh1.dataDescription = "CLUSTERS";
221 dh1.dataOrigin = "TPC";
222 dh1.subSpecification = 0;
223 dh1.splitPayloadIndex = 0;
224 dh1.splitPayloadParts = 1;
225
226 // Let's create the second O2 Message:
227 DataHeader dh2;
228 dh2.dataDescription = "CLUSTERS";
229 dh2.dataOrigin = "ITS";
230 dh2.subSpecification = 0;
231 dh2.splitPayloadIndex = 0;
232 dh2.splitPayloadParts = 1;
233
234 createMessage(dh1, 0);
235 std::vector<RecordAction> ready;
236 relayer.getReadyToProcess(ready);
237 REQUIRE(ready.size() == 0);
238
239 createMessage(dh2, 0);
240 ready.clear();
241 relayer.getReadyToProcess(ready);
242 REQUIRE(ready.size() == 1);
243 REQUIRE(ready[0].slot.index == 0);
244 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
245
246 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
247 // two MessageSets, each with one PartRef
248 REQUIRE(result.size() == 2);
249 REQUIRE(result.at(0).size() == 1);
250 REQUIRE(result.at(1).size() == 1);
251 }
252
253 // This test a more complicated set of inputs, and verifies that data is
254 // correctly relayed before being processed.
255 SECTION("TestRelayBug")
256 {
258 InputSpec spec1{
259 "clusters",
260 "TPC",
261 "CLUSTERS",
262 };
263 InputSpec spec2{
264 "clusters_its",
265 "ITS",
266 "CLUSTERS",
267 };
268
269 std::vector<InputRoute> inputs = {
270 InputRoute{spec1, 0, "Fake1", 0},
271 InputRoute{spec2, 1, "Fake2", 0}};
272
273 std::vector<ForwardRoute> forwards;
274
275 std::vector<InputChannelInfo> infos{1};
276 TimesliceIndex index{1, infos};
277 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
278
280 DataRelayer relayer(policy, inputs, index, {registry});
281 relayer.setPipelineLength(3);
282
283 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
284 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
285
286 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
287 std::array<fair::mq::MessagePtr, 2> messages;
288 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
289 messages[1] = transport->CreateMessage(1000);
290 fair::mq::MessagePtr& header = messages[0];
291 fair::mq::MessagePtr& payload = messages[1];
292 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
293 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
294 REQUIRE(header.get() == nullptr);
295 REQUIRE(payload.get() == nullptr);
296 };
297
298 // Let's create a dummy O2 Message with two headers in the stack:
299 // - DataHeader matching the one provided in the input
300 DataHeader dh1;
301 dh1.dataDescription = "CLUSTERS";
302 dh1.dataOrigin = "TPC";
303 dh1.subSpecification = 0;
304 dh1.splitPayloadIndex = 0;
305 dh1.splitPayloadParts = 1;
306
307 // Let's create the second O2 Message:
308 DataHeader dh2;
309 dh2.dataDescription = "CLUSTERS";
310 dh2.dataOrigin = "ITS";
311 dh2.subSpecification = 0;
312 dh2.splitPayloadIndex = 0;
313 dh2.splitPayloadParts = 1;
314
315 // Let's create the second O2 Message:
316 DataHeader dh3;
317 dh3.dataDescription = "CLUSTERS";
318 dh3.dataOrigin = "FOO";
319 dh3.subSpecification = 0;
320 dh3.splitPayloadIndex = 0;
321 dh3.splitPayloadParts = 1;
322
324 createMessage(dh1, 0);
325 std::vector<RecordAction> ready;
326 relayer.getReadyToProcess(ready);
327 REQUIRE(ready.size() == 0);
328 createMessage(dh1, 1);
329 ready.clear();
330 relayer.getReadyToProcess(ready);
331 REQUIRE(ready.size() == 0);
332 createMessage(dh2, 0);
333 ready.clear();
334 relayer.getReadyToProcess(ready);
335 REQUIRE(ready.size() == 1);
336 REQUIRE(ready[0].slot.index == 0);
337 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
338 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
339 createMessage(dh2, 1);
340 ready.clear();
341 relayer.getReadyToProcess(ready);
342 REQUIRE(ready.size() == 1);
343 REQUIRE(ready[0].slot.index == 1);
344 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
345 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
346 }
347
348 // This tests a simple cache pruning, where a single input is shifted out of
349 // the cache.
350 SECTION("TestCache")
351 {
353 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
354
355 std::vector<InputRoute> inputs = {
356 InputRoute{spec, 0, "Fake", 0}};
357 std::vector<ForwardRoute> forwards;
358
360 std::vector<InputChannelInfo> infos{1};
361 TimesliceIndex index{1, infos};
362 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
363 DataRelayer relayer(policy, inputs, index, {registry});
364 // Only two messages to fill the cache.
365 relayer.setPipelineLength(2);
366
367 // Let's create a dummy O2 Message with two headers in the stack:
368 // - DataHeader matching the one provided in the input
369 DataHeader dh;
370 dh.dataDescription = "CLUSTERS";
371 dh.dataOrigin = "TPC";
372 dh.subSpecification = 0;
373 dh.splitPayloadIndex = 0;
374 dh.splitPayloadParts = 1;
375
376 DataProcessingHeader dph{0, 1};
377 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
378 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
379 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](auto const& h) {
380 std::array<fair::mq::MessagePtr, 2> messages;
381 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
382 messages[1] = transport->CreateMessage(1000);
383 fair::mq::MessagePtr& header = messages[0];
384 fair::mq::MessagePtr& payload = messages[1];
385 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
386 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
387 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() == nullptr));
388 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() == nullptr));
389 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() != nullptr));
390 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() != nullptr));
391 };
392
393 // This fills the cache, and then empties it.
394 createMessage(DataProcessingHeader{0, 1});
395 createMessage(DataProcessingHeader{1, 1});
396 std::vector<RecordAction> ready;
397 relayer.getReadyToProcess(ready);
398 REQUIRE(ready.size() == 2);
399 REQUIRE(ready[0].slot.index == 1);
400 REQUIRE(ready[1].slot.index == 0);
401 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
402 REQUIRE(ready[1].op == CompletionPolicy::CompletionOp::Consume);
403 for (size_t i = 0; i < ready.size(); ++i) {
404 auto result = relayer.consumeAllInputsForTimeslice(ready[i].slot);
405 }
406
407 // This fills the cache and makes 2 obsolete.
408 createMessage(DataProcessingHeader{2, 1});
409 createMessage(DataProcessingHeader{3, 1});
410 createMessage(DataProcessingHeader{4, 1});
411 ready.clear();
412 relayer.getReadyToProcess(ready);
413 REQUIRE(ready.size() == 2);
414
415 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
416 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
417 // One for the header, one for the payload
418 REQUIRE(result1.size() == 1);
419 REQUIRE(result2.size() == 1);
420 }
421
422 // This the any policy. Even when there are two inputs, given the any policy
423 // it will run immediately.
424 SECTION("TestPolicies")
425 {
427 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
428 InputSpec spec2{"tracks", "TPC", "TRACKS"};
429
430 std::vector<InputRoute> inputs = {
431 InputRoute{spec1, 0, "Fake1", 0},
432 InputRoute{spec2, 1, "Fake2", 0},
433 };
434
435 std::vector<ForwardRoute> forwards;
436 std::vector<InputChannelInfo> infos{1};
437 TimesliceIndex index{1, infos};
438 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
439
441 DataRelayer relayer(policy, inputs, index, {registry});
442 // Only two messages to fill the cache.
443 relayer.setPipelineLength(2);
444
445 // Let's create a dummy O2 Message with two headers in the stack:
446 // - DataHeader matching the one provided in the input
447 DataHeader dh1;
448 dh1.dataDescription = "CLUSTERS";
449 dh1.dataOrigin = "TPC";
450 dh1.subSpecification = 0;
451 dh1.splitPayloadIndex = 0;
452 dh1.splitPayloadParts = 1;
453
454 DataHeader dh2;
455 dh2.dataDescription = "TRACKS";
456 dh2.dataOrigin = "TPC";
457 dh2.subSpecification = 0;
458 dh2.splitPayloadIndex = 0;
459 dh2.splitPayloadParts = 1;
460
461 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
462 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
463 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
464 std::array<fair::mq::MessagePtr, 2> messages;
465 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
466 messages[1] = transport->CreateMessage(1000);
467 fair::mq::MessagePtr& header = messages[0];
468 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
469 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
470 };
471
472 // This fills the cache, and then empties it.
473 createMessage(dh1, DataProcessingHeader{0, 1});
474 std::vector<RecordAction> ready1;
475 relayer.getReadyToProcess(ready1);
476 REQUIRE(ready1.size() == 1);
477 REQUIRE(ready1[0].slot.index == 0);
478 REQUIRE(ready1[0].op == CompletionPolicy::CompletionOp::Process);
479
480 createMessage(dh1, DataProcessingHeader{1, 1});
481 std::vector<RecordAction> ready2;
482 relayer.getReadyToProcess(ready2);
483 REQUIRE(ready2.size() == 1);
484 REQUIRE(ready2[0].slot.index == 1);
485 REQUIRE(ready2[0].op == CompletionPolicy::CompletionOp::Process);
486
487 createMessage(dh2, DataProcessingHeader{1, 1});
488 std::vector<RecordAction> ready3;
489 relayer.getReadyToProcess(ready3);
490 REQUIRE(ready3.size() == 1);
491 REQUIRE(ready3[0].slot.index == 1);
492 REQUIRE(ready3[0].op == CompletionPolicy::CompletionOp::Consume);
493 }
494
496 SECTION("TestClear")
497 {
499 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
500 InputSpec spec2{"tracks", "TPC", "TRACKS"};
501
502 std::vector<InputRoute> inputs = {
503 InputRoute{spec1, 0, "Fake1", 0},
504 InputRoute{spec2, 1, "Fake2", 0},
505 };
506
507 std::vector<ForwardRoute> forwards;
508 std::vector<InputChannelInfo> infos{1};
509 TimesliceIndex index{1, infos};
510 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
511
513 DataRelayer relayer(policy, inputs, index, {registry});
514 // Only two messages to fill the cache.
515 relayer.setPipelineLength(3);
516
517 // Let's create a dummy O2 Message with two headers in the stack:
518 // - DataHeader matching the one provided in the input
519 DataHeader dh1;
520 dh1.dataDescription = "CLUSTERS";
521 dh1.dataOrigin = "TPC";
522 dh1.subSpecification = 0;
523 dh1.splitPayloadIndex = 0;
524 dh1.splitPayloadParts = 1;
525
526 DataHeader dh2;
527 dh2.dataDescription = "TRACKS";
528 dh2.dataOrigin = "TPC";
529 dh2.subSpecification = 0;
530 dh2.splitPayloadIndex = 0;
531 dh2.splitPayloadParts = 1;
532
533 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
534 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
535 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
536 std::array<fair::mq::MessagePtr, 2> messages;
537 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
538 messages[1] = transport->CreateMessage(1000);
539 fair::mq::MessagePtr& header = messages[0];
540 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
541 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
542 };
543
544 // This fills the cache, and then empties it.
545 createMessage(dh1, DataProcessingHeader{0, 1});
546 createMessage(dh1, DataProcessingHeader{1, 1});
547 createMessage(dh2, DataProcessingHeader{1, 1});
548 relayer.clear();
549 std::vector<RecordAction> ready;
550 relayer.getReadyToProcess(ready);
551 REQUIRE(ready.size() == 0);
552 }
553
555 SECTION("TestTooMany")
556 {
558 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
559 InputSpec spec2{"tracks", "TPC", "TRACKS"};
560
561 std::vector<InputRoute> inputs = {
562 InputRoute{spec1, 0, "Fake1", 0},
563 InputRoute{spec2, 1, "Fake2", 0},
564 };
565
566 std::vector<ForwardRoute> forwards;
567 std::vector<InputChannelInfo> infos{1};
568 TimesliceIndex index{1, infos};
569 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
570
572 DataRelayer relayer(policy, inputs, index, {registry});
573 // Only two messages to fill the cache.
574 relayer.setPipelineLength(1);
575
576 // Let's create a dummy O2 Message with two headers in the stack:
577 // - DataHeader matching the one provided in the input
578 DataHeader dh1;
579 dh1.dataDescription = "CLUSTERS";
580 dh1.dataOrigin = "TPC";
581 dh1.subSpecification = 0;
582 dh1.splitPayloadIndex = 0;
583 dh1.splitPayloadParts = 1;
584
585 DataHeader dh2;
586 dh2.dataDescription = "TRACKS";
587 dh2.dataOrigin = "TPC";
588 dh2.subSpecification = 0;
589 dh2.splitPayloadIndex = 0;
590 dh2.splitPayloadParts = 1;
591
592 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
593 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
594
595 std::array<fair::mq::MessagePtr, 4> messages;
596 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
597 messages[1] = transport->CreateMessage(1000);
598 fair::mq::MessagePtr& header = messages[0];
599 fair::mq::MessagePtr& payload = messages[1];
600 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
601 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
602 REQUIRE(header.get() == nullptr);
603 REQUIRE(payload.get() == nullptr);
604 // This fills the cache, and then waits.
605 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
606 messages[3] = transport->CreateMessage(1000);
607 fair::mq::MessagePtr& header2 = messages[2];
608 fair::mq::MessagePtr& payload2 = messages[3];
609 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
610 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
611 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
612 REQUIRE(header2.get() != nullptr);
613 REQUIRE(payload2.get() != nullptr);
614 }
615
616 SECTION("SplitParts")
617 {
619 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
620 InputSpec spec2{"its", "ITS", "CLUSTERS"};
621
622 std::vector<InputRoute> inputs = {
623 InputRoute{spec1, 0, "Fake1", 0},
624 InputRoute{spec2, 0, "Fake2", 0},
625 };
626
627 std::vector<ForwardRoute> forwards;
628 std::vector<InputChannelInfo> infos{1};
629 TimesliceIndex index{1, infos};
630 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
631
633 DataRelayer relayer(policy, inputs, index, {registry});
634 // Only two messages to fill the cache.
635 relayer.setPipelineLength(1);
636
637 // Let's create a dummy O2 Message with two headers in the stack:
638 // - DataHeader matching the one provided in the input
639 DataHeader dh1;
640 dh1.dataDescription = "CLUSTERS";
641 dh1.dataOrigin = "TPC";
642 dh1.subSpecification = 0;
643 dh1.splitPayloadIndex = 0;
644 dh1.splitPayloadParts = 1;
645
646 DataHeader dh2;
647 dh2.dataDescription = "TRACKS";
648 dh2.dataOrigin = "TPC";
649 dh2.subSpecification = 0;
650 dh2.splitPayloadIndex = 0;
651 dh2.splitPayloadParts = 1;
652
653 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
654 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
655
656 std::array<fair::mq::MessagePtr, 6> messages;
657 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
658 messages[1] = transport->CreateMessage(1000);
659 fair::mq::MessagePtr& header = messages[0];
660 fair::mq::MessagePtr& payload = messages[1];
661 DataRelayer::InputInfo fakeInfo{0, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
662 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
663 REQUIRE(header.get() == nullptr);
664 REQUIRE(payload.get() == nullptr);
665 // This fills the cache, and then waits.
666 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
667 messages[3] = transport->CreateMessage(1000);
668 fair::mq::MessagePtr& header2 = messages[2];
669 fair::mq::MessagePtr& payload2 = messages[3];
670 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
671 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo, 2);
672 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
673 CHECK(action.timeslice.value == 1);
674 REQUIRE(header2.get() != nullptr);
675 REQUIRE(payload2.get() != nullptr);
676 // This fills the cache, and then waits.
677 messages[4] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
678 messages[5] = transport->CreateMessage(1000);
679 DataRelayer::InputInfo fakeInfo3{4, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
680 relayer.relay(header2->GetData(), &messages[4], fakeInfo3, 2);
681 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
682 CHECK(action.timeslice.value == 1);
683 REQUIRE(header2.get() != nullptr);
684 REQUIRE(payload2.get() != nullptr);
685 }
686
687 SECTION("SplitPayloadPairs")
688 {
690 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
691
692 std::vector<InputRoute> inputs = {
693 InputRoute{spec1, 0, "Fake1", 0},
694 };
695
696 std::vector<ForwardRoute> forwards;
697 std::vector<InputChannelInfo> infos{1};
698 TimesliceIndex index{1, infos};
699 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
700
702 DataRelayer relayer(policy, inputs, index, {registry});
703 relayer.setPipelineLength(4);
704
705 DataHeader dh{"CLUSTERS", "TPC", 0};
706
707 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
708 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
709 size_t timeslice = 0;
710
711 const int nSplitParts = 100;
712 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
713 splitParts.reserve(2 * nSplitParts);
714
715 for (size_t i = 0; i < nSplitParts; ++i) {
716 dh.splitPayloadIndex = i;
717 dh.splitPayloadParts = nSplitParts;
718
719 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
720 fair::mq::MessagePtr payload = transport->CreateMessage(100);
721
722 splitParts.emplace_back(std::move(header));
723 splitParts.emplace_back(std::move(payload));
724 }
725 REQUIRE(splitParts.size() == 2 * nSplitParts);
726
727 DataRelayer::InputInfo fakeInfo{0, splitParts.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
728 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
729 std::vector<RecordAction> ready;
730 relayer.getReadyToProcess(ready);
731 REQUIRE(ready.size() == 1);
732 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
733 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
734 // we have one input route and thus one message set containing pairs for all
735 // payloads
736 REQUIRE(messageSet.size() == 1);
737 REQUIRE(messageSet[0].size() == nSplitParts);
738 REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1);
739 }
740
741 SECTION("SplitPayloadSequence")
742 {
744 InputSpec spec1{"clusters", "TST", "COUNTER"};
745
746 std::vector<InputRoute> inputs = {
747 InputRoute{spec1, 0, "Fake1", 0},
748 };
749
750 std::vector<ForwardRoute> forwards;
751 std::vector<InputChannelInfo> infos{1};
752 TimesliceIndex index{1, infos};
753 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
754
756 DataRelayer relayer(policy, inputs, index, {registry});
757 relayer.setPipelineLength(4);
758
759 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
760 size_t timeslice = 0;
761
762 std::vector<size_t> sequenceSize;
763 size_t nTotalPayloads = 0;
764
765 auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
766 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
767 std::vector<std::unique_ptr<fair::mq::Message>> messages;
768 messages.reserve(nPayloads + 1);
769 DataHeader dh{"COUNTER", "TST", 0};
770
771 // one header with index set to the number of split parts indicates sequence
772 // of payloads without additional headers
773 dh.splitPayloadIndex = nPayloads;
774 dh.splitPayloadParts = nPayloads;
775 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
776 messages.emplace_back(std::move(header));
777
778 for (size_t i = 0; i < nPayloads; ++i) {
779 messages.emplace_back(transport->CreateMessage(100));
780 *(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
781 ++nTotalPayloads;
782 }
783 REQUIRE(messages.size() == nPayloads + 1);
784 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
785 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
786 sequenceSize.emplace_back(nPayloads);
787 };
788 createSequence(100);
789 createSequence(1);
790 createSequence(42);
791
792 std::vector<RecordAction> ready;
793 relayer.getReadyToProcess(ready);
794 REQUIRE(ready.size() == 1);
795 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
796 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
797 // we have one input route
798 REQUIRE(messageSet.size() == 1);
799 // one message set containing number of added sequences of messages
800 REQUIRE(messageSet[0].size() == sequenceSize.size());
801 size_t counter = 0;
802 for (auto seqid = 0; seqid < sequenceSize.size(); ++seqid) {
803 REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
804 for (auto pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
805 REQUIRE(messageSet[0].payload(seqid, pi));
806 auto const* data = messageSet[0].payload(seqid, pi)->GetData();
807 REQUIRE(*(reinterpret_cast<size_t const*>(data)) == counter);
808 ++counter;
809 }
810 }
811 }
812}
benchmark::State & state
int16_t time
Definition RawEventData.h:4
int32_t i
uint32_t op
uint32_t res
Definition RawData.h:0
o2::monitoring::Monitoring Monitoring
Class for time synchronization of RawReader instances.
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean * data
Definition glcorearb.h:298
GLuint * states
Definition glcorearb.h:4932
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > select(char const *matcher="")
fair::mq::MessagePtr getMessage(ContainerT &&container, fair::mq::MemoryResource *targetResource=nullptr)
static constexpr int INVALID
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
Helper struct to hold statistics about the data processing happening.
Running state information of a given device.
Definition DeviceState.h:34
bool batch
Whether the driver was started in batch mode or not.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:34
TEST_CASE("DataRelayer")