Project
Loading...
Searching...
No Matches
test_FrameworkDataFlowToDDS.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Mocking.h"
13#include <catch_amalgamated.hpp>
14#include "../src/DDSConfigHelpers.h"
15#include "../src/DeviceSpecHelpers.h"
16#include "../src/SimpleResourceManager.h"
17#include "../src/ComputingResourceHelpers.h"
27
28#include <chrono>
29#include <sstream>
30#include <thread>
31#include <memory>
32
33using namespace o2::framework;
34
36{
37 return AlgorithmSpec{[what](ProcessingContext& ctx) {
38 ctx.outputs().make<int>(Output{"TST", what, 0}, 1);
39 }};
40}
41
42namespace
43{
44// This is how you can define your processing in a declarative way
46{
47 return {{.name = "A",
48 .outputs = {OutputSpec{"TST", "A1"},
49 OutputSpec{"TST", "A2"}},
50 .algorithm = AlgorithmSpec{[](ProcessingContext& ctx) {
51 std::this_thread::sleep_for(std::chrono::seconds(1));
52 ctx.outputs().make<int>(Output{"TST", "A1", 0}, 1);
53 ctx.outputs().make<int>(Output{"TST", "A2", 0}, 1);
54 }}},
55 {.name = "B",
56 .inputs = {InputSpec{"x", "TST", "A1"}},
57 .outputs = {OutputSpec{"TST", "B1"}},
58 .algorithm = simplePipe(o2::header::DataDescription{"B1"})},
59 {.name = "C",
60 .inputs = {InputSpec{"y", "TST", "A2"}},
61 .outputs = {OutputSpec{"TST", "C1"}},
62 .algorithm = simplePipe(o2::header::DataDescription{"C1"})},
63 {.name = "D",
64 .inputs = {
65 InputSpec{"x", "TST", "B1"},
66 InputSpec{"y", "TST", "C1"},
67 },
68 .algorithm = {
69 [](ProcessingContext&) {},
70 },
71 .options = {
72 ConfigParamSpec{"a-param", VariantType::Int, 1, {"A parameter which should not be escaped"}},
73 ConfigParamSpec{"b-param", VariantType::String, "", {"a parameter which will be escaped"}},
74 ConfigParamSpec{"c-param", VariantType::String, "foo;bar", {"another parameter which will be escaped"}},
75 },
76 .labels = {}}};
77}
78
79WorkflowSpec defineDataProcessingExpendable()
80{
81 return {{.name = "A",
82 .outputs = {OutputSpec{"TST", "A1"},
83 OutputSpec{"TST", "A2"}},
84 .algorithm = AlgorithmSpec{[](ProcessingContext& ctx) {
85 std::this_thread::sleep_for(std::chrono::seconds(1));
86 ctx.outputs().make<int>(Output{"TST", "A1", 0}, 1);
87 ctx.outputs().make<int>(Output{"TST", "A2", 0}, 1);
88 }}},
89 {.name = "B",
90 .inputs = {InputSpec{"x", "TST", "A1"}},
91 .outputs = {OutputSpec{"TST", "B1"}},
92 .algorithm = simplePipe(o2::header::DataDescription{"B1"})},
93 {.name = "C",
94 .inputs = {InputSpec{"y", "TST", "A2"}},
95 .outputs = {OutputSpec{"TST", "C1"}},
96 .algorithm = simplePipe(o2::header::DataDescription{"C1"})},
97 {.name = "D",
98 .inputs = {
99 InputSpec{"x", "TST", "B1"},
100 InputSpec{"y", "TST", "C1"},
101 },
102 .algorithm = {
103 [](ProcessingContext&) {},
104 },
105 .options = {
106 ConfigParamSpec{"a-param", VariantType::Int, 1, {"A parameter which should not be escaped"}},
107 ConfigParamSpec{"b-param", VariantType::String, "", {"a parameter which will be escaped"}},
108 ConfigParamSpec{"c-param", VariantType::String, "foo;bar", {"another parameter which will be escaped"}},
109 },
110 .labels = {{"expendable"}}}};
111}
112
113char* strdiffchr(const char* s1, const char* s2)
114{
115 while (*s1 && *s1 == *s2) {
116 s1++;
117 s2++;
118 }
119 return (*s1 == *s2) ? nullptr : (char*)s1;
120}
121} // namespace
122
123TEST_CASE("TestDDS")
124{
125 auto workflow = defineDataProcessing();
126 std::ostringstream ss{""};
127 auto configContext = makeEmptyConfigContext();
128 auto channelPolicies = makeTrivialChannelPolicies(*configContext);
129 std::vector<DeviceSpec> devices;
130 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
131 SimpleResourceManager rm(resources);
132 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
133 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
134 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true);
135 std::vector<DeviceControl> controls;
136 std::vector<DeviceExecution> executions;
137 controls.resize(devices.size());
138 executions.resize(devices.size());
139
140 std::vector<ConfigParamSpec> workflowOptions = {
141 ConfigParamSpec{"jobs", VariantType::Int, 4, {"number of producer jobs"}}};
142
143 std::vector<DataProcessorInfo> dataProcessorInfos = {
144 {
145 {"A", "bcsadc/foo", {}, workflowOptions},
146 {"B", "foo", {}, workflowOptions},
147 {"C", "foo", {}, workflowOptions},
148 {"D", "foo", {}, workflowOptions},
149 }};
150 DriverConfig driverConfig = {
151 .batch = true,
152 };
153 DeviceSpecHelpers::prepareArguments(false, false, false, 8080,
154 driverConfig,
155 dataProcessorInfos,
156 devices, executions, controls, {},
157 "workflow-id");
158 CommandInfo command{"foo"};
159 DDSConfigHelpers::dumpDeviceSpec2DDS(ss, DriverMode::STANDALONE, "", workflow, dataProcessorInfos, devices, executions, command);
160 auto expected = R"EXPECTED(<topology name="o2-dataflow">
161<asset name="dpl_json" type="inline" visibility="global" value="{
162 &quot;workflow&quot;: [
163 {
164 &quot;name&quot;: &quot;A&quot;,
165 &quot;inputs&quot;: [],
166 &quot;outputs&quot;: [
167 {
168 &quot;binding&quot;: &quot;TST/A1/0&quot;,
169 &quot;origin&quot;: &quot;TST&quot;,
170 &quot;description&quot;: &quot;A1&quot;,
171 &quot;subspec&quot;: 0,
172 &quot;lifetime&quot;: 0
173 },
174 {
175 &quot;binding&quot;: &quot;TST/A2/0&quot;,
176 &quot;origin&quot;: &quot;TST&quot;,
177 &quot;description&quot;: &quot;A2&quot;,
178 &quot;subspec&quot;: 0,
179 &quot;lifetime&quot;: 0
180 }
181 ],
182 &quot;options&quot;: [],
183 &quot;labels&quot;: [],
184 &quot;metadata&quot;: [],
185 &quot;rank&quot;: 0,
186 &quot;nSlots&quot;: 1,
187 &quot;inputTimeSliceId&quot;: 0,
188 &quot;maxInputTimeslices&quot;: 1
189 },
190 {
191 &quot;name&quot;: &quot;B&quot;,
192 &quot;inputs&quot;: [
193 {
194 &quot;binding&quot;: &quot;x&quot;,
195 &quot;origin&quot;: &quot;TST&quot;,
196 &quot;description&quot;: &quot;A1&quot;,
197 &quot;subspec&quot;: 0,
198 &quot;lifetime&quot;: 0
199 }
200 ],
201 &quot;outputs&quot;: [
202 {
203 &quot;binding&quot;: &quot;TST/B1/0&quot;,
204 &quot;origin&quot;: &quot;TST&quot;,
205 &quot;description&quot;: &quot;B1&quot;,
206 &quot;subspec&quot;: 0,
207 &quot;lifetime&quot;: 0
208 }
209 ],
210 &quot;options&quot;: [],
211 &quot;labels&quot;: [],
212 &quot;metadata&quot;: [],
213 &quot;rank&quot;: 0,
214 &quot;nSlots&quot;: 1,
215 &quot;inputTimeSliceId&quot;: 0,
216 &quot;maxInputTimeslices&quot;: 1
217 },
218 {
219 &quot;name&quot;: &quot;C&quot;,
220 &quot;inputs&quot;: [
221 {
222 &quot;binding&quot;: &quot;y&quot;,
223 &quot;origin&quot;: &quot;TST&quot;,
224 &quot;description&quot;: &quot;A2&quot;,
225 &quot;subspec&quot;: 0,
226 &quot;lifetime&quot;: 0
227 }
228 ],
229 &quot;outputs&quot;: [
230 {
231 &quot;binding&quot;: &quot;TST/C1/0&quot;,
232 &quot;origin&quot;: &quot;TST&quot;,
233 &quot;description&quot;: &quot;C1&quot;,
234 &quot;subspec&quot;: 0,
235 &quot;lifetime&quot;: 0
236 }
237 ],
238 &quot;options&quot;: [],
239 &quot;labels&quot;: [],
240 &quot;metadata&quot;: [],
241 &quot;rank&quot;: 0,
242 &quot;nSlots&quot;: 1,
243 &quot;inputTimeSliceId&quot;: 0,
244 &quot;maxInputTimeslices&quot;: 1
245 },
246 {
247 &quot;name&quot;: &quot;D&quot;,
248 &quot;inputs&quot;: [
249 {
250 &quot;binding&quot;: &quot;x&quot;,
251 &quot;origin&quot;: &quot;TST&quot;,
252 &quot;description&quot;: &quot;B1&quot;,
253 &quot;subspec&quot;: 0,
254 &quot;lifetime&quot;: 0
255 },
256 {
257 &quot;binding&quot;: &quot;y&quot;,
258 &quot;origin&quot;: &quot;TST&quot;,
259 &quot;description&quot;: &quot;C1&quot;,
260 &quot;subspec&quot;: 0,
261 &quot;lifetime&quot;: 0
262 }
263 ],
264 &quot;outputs&quot;: [],
265 &quot;options&quot;: [
266 {
267 &quot;name&quot;: &quot;a-param&quot;,
268 &quot;type&quot;: &quot;0&quot;,
269 &quot;defaultValue&quot;: &quot;1&quot;,
270 &quot;help&quot;: &quot;A parameter which should not be escaped&quot;,
271 &quot;kind&quot;: &quot;0&quot;
272 },
273 {
274 &quot;name&quot;: &quot;b-param&quot;,
275 &quot;type&quot;: &quot;4&quot;,
276 &quot;defaultValue&quot;: &quot;&quot;,
277 &quot;help&quot;: &quot;a parameter which will be escaped&quot;,
278 &quot;kind&quot;: &quot;0&quot;
279 },
280 {
281 &quot;name&quot;: &quot;c-param&quot;,
282 &quot;type&quot;: &quot;4&quot;,
283 &quot;defaultValue&quot;: &quot;foo;bar&quot;,
284 &quot;help&quot;: &quot;another parameter which will be escaped&quot;,
285 &quot;kind&quot;: &quot;0&quot;
286 }
287 ],
288 &quot;labels&quot;: [],
289 &quot;metadata&quot;: [],
290 &quot;rank&quot;: 0,
291 &quot;nSlots&quot;: 1,
292 &quot;inputTimeSliceId&quot;: 0,
293 &quot;maxInputTimeslices&quot;: 1
294 }
295 ],
296 &quot;metadata&quot;: [
297 {
298 &quot;name&quot;: &quot;A&quot;,
299 &quot;executable&quot;: &quot;bcsadc/foo&quot;,
300 &quot;cmdLineArgs&quot;: [],
301 &quot;workflowOptions&quot;: [
302 {
303 &quot;name&quot;: &quot;jobs&quot;,
304 &quot;type&quot;: &quot;0&quot;,
305 &quot;defaultValue&quot;: &quot;4&quot;,
306 &quot;help&quot;: &quot;number of producer jobs&quot;
307 }
308 ],
309 &quot;channels&quot;: []
310 },
311 {
312 &quot;name&quot;: &quot;B&quot;,
313 &quot;executable&quot;: &quot;foo&quot;,
314 &quot;cmdLineArgs&quot;: [],
315 &quot;workflowOptions&quot;: [
316 {
317 &quot;name&quot;: &quot;jobs&quot;,
318 &quot;type&quot;: &quot;0&quot;,
319 &quot;defaultValue&quot;: &quot;4&quot;,
320 &quot;help&quot;: &quot;number of producer jobs&quot;
321 }
322 ],
323 &quot;channels&quot;: []
324 },
325 {
326 &quot;name&quot;: &quot;C&quot;,
327 &quot;executable&quot;: &quot;foo&quot;,
328 &quot;cmdLineArgs&quot;: [],
329 &quot;workflowOptions&quot;: [
330 {
331 &quot;name&quot;: &quot;jobs&quot;,
332 &quot;type&quot;: &quot;0&quot;,
333 &quot;defaultValue&quot;: &quot;4&quot;,
334 &quot;help&quot;: &quot;number of producer jobs&quot;
335 }
336 ],
337 &quot;channels&quot;: []
338 },
339 {
340 &quot;name&quot;: &quot;D&quot;,
341 &quot;executable&quot;: &quot;foo&quot;,
342 &quot;cmdLineArgs&quot;: [],
343 &quot;workflowOptions&quot;: [
344 {
345 &quot;name&quot;: &quot;jobs&quot;,
346 &quot;type&quot;: &quot;0&quot;,
347 &quot;defaultValue&quot;: &quot;4&quot;,
348 &quot;help&quot;: &quot;number of producer jobs&quot;
349 }
350 ],
351 &quot;channels&quot;: []
352 }
353 ],
354 &quot;command&quot;: &quot;foo&quot;
355}"/>
356 <decltask name="A">
357 <assets><name>dpl_json</name></assets>
358 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id A_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_A_to_B,type=push,method=bind,address=ipc://@localhostworkflow-id_22000,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_C,type=push,method=bind,address=ipc://@localhostworkflow-id_22001,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
359 </decltask>
360 <decltask name="B">
361 <assets><name>dpl_json</name></assets>
362 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id B_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_B_to_D,type=push,method=bind,address=ipc://@localhostworkflow-id_22002,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_B,type=pull,method=connect,address=ipc://@localhostworkflow-id_22000,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
363 </decltask>
364 <decltask name="C">
365 <assets><name>dpl_json</name></assets>
366 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id C_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_C_to_D,type=push,method=bind,address=ipc://@localhostworkflow-id_22003,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_C,type=pull,method=connect,address=ipc://@localhostworkflow-id_22001,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
367 </decltask>
368 <decltask name="D">
369 <assets><name>dpl_json</name></assets>
370 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id D_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_B_to_D,type=pull,method=connect,address=ipc://@localhostworkflow-id_22002,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_C_to_D,type=pull,method=connect,address=ipc://@localhostworkflow-id_22003,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --a-param 1 --b-param "" --c-param "foo;bar" --session dpl_workflow-id --plugin odc</exe>
371 </decltask>
372 <declcollection name="DPL">
373 <tasks>
374 <name>A</name>
375 <name>B</name>
376 <name>C</name>
377 <name>D</name>
378 </tasks>
379 </declcollection>
380</topology>
381)EXPECTED";
382 REQUIRE(strdiffchr(ss.str().data(), expected) == nullptr);
383 REQUIRE(strdiffchr(ss.str().data(), expected) == strdiffchr(expected, ss.str().data()));
384 REQUIRE(ss.str() == expected);
385}
386
387TEST_CASE("TestDDSExpendable")
388{
389 auto workflow = defineDataProcessingExpendable();
390 std::ostringstream ss{""};
391 auto configContext = makeEmptyConfigContext();
392 auto channelPolicies = makeTrivialChannelPolicies(*configContext);
393 std::vector<DeviceSpec> devices;
394 std::vector<ComputingResource> resources{ComputingResourceHelpers::getLocalhostResource()};
395 SimpleResourceManager rm(resources);
396 auto completionPolicies = CompletionPolicy::createDefaultPolicies();
397 auto callbacksPolicies = CallbacksPolicy::createDefaultPolicies();
398 DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, callbacksPolicies, devices, rm, "workflow-id", *configContext, true);
399 std::vector<DeviceControl> controls;
400 std::vector<DeviceExecution> executions;
401 controls.resize(devices.size());
402 executions.resize(devices.size());
403
404 std::vector<ConfigParamSpec> workflowOptions = {
405 ConfigParamSpec{"jobs", VariantType::Int, 4, {"number of producer jobs"}}};
406
407 std::vector<DataProcessorInfo> dataProcessorInfos = {
408 {
409 {"A", "bcsadc/foo", {}, workflowOptions},
410 {"B", "foo", {}, workflowOptions},
411 {"C", "foo", {}, workflowOptions},
412 {"D", "foo", {}, workflowOptions},
413 }};
414 DriverConfig driverConfig = {
415 .batch = true,
416 };
417 DeviceSpecHelpers::prepareArguments(false, false, false, 8080,
418 driverConfig,
419 dataProcessorInfos,
420 devices, executions, controls, {},
421 "workflow-id");
422 CommandInfo command{"foo"};
423 DDSConfigHelpers::dumpDeviceSpec2DDS(ss, DriverMode::STANDALONE, "", workflow, dataProcessorInfos, devices, executions, command);
424 auto expected = R"EXPECTED(<topology name="o2-dataflow">
425<declrequirement name="odc_expendable_task" type="custom" value="true" />
426<asset name="dpl_json" type="inline" visibility="global" value="{
427 &quot;workflow&quot;: [
428 {
429 &quot;name&quot;: &quot;A&quot;,
430 &quot;inputs&quot;: [],
431 &quot;outputs&quot;: [
432 {
433 &quot;binding&quot;: &quot;TST/A1/0&quot;,
434 &quot;origin&quot;: &quot;TST&quot;,
435 &quot;description&quot;: &quot;A1&quot;,
436 &quot;subspec&quot;: 0,
437 &quot;lifetime&quot;: 0
438 },
439 {
440 &quot;binding&quot;: &quot;TST/A2/0&quot;,
441 &quot;origin&quot;: &quot;TST&quot;,
442 &quot;description&quot;: &quot;A2&quot;,
443 &quot;subspec&quot;: 0,
444 &quot;lifetime&quot;: 0
445 }
446 ],
447 &quot;options&quot;: [],
448 &quot;labels&quot;: [],
449 &quot;metadata&quot;: [],
450 &quot;rank&quot;: 0,
451 &quot;nSlots&quot;: 1,
452 &quot;inputTimeSliceId&quot;: 0,
453 &quot;maxInputTimeslices&quot;: 1
454 },
455 {
456 &quot;name&quot;: &quot;B&quot;,
457 &quot;inputs&quot;: [
458 {
459 &quot;binding&quot;: &quot;x&quot;,
460 &quot;origin&quot;: &quot;TST&quot;,
461 &quot;description&quot;: &quot;A1&quot;,
462 &quot;subspec&quot;: 0,
463 &quot;lifetime&quot;: 0
464 }
465 ],
466 &quot;outputs&quot;: [
467 {
468 &quot;binding&quot;: &quot;TST/B1/0&quot;,
469 &quot;origin&quot;: &quot;TST&quot;,
470 &quot;description&quot;: &quot;B1&quot;,
471 &quot;subspec&quot;: 0,
472 &quot;lifetime&quot;: 0
473 }
474 ],
475 &quot;options&quot;: [],
476 &quot;labels&quot;: [],
477 &quot;metadata&quot;: [],
478 &quot;rank&quot;: 0,
479 &quot;nSlots&quot;: 1,
480 &quot;inputTimeSliceId&quot;: 0,
481 &quot;maxInputTimeslices&quot;: 1
482 },
483 {
484 &quot;name&quot;: &quot;C&quot;,
485 &quot;inputs&quot;: [
486 {
487 &quot;binding&quot;: &quot;y&quot;,
488 &quot;origin&quot;: &quot;TST&quot;,
489 &quot;description&quot;: &quot;A2&quot;,
490 &quot;subspec&quot;: 0,
491 &quot;lifetime&quot;: 0
492 }
493 ],
494 &quot;outputs&quot;: [
495 {
496 &quot;binding&quot;: &quot;TST/C1/0&quot;,
497 &quot;origin&quot;: &quot;TST&quot;,
498 &quot;description&quot;: &quot;C1&quot;,
499 &quot;subspec&quot;: 0,
500 &quot;lifetime&quot;: 0
501 }
502 ],
503 &quot;options&quot;: [],
504 &quot;labels&quot;: [],
505 &quot;metadata&quot;: [],
506 &quot;rank&quot;: 0,
507 &quot;nSlots&quot;: 1,
508 &quot;inputTimeSliceId&quot;: 0,
509 &quot;maxInputTimeslices&quot;: 1
510 },
511 {
512 &quot;name&quot;: &quot;D&quot;,
513 &quot;inputs&quot;: [
514 {
515 &quot;binding&quot;: &quot;x&quot;,
516 &quot;origin&quot;: &quot;TST&quot;,
517 &quot;description&quot;: &quot;B1&quot;,
518 &quot;subspec&quot;: 0,
519 &quot;lifetime&quot;: 0
520 },
521 {
522 &quot;binding&quot;: &quot;y&quot;,
523 &quot;origin&quot;: &quot;TST&quot;,
524 &quot;description&quot;: &quot;C1&quot;,
525 &quot;subspec&quot;: 0,
526 &quot;lifetime&quot;: 0
527 }
528 ],
529 &quot;outputs&quot;: [],
530 &quot;options&quot;: [
531 {
532 &quot;name&quot;: &quot;a-param&quot;,
533 &quot;type&quot;: &quot;0&quot;,
534 &quot;defaultValue&quot;: &quot;1&quot;,
535 &quot;help&quot;: &quot;A parameter which should not be escaped&quot;,
536 &quot;kind&quot;: &quot;0&quot;
537 },
538 {
539 &quot;name&quot;: &quot;b-param&quot;,
540 &quot;type&quot;: &quot;4&quot;,
541 &quot;defaultValue&quot;: &quot;&quot;,
542 &quot;help&quot;: &quot;a parameter which will be escaped&quot;,
543 &quot;kind&quot;: &quot;0&quot;
544 },
545 {
546 &quot;name&quot;: &quot;c-param&quot;,
547 &quot;type&quot;: &quot;4&quot;,
548 &quot;defaultValue&quot;: &quot;foo;bar&quot;,
549 &quot;help&quot;: &quot;another parameter which will be escaped&quot;,
550 &quot;kind&quot;: &quot;0&quot;
551 }
552 ],
553 &quot;labels&quot;: [
554 &quot;expendable&quot;
555 ],
556 &quot;metadata&quot;: [],
557 &quot;rank&quot;: 0,
558 &quot;nSlots&quot;: 1,
559 &quot;inputTimeSliceId&quot;: 0,
560 &quot;maxInputTimeslices&quot;: 1
561 }
562 ],
563 &quot;metadata&quot;: [
564 {
565 &quot;name&quot;: &quot;A&quot;,
566 &quot;executable&quot;: &quot;bcsadc/foo&quot;,
567 &quot;cmdLineArgs&quot;: [],
568 &quot;workflowOptions&quot;: [
569 {
570 &quot;name&quot;: &quot;jobs&quot;,
571 &quot;type&quot;: &quot;0&quot;,
572 &quot;defaultValue&quot;: &quot;4&quot;,
573 &quot;help&quot;: &quot;number of producer jobs&quot;
574 }
575 ],
576 &quot;channels&quot;: []
577 },
578 {
579 &quot;name&quot;: &quot;B&quot;,
580 &quot;executable&quot;: &quot;foo&quot;,
581 &quot;cmdLineArgs&quot;: [],
582 &quot;workflowOptions&quot;: [
583 {
584 &quot;name&quot;: &quot;jobs&quot;,
585 &quot;type&quot;: &quot;0&quot;,
586 &quot;defaultValue&quot;: &quot;4&quot;,
587 &quot;help&quot;: &quot;number of producer jobs&quot;
588 }
589 ],
590 &quot;channels&quot;: []
591 },
592 {
593 &quot;name&quot;: &quot;C&quot;,
594 &quot;executable&quot;: &quot;foo&quot;,
595 &quot;cmdLineArgs&quot;: [],
596 &quot;workflowOptions&quot;: [
597 {
598 &quot;name&quot;: &quot;jobs&quot;,
599 &quot;type&quot;: &quot;0&quot;,
600 &quot;defaultValue&quot;: &quot;4&quot;,
601 &quot;help&quot;: &quot;number of producer jobs&quot;
602 }
603 ],
604 &quot;channels&quot;: []
605 },
606 {
607 &quot;name&quot;: &quot;D&quot;,
608 &quot;executable&quot;: &quot;foo&quot;,
609 &quot;cmdLineArgs&quot;: [],
610 &quot;workflowOptions&quot;: [
611 {
612 &quot;name&quot;: &quot;jobs&quot;,
613 &quot;type&quot;: &quot;0&quot;,
614 &quot;defaultValue&quot;: &quot;4&quot;,
615 &quot;help&quot;: &quot;number of producer jobs&quot;
616 }
617 ],
618 &quot;channels&quot;: []
619 }
620 ],
621 &quot;command&quot;: &quot;foo&quot;
622}"/>
623 <decltask name="A">
624 <assets><name>dpl_json</name></assets>
625 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id A_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_A_to_B,type=push,method=bind,address=ipc://@localhostworkflow-id_22000,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_C,type=push,method=bind,address=ipc://@localhostworkflow-id_22001,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
626 </decltask>
627 <decltask name="B">
628 <assets><name>dpl_json</name></assets>
629 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id B_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_B_to_D,type=push,method=bind,address=ipc://@localhostworkflow-id_22002,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_B,type=pull,method=connect,address=ipc://@localhostworkflow-id_22000,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
630 </decltask>
631 <decltask name="C">
632 <assets><name>dpl_json</name></assets>
633 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id C_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_C_to_D,type=push,method=bind,address=ipc://@localhostworkflow-id_22003,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_A_to_C,type=pull,method=connect,address=ipc://@localhostworkflow-id_22001,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --session dpl_workflow-id --plugin odc</exe>
634 </decltask>
635 <decltask name="D">
636 <assets><name>dpl_json</name></assets>
637 <exe reachable="true">cat ${DDS_LOCATION}/dpl_json.asset | foo --id D_dds%TaskIndex%_%CollectionIndex% --shm-monitor false --log-color false --batch --color false --channel-config "name=from_B_to_D,type=pull,method=connect,address=ipc://@localhostworkflow-id_22002,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --channel-config "name=from_C_to_D,type=pull,method=connect,address=ipc://@localhostworkflow-id_22003,transport=shmem,rateLogging=0,rcvBufSize=1,sndBufSize=1" --bad-alloc-attempt-interval 50 --bad-alloc-max-attempts 1 --early-forward-policy never --io-threads 1 --jobs 4 --severity info --shm-allocation rbtree_best_fit --shm-metadata-msg-size 0 --shm-mlock-segment false --shm-mlock-segment-on-creation false --shm-no-cleanup false --shm-segment-id 0 --shm-throw-bad-alloc true --shm-zero-segment false --signposts "" --stacktrace-on-signal simple --timeframes-rate-limit 0 --a-param 1 --b-param "" --c-param "foo;bar" --session dpl_workflow-id --plugin odc</exe>
638 <requirements>
639 <name>odc_expendable_task</name>
640 </requirements>
641 </decltask>
642 <declcollection name="DPL">
643 <tasks>
644 <name>A</name>
645 <name>B</name>
646 <name>C</name>
647 <name>D</name>
648 </tasks>
649 </declcollection>
650</topology>
651)EXPECTED";
652 REQUIRE(strdiffchr(ss.str().data(), expected) == nullptr);
653 REQUIRE(strdiffchr(ss.str().data(), expected) == strdiffchr(expected, ss.str().data()));
654 REQUIRE(ss.str() == expected);
655}
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
std::unique_ptr< ConfigContext > makeEmptyConfigContext()
GLuint GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat GLfloat s1
Definition glcorearb.h:5034
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
TEST_CASE("test_prepareArguments")
std::vector< DataProcessorSpec > WorkflowSpec
static std::vector< CallbacksPolicy > createDefaultPolicies()
static std::vector< CompletionPolicy > createDefaultPolicies()
Helper to create the default configuration.
static void dumpDeviceSpec2DDS(std::ostream &out, DriverMode mode, std::string const &workflowSuffix, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
static void prepareArguments(bool defaultQuiet, bool defaultStopped, bool intereactive, unsigned short driverPort, DriverConfig const &driverConfig, std::vector< DataProcessorInfo > const &processorInfos, std::vector< DeviceSpec > const &deviceSpecs, std::vector< DeviceExecution > &deviceExecutions, std::vector< DeviceControl > &deviceControls, std::vector< ConfigParamSpec > const &detectedOptions, std::string const &uniqueWorkflowId)
static void dataProcessorSpecs2DeviceSpecs(const WorkflowSpec &workflow, std::vector< ChannelConfigurationPolicy > const &channelPolicies, std::vector< CompletionPolicy > const &completionPolicies, std::vector< DispatchPolicy > const &dispatchPolicies, std::vector< ResourcePolicy > const &resourcePolicies, std::vector< CallbacksPolicy > const &callbacksPolicies, std::vector< SendingPolicy > const &sendingPolicy, std::vector< ForwardingPolicy > const &forwardingPolicies, std::vector< DeviceSpec > &devices, ResourceManager &resourceManager, std::string const &uniqueWorkflowId, ConfigContext const &configContext, bool optimizeTopology=false, unsigned short resourcesMonitoringInterval=0, std::string const &channelPrefix="", OverrideServiceSpecs const &overrideServices={})
bool batch
Whether the driver was started in batch mode or not.
std::map< std::string, ID > expected
AlgorithmSpec simplePipe(o2::header::DataDescription what)