Project
Loading...
Searching...
No Matches
test_WorkflowHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "Mocking.h"
12#include "test_HelperMacros.h"
18#include "../src/WorkflowHelpers.h"
19#include <catch_amalgamated.hpp>
20#include <algorithm>
21#include <memory>
22#include <list>
23
24using namespace o2::framework;
25
26TEST_CASE("TestVerifyWorkflow")
27{
28 using namespace o2::framework;
29 auto checkIncompleteInput = [](WorkflowSpec const& workflow) {
30 // Empty workflows should be invalid.
31 REQUIRE_THROWS_AS((void)WorkflowHelpers::verifyWorkflow(workflow), std::runtime_error);
32 };
33
34 auto checkSpecialChars = [](WorkflowSpec const& workflow) {
35 // Empty workflows should be invalid.
36 REQUIRE_THROWS_AS((void)WorkflowHelpers::verifyWorkflow(workflow), std::runtime_error);
37 };
38
39 auto checkOk = [](WorkflowSpec const& workflow) {
40 // Empty workflows should be invalid.
41 REQUIRE_NOTHROW((void)WorkflowHelpers::verifyWorkflow(workflow));
42 };
43
44 auto checkNotOk = [](WorkflowSpec const& workflow) {
45 // Empty workflows should be invalid.
46 REQUIRE_THROWS_AS((void)WorkflowHelpers::verifyWorkflow(workflow), std::runtime_error);
47 };
48
49 // A non fully specified input is an error, given the result is ambiguous.
50 // Completely ambiguous.
51 checkIncompleteInput(WorkflowSpec{{"A", {InputSpec{"", "", ""}}}});
52 // missing origin and description
53 checkIncompleteInput(WorkflowSpec{{"A", {InputSpec{"x", "", ""}}}});
54 // missing description
55 checkIncompleteInput(WorkflowSpec{{"A", {InputSpec{"x", "TST", ""}}}});
56 // comma is not allowed
57 checkSpecialChars(WorkflowSpec{{"A,B", {}}});
58 // This is fine, since by default both subSpec == 0 and
59 // Timeframe are assumed.
60 checkOk(WorkflowSpec{{"A", {InputSpec{"x", "TST", "A"}}}});
61 // Check for duplicate DataProcessorSpecs names
62 checkNotOk(WorkflowSpec{{"A"}, {"A"}});
63}
64
65TEST_CASE("TestWorkflowHelpers")
66{
67 using namespace o2::framework;
68 using Edges = std::vector<std::pair<int, int>>;
69 // No edges
70
71 Edges edges0 = {};
72 auto result0 = WorkflowHelpers::topologicalSort(1,
73 &(edges0[0].first),
74 &(edges0[0].second),
75 sizeof(edges0[0]),
76 0);
77 std::vector<TopoIndexInfo> expected0 = {{0, 0}};
78 REQUIRE(result0 == expected0);
79
80 // Already sorted
81 Edges edges1 = {
82 {0, 1}, // 1 depends on 0
83 {1, 2},
84 {2, 3}};
85 auto result1 = WorkflowHelpers::topologicalSort(4,
86 &(edges1[0].first),
87 &(edges1[0].second),
88 sizeof(edges1[0]),
89 3);
90 std::vector<TopoIndexInfo> expected1 = {{0, 0}, {1, 1}, {2, 2}, {3, 3}};
91 REQUIRE(result1 == expected1);
92 // Inverse sort
93 Edges edges2 = {
94 {3, 2},
95 {2, 1},
96 {1, 0}};
97 auto result2 = WorkflowHelpers::topologicalSort(4,
98 &edges2[0].first,
99 &edges2[0].second,
100 sizeof(edges2[0]),
101 3);
102 std::vector<TopoIndexInfo> expected2 = {{3, 0}, {2, 1}, {1, 2}, {0, 1}};
103 REQUIRE(result2 == expected2);
104 // 2
105 // / \
106 // 4-3 0-5
107 // \ /
108 // 1
109 Edges edges3 = {
110 {0, 5},
111 {4, 3},
112 {3, 2},
113 {2, 0},
114 {1, 0},
115 {3, 1},
116 };
117 auto result3 = WorkflowHelpers::topologicalSort(6,
118 &(edges3[0].first),
119 &(edges3[0].second),
120 sizeof(edges3[0]),
121 6);
122 std::vector<TopoIndexInfo> expected3 = {{4, 0}, {3, 1}, {1, 2}, {2, 2}, {0, 3}, {5, 4}};
123 REQUIRE(result3 == expected3);
124
125 // 0 -> 1 -----\
126 // \
127 // 5
128 // /
129 // 2 -> 3 -> 4-/
130 Edges edges4 = {
131 {0, 1},
132 {2, 3},
133 {3, 4},
134 {4, 5},
135 {1, 5}};
136 auto result4 = WorkflowHelpers::topologicalSort(6,
137 &(edges4[0].first),
138 &(edges4[0].second),
139 sizeof(edges4[0]),
140 5);
141 std::vector<TopoIndexInfo> expected4 = {{0, 0}, {2, 0}, {1, 1}, {3, 1}, {4, 2}, {5, 3}};
142 REQUIRE(result4 == expected4);
143
144 // 0 -> 1
145 // 2 -> 3 -> 4
146 Edges edges5 = {
147 {0, 1},
148 {2, 3},
149 {3, 4},
150 };
151 auto result5 = WorkflowHelpers::topologicalSort(5,
152 &(edges5[0].first),
153 &(edges5[0].second),
154 sizeof(edges5[0]),
155 3);
156 std::vector<TopoIndexInfo> expected5 = {{0, 0}, {2, 0}, {1, 1}, {3, 1}, {4, 2}};
157 REQUIRE(result5 == expected5);
158
159 // 0 <-> 1
160 Edges edges6 = {
161 {0, 1},
162 {1, 0}};
163 auto result6 = WorkflowHelpers::topologicalSort(2,
164 &(edges6[0].first),
165 &(edges6[0].second),
166 sizeof(edges6[0]),
167 2);
170 std::vector<TopoIndexInfo> expected6 = {};
171 REQUIRE(result6 == expected6);
172
175 struct SlotEdge {
176 int nodeIn;
177 int slotIn;
178 int nodeOut;
179 int slotOut;
180 };
181
182 // (0,0) -> (1,0) or 0 -> 1
183 // (0,1) -> (2,0) or 0 -> 2
184 // (0,2) -> (2,1) or 0 -> 2
185 std::vector<SlotEdge> edges7 = {
186 {0, 0, 1, 0},
187 {0, 1, 2, 0},
188 {0, 2, 2, 1},
189 };
190 auto result7 = WorkflowHelpers::topologicalSort(3,
191 &(edges7[0].nodeIn),
192 &(edges7[0].nodeOut),
193 sizeof(edges7[0]),
194 3);
195 std::vector<TopoIndexInfo> expected7 = {{0, 0}, {1, 1}, {2, 1}};
196 REQUIRE(result7 == expected7);
197}
198
199// Test a single connection
200//
201// A->B becomes Enum -> A -> B
202TEST_CASE("TestSimpleConnection")
203{
204 std::vector<InputSpec> expectedInputs = {InputSpec{"y", "TST", "A"}};
205 std::vector<OutputSpec> expectedOutputs = {
206 OutputSpec{"TST", "A"},
207 OutputSpec{"DPL", "SUMMARY", compile_time_hash("A"), Lifetime::Timeframe},
208 OutputSpec{"DPL", "ENUM", 0, Lifetime::Enumeration}};
209 WorkflowSpec workflow{
210 {"A",
211 {},
212 Outputs{expectedOutputs[0]}},
213 {"B", {expectedInputs[0]}}};
214 std::vector<DeviceConnectionEdge> logicalEdges;
215 std::vector<OutputSpec> outputs;
216 std::vector<LogicalForwardInfo> availableForwardsInfo;
217
219 REQUIRE(result == WorkflowParsingState::Valid);
220 auto context = makeEmptyConfigContext();
221 WorkflowHelpers::injectServiceDevices(workflow, *context);
222 // The fourth one is the dummy sink for the
223 // timeframe reporting messages
224 std::vector<std::string> expectedNames = {"A", "B", "internal-dpl-clock", "internal-dpl-injected-dummy-sink"};
225 REQUIRE(workflow.size() == expectedNames.size());
226 for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) {
227 SECTION("With parameter wi = " + std::to_string(wi))
228 {
229 REQUIRE(workflow[wi].name == expectedNames[wi]);
230 }
231 }
232 WorkflowHelpers::constructGraph(workflow, logicalEdges,
233 outputs,
234 availableForwardsInfo);
235 std::vector<DeviceConnectionEdge> expectedEdges{
236 {2, 0, 0, 0, 2, 0, false, ConnectionKind::Out},
237 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
238 {1, 3, 0, 1, 1, 0, false, ConnectionKind::Out},
239 };
240 REQUIRE(expectedOutputs.size() == outputs.size());
241 for (size_t oi = 0, oe = expectedOutputs.size(); oi != oe; ++oi) {
242 INFO("With parameter oi = " << oi);
243 REQUIRE(expectedOutputs[oi].lifetime == outputs[oi].lifetime);
244 }
245 REQUIRE(expectedEdges.size() == logicalEdges.size());
246 for (size_t ei = 0, ee = expectedEdges.size(); ei != ee; ++ei) {
247 SECTION("With parameter ei = " + std::to_string(ei))
248 {
249 REQUIRE(expectedEdges[ei].consumer == logicalEdges[ei].consumer);
250 REQUIRE(expectedEdges[ei].producer == logicalEdges[ei].producer);
251 REQUIRE(expectedEdges[ei].outputGlobalIndex == logicalEdges[ei].outputGlobalIndex);
252 }
253 }
254}
255
256// Test a simple forward in case of two parallel consumers
257// B
258// /
259// A becomes A -> B -> C
260// \
261// C
262TEST_CASE("TestSimpleForward")
263{
264 std::vector<InputSpec> expectedInputs = {InputSpec{"y", "TST", "A"}};
265 std::vector<OutputSpec> expectedOutputs = {
266 OutputSpec{"TST", "A"},
267 OutputSpec{"DPL", "SUMMARY", compile_time_hash("B"), Lifetime::Timeframe},
268 OutputSpec{"DPL", "SUMMARY", compile_time_hash("C"), Lifetime::Timeframe},
269 OutputSpec{"DPL", "SUMMARY", compile_time_hash("D"), Lifetime::Timeframe},
270 OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer}};
271 WorkflowSpec workflow{
272 {"A", {}, Outputs{expectedOutputs[0]}},
273 {"B", {expectedInputs[0]}},
274 {"C", {expectedInputs[0]}},
275 {"D", {expectedInputs[0]}}};
276 std::vector<DeviceConnectionEdge> logicalEdges;
277 std::vector<OutputSpec> outputs;
278 std::vector<LogicalForwardInfo> availableForwardsInfo;
279 REQUIRE(WorkflowHelpers::verifyWorkflow(workflow) == WorkflowParsingState::Valid);
280 auto context = makeEmptyConfigContext();
281 WorkflowHelpers::injectServiceDevices(workflow, *context);
282 WorkflowHelpers::constructGraph(workflow, logicalEdges,
283 outputs,
284 availableForwardsInfo);
285
286 std::vector<DeviceConnectionEdge> expectedEdges{
287 {4, 0, 0, 0, 4, 0, false, ConnectionKind::Out},
288 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
289 {1, 2, 0, 0, 0, 0, true, ConnectionKind::Out},
290 {2, 3, 0, 0, 0, 0, true, ConnectionKind::Out},
291
292 {1, 5, 0, 0, 1, 0, true, ConnectionKind::Out},
293 {2, 5, 0, 0, 2, 1, true, ConnectionKind::Out},
294 {3, 5, 0, 0, 3, 2, true, ConnectionKind::Out},
295 };
296 REQUIRE(expectedOutputs.size() == outputs.size());
297 REQUIRE(expectedEdges.size() == logicalEdges.size());
298 for (size_t ei = 0, ee = expectedEdges.size(); ei != ee; ++ei) {
299 SECTION("with ei: " + std::to_string(ei))
300 {
301 REQUIRE(expectedEdges[ei].consumer == logicalEdges[ei].consumer);
302 REQUIRE(expectedEdges[ei].producer == logicalEdges[ei].producer);
303 REQUIRE(expectedEdges[ei].outputGlobalIndex == logicalEdges[ei].outputGlobalIndex);
304 REQUIRE(expectedEdges[ei].consumerInputIndex == logicalEdges[ei].consumerInputIndex);
305 }
306 }
307}
308
309TEST_CASE("TestGraphConstruction")
310{
311 WorkflowSpec workflow{
312 {"A",
313 Inputs{},
314 Outputs{
315 OutputSpec{"TST", "A"}}},
317 "B",
318 Inputs{InputSpec{"b", "TST", "A"}},
319 Outputs{OutputSpec{"TST", "B"}},
320 },
321 3),
322 timePipeline({"C", Inputs{InputSpec{"c", "TST", "B"}}}, 2)};
323
324 std::vector<DeviceConnectionEdge> expected{
325 {3, 0, 0, 0, 3, 0, false, ConnectionKind::Out},
326 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
327 {0, 1, 1, 0, 0, 0, false, ConnectionKind::Out},
328 {0, 1, 2, 0, 0, 0, false, ConnectionKind::Out},
329 {1, 2, 0, 0, 1, 0, false, ConnectionKind::Out},
330 {1, 2, 0, 1, 1, 0, false, ConnectionKind::Out},
331 {1, 2, 0, 2, 1, 0, false, ConnectionKind::Out},
332 {1, 2, 1, 0, 1, 0, false, ConnectionKind::Out},
333 {1, 2, 1, 1, 1, 0, false, ConnectionKind::Out},
334 {1, 2, 1, 2, 1, 0, false, ConnectionKind::Out},
335
336 {2, 4, 0, 0, 2, 0, false, ConnectionKind::Out}, // DPL/SUMMARY routes
337 {2, 4, 0, 1, 2, 0, false, ConnectionKind::Out},
338 };
339 std::list<LogicalOutputInfo> availableOutputsInfo;
340 std::vector<DeviceConnectionEdge> logicalEdges;
341 std::vector<LogicalForwardInfo> availableForwardsInfo;
342
343 // This is a temporary store for inputs and outputs, including forwarded
344 // channels, so that we can construct them before assigning to a device.
345 std::vector<OutputSpec> outputs;
346
347 REQUIRE(WorkflowHelpers::verifyWorkflow(workflow) == WorkflowParsingState::Valid);
348 auto context = makeEmptyConfigContext();
349 WorkflowHelpers::injectServiceDevices(workflow, *context);
350 WorkflowHelpers::constructGraph(workflow, logicalEdges,
351 outputs,
352 availableForwardsInfo);
353 std::vector<ConcreteDataMatcher> expectedMatchers = {
354 ConcreteDataMatcher{"TST", "A", 0},
355 ConcreteDataMatcher{"TST", "B", 0},
356 ConcreteDataMatcher{"DPL", "SUMMARY", compile_time_hash("C")}, // Summary value
357 ConcreteDataMatcher{"DPL", "ENUM", compile_time_hash("A")}, // Enums value
358 };
359
360 std::vector<Lifetime> expectedLifetimes = {
361 Lifetime::Timeframe,
362 Lifetime::Timeframe,
363 Lifetime::Timeframe,
364 Lifetime::Enumeration,
365 };
366
367 REQUIRE(expectedMatchers.size() == expectedLifetimes.size());
368 REQUIRE(outputs.size() == expectedMatchers.size());
369 ; // FIXME: Is this what we actually want? We need
370 // different matchers depending on the different timeframe ID.
371
372 for (size_t i = 0; i < outputs.size(); ++i) {
373 SECTION("with i: " + std::to_string(i))
374 {
375 auto concrete = DataSpecUtils::asConcreteDataMatcher(outputs[i]);
376 REQUIRE(concrete.origin.as<std::string>() == expectedMatchers[i].origin.as<std::string>());
377 REQUIRE(concrete.description.as<std::string>() == expectedMatchers[i].description.as<std::string>());
378 REQUIRE(concrete.subSpec == expectedMatchers[i].subSpec);
379 REQUIRE(outputs[i].lifetime == expectedLifetimes[i]);
380 }
381 }
382
383 REQUIRE(expected.size() == logicalEdges.size());
384 for (size_t i = 0; i < logicalEdges.size(); ++i) {
385 SECTION("with i: " + std::to_string(i))
386 {
387 REQUIRE(logicalEdges[i].producer == expected[i].producer);
388 REQUIRE(logicalEdges[i].consumer == expected[i].consumer);
389 REQUIRE(logicalEdges[i].timeIndex == expected[i].timeIndex);
390 REQUIRE(logicalEdges[i].producerTimeIndex == expected[i].producerTimeIndex);
391 REQUIRE(logicalEdges[i].outputGlobalIndex == expected[i].outputGlobalIndex);
392 }
393 }
394
395 std::vector<size_t> inIndex;
396 std::vector<size_t> outIndex;
397 WorkflowHelpers::sortEdges(inIndex, outIndex, logicalEdges);
398 // Notice that zero is at the end because the first edge in the topological
399 // sort is the timer and that gets added last.
400 std::vector<size_t> expectedOutIndex{
401 1, 2, 3, 4, 7, 5, 8, 6, 9, 10, 11, 0};
402
403 std::vector<size_t> expectedInIndex{
404 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
405
406 REQUIRE_THAT(expectedOutIndex, Catch::Matchers::RangeEquals(outIndex));
407 REQUIRE_THAT(expectedInIndex, Catch::Matchers::RangeEquals(inIndex));
408
409 auto actions = WorkflowHelpers::computeOutEdgeActions(logicalEdges,
410 outIndex);
411
412 std::vector<EdgeAction> expectedActionsOut{
413 EdgeAction{true, true}, // timer device with first timer channel
414 EdgeAction{true, true}, // actual first edge
415 EdgeAction{false, true},
416 EdgeAction{false, true},
417 EdgeAction{true, true},
418 EdgeAction{true, true},
419 EdgeAction{true, true},
420 EdgeAction{false, true},
421 EdgeAction{false, true},
422 EdgeAction{false, true},
423 EdgeAction{true, true},
424 EdgeAction{true, true},
425 };
426
427 REQUIRE(expectedActionsOut.size() == actions.size());
428 for (size_t i = 0; i < outIndex.size(); i++) {
429 size_t j = outIndex[i];
430 SECTION(std::to_string(i) + " " + std::to_string(j))
431 {
432 REQUIRE(expectedActionsOut[j].requiresNewDevice == actions[j].requiresNewDevice);
433 }
434 }
435
436 std::vector<EdgeAction> expectedActionsIn{
437 EdgeAction{true, true}, // timer device with first timer channel
438 EdgeAction{true, true}, // actual first edge
439 EdgeAction{true, true},
440 EdgeAction{true, true},
441 EdgeAction{true, true},
442 EdgeAction{false, true},
443 EdgeAction{false, true},
444 EdgeAction{true, true},
445 EdgeAction{false, true},
446 EdgeAction{false, true},
447 EdgeAction{true, true},
448 EdgeAction{false, true},
449 };
450 auto inActions = WorkflowHelpers::computeInEdgeActions(logicalEdges,
451 inIndex);
452
453 REQUIRE(expectedActionsIn.size() == inActions.size());
454 for (size_t i = 0; i < inIndex.size(); i++) {
455 size_t j = inIndex[i];
456 auto expectedValue = expectedActionsIn[j].requiresNewDevice;
457 auto actualValue = inActions[j].requiresNewDevice;
458
459 SECTION(std::to_string(i) + " " + std::to_string(j))
460 {
461 REQUIRE(expectedValue == actualValue);
462 }
463 }
464}
465
466// This is to test a workflow where the input is not of type Timeframe and
467// therefore requires a dangling channel.
468// The topology is
469//
470// TST/A TST/B
471// ----> (A) ---->
472//
473TEST_CASE("TestExternalInput")
474{
475 WorkflowSpec workflow{
476 {.name = "A",
477 .inputs = {
478 InputSpec{"external", "TST", "A", 0, Lifetime::Timer}},
479 .outputs = {OutputSpec{"TST", "B"}}}};
480 REQUIRE(WorkflowHelpers::verifyWorkflow(workflow) == WorkflowParsingState::Valid);
481 std::vector<DeviceConnectionEdge> logicalEdges;
482 std::vector<OutputSpec> outputs;
483 std::vector<LogicalForwardInfo> availableForwardsInfo;
484
485 REQUIRE(workflow.size() == 1);
486
487 auto context = makeEmptyConfigContext();
488 WorkflowHelpers::injectServiceDevices(workflow, *context);
489 // The added devices are the one which should connect to
490 // the condition DB and the sink for the dangling outputs.
491 REQUIRE(workflow.size() == 3);
492 WorkflowHelpers::constructGraph(workflow, logicalEdges,
493 outputs,
494 availableForwardsInfo);
495}
496
497TEST_CASE("DetermineDanglingOutputs")
498{
499 WorkflowSpec workflow0{
500 {.name = "A", .outputs = {OutputSpec{"TST", "A"}}},
501 {.name = "B", .inputs = {InputSpec{"a", "TST", "A"}}}};
502
503 WorkflowSpec workflow1{
504 {.name = "A",
505 .outputs = {OutputSpec{"TST", "A"}}}};
506
507 WorkflowSpec workflow2{
508 {.name = "A", .outputs = {OutputSpec{"TST", "A"}}},
509 {.name = "B", .inputs = {InputSpec{"a", "TST", "B"}}}};
510
511 WorkflowSpec workflow3{
512 {.name = "A", .outputs = {OutputSpec{"TST", "A"}, OutputSpec{"TST", "B"}}},
513 {.name = "B", .inputs = {InputSpec{"a", "TST", "A"}}}};
514
515 WorkflowSpec workflow4{
516 {.name = "A", .outputs = {OutputSpec{"TST", "A"}, OutputSpec{"TST", "B"}, OutputSpec{"TST", "C"}}},
517 {.name = "B", .inputs = {InputSpec{"a", "TST", "A"}}}};
518
519 auto dangling0 = WorkflowHelpers::computeDanglingOutputs(workflow0);
520 std::vector<InputSpec> expected0{};
521 REQUIRE_THAT(dangling0, Catch::Matchers::RangeEquals(expected0));
522
523 auto dangling1 = WorkflowHelpers::computeDanglingOutputs(workflow1);
524 std::vector<InputSpec> expected1{InputSpec{"dangling0", "TST", "A"}};
525 REQUIRE_THAT(dangling1, Catch::Matchers::RangeEquals(expected1));
526
527 auto dangling2 = WorkflowHelpers::computeDanglingOutputs(workflow2);
528 std::vector<InputSpec> expected2{InputSpec{"dangling0", "TST", "A"}};
529 REQUIRE_THAT(dangling2, Catch::Matchers::RangeEquals(expected2));
530
531 auto dangling3 = WorkflowHelpers::computeDanglingOutputs(workflow3);
532 std::vector<InputSpec> expected3{InputSpec{"dangling0", "TST", "B"}};
533 REQUIRE_THAT(dangling3, Catch::Matchers::RangeEquals(expected3));
534
535 auto dangling4 = WorkflowHelpers::computeDanglingOutputs(workflow4);
536 std::vector<InputSpec> expected4{InputSpec{"dangling0", "TST", "B"}, InputSpec{"dangling1", "TST", "C"}};
537 REQUIRE_THAT(dangling4, Catch::Matchers::RangeEquals(expected4));
538}
539
540TEST_CASE("TEST_SELECT")
541{
542 auto res = o2::framework::select();
543 REQUIRE(res.empty());
544 auto res1 = o2::framework::select("x:TST/C1/0");
545 REQUIRE(res1.size() == 1);
546}
547
548// Test the case in which two outputs are matched by the same generic input on B
549// A/1
550// \
551// B becomes Timer -> A -> B
552// /
553// A/2
554TEST_CASE("TestOriginWildcard")
555{
556 std::vector<InputSpec> expectedInputs = {InputSpec{"x", DataSpecUtils::dataDescriptorMatcherFrom(o2::header::DataOrigin{"A"})}};
557 std::vector<OutputSpec> expectedOutputs = {
558 OutputSpec{"A", "1"},
559 OutputSpec{"A", "2"},
560 OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer},
561 OutputSpec{"DPL", "SUMMARY", compile_time_hash("B"), Lifetime::Timeframe}};
562 WorkflowSpec workflow{
563 {"A", {}, {expectedOutputs[0], expectedOutputs[1]}},
564 {"B", expectedInputs, {}},
565 };
566 std::vector<DeviceConnectionEdge> logicalEdges;
567 std::vector<OutputSpec> outputs;
568 std::vector<LogicalForwardInfo> availableForwardsInfo;
569
570 REQUIRE(WorkflowHelpers::verifyWorkflow(workflow) == WorkflowParsingState::Valid);
571 auto context = makeEmptyConfigContext();
572 WorkflowHelpers::injectServiceDevices(workflow, *context);
573 REQUIRE(workflow.size() == 4);
574 REQUIRE(workflow.size() >= 4);
575 REQUIRE(workflow[0].name == "A");
576 REQUIRE(workflow[1].name == "B");
577 REQUIRE(workflow[2].name == "internal-dpl-clock");
578 REQUIRE(workflow[3].name == "internal-dpl-injected-dummy-sink");
579 for (size_t wi = 4; wi < workflow.size(); ++wi) {
580 REQUIRE(workflow[wi].name == "");
581 }
582 WorkflowHelpers::constructGraph(workflow, logicalEdges,
583 outputs,
584 availableForwardsInfo);
585
586 std::vector<DeviceConnectionEdge> expectedEdges{
587 {2, 0, 0, 0, 3, 0, false, ConnectionKind::Out},
588 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
589 {0, 1, 0, 0, 1, 0, false, ConnectionKind::Out},
590 {1, 3, 0, 0, 2, 0, false, ConnectionKind::Out},
591 };
592
593 std::vector<size_t> expectedOutEdgeIndex = {1, 2, 3, 0};
594 std::vector<size_t> expectedInEdgeIndex = {0, 1, 2, 3};
595 std::vector<EdgeAction> expectedActions = {
596 {true, true}, // to go from timer to A (new channel and new device)
597 {true, true}, // to go from A/1 to B (new channel and new device)
598 {false, false}, // to go from A/2 to B (device is the same as A/1, device is the same as B?)
599 {true, true} // to go from B to sink
600 };
601
602 // Not sure I understand...
603 std::vector<EdgeAction> expectedInActions = {
604 {true, true},
605 {true, true},
606 {false, false},
607 {true, true} // to go from B to sink
608 };
609
610 REQUIRE(expectedOutputs.size() == outputs.size());
611 REQUIRE(expectedEdges.size() == logicalEdges.size());
612 for (size_t ei = 0, ee = expectedEdges.size(); ei != ee; ++ei) {
613 SECTION("ei : " + std::to_string(ei))
614 {
615 REQUIRE(expectedEdges[ei].consumer == logicalEdges[ei].consumer);
616 REQUIRE(expectedEdges[ei].producer == logicalEdges[ei].producer);
617 REQUIRE(expectedEdges[ei].outputGlobalIndex == logicalEdges[ei].outputGlobalIndex);
618 REQUIRE(expectedEdges[ei].consumerInputIndex == logicalEdges[ei].consumerInputIndex);
619 }
620 }
621
622 std::vector<size_t> inEdgeIndex;
623 std::vector<size_t> outEdgeIndex;
624 WorkflowHelpers::sortEdges(inEdgeIndex, outEdgeIndex, logicalEdges);
625 REQUIRE_THAT(outEdgeIndex, Catch::Matchers::RangeEquals(expectedOutEdgeIndex));
626 REQUIRE_THAT(inEdgeIndex, Catch::Matchers::RangeEquals(expectedInEdgeIndex));
627 REQUIRE(inEdgeIndex.size() == 4);
628
629 std::vector<EdgeAction> outActions = WorkflowHelpers::computeOutEdgeActions(logicalEdges, outEdgeIndex);
630 REQUIRE(outActions.size() == expectedActions.size());
631 for (size_t ai = 0; ai < outActions.size(); ++ai) {
632 SECTION("ai : " + std::to_string(ai))
633 {
634 REQUIRE(outActions[ai].requiresNewDevice == expectedActions[ai].requiresNewDevice);
635 REQUIRE(outActions[ai].requiresNewChannel == expectedActions[ai].requiresNewChannel);
636 }
637 }
638
639 // Crete the connections on the inverse map for all of them
640 // lookup for port and add as input of the current device.
641 std::vector<EdgeAction> inActions = WorkflowHelpers::computeInEdgeActions(logicalEdges, inEdgeIndex);
642 REQUIRE(inActions.size() == expectedInActions.size());
643 for (size_t ai = 0; ai < inActions.size(); ++ai) {
644 SECTION("ai : " + std::to_string(ai))
645 {
646 REQUIRE(inActions[ai].requiresNewDevice == expectedInActions[ai].requiresNewDevice);
647 REQUIRE(inActions[ai].requiresNewChannel == expectedInActions[ai].requiresNewChannel);
648 }
649 }
650}
int32_t i
uint32_t j
Definition RawData.h:0
uint32_t res
Definition RawData.h:0
consteval uint32_t compile_time_hash(char const *str)
std::unique_ptr< ConfigContext > makeEmptyConfigContext()
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
TEST_CASE("test_prepareArguments")
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > select(char const *matcher="")
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static data_matcher::DataDescriptorMatcher dataDescriptorMatcherFrom(ConcreteDataMatcher const &concrete)
Build a DataDescriptMatcher which does not care about the subSpec.
static void constructGraph(const WorkflowSpec &workflow, std::vector< DeviceConnectionEdge > &logicalEdges, std::vector< OutputSpec > &outputs, std::vector< LogicalForwardInfo > &availableForwardsInfo)
static void injectServiceDevices(WorkflowSpec &workflow, ConfigContext &ctx)
static WorkflowParsingState verifyWorkflow(const WorkflowSpec &workflow)
static std::vector< InputSpec > computeDanglingOutputs(WorkflowSpec const &workflow)
returns only dangling outputs
static std::vector< EdgeAction > computeOutEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static void sortEdges(std::vector< size_t > &inEdgeIndex, std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &edges)
static std::vector< EdgeAction > computeInEdgeActions(const std::vector< DeviceConnectionEdge > &edges, const std::vector< size_t > &index)
static std::vector< TopoIndexInfo > topologicalSort(size_t nodeCount, int const *edgeIn, int const *edgeOut, size_t byteStride, size_t edgesCount)
std::map< std::string, ID > expected