81 LOG(error) <<
"Unable to connect to driver.";
88 auto onHandshake = [client,
ref = context->ref]() {
91 std::lock_guard<std::mutex> lock(client->
mutex());
92 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
93 client->
observe(
"/ping", [](std::string_view) {
97 client->
observe(
"/shm-offer", [
ref = context->ref](std::string_view cmd) {
99 static constexpr int prefixSize = std::string_view{
"/shm-offer "}.size();
100 if (prefixSize > cmd.size()) {
101 LOG(error) <<
"Malformed shared memory offer";
104 cmd.remove_prefix(prefixSize);
106 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
107 if (offerSizeError.ec != std::errc()) {
108 LOG(error) <<
"Malformed shared memory offer";
111 LOGP(detail,
"Received {}MB shared memory offer", offerSize);
120 state.pendingOffers.push_back(offer);
123 client->
observe(
"/quit", [
ref = context->ref](std::string_view) {
128 client->
observe(
"/restart", [
ref = context->ref](std::string_view) {
131 state.nextFairMQState.emplace_back(
"STOP");
134 client->
observe(
"/start", [
ref = context->ref](std::string_view) {
139 client->
observe(
"/stop", [
ref = context->ref](std::string_view) {
144 client->
observe(
"/trace", [
ref = context->ref](std::string_view cmd) {
146 static constexpr int prefixSize = std::string_view{
"/trace "}.size();
147 if (prefixSize > cmd.size()) {
148 LOG(error) <<
"Malformed tracing request";
151 cmd.remove_prefix(prefixSize);
152 int tracingFlags = 0;
153 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
154 if (error.ec != std::errc()) {
155 LOG(error) <<
"Malformed tracing mask";
158 LOGP(info,
"Tracing flags set to {}", tracingFlags);
159 state.tracingFlags = tracingFlags;
162 client->
observe(
"/log-streams", [
ref = context->ref](std::string_view cmd) {
164 static constexpr int prefixSize = std::string_view{
"/log-streams "}.size();
165 if (prefixSize > cmd.size()) {
166 LOG(error) <<
"Malformed log-streams request";
169 cmd.remove_prefix(prefixSize);
172 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
173 if (error.ec != std::errc()) {
174 LOG(error) <<
"Malformed log-streams mask";
177 LOGP(info,
"Logstreams flags set to {}", logStreams);
178 state.logStreams = logStreams;
208 auto dplClient = std::make_unique<WSDPLClient>();
209 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
221 : mRegistry(registry)
228 auto* socket = (uv_tcp_t*)malloc(
sizeof(uv_tcp_t));
229 uv_tcp_init(
state.loop, socket);
232 connection->data = context;
234 struct sockaddr_in dest;
235 uv_ip4_addr(strdup(ip), port, &dest);
236 uv_tcp_connect(connection, socket, (
const struct sockaddr*)&dest,
on_connect);
239 this->mAwakeMainThread->data = &
state;
245 free(this->mAwakeMainThread);
254 mClient = std::move(client);
260 mClient->sendHandshake();
268 std::lock_guard<std::mutex> lock(mClientMutex);
275void WSDriverClient::awake()
277 uv_async_send(mAwakeMainThread);
283 LOG(error) <<
"flushPending not called from main thread";
285 std::lock_guard<std::mutex> lock(mClientMutex);
286 static bool printed1 =
false;
287 static bool printed2 =
false;
289 if (mBacklog.size() > 2000) {
291 LOG(warning) <<
"Unable to communicate with driver because client does not exist. Continuing connection attempts.";
297 if (!(mClient->isHandshaken())) {
298 if (mBacklog.size() > 2000) {
300 LOG(warning) <<
"Unable to communicate with driver because client is not connected. Continuing connection attempts.";
306 if (printed1 || printed2) {
307 LOGP(warning,
"DriverClient connected successfully. Flushing message backlog of {} messages. All is good.", mBacklog.size());
311 mClient->write(mBacklog);
struct uv_async_s uv_async_t
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_LOG_DISABLE(log)
#define O2_LOG_ENABLE(log)
struct uv_connect_s uv_connect_t
void dispatch(std::string_view event)
Dispatch an event.
void observe(char const *eventType, std::function< void(std::string_view)> callback)
Request action on some eventType notified by the driver.
void setDPLClient(std::unique_ptr< WSDPLClient >)
void tell(const char *msg, size_t s, bool flush=true) final
void flushPending(ServiceRegistryRef mainThreadRef) final
WSDriverClient(ServiceRegistryRef registry, char const *ip, unsigned short port)
GLuint GLsizei const GLchar * message
Defining PrimaryVertex explicitly as messageable.
void on_connect(uv_connect_t *connection, int status)
void encode_websocket_frames(std::vector< uv_buf_t > &outputs, char const *src, size_t size, WebSocketOpCode opcode, uint32_t mask)
void sendMessageToDriver(std::unique_ptr< o2::framework::WSDPLClient > &client, char const *message, size_t s)
void on_awake_main_thread(uv_async_t *handle)
void endFragmentation() override
FIXME: not implemented.
void beginFragmentation() override
FIXME: not implemented by the backend.
ClientWebSocketHandler(WSDriverClient &client)
void headers(std::map< std::string, std::string > const &headers) override
Invoked when all the headers are received.
void control(char const *frame, size_t s) override
FIXME: not implemented.
void frame(char const *frame, size_t s) override
void beginChunk() override
int64_t memory
How much memory it can use.
int64_t sharedMemory
How much shared memory it can allocate.
int user
Which task is using the offer.
int cpu
How many cores it can use.
Running state information of a given device.
std::vector< std::string > nextFairMQState
int loopReason
Bitmask of LoopReason which caused this iterations.
@ DATA_PROCESSOR_CONTEXT_LOG
An handler for a websocket message stream.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg