Project
Loading...
Searching...
No Matches
StatusWebSocketHandler.cxx
Go to the documentation of this file.
1// Copyright 2019-2026 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "DPLWebSocket.h"
14#include "DriverServerContext.h"
20#include <algorithm>
21#include <cstdio>
22#include <string>
23#include <string_view>
24#include <vector>
25
26namespace o2::framework
27{
28
29namespace
30{
31
/// Escape @p s so that it can be placed between double quotes in a JSON text.
///
/// Double quotes and backslashes get a leading backslash; newline, carriage
/// return and tab use their two-character escapes; any other byte below 0x20
/// is written as `\u00XX` with lowercase hex digits. All remaining bytes are
/// copied through unchanged.
///
/// @param s text to escape
/// @return the escaped copy of @p s
std::string jsonEscape(std::string_view s)
{
  static constexpr char kHexDigits[] = "0123456789abcdef";

  std::string escaped;
  escaped.reserve(s.size() + 4);
  for (char const ch : s) {
    // Work on the unsigned value so bytes >= 0x80 are never seen as negative.
    auto const byte = static_cast<unsigned char>(ch);
    if (ch == '"') {
      escaped.append("\\\"");
    } else if (ch == '\\') {
      escaped.append("\\\\");
    } else if (ch == '\n') {
      escaped.append("\\n");
    } else if (ch == '\r') {
      escaped.append("\\r");
    } else if (ch == '\t') {
      escaped.append("\\t");
    } else if (byte < 0x20) {
      // Remaining control characters: \u00XX (the high byte is always zero).
      escaped.append("\\u00");
      escaped.push_back(kHexDigits[byte >> 4]);
      escaped.push_back(kHexDigits[byte & 0x0F]);
    } else {
      escaped.push_back(ch);
    }
  }
  return escaped;
}
65
66char const* logLevelName(LogParsingHelpers::LogLevel level)
67{
68 switch (level) {
69 case LogParsingHelpers::LogLevel::Debug:
70 return "debug";
71 case LogParsingHelpers::LogLevel::Info:
72 return "info";
73 case LogParsingHelpers::LogLevel::Important:
74 return "important";
75 case LogParsingHelpers::LogLevel::Warning:
76 return "warning";
77 case LogParsingHelpers::LogLevel::Alarm:
78 return "alarm";
79 case LogParsingHelpers::LogLevel::Error:
80 return "error";
81 case LogParsingHelpers::LogLevel::Critical:
82 return "critical";
83 case LogParsingHelpers::LogLevel::Fatal:
84 return "fatal";
85 default:
86 return "unknown";
87 }
88}
89
90char const* streamingStateName(StreamingState s)
91{
92 switch (s) {
94 return "Streaming";
96 return "EndOfStreaming";
98 return "Idle";
99 default:
100 return "Unknown";
101 }
102}
103
104void appendMetricValue(std::string& out, DeviceMetricsInfo const& info, size_t mi)
105{
106 auto const& metric = info.metrics[mi];
107 if (metric.pos == 0) {
108 out += "null";
109 return;
110 }
111 size_t last = (metric.pos - 1) % metricStorageSize(metric.type);
112 switch (metric.type) {
113 case MetricType::Int:
114 out += std::to_string(info.intMetrics[metric.storeIdx][last]);
115 break;
116 case MetricType::Float: {
117 char buf[32];
118 snprintf(buf, sizeof(buf), "%g", static_cast<double>(info.floatMetrics[metric.storeIdx][last]));
119 out += buf;
120 break;
121 }
123 out += std::to_string(info.uint64Metrics[metric.storeIdx][last]);
124 break;
125 default:
126 out += "null";
127 }
128}
129
/// Extract the raw text of a string-valued field from a flat JSON message.
///
/// This is a minimal scanner, not a JSON parser: it looks for the first
/// occurrence of `"key":` in @p json, skips whitespace, and returns the
/// characters between the following pair of double quotes.
///
/// Backslash escapes inside the value are honoured when looking for the
/// closing quote (so `"a\"b"` is not cut at the escaped quote), but they are
/// NOT decoded: the returned view still contains the backslashes.
///
/// @param json message text to search; the result is a view into it
/// @param key  field name, without quotes
/// @return the raw field content, or an empty view when the key is absent,
///         the value is not a string, or the string is unterminated
std::string_view extractStringField(std::string_view json, std::string_view key)
{
  std::string needle;
  needle.reserve(key.size() + 3);
  needle += '"';
  needle += key;
  needle += "\":";
  auto pos = json.find(needle);
  if (pos == std::string_view::npos) {
    return {};
  }
  pos += needle.size();
  // Skip optional JSON whitespace (space, tab, LF, CR) between ':' and '"'.
  while (pos < json.size() &&
         (json[pos] == ' ' || json[pos] == '\t' || json[pos] == '\n' || json[pos] == '\r')) {
    ++pos;
  }
  if (pos >= json.size() || json[pos] != '"') {
    return {};
  }
  ++pos; // skip opening quote
  for (size_t end = pos; end < json.size(); ++end) {
    if (json[end] == '\\') {
      ++end; // the escaped character can never terminate the string
      continue;
    }
    if (json[end] == '"') {
      return json.substr(pos, end - pos);
    }
  }
  return {}; // unterminated string
}
158
/// Extract the raw text of an array-valued field from a flat JSON message.
///
/// Looks for the first occurrence of `"key":` in @p json, skips whitespace,
/// and returns the text from the opening '[' up to and including the
/// matching ']'. Nested arrays are supported. Brackets that appear inside
/// string literals are ignored, so an element such as `"a]b"` does not end
/// the array early.
///
/// @param json message text to search; the result is a view into it
/// @param key  field name, without quotes
/// @return the array text including its brackets, or an empty view when the
///         key is absent, the value is not an array, or the array is not
///         closed
std::string_view extractArrayField(std::string_view json, std::string_view key)
{
  std::string needle;
  needle.reserve(key.size() + 3);
  needle += '"';
  needle += key;
  needle += "\":";
  auto pos = json.find(needle);
  if (pos == std::string_view::npos) {
    return {};
  }
  pos += needle.size();
  // Skip optional JSON whitespace (space, tab, LF, CR) before the value.
  while (pos < json.size() &&
         (json[pos] == ' ' || json[pos] == '\t' || json[pos] == '\n' || json[pos] == '\r')) {
    ++pos;
  }
  if (pos >= json.size() || json[pos] != '[') {
    return {};
  }
  auto const start = pos;
  size_t depth = 0;
  bool inString = false;
  for (; pos < json.size(); ++pos) {
    char const c = json[pos];
    if (inString) {
      if (c == '\\') {
        ++pos; // skip the escaped character, it cannot close the string
      } else if (c == '"') {
        inString = false;
      }
      continue;
    }
    if (c == '"') {
      inString = true;
    } else if (c == '[') {
      ++depth;
    } else if (c == ']') {
      // depth is >= 1 here: the scan starts on '[' and returns at zero.
      if (--depth == 0) {
        return json.substr(start, pos - start + 1);
      }
    }
  }
  return {}; // array never closed
}
194
/// Invoke @p callback once for every string literal found in @p arr.
///
/// @p arr is expected to look like `["name1","name2"]`. Each callback
/// argument is a view into @p arr covering the characters between one pair of
/// quotes. Backslash escapes are honoured when locating the closing quote
/// (an escaped `\"` does not end the element) but are NOT decoded.
/// Scanning stops silently at the first unterminated string.
///
/// @param arr      raw JSON array text
/// @param callback callable accepting a `std::string_view`
template <typename F>
void forEachStringInArray(std::string_view arr, F&& callback)
{
  size_t pos = 0;
  while (pos < arr.size()) {
    auto const open = arr.find('"', pos);
    if (open == std::string_view::npos) {
      break;
    }
    // Walk to the closing quote, stepping over backslash-escaped characters.
    size_t close = open + 1;
    while (close < arr.size() && arr[close] != '"') {
      close += (arr[close] == '\\') ? 2 : 1;
    }
    if (close >= arr.size()) {
      break; // unterminated string
    }
    callback(arr.substr(open + 1, close - open - 1));
    pos = close + 1;
  }
}
215
216} // anonymous namespace
217
219 : mContext{context}, mHandler{handler}
220{
221}
222
// NOTE(review): the declarator line of this block is missing from the listing.
// Because the body unregisters `this`, it is presumably the destructor
// (StatusWebSocketHandler::~StatusWebSocketHandler()) — confirm against the header.
224{
 // Remove every occurrence of this handler from the context's list of status
 // handlers (erase-remove idiom).
225 auto& handlers = mContext.statusHandlers;
226 handlers.erase(std::remove(handlers.begin(), handlers.end(), this), handlers.end());
227}
228
229void StatusWebSocketHandler::headers(std::map<std::string, std::string> const&)
230{
231 sendSnapshot();
232}
233
234void StatusWebSocketHandler::frame(char const* data, size_t s)
235{
236 std::string_view msg{data, s};
237 auto cmd = extractStringField(msg, "cmd");
238 if (cmd.empty()) {
239 return;
240 }
241 auto deviceName = extractStringField(msg, "device");
242
243 if (cmd == "list_metrics") {
244 handleListMetrics(deviceName);
245 } else if (cmd == "subscribe") {
246 handleSubscribe(deviceName, extractArrayField(msg, "metrics"));
247 } else if (cmd == "unsubscribe") {
248 handleUnsubscribe(deviceName, extractArrayField(msg, "metrics"));
249 } else if (cmd == "subscribe_logs") {
250 handleSubscribeLogs(deviceName);
251 } else if (cmd == "unsubscribe_logs") {
252 handleUnsubscribeLogs(deviceName);
253 }
254}
255
256void StatusWebSocketHandler::sendText(std::string const& json)
257{
258 std::vector<uv_buf_t> outputs;
259 encode_websocket_frames(outputs, json.data(), json.size(), WebSocketOpCode::Text, 0);
260 mHandler->write(outputs);
261}
262
264{
265 auto const& specs = *mContext.specs;
266 auto const& infos = *mContext.infos;
267
268 // Size subscription tables to current device count; grow lazily as needed.
269 mSubscribedMetrics.resize(specs.size());
270 mLastLogSeq.resize(infos.size());
271 for (size_t di = 0; di < infos.size(); ++di) {
272 mLastLogSeq[di] = infos[di].logSeq;
273 }
274
275 std::string out;
276 out.reserve(512 + specs.size() * 128);
277 out += R"({"type":"snapshot","devices":[)";
278 for (size_t di = 0; di < specs.size(); ++di) {
279 if (di > 0) {
280 out += ',';
281 }
282 auto const& info = infos[di];
283 out += R"({"name":")";
284 out += jsonEscape(specs[di].name);
285 out += R"(","pid":)";
286 out += std::to_string(info.pid);
287 out += R"(,"active":)";
288 out += info.active ? "true" : "false";
289 out += R"(,"streamingState":")";
290 out += streamingStateName(info.streamingState);
291 out += R"(","deviceState":")";
292 out += jsonEscape(info.deviceState);
293 out += R"("})";
294 }
295 out += "]}";
296 sendText(out);
297}
298
// NOTE(review): the declarator line of this member function is missing from the
// listing; the body reads `deviceIndex`, presumably its only parameter — confirm
// the name and signature against the header.
//
// Sends one {"type":"update","device":<index>,"name":...,"metrics":{...}} message
// holding the latest value of every metric of device `deviceIndex` that is
// flagged as changed, is numeric (not String / Enum / Unknown), and whose label
// is in this client's subscription set. Nothing is sent when the index is out of
// range, the client has no subscriptions for the device, or no subscribed metric
// changed.
300{
301 auto const& specs = *mContext.specs;
302 auto const& metrics = *mContext.metrics;
303
 // Ignore devices that have no spec or no metrics entry.
304 if (deviceIndex >= specs.size() || deviceIndex >= metrics.size()) {
305 return;
306 }
307
308 // Lazily grow the subscription table if new devices were added after snapshot.
309 if (mSubscribedMetrics.size() <= deviceIndex) {
310 mSubscribedMetrics.resize(deviceIndex + 1);
311 }
312
313 auto const& subscribed = mSubscribedMetrics[deviceIndex];
314 if (subscribed.empty()) {
315 return;
316 }
317
 // Build the "metrics" object first; `first` stays true if nothing qualifies.
318 auto const& info = metrics[deviceIndex];
319 std::string metricsJson;
320 metricsJson += '{';
321 bool first = true;
322 for (size_t mi = 0; mi < info.metrics.size(); ++mi) {
323 if (!info.changed[mi]) {
324 continue;
325 }
326 auto const& metric = info.metrics[mi];
 // Only numeric metric types are pushed to the client.
327 if (metric.type == MetricType::String ||
328 metric.type == MetricType::Enum ||
329 metric.type == MetricType::Unknown) {
330 continue;
331 }
332 auto const& label = info.metricLabels[mi];
333 std::string_view labelSV{label.label, label.size};
 // The lookup key is built as a temporary std::string for each candidate metric.
334 if (subscribed.find(std::string(labelSV)) == subscribed.end()) {
335 continue;
336 }
337 if (!first) {
338 metricsJson += ',';
339 }
340 first = false;
341 metricsJson += '"';
342 metricsJson += jsonEscape(labelSV);
343 metricsJson += "\":";
344 appendMetricValue(metricsJson, info, mi);
345 }
346 metricsJson += '}';
347
348 if (first) {
349 // Nothing subscribed changed in this cycle.
350 return;
351 }
352
 // Wrap the metrics object in the update envelope and send it.
353 std::string out;
354 out += R"({"type":"update","device":)";
355 out += std::to_string(deviceIndex);
356 out += R"(,"name":")";
357 out += jsonEscape(specs[deviceIndex].name);
358 out += R"(","metrics":)";
359 out += metricsJson;
360 out += '}';
361 sendText(out);
362}
363
364void StatusWebSocketHandler::handleListMetrics(std::string_view deviceName)
365{
366 size_t di = findDeviceIndex(deviceName);
367 if (di == SIZE_MAX) {
368 return;
369 }
370 auto const& metrics = *mContext.metrics;
371
372 std::string out;
373 out += R"({"type":"metrics_list","device":")";
374 out += jsonEscape(deviceName);
375 out += R"(","metrics":[)";
376 bool first = true;
377 if (di < metrics.size()) {
378 auto const& info = metrics[di];
379 for (size_t mi = 0; mi < info.metrics.size(); ++mi) {
380 auto const& metric = info.metrics[mi];
381 if (metric.type == MetricType::String ||
382 metric.type == MetricType::Enum ||
383 metric.type == MetricType::Unknown) {
384 continue;
385 }
386 if (!first) {
387 out += ',';
388 }
389 first = false;
390 auto const& label = info.metricLabels[mi];
391 out += '"';
392 out += jsonEscape({label.label, label.size});
393 out += '"';
394 }
395 }
396 out += "]}";
397 sendText(out);
398}
399
400void StatusWebSocketHandler::handleSubscribe(std::string_view deviceName, std::string_view metricsArr)
401{
402 size_t di = findDeviceIndex(deviceName);
403 if (di == SIZE_MAX || metricsArr.empty()) {
404 return;
405 }
406 if (mSubscribedMetrics.size() <= di) {
407 mSubscribedMetrics.resize(di + 1);
408 }
409 forEachStringInArray(metricsArr, [&](std::string_view name) {
410 mSubscribedMetrics[di].emplace(name);
411 });
412}
413
414void StatusWebSocketHandler::handleUnsubscribe(std::string_view deviceName, std::string_view metricsArr)
415{
416 size_t di = findDeviceIndex(deviceName);
417 if (di == SIZE_MAX || metricsArr.empty() || di >= mSubscribedMetrics.size()) {
418 return;
419 }
420 forEachStringInArray(metricsArr, [&](std::string_view name) {
421 mSubscribedMetrics[di].erase(std::string(name));
422 });
423}
424
425size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const
426{
427 auto const& specs = *mContext.specs;
428 for (size_t di = 0; di < specs.size(); ++di) {
429 if (specs[di].name == name) {
430 return di;
431 }
432 }
433 return SIZE_MAX;
434}
435
436void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
437{
438 size_t di = findDeviceIndex(deviceName);
439 if (di == SIZE_MAX) {
440 return;
441 }
442 if (mLastLogSeq.size() <= di) {
443 mLastLogSeq.resize(di + 1, 0);
444 }
445 // Start the cursor at the current log position so we only push future lines.
446 mLastLogSeq[di] = (*mContext.infos)[di].logSeq;
447 mLogSubscriptions.insert(di);
448}
449
450void StatusWebSocketHandler::handleUnsubscribeLogs(std::string_view deviceName)
451{
452 size_t di = findDeviceIndex(deviceName);
453 if (di == SIZE_MAX) {
454 return;
455 }
456 mLogSubscriptions.erase(di);
457}
458
// NOTE(review): the declarator line of this member function is missing from the
// listing; the body reads `deviceIndex`, presumably its only parameter — confirm
// the name and signature against the header.
//
// Sends one {"type":"log","device":...,"level":...,"line":...} message per log
// line that device `deviceIndex` produced since the stored cursor, oldest first,
// then advances the cursor to the device's current `logSeq`. Nothing is sent
// when the client is not subscribed to the device's logs, the index is out of
// range, or there are no new lines.
460{
461 if (mLogSubscriptions.find(deviceIndex) == mLogSubscriptions.end()) {
462 return;
463 }
464 auto const& infos = *mContext.infos;
465 auto const& specs = *mContext.specs;
466 if (deviceIndex >= infos.size() || deviceIndex >= specs.size()) {
467 return;
468 }
 // Make sure a cursor slot exists for this device (new slots start at 0).
469 if (mLastLogSeq.size() <= deviceIndex) {
470 mLastLogSeq.resize(deviceIndex + 1, 0);
471 }
472
473 auto const& info = infos[deviceIndex];
 // NOTE(review): this difference assumes logSeq never falls below the stored
 // cursor; if both are unsigned, a smaller logSeq would wrap to a huge count
 // (then capped below) — confirm the types and whether logSeq can be reset.
474 size_t newLines = info.logSeq - mLastLogSeq[deviceIndex];
475 if (newLines == 0) {
476 return;
477 }
478 // Cap to buffer size to avoid re-reading overwritten entries.
479 if (newLines > info.history.size()) {
480 newLines = info.history.size();
481 }
482
 // NOTE(review): if info.history is empty, histSize is 0 and the modulo below
 // divides by zero — confirm the history buffer is never empty here.
483 size_t histSize = info.history.size();
484 // The oldest unread entry sits at (historyPos - newLines + histSize) % histSize.
485 size_t startPos = (info.historyPos + histSize - newLines) % histSize;
486
487 std::string_view devName = specs[deviceIndex].name;
 // Walk the history buffer with wrap-around, sending each line as its own message.
488 for (size_t i = 0; i < newLines; ++i) {
489 size_t pos = (startPos + i) % histSize;
490 std::string out;
491 out += R"({"type":"log","device":")";
492 out += jsonEscape(devName);
493 out += R"(","level":")";
494 out += logLevelName(info.historyLevel[pos]);
495 out += R"(","line":")";
496 out += jsonEscape(info.history[pos]);
497 out += R"("})";
498 sendText(out);
499 }
 // Remember how far we have read so the next call only sends newer lines.
500 mLastLogSeq[deviceIndex] = info.logSeq;
501}
502
503} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
nlohmann::json json
StringRef key
GLuint GLuint end
Definition glcorearb.h:469
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * label
Definition glcorearb.h:2519
GLint GLint GLsizei GLsizei GLsizei depth
Definition glcorearb.h:470
GLuint start
Definition glcorearb.h:469
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
constexpr size_t metricStorageSize()
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DeviceInfo > * infos
LogLevel
Possible log levels for device log entries.
StatusWebSocketHandler(DriverServerContext &context, WSDPLHandler *handler)
void sendSnapshot()
Send a minimal JSON snapshot (device list + basic state, no metrics/logs).
void frame(char const *data, size_t s) override
Handles incoming commands from the MCP client.
void headers(std::map< std::string, std::string > const &headers) override
Sends the minimal snapshot on handshake completion.
void write(char const *, size_t)
Helper to write a message to the associated client.
uint64_t const void const *restrict const msg
Definition x9.h:153