Project
Loading...
Searching...
No Matches
test_DeviceSpec.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Mocking.h"
13#include <catch_amalgamated.hpp>
15#include "../src/DeviceSpecHelpers.h"
16#include "../src/GraphvizHelpers.h"
17#include "../src/WorkflowHelpers.h"
21#include "../src/SimpleResourceManager.h"
22#include "../src/ComputingResourceHelpers.h"
23#include "test_HelperMacros.h"
24
25using namespace o2::framework;
26
27// This is how you can define your processing in a declarative way
29{
30 return {{"A", Inputs{},
31 Outputs{OutputSpec{"TST", "A1"},
32 OutputSpec{"TST", "A2"}}},
33 {
34 "B",
35 Inputs{InputSpec{"a", "TST", "A1"}},
36 }};
37}
38
// Converts the workflow returned by defineDataProcessing1() into device
// specs using the default policies, and checks that the result is two
// devices joined by a single channel named "from_A_to_B": device 0 binds and
// pushes, device 1 connects and pulls, both on port 22000.
39TEST_CASE("TestDeviceSpec1")
40{
41 auto workflow = defineDataProcessing1();
42 auto configContext = makeEmptyConfigContext();
43 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
44 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
45 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
 // The default policy sets must not be empty.
46 REQUIRE(channelPolicies.empty() == false);
47 REQUIRE(completionPolicies.empty() == false);
48 std::vector<DeviceSpec> devices;
49
 // Pin down the localhost resource and the single offer derived from it:
 // the port expectations below all start from startPort 22000.
50 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
51 REQUIRE(resources.size() == 1);
52 REQUIRE(resources[0].startPort == 22000);
53 SimpleResourceManager rm(resources);
54 auto offers = rm.getAvailableOffers();
55 REQUIRE(offers.size() == 1);
56 REQUIRE(offers[0].startPort == 22000);
57 REQUIRE(offers[0].rangeSize == 5000);
58
 // Fill `devices` from the workflow; everything below inspects the result.
59 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
60 REQUIRE(devices.size() == 2);
 // Sending side (device 0): one bind/push channel and one output route.
61 REQUIRE(devices[0].outputChannels.size() == 1);
62 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
63 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
64 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
65 REQUIRE(devices[0].outputChannels[0].port == 22000);
66 REQUIRE(devices[0].outputs.size() == 1);
67
 // Receiving side (device 1): the matching connect/pull channel, same name
 // and port as the sender.
68 REQUIRE(devices[1].inputChannels.size() == 1);
69 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
70 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
71 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
72 REQUIRE(devices[1].inputChannels[0].port == 22000);
73
 // The single input route must be fed by that channel.
74 REQUIRE(devices[1].inputs.size() == 1);
75 REQUIRE(devices[1].inputs[0].sourceChannel == "from_A_to_B");
76}
77
78// Same as before, but using PUSH/PULL as policy
79TEST_CASE("TestDeviceSpec1PushPull")
80{
81 auto workflow = defineDataProcessing1();
82 ChannelConfigurationPolicy pushPullPolicy;
86
87 std::vector<ChannelConfigurationPolicy> channelPolicies = {pushPullPolicy};
88 auto configContext = makeEmptyConfigContext();
89 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
90 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
91
92 REQUIRE(channelPolicies.empty() == false);
93 std::vector<DeviceSpec> devices;
94 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
95 SimpleResourceManager rm(resources);
96 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
97 REQUIRE(devices.size() == 2);
98 REQUIRE(devices[0].outputChannels.size() == 1);
99 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
100 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
101 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
102 REQUIRE(devices[0].outputChannels[0].port == 22000);
103 REQUIRE(devices[0].outputs.size() == 1);
104
105 REQUIRE(devices[1].inputChannels.size() == 1);
106 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
107 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
108 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
109 REQUIRE(devices[1].inputChannels[0].port == 22000);
110
111 REQUIRE(devices[1].inputs.size() == 1);
112 REQUIRE(devices[1].inputs[0].sourceChannel == "from_A_to_B");
113}
114
115// This should still define only one channel, since there are only
116// two devices to connect
117namespace
118{
120{
121 return {{"A", Inputs{},
122 Outputs{OutputSpec{"TST", "A1"},
123 OutputSpec{"TST", "A2"}}},
124 {
125 "B",
126 Inputs{
127 InputSpec{"a", "TST", "A1"},
128 InputSpec{"b", "TST", "A2"},
129 },
130 }};
131}
132} // namespace
133
// Converts the workflow returned by defineDataProcessing2() into device
// specs and checks that the two resulting devices share exactly one channel,
// "from_A_to_B" on port 22000 (bind/push on device 0, connect/pull on
// device 1).
134TEST_CASE("TestDeviceSpec2")
135{
136 auto workflow = defineDataProcessing2();
137 auto configContext = makeEmptyConfigContext();
138 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
139 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
140 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
141 std::vector<DeviceSpec> devices;
142
143 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
144 SimpleResourceManager rm(resources);
145 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
146 REQUIRE(devices.size() == 2);
 // Device 0 must expose a single output channel.
147 REQUIRE(devices[0].outputChannels.size() == 1);
148 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
149 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
150 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
151 REQUIRE(devices[0].outputChannels[0].port == 22000);
152
 // Device 1 must expose the single matching input channel.
153 REQUIRE(devices[1].inputChannels.size() == 1);
154 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
155 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
156 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
157 REQUIRE(devices[1].inputChannels[0].port == 22000);
158}
159
160// This should define two channels, since A feeds two
161// different consumers (B and C).
163{
164 return {{"A", Inputs{},
165 Outputs{OutputSpec{"TST", "A1"},
166 OutputSpec{"TST", "A2"}}},
167 {
168 "B",
169 Inputs{
170 InputSpec{"a", "TST", "A1"},
171 },
172 },
173 {"C", Inputs{
174 InputSpec{"a", "TST", "A2"},
175 }}};
176}
177
// Converts the workflow returned by defineDataProcessing3() into device
// specs and checks that three devices are produced, with device 0 owning two
// output channels ("from_A_to_B" on port 22000 and "from_A_to_C" on port
// 22001) and devices 1 and 2 each owning the matching input channel.
178TEST_CASE("TestDeviceSpec3")
179{
180 auto workflow = defineDataProcessing3();
181 auto configContext = makeEmptyConfigContext();
182 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
183 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
184 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
185 std::vector<DeviceSpec> devices;
186
187 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
188 SimpleResourceManager rm(resources);
189 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
190 REQUIRE(devices.size() == 3);
 // Device 0: two bind/push channels on consecutive ports.
191 REQUIRE(devices[0].outputChannels.size() == 2);
192 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
193 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
194 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
195 REQUIRE(devices[0].outputChannels[0].port == 22000);
196 REQUIRE(devices[0].outputChannels[1].method == ChannelMethod::Bind);
197 REQUIRE(devices[0].outputChannels[1].type == ChannelType::Push);
198 REQUIRE(devices[0].outputChannels[1].name == "from_A_to_C");
199 REQUIRE(devices[0].outputChannels[1].port == 22001);
200
 // Device 1: connect/pull end of "from_A_to_B".
201 REQUIRE(devices[1].inputChannels.size() == 1);
202 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
203 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
204 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
205 REQUIRE(devices[1].inputChannels[0].port == 22000);
206
 // Device 2: connect/pull end of "from_A_to_C".
207 REQUIRE(devices[2].inputChannels.size() == 1);
208 REQUIRE(devices[2].inputChannels[0].method == ChannelMethod::Connect);
209 REQUIRE(devices[2].inputChannels[0].type == ChannelType::Pull);
210 REQUIRE(devices[2].inputChannels[0].name == "from_A_to_C");
211 REQUIRE(devices[2].inputChannels[0].port == 22001);
212}
213
214// Diamond shape.
216{
217 return {{"A", Inputs{},
218 Outputs{OutputSpec{"TST", "A1"},
219 OutputSpec{"TST", "A2"}}},
220 {"B", Inputs{InputSpec{"input", "TST", "A1"}},
221 Outputs{OutputSpec{"TST", "B1"}}},
222 {"C", Inputs{InputSpec{"input", "TST", "A2"}},
223 Outputs{OutputSpec{"TST", "C1"}}},
224 {"D", Inputs{InputSpec{"a", "TST", "B1"},
225 InputSpec{"b", "TST", "C1"}}}};
226}
227
// Converts the workflow returned by defineDataProcessing4() into device
// specs and checks the four resulting devices and the four channels between
// them: "from_A_to_B" (22000), "from_A_to_C" (22001), "from_B_to_D" (22002)
// and "from_C_to_D" (22003). Every output end is bind/push, every input end
// is connect/pull.
228TEST_CASE("TestDeviceSpec4")
229{
230 auto workflow = defineDataProcessing4();
231 auto configContext = makeEmptyConfigContext();
232 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
233 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
234 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
235 std::vector<DeviceSpec> devices;
236 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
237 SimpleResourceManager rm(resources);
238
239 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
240 REQUIRE(devices.size() == 4);
 // Device 0: two output channels, towards B and towards C.
241 REQUIRE(devices[0].outputChannels.size() == 2);
242 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
243 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
244 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
245 REQUIRE(devices[0].outputChannels[0].port == 22000);
246 REQUIRE(devices[0].outputChannels[1].method == ChannelMethod::Bind);
247 REQUIRE(devices[0].outputChannels[1].type == ChannelType::Push);
248 REQUIRE(devices[0].outputChannels[1].name == "from_A_to_C");
249 REQUIRE(devices[0].outputChannels[1].port == 22001);
250
 // Device 1: receives on "from_A_to_B", sends on "from_B_to_D".
251 REQUIRE(devices[1].inputChannels.size() == 1);
252 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
253 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
254 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
255 REQUIRE(devices[1].inputChannels[0].port == 22000);
256 REQUIRE(devices[1].outputChannels.size() == 1);
257 REQUIRE(devices[1].outputChannels[0].method == ChannelMethod::Bind);
258 REQUIRE(devices[1].outputChannels[0].type == ChannelType::Push);
259 REQUIRE(devices[1].outputChannels[0].name == "from_B_to_D");
260 REQUIRE(devices[1].outputChannels[0].port == 22002);
261
 // Device 2: receives on "from_A_to_C", sends on "from_C_to_D".
262 REQUIRE(devices[2].inputChannels.size() == 1);
263 REQUIRE(devices[2].inputChannels[0].method == ChannelMethod::Connect);
264 REQUIRE(devices[2].inputChannels[0].type == ChannelType::Pull);
265 REQUIRE(devices[2].inputChannels[0].name == "from_A_to_C");
266 REQUIRE(devices[2].inputChannels[0].port == 22001);
267 REQUIRE(devices[2].outputChannels.size() == 1);
268 REQUIRE(devices[2].outputChannels[0].method == ChannelMethod::Bind);
269 REQUIRE(devices[2].outputChannels[0].type == ChannelType::Push);
270 REQUIRE(devices[2].outputChannels[0].name == "from_C_to_D");
271 REQUIRE(devices[2].outputChannels[0].port == 22003);
272
 // Device 3: receives from both B and C.
273 REQUIRE(devices[3].inputChannels.size() == 2);
274 REQUIRE(devices[3].inputChannels[0].method == ChannelMethod::Connect);
275 REQUIRE(devices[3].inputChannels[0].type == ChannelType::Pull);
276 REQUIRE(devices[3].inputChannels[0].name == "from_B_to_D");
277 REQUIRE(devices[3].inputChannels[0].port == 22002);
278 REQUIRE(devices[3].inputChannels[1].method == ChannelMethod::Connect);
279 REQUIRE(devices[3].inputChannels[1].type == ChannelType::Pull);
280 REQUIRE(devices[3].inputChannels[1].name == "from_C_to_D");
281 REQUIRE(devices[3].inputChannels[1].port == 22003);
282}
283
284// This defines two consumers for the same product, therefore we
285// need to forward (assuming we are in shared memory).
287{
288 return {{"A", Inputs{}, Outputs{OutputSpec{"TST", "A1"}}},
289 {
290 "B",
291 Inputs{InputSpec{"x", "TST", "A1"}},
292 },
293 {
294 "C",
295 Inputs{InputSpec{"y", "TST", "A1"}},
296 }};
297}
298
// Converts the workflow returned by defineDataProcessing5() into device
// specs and checks that the three devices are chained rather than fanned
// out: device 0 sends on "from_A_to_B" (22000), device 1 receives it and
// sends on "from_B_to_C" (22001), device 2 receives that. Device 1 is
// expected to carry one forward and no output routes.
299TEST_CASE("TestTopologyForwarding")
300{
301 auto workflow = defineDataProcessing5();
302 auto configContext = makeEmptyConfigContext();
303 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
304 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
305 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
306 std::vector<DeviceSpec> devices;
307
308 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
309 SimpleResourceManager rm(resources);
310 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
311 REQUIRE(devices.size() == 3);
 // Device 0 has a single output channel, towards B only.
312 REQUIRE(devices[0].outputChannels.size() == 1);
313 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
314 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
315 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B");
316 REQUIRE(devices[0].outputChannels[0].port == 22000);
317
 // Device 1 receives from A and owns the channel towards C.
318 REQUIRE(devices[1].inputChannels.size() == 1);
319 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
320 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
321 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B");
322 REQUIRE(devices[1].inputChannels[0].port == 22000);
323 REQUIRE(devices[1].outputChannels.size() == 1);
324 REQUIRE(devices[1].outputChannels[0].method == ChannelMethod::Bind);
325 REQUIRE(devices[1].outputChannels[0].type == ChannelType::Push);
326 REQUIRE(devices[1].outputChannels[0].name == "from_B_to_C");
327 REQUIRE(devices[1].outputChannels[0].port == 22001);
328
 // Device 2 receives from B, not directly from A.
329 REQUIRE(devices[2].inputChannels.size() == 1);
330 REQUIRE(devices[2].inputChannels[0].method == ChannelMethod::Connect);
331 REQUIRE(devices[2].inputChannels[0].type == ChannelType::Pull);
332 REQUIRE(devices[2].inputChannels[0].name == "from_B_to_C");
333 REQUIRE(devices[2].inputChannels[0].port == 22001);
334
 // Input route counts per device.
335 REQUIRE(devices[0].inputs.size() == 0);
336 REQUIRE(devices[1].inputs.size() == 1);
337 REQUIRE(devices[2].inputs.size() == 1);
338
339 // The outputs of device[1] are 0 because all
340 // it has is really forwarding rules!
341 REQUIRE(devices[0].outputs.size() == 1);
342 REQUIRE(devices[1].outputs.size() == 0);
343 REQUIRE(devices[2].outputs.size() == 0);
344
 // Each input route must name the channel it is fed from.
345 REQUIRE(devices[1].inputs[0].sourceChannel == "from_A_to_B");
346 REQUIRE(devices[2].inputs[0].sourceChannel == "from_B_to_C");
347
 // Only the middle device forwards.
348 REQUIRE(devices[0].forwards.size() == 0);
349 REQUIRE(devices[1].forwards.size() == 1);
350 REQUIRE(devices[2].forwards.size() == 0);
351}
352
353// This defines a single consumer B which is time-pipelined
354// into two parallel instances.
356{
357 return {{"A", Inputs{}, Outputs{OutputSpec{"TST", "A1"}}},
358 timePipeline({"B", Inputs{InputSpec{"a", "TST", "A1"}}}, 2)};
359}
360
361// This is three explicit layers, last two with
362// multiple (non commensurable) timeslice setups.
364{
365 return {{"A", Inputs{}, {OutputSpec{"TST", "A"}}},
367 {
368 "B",
369 Inputs{InputSpec{"x", "TST", "A"}},
370 Outputs{OutputSpec{"TST", "B"}},
371 },
372 3),
373 timePipeline({"C", Inputs{InputSpec{"x", "TST", "B"}}}, 2)};
374}
375
376TEST_CASE("TestOutEdgeProcessingHelpers")
377{
378 // Logical edges for:
379 // b0---\
380 // / \___c0
381 // / /\ /
382 // a--b1 X
383 // \ \/_\c1
384 // \ / /
385 // b2---/
386 //
387 std::vector<DeviceSpec> devices;
388 std::vector<DeviceId> deviceIndex;
389 std::vector<DeviceConnectionId> connections;
390 std::vector<LogicalForwardInfo> availableForwardsInfo;
391
392 std::vector<OutputSpec> globalOutputs = {OutputSpec{"TST", "A"},
393 OutputSpec{"TST", "B"}};
394
395 std::vector<size_t> edgeOutIndex{0, 1, 2, 3, 6, 4, 7, 5, 8};
396 std::vector<DeviceConnectionEdge> logicalEdges = {
397 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
398 {0, 1, 1, 0, 0, 0, false, ConnectionKind::Out},
399 {0, 1, 2, 0, 0, 0, false, ConnectionKind::Out},
400 {1, 2, 0, 0, 1, 0, false, ConnectionKind::Out},
401 {1, 2, 0, 1, 1, 0, false, ConnectionKind::Out},
402 {1, 2, 0, 2, 1, 0, false, ConnectionKind::Out},
403 {1, 2, 1, 0, 1, 0, false, ConnectionKind::Out},
404 {1, 2, 1, 1, 1, 0, false, ConnectionKind::Out},
405 {1, 2, 1, 2, 1, 0, false, ConnectionKind::Out},
406 };
407
408 std::vector<EdgeAction> actions{
409 EdgeAction{true, true},
410 EdgeAction{false, true},
411 EdgeAction{false, true},
412 EdgeAction{true, true},
413 EdgeAction{true, true},
414 EdgeAction{true, true},
415 EdgeAction{false, true},
416 EdgeAction{false, true},
417 EdgeAction{false, true},
418 };
419
421 auto configContext = makeEmptyConfigContext();
422 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
423 auto sendingPolicies = SendingPolicy::createDefaultPolicies();
424 auto forwardingPolicies = ForwardingPolicy::createDefaultPolicies();
425
426 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
427 SimpleResourceManager rm(resources);
428 ComputingOffer defaultOffer;
429 defaultOffer.cpu = 0.01;
430 defaultOffer.memory = 0.01;
431
432 DeviceSpecHelpers::processOutEdgeActions(*configContext, devices, deviceIndex, connections, rm, edgeOutIndex, logicalEdges,
433 actions, workflow, globalOutputs, channelPolicies, sendingPolicies, forwardingPolicies, "", defaultOffer);
434
435 std::vector<DeviceId> expectedDeviceIndex = {{0, 0, 0}, {0, 0, 0}, {0, 0, 0}, {1, 0, 1}, {1, 0, 1}, {1, 1, 2}, {1, 1, 2}, {1, 2, 3}, {1, 2, 3}};
436 REQUIRE(devices.size() == 4);
437 ; // For producers
438 REQUIRE(expectedDeviceIndex.size() == deviceIndex.size());
439
440 for (size_t i = 0; i < expectedDeviceIndex.size(); ++i) {
441 DeviceId& expected = expectedDeviceIndex[i];
442 DeviceId& actual = deviceIndex[i];
443 REQUIRE(expected.processorIndex == actual.processorIndex);
444 REQUIRE(expected.timeslice == actual.timeslice);
445 REQUIRE(expected.deviceIndex == actual.deviceIndex);
446 }
447
448 // Check that all the required channels are there.
449 REQUIRE(devices[0].outputChannels.size() == 3);
450 REQUIRE(devices[1].outputChannels.size() == 2);
451 REQUIRE(devices[2].outputChannels.size() == 2);
452 REQUIRE(devices[3].outputChannels.size() == 2);
453
454 // Check that the required output routes are there
455 REQUIRE(devices[0].outputs.size() == 3);
456 REQUIRE(devices[1].outputs.size() == 2);
457 REQUIRE(devices[2].outputs.size() == 2);
458 REQUIRE(devices[3].outputs.size() == 2);
459
460 auto offers = rm.getAvailableOffers();
461 REQUIRE(offers.size() == 1);
462 REQUIRE(offers[0].startPort == 22009);
463
464 // Not sure this is correct, but let's assume that's the case.
465 std::vector<size_t> edgeInIndex{0, 1, 2, 3, 4, 5, 6, 7, 8};
466
467 std::vector<EdgeAction> inActions{
468 EdgeAction{true, true},
469 EdgeAction{true, true},
470 EdgeAction{true, true},
471 EdgeAction{true, true},
472 EdgeAction{false, true},
473 EdgeAction{false, true},
474 EdgeAction{true, true},
475 EdgeAction{false, true},
476 EdgeAction{false, true},
477 };
478
479 std::sort(connections.begin(), connections.end());
480
481 DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, connections, rm, edgeInIndex, logicalEdges,
482 inActions, workflow, availableForwardsInfo, channelPolicies, "", defaultOffer);
483 //
484 std::vector<DeviceId> expectedDeviceIndexFinal = {{0, 0, 0}, {0, 0, 0}, {0, 0, 0}, {1, 0, 1}, {1, 0, 1}, {1, 1, 2}, {1, 1, 2}, {1, 2, 3}, {1, 2, 3}, {2, 0, 4}, {2, 1, 5}};
485 REQUIRE(expectedDeviceIndexFinal.size() == deviceIndex.size());
486
487 for (size_t i = 0; i < expectedDeviceIndexFinal.size(); ++i) {
488 DeviceId& expected = expectedDeviceIndexFinal[i];
489 DeviceId& actual = deviceIndex[i];
490 REQUIRE(expected.processorIndex == actual.processorIndex);
491 REQUIRE(expected.timeslice == actual.timeslice);
492 REQUIRE(expected.deviceIndex == actual.deviceIndex);
493 }
494
495 // Iterating over the in edges should have created the final 2
496 // devices.
497 REQUIRE(devices.size() == 6);
498 std::vector<std::string> expectedDeviceNames = {"A", "B_t0", "B_t1", "B_t2", "C_t0", "C_t1"};
499
500 for (size_t i = 0; i < devices.size(); ++i) {
501 REQUIRE(devices[i].id == expectedDeviceNames[i]);
502 }
503
504 // Check that all the required output channels are there.
505 REQUIRE(devices[0].outputChannels.size() == 3);
506 REQUIRE(devices[1].outputChannels.size() == 2);
507 REQUIRE(devices[2].outputChannels.size() == 2);
508 REQUIRE(devices[3].outputChannels.size() == 2);
509 REQUIRE(devices[4].outputChannels.size() == 0);
510 REQUIRE(devices[5].outputChannels.size() == 0);
511
512 // Check that the required routes are there
513 REQUIRE(devices[0].outputs.size() == 3);
514 REQUIRE(devices[1].outputs.size() == 2);
515 REQUIRE(devices[2].outputs.size() == 2);
516 REQUIRE(devices[3].outputs.size() == 2);
517 REQUIRE(devices[4].outputs.size() == 0);
518 REQUIRE(devices[5].outputs.size() == 0);
519
520 SendingPolicy dummy;
521 // Check that the output specs and the timeframe ids are correct
522 std::vector<std::vector<OutputRoute>> expectedRoutes = {
523 {
524 OutputRoute{0, 3, globalOutputs[0], "from_A_to_B_t0", &dummy},
525 OutputRoute{1, 3, globalOutputs[0], "from_A_to_B_t1", &dummy},
526 OutputRoute{2, 3, globalOutputs[0], "from_A_to_B_t2", &dummy},
527 },
528 {
529 OutputRoute{0, 2, globalOutputs[1], "from_B_t0_to_C_t0", &dummy},
530 OutputRoute{1, 2, globalOutputs[1], "from_B_t0_to_C_t1", &dummy},
531 },
532 {
533 OutputRoute{0, 2, globalOutputs[1], "from_B_t1_to_C_t0", &dummy},
534 OutputRoute{1, 2, globalOutputs[1], "from_B_t1_to_C_t1", &dummy},
535 },
536 {
537 OutputRoute{0, 2, globalOutputs[1], "from_B_t2_to_C_t0", &dummy},
538 OutputRoute{1, 2, globalOutputs[1], "from_B_t2_to_C_t1", &dummy},
539 },
540 };
541
542 for (size_t di = 0; di < expectedRoutes.size(); di++) {
543 auto& routes = expectedRoutes[di];
544 auto& device = devices[di];
545 for (size_t ri = 0; ri < device.outputs.size(); ri++) {
546 // FIXME: check that the matchers are the same
547 auto concreteA = DataSpecUtils::asConcreteDataTypeMatcher(device.outputs[ri].matcher);
548 auto concreteB = DataSpecUtils::asConcreteDataTypeMatcher(routes[ri].matcher);
549 REQUIRE(std::string(concreteA.origin.as<std::string>()) == std::string(concreteB.origin.as<std::string>()));
550 REQUIRE(device.outputs[ri].channel == routes[ri].channel);
551 REQUIRE(device.outputs[ri].timeslice == routes[ri].timeslice);
552 }
553 }
554
555 // Check that we have all the needed input connections
556 REQUIRE(devices[0].inputChannels.size() == 0);
557 REQUIRE(devices[1].inputChannels.size() == 1);
558 REQUIRE(devices[2].inputChannels.size() == 1);
559 REQUIRE(devices[3].inputChannels.size() == 1);
560 REQUIRE(devices[4].inputChannels.size() == 3);
561 REQUIRE(devices[5].inputChannels.size() == 3);
562
563 // Check that the required input routes are there
564 REQUIRE(devices[0].inputs.size() == 0);
565 REQUIRE(devices[1].inputs.size() == 1);
566 REQUIRE(devices[2].inputs.size() == 1);
567 REQUIRE(devices[3].inputs.size() == 1);
568 REQUIRE(devices[4].inputs.size() == 3);
569 REQUIRE(devices[5].inputs.size() == 3);
570
571 REQUIRE(devices[1].inputs[0].sourceChannel == "from_A_to_B_t0");
572 REQUIRE(devices[2].inputs[0].sourceChannel == "from_A_to_B_t1");
573 REQUIRE(devices[3].inputs[0].sourceChannel == "from_A_to_B_t2");
574
575 REQUIRE(devices[4].inputs[0].sourceChannel == "from_B_t0_to_C_t0");
576 REQUIRE(devices[4].inputs[1].sourceChannel == "from_B_t1_to_C_t0");
577 REQUIRE(devices[4].inputs[2].sourceChannel == "from_B_t2_to_C_t0");
578
579 REQUIRE(devices[5].inputs[0].sourceChannel == "from_B_t0_to_C_t1");
580 REQUIRE(devices[5].inputs[1].sourceChannel == "from_B_t1_to_C_t1");
581 REQUIRE(devices[5].inputs[2].sourceChannel == "from_B_t2_to_C_t1");
582}
583
// Converts the workflow returned by defineDataProcessing7() into device
// specs and checks the full generated topology: six devices (A, B_t0..B_t2,
// C_t0, C_t1), A connected to every B instance, and every B instance
// connected to every C instance. Nine channels in total, on ports
// 22000..22008, with output ends bind/push and input ends connect/pull.
584TEST_CASE("TestTopologyLayeredTimePipeline")
585{
586 auto workflow = defineDataProcessing7();
587 std::vector<DeviceSpec> devices;
588 auto configContext = makeEmptyConfigContext();
589 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
590 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
591 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
592 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
593 SimpleResourceManager rm(resources);
594 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext);
 // Device ids carry a "_t<N>" suffix for the pipelined processors.
