Project
Loading...
Searching...
No Matches
WSDriverClient.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "WSDriverClient.h"
14#include "Framework/Logger.h"
17#include "DriverClientContext.h"
18#include "DPLWebSocket.h"
19#include "Framework/Signpost.h"
20#include <uv.h>
21#include <string_view>
22#include <charconv>
23
26O2_DECLARE_DYNAMIC_LOG(monitoring_service);
27O2_DECLARE_DYNAMIC_LOG(data_processor_context);
28O2_DECLARE_DYNAMIC_LOG(stream_context);
29
30namespace o2::framework
31{
32
35 : mClient{client}
36 {
37 }
38
39 void headers(std::map<std::string, std::string> const& headers) override
40 {
41 }
43 void beginFragmentation() override {}
44
47 void frame(char const* frame, size_t s) override
48 {
49 mClient.dispatch(std::string_view(frame, s));
50 }
51
52 void endFragmentation() override{};
53 void control(char const* frame, size_t s) override{};
54
57 void beginChunk() override
58 {
59 }
60
64 void endChunk() override
65 {
66 }
67
71};
72
77
// libuv connect callback for the TCP connection towards the driver.
//
// On failure (status < 0) an error is logged and nothing else happens.
// On success the function:
//  - flags DeviceState::WS_CONNECTED in the device state loopReason,
//  - registers the observers for the commands the driver can send
//    (/ping, /shm-offer, /quit, /restart, /start, /stop, /trace, /log-streams),
//  - creates the WSDPLClient, attaches it to the WSDriverClient and sends the
//    websocket handshake.
//
// connection->data is expected to point to a ConnectionContext carrying the
// WSDriverClient and the ServiceRegistryRef to use.
//
// NOTE(review): nothing in this function releases `context` or `connection`,
// neither on the error path nor on success - confirm who owns them.
// NOTE(review): this listing is missing a few original lines (the embedded
// numbering skips 96, 112, 189 and 194); the affected spots are marked below.
78void on_connect(uv_connect_t* connection, int status)
79{
80 if (status < 0) {
81 LOG(error) << "Unable to connect to driver.";
82 return;
83 }
84 auto* context = (ConnectionContext*)connection->data;
85 WSDriverClient* client = context->client;
86 auto& state = context->ref.get<DeviceState>();
87 state.loopReason |= DeviceState::WS_CONNECTED;
// Callback handed to the WSDPLClient below: it flushes the messages
// accumulated so far by the driver client.
88 auto onHandshake = [client, ref = context->ref]() {
89 client->flushPending(ref);
90 };
// The client mutex is held until the end of the function, i.e. also while
// setDPLClient() and sendHandshake() are invoked.
91 std::lock_guard<std::mutex> lock(client->mutex());
92 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
93 client->observe("/ping", [](std::string_view) {
94 LOG(info) << "ping";
95 });
// "/shm-offer <N>": parse N and queue an offer with sharedMemory = N * 1000000.
97 client->observe("/shm-offer", [ref = context->ref](std::string_view cmd) {
98 auto& state = ref.get<DeviceState>();
99 static constexpr int prefixSize = std::string_view{"/shm-offer "}.size();
// NOTE(review): compares a signed int with size_t; harmless for this positive
// constant but it triggers sign-compare warnings.
100 if (prefixSize > cmd.size()) {
101 LOG(error) << "Malformed shared memory offer";
102 return;
103 }
104 cmd.remove_prefix(prefixSize);
105 size_t offerSize;
106 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
107 if (offerSizeError.ec != std::errc()) {
108 LOG(error) << "Malformed shared memory offer";
109 return;
110 }
111 LOGP(detail, "Received {}MB shared memory offer", offerSize);
// NOTE(review): the declaration of `offer` (original line 112) is not part of
// this listing.
113 offer.cpu = 0;
114 offer.memory = 0;
115 offer.sharedMemory = offerSize * 1000000;
116 offer.runtime = 10000;
117 offer.user = -1;
118 offer.valid = true;
119
120 state.pendingOffers.push_back(offer);
121 });
122
// "/quit": flag the quit request in the device state.
123 client->observe("/quit", [ref = context->ref](std::string_view) {
124 auto& state = ref.get<DeviceState>();
125 state.quitRequested = true;
126 });
127
// "/restart": append "RUN" and then "STOP" to nextFairMQState.
128 client->observe("/restart", [ref = context->ref](std::string_view) {
129 auto& state = ref.get<DeviceState>();
130 state.nextFairMQState.emplace_back("RUN");
131 state.nextFairMQState.emplace_back("STOP");
132 });
133
// "/start": append "RUN" to nextFairMQState.
134 client->observe("/start", [ref = context->ref](std::string_view) {
135 auto& state = ref.get<DeviceState>();
136 state.nextFairMQState.emplace_back("RUN");
137 });
138
// "/stop": append "STOP" to nextFairMQState.
139 client->observe("/stop", [ref = context->ref](std::string_view) {
140 auto& state = ref.get<DeviceState>();
141 state.nextFairMQState.emplace_back("STOP");
142 });
143
// "/trace <flags>": parse an integer and store it as the tracing flags.
144 client->observe("/trace", [ref = context->ref](std::string_view cmd) {
145 auto& state = ref.get<DeviceState>();
146 static constexpr int prefixSize = std::string_view{"/trace "}.size();
147 if (prefixSize > cmd.size()) {
148 LOG(error) << "Malformed tracing request";
149 return;
150 }
151 cmd.remove_prefix(prefixSize);
152 int tracingFlags = 0;
153 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
154 if (error.ec != std::errc()) {
155 LOG(error) << "Malformed tracing mask";
156 return;
157 }
158 LOGP(info, "Tracing flags set to {}", tracingFlags);
159 state.tracingFlags = tracingFlags;
160 });
161
// "/log-streams <mask>": parse an integer mask, store it and enable or disable
// each dynamic log according to the corresponding bit.
162 client->observe("/log-streams", [ref = context->ref](std::string_view cmd) {
163 auto& state = ref.get<DeviceState>();
164 static constexpr int prefixSize = std::string_view{"/log-streams "}.size();
165 if (prefixSize > cmd.size()) {
166 LOG(error) << "Malformed log-streams request";
167 return;
168 }
169 cmd.remove_prefix(prefixSize);
170 int logStreams = 0;
171
172 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
173 if (error.ec != std::errc()) {
174 LOG(error) << "Malformed log-streams mask";
175 return;
176 }
177 LOGP(info, "Logstreams flags set to {}", logStreams);
178 state.logStreams = logStreams;
179 if ((state.logStreams & DeviceState::LogStreams::DEVICE_LOG) != 0) {
180 O2_LOG_ENABLE(device);
181 } else {
182 O2_LOG_DISABLE(device);
183 }
184 if ((state.logStreams & DeviceState::LogStreams::COMPLETION_LOG) != 0) {
185 O2_LOG_ENABLE(completion);
186 } else {
187 O2_LOG_DISABLE(completion);
188 }
// NOTE(review): the `if` opening this branch (original line 189) is not part
// of this listing; presumably it tests the monitoring-service bit - confirm.
190 O2_LOG_ENABLE(monitoring_service);
191 } else {
192 O2_LOG_DISABLE(monitoring_service);
193 }
// NOTE(review): the `if` opening this branch (original line 194) is not part
// of this listing; presumably it tests the data-processor-context bit - confirm.
195 O2_LOG_ENABLE(data_processor_context);
196 } else {
197 O2_LOG_DISABLE(data_processor_context);
198 }
199 if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
200 O2_LOG_ENABLE(stream_context);
201 } else {
202 O2_LOG_DISABLE(stream_context);
203 }
204 });
205
206 // Client will be filled in the line after. I can probably have a single
207 // client per device.
208 auto dplClient = std::make_unique<WSDPLClient>();
// The DPL client takes the websocket handler and the handshake callback;
// ownership of the DPL client itself is then moved into the driver client.
209 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
210 client->setDPLClient(std::move(dplClient));
211 client->sendHandshake();
212}
213
215{
216 auto* state = (DeviceState*)handle->data;
218}
219
220WSDriverClient::WSDriverClient(ServiceRegistryRef registry, char const* ip, unsigned short port)
221 : mRegistry(registry)
222{
223 auto& state = registry.get<DeviceState>();
224
225 // Must connect the device to the server and send a websocket request.
226 // On successful connection we can then start to send commands to the driver.
227 // We keep a backlog to make sure we do not lose messages.
228 auto* socket = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
229 uv_tcp_init(state.loop, socket);
230 auto* connection = (uv_connect_t*)malloc(sizeof(uv_connect_t));
231 auto* context = new ConnectionContext{.client = this, .ref = registry};
232 connection->data = context;
233
234 struct sockaddr_in dest;
235 uv_ip4_addr(strdup(ip), port, &dest);
236 uv_tcp_connect(connection, socket, (const struct sockaddr*)&dest, on_connect);
237
238 this->mAwakeMainThread = (uv_async_t*)malloc(sizeof(uv_async_t));
239 this->mAwakeMainThread->data = &state;
240 uv_async_init(state.loop, this->mAwakeMainThread, on_awake_main_thread);
241}
242
244{
245 free(this->mAwakeMainThread);
246}
247
248void sendMessageToDriver(std::unique_ptr<o2::framework::WSDPLClient>& client, char const* message, size_t s)
249{
250}
251
252void WSDriverClient::setDPLClient(std::unique_ptr<WSDPLClient> client)
253{
254 mClient = std::move(client);
255 mConnected = true;
256}
257
259{
260 mClient->sendHandshake();
262}
263
// Queue a message for the driver and optionally wake up the main thread.
//
// msg   - start of the message
// s     - size of the message in bytes
// flush - when true, awake() is invoked once the lock has been taken and the
//         message has been handled
//
// The client mutex is held for the whole duration of the call.
264void WSDriverClient::tell(const char* msg, size_t s, bool flush)
265{
266 // Tell will always accumulate and we signal the main thread we
267 // have metrics to push
268 std::lock_guard<std::mutex> lock(mClientMutex);
// NOTE(review): original line 269 is missing from this listing; presumably it
// is the statement which appends msg to the backlog - confirm against the
// actual file.
270 if (flush) {
271 this->awake();
272 }
273}
274
275void WSDriverClient::awake()
276{
277 uv_async_send(mAwakeMainThread);
278}
279
281{
282 if (mainThreadRef.isMainThread() == false) {
283 LOG(error) << "flushPending not called from main thread";
284 }
285 std::lock_guard<std::mutex> lock(mClientMutex);
286 static bool printed1 = false;
287 static bool printed2 = false;
288 if (!mClient) {
289 if (mBacklog.size() > 2000) {
290 if (!printed1) {
291 LOG(warning) << "Unable to communicate with driver because client does not exist. Continuing connection attempts.";
292 printed1 = true;
293 }
294 }
295 return;
296 }
297 if (!(mClient->isHandshaken())) {
298 if (mBacklog.size() > 2000) {
299 if (!printed2) {
300 LOG(warning) << "Unable to communicate with driver because client is not connected. Continuing connection attempts.";
301 printed2 = true;
302 }
303 }
304 return;
305 }
306 if (printed1 || printed2) {
307 LOGP(warning, "DriverClient connected successfully. Flushing message backlog of {} messages. All is good.", mBacklog.size());
308 printed1 = false;
309 printed2 = false;
310 }
311 mClient->write(mBacklog);
312 mBacklog.resize(0);
313}
314
315} // namespace o2::framework
benchmark::State & state
struct uv_async_s uv_async_t
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_LOG_DISABLE(log)
Definition Signpost.h:479
#define O2_LOG_ENABLE(log)
Definition Signpost.h:478
struct uv_connect_s uv_connect_t
void dispatch(std::string_view event)
Dispatch an event.
void observe(char const *eventType, std::function< void(std::string_view)> callback)
Request action on some eventType notified by the driver.
void setDPLClient(std::unique_ptr< WSDPLClient >)
void tell(const char *msg, size_t s, bool flush=true) final
void flushPending(ServiceRegistryRef mainThreadRef) final
WSDriverClient(ServiceRegistryRef registry, char const *ip, unsigned short port)
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
void on_connect(uv_connect_t *connection, int status)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void sendMessageToDriver(std::unique_ptr< o2::framework::WSDPLClient > &client, char const *message, size_t s)
void on_awake_main_thread(uv_async_t *handle)
void endFragmentation() override
FIXME: not implemented.
void beginFragmentation() override
FIXME: not implemented by the backend.
ClientWebSocketHandler(WSDriverClient &client)
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void frame(char const *frame, size_t s) override
int64_t memory
How much memory it can use.
int64_t sharedMemory
How much shared memory it can allocate.
int user
Which task is using the offer.
int cpu
How many cores it can use.
Running state information of a given device.
Definition DeviceState.h:34
An handler for a websocket message stream.
Definition HTTPParser.h:136
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153