38std::string jsonEscape(std::string_view s)
41 out.reserve(
s.size() + 4);
42 for (
unsigned char c :
s) {
62 snprintf(
buf,
sizeof(
buf),
"\\u%04x",
c);
65 out +=
static_cast<char>(
c);
75 case LogParsingHelpers::LogLevel::Debug:
77 case LogParsingHelpers::LogLevel::Info:
79 case LogParsingHelpers::LogLevel::Important:
81 case LogParsingHelpers::LogLevel::Warning:
83 case LogParsingHelpers::LogLevel::Alarm:
85 case LogParsingHelpers::LogLevel::Error:
87 case LogParsingHelpers::LogLevel::Critical:
89 case LogParsingHelpers::LogLevel::Fatal:
102 return "EndOfStreaming";
110void appendMetricValue(std::string& out, DeviceMetricsInfo
const& info,
size_t mi)
112 auto const& metric =
info.metrics[mi];
113 if (metric.pos == 0) {
118 switch (metric.type) {
124 snprintf(
buf,
sizeof(
buf),
"%g",
static_cast<double>(
info.floatMetrics[metric.storeIdx][last]));
139std::string_view extractStringField(std::string_view
json, std::string_view
key)
146 if (
pos == std::string_view::npos) {
149 pos += needle.size();
159 if (
end == std::string_view::npos) {
167std::string_view extractArrayField(std::string_view
json, std::string_view
key)
174 if (
pos == std::string_view::npos) {
177 pos += needle.size();
204void forEachStringInArray(std::string_view arr, F&& callback)
208 while (
pos < arr.size()) {
209 auto q = arr.find(
'"',
pos);
210 if (q == std::string_view::npos) {
213 auto end = arr.find(
'"', q + 1);
214 if (
end == std::string_view::npos) {
217 callback(arr.substr(q + 1,
end - q - 1));
225 : mContext{context}, mHandler{handler}
232 handlers.erase(std::remove(handlers.begin(), handlers.end(),
this), handlers.end());
243 auto cmd = extractStringField(
msg,
"cmd");
247 auto deviceName = extractStringField(
msg,
"device");
249 if (cmd ==
"list_metrics") {
250 handleListMetrics(deviceName);
251 }
else if (cmd ==
"subscribe") {
252 handleSubscribe(deviceName, extractArrayField(
msg,
"metrics"));
253 }
else if (cmd ==
"unsubscribe") {
254 handleUnsubscribe(deviceName, extractArrayField(
msg,
"metrics"));
255 }
else if (cmd ==
"subscribe_logs") {
256 handleSubscribeLogs(deviceName);
257 }
else if (cmd ==
"unsubscribe_logs") {
258 handleUnsubscribeLogs(deviceName);
259 }
else if (cmd ==
"start_devices") {
260 handleStartDevices();
261 }
else if (cmd ==
"enable_signpost") {
262 handleEnableSignpost(deviceName, extractArrayField(
msg,
"streams"));
263 }
else if (cmd ==
"disable_signpost") {
264 handleDisableSignpost(deviceName, extractArrayField(
msg,
"streams"));
268void StatusWebSocketHandler::sendText(std::string
const&
json)
270 std::vector<uv_buf_t> outputs;
272 mHandler->
write(outputs);
277 auto const& specs = *mContext.
specs;
278 auto const& infos = *mContext.
infos;
281 mSubscribedMetrics.resize(specs.size());
282 mLastLogSeq.resize(infos.size());
283 for (
size_t di = 0;
di < infos.size(); ++
di) {
284 mLastLogSeq[
di] = infos[
di].logSeq;
288 out.reserve(512 + specs.size() * 128);
289 out += R
"({"type":"snapshot","devices":[)";
290 for (
size_t di = 0;
di < specs.size(); ++
di) {
294 auto const& info = infos[
di];
295 out += R
"({"name":")";
296 out += jsonEscape(specs[di].name);
297 out += R"(","pid":)";
299 out += R"(,"active":)";
300 out += info.active ? "true" :
"false";
301 out += R
"(,"streamingState":")";
302 out += streamingStateName(info.streamingState);
303 out += R"(","deviceState":")";
304 out += jsonEscape(info.deviceState);
313 auto const& specs = *mContext.
specs;
316 if (deviceIndex >= specs.size() || deviceIndex >=
metrics.size()) {
321 if (mSubscribedMetrics.size() <= deviceIndex) {
322 mSubscribedMetrics.resize(deviceIndex + 1);
325 auto const& subscribed = mSubscribedMetrics[deviceIndex];
326 if (subscribed.empty()) {
330 auto const& info =
metrics[deviceIndex];
331 std::string metricsJson;
334 for (
size_t mi = 0; mi < info.metrics.size(); ++mi) {
335 if (!info.changed[mi]) {
338 auto const& metric = info.metrics[mi];
344 auto const&
label = info.metricLabels[mi];
345 std::string_view labelSV{
label.label,
label.size};
346 if (subscribed.find(std::string(labelSV)) == subscribed.end()) {
354 metricsJson += jsonEscape(labelSV);
355 metricsJson +=
"\":";
356 appendMetricValue(metricsJson, info, mi);
366 out += R
"({"type":"update","device":)";
368 out += R"(,"name":")";
369 out += jsonEscape(specs[deviceIndex].name);
370 out += R"(","metrics":)";
376void StatusWebSocketHandler::handleListMetrics(std::string_view deviceName)
378 size_t di = findDeviceIndex(deviceName);
379 if (
di == SIZE_MAX) {
385 out += R
"({"type":"metrics_list","device":")";
386 out += jsonEscape(deviceName);
387 out += R"(","metrics":[)";
391 for (
size_t mi = 0; mi < info.metrics.size(); ++mi) {
392 auto const& metric = info.metrics[mi];
402 auto const&
label =
info.metricLabels[mi];
412void StatusWebSocketHandler::handleSubscribe(std::string_view deviceName, std::string_view metricsArr)
414 size_t di = findDeviceIndex(deviceName);
415 if (
di == SIZE_MAX || metricsArr.empty()) {
418 if (mSubscribedMetrics.size() <=
di) {
419 mSubscribedMetrics.resize(
di + 1);
421 forEachStringInArray(metricsArr, [&](std::string_view
name) {
422 mSubscribedMetrics[
di].emplace(
name);
426void StatusWebSocketHandler::handleUnsubscribe(std::string_view deviceName, std::string_view metricsArr)
428 size_t di = findDeviceIndex(deviceName);
429 if (
di == SIZE_MAX || metricsArr.empty() ||
di >= mSubscribedMetrics.size()) {
432 forEachStringInArray(metricsArr, [&](std::string_view
name) {
433 mSubscribedMetrics[
di].erase(std::string(
name));
437size_t StatusWebSocketHandler::findDeviceIndex(std::string_view
name)
const
439 auto const& specs = *mContext.
specs;
440 for (
size_t di = 0;
di < specs.size(); ++
di) {
448void StatusWebSocketHandler::handleStartDevices()
450 for (
auto const& info : *mContext.infos) {
452 kill(
info.pid, SIGCONT);
457void StatusWebSocketHandler::handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr)
459 if (streamsArr.empty()) {
462 if (deviceName.empty()) {
464 forEachStringInArray(streamsArr, [](std::string_view streamName) {
465 std::string
target(streamName);
468 if (
static_cast<std::string*
>(context)->
compare(
name) == 0) {
476 size_t di = findDeviceIndex(deviceName);
480 auto* controller = (*mContext.
controls)[
di].controller;
481 forEachStringInArray(streamsArr, [controller](std::string_view
name) {
482 std::string cmd =
"/signpost:enable ";
484 controller->write(cmd.c_str(), cmd.size());
489void StatusWebSocketHandler::handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr)
491 if (streamsArr.empty()) {
494 if (deviceName.empty()) {
495 forEachStringInArray(streamsArr, [](std::string_view streamName) {
496 std::string
target(streamName);
499 if (
static_cast<std::string*
>(context)->
compare(
name) == 0) {
507 size_t di = findDeviceIndex(deviceName);
511 auto* controller = (*mContext.
controls)[
di].controller;
512 forEachStringInArray(streamsArr, [controller](std::string_view
name) {
513 std::string cmd =
"/signpost:disable ";
515 controller->write(cmd.c_str(), cmd.size());
520void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
522 size_t di = findDeviceIndex(deviceName);
523 if (
di == SIZE_MAX) {
526 if (mLastLogSeq.size() <=
di) {
527 mLastLogSeq.resize(
di + 1, 0);
530 mLastLogSeq[
di] = (*mContext.
infos)[
di].logSeq;
531 mLogSubscriptions.insert(
di);
534void StatusWebSocketHandler::handleUnsubscribeLogs(std::string_view deviceName)
536 size_t di = findDeviceIndex(deviceName);
537 if (
di == SIZE_MAX) {
540 mLogSubscriptions.erase(
di);
545 if (mLogSubscriptions.find(deviceIndex) == mLogSubscriptions.end()) {
548 auto const& infos = *mContext.
infos;
549 auto const& specs = *mContext.
specs;
550 if (deviceIndex >= infos.size() || deviceIndex >= specs.size()) {
553 if (mLastLogSeq.size() <= deviceIndex) {
554 mLastLogSeq.resize(deviceIndex + 1, 0);
557 auto const& info = infos[deviceIndex];
558 size_t newLines = info.logSeq - mLastLogSeq[deviceIndex];
563 if (newLines > info.history.size()) {
564 newLines = info.history.size();
567 size_t histSize = info.history.size();
569 size_t startPos = (info.historyPos + histSize - newLines) % histSize;
571 std::string_view devName = specs[deviceIndex].name;
572 for (
size_t i = 0;
i < newLines; ++
i) {
573 size_t pos = (startPos +
i) % histSize;
575 out += R
"({"type":"log","device":")";
576 out += jsonEscape(devName);
577 out += R"(","level":")";
578 out += logLevelName(info.historyLevel[pos]);
579 out += R"(","line":")";
580 out += jsonEscape(info.history[pos]);
584 mLastLogSeq[deviceIndex] = info.logSeq;
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
GLuint const GLchar * name
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLuint GLsizei const GLchar * label
GLint GLint GLsizei GLsizei GLsizei depth
GLenum GLuint GLenum GLsizei const GLchar * buf
Defining ITS Vertex explicitly as messageable.
constexpr size_t metricStorageSize()
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
std::string to_string(gsl::span< T, Size > span)
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DeviceInfo > * infos
std::vector< DeviceControl > * controls
LogLevel
Possible log levels for device log entries.
StatusWebSocketHandler(DriverServerContext &context, WSDPLHandler *handler)
void sendSnapshot()
Send a minimal JSON snapshot (device list + basic state, no metrics/logs).
void sendNewLogs(size_t deviceIndex)
void sendUpdate(size_t deviceIndex)
~StatusWebSocketHandler() override
void frame(char const *data, size_t s) override
Handles incoming commands from the MCP client.
void headers(std::map< std::string, std::string > const &headers) override
Sends the minimal snapshot on handshake completion.
void write(char const *, size_t)
Helper to write a message to the associated client.
void compare(std::string_view s1, std::string_view s2)
uint64_t const void const *restrict const msg