26 size_t pos = s.find(
"CONTROL_ACTION: ");
27 if (
pos == std::string::npos) {
30 const static std::regex controlRE1(
"^READY_TO_(QUIT)_(ME|ALL)", std::regex::optimize);
31 const static std::regex controlRE2(
"^(NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)", std::regex::optimize);
32 const static std::regex controlRE3(
"^(NOTIFY_DEVICE_STATE) ([A-Z ]*)", std::regex::optimize);
33 const static std::regex controlRE4(
"^(PUT) (.*)", std::regex::optimize);
34 std::string_view sv = s.substr(
pos + strlen(
"CONTROL_ACTION: "));
35 return std::regex_search(sv.begin(), sv.end(),
match, controlRE1) ||
36 std::regex_search(sv.begin(), sv.end(),
match, controlRE2) ||
37 std::regex_search(sv.begin(), sv.end(),
match, controlRE3) ||
38 std::regex_search(sv.begin(), sv.end(),
match, controlRE4);
42 std::vector<DataProcessingStates>& allStates,
44 std::string
const& command,
45 std::string
const& arg)
47 auto doToMatchingPid = [&](std::vector<DeviceInfo>& infos, pid_t
pid,
auto lambda) {
48 assert(infos.size() == allStates.size());
49 for (
size_t i = 0;
i < infos.size(); ++
i) {
50 auto& deviceInfo = infos[
i];
51 if (deviceInfo.pid ==
pid) {
52 return lambda(deviceInfo);
55 LOGP(error,
"Command received for pid {} which does not exists.",
pid);
57 auto doToMatchingStatePid = [&](std::vector<DeviceInfo>& infos, std::vector<DataProcessingStates>& allStates, pid_t
pid,
auto lambda) {
58 assert(infos.size() == allStates.size());
59 for (
size_t i = 0;
i < infos.size(); ++
i) {
60 auto& deviceInfo = infos[
i];
62 if (deviceInfo.pid ==
pid) {
63 return lambda(deviceInfo,
states);
66 LOGP(error,
"Command received for pid {} which does not exists.",
pid);
68 LOGP(debug2,
"Found control command {} from pid {} with argument {}.", command,
pid, arg);
69 if (command ==
"QUIT" && arg ==
"ALL") {
70 for (
auto& deviceInfo : infos) {
71 deviceInfo.readyToQuit =
true;
73 }
else if (command ==
"QUIT" && arg ==
"ME") {
75 }
else if (command ==
"NOTIFY_STREAMING_STATE" && arg ==
"IDLE") {
78 }
else if (command ==
"NOTIFY_STREAMING_STATE" && arg ==
"STREAMING") {
81 }
else if (command ==
"NOTIFY_STREAMING_STATE" && arg ==
"EOS") {
84 }
else if (command ==
"NOTIFY_DEVICE_STATE") {
86 }
else if (command ==
"PUT") {
93 if (beginKey == std::string::npos) {
94 LOGP(error,
"Cannot parse key in PUT command with arg {} for device {}", arg, info.
pid);
97 auto endKey = arg.find(
' ', beginKey + 1);
98 if (endKey == std::string::npos) {
99 LOGP(error,
"Cannot parse timestamp in PUT command with arg {}", arg);
102 auto beginTimestamp = endKey + 1;
103 auto endTimestamp = arg.find(
' ', beginTimestamp + 1);
104 if (endTimestamp == std::string::npos) {
105 LOGP(error,
"Cannot parse value in PUT command with arg {}", arg);
108 auto beginValue = endTimestamp + 1;
109 auto endValue = arg.size();
111 std::string_view
key(arg.data() + beginKey, endKey - beginKey);
112 std::string_view timestamp(arg.data() + beginTimestamp, endTimestamp - beginTimestamp);
113 std::string_view
value(arg.data() + beginValue, endValue - beginValue);
115 auto spec = std::find_if(
states.stateSpecs.begin(),
states.stateSpecs.end(), [&
key](
auto const& spec) {
116 return spec.name == key;
118 if (spec ==
states.stateSpecs.end()) {
119 LOGP(warn,
"Cannot find state {}",
key.data());
122 if (
value.data() ==
nullptr) {
123 LOGP(
debug,
"State {} value is null skipping",
key.data());
129 states.processCommandQueue();
132 LOGP(error,
"Unknown command {} with argument {}", command, arg);