Project
Loading...
Searching...
No Matches
DPLWebSocket.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "Framework/Logger.h"
12#include "DPLWebSocket.h"
18#include "DriverServerContext.h"
19#include "DriverClientContext.h"
21#include "HTTPParser.h"
22#include <algorithm>
23#include <atomic>
24#include <uv.h>
25#include <sys/types.h>
26#include <unistd.h>
27#include <memory>
29
30namespace o2::framework
31{
32
33static void my_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
34{
35 buf->base = (char*)malloc(suggested_size);
36 buf->len = suggested_size;
37}
38
// Close callback for a server-side connection.
// NOTE(review): the signature line (listing line 40) is absent from this
// extract; the body below uses a `handle` parameter.
41{
42 LOG(debug) << "socket closed";
// handle->data is treated as the WSDPLHandler attached to the connection and is destroyed here.
43 delete (WSDPLHandler*)handle->data;
// The handle itself is released with free().
44 free(handle);
45}
46
// Write-completion callback passed to uv_write by WSDPLHandler::error():
// logs the status, then releases the reply text stored in h->data and the
// write request itself.
47void ws_error_write_callback(uv_write_t* h, int status)
48{
// NOTE(review): logged at error level unconditionally, i.e. also when status is 0 — confirm intended.
49 LOG(error) << "Error in write callback: " << uv_strerror(status);
50 if (h->data) {
51 free(h->data);
52 }
// NOTE(review): listing line 53 is absent from this extract.
54 free(h);
55}
56
// libuv read callback for a server-side connection.
// stream->data is expected to hold the WSDPLHandler for this connection;
// nread is the byte count or a negative libuv error code; buf is the buffer
// that was filled for this read.
// NOTE(review): listing lines 67, 72 and 79 are absent from this extract, so
// what happens on EOF/error besides logging, and where `err` is declared,
// cannot be seen here.
58void websocket_server_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
59{
60 WSDPLHandler* server = (WSDPLHandler*)stream->data;
61 assert(server);
// Nothing was read: nothing to do.
62 if (nread == 0) {
63 return;
64 }
65 if (nread == UV_EOF) {
66 LOG(detail) << "websocket_server_callback: communication with driver closed upon EOF";
68 return;
69 }
70 if (nread < 0) {
71 LOG(error) << "websocket_server_callback: Error while reading from websocket" << uv_strerror((int)nread);
73 return;
74 }
// Feed the received bytes to the HTTP parser, with the handler as parser object.
// NOTE(review): in the visible lines buf->base is freed only when parsing does
// not throw, and not on the early-return paths above — confirm no leak.
75 try {
76 parse_http_request(buf->base, nread, server);
77 free(buf->base);
78 } catch (RuntimeErrorRef& ref) {
80 LOG(error) << "Error while parsing request: " << err.what;
81 }
82}
83
86void ws_handshake_done_callback(uv_write_t* h, int status)
87{
88 if (status) {
89 LOG(error) << "uv_write error: " << uv_err_name(status);
90 free(h);
91 return;
92 }
93 uv_read_start((uv_stream_t*)h->handle, (uv_alloc_cb)my_alloc_cb, websocket_server_callback);
94}
95
/// One-byte opcodes identifying the kind of GUI event carried by a frame.
/// The numeric values are part of the wire format and must not change.
enum class GUIOpcodes : uint8_t {
  Mousepos = 1,
  Mouseclick = 2,
  Mousewheel = 3,
  Window = 4,
  Latency = 5,
  Keydown = 6,
  Keyup = 7,
  Charin = 8,
};
106
// Body of the websocket handler used for remote GUI connections.
// NOTE(review): the opening of the type (listing lines 107-109), the
// destructor signature, three case labels, several statements and the member
// declarations (listing lines 175-178) are absent from this extract.
110 : mContext{context}, mRenderer{renderer}
111 {
112 }
// Teardown: stops the renderer's draw timer and deletes the renderer.
114 {
116 uv_timer_stop(&(mRenderer->drawTimer));
117 delete mRenderer;
118 LOGP(info, "RemoteGUI disconnected, {} left", mContext.gui->renderers.size());
119 }
120
121 void headers(std::map<std::string, std::string> const& headers) override {}
122 void beginFragmentation() override {}
// Handles one decoded frame: the first byte is the opcode, the remaining
// bytes are the payload forwarded to the GUI plugin.
// NOTE(review): in the visible lines the payload is reinterpreted in place
// (float*/int* casts) and `s` is never checked — confirm frames are always
// long enough and suitably aligned.
123 void frame(char const* frame, size_t s) override
124 {
// Read the opcode and advance to the payload.
125 GUIOpcodes opcode = (GUIOpcodes) * (frame++);
126 switch (opcode) {
// Payload read as two floats passed to updateMousePos.
128 float* positions = (float*)frame;
129 mContext.gui->plugin->updateMousePos(positions[0], positions[1]);
130 break;
131 }
// Payload is one byte; the value 1 means "clicked".
133 char isClicked = *frame;
134 mContext.gui->plugin->updateMouseButton(isClicked == 1);
135 break;
136 }
138 int movement = *frame;
140 break;
141 }
142 case GUIOpcodes::Window: {
143 int* size = (int*)frame;
145 break;
146 }
// Payload is an int used as the draw timer repeat interval, never below 20.
147 case GUIOpcodes::Latency: {
148 int lat = *((int*)frame);
149 lat = lat < 20 ? 20 : lat;
150 uv_timer_set_repeat(&(mRenderer->drawTimer), lat);
151 break;
152 }
153 case GUIOpcodes::Keydown: {
154 char key = *frame;
155 mContext.gui->plugin->keyEvent(key, true);
156 break;
157 }
158 case GUIOpcodes::Keyup: {
159 char key = *frame;
160 mContext.gui->plugin->keyEvent(key, false);
161 break;
162 }
163 case GUIOpcodes::Charin: {
164 char key = *frame;
166 break;
167 }
168 }
169 }
// The remaining handler hooks are intentionally empty.
170 void endFragmentation() override {};
171 void control(char const* frame, size_t s) override {};
172 void beginChunk() override {};
173 void endChunk() override {};
174
179};
180
// Constructor body: stores the stream and the server context it was given.
// NOTE(review): the signature line (listing line 181) is absent from this extract.
182 : mStream{s},
183 mServerContext{context}
184{
185}
186
187void WSDPLHandler::method(std::string_view const& s)
188{
189 if (s != "GET") {
190 throw WSError{400, "Bad Request"};
191 }
192}
193
194void WSDPLHandler::target(std::string_view const& s)
195{
196 if (s != "/") {
197 throw WSError{404, "Unknown"};
198 }
199}
200
/// Stores one HTTP header in @a headers in normalized form.
///
/// The key is always lower-cased. The value is lower-cased too, except for
/// "sec-websocket-accept" and "sec-websocket-key", whose values are kept
/// verbatim. An already present key is left untouched.
///
/// @param headers  map receiving the header.
/// @param k        header name as received.
/// @param v        header value as received.
void populateHeader(std::map<std::string, std::string>& headers, std::string_view const& k, std::string_view const& v)
{
  auto const lowered = [](std::string_view const& text) {
    std::string out{text};
    for (char& ch : out) {
      ch = static_cast<char>(std::tolower(static_cast<unsigned char>(ch)));
    }
    return out;
  };

  std::string const name = lowered(k);
  bool const keepValueCase = (name == "sec-websocket-accept" || name == "sec-websocket-key");
  std::string const value = keepValueCase ? std::string{v} : lowered(v);
  headers.insert(std::make_pair(name, value));
}
213
214void remoteGuiCallback(uv_timer_s* ctx)
215{
216 auto* renderer = reinterpret_cast<GuiRenderer*>(ctx->data);
217 assert(renderer);
218
219 void* frame = nullptr;
220 void* draw_data = nullptr;
221 int size;
222 uint64_t frameStart = uv_hrtime();
223 uint64_t frameLatency = frameStart - renderer->gui->frameLast;
224
225 // if less than 15ms have passed reuse old frame
226 if (renderer->gui->lastFrame == nullptr || frameLatency / 1000000 > 15) {
227 renderer->gui->plugin->pollGUIPreRender(renderer->gui->window, (float)frameLatency / 1000000000.0f);
228 draw_data = renderer->gui->plugin->pollGUIRender(renderer->gui->callback);
229 renderer->gui->plugin->pollGUIPostRender(renderer->gui->window, draw_data);
230 } else {
231 draw_data = renderer->gui->lastFrame;
232 }
233
234 renderer->gui->plugin->getFrameRaw(draw_data, &frame, &size, renderer->updateTextures);
235 // For now we only sent the text atlas once
236 renderer->updateTextures = false;
237 std::vector<uv_buf_t> outputs;
238 encode_websocket_frames(outputs, (const char*)frame, size, WebSocketOpCode::Binary, 0);
239 renderer->handler->write(outputs);
240 free(frame);
241
242 renderer->guiConnected = true;
243
244 if (frameLatency / 1000000 > 15) {
245 uint64_t frameEnd = uv_hrtime();
246 *(renderer->gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
247 *(renderer->gui->frameLatency) = frameLatency / 1000000.f;
248 renderer->gui->frameLast = frameStart;
249 renderer->gui->lastFrame = draw_data;
250 }
251}
252
// Receives one header name/value pair (k, v) of the incoming request.
// NOTE(review): the body (listing line 255) is absent from this extract.
253void WSDPLHandler::header(std::string_view const& k, std::string_view const& v)
254{
256}
257
// Validates the collected request headers, sends the websocket handshake
// reply and installs the frame handler for the connection.
// Throws WSError 400 when a required header is missing or wrong, and 418
// when the connection carries no "x-dpl-pid" and the remote GUI is not enabled.
// NOTE(review): the signature line (listing line 258) and listing lines 260
// and 277 are absent from this extract.
259{
// The comparisons use lower-case literals; presumably header values are
// normalized to lower case when stored — verify against header().
261 if (mHeaders["upgrade"] != "websocket") {
262 throw WSError{400, "Bad Request: not a websocket upgrade"};
263 }
264
// find() is used so that "upgrade" may be one of several connection options.
265 if (mHeaders["connection"].find("upgrade") == std::string::npos) {
266 throw WSError{400, "Bad Request: connection not for upgrade"};
267 }
268 if (mHeaders["sec-websocket-protocol"] != "dpl") {
269 throw WSError{400, "Bad Request: websocket protocol not \"dpl\"."};
270 }
271 if (mHeaders.count("sec-websocket-key") == 0) {
272 throw WSError{400, "Bad Request: sec-websocket-key missing"};
273 }
274 if (mHeaders["sec-websocket-version"] != "13") {
275 throw WSError{400, "Bad Request: wrong protocol version"};
276 }
278 LOG(debug) << "Got upgrade request with nonce " << mHeaders["sec-websocket-key"].c_str();
279 std::string reply = encode_websocket_handshake_reply(mHeaders["sec-websocket-key"].c_str());
280 mHandshaken = true;
281
// NOTE(review): the strdup'ed reply is not stored in info_req->data (which is
// left unset here), so the completion callback has no pointer to free it —
// looks like a leak; the result of uv_write is not checked either.
282 uv_buf_t bfr = uv_buf_init(strdup(reply.data()), reply.size());
283 auto* info_req = (uv_write_t*)malloc(sizeof(uv_write_t));
284 uv_write(info_req, (uv_stream_t*)mStream, &bfr, 1, ws_handshake_done_callback);
// A connection announcing "x-dpl-pid" gets a ControlWebSocketHandler.
285 auto header = mHeaders.find("x-dpl-pid");
286 if (header != mHeaders.end()) {
287 LOG(debug) << "Driver connected to PID : " << header->second;
288 mHandler = std::make_unique<ControlWebSocketHandler>(*mServerContext);
289 mHandler->headers(mHeaders);
290
// Attach a DeviceController to the control entry at the same index as the
// info entry whose pid matches the header.
291 for (size_t i = 0; i < mServerContext->infos->size(); ++i) {
292 if (std::to_string((*mServerContext->infos)[i].pid) == header->second) {
293 (*mServerContext->controls)[i].controller = new DeviceController{this};
294 break;
295 }
296 }
297 } else {
// Without a PID the connection is served as a remote GUI, but only when the
// environment variable matching the process role (driver or device) is set.
298 if ((mServerContext->isDriver && getenv("DPL_DRIVER_REMOTE_GUI")) || ((mServerContext->isDriver == false) && getenv("DPL_DEVICE_REMOTE_GUI"))) {
299 LOG(info) << "Connection not bound to a PID";
300 auto* renderer = new GuiRenderer;
301 renderer->gui = mServerContext->gui;
302 renderer->handler = this;
// The draw timer fires immediately and then every 200ms, calling remoteGuiCallback.
303 uv_timer_init(mServerContext->loop, &(renderer->drawTimer));
304 renderer->drawTimer.data = renderer;
305 uv_timer_start(&(renderer->drawTimer), remoteGuiCallback, 0, 200);
306 mHandler = std::make_unique<GUIWebSocketHandler>(*mServerContext, renderer);
307 mHandler->headers(mHeaders);
308 mServerContext->gui->renderers.insert(renderer);
309
310 LOGP(info, "RemoteGUI connected, {} running", mServerContext->gui->renderers.size());
311 } else {
// NOTE(review): by this point the handshake reply has already been queued above.
312 LOGP(warning, "Connection not bound to a PID however {} is not set. Skipping.",
313 mServerContext->isDriver ? "DPL_DRIVER_REMOTE_GUI" : "DPL_DEVICE_REMOTE_GUI");
314 throw WSError{418, "Remote GUI not enabled"};
315 }
316 }
317}
318
320void WSDPLHandler::body(char* data, size_t s)
321{
322 decode_websocket(data, s, *mHandler.get());
323}
324
325void ws_server_write_callback(uv_write_t* h, int status)
326{
327 if (status) {
328 LOG(error) << "uv_write error: " << uv_err_name(status);
329 free(h);
330 return;
331 }
332 if (h->data) {
333 free(h->data);
334 }
335 free(h);
336}
337
338void ws_server_bulk_write_callback(uv_write_t* h, int status)
339{
340 if (status) {
341 LOG(error) << "uv_write error: " << uv_err_name(status);
342 free(h);
343 return;
344 }
345 auto* buffers = (std::vector<uv_buf_t>*)h->data;
346 if (buffers) {
347 for (auto& b : *buffers) {
348 free(b.base);
349 }
350 }
351 delete buffers;
352 free(h);
353}
354
355void WSDPLHandler::write(char const* message, size_t s)
356{
357 uv_buf_t bfr = uv_buf_init(strdup(message), s);
358 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
359 write_req->data = bfr.base;
360 uv_write(write_req, (uv_stream_t*)mStream, &bfr, 1, ws_server_write_callback);
361}
362
363void WSDPLHandler::write(std::vector<uv_buf_t>& outputs)
364{
365 if (outputs.empty()) {
366 return;
367 }
368 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
369 auto* buffers = new std::vector<uv_buf_t>;
370 buffers->swap(outputs);
371 write_req->data = buffers;
372 uv_write(write_req, (uv_stream_t*)mStream, &buffers->at(0), buffers->size(), ws_server_bulk_write_callback);
373}
374
376void WSDPLHandler::error(int code, char const* message)
377{
378 static constexpr auto errorFMT = "HTTP/1.1 {} {}\r\ncontent-type: text/plain\r\n\r\n{}: {}\r\n";
379 std::string error = fmt::format(errorFMT, code, message, code, message);
380 char* reply = strdup(error.data());
381 uv_buf_t bfr = uv_buf_init(reply, error.size());
382 auto* error_rep = (uv_write_t*)malloc(sizeof(uv_write_t));
383 error_rep->data = reply;
384 uv_write(error_rep, (uv_stream_t*)mStream, &bfr, 1, ws_error_write_callback);
385}
386
// Only logs that the client connection to the server is being closed.
// NOTE(review): the signature line (listing line 387) is absent from this
// extract; presumably this is close_client_websocket — confirm.
388{
389 LOG(debug) << "Closing websocket connection to server";
390}
391
// libuv read callback for the client side of the connection.
// stream->data holds the DriverClientContext installed by WSDPLClient::connect().
// NOTE(review): listing lines 396, 404, 412 and 420 are absent from this
// extract, so the use of `state`, any close on EOF/error and the declaration
// of `err` cannot be seen here.
392void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
393{
394 auto* context = (DriverClientContext*)stream->data;
395 auto& state = context->ref.get<DeviceState>();
397 assert(context->client);
// Nothing was read: nothing to do.
398 if (nread == 0) {
399 return;
400 }
// End of stream: stop reading.
401 if (nread == UV_EOF) {
402 LOG(debug) << "EOF received from server, closing.";
403 uv_read_stop(stream);
405 return;
406 }
// Any other negative value is a read error: stop reading.
407 if (nread < 0) {
408 // FIXME: improve error message
409 // FIXME: should I close?
410 LOG(error) << "Error while reading from websocket";
411 uv_read_stop(stream);
413 return;
414 }
// Feed the received bytes to the HTTP parser, with the client as parser object.
// NOTE(review): in the visible lines buf->base is freed only when parsing does
// not throw, and not on the early-return paths above — confirm no leak.
415 try {
416 LOG(debug) << "Data received from server";
417 parse_http_request(buf->base, nread, context->client);
418 free(buf->base);
419 } catch (RuntimeErrorRef& ref) {
421 LOG(error) << "Error while parsing request: " << err.what;
422 }
423}
424
// Constructor body: initializes the handshake nonce with a fixed value.
// The literal is the base64 encoding of "the sample nonce", the example key
// given in RFC 6455.
// NOTE(review): the signature line (listing line 426) is absent from this extract.
425// FIXME: mNonce should be random
427 : mNonce{"dGhlIHNhbXBsZSBub25jZQ=="}
428{
429}
430
431void WSDPLClient::connect(ServiceRegistryRef ref, uv_stream_t* s, std::function<void()> handshake, std::unique_ptr<WebSocketHandler> handler)
432{
433 mStream = s;
434 mContext = std::make_unique<DriverClientContext>(DriverClientContext{.ref = ref, .client = this});
435 mHandshake = handshake;
436 mHandler = std::move(handler);
437 s->data = mContext.get();
438 uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback);
439}
440
// Builds the websocket handshake request and writes it to the server.
// NOTE(review): the signature line (listing line 441) is absent from this extract.
442{
443 auto& spec = mContext->ref.get<DeviceSpec const>();
// Extra headers carrying this process' pid and the device id and name.
444 std::vector<std::pair<std::string, std::string>> headers = {
445 {{"x-dpl-pid"}, std::to_string(getpid())},
446 {{"x-dpl-id"}, spec.id},
447 {{"x-dpl-name"}, spec.name}};
// Request target "/", protocol "dpl", version 13, using the stored nonce.
448 std::string handShakeString = encode_websocket_handshake_request("/", "dpl", 13, mNonce.c_str(), headers);
449 this->write(handShakeString.c_str(), handShakeString.size());
450}
451
452void WSDPLClient::replyVersion(std::string_view const& s)
453{
454 if (s != "HTTP/1.1") {
455 throw runtime_error("Not an HTTP reply");
456 }
457}
458
459void WSDPLClient::replyCode(std::string_view const& s)
460{
461 if (s != "101") {
462 throw runtime_error("Upgrade denied");
463 }
464}
465
// Receives one header name/value pair (k, v) of the server reply.
// NOTE(review): the body (listing line 468) is absent from this extract.
466void WSDPLClient::header(std::string_view const& k, std::string_view const& v)
467{
469}
470
// Logs every stored header as "key: value" at info level.
// NOTE(review): the signature line (listing line 471) is absent from this extract.
472{
// NOTE(review): `auto [k, v]` copies each pair; a const reference would avoid that.
473 for (auto [k, v] : mHeaders) {
474 LOG(info) << k << ": " << v;
475 }
476}
477
// Verifies the server's handshake reply once its headers are collected, then
// marks the connection as handshaken and invokes the handshake callback.
// Throws when the upgrade/connection headers are wrong, when
// "sec-websocket-accept" is missing, or when it does not match the value
// computed from the nonce.
// NOTE(review): the signature line (listing line 478) and listing lines 480
// and 498 are absent from this extract.
479{
481 if (mHeaders["upgrade"] != "websocket") {
482 throw runtime_error_f("No websocket upgrade");
483 }
484 // find is used to account for multiple options
485 if (mHeaders["connection"].find("upgrade") == std::string::npos) {
486 throw runtime_error_f("No connection upgrade");
487 }
488 if (mHeaders.count("sec-websocket-accept") == 0) {
489 throw runtime_error("sec-websocket-accept not found");
490 }
491
// The accept value must be the one derived from the nonce sent in the request.
492 std::string expectedAccept = HTTPParserHelpers::calculateAccept(mNonce.c_str());
493 if (mHeaders["sec-websocket-accept"] != expectedAccept) {
494 throw runtime_error_f(R"(Invalid accept received: "%s", expected "%s")", mHeaders["sec-websocket-accept"].c_str(), expectedAccept.c_str());
495 }
496
497 LOG(info) << "Correctly handshaken websocket connection.";
499 mHandshaken = true;
500 mHandshake();
501}
502
507
// Buffers owned by a pending multi-buffer write; WSDPLClient::write() fills
// this member and the bulk write callback frees the buffers.
// NOTE(review): the rest of this struct and the struct before it (listing
// lines 503-508, 510) are absent from this extract.
509 std::vector<uv_buf_t> buffers;
511};
512
// Write-completion callback for single-buffer writes issued by
// WSDPLClient::write(char const*, size_t); h->data is the WriteRequestContext
// created there.
// NOTE(review): listing line 522 is absent from this extract, so what is done
// with `state` cannot be seen here.
513void ws_client_write_callback(uv_write_t* h, int status)
514{
515 auto* context = (WriteRequestContext*)h->data;
// NOTE(review): on failure only the request is freed; the context and its
// buffer are not released on this path — looks like a leak.
516 if (status) {
517 LOG(error) << "uv_write error: " << uv_err_name(status);
518 free(h);
519 return;
520 }
521 auto& state = context->ref.get<DeviceState>();
// Success: release the payload copy, the context and the request.
523 if (context->buf.base) {
524 free(context->buf.base);
525 }
526 delete context;
527 free(h);
528}
529
// Write-completion callback for multi-buffer writes issued by
// WSDPLClient::write(std::vector<uv_buf_t>&); h->data is the
// BulkWriteRequestContext created there.
// NOTE(review): listing line 535 is absent from this extract, so what is done
// with `state` cannot be seen here.
530void ws_client_bulk_write_callback(uv_write_t* h, int status)
531{
532 auto* context = (BulkWriteRequestContext*)h->data;
533 auto& state = context->ref.get<DeviceState>();
534
// NOTE(review): on failure only the request is freed; the context and its
// buffers are not released on this path — looks like a leak.
536 if (status < 0) {
537 LOG(error) << "uv_write error: " << uv_err_name(status);
538 free(h);
539 return;
540 }
// Success: release every buffer, then the context and the request.
541 if (context->buffers.size()) {
542 for (auto& b : context->buffers) {
543 free(b.base);
544 }
545 }
546 delete context;
547 free(h);
548}
549
551void WSDPLClient::body(char* data, size_t s)
552{
553 decode_websocket(data, s, *mHandler.get());
554}
555
557void WSDPLClient::write(char const* message, size_t s)
558{
559 auto* context = new WriteRequestContext{.ref = mContext->ref};
560 context->buf = uv_buf_init(strdup(message), s);
561 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
562 write_req->data = context;
563 uv_write(write_req, (uv_stream_t*)mStream, &context->buf, 1, ws_client_write_callback);
564}
565
566void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
567{
568 if (outputs.empty()) {
569 return;
570 }
571 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
572 auto* context = new BulkWriteRequestContext{.ref = mContext->ref};
573 context->buffers.swap(outputs);
574 write_req->data = context;
575 uv_write(write_req, (uv_stream_t*)mStream, &context->buffers.at(0),
576 context->buffers.size(), ws_client_bulk_write_callback);
577}
578
579} // namespace o2::framework
benchmark::State & state
struct uv_handle_s uv_handle_t
int32_t i
uint32_t c
Definition RawData.h:2
std::ostringstream debug
StringRef key
Class for time synchronization of RawReader instances.
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
const GLuint * buffers
Definition glcorearb.h:656
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
void ws_handshake_done_callback(uv_write_t *h, int status)
void populateHeader(std::map< std::string, std::string > &headers, std::string_view const &k, std::string_view const &v)
void websocket_client_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
void close_client_websocket(uv_handle_t *stream)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void websocket_server_close_callback(uv_handle_t *handle)
Free any resource associated with the device - driver channel.
std::string encode_websocket_handshake_reply(char const *nonce)
void ws_error_write_callback(uv_write_t *h, int status)
void parse_http_request(char *start, size_t size, HTTPParser *parser)
std::string encode_websocket_handshake_request(const char *endpoint, const char *protocol, int version, char const *nonce, std::vector< std::pair< std::string, std::string > > headers)
void ws_client_bulk_write_callback(uv_write_t *h, int status)
void decode_websocket(char *start, size_t size, WebSocketHandler &handler)
void ws_server_write_callback(uv_write_t *h, int status)
void remoteGuiCallback(uv_timer_s *ctx)
RuntimeError & error_from_ref(RuntimeErrorRef)
void ws_client_write_callback(uv_write_t *h, int status)
void ws_server_bulk_write_callback(uv_write_t *h, int status)
void websocket_server_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Actually replies to any incoming websocket stuff.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
virtual void charIn(char key)=0
virtual void updateMousePos(float x, float y)=0
virtual void keyEvent(char key, bool down)=0
virtual void updateMouseWheel(int direction)=0
virtual void updateMouseButton(bool isClicked)=0
virtual void updateWindowSize(int x, int y)=0
std::string id
The id of the device, including time-pipelining and suffix.
Definition DeviceSpec.h:52
Running state information of a given device.
Definition DeviceState.h:34
Context for the client callbacks.
std::vector< DeviceInfo > * infos
std::vector< DeviceControl > * controls
An handler for a websocket message stream.
void endChunk() override
Invoked whenever we have no more input to process.
void frame(char const *frame, size_t s) override
void beginChunk() override
Invoked before processing the next round of input.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void beginFragmentation() override
FIXME: not implemented.
void endFragmentation() override
FIXME: not implemented.
GUIWebSocketHandler(DriverServerContext &context, GuiRenderer *renderer)
std::set< GuiRenderer * > renderers
static std::string calculateAccept(const char *nonce)
Helper to calculate the reply to a nonce.
std::unique_ptr< WebSocketHandler > mHandler
void replyVersion(std::string_view const &s) override
std::function< void()> mHandshake
std::atomic< bool > mHandshaken
void connect(ServiceRegistryRef ref, uv_stream_t *stream, std::function< void()> handshake, std::unique_ptr< WebSocketHandler > handler)
std::unique_ptr< DriverClientContext > mContext
void body(char *data, size_t s) override
Actual handling of WS frames happens inside here.
void replyCode(std::string_view const &s) override
void dumpHeaders()
Dump headers.
void write(char const *, size_t)
Helper to write a message to the server.
void header(std::string_view const &k, std::string_view const &v) override
std::map< std::string, std::string > mHeaders
void method(std::string_view const &s) override
void body(char *data, size_t s) override
Actual handling of WS frames happens inside here.
void header(std::string_view const &k, std::string_view const &v) override
WSDPLHandler(uv_stream_t *stream, DriverServerContext *context)
DriverServerContext * mServerContext
void target(std::string_view const &s) override
std::unique_ptr< WebSocketHandler > mHandler
void write(char const *, size_t)
Helper to write a message to the associated client.
std::map< std::string, std::string > mHeaders
An handler for a websocket message stream.
Definition HTTPParser.h:136
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"