595 REQUIRE(devices.size() == 6);
596 REQUIRE(devices[0].id == "A");
597 REQUIRE(devices[1].id == "B_t0");
598 REQUIRE(devices[2].id == "B_t1");
599 REQUIRE(devices[3].id == "B_t2");
600 REQUIRE(devices[4].id == "C_t0");
601 REQUIRE(devices[5].id == "C_t1");
602
 // A: no inputs, one output channel per B instance (ports 22000..22002).
603 REQUIRE(devices[0].inputChannels.size() == 0);
604 REQUIRE(devices[0].outputChannels.size() == 3);
605 REQUIRE(devices[0].outputChannels[0].method == ChannelMethod::Bind);
606 REQUIRE(devices[0].outputChannels[0].type == ChannelType::Push);
607 REQUIRE(devices[0].outputChannels[0].name == "from_A_to_B_t0");
608 REQUIRE(devices[0].outputChannels[0].port == 22000);
609 REQUIRE(devices[0].outputChannels[1].method == ChannelMethod::Bind);
610 REQUIRE(devices[0].outputChannels[1].type == ChannelType::Push);
611 REQUIRE(devices[0].outputChannels[1].name == "from_A_to_B_t1");
612 REQUIRE(devices[0].outputChannels[1].port == 22001);
613 REQUIRE(devices[0].outputChannels[2].method == ChannelMethod::Bind);
614 REQUIRE(devices[0].outputChannels[2].type == ChannelType::Push);
615 REQUIRE(devices[0].outputChannels[2].name == "from_A_to_B_t2");
616 REQUIRE(devices[0].outputChannels[2].port == 22002);
617
 // B_t0: one input from A, one output per C instance (ports 22003, 22004).
