Project
Loading...
Searching...
No Matches
WSDriverClient.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WSDriverClient.h"
14#include "Framework/Logger.h"
17#include "DriverClientContext.h"
18#include "DPLWebSocket.h"
19#include "Framework/Signpost.h"
20#include <uv.h>
21#include <string_view>
22#include <charconv>
23
26O2_DECLARE_DYNAMIC_LOG(monitoring_service);
27O2_DECLARE_DYNAMIC_LOG(data_processor_context);
28O2_DECLARE_DYNAMIC_LOG(stream_context);
30
31namespace o2::framework
32{
33
// Constructor member-initializer list and (empty) body: stores the WSDriverClient
// reference received as `client` in mClient.
// NOTE(review): the constructor's signature line is not visible in this listing.
36 : mClient{client}
37 {
38 }
39
// Header callback override. The received header map is ignored: the body is empty.
40 void headers(std::map<std::string, std::string> const& headers) override
41 {
42 }
// Fragmentation-start callback override; intentionally a no-op (empty body).
44 void beginFragmentation() override {}
45
// Frame callback override: wraps the `s` bytes starting at `frame` in a
// std::string_view (no copy) and forwards them to WSDriverClient::dispatch.
// @param frame pointer to the first byte of the frame payload
// @param s     number of bytes in the payload
48 void frame(char const* frame, size_t s) override
49 {
50 mClient.dispatch(std::string_view(frame, s));
51 }
52
// Fragmentation-end callback override; no-op (empty body).
53 void endFragmentation() override {};
// Control-frame callback override; no-op: `frame` and `s` are ignored.
54 void control(char const* frame, size_t s) override {};
55
// Chunk-start callback override; no-op (empty body).
58 void beginChunk() override
59 {
60 }
61
// Chunk-end callback override; no-op (empty body).
65 void endChunk() override
66 {
67 }
68
72};
73
78
// libuv connect callback for the TCP connection to the driver.
//
// On failure (status < 0) it only logs an error. On success it flags
// DeviceState::WS_CONNECTED in the device state, registers one observer per
// driver command on the WSDriverClient, then creates the WSDPLClient, hands it
// to the WSDriverClient and sends the websocket handshake.
//
// @param connection connect request; its `data` field is cast to ConnectionContext*
// @param status     treated as a failure when negative
79void on_connect(uv_connect_t* connection, int status)
80{
// NOTE(review): neither `connection` nor the ConnectionContext stored in
// connection->data is released on this early return (or anywhere else in this
// function) - confirm whether they are freed elsewhere.
81 if (status < 0) {
82 LOG(error) << "Unable to connect to driver.";
83 return;
84 }
85 auto* context = (ConnectionContext*)connection->data;
86 WSDriverClient* client = context->client;
87 auto& state = context->ref.get<DeviceState>();
88 state.loopReason |= DeviceState::WS_CONNECTED;
// Invoked by the WSDPLClient once the handshake is done (it is passed to
// connect() below): flushes whatever the client accumulated so far.
89 auto onHandshake = [client, ref = context->ref]() {
90 client->flushPending(ref);
91 };
// The client mutex is held until the end of this function (scope-bound lock_guard).
92 std::lock_guard<std::mutex> lock(client->mutex());
93 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
// "/ping": only logs.
94 client->observe("/ping", [](std::string_view) {
95 LOG(info) << "ping";
96 });
// "/shm-offer <N>": parses the unsigned integer following the prefix and queues
// a pending offer whose sharedMemory is N * 1000000 (N is logged as MB).
98 client->observe("/shm-offer", [ref = context->ref](std::string_view cmd) {
99 auto& state = ref.get<DeviceState>();
// NOTE(review): prefixSize is an int compared against the unsigned cmd.size();
// the value is a small positive constant so the check behaves as intended,
// but it is a signed/unsigned comparison.
100 static constexpr int prefixSize = std::string_view{"/shm-offer "}.size();
101 if (prefixSize > cmd.size()) {
102 LOG(error) << "Malformed shared memory offer";
103 return;
104 }
105 cmd.remove_prefix(prefixSize);
106 size_t offerSize;
// Only `ec` is checked: std::from_chars stops at the first character that is
// not part of the number, so trailing text after a valid number is not rejected.
107 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
108 if (offerSizeError.ec != std::errc()) {
109 LOG(error) << "Malformed shared memory offer";
110 return;
111 }
112 LOGP(detail, "Received {}MB shared memory offer", offerSize);
// NOTE(review): the declaration of `offer` is not visible in this listing
// (a source line appears to be missing just above).
114 offer.cpu = 0;
115 offer.memory = 0;
116 offer.sharedMemory = offerSize * 1000000;
117 offer.runtime = 10000;
118 offer.user = -1;
119 offer.valid = true;
120
121 state.pendingOffers.push_back(offer);
122 });
// "/timeslice-offer <N>": parses the signed integer following the prefix and
// queues a pending offer with `timeslices` set to N; the whole handling is
// wrapped in a ws_client signpost interval.
123 client->observe("/timeslice-offer", [ref = context->ref](std::string_view cmd) {
124 O2_SIGNPOST_ID_GENERATE(wid, ws_client);
125 O2_SIGNPOST_START(ws_client, wid, "timeslice-offer", "Received timeslice offer.");
126 auto& state = ref.get<DeviceState>();
127 static constexpr int prefixSize = std::string_view{"/timeslice-offer "}.size();
128 if (prefixSize > cmd.size()) {
129 O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Malformed timeslice offer");
130 return;
131 }
132 cmd.remove_prefix(prefixSize);
133 int64_t offerSize;
134 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
135 if (offerSizeError.ec != std::errc()) {
136 O2_SIGNPOST_END_WITH_ERROR(ws_client, wid, "timeslice-offer", "Unexpected timeslice offer size");
137 return;
138 }
// NOTE(review): the line opening this designated-initializer list (the
// declaration of `offer`) is not visible in this listing.
140 .cpu = 0,
141 .memory = 0,
142 .sharedMemory = 0,
143 .timeslices = offerSize,
144 .runtime = 10000,
145 .user = -1,
146 .valid = true};
147 state.pendingOffers.push_back(offer);
148 O2_SIGNPOST_END(ws_client, wid, "timeslice-offer", "Received %lli timeslices offer. Total pending offers %zu.",
149 offerSize, state.pendingOffers.size());
150 });
151
// "/quit": raises the quitRequested flag in the device state.
152 client->observe("/quit", [ref = context->ref](std::string_view) {
153 auto& state = ref.get<DeviceState>();
154 state.quitRequested = true;
155 });
156
// "/restart": appends "RUN" and then "STOP" to nextFairMQState, in that order.
157 client->observe("/restart", [ref = context->ref](std::string_view) {
158 auto& state = ref.get<DeviceState>();
159 state.nextFairMQState.emplace_back("RUN");
160 state.nextFairMQState.emplace_back("STOP");
161 });
162
// "/start": appends "RUN" to nextFairMQState.
163 client->observe("/start", [ref = context->ref](std::string_view) {
164 auto& state = ref.get<DeviceState>();
165 state.nextFairMQState.emplace_back("RUN");
166 });
167
// "/stop": appends "STOP" to nextFairMQState.
168 client->observe("/stop", [ref = context->ref](std::string_view) {
169 auto& state = ref.get<DeviceState>();
170 state.nextFairMQState.emplace_back("STOP");
171 });
172
// "/trace <flags>": parses an int and stores it in state.tracingFlags.
173 client->observe("/trace", [ref = context->ref](std::string_view cmd) {
174 auto& state = ref.get<DeviceState>();
175 static constexpr int prefixSize = std::string_view{"/trace "}.size();
176 if (prefixSize > cmd.size()) {
177 LOG(error) << "Malformed tracing request";
178 return;
179 }
180 cmd.remove_prefix(prefixSize);
181 int tracingFlags = 0;
182 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
183 if (error.ec != std::errc()) {
184 LOG(error) << "Malformed tracing mask";
185 return;
186 }
187 LOGP(info, "Tracing flags set to {}", tracingFlags);
188 state.tracingFlags = tracingFlags;
189 });
190
// "/log-streams <mask>": parses an int bitmask, stores it in state.logStreams
// and enables or disables each dynamic log according to its bit in the mask.
191 client->observe("/log-streams", [ref = context->ref](std::string_view cmd) {
192 auto& state = ref.get<DeviceState>();
193 static constexpr int prefixSize = std::string_view{"/log-streams "}.size();
194 if (prefixSize > cmd.size()) {
195 LOG(error) << "Malformed log-streams request";
196 return;
197 }
198 cmd.remove_prefix(prefixSize);
199 int logStreams = 0;
200
201 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
202 if (error.ec != std::errc()) {
203 LOG(error) << "Malformed log-streams mask";
204 return;
205 }
206 LOGP(info, "Logstreams flags set to {}", logStreams);
207 state.logStreams = logStreams;
208 if ((state.logStreams & DeviceState::LogStreams::DEVICE_LOG) != 0) {
209 O2_LOG_ENABLE(device);
210 } else {
211 O2_LOG_DISABLE(device);
212 }
213 if ((state.logStreams & DeviceState::LogStreams::COMPLETION_LOG) != 0) {
214 O2_LOG_ENABLE(completion);
215 } else {
216 O2_LOG_DISABLE(completion);
217 }
// NOTE(review): the `if` condition guarding the monitoring_service toggle is
// not visible in this listing.
219 O2_LOG_ENABLE(monitoring_service);
220 } else {
221 O2_LOG_DISABLE(monitoring_service);
222 }
// NOTE(review): the `if` condition guarding the data_processor_context toggle
// is not visible in this listing.
224 O2_LOG_ENABLE(data_processor_context);
225 } else {
226 O2_LOG_DISABLE(data_processor_context);
227 }
228 if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
229 O2_LOG_ENABLE(stream_context);
230 } else {
231 O2_LOG_DISABLE(stream_context);
232 }
233 });
234
235 // Client will be filled in the line after. I can probably have a single
236 // client per device.
// Ownership of `handler` moves into the WSDPLClient, and ownership of the
// WSDPLClient then moves into the WSDriverClient before the handshake is sent.
237 auto dplClient = std::make_unique<WSDPLClient>();
238 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
239 client->setDPLClient(std::move(dplClient));
240 client->sendHandshake();
241}
242
// Body of the uv_async callback; the symbol index of this listing names it
// on_awake_main_thread(uv_async_t* handle).
// NOTE(review): the signature line and one body line are not visible here, so
// what is done with `state` cannot be told from this listing.
244{
// handle->data carries the DeviceState pointer stored when the async handle was set up.
245 auto* state = (DeviceState*)handle->data;
247}
248
// Starts the asynchronous TCP connection from this device to the driver and
// prepares the async handle used to wake up the event loop.
//
// The connection is only initiated here: on_connect is invoked by libuv with
// the outcome, receiving a heap-allocated ConnectionContext (this client plus
// the registry reference) through the request's `data` field.
//
// @param registry service registry reference; used to reach the DeviceState
// @param ip       IPv4 address of the driver, as a NUL-terminated string
// @param port     TCP port of the driver
249WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsigned short port)
250 : mRegistry(registry)
251{
252 auto& state = registry.get<DeviceState>();
253
254 // Must connect the device to the server and send a websocket request.
255 // On successful connection we can then start to send commands to the driver.
256 // We keep a backlog to make sure we do not lose messages.
// NOTE(review): socket, connection and context are allocated here and not
// released in this function; the return values of the uv_* calls are not
// checked either - confirm both against the rest of the project.
257 auto* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
258 uv_tcp_init(state.loop, socket);
259 auto* connection = (uv_connect_t*)malloc(sizeof(uv_connect_t));
260 auto* context = new ConnectionContext{.client = this, .ref = registry};
261 connection->data = context;
262
263 struct sockaddr_in dest;
// uv_ip4_addr only reads the address string, so `ip` is passed directly;
// duplicating it with strdup() leaked the copy on every construction.
264 uv_ip4_addr(ip, port, &dest);
265 uv_tcp_connect(connection, socket, (const struct sockaddr*)&dest, on_connect);
266
// Async handle used by awake(); its callback receives the DeviceState via `data`.
267 this->mAwakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
268 this->mAwakeMainThread->data = &state;
269 uv_async_init(state.loop, this->mAwakeMainThread, on_awake_main_thread);
270}
271
// NOTE(review): the signature line is not visible in this listing; presumably
// this is the WSDriverClient destructor - confirm.
// Releases the uv_async_t that was malloc'ed for mAwakeMainThread.
// NOTE(review): libuv requires a handle to be closed with uv_close() before its
// memory is released (in or after the close callback); no uv_close() call is
// visible here - verify it happens elsewhere.
273{
274 free(this->mAwakeMainThread);
275}
276
// Currently a no-op: the body is empty, so `client`, `message` and `s` are unused.
277void sendMessageToDriver(std::unique_ptr<o2::framework::WSDPLClient>& client, char const* message, size_t s)
278{
279}
280
// Takes ownership of `client` (moved into mClient) and sets mConnected to true.
281void WSDriverClient::setDPLClient(std::unique_ptr<WSDPLClient> client)
282{
283 mClient = std::move(client);
284 mConnected = true;
285}
286
// NOTE(review): the signature line is not visible in this listing; presumably
// WSDriverClient::sendHandshake(), which on_connect calls - confirm.
// Delegates to the owned WSDPLClient. mClient is dereferenced without a null
// check, so it must have been set (see setDPLClient) before this is called.
288{
289 mClient->sendHandshake();
291}
292
// Queues a message for the driver and optionally wakes up the event loop.
// @param msg   pointer to the message bytes
// @param s     number of bytes in msg
// @param flush when true, awake() is called after the message has been handled
293void WSDriverClient::tell(const char* msg, size_t s, bool flush)
294{
295 // Tell will always accumulate and we signal the main thread we
296 // have metrics to push
297 std::lock_guard<std::mutex> lock(mClientMutex);
// NOTE(review): a source line appears to be missing here in this listing; the
// statement that actually accumulates msg is not visible.
299 if (flush) {
300 this->awake();
301 }
302}
303
// Signals the mAwakeMainThread async handle through uv_async_send, which makes
// libuv run the callback registered for that handle on its event loop.
304void WSDriverClient::awake()
305{
306 uv_async_send(mAwakeMainThread);
307}
308
// Body of flushPending; the symbol index of this listing gives the signature as
// `void flushPending(ServiceRegistryRef mainThreadRef) final` (the signature
// line itself is not visible here).
// Writes the accumulated backlog to the driver if a client exists and has
// completed its handshake; otherwise returns and leaves the backlog untouched.
310{
// A call from another thread is only reported; execution continues regardless.
311 if (mainThreadRef.isMainThread() == false) {
312 LOG(error) << "flushPending not called from main thread";
313 }
314 std::lock_guard<std::mutex> lock(mClientMutex);
// Function-local statics: each warning below is emitted at most once until the
// flags are reset after a successful flush.
315 static bool printed1 = false;
316 static bool printed2 = false;
// No client yet: keep accumulating, warning once the backlog size exceeds 2000.
317 if (!mClient) {
318 if (mBacklog.size() > 2000) {
319 if (!printed1) {
320 LOG(warning) << "Unable to communicate with driver because client does not exist. Continuing connection attempts.";
321 printed1 = true;
322 }
323 }
324 return;
325 }
// Client exists but the handshake is not complete: same policy as above.
326 if (!(mClient->isHandshaken())) {
327 if (mBacklog.size() > 2000) {
328 if (!printed2) {
329 LOG(warning) << "Unable to communicate with driver because client is not connected. Continuing connection attempts.";
330 printed2 = true;
331 }
332 }
333 return;
334 }
// A warning was printed earlier: report the recovery and re-arm both warnings.
335 if (printed1 || printed2) {
336 LOGP(warning, "DriverClient connected successfully. Flushing message backlog of {} messages. All is good.", mBacklog.size());
337 printed1 = false;
338 printed2 = false;
339 }
// Send everything accumulated so far, then empty the backlog.
340 mClient->write(mBacklog);
341 mBacklog.resize(0);
342}
343
344} // namespace o2::framework
benchmark::State & state
struct uv_async_s uv_async_t
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:506
#define O2_LOG_DISABLE(log)
Definition Signpost.h:495
#define O2_LOG_ENABLE(log)
Definition Signpost.h:494
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
Definition Signpost.h:616
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
struct uv_connect_s uv_connect_t
void dispatch(std::string_view event)
Dispatch an event.
void observe(char const *eventType, std::function< void(std::string_view)> callback)
Request action on some eventType notified by the driver.
void setDPLClient(std::unique_ptr< WSDPLClient >)
void tell(const char *msg, size_t s, bool flush=true) final
void flushPending(ServiceRegistryRef mainThreadRef) final
WSDriverClient(ServiceRegistryRef registry, char const *ip, unsigned short port)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
Defining PrimaryVertex explicitly as messageable.
void on_connect(uv_connect_t *connection, int status)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void sendMessageToDriver(std::unique_ptr< o2::framework::WSDPLClient > &client, char const *message, size_t s)
void on_awake_main_thread(uv_async_t *handle)
void endFragmentation() override
FIXME: not implemented.
void beginFragmentation() override
FIXME: not implemented by the backend.
ClientWebSocketHandler(WSDriverClient &client)
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void frame(char const *frame, size_t s) override
int64_t memory
How much memory it can use.
int64_t sharedMemory
How much shared memory it can allocate.
int user
Which task is using the offer.
int cpu
How many cores it can use.
Running state information of a given device.
Definition DeviceState.h:34
A handler for a websocket message stream.
Definition HTTPParser.h:137
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153