Project
Loading...
Searching...
No Matches
WSDriverClient.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WSDriverClient.h"
14#include "Framework/Logger.h"
17#include "DriverClientContext.h"
18#include "DPLWebSocket.h"
19#include "Framework/Signpost.h"
20#include <uv.h>
21#include <string_view>
22#include <charconv>
23
26O2_DECLARE_DYNAMIC_LOG(monitoring_service);
27O2_DECLARE_DYNAMIC_LOG(data_processor_context);
28O2_DECLARE_DYNAMIC_LOG(stream_context);
30
31namespace o2::framework
32{
33
36 : mClient{client}
37 {
38 }
39
40 void headers(std::map<std::string, std::string> const& headers) override
41 {
42 }
44 void beginFragmentation() override {}
45
48 void frame(char const* frame, size_t s) override
49 {
50 mClient.dispatch(std::string_view(frame, s));
51 }
52
53 void endFragmentation() override {};
54 void control(char const* frame, size_t s) override {};
55
58 void beginChunk() override
59 {
60 }
61
65 void endChunk() override
66 {
67 }
68
72};
73
78
79void on_connect(uv_connect_t* connection, int status)
80{
81 if (status < 0) {
82 LOG(error) << "Unable to connect to driver.";
83 return;
84 }
85 auto* context = (ConnectionContext*)connection->data;
86 WSDriverClient* client = context->client;
87 auto& state = context->ref.get<DeviceState>();
88 state.loopReason |= DeviceState::WS_CONNECTED;
89 auto onHandshake = [client, ref = context->ref]() {
90 client->flushPending(ref);
91 };
92 std::lock_guard<std::mutex> lock(client->mutex());
93 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
94 client->observe("/ping", [](std::string_view) {
95 LOG(info) << "ping";
96 });
98 client->observe("/shm-offer", [ref = context->ref](std::string_view cmd) {
99 auto& state = ref.get<DeviceState>();
100 static constexpr int prefixSize = std::string_view{"/shm-offer "}.size();
101 if (prefixSize > cmd.size()) {
102 LOG(error) << "Malformed shared memory offer";
103 return;
104 }
105 cmd.remove_prefix(prefixSize);
106 size_t offerSize;
107 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
108 if (offerSizeError.ec != std::errc()) {
109 LOG(error) << "Malformed shared memory offer";
110 return;
111 }
112 LOGP(detail, "Received {}MB shared memory offer", offerSize);
114 offer.cpu = 0;
115 offer.memory = 0;
116 offer.sharedMemory = offerSize * 1000000;
117 offer.runtime = 10000;
118 offer.user = -1;
119 offer.valid = true;
120
121 state.pendingOffers.push_back(offer);
122 });
123 client->observe("/timeslice-offer", [ref = context->ref](std::string_view cmd) {
124 O2_SIGNPOST_ID_GENERATE(wid, ws_client);
125 O2_SIGNPOST_START(ws_client, wid, "timeslice-offer", "Received timeslice offer.");
126 auto& state = ref.get<DeviceState>();
127 static constexpr int prefixSize = std::string_view{"/timeslice-offer "}.size();
128 if (prefixSize > cmd.size()) {
129 O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Malformed timeslice offer");
130 return;
131 }
132 cmd.remove_prefix(prefixSize);
133 int64_t offerSize;
134 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
135 if (offerSizeError.ec != std::errc()) {
136 O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Unexpected timeslice offer size");
137 return;
138 }
140 .cpu = 0,
141 .memory = 0,
142 .sharedMemory = 0,
143 .timeslices = offerSize,
144 .runtime = 10000,
145 .user = -1,
146 .valid = true};
147 state.pendingOffers.push_back(offer);
148 O2_SIGNPOST_END(ws_client, wid, "timeslice-offer", "Received %lli timeslices offer. Total pending offers %zu.",
149 offerSize, state.pendingOffers.size());
150 });
151
152 client->observe("/quit", [ref = context->ref](std::string_view) {
153 auto& state = ref.get<DeviceState>();
154 state.quitRequested = true;
155 });
156
157 client->observe("/restart", [ref = context->ref](std::string_view) {
158 auto& state = ref.get<DeviceState>();
159 state.nextFairMQState.emplace_back("RUN");
160 state.nextFairMQState.emplace_back("STOP");
161 });
162
163 client->observe("/start", [ref = context->ref](std::string_view) {
164 auto& state = ref.get<DeviceState>();
165 state.nextFairMQState.emplace_back("RUN");
166 });
167
168 client->observe("/stop", [ref = context->ref](std::string_view) {
169 auto& state = ref.get<DeviceState>();
170 state.nextFairMQState.emplace_back("STOP");
171 });
172
173 client->observe("/trace", [ref = context->ref](std::string_view cmd) {
174 auto& state = ref.get<DeviceState>();
175 static constexpr int prefixSize = std::string_view{"/trace "}.size();
176 if (prefixSize > cmd.size()) {
177 LOG(error) << "Malformed tracing request";
178 return;
179 }
180 cmd.remove_prefix(prefixSize);
181 int tracingFlags = 0;
182 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
183 if (error.ec != std::errc()) {
184 LOG(error) << "Malformed tracing mask";
185 return;
186 }
187 LOGP(info, "Tracing flags set to {}", tracingFlags);
188 state.tracingFlags = tracingFlags;
189 });
190
191 client->observe("/signpost:enable", [](std::string_view cmd) {
192 static constexpr int prefixSize = std::string_view{"/signpost:enable "}.size();
193 if (cmd.size() <= prefixSize) {
194 LOG(error) << "Malformed /signpost:enable request";
195 return;
196 }
197 std::string name(cmd.substr(prefixSize));
198 o2_walk_logs([](char const* logName, void* l, void* context) -> bool {
199 auto* log = static_cast<_o2_log_t*>(l);
200 auto* target = static_cast<std::string*>(context);
201 if (*target == logName) {
202 _o2_log_set_stacktrace(log, log->defaultStacktrace);
203 return false;
204 }
205 return true;
206 }, &name);
207 });
208
209 client->observe("/signpost:disable", [](std::string_view cmd) {
210 static constexpr int prefixSize = std::string_view{"/signpost:disable "}.size();
211 if (cmd.size() <= prefixSize) {
212 LOG(error) << "Malformed /signpost:disable request";
213 return;
214 }
215 std::string name(cmd.substr(prefixSize));
216 o2_walk_logs([](char const* logName, void* l, void* context) -> bool {
217 auto* log = static_cast<_o2_log_t*>(l);
218 auto* target = static_cast<std::string*>(context);
219 if (*target == logName) {
221 return false;
222 }
223 return true;
224 }, &name);
225 });
226
227 // Client will be filled in the line after. I can probably have a single
228 // client per device.
229 auto dplClient = std::make_unique<WSDPLClient>();
230 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
231 client->setDPLClient(std::move(dplClient));
232 client->sendHandshake();
233}
234
236{
237 auto* state = (DeviceState*)handle->data;
239}
240
241WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsigned short port)
242 : mRegistry(registry)
243{
244 auto& state = registry.get<DeviceState>();
245
246 // Must connect the device to the server and send a websocket request.
247 // On successful connection we can then start to send commands to the driver.
248 // We keep a backlog to make sure we do not lose messages.
249 auto* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
250 uv_tcp_init(state.loop, socket);
251 auto* connection = (uv_connect_t*)malloc(sizeof(uv_connect_t));
252 auto* context = new ConnectionContext{.client = this, .ref = registry};
253 connection->data = context;
254
255 struct sockaddr_in dest;
256 uv_ip4_addr(strdup(ip), port, &dest);
257 uv_tcp_connect(connection, socket, (const struct sockaddr*)&dest, on_connect);
258
259 this->mAwakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
260 this->mAwakeMainThread->data = &state;
261 uv_async_init(state.loop, this->mAwakeMainThread, on_awake_main_thread);
262}
263
265{
266 free(this->mAwakeMainThread);
267}
268
269void sendMessageToDriver(std::unique_ptr<o2::framework::WSDPLClient>& client, char const* message, size_t s)
270{
271}
272
273void WSDriverClient::setDPLClient(std::unique_ptr<WSDPLClient> client)
274{
275 mClient = std::move(client);
276 mConnected = true;
277}
278
280{
281 mClient->sendHandshake();
283}
284
285void WSDriverClient::tell(const char* msg, size_t s, bool flush)
286{
287 // Tell will always accumulate and we signal the main thread we
288 // have metrics to push
289 std::lock_guard<std::mutex> lock(mClientMutex);
291 if (flush) {
292 this->awake();
293 }
294}
295
296void WSDriverClient::awake()
297{
298 uv_async_send(mAwakeMainThread);
299}
300
302{
303 if (mainThreadRef.isMainThread() == false) {
304 LOG(error) << "flushPending not called from main thread";
305 }
306 std::lock_guard<std::mutex> lock(mClientMutex);
307 static bool printed1 = false;
308 static bool printed2 = false;
309 if (!mClient) {
310 if (mBacklog.size() > 2000) {
311 if (!printed1) {
312 LOG(warning) << "Unable to communicate with driver because client does not exist. Continuing connection attempts.";
313 printed1 = true;
314 }
315 }
316 return;
317 }
318 if (!(mClient->isHandshaken())) {
319 if (mBacklog.size() > 2000) {
320 if (!printed2) {
321 LOG(warning) << "Unable to communicate with driver because client is not connected. Continuing connection attempts.";
322 printed2 = true;
323 }
324 }
325 return;
326 }
327 if (printed1 || printed2) {
328 LOGP(warning, "DriverClient connected successfully. Flushing message backlog of {} messages. All is good.", mBacklog.size());
329 printed1 = false;
330 printed2 = false;
331 }
332 mClient->write(mBacklog);
333 mBacklog.resize(0);
334}
335
336} // namespace o2::framework
benchmark::State & state
struct uv_async_s uv_async_t
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
Definition Signpost.h:616
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
struct uv_connect_s uv_connect_t
void dispatch(std::string_view event)
Dispatch an event.
void observe(char const *eventType, std::function< void(std::string_view)> callback)
Request action on some eventType notified by the driver.
void setDPLClient(std::unique_ptr< WSDPLClient >)
void tell(const char *msg, size_t s, bool flush=true) final
void flushPending(ServiceRegistryRef mainThreadRef) final
WSDriverClient(ServiceRegistryRef registry, char const *ip, unsigned short port)
GLuint const GLchar * name
Definition glcorearb.h:781
GLenum target
Definition glcorearb.h:1641
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
void on_connect(uv_connect_t *connection, int status)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void sendMessageToDriver(std::unique_ptr< o2::framework::WSDPLClient > &client, char const *message, size_t s)
void on_awake_main_thread(uv_async_t *handle)
void endFragmentation() override
FIXME: not implemented.
void beginFragmentation() override
FIXME: not implemented by the backend.
ClientWebSocketHandler(WSDriverClient &client)
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void frame(char const *frame, size_t s) override
int64_t memory
How much memory it can use.
int64_t sharedMemory
How much shared memory it can allocate.
int user
Which task is using the offer.
int cpu
How many cores it can use.
Running state information of a given device.
Definition DeviceState.h:34
An handler for a websocket message stream.
Definition HTTPParser.h:137
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153