618 REQUIRE(devices[1].inputChannels.size() == 1);
619 REQUIRE(devices[1].inputChannels[0].method == ChannelMethod::Connect);
620 REQUIRE(devices[1].inputChannels[0].type == ChannelType::Pull);
621 REQUIRE(devices[1].inputChannels[0].name == "from_A_to_B_t0");
622 REQUIRE(devices[1].inputChannels[0].port == 22000);
623 REQUIRE(devices[1].outputChannels.size() == 2);
624 REQUIRE(devices[1].outputChannels[0].method == ChannelMethod::Bind);
625 REQUIRE(devices[1].outputChannels[0].type == ChannelType::Push);
626 REQUIRE(devices[1].outputChannels[0].name == "from_B_t0_to_C_t0");
627 REQUIRE(devices[1].outputChannels[0].port == 22003);
628 REQUIRE(devices[1].outputChannels[1].method == ChannelMethod::Bind);
629 REQUIRE(devices[1].outputChannels[1].type == ChannelType::Push);
630 REQUIRE(devices[1].outputChannels[1].name == "from_B_t0_to_C_t1");
631 REQUIRE(devices[1].outputChannels[1].port == 22004);
632
 // B_t1: one input from A, one output per C instance (ports 22005, 22006).
633 REQUIRE(devices[2].inputChannels.size() == 1);
634 REQUIRE(devices[2].inputChannels[0].method == ChannelMethod::Connect);
635 REQUIRE(devices[2].inputChannels[0].type == ChannelType::Pull);
636 REQUIRE(devices[2].inputChannels[0].name == "from_A_to_B_t1");
637 REQUIRE(devices[2].inputChannels[0].port == 22001);
638 REQUIRE(devices[2].outputChannels.size() == 2);
639 REQUIRE(devices[2].outputChannels[0].method == ChannelMethod::Bind);
640 REQUIRE(devices[2].outputChannels[0].type == ChannelType::Push);
641 REQUIRE(devices[2].outputChannels[0].name == "from_B_t1_to_C_t0");
642 REQUIRE(devices[2].outputChannels[0].port == 22005);
643 REQUIRE(devices[2].outputChannels[1].method == ChannelMethod::Bind);
644 REQUIRE(devices[2].outputChannels[1].type == ChannelType::Push);
645 REQUIRE(devices[2].outputChannels[1].name == "from_B_t1_to_C_t1");
646 REQUIRE(devices[2].outputChannels[1].port == 22006);
647
 // B_t2: one input from A, one output per C instance (ports 22007, 22008).
