Project
Loading...
Searching...
No Matches
test_DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <catch_amalgamated.hpp>
14#include "Headers/DataHeader.h"
15#include "Headers/Stack.h"
23#include "../src/DataRelayerHelpers.h"
27#include <Monitoring/Monitoring.h>
28#include <fairmq/TransportFactory.h>
29#include <array>
30#include <vector>
31#include <uv.h>
32
33using Monitoring = o2::monitoring::Monitoring;
34using namespace o2::framework;
38
39TEST_CASE("DataRelayer")
40{
41 ServiceRegistry registry;
42 ServiceRegistryRef ref{registry};
43 Monitoring monitoring;
44 const DriverConfig driverConfig{
45 .batch = false,
46 };
52 TimingHelpers::defaultCPUTimeConfigurator(uv_default_loop()), {});
53 int quickUpdateInterval = 1;
54 using MetricSpec = DataProcessingStats::MetricSpec;
55 std::vector<MetricSpec> specs{
56 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
57 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
59 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
60
61 for (auto& spec : specs) {
62 stats.registerMetric(spec);
63 }
64
66 ref.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
67 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
68 ref.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
69 ref.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
70 ref.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&state));
71 // A simple test where an input is provided
72 // and the subsequent InputRecord is immediately requested.
73 SECTION("TestNoWait")
74 {
75 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
76
77 std::vector<InputRoute> inputs = {
78 InputRoute{spec, 0, "Fake", 0}};
79
80 std::vector<ForwardRoute> forwards;
81 std::vector<InputChannelInfo> infos{1};
82 TimesliceIndex index{1, infos};
83 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
84
86 DataRelayer relayer(policy, inputs, index, {registry});
87 relayer.setPipelineLength(4);
88
89 // Let's create a dummy O2 Message with two headers in the stack:
90 // - DataHeader matching the one provided in the input
91 DataHeader dh;
92 dh.dataDescription = "CLUSTERS";
93 dh.dataOrigin = "TPC";
94 dh.subSpecification = 0;
95 dh.splitPayloadIndex = 0;
96 dh.splitPayloadParts = 1;
97
98 DataProcessingHeader dph{0, 1};
99 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
100 std::array<fair::mq::MessagePtr, 2> messages;
101 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
102 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
103 messages[1] = transport->CreateMessage(1000);
104 fair::mq::MessagePtr& header = messages[0];
105 fair::mq::MessagePtr& payload = messages[1];
106 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
107 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
108 std::vector<RecordAction> ready;
109 relayer.getReadyToProcess(ready);
110 REQUIRE(ready.size() == 1);
111 REQUIRE(ready[0].slot.index == 0);
112 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
113 REQUIRE(header.get() == nullptr);
114 REQUIRE(payload.get() == nullptr);
115 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
116 // one MessageSet with one PartRef with header and payload
117 REQUIRE(result.size() == 1);
118 REQUIRE(result.at(0).size() == 1);
119 }
120
121 //
122 SECTION("TestNoWaitMatcher")
123 {
125 auto specs = o2::framework::select("clusters:TPC/CLUSTERS");
126
127 std::vector<InputRoute> inputs = {
128 InputRoute{specs[0], 0, "Fake", 0}};
129
130 std::vector<ForwardRoute> forwards;
131 std::vector<InputChannelInfo> infos{1};
132 TimesliceIndex index{1, infos};
133 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
134
136 DataRelayer relayer(policy, inputs, index, {registry});
137 relayer.setPipelineLength(4);
138
139 // Let's create a dummy O2 Message with two headers in the stack:
140 // - DataHeader matching the one provided in the input
141 DataHeader dh;
142 dh.dataDescription = "CLUSTERS";
143 dh.dataOrigin = "TPC";
144 dh.subSpecification = 0;
145 dh.splitPayloadIndex = 0;
146 dh.splitPayloadParts = 1;
147
148 DataProcessingHeader dph{0, 1};
149 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
150 std::array<fair::mq::MessagePtr, 2> messages;
151 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
152 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, dph});
153 messages[1] = transport->CreateMessage(1000);
154 fair::mq::MessagePtr& header = messages[0];
155 fair::mq::MessagePtr& payload = messages[1];
156 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
157 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
158 std::vector<RecordAction> ready;
159 relayer.getReadyToProcess(ready);
160 REQUIRE(ready.size() == 1);
161 REQUIRE(ready[0].slot.index == 0);
162 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
163 REQUIRE(header.get() == nullptr);
164 REQUIRE(payload.get() == nullptr);
165 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
166 // one MessageSet with one PartRef with header and payload
167 REQUIRE(result.size() == 1);
168 REQUIRE(result.at(0).size() == 1);
169 }
170
171 // This tests a more complicated set of inputs, and verifies that data is
172 // correctly relayed before being processed.
173 SECTION("TestRelay")
174 {
176 InputSpec spec1{
177 "clusters",
178 "TPC",
179 "CLUSTERS",
180 };
181 InputSpec spec2{
182 "clusters_its",
183 "ITS",
184 "CLUSTERS",
185 };
186
187 std::vector<InputRoute> inputs = {
188 InputRoute{spec1, 0, "Fake1", 0},
189 InputRoute{spec2, 1, "Fake2", 0}};
190
191 std::vector<ForwardRoute> forwards;
192
193 std::vector<InputChannelInfo> infos{1};
194 TimesliceIndex index{1, infos};
195 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
196
198 DataRelayer relayer(policy, inputs, index, {registry});
199 relayer.setPipelineLength(4);
200
201 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
202 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
203
204 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
205 std::array<fair::mq::MessagePtr, 2> messages;
206 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
207 messages[1] = transport->CreateMessage(1000);
208 fair::mq::MessagePtr& header = messages[0];
209 fair::mq::MessagePtr& payload = messages[1];
210 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
211 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
212 REQUIRE(header.get() == nullptr);
213 REQUIRE(payload.get() == nullptr);
214 };
215
216 // Let's create a dummy O2 Message with two headers in the stack:
217 // - DataHeader matching the one provided in the input
218 DataHeader dh1;
219 dh1.dataDescription = "CLUSTERS";
220 dh1.dataOrigin = "TPC";
221 dh1.subSpecification = 0;
222 dh1.splitPayloadIndex = 0;
223 dh1.splitPayloadParts = 1;
224
225 // Let's create the second O2 Message:
226 DataHeader dh2;
227 dh2.dataDescription = "CLUSTERS";
228 dh2.dataOrigin = "ITS";
229 dh2.subSpecification = 0;
230 dh2.splitPayloadIndex = 0;
231 dh2.splitPayloadParts = 1;
232
233 createMessage(dh1, 0);
234 std::vector<RecordAction> ready;
235 relayer.getReadyToProcess(ready);
236 REQUIRE(ready.size() == 0);
237
238 createMessage(dh2, 0);
239 ready.clear();
240 relayer.getReadyToProcess(ready);
241 REQUIRE(ready.size() == 1);
242 REQUIRE(ready[0].slot.index == 0);
243 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
244
245 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
246 // two MessageSets, each with one PartRef
247 REQUIRE(result.size() == 2);
248 REQUIRE(result.at(0).size() == 1);
249 REQUIRE(result.at(1).size() == 1);
250 }
251
252 // This tests a more complicated set of inputs, and verifies that data is
253 // correctly relayed before being processed.
254 SECTION("TestRelayBug")
255 {
257 InputSpec spec1{
258 "clusters",
259 "TPC",
260 "CLUSTERS",
261 };
262 InputSpec spec2{
263 "clusters_its",
264 "ITS",
265 "CLUSTERS",
266 };
267
268 std::vector<InputRoute> inputs = {
269 InputRoute{spec1, 0, "Fake1", 0},
270 InputRoute{spec2, 1, "Fake2", 0}};
271
272 std::vector<ForwardRoute> forwards;
273
274 std::vector<InputChannelInfo> infos{1};
275 TimesliceIndex index{1, infos};
276 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
277
279 DataRelayer relayer(policy, inputs, index, {registry});
280 relayer.setPipelineLength(3);
281
282 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
283 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
284
285 auto createMessage = [&transport, &channelAlloc, &relayer](DataHeader& dh, size_t time) {
286 std::array<fair::mq::MessagePtr, 2> messages;
287 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{time, 1}});
288 messages[1] = transport->CreateMessage(1000);
289 fair::mq::MessagePtr& header = messages[0];
290 fair::mq::MessagePtr& payload = messages[1];
291 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
292 relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
293 REQUIRE(header.get() == nullptr);
294 REQUIRE(payload.get() == nullptr);
295 };
296
297 // Let's create a dummy O2 Message with two headers in the stack:
298 // - DataHeader matching the one provided in the input
299 DataHeader dh1;
300 dh1.dataDescription = "CLUSTERS";
301 dh1.dataOrigin = "TPC";
302 dh1.subSpecification = 0;
303 dh1.splitPayloadIndex = 0;
304 dh1.splitPayloadParts = 1;
305
306 // Let's create the second O2 Message:
307 DataHeader dh2;
308 dh2.dataDescription = "CLUSTERS";
309 dh2.dataOrigin = "ITS";
310 dh2.subSpecification = 0;
311 dh2.splitPayloadIndex = 0;
312 dh2.splitPayloadParts = 1;
313
314 // Let's create the second O2 Message:
315 DataHeader dh3;
316 dh3.dataDescription = "CLUSTERS";
317 dh3.dataOrigin = "FOO";
318 dh3.subSpecification = 0;
319 dh3.splitPayloadIndex = 0;
320 dh3.splitPayloadParts = 1;
321
323 createMessage(dh1, 0);
324 std::vector<RecordAction> ready;
325 relayer.getReadyToProcess(ready);
326 REQUIRE(ready.size() == 0);
327 createMessage(dh1, 1);
328 ready.clear();
329 relayer.getReadyToProcess(ready);
330 REQUIRE(ready.size() == 0);
331 createMessage(dh2, 0);
332 ready.clear();
333 relayer.getReadyToProcess(ready);
334 REQUIRE(ready.size() == 1);
335 REQUIRE(ready[0].slot.index == 0);
336 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
337 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
338 createMessage(dh2, 1);
339 ready.clear();
340 relayer.getReadyToProcess(ready);
341 REQUIRE(ready.size() == 1);
342 REQUIRE(ready[0].slot.index == 1);
343 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
344 result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
345 }
346
347 // This tests a simple cache pruning, where a single input is shifted out of
348 // the cache.
349 SECTION("TestCache")
350 {
352 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
353
354 std::vector<InputRoute> inputs = {
355 InputRoute{spec, 0, "Fake", 0}};
356 std::vector<ForwardRoute> forwards;
357
359 std::vector<InputChannelInfo> infos{1};
360 TimesliceIndex index{1, infos};
361 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
362 DataRelayer relayer(policy, inputs, index, {registry});
363 // Only two messages to fill the cache.
364 relayer.setPipelineLength(2);
365
366 // Let's create a dummy O2 Message with two headers in the stack:
367 // - DataHeader matching the one provided in the input
368 DataHeader dh;
369 dh.dataDescription = "CLUSTERS";
370 dh.dataOrigin = "TPC";
371 dh.subSpecification = 0;
372 dh.splitPayloadIndex = 0;
373 dh.splitPayloadParts = 1;
374
375 DataProcessingHeader dph{0, 1};
376 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
377 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
378 auto createMessage = [&transport, &channelAlloc, &relayer, &dh](auto const& h) {
379 std::array<fair::mq::MessagePtr, 2> messages;
380 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
381 messages[1] = transport->CreateMessage(1000);
382 fair::mq::MessagePtr& header = messages[0];
383 fair::mq::MessagePtr& payload = messages[1];
384 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
385 auto res = relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
386 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || header.get() == nullptr));
387 REQUIRE((res.type != DataRelayer::RelayChoice::Type::WillRelay || payload.get() == nullptr));
388 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || header.get() != nullptr));
389 REQUIRE((res.type != DataRelayer::RelayChoice::Type::Backpressured || payload.get() != nullptr));
390 };
391
392 // This fills the cache, and then empties it.
393 createMessage(DataProcessingHeader{0, 1});
394 createMessage(DataProcessingHeader{1, 1});
395 std::vector<RecordAction> ready;
396 relayer.getReadyToProcess(ready);
397 REQUIRE(ready.size() == 2);
398 REQUIRE(ready[0].slot.index == 1);
399 REQUIRE(ready[1].slot.index == 0);
400 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
401 REQUIRE(ready[1].op == CompletionPolicy::CompletionOp::Consume);
402 for (size_t i = 0; i < ready.size(); ++i) {
403 auto result = relayer.consumeAllInputsForTimeslice(ready[i].slot);
404 }
405
406 // This fills the cache and makes 2 obsolete.
407 createMessage(DataProcessingHeader{2, 1});
408 createMessage(DataProcessingHeader{3, 1});
409 createMessage(DataProcessingHeader{4, 1});
410 ready.clear();
411 relayer.getReadyToProcess(ready);
412 REQUIRE(ready.size() == 2);
413
414 auto result1 = relayer.consumeAllInputsForTimeslice(ready[0].slot);
415 auto result2 = relayer.consumeAllInputsForTimeslice(ready[1].slot);
416 // One for the header, one for the payload
417 REQUIRE(result1.size() == 1);
418 REQUIRE(result2.size() == 1);
419 }
420
421 // This tests the "any" policy. Even when there are two inputs, with the
422 // "any" policy the record becomes ready as soon as one of them arrives.
423 SECTION("TestPolicies")
424 {
426 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
427 InputSpec spec2{"tracks", "TPC", "TRACKS"};
428
429 std::vector<InputRoute> inputs = {
430 InputRoute{spec1, 0, "Fake1", 0},
431 InputRoute{spec2, 1, "Fake2", 0},
432 };
433
434 std::vector<ForwardRoute> forwards;
435 std::vector<InputChannelInfo> infos{1};
436 TimesliceIndex index{1, infos};
437 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
438
440 DataRelayer relayer(policy, inputs, index, {registry});
441 // Only two messages to fill the cache.
442 relayer.setPipelineLength(2);
443
444 // Let's create a dummy O2 Message with two headers in the stack:
445 // - DataHeader matching the one provided in the input
446 DataHeader dh1;
447 dh1.dataDescription = "CLUSTERS";
448 dh1.dataOrigin = "TPC";
449 dh1.subSpecification = 0;
450 dh1.splitPayloadIndex = 0;
451 dh1.splitPayloadParts = 1;
452
453 DataHeader dh2;
454 dh2.dataDescription = "TRACKS";
455 dh2.dataOrigin = "TPC";
456 dh2.subSpecification = 0;
457 dh2.splitPayloadIndex = 0;
458 dh2.splitPayloadParts = 1;
459
460 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
461 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
462 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
463 std::array<fair::mq::MessagePtr, 2> messages;
464 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
465 messages[1] = transport->CreateMessage(1000);
466 fair::mq::MessagePtr& header = messages[0];
467 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
468 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
469 };
470
471 // This fills the cache, and then empties it.
472 createMessage(dh1, DataProcessingHeader{0, 1});
473 std::vector<RecordAction> ready1;
474 relayer.getReadyToProcess(ready1);
475 REQUIRE(ready1.size() == 1);
476 REQUIRE(ready1[0].slot.index == 0);
477 REQUIRE(ready1[0].op == CompletionPolicy::CompletionOp::Process);
478
479 createMessage(dh1, DataProcessingHeader{1, 1});
480 std::vector<RecordAction> ready2;
481 relayer.getReadyToProcess(ready2);
482 REQUIRE(ready2.size() == 1);
483 REQUIRE(ready2[0].slot.index == 1);
484 REQUIRE(ready2[0].op == CompletionPolicy::CompletionOp::Process);
485
486 createMessage(dh2, DataProcessingHeader{1, 1});
487 std::vector<RecordAction> ready3;
488 relayer.getReadyToProcess(ready3);
489 REQUIRE(ready3.size() == 1);
490 REQUIRE(ready3[0].slot.index == 1);
491 REQUIRE(ready3[0].op == CompletionPolicy::CompletionOp::Consume);
492 }
493
495 SECTION("TestClear")
496 {
498 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
499 InputSpec spec2{"tracks", "TPC", "TRACKS"};
500
501 std::vector<InputRoute> inputs = {
502 InputRoute{spec1, 0, "Fake1", 0},
503 InputRoute{spec2, 1, "Fake2", 0},
504 };
505
506 std::vector<ForwardRoute> forwards;
507 std::vector<InputChannelInfo> infos{1};
508 TimesliceIndex index{1, infos};
509 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
510
512 DataRelayer relayer(policy, inputs, index, {registry});
513 // Only two messages to fill the cache.
514 relayer.setPipelineLength(3);
515
516 // Let's create a dummy O2 Message with two headers in the stack:
517 // - DataHeader matching the one provided in the input
518 DataHeader dh1;
519 dh1.dataDescription = "CLUSTERS";
520 dh1.dataOrigin = "TPC";
521 dh1.subSpecification = 0;
522 dh1.splitPayloadIndex = 0;
523 dh1.splitPayloadParts = 1;
524
525 DataHeader dh2;
526 dh2.dataDescription = "TRACKS";
527 dh2.dataOrigin = "TPC";
528 dh2.subSpecification = 0;
529 dh2.splitPayloadIndex = 0;
530 dh2.splitPayloadParts = 1;
531
532 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
533 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
534 auto createMessage = [&transport, &channelAlloc, &relayer](auto const& dh, auto const& h) {
535 std::array<fair::mq::MessagePtr, 2> messages;
536 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh, h});
537 messages[1] = transport->CreateMessage(1000);
538 fair::mq::MessagePtr& header = messages[0];
539 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
540 return relayer.relay(header->GetData(), messages.data(), fakeInfo, messages.size());
541 };
542
543 // This fills the cache, and then empties it.
544 createMessage(dh1, DataProcessingHeader{0, 1});
545 createMessage(dh1, DataProcessingHeader{1, 1});
546 createMessage(dh2, DataProcessingHeader{1, 1});
547 relayer.clear();
548 std::vector<RecordAction> ready;
549 relayer.getReadyToProcess(ready);
550 REQUIRE(ready.size() == 0);
551 }
552
554 SECTION("TestTooMany")
555 {
557 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
558 InputSpec spec2{"tracks", "TPC", "TRACKS"};
559
560 std::vector<InputRoute> inputs = {
561 InputRoute{spec1, 0, "Fake1", 0},
562 InputRoute{spec2, 1, "Fake2", 0},
563 };
564
565 std::vector<ForwardRoute> forwards;
566 std::vector<InputChannelInfo> infos{1};
567 TimesliceIndex index{1, infos};
568 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
569
571 DataRelayer relayer(policy, inputs, index, {registry});
572 // Allow a single in-flight timeslice, so that a second timeslice cannot be accepted.
573 relayer.setPipelineLength(1);
574
575 // Headers for the dummy O2 messages: dh1 matches the first input (TPC/CLUSTERS),
576 // dh2 matches the second input (TPC/TRACKS).
577 DataHeader dh1;
578 dh1.dataDescription = "CLUSTERS";
579 dh1.dataOrigin = "TPC";
580 dh1.subSpecification = 0;
581 dh1.splitPayloadIndex = 0;
582 dh1.splitPayloadParts = 1;
583
584 DataHeader dh2;
585 dh2.dataDescription = "TRACKS";
586 dh2.dataOrigin = "TPC";
587 dh2.subSpecification = 0;
588 dh2.splitPayloadIndex = 0;
589 dh2.splitPayloadParts = 1;
590
591 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
592 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
593
594 std::array<fair::mq::MessagePtr, 4> messages;
595 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
596 messages[1] = transport->CreateMessage(1000);
597 fair::mq::MessagePtr& header = messages[0];
598 fair::mq::MessagePtr& payload = messages[1];
599 DataRelayer::InputInfo fakeInfo{0, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
600 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
601 REQUIRE(header.get() == nullptr);
602 REQUIRE(payload.get() == nullptr);
603 // Timeslice 0 now occupies the only slot: timeslice 1 must be backpressured and its messages left untouched.
604 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
605 messages[3] = transport->CreateMessage(1000);
606 fair::mq::MessagePtr& header2 = messages[2];
607 fair::mq::MessagePtr& payload2 = messages[3];
608 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
609 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
610 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
611 REQUIRE(header2.get() != nullptr);
612 REQUIRE(payload2.get() != nullptr);
613 }
614
615 SECTION("SplitParts")
616 {
618 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
619 InputSpec spec2{"its", "ITS", "CLUSTERS"};
620
621 std::vector<InputRoute> inputs = {
622 InputRoute{spec1, 0, "Fake1", 0},
623 InputRoute{spec2, 0, "Fake2", 0},
624 };
625
626 std::vector<ForwardRoute> forwards;
627 std::vector<InputChannelInfo> infos{1};
628 TimesliceIndex index{1, infos};
629 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
630
632 DataRelayer relayer(policy, inputs, index, {registry});
633 // Allow a single in-flight timeslice, so that every further timeslice is backpressured.
634 relayer.setPipelineLength(1);
635
636 // Headers for the dummy O2 messages: dh1 matches the first input (TPC/CLUSTERS).
637 // NOTE(review): dh2 (TPC/TRACKS) matches none of the inputs and is not used below.
638 DataHeader dh1;
639 dh1.dataDescription = "CLUSTERS";
640 dh1.dataOrigin = "TPC";
641 dh1.subSpecification = 0;
642 dh1.splitPayloadIndex = 0;
643 dh1.splitPayloadParts = 1;
644
645 DataHeader dh2;
646 dh2.dataDescription = "TRACKS";
647 dh2.dataOrigin = "TPC";
648 dh2.subSpecification = 0;
649 dh2.splitPayloadIndex = 0;
650 dh2.splitPayloadParts = 1;
651
652 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
653 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
654
655 std::array<fair::mq::MessagePtr, 6> messages;
656 messages[0] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{0, 1}});
657 messages[1] = transport->CreateMessage(1000);
658 fair::mq::MessagePtr& header = messages[0];
659 fair::mq::MessagePtr& payload = messages[1];
660 DataRelayer::InputInfo fakeInfo{0, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
661 relayer.relay(header->GetData(), &messages[0], fakeInfo, 2);
662 REQUIRE(header.get() == nullptr);
663 REQUIRE(payload.get() == nullptr);
664 // Timeslice 0 now occupies the only slot: timeslice 1 must be backpressured and its messages left untouched.
665 messages[2] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
666 messages[3] = transport->CreateMessage(1000);
667 fair::mq::MessagePtr& header2 = messages[2];
668 fair::mq::MessagePtr& payload2 = messages[3];
669 DataRelayer::InputInfo fakeInfo2{2, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
670 auto action = relayer.relay(header2->GetData(), &messages[2], fakeInfo2, 2);
671 REQUIRE(action.type == DataRelayer::RelayChoice::Type::Backpressured);
672 CHECK(action.timeslice.value == 1);
673 REQUIRE(header2.get() != nullptr);
674 REQUIRE(payload2.get() != nullptr);
675 // Offering timeslice 1 again with a fresh header/payload pair must be backpressured as well; check this call's own result and messages.
676 messages[4] = o2::pmr::getMessage(Stack{channelAlloc, dh1, DataProcessingHeader{1, 1}});
677 messages[5] = transport->CreateMessage(1000);
678 DataRelayer::InputInfo fakeInfo3{4, 2, DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
679 auto action3 = relayer.relay(messages[4]->GetData(), &messages[4], fakeInfo3, 2);
680 REQUIRE(action3.type == DataRelayer::RelayChoice::Type::Backpressured);
681 CHECK(action3.timeslice.value == 1);
682 REQUIRE(messages[4].get() != nullptr);
683 REQUIRE(messages[5].get() != nullptr);
684 }
685
686 SECTION("SplitPayloadPairs")
687 {
689 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
690
691 std::vector<InputRoute> inputs = {
692 InputRoute{spec1, 0, "Fake1", 0},
693 };
694
695 std::vector<ForwardRoute> forwards;
696 std::vector<InputChannelInfo> infos{1};
697 TimesliceIndex index{1, infos};
698 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
699
701 DataRelayer relayer(policy, inputs, index, {registry});
702 relayer.setPipelineLength(4);
703
704 DataHeader dh{"CLUSTERS", "TPC", 0};
705
706 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
707 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
708 size_t timeslice = 0;
709
710 const int nSplitParts = 100;
711 std::vector<std::unique_ptr<fair::mq::Message>> splitParts;
712 splitParts.reserve(2 * nSplitParts);
713
714 for (size_t i = 0; i < nSplitParts; ++i) {
715 dh.splitPayloadIndex = i;
716 dh.splitPayloadParts = nSplitParts;
717
718 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
719 fair::mq::MessagePtr payload = transport->CreateMessage(100);
720
721 splitParts.emplace_back(std::move(header));
722 splitParts.emplace_back(std::move(payload));
723 }
724 REQUIRE(splitParts.size() == 2 * nSplitParts);
725
726 DataRelayer::InputInfo fakeInfo{0, splitParts.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
727 relayer.relay(splitParts[0]->GetData(), splitParts.data(), fakeInfo, splitParts.size());
728 std::vector<RecordAction> ready;
729 relayer.getReadyToProcess(ready);
730 REQUIRE(ready.size() == 1);
731 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
732 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
733 // we have one input route and thus one message set containing pairs for all
734 // payloads
735 REQUIRE(messageSet.size() == 1);
736 REQUIRE(messageSet[0].size() == nSplitParts);
737 REQUIRE(messageSet[0].getNumberOfPayloads(0) == 1);
738 }
739
740 SECTION("SplitPayloadSequence")
741 {
743 InputSpec spec1{"clusters", "TST", "COUNTER"};
744
745 std::vector<InputRoute> inputs = {
746 InputRoute{spec1, 0, "Fake1", 0},
747 };
748
749 std::vector<ForwardRoute> forwards;
750 std::vector<InputChannelInfo> infos{1};
751 TimesliceIndex index{1, infos};
752 ref.registerService(ServiceRegistryHelpers::handleForService<TimesliceIndex>(&index));
753
755 DataRelayer relayer(policy, inputs, index, {registry});
756 relayer.setPipelineLength(4);
757
758 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
759 size_t timeslice = 0;
760
761 std::vector<size_t> sequenceSize;
762 size_t nTotalPayloads = 0;
763
764 auto createSequence = [&nTotalPayloads, &timeslice, &sequenceSize, &transport, &relayer](size_t nPayloads) -> void {
765 auto channelAlloc = o2::pmr::getTransportAllocator(transport.get());
766 std::vector<std::unique_ptr<fair::mq::Message>> messages;
767 messages.reserve(nPayloads + 1);
768 DataHeader dh{"COUNTER", "TST", 0};
769
770 // one header with index set to the number of split parts indicates sequence
771 // of payloads without additional headers
772 dh.splitPayloadIndex = nPayloads;
773 dh.splitPayloadParts = nPayloads;
774 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, DataProcessingHeader{timeslice, 1}});
775 messages.emplace_back(std::move(header));
776
777 for (size_t i = 0; i < nPayloads; ++i) {
778 messages.emplace_back(transport->CreateMessage(100));
779 *(reinterpret_cast<size_t*>(messages.back()->GetData())) = nTotalPayloads;
780 ++nTotalPayloads;
781 }
782 REQUIRE(messages.size() == nPayloads + 1);
783 DataRelayer::InputInfo fakeInfo{0, messages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
784 relayer.relay(messages[0]->GetData(), messages.data(), fakeInfo, messages.size(), nPayloads);
785 sequenceSize.emplace_back(nPayloads);
786 };
787 createSequence(100);
788 createSequence(1);
789 createSequence(42);
790
791 std::vector<RecordAction> ready;
792 relayer.getReadyToProcess(ready);
793 REQUIRE(ready.size() == 1);
794 REQUIRE(ready[0].op == CompletionPolicy::CompletionOp::Consume);
795 auto messageSet = relayer.consumeAllInputsForTimeslice(ready[0].slot);
796 // we have one input route
797 REQUIRE(messageSet.size() == 1);
798 // one message set containing number of added sequences of messages
799 REQUIRE(messageSet[0].size() == sequenceSize.size());
800 size_t counter = 0;
801 for (size_t seqid = 0; seqid < sequenceSize.size(); ++seqid) {
802 REQUIRE(messageSet[0].getNumberOfPayloads(seqid) == sequenceSize[seqid]);
803 for (size_t pi = 0; pi < messageSet[0].getNumberOfPayloads(seqid); ++pi) {
804 REQUIRE(messageSet[0].payload(seqid, pi));
805 auto const* data = messageSet[0].payload(seqid, pi)->GetData();
806 REQUIRE(*(reinterpret_cast<size_t const*>(data)) == counter);
807 ++counter;
808 }
809 }
810 }
811}
benchmark::State & state
int16_t time
Definition RawEventData.h:4
int32_t i
uint32_t op
uint32_t res
Definition RawData.h:0
o2::monitoring::Monitoring Monitoring
Class for time synchronization of RawReader instances.
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
#define CHECK
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint index
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean * data
Definition glcorearb.h:298
GLuint * states
Definition glcorearb.h:4932
GLuint counter
Definition glcorearb.h:3987
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< InputSpec > select(char const *matcher="")
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static constexpr int INVALID
static CompletionPolicy processWhenAny(const char *name, CompletionPolicy::Matcher matcher)
static CompletionPolicy consumeWhenAll(const char *name, CompletionPolicy::Matcher matcher)
Default Completion policy. When all the parts of a record have arrived, consume them.
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
Helper struct to hold statistics about the data processing happening.
Running state information of a given device.
Definition DeviceState.h:34
bool batch
Whether the driver was started in batch mode or not.
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
TEST_CASE("DataRelayer")