Project
Loading...
Searching...
No Matches
StatusWebSocketHandler.cxx
Go to the documentation of this file.
1// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "DPLWebSocket.h"
14#include "DriverServerContext.h"
16#include <csignal>
17#include <unistd.h>
25#include "Framework/Signpost.h"
26#include <algorithm>
27#include <cstdio>
28#include <string>
29#include <string_view>
30#include <vector>
31
32namespace o2::framework
33{
34
35namespace
36{
37
/// Escape @a s so that it can be placed between double quotes in a JSON text.
///
/// Double quote, backslash, newline, carriage return and tab get their
/// two-character escapes; any other byte below 0x20 is emitted as a
/// six-character \u00XX sequence. All remaining bytes are copied unchanged.
///
/// @param s raw text to escape
/// @return the escaped copy of @a s (without surrounding quotes)
std::string jsonEscape(std::string_view s)
{
  std::string escaped;
  escaped.reserve(s.size() + 4);
  for (char const raw : s) {
    // Work on the unsigned value so that bytes >= 0x80 are never seen as negative.
    auto const byte = static_cast<unsigned char>(raw);
    if (byte == '"') {
      escaped += "\\\"";
    } else if (byte == '\\') {
      escaped += "\\\\";
    } else if (byte == '\n') {
      escaped += "\\n";
    } else if (byte == '\r') {
      escaped += "\\r";
    } else if (byte == '\t') {
      escaped += "\\t";
    } else if (byte < 0x20) {
      char hex[8];
      snprintf(hex, sizeof(hex), "\\u%04x", byte);
      escaped += hex;
    } else {
      escaped += raw;
    }
  }
  return escaped;
}
71
72char const* logLevelName(LogParsingHelpers::LogLevel level)
73{
74 switch (level) {
75 case LogParsingHelpers::LogLevel::Debug:
76 return "debug";
77 case LogParsingHelpers::LogLevel::Info:
78 return "info";
79 case LogParsingHelpers::LogLevel::Important:
80 return "important";
81 case LogParsingHelpers::LogLevel::Warning:
82 return "warning";
83 case LogParsingHelpers::LogLevel::Alarm:
84 return "alarm";
85 case LogParsingHelpers::LogLevel::Error:
86 return "error";
87 case LogParsingHelpers::LogLevel::Critical:
88 return "critical";
89 case LogParsingHelpers::LogLevel::Fatal:
90 return "fatal";
91 default:
92 return "unknown";
93 }
94}
95
96char const* streamingStateName(StreamingState s)
97{
98 switch (s) {
100 return "Streaming";
102 return "EndOfStreaming";
104 return "Idle";
105 default:
106 return "Unknown";
107 }
108}
109
110void appendMetricValue(std::string& out, DeviceMetricsInfo const& info, size_t mi)
111{
112 auto const& metric = info.metrics[mi];
113 if (metric.pos == 0) {
114 out += "null";
115 return;
116 }
117 size_t last = (metric.pos - 1) % metricStorageSize(metric.type);
118 switch (metric.type) {
119 case MetricType::Int:
120 out += std::to_string(info.intMetrics[metric.storeIdx][last]);
121 break;
122 case MetricType::Float: {
123 char buf[32];
124 snprintf(buf, sizeof(buf), "%g", static_cast<double>(info.floatMetrics[metric.storeIdx][last]));
125 out += buf;
126 break;
127 }
129 out += std::to_string(info.uint64Metrics[metric.storeIdx][last]);
130 break;
131 default:
132 out += "null";
133 }
134}
135
/// Find `"key":"value"` in @a json and return a view of the value.
///
/// This is a lightweight scan, not a JSON parser: the first occurrence of
/// `"key":` anywhere in the text is used. Whitespace (space, tab, CR, LF)
/// between the colon and the opening quote is skipped. Backslash escapes
/// inside the value are stepped over so that an escaped quote does not end
/// the value early; the returned view is the raw, still-escaped text.
///
/// @param json message text to search; the result points into this buffer
/// @param key  field name, without quotes
/// @return view of the characters between the value's quotes, or an empty
///         view when the key is absent, the value is not a string, or the
///         closing quote is missing
std::string_view extractStringField(std::string_view json, std::string_view key)
{
  std::string needle;
  needle.reserve(key.size() + 3);
  needle += '"';
  needle += key;
  needle += "\":";
  auto pos = json.find(needle);
  if (pos == std::string_view::npos) {
    return {};
  }
  pos += needle.size();
  // skip optional whitespace between ':' and '"'
  while (pos < json.size() &&
         (json[pos] == ' ' || json[pos] == '\t' || json[pos] == '\n' || json[pos] == '\r')) {
    ++pos;
  }
  if (pos >= json.size() || json[pos] != '"') {
    return {};
  }
  ++pos; // skip opening quote
  for (size_t end = pos; end < json.size(); ++end) {
    if (json[end] == '\\') {
      ++end; // step over the escaped character as well
      continue;
    }
    if (json[end] == '"') {
      return json.substr(pos, end - pos);
    }
  }
  return {};
}
164
/// Find `"key":[ ... ]` in @a json and return a view of the whole array,
/// including the outer brackets.
///
/// This is a lightweight scan, not a JSON parser: the first occurrence of
/// `"key":` anywhere in the text is used. Whitespace (space, tab, CR, LF)
/// after the colon is skipped. Nested arrays are balanced, and brackets that
/// appear inside string elements are ignored so they cannot unbalance the
/// depth count.
///
/// @param json message text to search; the result points into this buffer
/// @param key  field name, without quotes
/// @return view from the opening '[' to its matching ']', or an empty view
///         when the key is absent, the value is not an array, or the array
///         is not closed
std::string_view extractArrayField(std::string_view json, std::string_view key)
{
  std::string needle;
  needle.reserve(key.size() + 3);
  needle += '"';
  needle += key;
  needle += "\":";
  auto pos = json.find(needle);
  if (pos == std::string_view::npos) {
    return {};
  }
  pos += needle.size();
  // skip whitespace
  while (pos < json.size() &&
         (json[pos] == ' ' || json[pos] == '\t' || json[pos] == '\n' || json[pos] == '\r')) {
    ++pos;
  }
  if (pos >= json.size() || json[pos] != '[') {
    return {};
  }
  size_t const start = pos;
  size_t depth = 0;
  bool inString = false;
  for (; pos < json.size(); ++pos) {
    char const c = json[pos];
    if (inString) {
      if (c == '\\') {
        ++pos; // step over the escaped character as well
      } else if (c == '"') {
        inString = false;
      }
      continue;
    }
    if (c == '"') {
      inString = true;
    } else if (c == '[') {
      ++depth;
    } else if (c == ']') {
      // depth is at least 1 here: the scan starts on the opening '['.
      --depth;
      if (depth == 0) {
        return json.substr(start, pos - start + 1);
      }
    }
  }
  return {};
}
200
/// Invoke @a callback once for every double-quoted string found in @a arr.
///
/// @a arr is expected to look like `["name1","name2"]`. Everything outside
/// quotes is ignored. Backslash escapes inside an element are stepped over so
/// that an escaped quote neither ends the element early nor shifts the
/// pairing of the quotes that follow; the callback receives the raw,
/// still-escaped text. Scanning stops at the first unterminated string.
///
/// @param arr      text of the array
/// @param callback callable taking a std::string_view that points into @a arr
template <typename F>
void forEachStringInArray(std::string_view arr, F&& callback)
{
  size_t pos = 0;
  while (pos < arr.size()) {
    auto const open = arr.find('"', pos);
    if (open == std::string_view::npos) {
      break;
    }
    size_t close = open + 1;
    while (close < arr.size() && arr[close] != '"') {
      // A backslash consumes the character after it.
      close += (arr[close] == '\\') ? 2 : 1;
    }
    if (close >= arr.size()) {
      break; // no closing quote
    }
    callback(arr.substr(open + 1, close - open - 1));
    pos = close + 1;
  }
}
221
222} // anonymous namespace
223
// Constructor body: binds mContext to `context` and mHandler to `handler`;
// nothing else is done here.
// NOTE(review): the declarator (listing line 224) is absent from this
// listing — confirm the parameter types against the class declaration.
225 : mContext{context}, mHandler{handler}
226{
227}
228
// Removes every occurrence of this object from mContext.statusHandlers
// (erase/remove idiom).
// NOTE(review): the declarator (listing line 229) is absent from this
// listing; presumably this is the destructor — confirm.
230{
231 auto& handlers = mContext.statusHandlers;
232 handlers.erase(std::remove(handlers.begin(), handlers.end(), this), handlers.end());
233}
234
235void StatusWebSocketHandler::headers(std::map<std::string, std::string> const&)
236{
237 sendSnapshot();
238}
239
240void StatusWebSocketHandler::frame(char const* data, size_t s)
241{
242 std::string_view msg{data, s};
243 auto cmd = extractStringField(msg, "cmd");
244 if (cmd.empty()) {
245 return;
246 }
247 auto deviceName = extractStringField(msg, "device");
248
249 if (cmd == "list_metrics") {
250 handleListMetrics(deviceName);
251 } else if (cmd == "subscribe") {
252 handleSubscribe(deviceName, extractArrayField(msg, "metrics"));
253 } else if (cmd == "unsubscribe") {
254 handleUnsubscribe(deviceName, extractArrayField(msg, "metrics"));
255 } else if (cmd == "subscribe_logs") {
256 handleSubscribeLogs(deviceName);
257 } else if (cmd == "unsubscribe_logs") {
258 handleUnsubscribeLogs(deviceName);
259 } else if (cmd == "start_devices") {
260 handleStartDevices();
261 } else if (cmd == "enable_signpost") {
262 handleEnableSignpost(deviceName, extractArrayField(msg, "streams"));
263 } else if (cmd == "disable_signpost") {
264 handleDisableSignpost(deviceName, extractArrayField(msg, "streams"));
265 }
266}
267
268void StatusWebSocketHandler::sendText(std::string const& json)
269{
270 std::vector<uv_buf_t> outputs;
271 encode_websocket_frames(outputs, json.data(), json.size(), WebSocketOpCode::Text, 0);
272 mHandler->write(outputs);
273}
274
// Builds and sends the {"type":"snapshot","devices":[...]} message: one
// object per device spec carrying name, pid, active flag, streaming state
// and device state. Also resets the per-device subscription table size and
// sets every log cursor to the device's current logSeq.
// NOTE(review): the declarator (listing line 275) is absent from this listing.
276{
277 auto const& specs = *mContext.specs;
278 auto const& infos = *mContext.infos;
279
280 // Size subscription tables to current device count; grow lazily as needed.
281 mSubscribedMetrics.resize(specs.size());
282 mLastLogSeq.resize(infos.size());
283 for (size_t di = 0; di < infos.size(); ++di) {
284 mLastLogSeq[di] = infos[di].logSeq;
285 }
286
287 std::string out;
288 out.reserve(512 + specs.size() * 128);
289 out += R"({"type":"snapshot","devices":[)";
290 for (size_t di = 0; di < specs.size(); ++di) {
291 if (di > 0) {
292 out += ',';
293 }
// NOTE(review): infos is indexed with an index bounded by specs.size();
// this assumes infos has at least as many entries as specs — confirm.
294 auto const& info = infos[di];
295 out += R"({"name":")";
296 out += jsonEscape(specs[di].name);
297 out += R"(","pid":)";
298 out += std::to_string(info.pid);
299 out += R"(,"active":)";
300 out += info.active ? "true" : "false";
301 out += R"(,"streamingState":")";
302 out += streamingStateName(info.streamingState);
303 out += R"(","deviceState":")";
304 out += jsonEscape(info.deviceState);
305 out += R"("})";
306 }
307 out += "]}";
308 sendText(out);
309}
310
// Sends a {"type":"update",...} message for device `deviceIndex` containing
// the latest value of every subscribed metric whose `changed` flag is set.
// Nothing is sent when the index is out of range, the device has no
// subscriptions, or no subscribed metric changed.
// NOTE(review): the declarator (listing line 311) is absent from this
// listing; the type of `deviceIndex` is not visible here.
312{
313 auto const& specs = *mContext.specs;
314 auto const& metrics = *mContext.metrics;
315
316 if (deviceIndex >= specs.size() || deviceIndex >= metrics.size()) {
317 return;
318 }
319
320 // Lazily grow the subscription table if new devices were added after snapshot.
321 if (mSubscribedMetrics.size() <= deviceIndex) {
322 mSubscribedMetrics.resize(deviceIndex + 1);
323 }
324
325 auto const& subscribed = mSubscribedMetrics[deviceIndex];
326 if (subscribed.empty()) {
327 return;
328 }
329
330 auto const& info = metrics[deviceIndex];
331 std::string metricsJson;
332 metricsJson += '{';
// `first` doubles as the "nothing was emitted" flag checked after the loop.
333 bool first = true;
334 for (size_t mi = 0; mi < info.metrics.size(); ++mi) {
335 if (!info.changed[mi]) {
336 continue;
337 }
338 auto const& metric = info.metrics[mi];
// String, Enum and Unknown metrics are never included in updates.
339 if (metric.type == MetricType::String ||
340 metric.type == MetricType::Enum ||
341 metric.type == MetricType::Unknown) {
342 continue;
343 }
344 auto const& label = info.metricLabels[mi];
345 std::string_view labelSV{label.label, label.size};
346 if (subscribed.find(std::string(labelSV)) == subscribed.end()) {
347 continue;
348 }
349 if (!first) {
350 metricsJson += ',';
351 }
352 first = false;
353 metricsJson += '"';
354 metricsJson += jsonEscape(labelSV);
355 metricsJson += "\":";
356 appendMetricValue(metricsJson, info, mi);
357 }
358 metricsJson += '}';
359
360 if (first) {
361 // Nothing subscribed changed in this cycle.
362 return;
363 }
364
365 std::string out;
366 out += R"({"type":"update","device":)";
367 out += std::to_string(deviceIndex);
368 out += R"(,"name":")";
369 out += jsonEscape(specs[deviceIndex].name);
370 out += R"(","metrics":)";
371 out += metricsJson;
372 out += '}';
373 sendText(out);
374}
375
376void StatusWebSocketHandler::handleListMetrics(std::string_view deviceName)
377{
378 size_t di = findDeviceIndex(deviceName);
379 if (di == SIZE_MAX) {
380 return;
381 }
382 auto const& metrics = *mContext.metrics;
383
384 std::string out;
385 out += R"({"type":"metrics_list","device":")";
386 out += jsonEscape(deviceName);
387 out += R"(","metrics":[)";
388 bool first = true;
389 if (di < metrics.size()) {
390 auto const& info = metrics[di];
391 for (size_t mi = 0; mi < info.metrics.size(); ++mi) {
392 auto const& metric = info.metrics[mi];
393 if (metric.type == MetricType::String ||
394 metric.type == MetricType::Enum ||
395 metric.type == MetricType::Unknown) {
396 continue;
397 }
398 if (!first) {
399 out += ',';
400 }
401 first = false;
402 auto const& label = info.metricLabels[mi];
403 out += '"';
404 out += jsonEscape({label.label, label.size});
405 out += '"';
406 }
407 }
408 out += "]}";
409 sendText(out);
410}
411
412void StatusWebSocketHandler::handleSubscribe(std::string_view deviceName, std::string_view metricsArr)
413{
414 size_t di = findDeviceIndex(deviceName);
415 if (di == SIZE_MAX || metricsArr.empty()) {
416 return;
417 }
418 if (mSubscribedMetrics.size() <= di) {
419 mSubscribedMetrics.resize(di + 1);
420 }
421 forEachStringInArray(metricsArr, [&](std::string_view name) {
422 mSubscribedMetrics[di].emplace(name);
423 });
424}
425
426void StatusWebSocketHandler::handleUnsubscribe(std::string_view deviceName, std::string_view metricsArr)
427{
428 size_t di = findDeviceIndex(deviceName);
429 if (di == SIZE_MAX || metricsArr.empty() || di >= mSubscribedMetrics.size()) {
430 return;
431 }
432 forEachStringInArray(metricsArr, [&](std::string_view name) {
433 mSubscribedMetrics[di].erase(std::string(name));
434 });
435}
436
437size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const
438{
439 auto const& specs = *mContext.specs;
440 for (size_t di = 0; di < specs.size(); ++di) {
441 if (specs[di].name == name) {
442 return di;
443 }
444 }
445 return SIZE_MAX;
446}
447
448void StatusWebSocketHandler::handleStartDevices()
449{
450 for (auto const& info : *mContext.infos) {
451 if (info.active) {
452 kill(info.pid, SIGCONT);
453 }
454 }
455}
456
457void StatusWebSocketHandler::handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr)
458{
459 if (streamsArr.empty()) {
460 return;
461 }
462 if (deviceName.empty()) {
463 // Driver process — toggle in-process via o2_walk_logs.
464 forEachStringInArray(streamsArr, [](std::string_view streamName) {
465 std::string target(streamName);
466 o2_walk_logs([](char const* name, void* l, void* context) -> bool {
467 auto* log = static_cast<_o2_log_t*>(l);
468 if (static_cast<std::string*>(context)->compare(name) == 0) {
469 _o2_log_set_stacktrace(log, log->defaultStacktrace);
470 return false;
471 }
472 return true;
473 }, &target);
474 });
475 } else {
476 size_t di = findDeviceIndex(deviceName);
477 if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
478 return;
479 }
480 auto* controller = (*mContext.controls)[di].controller;
481 forEachStringInArray(streamsArr, [controller](std::string_view name) {
482 std::string cmd = "/signpost:enable ";
483 cmd += name;
484 controller->write(cmd.c_str(), cmd.size());
485 });
486 }
487}
488
// Disable the signpost streams named in `streamsArr`.
// With a non-empty `deviceName`, one "/signpost:disable <stream>" command per
// stream is written to that device's controller; the request is dropped when
// the device is unknown, has no control entry, or has no controller.
// With an empty `deviceName` the registered logs are walked in-process and
// the walk stops at the log whose name matches the stream.
489void StatusWebSocketHandler::handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr)
490{
491 if (streamsArr.empty()) {
492 return;
493 }
494 if (deviceName.empty()) {
495 forEachStringInArray(streamsArr, [](std::string_view streamName) {
496 std::string target(streamName);
497 o2_walk_logs([](char const* name, void* l, void* context) -> bool {
498 auto* log = static_cast<_o2_log_t*>(l);
499 if (static_cast<std::string*>(context)->compare(name) == 0) {
// NOTE(review): listing line 500 is absent here, so `log` is unused in the
// visible text; presumably the missing statement turns the stacktrace
// setting off for `log` — confirm against the real file.
501 return false;
502 }
503 return true;
504 }, &target);
505 });
506 } else {
507 size_t di = findDeviceIndex(deviceName);
508 if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
509 return;
510 }
511 auto* controller = (*mContext.controls)[di].controller;
512 forEachStringInArray(streamsArr, [controller](std::string_view name) {
513 std::string cmd = "/signpost:disable ";
514 cmd += name;
515 controller->write(cmd.c_str(), cmd.size());
516 });
517 }
518}
519
520void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
521{
522 size_t di = findDeviceIndex(deviceName);
523 if (di == SIZE_MAX) {
524 return;
525 }
526 if (mLastLogSeq.size() <= di) {
527 mLastLogSeq.resize(di + 1, 0);
528 }
529 // Start the cursor at the current log position so we only push future lines.
530 mLastLogSeq[di] = (*mContext.infos)[di].logSeq;
531 mLogSubscriptions.insert(di);
532}
533
534void StatusWebSocketHandler::handleUnsubscribeLogs(std::string_view deviceName)
535{
536 size_t di = findDeviceIndex(deviceName);
537 if (di == SIZE_MAX) {
538 return;
539 }
540 mLogSubscriptions.erase(di);
541}
542
// Sends one {"type":"log",...} message per log line that device
// `deviceIndex` has produced since the stored cursor, then moves the cursor
// to the device's current logSeq. Does nothing when the device has no log
// subscription, the index is out of range, or there are no new lines.
// NOTE(review): the declarator (listing line 543) is absent from this
// listing; the type of `deviceIndex` is not visible here.
544{
545 if (mLogSubscriptions.find(deviceIndex) == mLogSubscriptions.end()) {
546 return;
547 }
548 auto const& infos = *mContext.infos;
549 auto const& specs = *mContext.specs;
550 if (deviceIndex >= infos.size() || deviceIndex >= specs.size()) {
551 return;
552 }
553 if (mLastLogSeq.size() <= deviceIndex) {
554 mLastLogSeq.resize(deviceIndex + 1, 0);
555 }
556
557 auto const& info = infos[deviceIndex];
558 size_t newLines = info.logSeq - mLastLogSeq[deviceIndex];
559 if (newLines == 0) {
560 return;
561 }
562 // Cap to buffer size to avoid re-reading overwritten entries.
563 if (newLines > info.history.size()) {
564 newLines = info.history.size();
565 }
566
567 size_t histSize = info.history.size();
// NOTE(review): if info.history is empty, histSize is 0 and the modulo below
// divides by zero — confirm the history buffer is always non-empty.
568 // The oldest unread entry sits at (historyPos - newLines + histSize) % histSize.
569 size_t startPos = (info.historyPos + histSize - newLines) % histSize;
570
571 std::string_view devName = specs[deviceIndex].name;
572 for (size_t i = 0; i < newLines; ++i) {
573 size_t pos = (startPos + i) % histSize;
574 std::string out;
575 out += R"({"type":"log","device":")";
576 out += jsonEscape(devName);
577 out += R"(","level":")";
578 out += logLevelName(info.historyLevel[pos]);
579 out += R"(","line":")";
580 out += jsonEscape(info.history[pos]);
581 out += R"("})";
582 sendText(out);
583 }
584 mLastLogSeq[deviceIndex] = info.logSeq;
585}
586
587} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
nlohmann::json json
StringRef key
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLenum target
Definition glcorearb.h:1641
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLsizei GLsizei GLsizei depth
Definition glcorearb.h:470
GLuint start
Definition glcorearb.h:469
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
constexpr size_t metricStorageSize()
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DeviceInfo > * infos
std::vector< DeviceControl > * controls
LogLevel
Possible log levels for device log entries.
StatusWebSocketHandler(DriverServerContext &context, WSDPLHandler *handler)
void sendSnapshot()
Send a minimal JSON snapshot (device list + basic state, no metrics/logs).
void frame(char const *data, size_t s) override
Handles incoming commands from the MCP client.
void headers(std::map< std::string, std::string > const &headers) override
Sends the minimal snapshot on handshake completion.
void write(char const *, size_t)
Helper to write a message to the associated client.
void compare(std::string_view s1, std::string_view s2)
uint64_t const void const *restrict const msg
Definition x9.h:153