648 REQUIRE(devices[3].inputChannels.size() == 1);
649 REQUIRE(devices[3].inputChannels[0].method == ChannelMethod::Connect);
650 REQUIRE(devices[3].inputChannels[0].type == ChannelType::Pull);
651 REQUIRE(devices[3].inputChannels[0].name == "from_A_to_B_t2");
652 REQUIRE(devices[3].inputChannels[0].port == 22002);
653 REQUIRE(devices[3].outputChannels.size() == 2);
654 REQUIRE(devices[3].outputChannels[0].method == ChannelMethod::Bind);
655 REQUIRE(devices[3].outputChannels[0].type == ChannelType::Push);
656 REQUIRE(devices[3].outputChannels[0].name == "from_B_t2_to_C_t0");
657 REQUIRE(devices[3].outputChannels[0].port == 22007);
658 REQUIRE(devices[3].outputChannels[1].method == ChannelMethod::Bind);
659 REQUIRE(devices[3].outputChannels[1].type == ChannelType::Push);
660 REQUIRE(devices[3].outputChannels[1].name == "from_B_t2_to_C_t1");
661 REQUIRE(devices[3].outputChannels[1].port == 22008);
662
 // C_t0: one input from each B instance, no outputs.
663 REQUIRE(devices[4].inputChannels.size() == 3);
664 REQUIRE(devices[4].inputChannels[0].method == ChannelMethod::Connect);
665 REQUIRE(devices[4].inputChannels[0].type == ChannelType::Pull);
666 REQUIRE(devices[4].inputChannels[0].name == "from_B_t0_to_C_t0");
667 REQUIRE(devices[4].inputChannels[0].port == 22003);
668 REQUIRE(devices[4].inputChannels[1].method == ChannelMethod::Connect);
669 REQUIRE(devices[4].inputChannels[1].type == ChannelType::Pull);
670 REQUIRE(devices[4].inputChannels[1].name == "from_B_t1_to_C_t0");
671 REQUIRE(devices[4].inputChannels[1].port == 22005);
672 REQUIRE(devices[4].inputChannels[2].method == ChannelMethod::Connect);
673 REQUIRE(devices[4].inputChannels[2].type == ChannelType::Pull);
674 REQUIRE(devices[4].inputChannels[2].name == "from_B_t2_to_C_t0");
675 REQUIRE(devices[4].inputChannels[2].port == 22007);
676 REQUIRE(devices[4].outputChannels.size() == 0);
677
 // C_t1: one input from each B instance, no outputs.
