81 LOG(error) <<
"Unable to connect to driver.";
88 auto onHandshake = [client,
ref = context->ref]() {
91 std::lock_guard<std::mutex> lock(client->
mutex());
92 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
93 client->
observe(
"/ping", [](std::string_view) {
97 client->
observe(
"/shm-offer", [
ref = context->ref](std::string_view cmd) {
99 static constexpr int prefixSize = std::string_view{
"/shm-offer "}.size();
100 if (prefixSize > cmd.size()) {
101 LOG(error) <<
"Malformed shared memory offer";
104 cmd.remove_prefix(prefixSize);
106 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
107 if (offerSizeError.ec != std::errc()) {
108 LOG(error) <<
"Malformed shared memory offer";
111 LOGP(detail,
"Received {}MB shared memory offer", offerSize);
120 state.pendingOffers.push_back(offer);
123 client->
observe(
"/quit", [
ref = context->ref](std::string_view) {
125 state.quitRequested =
true;
128 client->
observe(
"/restart", [
ref = context->ref](std::string_view) {
130 state.nextFairMQState.emplace_back(
"RUN");
131 state.nextFairMQState.emplace_back(
"STOP");
134 client->
observe(
"/start", [
ref = context->ref](std::string_view) {
136 state.nextFairMQState.emplace_back(
"RUN");
139 client->
observe(
"/stop", [
ref = context->ref](std::string_view) {
141 state.nextFairMQState.emplace_back(
"STOP");
144 client->
observe(
"/trace", [
ref = context->ref](std::string_view cmd) {
146 static constexpr int prefixSize = std::string_view{
"/trace "}.size();
147 if (prefixSize > cmd.size()) {
148 LOG(error) <<
"Malformed tracing request";
151 cmd.remove_prefix(prefixSize);
152 int tracingFlags = 0;
153 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
154 if (error.ec != std::errc()) {
155 LOG(error) <<
"Malformed tracing mask";
158 LOGP(info,
"Tracing flags set to {}", tracingFlags);
159 state.tracingFlags = tracingFlags;
162 client->
observe(
"/log-streams", [
ref = context->ref](std::string_view cmd) {
164 static constexpr int prefixSize = std::string_view{
"/log-streams "}.size();
165 if (prefixSize > cmd.size()) {
166 LOG(error) <<
"Malformed log-streams request";
169 cmd.remove_prefix(prefixSize);
172 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
173 if (error.ec != std::errc()) {
174 LOG(error) <<
"Malformed log-streams mask";
177 LOGP(info,
"Logstreams flags set to {}", logStreams);
178 state.logStreams = logStreams;
208 auto dplClient = std::make_unique<WSDPLClient>();
209 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
221 : mRegistry(registry)
228 auto* socket = (uv_tcp_t*)malloc(
sizeof(uv_tcp_t));
229 uv_tcp_init(
state.loop, socket);
232 connection->data = context;
234 struct sockaddr_in dest;
235 uv_ip4_addr(strdup(ip), port, &dest);
236 uv_tcp_connect(connection, socket, (
const struct sockaddr*)&dest,
on_connect);
239 this->mAwakeMainThread->data = &
state;
283 LOG(error) <<
"flushPending not called from main thread";
285 std::lock_guard<std::mutex> lock(mClientMutex);
286 static bool printed1 =
false;
287 static bool printed2 =
false;
289 if (mBacklog.size() > 2000) {
291 LOG(warning) <<
"Unable to communicate with driver because client does not exist. Continuing connection attempts.";
297 if (!(mClient->isHandshaken())) {
298 if (mBacklog.size() > 2000) {
300 LOG(warning) <<
"Unable to communicate with driver because client is not connected. Continuing connection attempts.";
306 if (printed1 || printed2) {
307 LOGP(warning,
"DriverClient connected successfully. Flushing message backlog of {} messages. All is good.", mBacklog.size());
311 mClient->write(mBacklog);
// No-op logging macros.
//
// Each macro below is function-like and has an empty replacement list, so the
// preprocessor removes every invocation entirely and the argument is never
// evaluated or referenced.
// NOTE(review): presumably these stand in for a real dynamic-logging facility
// that is unavailable or disabled in this build — confirm against the header
// that normally provides them.

// Takes a log `name`; expands to nothing.
#define O2_DECLARE_DYNAMIC_LOG(name)
// Takes a `log` identifier; expands to nothing.
#define O2_LOG_DISABLE(log)
// Takes a `log` identifier; expands to nothing.
#define O2_LOG_ENABLE(log)