Project
Loading...
Searching...
No Matches
ChannelSpecHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include <fmt/format.h>
14#include <ostream>
15#include <cassert>
16#include <cctype>
17#include <regex>
18#include <unistd.h>
19#if __has_include(<filesystem>)
20#include <filesystem>
21namespace fs = std::filesystem;
22#endif
23
24namespace
25{
26std::string getTmpFolder()
27{
28 std::string tmppath = fs::temp_directory_path().native();
29 while (tmppath.back() == '/') {
30 tmppath.pop_back();
31 }
32 return tmppath;
33}
34} // namespace
35
36namespace o2::framework
37{
38
43
47
48bool isIPAddress(const std::string& address)
49{
50 std::regex ipv4_regex("^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$");
51 if (std::regex_match(address, ipv4_regex)) {
52 return true;
53 }
54 return false;
55}
56
57void OutputChannelSpecConfigParser::property(std::string_view key, std::string_view value)
58{
59 std::string valueStr = value.data();
60 if (key == "address") {
61 auto parseAddress = [v = std::string(value), &outputChannelSpec = specs.back()]() {
62 auto value = v;
63 std::string protocol = "tcp";
64 std::string hostname = "127.0.0.1";
65 std::string port = "9090";
66 auto pos = value.find("://");
67 if (pos != std::string::npos) {
68 protocol = value.substr(0, pos);
69 value = value.substr(pos + 3);
70 }
71 if (protocol == "tcp") {
72 pos = value.find(':');
73 if (pos != std::string::npos) {
74 hostname = value.substr(0, pos);
75 value = value.substr(pos + 1);
76 } else {
77 throw runtime_error_f("Port not found in address '%s'", v.c_str());
78 }
79 port = value;
80 if (isIPAddress(hostname) == false) {
81 throw runtime_error_f("Invalid ip address '%s'", hostname.c_str());
82 }
83 outputChannelSpec.hostname = hostname;
84 outputChannelSpec.port = std::stoi(port);
85 outputChannelSpec.protocol = ChannelProtocol::Network;
86 } else if (protocol == "ipc") {
87 outputChannelSpec.hostname = value;
88 outputChannelSpec.port = 0;
89 outputChannelSpec.protocol = ChannelProtocol::IPC;
90 } else {
91 throw runtime_error_f("Unknown protocol '%s'", protocol.c_str());
92 }
93 };
94 parseAddress();
95 }
96 auto& outputChannelSpec = specs.back();
97 if (key == "name") {
98 outputChannelSpec.name = value;
99 } else if (key == "type" && value == "pub") {
100 outputChannelSpec.type = ChannelType::Pub;
101 } else if (key == "type" && value == "sub") {
102 outputChannelSpec.type = ChannelType::Sub;
103 } else if (key == "type" && value == "push") {
104 outputChannelSpec.type = ChannelType::Push;
105 } else if (key == "type" && value == "pull") {
106 outputChannelSpec.type = ChannelType::Pull;
107 } else if (key == "type" && value == "pair") {
108 outputChannelSpec.type = ChannelType::Pair;
109 } else if (key == "method" && value == "bind") {
110 outputChannelSpec.method = ChannelMethod::Bind;
111 } else if (key == "method" && value == "connect") {
112 outputChannelSpec.method = ChannelMethod::Connect;
113 } else if (key == "rateLogging") {
114 outputChannelSpec.rateLogging = std::stoi(valueStr);
115 } else if (key == "recvBufSize") {
116 outputChannelSpec.recvBufferSize = std::stoi(valueStr);
117 } else if (key == "sendBufSize") {
118 outputChannelSpec.recvBufferSize = std::stoi(valueStr);
119 }
120}
121
123{
124 throw runtime_error_f("Error in channel config.");
125}
126
128{
129 switch (type) {
130 case ChannelType::Pub:
131 return "pub";
132 case ChannelType::Sub:
133 return "sub";
135 return "push";
137 return "pull";
139 return "pair";
140 }
141 throw runtime_error("Unknown ChannelType");
142}
143
145{
146 switch (method) {
148 return "bind";
150 return "connect";
151 }
152 throw runtime_error("Unknown ChannelMethod");
153}
154
155namespace
156{
157std::string composeIPCName(std::string const& prefix, std::string const& hostname, short port)
158{
159 if (prefix.empty() == false && prefix[0] == '@') {
160 return fmt::format("ipc://{}{}_{},transport=shmem", prefix, hostname, port);
161 }
162 if (prefix.empty() == false && prefix.back() == '/') {
163 return fmt::format("ipc://{}{}_{},transport=shmem", prefix, hostname, port);
164 }
165 return fmt::format("ipc://{}/{}_{},transport=shmem", prefix, hostname, port);
166}
167} // namespace
168
170{
171 switch (channel.protocol) {
173 return composeIPCName(channel.ipcPrefix, channel.hostname, channel.port);
174 default:
175 return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{},autoBind=false", channel.port)
176 : fmt::format("tcp://{}:{}", channel.hostname, channel.port);
177 }
178}
179
181{
182 switch (channel.protocol) {
184 return composeIPCName(channel.ipcPrefix, channel.hostname, channel.port);
185 default:
186 return channel.method == ChannelMethod::Bind ? fmt::format("tcp://*:{},autoBind=false", channel.port)
187 : fmt::format("tcp://{}:{}", channel.hostname, channel.port);
188 }
189}
190
192std::ostream& operator<<(std::ostream& s, ChannelType const& type)
193{
195 return s;
196}
197
199std::ostream& operator<<(std::ostream& s, ChannelMethod const& method)
200{
202 return s;
203}
204
206 BEGIN,
209 BEGIN_KEY,
211 END_VALUE,
212 END,
213 ERROR
214};
215
217{
219 char const* cur = config;
220 char const* next = config;
221 std::string_view key;
222 std::string_view value;
223 char const* nameKey = "name";
224 char const* lastError = "bad configuation string";
225
226 while (true) {
227 switch (state) {
229 if (*cur == '\0') {
230 lastError = "empty config string";
232 } else if (!isalpha(*cur)) {
233 lastError = "first character is not alphabetic";
235 } else {
237 }
238 break;
239 }
241 next = strpbrk(cur, ":=;,");
242 if (*next == ';' || *next == ',') {
243 lastError = "expected channel name";
245 break;
246 } else if (*next == ':') {
247 handler.beginChannel();
248 key = std::string_view(nameKey, 4);
249 value = std::string_view(cur, next - cur);
250 handler.property(key, value);
252 cur = next + 1;
253 if (*cur == '\0') {
255 } else {
257 }
258 break;
259 }
260 handler.beginChannel();
262 break;
263 }
265 next = strchr(cur, '=');
266 if (next == nullptr) {
267 lastError = "expected '='";
269 } else {
270 key = std::string_view(cur, next - cur);
272 cur = next + 1;
273 }
274 break;
275 }
277 next = strpbrk(cur, ";,");
278 if (next == nullptr) {
279 size_t l = strlen(cur);
280 value = std::string_view(cur, l);
282 cur = cur + l;
283 } else if (*next == ';') {
284 value = std::string_view(cur, next - cur);
286 cur = next;
287 } else if (*next == ',') {
288 value = std::string_view(cur, next - cur);
290 cur = next;
291 }
292 handler.property(key, value);
293 break;
294 }
296 if (*cur == '\0') {
298 } else if (*cur == ',') {
300 cur++;
301 } else if (*cur == ';') {
303 cur++;
304 }
305 break;
306 }
308 handler.endChannel();
309 if (*cur == '\0') {
311 } else if (*cur == ';') {
313 cur++;
314 } else {
315 lastError = "expected ';'";
317 }
318 break;
319 }
321 return;
322 }
324 throw runtime_error_f("Unable to parse channel config: %s", lastError);
325 }
326 }
327 }
328}
329
331{
332#ifdef __linux__
333 // On linux we can use abstract sockets to avoid the need for a file.
334 // This is not available on macOS.
335 // Notice also that when running inside a docker container, like
336 // when on alien, the abstract socket is not isolated, so we need
337 // to add some unique identifier to avoid collisions.
338 char const* channelPrefix = getenv("ALIEN_PROC_ID");
339 if (channelPrefix) {
340 return fmt::format("@dpl_{}_", channelPrefix);
341 }
342 return "@";
343#else
345 char const* channelPrefix = getenv("TMPDIR");
346 if (channelPrefix) {
347 return {channelPrefix};
348 }
349 return access("/tmp", W_OK) == 0 ? "/tmp/" : "./";
350#endif
351}
352
353} // namespace o2::framework
benchmark::State & state
uint16_t pos
Definition RawData.h:3
StringRef key
GLuint GLuint64EXT address
Definition glcorearb.h:5846
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLuint GLint GLboolean GLint GLenum access
Definition glcorearb.h:2196
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
bool isIPAddress(const std::string &address)
ChannelType
These map to zeromq types for the channels.
Definition ChannelSpec.h:27
RuntimeErrorRef runtime_error_f(const char *,...)
static void parseChannelConfig(char const *channelConfig, FairMQChannelConfigParser &parser)
static char const * typeAsString(enum ChannelType type)
return a ChannelType as a lowercase string
static std::string channelUrl(InputChannelSpec const &)
static char const * methodAsString(enum ChannelMethod method)
return a ChannelMethod as a lowercase string
Handler to parse the description of the –channel-config.
virtual void property(std::string_view, std::string_view)
void property(std::string_view, std::string_view) override