Project
Loading...
Searching...
No Matches
DDSConfigHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "DDSConfigHelpers.h"
14#include "DeviceSpecHelpers.h"
15#include <map>
16#include <iostream>
17#include <cstring>
18#include <regex>
19#include <fmt/format.h>
20#include <libgen.h>
21#include <regex>
22
23namespace o2::framework
24{
25
27 std::string_view key;
28 std::string_view value;
29};
30
32 void beginChannel() override
33 {
34 names.push_back(-1);
35 isZMQ.push_back(false);
36 hasAddress.push_back(false);
37 isWrite.push_back(false);
39 }
40
41 void endChannel() override
42 {
44 if (names.back() == -1) {
45 throw std::runtime_error("Channel does not have a name.");
46 }
47 // If we have a zmq channel which does not have an address,
48 // use a DDS property.
49 if (isZMQ.back() && (hasAddress.back() == false)) {
51 }
53 }
54
55 void property(std::string_view key, std::string_view value) override
56 {
57 properties.push_back({key, value});
58 if (key == "address") {
59 hasAddress.back() = true;
60 }
61 if (key == "transport" && value == "zeromq") {
62 isZMQ.back() = true;
63 }
64 // Channels that bind need to write the bound address
65 if (key == "method" && value == "bind") {
66 isWrite.back() = true;
67 }
68 if (key == "name") {
69 names.back() = propertyIndex;
70 }
72 }
73
75 int channelIndex = 0;
76 std::vector<ChannelProperties> properties;
77 std::vector<int> names;
78 std::vector<int> requiresProperties;
79 std::vector<int> propertiesBegin;
80 std::vector<int> propertiesEnd;
81 std::vector<bool> isWrite;
82 std::vector<bool> isZMQ;
83 std::vector<bool> hasAddress;
84};
85
/// Escape the five XML-reserved characters in @a source so the text can be
/// embedded verbatim inside XML markup (e.g. an attribute value).
///
/// Replacements performed:
///   &  ->  &amp;
///   '  ->  &apos;
///   "  ->  &quot;
///   <  ->  &lt;
///   >  ->  &gt;
///
/// Every other character is copied through unchanged. The input is not
/// inspected for already-encoded entities, so "&amp;" becomes "&amp;amp;".
///
/// @param source text to encode
/// @return a new string with the reserved characters replaced
std::string xmlEncode(std::string const& source)
{
  std::string result;
  // Entities are longer than the character they replace; over-reserve up
  // front to limit reallocations while appending.
  result.reserve(source.size() * 2);
  for (char c : source) {
    switch (c) {
      case '&':
        result += "&amp;";
        break;
      case '\'':
        result += "&apos;";
        break;
      case '"':
        result += "&quot;";
        break;
      case '<':
        result += "&lt;";
        break;
      case '>':
        result += "&gt;";
        break;
      default:
        result += c;
        break;
    }
  }
  return result;
}
114
116 DriverMode driverMode,
117 std::string const& workflowSuffix,
118 std::vector<DataProcessorSpec> const& workflow,
119 std::vector<DataProcessorInfo> const& dataProcessorInfos,
120 const std::vector<DeviceSpec>& specs,
121 const std::vector<DeviceExecution>& executions,
122 const CommandInfo& commandInfo)
123{
124 std::ostringstream asset;
125 WorkflowSerializationHelpers::dump(asset, workflow, dataProcessorInfos, commandInfo);
126 // Check if any expendable task is present
127 bool hasExpendableTask = false;
128 for (auto& spec : specs) {
129 for (auto& label : spec.labels) {
130 if (label.value == "expendable") {
131 hasExpendableTask = true;
132 break;
133 }
134 }
135 }
136 out << R"(<topology name="o2-dataflow">)"
137 "\n";
138 if (hasExpendableTask || driverMode == DriverMode::EMBEDDED) {
139 out << R"(<declrequirement name="odc_expendable_task" type="custom" value="true" />)"
140 "\n";
141 }
142 out << fmt::format(R"(<asset name="dpl_json{}" type="inline" visibility="global" value="{}"/>)",
143 workflowSuffix,
144 xmlEncode(asset.str()))
145 << "\n";
146 assert(specs.size() == executions.size());
147 std::vector<ChannelRewriter> rewriters;
148 rewriters.resize(specs.size());
149
150 // Find out if we need properties
151 // and a property for each zmq channel which does not have
152 // an address.
153 for (size_t di = 0; di < specs.size(); ++di) {
154 auto& rewriter = rewriters[di];
155 auto& execution = executions[di];
156 for (size_t cci = 0; cci < execution.args.size(); cci++) {
157 const char* arg = execution.args[cci];
158 if (!arg) {
159 break;
160 }
161 if (strcmp(arg, "--channel-config") == 0) {
162 if (cci + 1 == execution.args.size()) {
163 throw std::runtime_error("wrong channel config found");
164 }
165 ChannelSpecHelpers::parseChannelConfig(execution.args[cci + 1], rewriter);
166 }
167 }
168 for (int ci : rewriter.requiresProperties) {
169 out << " "
170 << fmt::format("<property name=\"fmqchan_{}\" />\n", rewriter.properties[rewriter.names[ci]].value);
171 }
172 }
173
174 if (driverMode == DriverMode::EMBEDDED) {
175 out << " "
176 << fmt::format("<decltask name=\"{}{}\">\n", "dplDriver", workflowSuffix);
177 out << " "
178 << fmt::format(R"(<assets><name>dpl_json{}</name></assets>)", workflowSuffix) << "\n";
179 out << " "
180 << R"(<exe reachable="true">)";
181 out << fmt::format("cat ${{DDS_LOCATION}}/dpl_json{}.asset | o2-dpl-run --driver-mode embedded", workflowSuffix);
182 out << R"(</exe>)"
183 << "<requirements>\n"
184 << " <name>odc_expendable_task</name>\n"
185 << "</requirements>\n"
186 << "\n";
187 out << "</decltask>";
188 }
189
190 for (size_t di = 0; di < specs.size(); ++di) {
191 auto& spec = specs[di];
192 auto& execution = executions[di];
193 if (execution.args.empty()) {
194 continue;
195 }
196
197 out << " "
198 << fmt::format("<decltask name=\"{}{}\">\n", spec.id, workflowSuffix);
199 out << " "
200 << fmt::format(R"(<assets><name>dpl_json{}</name></assets>)", workflowSuffix) << "\n";
201 out << " "
202 << R"(<exe reachable="true">)";
203 out << fmt::format("cat ${{DDS_LOCATION}}/dpl_json{}.asset | ", workflowSuffix);
204 for (auto ei : execution.environ) {
205 out << DeviceSpecHelpers::reworkTimeslicePlaceholder(ei, spec) << " ";
206 }
207 std::string accumulatedChannelPrefix;
208 char* s = strdup(execution.args[0]);
209 out << basename(s) << " ";
210 free(s);
211 for (size_t ai = 1; ai < execution.args.size(); ++ai) {
212 const char* arg = execution.args[ai];
213 if (!arg) {
214 break;
215 }
216 if (strcmp(arg, "--id") == 0 && ai + 1 < execution.args.size()) {
217 out << fmt::format(R"(--id {}_dds%TaskIndex%_%CollectionIndex% )", execution.args[ai + 1]);
218 ai++;
219 continue;
220 }
221 // Do not print out the driver client explicitly
222 if (strcmp(arg, "--driver-client-backend") == 0) {
223 ai++;
224 continue;
225 }
226 if (strcmp(arg, "--control") == 0) {
227 ai++;
228 continue;
229 }
230 if (strcmp(arg, "--channel-prefix") == 0 &&
231 ai + 1 < execution.args.size() &&
232 *execution.args[ai + 1] == 0) {
233 ai++;
234 continue;
235 }
236 if (strpbrk(arg, "' ;@") != nullptr || arg[0] == 0) {
237 out << fmt::format(R"("{}" )", arg);
238 } else if (strpbrk(arg, "\"") != nullptr || arg[0] == 0) {
239 out << fmt::format(R"('{}' )", arg);
240 } else {
241 out << fmt::format(R"({} )", arg);
242 }
243 }
244 out << "--plugin odc";
245 if (accumulatedChannelPrefix.empty() == false) {
246 out << " --channel-config \"" << accumulatedChannelPrefix << "\"";
247 }
248 out << "</exe>\n";
249 // Check if the expendable label is there, and if so, add
250 // the requirement to the XML.
251 if (std::find_if(spec.labels.begin(), spec.labels.end(), [](const auto& label) {
252 return label.value == "expendable";
253 }) != spec.labels.end()) {
254 out << " <requirements>\n";
255 out << " <name>odc_expendable_task</name>\n";
256 out << " </requirements>\n";
257 }
258 auto& rewriter = rewriters[di];
259 if (rewriter.requiresProperties.empty() == false) {
260 out << " <properties>\n";
261 for (auto pi : rewriter.requiresProperties) {
262 out << fmt::format(
263 " <name access=\"{}\">fmqchan_{}</name>\n",
264 rewriter.isWrite[pi] ? "write" : "read",
265 rewriter.properties[rewriter.names[pi]].value);
266 }
267 out << " </properties>\n";
268 }
269 out << " </decltask>\n";
270 }
271 out << " <declcollection name=\"DPL\">\n <tasks>\n";
272 for (const auto& spec : specs) {
273 out << fmt::format(" <name>{}{}</name>\n", spec.id, workflowSuffix);
274 }
275 if (driverMode == DriverMode::EMBEDDED) {
276 out << fmt::format(" <name>{}{}</name>\n", "dplDriver", workflowSuffix);
277 }
278 out << " </tasks>\n </declcollection>\n";
279 out << "</topology>\n";
280}
281
282} // namespace o2::framework
uint32_t c
Definition RawData.h:2
StringRef key
GLuint64EXT * result
Definition glcorearb.h:5662
GLsizei GLsizei GLchar * source
Definition glcorearb.h:798
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::string xmlEncode(std::string const &source)
void property(std::string_view key, std::string_view value) override
std::vector< ChannelProperties > properties
static void parseChannelConfig(char const *channelConfig, FairMQChannelConfigParser &parser)
static void dumpDeviceSpec2DDS(std::ostream &out, DriverMode mode, std::string const &workflowSuffix, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, std::vector< DeviceSpec > const &specs, std::vector< DeviceExecution > const &executions, CommandInfo const &commandInfo)
static std::string reworkTimeslicePlaceholder(std::string const &str, DeviceSpec const &spec)
Handler to parse the description of the --channel-config.
static void dump(std::ostream &o, std::vector< DataProcessorSpec > const &workflow, std::vector< DataProcessorInfo > const &metadata, CommandInfo const &commandInfo)