Project
Loading...
Searching...
No Matches
ControlWebSocketHandler.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
13#include "DriverServerContext.h"
16#include <regex>
17#include "Framework/Logger.h"
19
20namespace o2::framework
21{
22void ControlWebSocketHandler::frame(char const* frame, size_t s)
23{
24 bool hasNewMetric = false;
25 std::array<Metric2DViewIndex*, 2> model = {&(*mContext.infos)[mIndex].inputChannelMetricsViewIndex,
26 &(*mContext.infos)[mIndex].outputChannelMetricsViewIndex};
27 auto updateMetricsViews = Metric2DViewIndex::getUpdater();
28
29 auto newMetricCallback = [&](std::string const& name, MetricInfo const& metric, int value, size_t metricIndex) {
30 updateMetricsViews(model, name, metric, value, metricIndex);
31 hasNewMetric = true;
32 };
33 std::string_view tokenSV(frame, s);
34 ParsedMetricMatch metricMatch;
35
36 auto doParseConfig = [](std::string_view const& token, ParsedConfigMatch& configMatch, DeviceInfo& info) -> bool {
37 if (DeviceConfigHelper::parseConfig(token, configMatch)) {
38 DeviceConfigHelper::processConfig(configMatch, info);
39 return true;
40 }
41 return false;
42 };
43 LOG(debug3) << "Data received: " << std::string_view(frame, s);
44 if (DeviceMetricsHelper::parseMetric(tokenSV, metricMatch)) {
45 // We use this callback to cache which metrics are needed to provide a
46 // the DataRelayer view.
47 assert(mContext.metrics);
48 DeviceMetricsHelper::processMetric(metricMatch, (*mContext.metrics)[mIndex], newMetricCallback);
49 didProcessMetric = true;
50 didHaveNewMetric |= hasNewMetric;
51 return;
52 }
53
54 ParsedConfigMatch configMatch;
55 std::string_view const token(frame, s);
56 std::match_results<std::string_view::const_iterator> match;
57
60 } else if (doParseConfig(token, configMatch, (*mContext.infos)[mIndex]) && mContext.infos) {
61 LOG(debug2) << "Found configuration information for pid " << mPid;
62 } else {
63 LOG(error) << "Unexpected control data: " << std::string_view(frame, s);
64 }
65}
66
72{
73 if (!didProcessMetric) {
74 return;
75 }
76 size_t timestamp = (uv_hrtime() - mContext.driver->startTime) / 1000000 + mContext.driver->startTimeMsFromEpoch;
77 assert(mContext.metrics);
78 assert(mContext.infos);
79 assert(mContext.states);
80 assert(mContext.specs);
81 assert(mContext.driver);
82
83 for (auto& callback : *mContext.metricProcessingCallbacks) {
84 callback(mContext.registry, ServiceMetricsInfo{*mContext.metrics, *mContext.specs, *mContext.infos, mContext.driver->metrics, *mContext.driver}, timestamp);
85 }
86 for (auto& metricsInfo : *mContext.metrics) {
87 std::fill(metricsInfo.changed.begin(), metricsInfo.changed.end(), false);
88 }
89}
90
91void ControlWebSocketHandler::headers(std::map<std::string, std::string> const& headers)
92{
93 if (headers.count("x-dpl-pid")) {
94 auto s = headers.find("x-dpl-pid");
95 this->mPid = std::stoi(s->second);
96 for (size_t di = 0; di < mContext.infos->size(); ++di) {
97 if ((*mContext.infos)[di].pid == mPid) {
98 mIndex = di;
99 return;
100 }
101 }
102 }
103}
104} // namespace o2::framework
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLuint const GLchar * name
Definition glcorearb.h:781
GLsizei const GLfloat * value
Definition glcorearb.h:819
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static bool parseControl(std::string_view const &s, std::match_results< std::string_view::const_iterator > &match)
static void processCommand(std::vector< DeviceInfo > &infos, std::vector< DataProcessingStates > &allStates, pid_t pid, std::string const &command, std::string const &arg)
size_t mIndex
The index of the remote process associated to this handler.
void headers(std::map< std::string, std::string > const &headers) override
ControlWebSocketHandler(DriverServerContext &context)
void frame(char const *frame, size_t s) override
static bool processConfig(ParsedConfigMatch &results, DeviceInfo &info)
static bool parseConfig(std::string_view const s, ParsedConfigMatch &results)
static bool parseMetric(std::string_view const s, ParsedMetricMatch &results)
Helper function to parse a metric string.
static bool processMetric(ParsedMetricMatch &results, DeviceMetricsInfo &info, NewMetricCallback newMetricCallback=nullptr)
std::vector< ServiceMetricHandling > * metricProcessingCallbacks
std::vector< DeviceSpec > * specs
std::vector< DeviceMetricsInfo > * metrics
std::vector< DataProcessingStates > * states
std::vector< DeviceInfo > * infos
static Updater getUpdater()
Get the right updated function given a list of Metric views.
Temporary struct to hold a configuration parameter.
Temporary struct to hold a metric after it has been parsed.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"