Project
Loading...
Searching...
No Matches
O2ControlHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "O2ControlHelpers.h"
14#include "Framework/Logger.h"
16
17#include <iostream>
18#include <cstring>
19#include <string>
20#include <string_view>
21#include <filesystem>
22#include <optional>
23#include <set>
24#include <fmt/core.h>
25
26namespace bfs = std::filesystem;
27
28namespace o2::framework
29{
30
/// Indentation unit appended once per nesting level when the templates are written.
const char* indScheme{" "};
32
33namespace implementation
34{
35
/// Builds a task name by joining the workflow name and the device name with a dash.
///
/// @param workflowName name of the workflow
/// @param deviceName   name (id) of the device
/// @return "<workflowName>-<deviceName>"
std::string taskName(const std::string& workflowName, const std::string& deviceName)
{
  std::string joined{workflowName};
  joined += "-";
  joined += deviceName;
  return joined;
}
40
41std::optional<DataProcessorMetadata> firstMatchingMetadata(DeviceSpec const& spec, std::string_view key)
42{
43 auto sameKey = [otherKey = key](DataProcessorMetadata const& metadata) { return metadata.key == otherKey; };
44 auto result = std::find_if(spec.metadata.begin(), spec.metadata.end(), sameKey);
45 return result != spec.metadata.end() ? std::optional{*result} : std::nullopt;
46}
47
48template <typename T>
49void dumpChannelBind(std::ostream& dumpOut, const T& channel, std::string indLevel)
50{
51 dumpOut << indLevel << "- name: " << channel.name << "\n";
52 dumpOut << indLevel << indScheme << "type: " << ChannelSpecHelpers::typeAsString(channel.type) << "\n";
53 // todo: i shouldn't guess here
54 dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n";
55 dumpOut << indLevel << indScheme << "addressing: " << (channel.protocol == ChannelProtocol::IPC ? "ipc" : "tcp") << "\n";
56 dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
57 dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n";
58 dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n";
59}
60
61template <typename T>
62void dumpChannelConnect(std::ostream& dumpOut, const T& channel, const std::string& binderName, std::string indLevel)
63{
64 dumpOut << indLevel << "- name: " << channel.name << "\n";
65 dumpOut << indLevel << indScheme << "type: " << ChannelSpecHelpers::typeAsString(channel.type) << "\n";
66 // todo: i shouldn't guess here
67 dumpOut << indLevel << indScheme << "transport: " << (channel.protocol == ChannelProtocol::IPC ? "shmem" : "zeromq") << "\n";
68 dumpOut << indLevel << indScheme << "target: \"{{ Parent().Path }}." << binderName << ":" << channel.name << "\"\n";
69 dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
70 dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sendBufferSize << "\n";
71 dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.recvBufferSize << "\n";
72}
73
/// Textual description of a channel, one field per `key=value` pair of a channel
/// configuration string. All members are non-owning views, so the text they point
/// into must outlive the object. An empty view means the key was not found.
/// The member order matters: instances are created by aggregate initialization.
struct RawChannel {
  std::string_view name{};        ///< value of "name="
  std::string_view type{};        ///< value of "type="
  std::string_view method{};      ///< value of "method=" (compared against "connect" / "bind")
  std::string_view address{};     ///< value of "address="
  std::string_view rateLogging{}; ///< value of "rateLogging="
  std::string_view transport{};   ///< value of "transport="
  std::string_view sndBufSize{};  ///< value of "sndBufSize=", empty when not configured
  std::string_view rcvBufSize{};  ///< value of "rcvBufSize=", empty when not configured
};
84
/// Returns the name under which a raw channel is referenced in the global channel space.
///
/// @param channelName     raw channel name
/// @param isUniqueChannel when false, the suffix "-{{ it }}" is appended to the name
/// @return the channel name, with the template suffix for non-unique channels
std::string rawChannelReference(std::string_view channelName, bool isUniqueChannel)
{
  std::string reference{channelName};
  if (!isUniqueChannel) {
    reference += "-{{ it }}";
  }
  return reference;
}
93
94void dumpRawChannelConnect(std::ostream& dumpOut, const RawChannel& channel, bool isUniqueChannel, bool preserveRawChannels, std::string indLevel)
95{
96
97 dumpOut << indLevel << "- name: " << channel.name << "\n";
98 dumpOut << indLevel << indScheme << "type: " << channel.type << "\n";
99 dumpOut << indLevel << indScheme << "transport: " << channel.transport << "\n";
100 if (preserveRawChannels) {
101 dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n";
102 LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside."
103 << " Please make sure it is available under the address '" << channel.address
104 << "' in the mother workflow or another subworkflow.";
105 } else {
106 auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
107 LOG(info) << "This workflow will connect to the channel '" << channel.name << "', which is most likely bound outside."
108 << " Please make sure it is declared in the global channel space under the name '" << channelRef
109 << "' in the mother workflow or another subworkflow.";
110 dumpOut << indLevel << indScheme << "target: \"::" << channelRef << "\"\n";
111 }
112 dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
113 if (!channel.sndBufSize.empty()) {
114 dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n";
115 }
116 if (!channel.rcvBufSize.empty()) {
117 dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n";
118 }
119}
120
121void dumpRawChannelBind(std::ostream& dumpOut, const RawChannel& channel, bool isUniqueChannel, bool preserveRawChannels, std::string indLevel)
122{
123 dumpOut << indLevel << "- name: " << channel.name << "\n";
124 dumpOut << indLevel << indScheme << "type: " << channel.type << "\n";
125 dumpOut << indLevel << indScheme << "transport: " << channel.transport << "\n";
126 dumpOut << indLevel << indScheme << "addressing: " << (channel.address.find("ipc") != std::string_view::npos ? "ipc" : "tcp") << "\n";
127 dumpOut << indLevel << indScheme << "rateLogging: \"{{ fmq_rate_logging }}\"\n";
128 if (preserveRawChannels) {
129 LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'"
130 << " with the address '" << channel.address << "'."
131 << " Please make sure that another device connects to this channel elsewhere."
132 << " Also, don't mind seeing the message twice, it will be addressed in future releases.";
133 dumpOut << indLevel << indScheme << "target: \"" << channel.address << "\"\n";
134 } else {
135 auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
136 LOG(info) << "This workflow will bind a dangling channel '" << channel.name << "'"
137 << " and declare it in the global channel space under the name '" << channelRef << "'."
138 << " Please make sure that another device connects to this channel elsewhere."
139 << " Also, don't mind seeing the message twice, it will be addressed in future releases.";
140 dumpOut << indLevel << indScheme << "global: \"" << channelRef << "\"\n";
141 }
142 if (!channel.sndBufSize.empty()) {
143 dumpOut << indLevel << indScheme << "sndBufSize: " << channel.sndBufSize << "\n";
144 }
145 if (!channel.rcvBufSize.empty()) {
146 dumpOut << indLevel << indScheme << "rcvBufSize: " << channel.rcvBufSize << "\n";
147 }
148}
149
/// Extracts the value following @a token in a comma-separated `key=value` list.
///
/// The token is matched only at a key boundary, i.e. at the very beginning of
/// @a string or right after a ','. This way "name=" is not found inside
/// "hostname=" or inside another value.
///
/// @param string configuration text, e.g. "name=a,type=pair,method=connect"
/// @param token  key including the trailing '=', e.g. "name="
/// @return view into @a string covering the value (up to the next ',' or the end);
///         an empty view when the key is absent or has no value
std::string_view extractValueFromChannelConfig(std::string_view string, std::string_view token)
{
  size_t searchFrom = 0;
  while (true) {
    const size_t tokenStart = string.find(token, searchFrom);
    if (tokenStart == std::string_view::npos) {
      return {};
    }
    const bool atKeyBoundary = tokenStart == 0 || string[tokenStart - 1] == ',';
    if (atKeyBoundary) {
      const size_t valueStart = tokenStart + token.size();
      if (valueStart >= string.size()) {
        return {};
      }
      const size_t valueEnd = string.find(',', valueStart);
      return valueEnd == std::string_view::npos
               ? string.substr(valueStart)
               : string.substr(valueStart, valueEnd - valueStart);
    }
    // The match was in the middle of another key or value: keep looking further.
    searchFrom = tokenStart + 1;
  }
}
165
166// fixme: For now we extract information about raw FairMQ channels from execution.
167// However, we risk that it break if a channel configuration method changes,
168// thus this information should be provided in DeviceSpec. Find a way to do that.
169std::vector<RawChannel> extractRawChannels(const DeviceSpec& spec, const DeviceExecution& execution)
170{
171 std::vector<std::string> dplChannels;
172 for (const auto& channel : spec.inputChannels) {
173 dplChannels.emplace_back(channel.name);
174 }
175 for (const auto& channel : spec.outputChannels) {
176 dplChannels.emplace_back(channel.name);
177 }
178
179 std::vector<RawChannel> rawChannels;
180 for (size_t i = 0; i < execution.args.size(); i++) {
181 if (execution.args[i] != nullptr && strcmp(execution.args[i], "--channel-config") == 0 && i + 1 < execution.args.size()) {
182 auto channelConfig = execution.args[i + 1];
183 auto channelName = extractValueFromChannelConfig(channelConfig, "name=");
184 if (std::find(dplChannels.begin(), dplChannels.end(), channelName) == dplChannels.end()) {
185 // "name=readout-proxy,type=pair,method=connect,address=ipc:///tmp/readout-pipe-0,rateLogging=1,transport=shmem"
186 rawChannels.push_back({channelName,
187 extractValueFromChannelConfig(channelConfig, "type="),
188 extractValueFromChannelConfig(channelConfig, "method="),
189 extractValueFromChannelConfig(channelConfig, "address="),
190 extractValueFromChannelConfig(channelConfig, "rateLogging="),
191 extractValueFromChannelConfig(channelConfig, "transport="),
192 extractValueFromChannelConfig(channelConfig, "sndBufSize="),
193 extractValueFromChannelConfig(channelConfig, "rcvBufSize=")});
194 }
195 }
196 }
197 return rawChannels;
198}
199
200bool isUniqueProxy(const DeviceSpec& spec)
201{
202 return std::find(spec.labels.begin(), spec.labels.end(), ecs::uniqueProxyLabel) != spec.labels.end();
203}
204
205bool shouldPreserveRawChannels(const DeviceSpec& spec)
206{
207 return std::find(spec.labels.begin(), spec.labels.end(), ecs::preserveRawChannelsLabel) != spec.labels.end();
208}
209
210bool isCritical(const DeviceSpec& spec)
211{
212 // DPL's expendable Data Processor corresponds to a non-critical task in ECS
213 // DPL's resilient Data Processor corresponds to a critical task in ECS
214 // All tasks are considered critical by default in ECS
215 return std::find(spec.labels.begin(), spec.labels.end(), DataProcessorLabel{"expendable"}) == spec.labels.end();
216}
217
218void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::string indLevel)
219{
220 dumpOut << indLevel << "shell: true\n";
221 dumpOut << indLevel << "stdout: \"{{ log_task_stdout }}\"\n";
222 dumpOut << indLevel << "stderr: \"{{ log_task_stderr }}\"\n";
223 dumpOut << indLevel << "env:\n";
224 dumpOut << indLevel << indLevel << "- O2_DETECTOR={{ detector }}\n";
225 dumpOut << indLevel << indLevel << "- O2_PARTITION={{ environment_id }}\n";
226 dumpOut << indLevel << indLevel << "- HOME=/tmp\n";
227
228 // Dump all the environment variables
229 for (auto& env : execution.environ) {
230 dumpOut << indLevel << indLevel << "- " << env << "\n";
231 }
232 dumpOut << indLevel << "user: \"{{ user }}\"\n";
233 dumpOut << indLevel << "value: \"{{ len(modulepath)>0 ? _module_cmdline : _plain_cmdline }}\"\n";
234
235 dumpOut << indLevel << "arguments:\n";
236 dumpOut << indLevel << indScheme << "- \"-b\"\n";
237 dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n";
238 dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n";
239 dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n";
240 dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n";
241 dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n";
242 dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n";
243 dumpOut << indLevel << indScheme << "- \"--session\"\n";
244 dumpOut << indLevel << indScheme << "- \"'{{ session_id }}'\"\n";
245 dumpOut << indLevel << indScheme << "- \"--infologger-severity\"\n";
246 dumpOut << indLevel << indScheme << "- \"'{{ infologger_severity }}'\"\n";
247 dumpOut << indLevel << indScheme << "- \"--infologger-mode\"\n";
248 dumpOut << indLevel << indScheme << "- \"'{{ infologger_mode }}'\"\n";
249 dumpOut << indLevel << indScheme << "- \"--driver-client-backend\"\n";
250 dumpOut << indLevel << indScheme << "- \"'stdout://'\"\n";
251 dumpOut << indLevel << indScheme << "- \"--shm-segment-size\"\n";
252 dumpOut << indLevel << indScheme << "- \"'{{ shm_segment_size }}'\"\n";
253 dumpOut << indLevel << indScheme << "- \"--shm-throw-bad-alloc\"\n";
254 dumpOut << indLevel << indScheme << "- \"'{{ shm_throw_bad_alloc }}'\"\n";
255 dumpOut << indLevel << indScheme << "- \"--resources-monitoring\"\n";
256 dumpOut << indLevel << indScheme << "- \"'{{ resources_monitoring }}'\"\n";
257
258 for (size_t ai = 1; ai < execution.args.size(); ++ai) {
259 const char* option = execution.args[ai];
260 const char* value = nullptr; // no value by default (i.e. a boolean)
261 // If the subsequent option exists and does not start with -, we assume
262 // it is an argument to the previous one.
263 // ...that is unless it is a "-1" for example.
264 if (ai + 1 < execution.args.size() && execution.args[ai + 1][0] != '-') {
265 value = execution.args[ai + 1];
266 ai++;
267 }
268 if (!option) {
269 break;
270 }
271
272 static const std::set<std::string> omitOptions = {
273 "--channel-config", "--o2-control", "--control", "--session", "--monitoring-backend",
274 "-b", "--color", "--infologger-severity", "--infologger-mode", "--driver-client-backend",
275 "--shm-segment-size", "--shm-throw-bad-alloc", "--resources-monitoring"};
276 if (omitOptions.find(option) != omitOptions.end()) {
277 continue;
278 }
279 // todo: possible improvement - do not print if default values are used
280 // todo: check if '' are there already.
281 dumpOut << indLevel << indScheme << R"(- ")" << option << "\"\n";
282 if (value) {
283 dumpOut << indLevel << indScheme << R"(- )" << fmt::format("{:?}", fmt::format("'{}'", value)) << "\n";
284 }
285 }
286}
287
288std::string findBinder(const std::vector<DeviceSpec>& specs, const std::string& channel)
289{
290 // fixme: it is not crucial to be fast here, but ideally we should check only input OR output channels.
291 for (const auto& spec : specs) {
292 for (const auto& inputChannel : spec.inputChannels) {
293 if (inputChannel.method == ChannelMethod::Bind && inputChannel.name == channel) {
294 return spec.id;
295 }
296 }
297 for (const auto& outputChannel : spec.outputChannels) {
298 if (outputChannel.method == ChannelMethod::Bind && outputChannel.name == channel) {
299 return spec.id;
300 }
301 }
302 }
303 throw std::runtime_error("Could not find a device which binds the '" + channel + "' channel.");
304}
305
306void dumpRole(std::ostream& dumpOut, const std::string& taskName, const DeviceSpec& spec, const std::vector<DeviceSpec>& allSpecs, const DeviceExecution& execution, const std::string indLevel)
307{
308 dumpOut << indLevel << "- name: \"" << spec.id << "\"\n";
309
310 dumpOut << indLevel << indScheme << "connect:\n";
311
312 for (const auto& outputChannel : spec.outputChannels) {
313 if (outputChannel.method == ChannelMethod::Connect) {
314 dumpChannelConnect(dumpOut, outputChannel, findBinder(allSpecs, outputChannel.name), indLevel + indScheme);
315 }
316 }
317 for (const auto& inputChannel : spec.inputChannels) {
318 if (inputChannel.method == ChannelMethod::Connect) {
319 dumpChannelConnect(dumpOut, inputChannel, findBinder(allSpecs, inputChannel.name), indLevel + indScheme);
320 }
321 }
322 bool uniqueProxy = isUniqueProxy(spec);
323 bool preserveRawChannels = shouldPreserveRawChannels(spec);
324 bool bindsRawChannels = false;
325 auto rawChannels = extractRawChannels(spec, execution);
326 for (const auto& rawChannel : rawChannels) {
327 if (rawChannel.method == "connect") {
328 dumpRawChannelConnect(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
329 } else if (rawChannel.method == "bind") {
330 bindsRawChannels = true;
331 }
332 }
333
334 // for the time being we have to publish global bound channels also in WFT
335 if (bindsRawChannels) {
336 dumpOut << indLevel << indScheme << "bind:\n";
337 for (const auto& rawChannel : rawChannels) {
338 if (rawChannel.method == "bind") {
339 dumpRawChannelBind(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
340 }
341 }
342 }
343
344 dumpOut << indLevel << indScheme << "task:\n";
345 dumpOut << indLevel << indScheme << indScheme << "load: " << taskName << "\n";
346 dumpOut << indLevel << indScheme << indScheme << "critical: " << (isCritical(spec) ? "true" : "false") << "\n";
347}
348
/// Removes the first " --o2-control <value>" pair from a command line.
///
/// The option is recognised only when it is surrounded by spaces. Its value is
/// assumed to extend up to the next space or, if there is none, to the end of
/// the command.
///
/// @param command command line text
/// @return the command without the option and its value; an unchanged copy when
///         the option is not found
std::string removeO2ControlArg(std::string_view command)
{
  constexpr std::string_view flag = " --o2-control ";

  const size_t flagPos = command.find(flag);
  if (flagPos == std::string_view::npos) {
    return std::string(command);
  }

  std::string stripped(command.substr(0, flagPos));
  // Whatever follows the value (starting with the separating space) is kept.
  const size_t valueEnd = command.find(' ', flagPos + flag.size());
  if (valueEnd != std::string_view::npos) {
    stripped += command.substr(valueEnd);
  }
  return stripped;
}
363
364} // namespace implementation
365
366void dumpTask(std::ostream& dumpOut, const DeviceSpec& spec, const DeviceExecution& execution, std::string taskName, std::string indLevel)
367{
368 dumpOut << indLevel << "name: " << taskName << "\n";
369 dumpOut << indLevel << "defaults:\n";
370 dumpOut << indLevel << indScheme << "log_task_stdout: none\n";
371 dumpOut << indLevel << indScheme << "log_task_stderr: none\n";
372 std::string exitTransitionTimeout = "15"; // Allow 15 seconds to finish processing and calibrations
373 std::string dataProcessingTimeout = "10"; // Allow only ten seconds to finish processing
374 if (execution.args.size() > 2) {
375 for (size_t i = 0; i < execution.args.size() - 1; ++i) {
376 if (strcmp(execution.args[i], "--exit-transition-timeout") == 0) {
377 exitTransitionTimeout = execution.args[i + 1];
378 }
379 if (strcmp(execution.args[i], "--data-processing-timeout") == 0) {
380 dataProcessingTimeout = execution.args[i + 1];
381 }
382 }
383 }
384 dumpOut << indLevel << indScheme << "exit_transition_timeout: " << exitTransitionTimeout << "\n";
385 dumpOut << indLevel << indScheme << "data_processing_timeout: " << dataProcessingTimeout << "\n";
386
387 if (bfs::path(execution.args[0]).filename().string() != execution.args[0]) {
388 LOG(warning) << "The workflow template generation was started with absolute or relative executables paths."
389 " Please use the symlinks exported by the build infrastructure or remove the paths manually in the generated templates,"
390 " unless you really need executables within concrete directories";
391 }
392 dumpOut << indLevel << indScheme << "_module_cmdline: >-\n";
393 dumpOut << indLevel << indScheme << indScheme << "source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&\n";
394 dumpOut << indLevel << indScheme << indScheme << "{{ dpl_command }} | " << execution.args[0] << "\n";
395 dumpOut << indLevel << indScheme << "_plain_cmdline: >-\n";
396 dumpOut << indLevel << indScheme << indScheme << "source /etc/profile.d/o2.sh && {{ len(extra_env_vars)>0 ? 'export ' + extra_env_vars + ' &&' : '' }} {{ dpl_command }} | " << execution.args[0] << "\n";
397
398 dumpOut << indLevel << "control:\n";
399 dumpOut << indLevel << indScheme << "mode: \"fairmq\"\n";
400
401 // todo: find out proper values perhaps...
402 dumpOut << indLevel << "wants:\n";
403 dumpOut << indLevel << indScheme << "cpu: 0.01\n";
404 dumpOut << indLevel << indScheme << "memory: 1\n";
405
406 auto cpuKillThreshold = implementation::firstMatchingMetadata(spec, ecs::cpuKillThreshold);
407 auto privateMemoryKillThresholdMB = implementation::firstMatchingMetadata(spec, ecs::privateMemoryKillThresholdMB);
408 if (cpuKillThreshold.has_value() || privateMemoryKillThresholdMB.has_value()) {
409 dumpOut << indLevel << "limits:\n";
410 if (cpuKillThreshold.has_value()) {
411 dumpOut << indLevel << indScheme << "cpu: " << cpuKillThreshold.value().value << '\n';
412 }
413 if (privateMemoryKillThresholdMB.has_value()) {
414 dumpOut << indLevel << indScheme << "memory: " << privateMemoryKillThresholdMB.value().value << '\n';
415 }
416 }
417
418 dumpOut << indLevel << "bind:\n";
419 for (const auto& outputChannel : spec.outputChannels) {
420 if (outputChannel.method == ChannelMethod::Bind) {
421 implementation::dumpChannelBind(dumpOut, outputChannel, indLevel + indScheme);
422 }
423 }
424 for (const auto& inputChannel : spec.inputChannels) {
425 if (inputChannel.method == ChannelMethod::Bind) {
426 implementation::dumpChannelBind(dumpOut, inputChannel, indLevel + indScheme);
427 }
428 }
429 bool uniqueProxy = implementation::isUniqueProxy(spec);
430 bool preserveRawChannels = implementation::shouldPreserveRawChannels(spec);
431 auto rawChannels = implementation::extractRawChannels(spec, execution);
432 for (const auto& rawChannel : rawChannels) {
433 if (rawChannel.method == "bind") {
434 dumpRawChannelBind(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
435 }
436 }
437
438 dumpOut << indLevel << "command:\n";
439 implementation::dumpCommand(dumpOut, execution, indLevel + indScheme);
440}
441
442void dumpWorkflow(std::ostream& dumpOut, const std::vector<DeviceSpec>& specs, const std::vector<DeviceExecution>& executions, const CommandInfo& commandInfo, std::string workflowName, std::string indLevel)
443{
444 dumpOut << indLevel << "name: " << workflowName << "\n";
445
446 dumpOut << indLevel << "vars:\n";
447 dumpOut << indLevel << indScheme << "dpl_command: >-\n";
448 dumpOut << indLevel << indScheme << indScheme << implementation::removeO2ControlArg(commandInfo.command) << "\n";
449
450 dumpOut << indLevel << "defaults:\n";
451 dumpOut << indLevel << indScheme << "monitoring_dpl_url: \"no-op://\"\n";
452 dumpOut << indLevel << indScheme << "user: \"flp\"\n";
453 dumpOut << indLevel << indScheme << "fmq_rate_logging: 0\n";
454 dumpOut << indLevel << indScheme << "shm_segment_size: 10000000000\n";
455 dumpOut << indLevel << indScheme << "shm_throw_bad_alloc: false\n";
456 dumpOut << indLevel << indScheme << "session_id: default\n";
457 dumpOut << indLevel << indScheme << "resources_monitoring: 15\n";
458
459 dumpOut << indLevel << "roles:\n";
460 for (size_t di = 0; di < specs.size(); di++) {
461 auto& spec = specs[di];
462 auto& execution = executions[di];
463 implementation::dumpRole(dumpOut, implementation::taskName(workflowName, spec.id), spec, specs, execution, indLevel + indScheme);
464 }
465}
466
467void dumpDeviceSpec2O2Control(std::string workflowName,
468 const std::vector<DeviceSpec>& specs,
469 const std::vector<DeviceExecution>& executions,
470 const CommandInfo& commandInfo)
471{
472 const char* tasksDirectory = "tasks";
473 const char* workflowsDirectory = "workflows";
474
475 LOG(info) << "Dumping the workflow configuration for AliECS.";
476
477 LOG(info) << "Creating directories '" << workflowsDirectory << "' and '" << tasksDirectory << "'.";
478 std::filesystem::create_directory(workflowsDirectory);
479 std::filesystem::create_directory(tasksDirectory);
480 LOG(info) << "... created.";
481
482 assert(specs.size() == executions.size());
483
484 LOG(info) << "Creating a workflow dump '" + workflowName + "'.";
485 std::string wfDumpPath = std::string(workflowsDirectory) + bfs::path::preferred_separator + workflowName + ".yaml";
486 std::ofstream wfDump(wfDumpPath);
487 dumpWorkflow(wfDump, specs, executions, commandInfo, workflowName, "");
488 wfDump.close();
489
490 for (size_t di = 0; di < specs.size(); ++di) {
491 auto& spec = specs[di];
492 auto& execution = executions[di];
493
494 LOG(info) << "Creating a task dump for '" + spec.id + "'.";
495 std::string taskName = implementation::taskName(workflowName, spec.id);
496 std::string taskDumpPath = std::string(tasksDirectory) + bfs::path::preferred_separator + taskName + ".yaml";
497 std::ofstream taskDump(taskDumpPath);
498 dumpTask(taskDump, spec, execution, taskName, "");
499 taskDump.close();
500 LOG(info) << "...created.";
501 }
502}
503
504} // namespace o2::framework
int32_t i
StringRef key
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizeiptr size
Definition glcorearb.h:659
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
const decltype(DataProcessorMetadata::key) privateMemoryKillThresholdMB
const decltype(DataProcessorMetadata::key) cpuKillThreshold
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void dumpTask(std::ostream &dumpOut, const DeviceSpec &spec, const DeviceExecution &execution, std::string taskName, std::string indLevel)
Dumps only one task.
void dumpWorkflow(std::ostream &dumpOut, const std::vector< DeviceSpec > &specs, const std::vector< DeviceExecution > &executions, const CommandInfo &commandInfo, std::string workflowName, std::string indLevel)
Dumps only the workflow file.
void dumpDeviceSpec2O2Control(std::string workflowName, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
Dumps the AliECS compatible workflow and task templates for a DPL workflow.
constexpr const char * channelName(int channel)
Definition Constants.h:318
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"