32std::string jsonEscape(std::string_view s)
35 out.reserve(
s.size() + 4);
36 for (
unsigned char c :
s) {
56 snprintf(
buf,
sizeof(
buf),
"\\u%04x",
c);
59 out +=
static_cast<char>(
c);
69 case LogParsingHelpers::LogLevel::Debug:
71 case LogParsingHelpers::LogLevel::Info:
73 case LogParsingHelpers::LogLevel::Important:
75 case LogParsingHelpers::LogLevel::Warning:
77 case LogParsingHelpers::LogLevel::Alarm:
79 case LogParsingHelpers::LogLevel::Error:
81 case LogParsingHelpers::LogLevel::Critical:
83 case LogParsingHelpers::LogLevel::Fatal:
96 return "EndOfStreaming";
104void appendMetricValue(std::string& out, DeviceMetricsInfo
const& info,
size_t mi)
106 auto const& metric = info.metrics[mi];
107 if (metric.pos == 0) {
112 switch (metric.type) {
118 snprintf(
buf,
sizeof(
buf),
"%g",
static_cast<double>(info.floatMetrics[metric.storeIdx][last]));
133std::string_view extractStringField(std::string_view
json, std::string_view
key)
140 if (
pos == std::string_view::npos) {
143 pos += needle.size();
153 if (
end == std::string_view::npos) {
161std::string_view extractArrayField(std::string_view
json, std::string_view
key)
168 if (
pos == std::string_view::npos) {
171 pos += needle.size();
198void forEachStringInArray(std::string_view arr, F&& callback)
202 while (
pos < arr.size()) {
203 auto q = arr.find(
'"',
pos);
204 if (q == std::string_view::npos) {
207 auto end = arr.find(
'"', q + 1);
208 if (
end == std::string_view::npos) {
211 callback(arr.substr(q + 1,
end - q - 1));
219 : mContext{context}, mHandler{handler}
226 handlers.erase(std::remove(handlers.begin(), handlers.end(),
this), handlers.end());
237 auto cmd = extractStringField(
msg,
"cmd");
241 auto deviceName = extractStringField(
msg,
"device");
243 if (cmd ==
"list_metrics") {
244 handleListMetrics(deviceName);
245 }
else if (cmd ==
"subscribe") {
246 handleSubscribe(deviceName, extractArrayField(
msg,
"metrics"));
247 }
else if (cmd ==
"unsubscribe") {
248 handleUnsubscribe(deviceName, extractArrayField(
msg,
"metrics"));
249 }
else if (cmd ==
"subscribe_logs") {
250 handleSubscribeLogs(deviceName);
251 }
else if (cmd ==
"unsubscribe_logs") {
252 handleUnsubscribeLogs(deviceName);
256void StatusWebSocketHandler::sendText(std::string
const&
json)
258 std::vector<uv_buf_t> outputs;
260 mHandler->
write(outputs);
265 auto const& specs = *mContext.
specs;
266 auto const& infos = *mContext.
infos;
269 mSubscribedMetrics.resize(specs.size());
270 mLastLogSeq.resize(infos.size());
271 for (
size_t di = 0;
di < infos.size(); ++
di) {
272 mLastLogSeq[
di] = infos[
di].logSeq;
276 out.reserve(512 + specs.size() * 128);
277 out += R
"({"type":"snapshot","devices":[)";
278 for (
size_t di = 0;
di < specs.size(); ++
di) {
282 auto const& info = infos[
di];
283 out += R
"({"name":")";
284 out += jsonEscape(specs[di].name);
285 out += R"(","pid":)";
287 out += R"(,"active":)";
288 out += info.active ? "true" :
"false";
289 out += R
"(,"streamingState":")";
290 out += streamingStateName(info.streamingState);
291 out += R"(","deviceState":")";
292 out += jsonEscape(info.deviceState);
301 auto const& specs = *mContext.
specs;
304 if (deviceIndex >= specs.size() || deviceIndex >=
metrics.size()) {
309 if (mSubscribedMetrics.size() <= deviceIndex) {
310 mSubscribedMetrics.resize(deviceIndex + 1);
313 auto const& subscribed = mSubscribedMetrics[deviceIndex];
314 if (subscribed.empty()) {
318 auto const& info =
metrics[deviceIndex];
319 std::string metricsJson;
322 for (
size_t mi = 0; mi < info.metrics.size(); ++mi) {
323 if (!info.changed[mi]) {
326 auto const& metric = info.metrics[mi];
332 auto const&
label = info.metricLabels[mi];
333 std::string_view labelSV{
label.label,
label.size};
334 if (subscribed.find(std::string(labelSV)) == subscribed.end()) {
342 metricsJson += jsonEscape(labelSV);
343 metricsJson +=
"\":";
344 appendMetricValue(metricsJson, info, mi);
354 out += R
"({"type":"update","device":)";
356 out += R"(,"name":")";
357 out += jsonEscape(specs[deviceIndex].name);
358 out += R"(","metrics":)";
364void StatusWebSocketHandler::handleListMetrics(std::string_view deviceName)
366 size_t di = findDeviceIndex(deviceName);
367 if (
di == SIZE_MAX) {
373 out += R
"({"type":"metrics_list","device":")";
374 out += jsonEscape(deviceName);
375 out += R"(","metrics":[)";
379 for (
size_t mi = 0; mi < info.metrics.size(); ++mi) {
380 auto const& metric = info.metrics[mi];
390 auto const&
label = info.metricLabels[mi];
400void StatusWebSocketHandler::handleSubscribe(std::string_view deviceName, std::string_view metricsArr)
402 size_t di = findDeviceIndex(deviceName);
403 if (
di == SIZE_MAX || metricsArr.empty()) {
406 if (mSubscribedMetrics.size() <=
di) {
407 mSubscribedMetrics.resize(
di + 1);
409 forEachStringInArray(metricsArr, [&](std::string_view
name) {
410 mSubscribedMetrics[
di].emplace(
name);
414void StatusWebSocketHandler::handleUnsubscribe(std::string_view deviceName, std::string_view metricsArr)
416 size_t di = findDeviceIndex(deviceName);
417 if (
di == SIZE_MAX || metricsArr.empty() ||
di >= mSubscribedMetrics.size()) {
420 forEachStringInArray(metricsArr, [&](std::string_view
name) {
421 mSubscribedMetrics[
di].erase(std::string(
name));
425size_t StatusWebSocketHandler::findDeviceIndex(std::string_view
name)
const
427 auto const& specs = *mContext.
specs;
428 for (
size_t di = 0;
di < specs.size(); ++
di) {
436void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
438 size_t di = findDeviceIndex(deviceName);
439 if (
di == SIZE_MAX) {
442 if (mLastLogSeq.size() <=
di) {
443 mLastLogSeq.resize(
di + 1, 0);
446 mLastLogSeq[
di] = (*mContext.
infos)[
di].logSeq;
447 mLogSubscriptions.insert(
di);
450void StatusWebSocketHandler::handleUnsubscribeLogs(std::string_view deviceName)
452 size_t di = findDeviceIndex(deviceName);
453 if (
di == SIZE_MAX) {
456 mLogSubscriptions.erase(
di);
461 if (mLogSubscriptions.find(deviceIndex) == mLogSubscriptions.end()) {
464 auto const& infos = *mContext.
infos;
465 auto const& specs = *mContext.
specs;
466 if (deviceIndex >= infos.size() || deviceIndex >= specs.size()) {
469 if (mLastLogSeq.size() <= deviceIndex) {
470 mLastLogSeq.resize(deviceIndex + 1, 0);
473 auto const& info = infos[deviceIndex];
474 size_t newLines = info.logSeq - mLastLogSeq[deviceIndex];
479 if (newLines > info.history.size()) {
480 newLines = info.history.size();
483 size_t histSize = info.history.size();
485 size_t startPos = (info.historyPos + histSize - newLines) % histSize;
487 std::string_view devName = specs[deviceIndex].name;
488 for (
size_t i = 0;
i < newLines; ++
i) {
489 size_t pos = (startPos +
i) % histSize;
491 out += R
"({"type":"log","device":")";
492 out += jsonEscape(devName);
493 out += R"(","level":")";
494 out += logLevelName(info.historyLevel[pos]);
495 out += R"(","line":")";
496 out += jsonEscape(info.history[pos]);
500 mLastLogSeq[deviceIndex] = info.logSeq;
GLuint const GLchar * name
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
GLuint GLsizei const GLchar * label
GLint GLint GLsizei GLsizei GLsizei depth
GLenum GLuint GLenum GLsizei const GLchar * buf
Defining PrimaryVertex explicitly as messageable.
constexpr size_t metricStorageSize()
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
std::string to_string(gsl::span< T, Size > span)
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DeviceInfo > * infos
LogLevel
Possible log levels for device log entries.
StatusWebSocketHandler(DriverServerContext &context, WSDPLHandler *handler)
void sendSnapshot()
Send a minimal JSON snapshot (device list + basic state, no metrics/logs).
void sendNewLogs(size_t deviceIndex)
void sendUpdate(size_t deviceIndex)
~StatusWebSocketHandler() override
void frame(char const *data, size_t s) override
Handles incoming commands from the MCP client.
void headers(std::map< std::string, std::string > const &headers) override
Sends the minimal snapshot on handshake completion.
void write(char const *, size_t)
Helper to write a message to the associated client.
uint64_t const void const *restrict const msg