82 LOG(error) <<
"Unable to connect to driver.";
89 auto onHandshake = [client,
ref = context->ref]() {
92 std::lock_guard<std::mutex> lock(client->
mutex());
93 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
94 client->
observe(
"/ping", [](std::string_view) {
98 client->
observe(
"/shm-offer", [
ref = context->ref](std::string_view cmd) {
100 static constexpr int prefixSize = std::string_view{
"/shm-offer "}.size();
101 if (prefixSize > cmd.size()) {
102 LOG(error) <<
"Malformed shared memory offer";
105 cmd.remove_prefix(prefixSize);
107 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
108 if (offerSizeError.ec != std::errc()) {
109 LOG(error) <<
"Malformed shared memory offer";
112 LOGP(detail,
"Received {}MB shared memory offer", offerSize);
121 state.pendingOffers.push_back(offer);
123 client->
observe(
"/timeslice-offer", [
ref = context->ref](std::string_view cmd) {
125 O2_SIGNPOST_START(ws_client, wid,
"timeslice-offer",
"Received timeslice offer.");
127 static constexpr int prefixSize = std::string_view{
"/timeslice-offer "}.size();
128 if (prefixSize > cmd.size()) {
132 cmd.remove_prefix(prefixSize);
134 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
135 if (offerSizeError.ec != std::errc()) {
143 .timeslices = offerSize,
147 state.pendingOffers.push_back(offer);
148 O2_SIGNPOST_END(ws_client, wid,
"timeslice-offer",
"Received %lli timeslices offer. Total pending offers %zu.",
149 offerSize,
state.pendingOffers.size());
152 client->
observe(
"/quit", [
ref = context->ref](std::string_view) {
154 state.quitRequested =
true;
157 client->
observe(
"/restart", [
ref = context->ref](std::string_view) {
159 state.nextFairMQState.emplace_back(
"RUN");
160 state.nextFairMQState.emplace_back(
"STOP");
163 client->
observe(
"/start", [
ref = context->ref](std::string_view) {
165 state.nextFairMQState.emplace_back(
"RUN");
168 client->
observe(
"/stop", [
ref = context->ref](std::string_view) {
170 state.nextFairMQState.emplace_back(
"STOP");
173 client->
observe(
"/trace", [
ref = context->ref](std::string_view cmd) {
175 static constexpr int prefixSize = std::string_view{
"/trace "}.size();
176 if (prefixSize > cmd.size()) {
177 LOG(error) <<
"Malformed tracing request";
180 cmd.remove_prefix(prefixSize);
181 int tracingFlags = 0;
182 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
183 if (error.ec != std::errc()) {
184 LOG(error) <<
"Malformed tracing mask";
187 LOGP(info,
"Tracing flags set to {}", tracingFlags);
188 state.tracingFlags = tracingFlags;
191 client->
observe(
"/log-streams", [
ref = context->ref](std::string_view cmd) {
193 static constexpr int prefixSize = std::string_view{
"/log-streams "}.size();
194 if (prefixSize > cmd.size()) {
195 LOG(error) <<
"Malformed log-streams request";
198 cmd.remove_prefix(prefixSize);
201 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
202 if (error.ec != std::errc()) {
203 LOG(error) <<
"Malformed log-streams mask";
206 LOGP(info,
"Logstreams flags set to {}", logStreams);
207 state.logStreams = logStreams;
237 auto dplClient = std::make_unique<WSDPLClient>();
238 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
250 : mRegistry(registry)
257 auto* socket = (uv_tcp_t*)malloc(
sizeof(uv_tcp_t));
258 uv_tcp_init(
state.loop, socket);
261 connection->data = context;
263 struct sockaddr_in dest;
264 uv_ip4_addr(strdup(ip), port, &dest);
265 uv_tcp_connect(connection, socket, (
const struct sockaddr*)&dest,
on_connect);
268 this->mAwakeMainThread->data = &
state;
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_LOG_DISABLE(log)
#define O2_LOG_ENABLE(log)
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)