Project
Loading...
Searching...
No Matches
ControlWebSocketHandler.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "DriverServerContext.h"
17#include "Framework/Signpost.h"
18#include <regex>
19#include "Framework/Logger.h"
21
23
24namespace o2::framework
25{
/// Handle one frame of control data received on the websocket.
///
/// The frame is first tried as a metric: on success it is processed into
/// (*mContext.metrics)[mIndex], didProcessMetric is set, didHaveNewMetric
/// is OR-ed with whether the new-metric callback fired, and the function returns.
/// Otherwise the frame goes through the if / else-if chain at the bottom,
/// which ends by logging an error for data that nothing recognised.
///
/// @param frame pointer to the first byte of the received data
/// @param s number of bytes available at @a frame
26void ControlWebSocketHandler::frame(char const* frame, size_t s)
27{
28 bool hasNewMetric = false;
// Pointers to the input and output channel metrics view indices of the
// device at position mIndex; handed to the updater by the callback below.
29 std::array<Metric2DViewIndex*, 2> model = {&(*mContext.infos)[mIndex].inputChannelMetricsViewIndex,
30 &(*mContext.infos)[mIndex].outputChannelMetricsViewIndex};
31 auto updateMetricsViews = Metric2DViewIndex::getUpdater();
32
// Passed to processMetric() below: forwards its arguments to the views
// updater and records in hasNewMetric that it was invoked.
33 auto newMetricCallback = [&](std::string const& name, MetricInfo const& metric, int value, size_t metricIndex) {
34 updateMetricsViews(model, name, metric, value, metricIndex);
35 hasNewMetric = true;
36 };
37 std::string_view tokenSV(frame, s);
38 ParsedMetricMatch metricMatch;
39
// Returns true when the token parses as configuration, in which case it is
// also applied to `info`. The return value of processConfig() is ignored.
40 auto doParseConfig = [](std::string_view const& token, ParsedConfigMatch& configMatch, DeviceInfo& info) -> bool {
41 if (DeviceConfigHelper::parseConfig(token, configMatch)) {
42 DeviceConfigHelper::processConfig(configMatch, info);
43 return true;
44 }
45 return false;
46 };
47 LOG(debug3) << "Data received: " << std::string_view(frame, s);
48 if (DeviceMetricsHelper::parseMetric(tokenSV, metricMatch)) {
49 // We use this callback to cache which metrics are needed to provide
50 // the DataRelayer view.
51 assert(mContext.metrics);
52 DeviceMetricsHelper::processMetric(metricMatch, (*mContext.metrics)[mIndex], newMetricCallback);
// These two flags are read by the end-of-chunk processing.
53 didProcessMetric = true;
54 didHaveNewMetric |= hasNewMetric;
55 return;
56 }
57
58 ParsedConfigMatch configMatch;
// Same bytes as tokenSV above, viewed again for the non-metric parsers.
59 std::string_view const token(frame, s);
60 std::match_results<std::string_view::const_iterator> match;
61
// NOTE(review): the `if (...) {` branch that opens this chain is missing from
// this listing (presumably where `match` is used) - confirm against the real file.
// NOTE(review): `mContext.infos` is tested only after `(*mContext.infos)[mIndex]`
// has been evaluated as an argument, and it is also dereferenced unconditionally
// at the top of the function, so this test cannot guard against a null pointer.
62 64 } else if (doParseConfig(token, configMatch, (*mContext.infos)[mIndex]) && mContext.infos) {
63 65 LOG(debug2) << "Found configuration information for pid " << mPid;
64 66 } else {
65 67 LOG(error) << "Unexpected control data: " << std::string_view(frame, s);
66 68 }
67 69}
70
76{
// Body of the end-of-chunk handler (its signature is not part of this listing).
// When at least one metric was processed, it runs every registered metric
// processing callback, notifies the status handlers and then clears the
// per-metric `changed` flags of all devices.
77 if (!didProcessMetric) {
// No metric was processed, so there is nothing to do.
78 return;
79 }
80 O2_SIGNPOST_ID_GENERATE(sid, rate_limiting);
// NOTE(review): the remaining arguments and closing parenthesis of this
// macro call are missing from this listing.
81 O2_SIGNPOST_START(rate_limiting, sid, "endChunk",
82 "Processing metrics from device %zu (had new metric: %d)",
// Time elapsed since driver->startTime, divided by 1e6 and offset by
// startTimeMsFromEpoch. libuv documents uv_hrtime() as nanoseconds, so this
// is presumably milliseconds since the epoch - confirm against the driver.
84 size_t timestamp = (uv_hrtime() - mContext.driver->startTime) / 1000000 + mContext.driver->startTimeMsFromEpoch;
// NOTE(review): mContext.driver is already dereferenced on the line above,
// before its assert below; mContext.metricProcessingCallbacks is dereferenced
// in the loop without any assert.
85 assert(mContext.metrics);
86 assert(mContext.infos);
87 assert(mContext.states);
88 assert(mContext.specs);
89 assert(mContext.driver);
90
// Give every registered callback a view of all the metrics, specs and infos.
91 for (auto& callback : *mContext.metricProcessingCallbacks) {
92 callback(mContext.registry, ServiceMetricsInfo{*mContext.metrics, *mContext.specs, *mContext.infos, mContext.driver->metrics, *mContext.driver}, timestamp);
93 }
94 // Notify status clients before changed flags are reset so they can see what changed.
95 for (auto* statusHandler : mContext.statusHandlers) {
96 statusHandler->sendUpdate(mIndex);
97 }
// Reset the `changed` flags of every device, not only the one at mIndex.
98 for (auto& metricsInfo : *mContext.metrics) {
99 std::fill(metricsInfo.changed.begin(), metricsInfo.changed.end(), false);
100 }
101 O2_SIGNPOST_END(rate_limiting, sid, "endChunk",
102 "Done processing metrics from device %zu", mIndex);
103}
104
105void ControlWebSocketHandler::headers(std::map<std::string, std::string> const& headers)
106{
107 if (headers.count("x-dpl-pid")) {
108 auto s = headers.find("x-dpl-pid");
109 this->mPid = std::stoi(s->second);
110 for (size_t di = 0; di < mContext.infos->size(); ++di) {
111 if ((*mContext.infos)[di].pid == mPid) {
112 mIndex = di;
113 return;
114 }
115 }
116 }
117}
118} // namespace o2::framework
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:507
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
static bool parseControl(std::string_view const &s, std::match_results< std::string_view::const_iterator > &match)
static void processCommand(std::vector< DeviceInfo > &infos, std::vector< DataProcessingStates > &allStates, pid_t pid, std::string const &command, std::string const &arg)
size_t mIndex
The index of the remote process associated to this handler.
void headers(std::map< std::string, std::string > const &headers) override
ControlWebSocketHandler(DriverServerContext &context)
void frame(char const *frame, size_t s) override
static bool processConfig(ParsedConfigMatch &results, DeviceInfo &info)
static bool parseConfig(std::string_view const s, ParsedConfigMatch &results)
static bool parseMetric(std::string_view const s, ParsedMetricMatch &results)
Helper function to parse a metric string.
static bool processMetric(ParsedMetricMatch &results, DeviceMetricsInfo &info, NewMetricCallback newMetricCallback=nullptr)
std::vector< ServiceMetricHandling > * metricProcessingCallbacks
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DataProcessingStates > * states
std::vector< DeviceInfo > * infos
static Updater getUpdater()
Get the right updater function given a list of Metric views.
Temporary struct to hold a configuration parameter.
Temporary struct to hold a metric after it has been parsed.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"