Project
Loading...
Searching...
No Matches
DPLWebSocket.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "Framework/Logger.h"
12#include "DPLWebSocket.h"
18#include "DriverServerContext.h"
19#include "DriverClientContext.h"
22#include "HTTPParser.h"
23#include <algorithm>
24#include <atomic>
25#include <uv.h>
26#include <sys/types.h>
27#include <unistd.h>
28#include <memory>
30
31namespace o2::framework
32{
33
34static void my_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
35{
36 buf->base = (char*)malloc(suggested_size);
37 buf->len = suggested_size;
38}
39
42{
43 LOG(debug) << "socket closed";
44 delete (WSDPLHandler*)handle->data;
45 free(handle);
46}
47
48void ws_error_write_callback(uv_write_t* h, int status)
49{
50 LOG(error) << "Error in write callback: " << uv_strerror(status);
51 if (h->data) {
52 free(h->data);
53 }
55 free(h);
56}
57
59void websocket_server_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
60{
61 WSDPLHandler* server = (WSDPLHandler*)stream->data;
62 assert(server);
63 if (nread == 0) {
64 return;
65 }
66 if (nread == UV_EOF) {
67 LOG(detail) << "websocket_server_callback: communication with driver closed upon EOF";
69 return;
70 }
71 if (nread < 0) {
72 LOG(error) << "websocket_server_callback: Error while reading from websocket" << uv_strerror((int)nread);
74 return;
75 }
76 try {
77 parse_http_request(buf->base, nread, server);
78 free(buf->base);
79 } catch (RuntimeErrorRef& ref) {
81 LOG(error) << "Error while parsing request: " << err.what;
82 }
83}
84
87void ws_handshake_done_callback(uv_write_t* h, int status)
88{
89 if (status) {
90 LOG(error) << "uv_write error: " << uv_err_name(status);
91 free(h);
92 return;
93 }
94 uv_read_start((uv_stream_t*)h->handle, (uv_alloc_cb)my_alloc_cb, websocket_server_callback);
95}
96
97enum struct GUIOpcodes : uint8_t {
98 Mousepos = 1,
99 Mouseclick = 2,
100 Mousewheel = 3,
101 Window = 4,
102 Latency = 5,
103 Keydown = 6,
104 Keyup = 7,
105 Charin = 8
106};
107
111 : mContext{context}, mRenderer{renderer}
112 {
113 }
115 {
117 uv_timer_stop(&(mRenderer->drawTimer));
118 delete mRenderer;
119 LOGP(info, "RemoteGUI disconnected, {} left", mContext.gui->renderers.size());
120 }
121
122 void headers(std::map<std::string, std::string> const& headers) override {}
123 void beginFragmentation() override {}
124 void frame(char const* frame, size_t s) override
125 {
126 GUIOpcodes opcode = (GUIOpcodes) * (frame++);
127 switch (opcode) {
129 float* positions = (float*)frame;
130 mContext.gui->plugin->updateMousePos(positions[0], positions[1]);
131 break;
132 }
134 char isClicked = *frame;
135 mContext.gui->plugin->updateMouseButton(isClicked == 1);
136 break;
137 }
139 int movement = *frame;
141 break;
142 }
143 case GUIOpcodes::Window: {
144 int* size = (int*)frame;
146 break;
147 }
148 case GUIOpcodes::Latency: {
149 int lat = *((int*)frame);
150 lat = lat < 20 ? 20 : lat;
151 uv_timer_set_repeat(&(mRenderer->drawTimer), lat);
152 break;
153 }
154 case GUIOpcodes::Keydown: {
155 char key = *frame;
156 mContext.gui->plugin->keyEvent(key, true);
157 break;
158 }
159 case GUIOpcodes::Keyup: {
160 char key = *frame;
161 mContext.gui->plugin->keyEvent(key, false);
162 break;
163 }
164 case GUIOpcodes::Charin: {
165 char key = *frame;
167 break;
168 }
169 }
170 }
171 void endFragmentation() override {};
172 void control(char const* frame, size_t s) override {};
173 void beginChunk() override {};
174 void endChunk() override {};
175
180};
181
183 : mStream{s},
184 mServerContext{context}
185{
186}
187
188void WSDPLHandler::method(std::string_view const& s)
189{
190 if (s != "GET") {
191 throw WSError{400, "Bad Request"};
192 }
193}
194
195void WSDPLHandler::target(std::string_view const& s)
196{
197 if (s != "/" && s != "/status") {
198 throw WSError{404, "Unknown"};
199 }
200 mTarget = s;
201}
202
203void populateHeader(std::map<std::string, std::string>& headers, std::string_view const& k, std::string_view const& v)
204{
205 std::string kk{k};
206 std::string vv{v};
207 std::transform(kk.begin(), kk.end(), kk.begin(),
208 [](unsigned char c) { return std::tolower(c); });
209 if (kk != "sec-websocket-accept" && kk != "sec-websocket-key") {
210 std::transform(vv.begin(), vv.end(), vv.begin(),
211 [](unsigned char c) { return std::tolower(c); });
212 }
213 headers.insert(std::make_pair(kk, vv));
214}
215
216void remoteGuiCallback(uv_timer_s* ctx)
217{
218 auto* renderer = reinterpret_cast<GuiRenderer*>(ctx->data);
219 assert(renderer);
220
221 void* frame = nullptr;
222 void* draw_data = nullptr;
223 int size;
224 uint64_t frameStart = uv_hrtime();
225 uint64_t frameLatency = frameStart - renderer->gui->frameLast;
226
227 // if less than 15ms have passed reuse old frame
228 if (renderer->gui->lastFrame == nullptr || frameLatency / 1000000 > 15) {
229 renderer->gui->plugin->pollGUIPreRender(renderer->gui->window, (float)frameLatency / 1000000000.0f);
230 draw_data = renderer->gui->plugin->pollGUIRender(renderer->gui->callback);
231 renderer->gui->plugin->pollGUIPostRender(renderer->gui->window, draw_data);
232 } else {
233 draw_data = renderer->gui->lastFrame;
234 }
235
236 renderer->gui->plugin->getFrameRaw(draw_data, &frame, &size, renderer->updateTextures);
237 // For now we only sent the text atlas once
238 renderer->updateTextures = false;
239 std::vector<uv_buf_t> outputs;
240 encode_websocket_frames(outputs, (const char*)frame, size, WebSocketOpCode::Binary, 0);
241 renderer->handler->write(outputs);
242 free(frame);
243
244 renderer->guiConnected = true;
245
246 if (frameLatency / 1000000 > 15) {
247 uint64_t frameEnd = uv_hrtime();
248 *(renderer->gui->frameCost) = (frameEnd - frameStart) / 1000000.f;
249 *(renderer->gui->frameLatency) = frameLatency / 1000000.f;
250 renderer->gui->frameLast = frameStart;
251 renderer->gui->lastFrame = draw_data;
252 }
253}
254
255void WSDPLHandler::header(std::string_view const& k, std::string_view const& v)
256{
258}
259
261{
263 if (mHeaders["upgrade"] != "websocket") {
264 throw WSError{400, "Bad Request: not a websocket upgrade"};
265 }
266
267 if (mHeaders["connection"].find("upgrade") == std::string::npos) {
268 throw WSError{400, "Bad Request: connection not for upgrade"};
269 }
270 if (mHeaders["sec-websocket-protocol"] != "dpl") {
271 throw WSError{400, "Bad Request: websocket protocol not \"dpl\"."};
272 }
273 if (mHeaders.count("sec-websocket-key") == 0) {
274 throw WSError{400, "Bad Request: sec-websocket-key missing"};
275 }
276 if (mHeaders["sec-websocket-version"] != "13") {
277 throw WSError{400, "Bad Request: wrong protocol version"};
278 }
280 LOG(debug) << "Got upgrade request with nonce " << mHeaders["sec-websocket-key"].c_str();
281 std::string reply = encode_websocket_handshake_reply(mHeaders["sec-websocket-key"].c_str(), "dpl");
282 mHandshaken = true;
283
284 uv_buf_t bfr = uv_buf_init(strdup(reply.data()), reply.size());
285 auto* info_req = (uv_write_t*)malloc(sizeof(uv_write_t));
286 uv_write(info_req, (uv_stream_t*)mStream, &bfr, 1, ws_handshake_done_callback);
287 auto header = mHeaders.find("x-dpl-pid");
288 if (header != mHeaders.end()) {
289 LOG(debug) << "Driver connected to PID : " << header->second;
290 mHandler = std::make_unique<ControlWebSocketHandler>(*mServerContext);
291 mHandler->headers(mHeaders);
292
293 for (size_t i = 0; i < mServerContext->infos->size(); ++i) {
294 if (std::to_string((*mServerContext->infos)[i].pid) == header->second) {
295 (*mServerContext->controls)[i].controller = new DeviceController{this};
296 break;
297 }
298 }
299 } else if (mTarget == "/status" && mServerContext->isDriver) {
300 LOGP(info, "Status client connected ({} total)", mServerContext->statusHandlers.size() + 1);
301 auto* statusHandler = new StatusWebSocketHandler(*mServerContext, this);
302 mServerContext->statusHandlers.push_back(statusHandler);
303 mHandler = std::unique_ptr<WebSocketHandler>(statusHandler);
304 mHandler->headers(mHeaders);
305 } else {
306 if ((mServerContext->isDriver && getenv("DPL_DRIVER_REMOTE_GUI")) || ((mServerContext->isDriver == false) && getenv("DPL_DEVICE_REMOTE_GUI"))) {
307 LOG(info) << "Connection not bound to a PID";
308 auto* renderer = new GuiRenderer;
309 renderer->gui = mServerContext->gui;
310 renderer->handler = this;
311 uv_timer_init(mServerContext->loop, &(renderer->drawTimer));
312 renderer->drawTimer.data = renderer;
313 uv_timer_start(&(renderer->drawTimer), remoteGuiCallback, 0, 200);
314 mHandler = std::make_unique<GUIWebSocketHandler>(*mServerContext, renderer);
315 mHandler->headers(mHeaders);
316 mServerContext->gui->renderers.insert(renderer);
317
318 LOGP(info, "RemoteGUI connected, {} running", mServerContext->gui->renderers.size());
319 } else {
320 LOGP(warning, "Connection not bound to a PID however {} is not set. Skipping.",
321 mServerContext->isDriver ? "DPL_DRIVER_REMOTE_GUI" : "DPL_DEVICE_REMOTE_GUI");
322 throw WSError{418, "Remote GUI not enabled"};
323 }
324 }
325}
326
328void WSDPLHandler::body(char* data, size_t s)
329{
330 decode_websocket(data, s, *mHandler.get());
331}
332
333void ws_server_write_callback(uv_write_t* h, int status)
334{
335 if (status) {
336 LOG(error) << "uv_write error: " << uv_err_name(status);
337 free(h);
338 return;
339 }
340 if (h->data) {
341 free(h->data);
342 }
343 free(h);
344}
345
346void ws_server_bulk_write_callback(uv_write_t* h, int status)
347{
348 if (status) {
349 LOG(error) << "uv_write error: " << uv_err_name(status);
350 free(h);
351 return;
352 }
353 auto* buffers = (std::vector<uv_buf_t>*)h->data;
354 if (buffers) {
355 for (auto& b : *buffers) {
356 free(b.base);
357 }
358 }
359 delete buffers;
360 free(h);
361}
362
363void WSDPLHandler::write(char const* message, size_t s)
364{
365 uv_buf_t bfr = uv_buf_init(strdup(message), s);
366 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
367 write_req->data = bfr.base;
368 uv_write(write_req, (uv_stream_t*)mStream, &bfr, 1, ws_server_write_callback);
369}
370
371void WSDPLHandler::write(std::vector<uv_buf_t>& outputs)
372{
373 if (outputs.empty()) {
374 return;
375 }
376 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
377 auto* buffers = new std::vector<uv_buf_t>;
378 buffers->swap(outputs);
379 write_req->data = buffers;
380 uv_write(write_req, (uv_stream_t*)mStream, &buffers->at(0), buffers->size(), ws_server_bulk_write_callback);
381}
382
384void WSDPLHandler::error(int code, char const* message)
385{
386 static constexpr auto errorFMT = "HTTP/1.1 {} {}\r\ncontent-type: text/plain\r\n\r\n{}: {}\r\n";
387 std::string error = fmt::format(errorFMT, code, message, code, message);
388 char* reply = strdup(error.data());
389 uv_buf_t bfr = uv_buf_init(reply, error.size());
390 auto* error_rep = (uv_write_t*)malloc(sizeof(uv_write_t));
391 error_rep->data = reply;
392 uv_write(error_rep, (uv_stream_t*)mStream, &bfr, 1, ws_error_write_callback);
393}
394
396{
397 LOG(debug) << "Closing websocket connection to server";
398}
399
400void websocket_client_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
401{
402 auto* context = (DriverClientContext*)stream->data;
403 auto& state = context->ref.get<DeviceState>();
405 assert(context->client);
406 if (nread == 0) {
407 return;
408 }
409 if (nread == UV_EOF) {
410 LOG(debug) << "EOF received from server, closing.";
411 uv_read_stop(stream);
413 return;
414 }
415 if (nread < 0) {
416 // FIXME: improve error message
417 // FIXME: should I close?
418 LOG(error) << "Error while reading from websocket";
419 uv_read_stop(stream);
421 return;
422 }
423 try {
424 LOG(debug) << "Data received from server";
425 parse_http_request(buf->base, nread, context->client);
426 free(buf->base);
427 } catch (RuntimeErrorRef& ref) {
429 LOG(error) << "Error while parsing request: " << err.what;
430 }
431}
432
433// FIXME: mNonce should be random
435 : mNonce{"dGhlIHNhbXBsZSBub25jZQ=="}
436{
437}
438
439void WSDPLClient::connect(ServiceRegistryRef ref, uv_stream_t* s, std::function<void()> handshake, std::unique_ptr<WebSocketHandler> handler)
440{
441 mStream = s;
442 mContext = std::make_unique<DriverClientContext>(DriverClientContext{.ref = ref, .client = this});
443 mHandshake = handshake;
444 mHandler = std::move(handler);
445 s->data = mContext.get();
446 uv_read_start((uv_stream_t*)s, (uv_alloc_cb)my_alloc_cb, websocket_client_callback);
447}
448
450{
451 auto& spec = mContext->ref.get<DeviceSpec const>();
452 std::vector<std::pair<std::string, std::string>> headers = {
453 {{"x-dpl-pid"}, std::to_string(getpid())},
454 {{"x-dpl-id"}, spec.id},
455 {{"x-dpl-name"}, spec.name}};
456 std::string handShakeString = encode_websocket_handshake_request("/", "dpl", 13, mNonce.c_str(), headers);
457 this->write(handShakeString.c_str(), handShakeString.size());
458}
459
460void WSDPLClient::replyVersion(std::string_view const& s)
461{
462 if (s != "HTTP/1.1") {
463 throw runtime_error("Not an HTTP reply");
464 }
465}
466
467void WSDPLClient::replyCode(std::string_view const& s)
468{
469 if (s != "101") {
470 throw runtime_error("Upgrade denied");
471 }
472}
473
474void WSDPLClient::header(std::string_view const& k, std::string_view const& v)
475{
477}
478
480{
481 for (auto [k, v] : mHeaders) {
482 LOG(info) << k << ": " << v;
483 }
484}
485
487{
489 if (mHeaders["upgrade"] != "websocket") {
490 throw runtime_error_f("No websocket upgrade");
491 }
492 // find is used to account for multiple options
493 if (mHeaders["connection"].find("upgrade") == std::string::npos) {
494 throw runtime_error_f("No connection upgrade");
495 }
496 if (mHeaders.count("sec-websocket-accept") == 0) {
497 throw runtime_error("sec-websocket-accept not found");
498 }
499
500 std::string expectedAccept = HTTPParserHelpers::calculateAccept(mNonce.c_str());
501 if (mHeaders["sec-websocket-accept"] != expectedAccept) {
502 throw runtime_error_f(R"(Invalid accept received: "%s", expected "%s")", mHeaders["sec-websocket-accept"].c_str(), expectedAccept.c_str());
503 }
504
505 LOG(info) << "Correctly handshaken websocket connection.";
507 mHandshaken = true;
508 mHandshake();
509}
510
515
517 std::vector<uv_buf_t> buffers;
519};
520
521void ws_client_write_callback(uv_write_t* h, int status)
522{
523 auto* context = (WriteRequestContext*)h->data;
524 if (status) {
525 LOG(error) << "uv_write error: " << uv_err_name(status);
526 free(h);
527 return;
528 }
529 auto& state = context->ref.get<DeviceState>();
531 if (context->buf.base) {
532 free(context->buf.base);
533 }
534 delete context;
535 free(h);
536}
537
538void ws_client_bulk_write_callback(uv_write_t* h, int status)
539{
540 auto* context = (BulkWriteRequestContext*)h->data;
541 auto& state = context->ref.get<DeviceState>();
542
544 if (status < 0) {
545 LOG(error) << "uv_write error: " << uv_err_name(status);
546 free(h);
547 return;
548 }
549 if (context->buffers.size()) {
550 for (auto& b : context->buffers) {
551 free(b.base);
552 }
553 }
554 delete context;
555 free(h);
556}
557
559void WSDPLClient::body(char* data, size_t s)
560{
561 decode_websocket(data, s, *mHandler.get());
562}
563
565void WSDPLClient::write(char const* message, size_t s)
566{
567 auto* context = new WriteRequestContext{.ref = mContext->ref};
568 context->buf = uv_buf_init(strdup(message), s);
569 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
570 write_req->data = context;
571 uv_write(write_req, (uv_stream_t*)mStream, &context->buf, 1, ws_client_write_callback);
572}
573
574void WSDPLClient::write(std::vector<uv_buf_t>& outputs)
575{
576 if (outputs.empty()) {
577 return;
578 }
579 auto* write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
580 auto* context = new BulkWriteRequestContext{.ref = mContext->ref};
581 context->buffers.swap(outputs);
582 write_req->data = context;
583 uv_write(write_req, (uv_stream_t*)mStream, &context->buffers.at(0),
584 context->buffers.size(), ws_client_bulk_write_callback);
585}
586
587} // namespace o2::framework
benchmark::State & state
struct uv_handle_s uv_handle_t
std::ostringstream debug
int32_t i
uint32_t c
Definition RawData.h:2
StringRef key
Class for time synchronization of RawReader instances.
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLboolean GLboolean GLboolean b
Definition glcorearb.h:1233
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLuint GLuint stream
Definition glcorearb.h:1806
GLint ref
Definition glcorearb.h:291
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
const GLuint * buffers
Definition glcorearb.h:656
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
void ws_handshake_done_callback(uv_write_t *h, int status)
void populateHeader(std::map< std::string, std::string > &headers, std::string_view const &k, std::string_view const &v)
void websocket_client_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
void close_client_websocket(uv_handle_t *stream)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void websocket_server_close_callback(uv_handle_t *handle)
Free any resource associated with the device - driver channel.
void ws_error_write_callback(uv_write_t *h, int status)
void parse_http_request(char *start, size_t size, HTTPParser *parser)
std::string encode_websocket_handshake_request(const char *endpoint, const char *protocol, int version, char const *nonce, std::vector< std::pair< std::string, std::string > > headers)
void ws_client_bulk_write_callback(uv_write_t *h, int status)
void decode_websocket(char *start, size_t size, WebSocketHandler &handler)
void ws_server_write_callback(uv_write_t *h, int status)
void remoteGuiCallback(uv_timer_s *ctx)
RuntimeError & error_from_ref(RuntimeErrorRef)
void ws_client_write_callback(uv_write_t *h, int status)
void ws_server_bulk_write_callback(uv_write_t *h, int status)
void websocket_server_callback(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
Actually replies to any incoming websocket stuff.
RuntimeErrorRef runtime_error_f(const char *,...)
std::string encode_websocket_handshake_reply(char const *nonce, const char *protocol)
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
virtual void charIn(char key)=0
virtual void updateMousePos(float x, float y)=0
virtual void keyEvent(char key, bool down)=0
virtual void updateMouseWheel(int direction)=0
virtual void updateMouseButton(bool isClicked)=0
virtual void updateWindowSize(int x, int y)=0
std::string id
The id of the device, including time-pipelining and suffix.
Definition DeviceSpec.h:52
Running state information of a given device.
Definition DeviceState.h:34
Context for the client callbacks.
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DeviceInfo > * infos
std::vector< DeviceControl > * controls
An handler for a websocket message stream.
void endChunk() override
Invoked whenever we have no more input to process.
void frame(char const *frame, size_t s) override
void beginChunk() override
Invoked before processing the next round of input.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void beginFragmentation() override
FIXME: not implemented.
void endFragmentation() override
FIXME: not implemented.
GUIWebSocketHandler(DriverServerContext &context, GuiRenderer *renderer)
std::set< GuiRenderer * > renderers
static std::string calculateAccept(const char *nonce)
Helper to calculate the reply to a nonce.
std::unique_ptr< WebSocketHandler > mHandler
void replyVersion(std::string_view const &s) override
std::function< void()> mHandshake
std::atomic< bool > mHandshaken
void connect(ServiceRegistryRef ref, uv_stream_t *stream, std::function< void()> handshake, std::unique_ptr< WebSocketHandler > handler)
std::unique_ptr< DriverClientContext > mContext
void body(char *data, size_t s) override
Actual handling of WS frames happens inside here.
void replyCode(std::string_view const &s) override
void dumpHeaders()
Dump headers.
void write(char const *, size_t)
Helper to write a message to the server.
void header(std::string_view const &k, std::string_view const &v) override
std::map< std::string, std::string > mHeaders
void method(std::string_view const &s) override
void body(char *data, size_t s) override
Actual handling of WS frames happens inside here.
void header(std::string_view const &k, std::string_view const &v) override
WSDPLHandler(uv_stream_t *stream, DriverServerContext *context)
DriverServerContext * mServerContext
void target(std::string_view const &s) override
std::unique_ptr< WebSocketHandler > mHandler
void write(char const *, size_t)
Helper to write a message to the associated client.
std::map< std::string, std::string > mHeaders
An handler for a websocket message stream.
Definition HTTPParser.h:137
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"