Project
Loading...
Searching...
No Matches
ControlWebSocketHandler.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "DriverServerContext.h"
17#include <regex>
18#include "Framework/Logger.h"
20
21namespace o2::framework
22{
// Handle one incoming websocket frame of `s` bytes starting at `frame`.
//
// The payload is first offered to DeviceMetricsHelper::parseMetric. On success
// the metric is processed into (*mContext.metrics)[mIndex], didProcessMetric is
// set, didHaveNewMetric is OR-ed with whether the new-metric callback fired,
// and the function returns. Otherwise the payload goes through an if / else-if
// chain whose visible branches try to parse it as configuration and, failing
// that, log it as unexpected control data.
//
// NOTE(review): the numbers fused to the start of each line are listing line
// numbers from the documentation page this text was extracted from; they are
// not part of the program.
23void ControlWebSocketHandler::frame(char const* frame, size_t s)
24{
   // Set by newMetricCallback below; read after processMetric returns.
25 bool hasNewMetric = false;
   // Pointers to the input/output channel metrics view indexes of the
   // DeviceInfo at mIndex.
   // NOTE(review): mContext.infos is dereferenced here without a null check,
   // although the else-if further down tests it for null - confirm whether it
   // can actually be null at this point.
26 std::array<Metric2DViewIndex*, 2> model = {&(*mContext.infos)[mIndex].inputChannelMetricsViewIndex,
27 &(*mContext.infos)[mIndex].outputChannelMetricsViewIndex};
28 auto updateMetricsViews = Metric2DViewIndex::getUpdater();
29
   // Passed to processMetric: forwards its arguments to the views updater and
   // records that it was invoked.
30 auto newMetricCallback = [&](std::string const& name, MetricInfo const& metric, int value, size_t metricIndex) {
31 updateMetricsViews(model, name, metric, value, metricIndex);
32 hasNewMetric = true;
33 };
34 std::string_view tokenSV(frame, s);
35 ParsedMetricMatch metricMatch;
36
   // Returns true when `token` parses as configuration; in that case the parsed
   // result is also handed to processConfig together with `info`.
37 auto doParseConfig = [](std::string_view const& token, ParsedConfigMatch& configMatch, DeviceInfo& info) -> bool {
38 if (DeviceConfigHelper::parseConfig(token, configMatch)) {
39 DeviceConfigHelper::processConfig(configMatch, info);
40 return true;
41 }
42 return false;
43 };
44 LOG(debug3) << "Data received: " << std::string_view(frame, s);
45 if (DeviceMetricsHelper::parseMetric(tokenSV, metricMatch)) {
46 // We use this callback to cache which metrics are needed to provide
47 // the DataRelayer view.
48 assert(mContext.metrics);
49 DeviceMetricsHelper::processMetric(metricMatch, (*mContext.metrics)[mIndex], newMetricCallback);
50 didProcessMetric = true;
51 didHaveNewMetric |= hasNewMetric;
52 return;
53 }
54
55 ParsedConfigMatch configMatch;
56 std::string_view const token(frame, s);
57 std::match_results<std::string_view::const_iterator> match;
58
   // NOTE(review): listing lines 59-60 are absent from this text, so the
   // leading `if` branch of the chain below is not visible here. `match` is
   // presumably consumed by that missing branch - verify against the real file.
   //
   // NOTE(review): in the condition below (*mContext.infos)[mIndex] is
   // evaluated as an argument of doParseConfig, i.e. before the
   // `&& mContext.infos` test on its right. If the null test is meant to guard
   // the dereference it should come first.
61 } else if (doParseConfig(token, configMatch, (*mContext.infos)[mIndex]) && mContext.infos) {
62 LOG(debug2) << "Found configuration information for pid " << mPid;
63 } else {
64 LOG(error) << "Unexpected control data: " << std::string_view(frame, s);
65 }
66}
67
// NOTE(review): the signature of this member function (listing lines 68-72) is
// absent from this text; only the body is visible. The numbers fused to the
// start of each line are listing line numbers, not program text.
//
// Body summary: does nothing unless didProcessMetric is set. Otherwise it
// computes a timestamp, invokes every entry of
// *mContext.metricProcessingCallbacks with the current metrics/specs/infos/
// driver state, asks every status handler to send an update for mIndex, and
// finally clears the `changed` flags of every entry in *mContext.metrics.
73{
74 if (!didProcessMetric) {
75 return;
76 }
   // Time elapsed since driver->startTime, divided by 1000000 and offset by
   // driver->startTimeMsFromEpoch. uv_hrtime() is documented by libuv as
   // returning nanoseconds, so the division presumably yields milliseconds.
   // NOTE(review): mContext.driver is dereferenced on this line, before the
   // assert(mContext.driver) a few lines below - that assert cannot catch a
   // null driver.
77 size_t timestamp = (uv_hrtime() - mContext.driver->startTime) / 1000000 + mContext.driver->startTimeMsFromEpoch;
78 assert(mContext.metrics);
79 assert(mContext.infos);
80 assert(mContext.states);
81 assert(mContext.specs);
82 assert(mContext.driver);
83
   // NOTE(review): metricProcessingCallbacks is dereferenced without a matching
   // assert - confirm it can never be null here.
84 for (auto& callback : *mContext.metricProcessingCallbacks) {
85 callback(mContext.registry, ServiceMetricsInfo{*mContext.metrics, *mContext.specs, *mContext.infos, mContext.driver->metrics, *mContext.driver}, timestamp);
86 }
87 // Notify status clients before changed flags are reset so they can see what changed.
88 for (auto* statusHandler : mContext.statusHandlers) {
89 statusHandler->sendUpdate(mIndex);
90 }
   // Reset the per-metric `changed` flags for every device's metrics info.
91 for (auto& metricsInfo : *mContext.metrics) {
92 std::fill(metricsInfo.changed.begin(), metricsInfo.changed.end(), false);
93 }
94}
95
96void ControlWebSocketHandler::headers(std::map<std::string, std::string> const& headers)
97{
98 if (headers.count("x-dpl-pid")) {
99 auto s = headers.find("x-dpl-pid");
100 this->mPid = std::stoi(s->second);
101 for (size_t di = 0; di < mContext.infos->size(); ++di) {
102 if ((*mContext.infos)[di].pid == mPid) {
103 mIndex = di;
104 return;
105 }
106 }
107 }
108}
109} // namespace o2::framework
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
static bool parseControl(std::string_view const &s, std::match_results< std::string_view::const_iterator > &match)
static void processCommand(std::vector< DeviceInfo > &infos, std::vector< DataProcessingStates > &allStates, pid_t pid, std::string const &command, std::string const &arg)
size_t mIndex
The index of the remote process associated to this handler.
void headers(std::map< std::string, std::string > const &headers) override
ControlWebSocketHandler(DriverServerContext &context)
void frame(char const *frame, size_t s) override
static bool processConfig(ParsedConfigMatch &results, DeviceInfo &info)
static bool parseConfig(std::string_view const s, ParsedConfigMatch &results)
static bool parseMetric(std::string_view const s, ParsedMetricMatch &results)
Helper function to parse a metric string.
static bool processMetric(ParsedMetricMatch &results, DeviceMetricsInfo &info, NewMetricCallback newMetricCallback=nullptr)
std::vector< ServiceMetricHandling > * metricProcessingCallbacks
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< StatusWebSocketHandler * > statusHandlers
std::vector< DataProcessingStates > * states
std::vector< DeviceInfo > * infos
static Updater getUpdater()
Get the right updater function given a list of Metric views.
Temporary struct to hold a configuration parameter.
Temporary struct to hold a metric after it has been parsed.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"