678 REQUIRE(devices[5].inputChannels.size() == 3);
679 REQUIRE(devices[5].inputChannels[0].method == ChannelMethod::Connect);
680 REQUIRE(devices[5].inputChannels[0].type == ChannelType::Pull);
681 REQUIRE(devices[5].inputChannels[0].name == "from_B_t0_to_C_t1");
682 REQUIRE(devices[5].inputChannels[0].port == 22004);
683 REQUIRE(devices[5].inputChannels[1].method == ChannelMethod::Connect);
684 REQUIRE(devices[5].inputChannels[1].type == ChannelType::Pull);
685 REQUIRE(devices[5].inputChannels[1].name == "from_B_t1_to_C_t1");
686 REQUIRE(devices[5].inputChannels[1].port == 22006);
687 REQUIRE(devices[5].inputChannels[2].method == ChannelMethod::Connect);
688 REQUIRE(devices[5].inputChannels[2].type == ChannelType::Pull);
689 REQUIRE(devices[5].inputChannels[2].name == "from_B_t2_to_C_t1");
690 REQUIRE(devices[5].inputChannels[2].port == 22008);
691 REQUIRE(devices[5].outputChannels.size() == 0);
692}
693
694// Test the case in which we have one source with two
695// descriptions and a wildcard for both description and
696// subspec on the receiving side:
697//
698// A/1
699// \ B
700// /
701// A/2
703{
704 return {
705 {"A", Inputs{InputSpec{"timer", "DPL", "TIMER", 0, Lifetime::Timer}}, {OutputSpec{"A", "1"}, OutputSpec{"A", "2"}}},
707 {"internal-dpl-timer", {}, {OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer}}}};
708}
// Drives DeviceSpecHelpers::processOutEdgeActions and processInEdgeActions
// by hand on the workflow returned by defineDataProcessing8(), using
// hard-coded edge indices and edge actions, and checks which devices and
// deviceIndex entries exist after each step. The final assertion is that
// device "B" ends up with a single input route.
//
// Note: the lone `;` lines below are empty statements; each one only carries
// the trailing comment that belongs to the REQUIRE on the line before it.
709TEST_CASE("TestSimpleWildcard")
710{
711 auto workflow = defineDataProcessing8();
712 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
713 SimpleResourceManager rm(resources);
714 auto configContext = makeEmptyConfigContext();
715 auto channelPolicies = ChannelConfigurationPolicy::createDefaultPolicies(*configContext);
716 auto sendingPolicies = SendingPolicy::createDefaultPolicies();
717 auto forwardingPolicies = ForwardingPolicy::createDefaultPolicies();
718
 // Output containers filled by the two process*EdgeActions calls.
 // availableForwardsInfo stays empty and is passed as-is.
719 std::vector<DeviceSpec> devices;
720 std::vector<DeviceId> deviceIndex;
721 std::vector<DeviceConnectionId> connections;
722 std::vector<LogicalForwardInfo> availableForwardsInfo;
723
724 std::vector<OutputSpec> globalOutputs = {OutputSpec{"A", "1"},
725 OutputSpec{"A", "2"},
726 OutputSpec{"DPL", "TIMER", 0, Lifetime::Timer}};
727
728 // See values in test_WorkflowHelpers.cxx
729 std::vector<size_t> edgeOutIndex{1, 2, 0};
730 std::vector<size_t> edgeInIndex{0, 1, 2};
731 std::vector<DeviceConnectionEdge> logicalEdges = {
732 {2, 0, 0, 0, 2, 0, false, ConnectionKind::Out},
733 {0, 1, 0, 0, 0, 0, false, ConnectionKind::Out},
734 {0, 1, 0, 0, 1, 0, false, ConnectionKind::Out},
735 };
736
737 // See values in test_WorkflowHelpers.cxx
738 std::vector<EdgeAction> outActions{
739 EdgeAction{true, true},
740 EdgeAction{true, true},
741 EdgeAction{false, false},
742 };
743
744 // See values in test_WorkflowHelpers.cxx
745 std::vector<EdgeAction> inActions{
746 EdgeAction{true, true},
747 EdgeAction{true, true},
748 EdgeAction{false, false},
749 };
750
751 ComputingOffer defaultOffer;
752 defaultOffer.cpu = 0.01;
753 defaultOffer.memory = 0.01;
754
 // Step 1: process the out edges; only the producing devices exist afterwards.
755 DeviceSpecHelpers::processOutEdgeActions(*configContext, devices, deviceIndex, connections, rm, edgeOutIndex, logicalEdges,
756 outActions, workflow, globalOutputs, channelPolicies, sendingPolicies, forwardingPolicies, "", defaultOffer);
757
758 REQUIRE(devices.size() == 2);
759 ; // Two devices have outputs: A and Timer
760 REQUIRE(devices[0].name == "A");
761 REQUIRE(devices[1].name == "internal-dpl-timer");
762 REQUIRE(deviceIndex.size() == 2);
763 REQUIRE(deviceIndex[0].processorIndex == 0);
764 ; // A is the first processor in the workflow
765 REQUIRE(deviceIndex[0].timeslice == 0);
766 ; // There is no time pipelining
767 REQUIRE(deviceIndex[0].deviceIndex == 0);
768 ; // It's also the first device created
769 REQUIRE(deviceIndex[1].processorIndex == 2);
770 ; // TIMER is added only at the end
771 REQUIRE(deviceIndex[1].timeslice == 0);
772 ; // There is no time pipelining
773 REQUIRE(deviceIndex[1].deviceIndex == 1);
774 ; // It's the second device created
775
 // The connections are sorted before being handed to the in-edge pass.
776 std::sort(connections.begin(), connections.end());
777
 // Step 2: process the in edges; this adds the consuming device B.
778 DeviceSpecHelpers::processInEdgeActions(devices, deviceIndex, connections, rm, edgeInIndex, logicalEdges,
779 inActions, workflow, availableForwardsInfo, channelPolicies, "", defaultOffer);
780
781 REQUIRE(devices.size() == 3);
782 ; // Now we also have B
783 REQUIRE(devices[0].name == "A");
784 REQUIRE(devices[1].name == "internal-dpl-timer");
785 REQUIRE(devices[2].name == "B");
786 REQUIRE(deviceIndex.size() == 3);
 // NOTE(review): entry 1 of deviceIndex described the timer (processor 2)
 // after step 1 and is expected to describe B (processor 1) here, so the
 // index is not in creation order - presumably it is kept sorted by
 // processor; confirm in DeviceSpecHelpers.
787 REQUIRE(deviceIndex[1].processorIndex == 1);
788 ; // B is the second processor in the workflow
789 REQUIRE(deviceIndex[1].timeslice == 0);
790 ; // There is no time pipelining
791 REQUIRE(deviceIndex[1].deviceIndex == 2);
792 ; // It's the last device created because it's a sink
793
794 // We should have only one input, because the two outputs of A can
795 // be captured by the generic matcher in B
796 REQUIRE(devices[2].inputs.size() == 1);
797}
int32_t i
std::unique_ptr< ConfigContext > makeEmptyConfigContext()
std::vector< ComputingOffer > getAvailableOffers() override
Get the available resources for a device to run on.
GLuint const GLchar * name
Definition glcorearb.h:781
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
static std::vector< CallbacksPolicy > createDefaultPolicies()
static PolicyMatcher matchAny
Catch all policy, used by the last rule.
static OutputChannelModifier pushOutput(FairMQChannelConfigSpec const &spec)
Makes the passed output channel bind and push.
static InputChannelModifier pullInput(FairMQChannelConfigSpec const &spec)
Makes the passed input channel connect and pull.
static std::vector< ChannelConfigurationPolicy > createDefaultPolicies(ConfigContext const &configContext)
Default policies to use, based on the contents of @configContext.
static std::vector< CompletionPolicy > createDefaultPolicies()
Helper to create the default configuration.
static ConcreteDataTypeMatcher asConcreteDataTypeMatcher(OutputSpec const &spec)
static data_matcher::DataDescriptorMatcher dataDescriptorMatcherFrom(ConcreteDataMatcher const &concrete)
Build a DataDescriptorMatcher which does not care about the subSpec.
static void processOutEdgeActions(ConfigContext const &configContext, std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &outEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< OutputSpec > &outputs, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< SendingPolicy > const &sendingPolicies, std::vector< ForwardingPolicy > const &forwardingPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void processInEdgeActions(std::vector< DeviceSpec > &devices, std::vector< DeviceId > &deviceIndex, const std::vector< DeviceConnectionId > &connections, ResourceManager &resourceManager, const std::vector< size_t > &inEdgeIndex, const std::vector< DeviceConnectionEdge > &logicalEdges, const std::vector< EdgeAction > &actions, const WorkflowSpec &workflow, const std::vector< LogicalForwardInfo > &availableForwardsInfo, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::string const &channelPrefix, ComputingOffer const &defaultOffer, OverrideServiceSpecs const &overrideServices={})
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
static std::vector< ForwardingPolicy > createDefaultPolicies()
static std::vector< SendingPolicy > createDefaultPolicies()
std::map< std::string, ID > expected
WorkflowSpec defineDataProcessing7()
WorkflowSpec defineDataProcessing8()
WorkflowSpec defineDataProcessing4()
WorkflowSpec defineDataProcessing1()
WorkflowSpec defineDataProcessing5()
WorkflowSpec defineDataProcessing6()
WorkflowSpec defineDataProcessing3()
TEST_CASE("TestDeviceSpec1")
WorkflowSpec defineDataProcessing2()