26namespace bfs = std::filesystem;
/// Indentation unit of the generated templates: it is streamed after the current
/// indentation (indLevel) for nested keys and appended to indLevel when a dumper
/// hands a deeper level to another dumper.
31const char* indScheme =
" ";
33namespace implementation
/// Builds the name of the task template which belongs to one device of a workflow.
/// @param workflowName name of the workflow
/// @param deviceName   identifier of the device
/// @return the two names joined with a dash, i.e. "<workflowName>-<deviceName>"
std::string taskName(const std::string& workflowName, const std::string& deviceName)
{
  return workflowName + "-" + deviceName;
}
41std::optional<DataProcessorMetadata> firstMatchingMetadata(DeviceSpec
const& spec, std::string_view
key)
43 auto sameKey = [otherKey =
key](DataProcessorMetadata
const& metadata) {
return metadata.key == otherKey; };
44 auto result = std::find_if(spec.metadata.begin(), spec.metadata.end(), sameKey);
45 return result != spec.metadata.end() ? std::optional{*
result} : std::nullopt;
49void dumpChannelBind(std::ostream& dumpOut,
const T& channel, std::string indLevel)
51 dumpOut << indLevel <<
"- name: " << channel.name <<
"\n";
52 dumpOut << indLevel << indScheme <<
"type: " << ChannelSpecHelpers::typeAsString(channel.type) <<
"\n";
54 dumpOut << indLevel << indScheme <<
"transport: " << (channel.protocol == ChannelProtocol::IPC ?
"shmem" :
"zeromq") <<
"\n";
55 dumpOut << indLevel << indScheme <<
"addressing: " << (channel.protocol == ChannelProtocol::IPC ?
"ipc" :
"tcp") <<
"\n";
56 dumpOut << indLevel << indScheme <<
"rateLogging: \"{{ fmq_rate_logging }}\"\n";
57 dumpOut << indLevel << indScheme <<
"sndBufSize: " << channel.sendBufferSize <<
"\n";
58 dumpOut << indLevel << indScheme <<
"rcvBufSize: " << channel.recvBufferSize <<
"\n";
62void dumpChannelConnect(std::ostream& dumpOut,
const T& channel,
const std::string& binderName, std::string indLevel)
64 dumpOut << indLevel <<
"- name: " << channel.name <<
"\n";
65 dumpOut << indLevel << indScheme <<
"type: " << ChannelSpecHelpers::typeAsString(channel.type) <<
"\n";
67 dumpOut << indLevel << indScheme <<
"transport: " << (channel.protocol == ChannelProtocol::IPC ?
"shmem" :
"zeromq") <<
"\n";
68 dumpOut << indLevel << indScheme <<
"target: \"{{ Parent().Path }}." << binderName <<
":" << channel.name <<
"\"\n";
69 dumpOut << indLevel << indScheme <<
"rateLogging: \"{{ fmq_rate_logging }}\"\n";
70 dumpOut << indLevel << indScheme <<
"sndBufSize: " << channel.sendBufferSize <<
"\n";
71 dumpOut << indLevel << indScheme <<
"rcvBufSize: " << channel.recvBufferSize <<
"\n";
/// Fields of one "--channel-config" argument which does not describe a DPL channel.
/// All members are non-owning views: they are only valid as long as the argument
/// string they were extracted from is alive. A missing field is an empty view.
/// The member order matters, the struct is filled by aggregate initialisation.
struct RawChannel {
  std::string_view name;
  std::string_view type;
  std::string_view method;
  std::string_view address;
  std::string_view rateLogging;
  std::string_view transport;
  std::string_view sndBufSize;
  std::string_view rcvBufSize;
};
/// Builds the name under which a raw channel is referenced in the global channel space.
/// @param channelName     name of the raw channel
/// @param isUniqueChannel if true the name is used as it is, otherwise the template
///                        suffix "-{{ it }}" is appended
/// @return the channel reference as an owning string
std::string rawChannelReference(std::string_view channelName, bool isUniqueChannel)
{
  std::string reference(channelName);
  if (!isUniqueChannel) {
    reference += "-{{ it }}";
  }
  return reference;
}
94void dumpRawChannelConnect(std::ostream& dumpOut,
const RawChannel& channel,
bool isUniqueChannel,
bool preserveRawChannels, std::string indLevel)
97 dumpOut << indLevel <<
"- name: " << channel.name <<
"\n";
98 dumpOut << indLevel << indScheme <<
"type: " << channel.type <<
"\n";
99 dumpOut << indLevel << indScheme <<
"transport: " << channel.transport <<
"\n";
100 if (preserveRawChannels) {
101 dumpOut << indLevel << indScheme <<
"target: \"" << channel.address <<
"\"\n";
102 LOG(info) <<
"This workflow will connect to the channel '" << channel.name <<
"', which is most likely bound outside."
103 <<
" Please make sure it is available under the address '" << channel.address
104 <<
"' in the mother workflow or another subworkflow.";
106 auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
107 LOG(info) <<
"This workflow will connect to the channel '" << channel.name <<
"', which is most likely bound outside."
108 <<
" Please make sure it is declared in the global channel space under the name '" << channelRef
109 <<
"' in the mother workflow or another subworkflow.";
110 dumpOut << indLevel << indScheme <<
"target: \"::" << channelRef <<
"\"\n";
112 dumpOut << indLevel << indScheme <<
"rateLogging: \"{{ fmq_rate_logging }}\"\n";
113 if (!channel.sndBufSize.empty()) {
114 dumpOut << indLevel << indScheme <<
"sndBufSize: " << channel.sndBufSize <<
"\n";
116 if (!channel.rcvBufSize.empty()) {
117 dumpOut << indLevel << indScheme <<
"rcvBufSize: " << channel.rcvBufSize <<
"\n";
121void dumpRawChannelBind(std::ostream& dumpOut,
const RawChannel& channel,
bool isUniqueChannel,
bool preserveRawChannels, std::string indLevel)
123 dumpOut << indLevel <<
"- name: " << channel.name <<
"\n";
124 dumpOut << indLevel << indScheme <<
"type: " << channel.type <<
"\n";
125 dumpOut << indLevel << indScheme <<
"transport: " << channel.transport <<
"\n";
126 dumpOut << indLevel << indScheme <<
"addressing: " << (channel.address.find(
"ipc") != std::string_view::npos ?
"ipc" :
"tcp") <<
"\n";
127 dumpOut << indLevel << indScheme <<
"rateLogging: \"{{ fmq_rate_logging }}\"\n";
128 if (preserveRawChannels) {
129 LOG(info) <<
"This workflow will bind a dangling channel '" << channel.name <<
"'"
130 <<
" with the address '" << channel.address <<
"'."
131 <<
" Please make sure that another device connects to this channel elsewhere."
132 <<
" Also, don't mind seeing the message twice, it will be addressed in future releases.";
133 dumpOut << indLevel << indScheme <<
"target: \"" << channel.address <<
"\"\n";
135 auto channelRef = rawChannelReference(channel.name, isUniqueChannel);
136 LOG(info) <<
"This workflow will bind a dangling channel '" << channel.name <<
"'"
137 <<
" and declare it in the global channel space under the name '" << channelRef <<
"'."
138 <<
" Please make sure that another device connects to this channel elsewhere."
139 <<
" Also, don't mind seeing the message twice, it will be addressed in future releases.";
140 dumpOut << indLevel << indScheme <<
"global: \"" << channelRef <<
"\"\n";
142 if (!channel.sndBufSize.empty()) {
143 dumpOut << indLevel << indScheme <<
"sndBufSize: " << channel.sndBufSize <<
"\n";
145 if (!channel.rcvBufSize.empty()) {
146 dumpOut << indLevel << indScheme <<
"rcvBufSize: " << channel.rcvBufSize <<
"\n";
/// Extracts the value of one "key=value" field from a comma-separated channel configuration.
/// @param string channel configuration, e.g. "name=a,type=pair,method=connect"
/// @param token  key including the equals sign, e.g. "name="
/// @return view into @p string covering the value (everything up to the next comma
///         or the end), or an empty view if the key is absent or has no value
///
/// The key has to start a field: it is accepted at the very beginning of the
/// configuration or directly after a comma or blank. This keeps "name=" from
/// matching the tail of a longer key such as "hostname=".
std::string_view extractValueFromChannelConfig(std::string_view string, std::string_view token)
{
  size_t tokenStart = string.find(token);
  while (tokenStart != std::string_view::npos && tokenStart > 0) {
    const char previous = string[tokenStart - 1];
    if (previous == ',' || previous == ' ' || previous == '\t') {
      break;
    }
    // The match sits in the middle of another key or value, look further.
    tokenStart = string.find(token, tokenStart + 1);
  }
  if (tokenStart == std::string_view::npos) {
    return {};
  }

  const size_t valueStart = tokenStart + token.size();
  if (valueStart >= string.size()) {
    return {};
  }

  const size_t valueEnd = string.find(',', valueStart);
  return valueEnd == std::string_view::npos
           ? string.substr(valueStart)
           : string.substr(valueStart, valueEnd - valueStart);
}
169std::vector<RawChannel> extractRawChannels(
const DeviceSpec& spec,
const DeviceExecution& execution)
171 std::vector<std::string> dplChannels;
172 for (
const auto& channel : spec.inputChannels) {
173 dplChannels.emplace_back(channel.name);
175 for (
const auto& channel : spec.outputChannels) {
176 dplChannels.emplace_back(channel.name);
179 std::vector<RawChannel> rawChannels;
180 for (
size_t i = 0;
i < execution.args.size();
i++) {
181 if (execution.args[
i] !=
nullptr && strcmp(execution.args[
i],
"--channel-config") == 0 &&
i + 1 < execution.args.size()) {
182 auto channelConfig = execution.args[
i + 1];
183 auto channelName = extractValueFromChannelConfig(channelConfig,
"name=");
184 if (std::find(dplChannels.begin(), dplChannels.end(), channelName) == dplChannels.end()) {
187 extractValueFromChannelConfig(channelConfig,
"type="),
188 extractValueFromChannelConfig(channelConfig,
"method="),
189 extractValueFromChannelConfig(channelConfig,
"address="),
190 extractValueFromChannelConfig(channelConfig,
"rateLogging="),
191 extractValueFromChannelConfig(channelConfig,
"transport="),
192 extractValueFromChannelConfig(channelConfig,
"sndBufSize="),
193 extractValueFromChannelConfig(channelConfig,
"rcvBufSize=")});
200bool isUniqueProxy(
const DeviceSpec& spec)
202 return std::find(spec.labels.begin(), spec.labels.end(), ecs::uniqueProxyLabel) != spec.labels.end();
205bool shouldPreserveRawChannels(
const DeviceSpec& spec)
207 return std::find(spec.labels.begin(), spec.labels.end(), ecs::preserveRawChannelsLabel) != spec.labels.end();
210bool isCritical(
const DeviceSpec& spec)
215 return std::find(spec.labels.begin(), spec.labels.end(), DataProcessorLabel{
"expendable"}) == spec.labels.end();
218void dumpCommand(std::ostream& dumpOut,
const DeviceExecution& execution, std::string indLevel)
220 dumpOut << indLevel <<
"shell: true\n";
221 dumpOut << indLevel <<
"stdout: \"{{ log_task_stdout }}\"\n";
222 dumpOut << indLevel <<
"stderr: \"{{ log_task_stderr }}\"\n";
223 dumpOut << indLevel <<
"env:\n";
224 dumpOut << indLevel << indLevel <<
"- O2_DETECTOR={{ detector }}\n";
225 dumpOut << indLevel << indLevel <<
"- O2_PARTITION={{ environment_id }}\n";
226 dumpOut << indLevel << indLevel <<
"- HOME=/tmp\n";
229 for (
auto& env : execution.environ) {
230 dumpOut << indLevel << indLevel <<
"- " << env <<
"\n";
232 dumpOut << indLevel <<
"user: \"{{ user }}\"\n";
233 dumpOut << indLevel <<
"value: \"{{ len(modulepath)>0 ? _module_cmdline : _plain_cmdline }}\"\n";
235 dumpOut << indLevel <<
"arguments:\n";
236 dumpOut << indLevel << indScheme <<
"- \"-b\"\n";
237 dumpOut << indLevel << indScheme <<
"- \"--exit-transition-timeout\"\n";
238 dumpOut << indLevel << indScheme <<
"- \"'{{ exit_transition_timeout }}'\"\n";
239 dumpOut << indLevel << indScheme <<
"- \"--data-processing-timeout\"\n";
240 dumpOut << indLevel << indScheme <<
"- \"'{{ data_processing_timeout }}'\"\n";
241 dumpOut << indLevel << indScheme <<
"- \"--monitoring-backend\"\n";
242 dumpOut << indLevel << indScheme <<
"- \"'{{ monitoring_dpl_url }}'\"\n";
243 dumpOut << indLevel << indScheme <<
"- \"--session\"\n";
244 dumpOut << indLevel << indScheme <<
"- \"'{{ session_id }}'\"\n";
245 dumpOut << indLevel << indScheme <<
"- \"--infologger-severity\"\n";
246 dumpOut << indLevel << indScheme <<
"- \"'{{ infologger_severity }}'\"\n";
247 dumpOut << indLevel << indScheme <<
"- \"--infologger-mode\"\n";
248 dumpOut << indLevel << indScheme <<
"- \"'{{ infologger_mode }}'\"\n";
249 dumpOut << indLevel << indScheme <<
"- \"--driver-client-backend\"\n";
250 dumpOut << indLevel << indScheme <<
"- \"'stdout://'\"\n";
251 dumpOut << indLevel << indScheme <<
"- \"--shm-segment-size\"\n";
252 dumpOut << indLevel << indScheme <<
"- \"'{{ shm_segment_size }}'\"\n";
253 dumpOut << indLevel << indScheme <<
"- \"--shm-throw-bad-alloc\"\n";
254 dumpOut << indLevel << indScheme <<
"- \"'{{ shm_throw_bad_alloc }}'\"\n";
255 dumpOut << indLevel << indScheme <<
"- \"--resources-monitoring\"\n";
256 dumpOut << indLevel << indScheme <<
"- \"'{{ resources_monitoring }}'\"\n";
258 for (
size_t ai = 1; ai < execution.args.size(); ++ai) {
259 const char* option = execution.args[ai];
260 const char*
value =
nullptr;
264 if (ai + 1 < execution.args.size() && execution.args[ai + 1][0] !=
'-') {
265 value = execution.args[ai + 1];
272 static const std::set<std::string> omitOptions = {
273 "--channel-config",
"--o2-control",
"--control",
"--session",
"--monitoring-backend",
274 "-b",
"--color",
"--infologger-severity",
"--infologger-mode",
"--driver-client-backend",
275 "--shm-segment-size",
"--shm-throw-bad-alloc",
"--resources-monitoring"};
276 if (omitOptions.find(option) != omitOptions.end()) {
281 dumpOut << indLevel << indScheme << R
"(- ")" << option << "\"\n";
283 dumpOut << indLevel << indScheme << R
"(- )" << fmt::format("{:?}", fmt::format(
"'{}'",
value)) <<
"\n";
288std::string findBinder(
const std::vector<DeviceSpec>& specs,
const std::string& channel)
291 for (
const auto& spec : specs) {
292 for (
const auto& inputChannel : spec.inputChannels) {
293 if (inputChannel.method == ChannelMethod::Bind && inputChannel.name == channel) {
297 for (
const auto& outputChannel : spec.outputChannels) {
298 if (outputChannel.method == ChannelMethod::Bind && outputChannel.name == channel) {
303 throw std::runtime_error(
"Could not find a device which binds the '" + channel +
"' channel.");
306void dumpRole(std::ostream& dumpOut,
const std::string& taskName,
const DeviceSpec& spec,
const std::vector<DeviceSpec>& allSpecs,
const DeviceExecution& execution,
const std::string indLevel)
308 dumpOut << indLevel <<
"- name: \"" << spec.id <<
"\"\n";
310 dumpOut << indLevel << indScheme <<
"connect:\n";
312 for (
const auto& outputChannel : spec.outputChannels) {
313 if (outputChannel.method == ChannelMethod::Connect) {
314 dumpChannelConnect(dumpOut, outputChannel, findBinder(allSpecs, outputChannel.name), indLevel + indScheme);
317 for (
const auto& inputChannel : spec.inputChannels) {
318 if (inputChannel.method == ChannelMethod::Connect) {
319 dumpChannelConnect(dumpOut, inputChannel, findBinder(allSpecs, inputChannel.name), indLevel + indScheme);
322 bool uniqueProxy = isUniqueProxy(spec);
323 bool preserveRawChannels = shouldPreserveRawChannels(spec);
324 bool bindsRawChannels =
false;
325 auto rawChannels = extractRawChannels(spec, execution);
326 for (
const auto& rawChannel : rawChannels) {
327 if (rawChannel.method ==
"connect") {
328 dumpRawChannelConnect(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
329 }
else if (rawChannel.method ==
"bind") {
330 bindsRawChannels =
true;
335 if (bindsRawChannels) {
336 dumpOut << indLevel << indScheme <<
"bind:\n";
337 for (
const auto& rawChannel : rawChannels) {
338 if (rawChannel.method ==
"bind") {
339 dumpRawChannelBind(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
344 dumpOut << indLevel << indScheme <<
"task:\n";
345 dumpOut << indLevel << indScheme << indScheme <<
"load: " << taskName <<
"\n";
346 dumpOut << indLevel << indScheme << indScheme <<
"critical: " << (isCritical(spec) ?
"true" :
"false") <<
"\n";
/// Removes the first " --o2-control <value>" pair from a command line.
/// @param command command line to clean
/// @return copy of @p command without the option and the word following it; the
///         command is returned unchanged if " --o2-control " does not occur
///
/// The value is taken to end at the next space. That space is kept, so the
/// remaining arguments stay separated from the text in front of the option.
std::string removeO2ControlArg(std::string_view command)
{
  const char* o2ControlArg = " --o2-control ";
  const size_t o2ControlArgStart = command.find(o2ControlArg);
  if (o2ControlArgStart == std::string_view::npos) {
    return std::string(command);
  }

  const size_t o2ControlArgEnd = command.find(" ", o2ControlArgStart + std::strlen(o2ControlArg));
  auto result = std::string(command.substr(0, o2ControlArgStart));
  if (o2ControlArgEnd != std::string_view::npos) {
    result += command.substr(o2ControlArgEnd);
  }
  return result;
}
/// Dumps only one task.
/// Writes, in this order: the task name, the "defaults" (log_task_stdout,
/// log_task_stderr, exit_transition_timeout, data_processing_timeout,
/// _module_cmdline, _plain_cmdline), the "control" mode, the "wants" and "limits"
/// entries, the "bind" list and the "command" section (through dumpCommand).
/// The two timeouts default to "15" and "10"; each is replaced by the argument
/// following "--exit-transition-timeout" / "--data-processing-timeout" when that
/// option occurs in execution.args (the last argument is never tested as an option).
/// A warning is logged when execution.args[0] differs from its own file name,
/// i.e. when the executable was given with a directory part.
/// NOTE(review): execution.args[0] is read without checking that args is
/// non-empty - presumably it always holds the executable name; confirm.
366void dumpTask(std::ostream& dumpOut,
const DeviceSpec& spec,
const DeviceExecution& execution, std::string taskName, std::string indLevel)
368 dumpOut << indLevel <<
"name: " << taskName <<
"\n";
369 dumpOut << indLevel <<
"defaults:\n";
370 dumpOut << indLevel << indScheme <<
"log_task_stdout: none\n";
371 dumpOut << indLevel << indScheme <<
"log_task_stderr: none\n";
372 std::string exitTransitionTimeout =
"15";
373 std::string dataProcessingTimeout =
"10";
374 if (execution.args.size() > 2) {
375 for (
size_t i = 0;
i < execution.args.size() - 1; ++
i) {
376 if (strcmp(execution.args[
i],
"--exit-transition-timeout") == 0) {
377 exitTransitionTimeout = execution.args[
i + 1];
379 if (strcmp(execution.args[
i],
"--data-processing-timeout") == 0) {
380 dataProcessingTimeout = execution.args[
i + 1];
384 dumpOut << indLevel << indScheme <<
"exit_transition_timeout: " << exitTransitionTimeout <<
"\n";
385 dumpOut << indLevel << indScheme <<
"data_processing_timeout: " << dataProcessingTimeout <<
"\n";
387 if (bfs::path(execution.args[0]).filename().string() != execution.args[0]) {
388 LOG(warning) <<
"The workflow template generation was started with absolute or relative executables paths."
389 " Please use the symlinks exported by the build infrastructure or remove the paths manually in the generated templates,"
390 " unless you really need executables within concrete directories";
392 dumpOut << indLevel << indScheme <<
"_module_cmdline: >-\n";
393 dumpOut << indLevel << indScheme << indScheme <<
"source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&\n";
394 dumpOut << indLevel << indScheme << indScheme <<
"{{ dpl_command }} | " << execution.args[0] <<
"\n";
395 dumpOut << indLevel << indScheme <<
"_plain_cmdline: >-\n";
396 dumpOut << indLevel << indScheme << indScheme <<
"source /etc/profile.d/o2.sh && {{ len(extra_env_vars)>0 ? 'export ' + extra_env_vars + ' &&' : '' }} {{ dpl_command }} | " << execution.args[0] <<
"\n";
398 dumpOut << indLevel <<
"control:\n";
399 dumpOut << indLevel << indScheme <<
"mode: \"fairmq\"\n";
402 dumpOut << indLevel <<
"wants:\n";
403 dumpOut << indLevel << indScheme <<
"cpu: 0.01\n";
404 dumpOut << indLevel << indScheme <<
"memory: 1\n";
409 dumpOut << indLevel <<
"limits:\n";
411 dumpOut << indLevel << indScheme <<
"cpu: " <<
cpuKillThreshold.value().value <<
'\n';
418 dumpOut << indLevel <<
"bind:\n";
419 for (
const auto& outputChannel : spec.outputChannels) {
421 implementation::dumpChannelBind(dumpOut, outputChannel, indLevel + indScheme);
424 for (
const auto& inputChannel : spec.inputChannels) {
426 implementation::dumpChannelBind(dumpOut, inputChannel, indLevel + indScheme);
429 bool uniqueProxy = implementation::isUniqueProxy(spec);
430 bool preserveRawChannels = implementation::shouldPreserveRawChannels(spec);
431 auto rawChannels = implementation::extractRawChannels(spec, execution);
432 for (
const auto& rawChannel : rawChannels) {
433 if (rawChannel.method ==
"bind") {
434 dumpRawChannelBind(dumpOut, rawChannel, uniqueProxy, preserveRawChannels, indLevel + indScheme);
438 dumpOut << indLevel <<
"command:\n";
439 implementation::dumpCommand(dumpOut, execution, indLevel + indScheme);
442void dumpWorkflow(std::ostream& dumpOut,
const std::vector<DeviceSpec>& specs,
const std::vector<DeviceExecution>& executions,
const CommandInfo& commandInfo, std::string workflowName, std::string indLevel)
444 dumpOut << indLevel <<
"name: " << workflowName <<
"\n";
446 dumpOut << indLevel <<
"vars:\n";
447 dumpOut << indLevel << indScheme <<
"dpl_command: >-\n";
448 dumpOut << indLevel << indScheme << indScheme << implementation::removeO2ControlArg(commandInfo.command) <<
"\n";
450 dumpOut << indLevel <<
"defaults:\n";
451 dumpOut << indLevel << indScheme <<
"monitoring_dpl_url: \"no-op://\"\n";
452 dumpOut << indLevel << indScheme <<
"user: \"flp\"\n";
453 dumpOut << indLevel << indScheme <<
"fmq_rate_logging: 0\n";
454 dumpOut << indLevel << indScheme <<
"shm_segment_size: 10000000000\n";
455 dumpOut << indLevel << indScheme <<
"shm_throw_bad_alloc: false\n";
456 dumpOut << indLevel << indScheme <<
"session_id: default\n";
457 dumpOut << indLevel << indScheme <<
"resources_monitoring: 15\n";
459 dumpOut << indLevel <<
"roles:\n";
460 for (
size_t di = 0;
di < specs.size();
di++) {
461 auto& spec = specs[
di];
462 auto& execution = executions[
di];
463 implementation::dumpRole(dumpOut, implementation::taskName(workflowName, spec.id), spec, specs, execution, indLevel + indScheme);
468 const std::vector<DeviceSpec>& specs,
469 const std::vector<DeviceExecution>& executions,
470 const CommandInfo& commandInfo)
472 const char* tasksDirectory =
"tasks";
473 const char* workflowsDirectory =
"workflows";
475 LOG(info) <<
"Dumping the workflow configuration for AliECS.";
477 LOG(info) <<
"Creating directories '" << workflowsDirectory <<
"' and '" << tasksDirectory <<
"'.";
478 std::filesystem::create_directory(workflowsDirectory);
479 std::filesystem::create_directory(tasksDirectory);
480 LOG(info) <<
"... created.";
482 assert(specs.size() == executions.size());
484 LOG(info) <<
"Creating a workflow dump '" + workflowName +
"'.";
485 std::string wfDumpPath = std::string(workflowsDirectory) + bfs::path::preferred_separator + workflowName +
".yaml";
486 std::ofstream wfDump(wfDumpPath);
487 dumpWorkflow(wfDump, specs, executions, commandInfo, workflowName,
"");
490 for (
size_t di = 0;
di < specs.size(); ++
di) {
491 auto& spec = specs[
di];
492 auto& execution = executions[
di];
494 LOG(info) <<
"Creating a task dump for '" + spec.id +
"'.";
495 std::string taskName = implementation::taskName(workflowName, spec.id);
496 std::string taskDumpPath = std::string(tasksDirectory) + bfs::path::preferred_separator + taskName +
".yaml";
497 std::ofstream taskDump(taskDumpPath);
498 dumpTask(taskDump, spec, execution, taskName,
"");
500 LOG(info) <<
"...created.";
GLuint GLuint64EXT address
GLuint const GLchar * name
GLsizei const GLfloat * value
GLint GLint GLsizei GLint GLenum GLenum type
const decltype(DataProcessorMetadata::key) privateMemoryKillThresholdMB
const decltype(DataProcessorMetadata::key) cpuKillThreshold
Defining PrimaryVertex explicitly as messageable.
void dumpTask(std::ostream &dumpOut, const DeviceSpec &spec, const DeviceExecution &execution, std::string taskName, std::string indLevel)
Dumps only one task.
void dumpWorkflow(std::ostream &dumpOut, const std::vector< DeviceSpec > &specs, const std::vector< DeviceExecution > &executions, const CommandInfo &commandInfo, std::string workflowName, std::string indLevel)
Dumps only the workflow file.
void dumpDeviceSpec2O2Control(std::string workflowName, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
Dumps the AliECS compatible workflow and task templates for a DPL workflow.
constexpr const char * channelName(int channel)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"