// Failure path: reports that the connection to the driver could not be established.
// NOTE(review): the enclosing function and the condition guarding this log are not
// visible in this excerpt.
82 LOG(error) <<
"Unable to connect to driver.";
// Callback named onHandshake; it captures `client` by value and a copy of
// `context->ref` as `ref`. Only the opening of the lambda is visible in this
// excerpt, so its body is not documented here.
89 auto onHandshake = [client,
ref = context->ref]() {
// Holds the client's mutex for as long as `lock` is in scope.
92 std::lock_guard<std::mutex> lock(client->
mutex());
// Handler constructed from a reference to the client; it is handed over to
// dplClient->connect() further down via std::move.
93 auto handler = std::make_unique<ClientWebSocketHandler>(*client);
// Passes a callback for the "/ping" command to client->observe(). The callback
// captures nothing and leaves its string_view argument unnamed; its body is not
// visible in this excerpt.
94 client->
observe(
"/ping", [](std::string_view) {
// Passes a callback for the "/shm-offer" command to client->observe(); the callback
// captures a copy of context->ref as `ref` and receives the full command text.
98 client->
observe(
"/shm-offer", [
ref = context->ref](std::string_view cmd) {
// Length of the command prefix, including the trailing space.
// NOTE(review): declared as int but compared against cmd.size() (unsigned) below;
// a std::size_t constant would avoid the mixed-sign comparison.
100 static constexpr int prefixSize = std::string_view{
"/shm-offer "}.size();
// Rejects commands shorter than the prefix. A command exactly as long as the prefix
// passes this check and is left for the numeric parse below to reject.
101 if (prefixSize > cmd.size()) {
102 LOG(error) <<
"Malformed shared memory offer";
// Drop the prefix so that `cmd` starts at the payload.
105 cmd.remove_prefix(prefixSize);
// Parse the payload as a number into offerSize (its declaration is not visible in
// this excerpt).
107 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
// Any std::from_chars error is reported as a malformed offer.
// NOTE(review): only `ec` is inspected on the visible lines, so trailing characters
// after the number would go unnoticed - confirm whether that is intended.
108 if (offerSizeError.ec != std::errc()) {
109 LOG(error) <<
"Malformed shared memory offer";
// The parsed size is logged as a number of MB.
112 LOGP(detail,
"Received {}MB shared memory offer", offerSize);
// Queue the offer; how `offer` and `state` are obtained is not visible here.
121 state.pendingOffers.push_back(offer);
// Passes a callback for the "/timeslice-offer" command to client->observe(); the
// callback captures a copy of context->ref as `ref` and receives the full command text.
123 client->
observe(
"/timeslice-offer", [
ref = context->ref](std::string_view cmd) {
// Emits the "timeslice-offer" start signpost on the ws_client log; the matching
// O2_SIGNPOST_END is issued after the offer has been queued.
// NOTE(review): the error branches below are cut off in this excerpt - confirm they
// also end the signpost before leaving the callback.
125 O2_SIGNPOST_START(ws_client, wid,
"timeslice-offer",
"Received timeslice offer.");
// Length of the command prefix, including the trailing space.
127 static constexpr int prefixSize = std::string_view{
"/timeslice-offer "}.size();
// Commands shorter than the prefix take this branch; its body is not visible here.
128 if (prefixSize > cmd.size()) {
// Drop the prefix so that `cmd` starts at the payload.
132 cmd.remove_prefix(prefixSize);
// Parse the payload as a number into offerSize (declaration not visible here).
134 auto offerSizeError = std::from_chars(cmd.data(), cmd.data() + cmd.size(), offerSize);
// Branch taken when std::from_chars reports an error; its body is not visible here.
135 if (offerSizeError.ec != std::errc()) {
// The parsed value becomes the `timeslices` field of the offer being built.
143 .timeslices = offerSize,
// Queue the offer.
147 state.pendingOffers.push_back(offer);
// Reports the offered timeslice count and the current number of pending offers.
// NOTE(review): "%lli" expects a long long argument; the declared type of offerSize
// is not visible in this excerpt - confirm that they match.
148 O2_SIGNPOST_END(ws_client, wid,
"timeslice-offer",
"Received %lli timeslices offer. Total pending offers %zu.",
149 offerSize,
state.pendingOffers.size());
// Passes a callback for the "/quit" command to client->observe(); the callback
// captures a copy of context->ref and ignores the command text.
152 client->
observe(
"/quit", [
ref = context->ref](std::string_view) {
// Raises the quit flag on `state` (how `state` is obtained is not visible here).
154 state.quitRequested =
true;
// Passes a callback for the "/restart" command to client->observe(); the callback
// captures a copy of context->ref and ignores the command text.
157 client->
observe(
"/restart", [
ref = context->ref](std::string_view) {
// Appends "RUN" and then "STOP" to state.nextFairMQState.
// NOTE(review): the order in which nextFairMQState is consumed is not visible in
// this excerpt - confirm that this insertion order yields stop-then-run.
159 state.nextFairMQState.emplace_back(
"RUN");
160 state.nextFairMQState.emplace_back(
"STOP");
// Passes a callback for the "/start" command to client->observe(); the callback
// captures a copy of context->ref and ignores the command text.
163 client->
observe(
"/start", [
ref = context->ref](std::string_view) {
// Appends "RUN" to state.nextFairMQState.
165 state.nextFairMQState.emplace_back(
"RUN");
// Passes a callback for the "/stop" command to client->observe(); the callback
// captures a copy of context->ref and ignores the command text.
168 client->
observe(
"/stop", [
ref = context->ref](std::string_view) {
// Appends "STOP" to state.nextFairMQState.
170 state.nextFairMQState.emplace_back(
"STOP");
// Passes a callback for the "/trace" command to client->observe(); the callback
// captures a copy of context->ref as `ref` and receives the full command text.
173 client->
observe(
"/trace", [
ref = context->ref](std::string_view cmd) {
// Length of the command prefix, including the trailing space.
// NOTE(review): int compared against the unsigned cmd.size() below.
175 static constexpr int prefixSize = std::string_view{
"/trace "}.size();
// Rejects commands shorter than the prefix.
176 if (prefixSize > cmd.size()) {
177 LOG(error) <<
"Malformed tracing request";
// Drop the prefix so that `cmd` starts at the payload.
180 cmd.remove_prefix(prefixSize);
// Parse the payload as a decimal int (std::from_chars default base).
181 int tracingFlags = 0;
182 auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), tracingFlags);
// Any parse error is reported as a malformed mask.
183 if (error.ec != std::errc()) {
184 LOG(error) <<
"Malformed tracing mask";
// Log the new value and store it on `state`.
187 LOGP(info,
"Tracing flags set to {}", tracingFlags);
188 state.tracingFlags = tracingFlags;
// Passes a callback for the "/signpost:enable" command to client->observe(); this
// callback captures nothing and receives the full command text.
191 client->
observe(
"/signpost:enable", [](std::string_view cmd) {
// Length of the command prefix, including the trailing space.
192 static constexpr int prefixSize = std::string_view{
"/signpost:enable "}.size();
// Requires at least one character after the prefix (note the `<=`, which is
// stricter than the `>` check used by the offer and trace handlers).
193 if (cmd.size() <= prefixSize) {
194 LOG(error) <<
"Malformed /signpost:enable request";
// Owning copy of the text that follows the prefix.
197 std::string
name(cmd.substr(prefixSize));
// Callback handed to o2_walk_logs(): it receives a log name, an opaque log pointer
// and an opaque context pointer, and returns bool.
198 o2_walk_logs([](
char const* logName,
void* l,
void* context) ->
bool {
// The context pointer is interpreted as a std::string*.
// NOTE(review): presumably this is &name passed as the second argument of
// o2_walk_logs - that part of the call is cut off here, confirm.
200 auto*
target =
static_cast<std::string*
>(context);
// Passes a callback for the "/signpost:disable" command to client->observe(); this
// callback captures nothing and receives the full command text.
209 client->
observe(
"/signpost:disable", [](std::string_view cmd) {
// Length of the command prefix, including the trailing space.
210 static constexpr int prefixSize = std::string_view{
"/signpost:disable "}.size();
// Requires at least one character after the prefix.
211 if (cmd.size() <= prefixSize) {
212 LOG(error) <<
"Malformed /signpost:disable request";
// Owning copy of the text that follows the prefix.
215 std::string
name(cmd.substr(prefixSize));
// Callback handed to o2_walk_logs(): it receives a log name, an opaque log pointer
// and an opaque context pointer, and returns bool.
216 o2_walk_logs([](
char const* logName,
void* l,
void* context) ->
bool {
// The context pointer is interpreted as a std::string*.
// NOTE(review): presumably this is &name passed as the second argument of
// o2_walk_logs - that part of the call is cut off here, confirm.
218 auto*
target =
static_cast<std::string*
>(context);
// Creates the WSDPLClient and connects it using the service reference, the handle
// of the established connection, the onHandshake callback and the handler, whose
// ownership is transferred with std::move.
// NOTE(review): what happens to dplClient after connect() is not visible in this
// excerpt.
229 auto dplClient = std::make_unique<WSDPLClient>();
230 dplClient->connect(context->ref, connection->handle, onHandshake, std::move(handler));
// Member initializer: mRegistry is initialised from `registry`.
242 : mRegistry(registry)
// Heap-allocates the libuv TCP handle and initialises it on state.loop.
// NOTE(review): on the visible lines the malloc result is not checked and the
// return value of uv_tcp_init is discarded.
249 auto* socket = (uv_tcp_t*)malloc(
sizeof(uv_tcp_t));
250 uv_tcp_init(
state.loop, socket);
// Makes `context` reachable from the connect request through its data pointer.
253 connection->data = context;
// Destination address, filled in by uv_ip4_addr below.
255 struct sockaddr_in dest;
// NOTE(review): the pointer returned by strdup() is not kept, so the copy can never
// be freed (memory leak). uv_ip4_addr() takes a const char*, so `ip` could
// presumably be passed directly - confirm the declared type of `ip`.
// The error code returned by uv_ip4_addr is ignored as well.
256 uv_ip4_addr(strdup(ip), port, &dest);
// Starts the connection attempt with on_connect as the completion callback; the
// return value is not checked here.
257 uv_tcp_connect(connection, socket, (
const struct sockaddr*)&dest,
on_connect);
// Gives the mAwakeMainThread handle access to `state` through its data pointer.
260 this->mAwakeMainThread->data = &
state;
o2_log_handle_t * o2_walk_logs(bool(*callback)(char const *name, void *log, void *context), void *context=nullptr)
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
void _o2_log_set_stacktrace(_o2_log_t *log, int stacktrace)
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_END_WITH_ERROR(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)