Project
Loading...
Searching...
No Matches
ControlServiceHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
13#include "Framework/Logger.h"
17#include <string>
18#include <string_view>
19#include <regex>
20
21namespace o2::framework
22{
23
24bool ControlServiceHelpers::parseControl(std::string_view const& s, std::match_results<std::string_view::const_iterator>& match)
25{
26 size_t pos = s.find("CONTROL_ACTION: ");
27 if (pos == std::string::npos) {
28 return false;
29 }
30 const static std::regex controlRE1("^READY_TO_(QUIT)_(ME|ALL)", std::regex::optimize);
31 const static std::regex controlRE2("^(NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)", std::regex::optimize);
32 const static std::regex controlRE3("^(NOTIFY_DEVICE_STATE) ([A-Z ]*)", std::regex::optimize);
33 const static std::regex controlRE4("^(PUT) (.*)", std::regex::optimize);
34 std::string_view sv = s.substr(pos + strlen("CONTROL_ACTION: "));
35 return std::regex_search(sv.begin(), sv.end(), match, controlRE1) ||
36 std::regex_search(sv.begin(), sv.end(), match, controlRE2) ||
37 std::regex_search(sv.begin(), sv.end(), match, controlRE3) ||
38 std::regex_search(sv.begin(), sv.end(), match, controlRE4);
39}
40
41void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
42 std::vector<DataProcessingStates>& allStates,
43 pid_t pid,
44 std::string const& command,
45 std::string const& arg)
46{
47 auto doToMatchingPid = [&](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
48 assert(infos.size() == allStates.size());
49 for (size_t i = 0; i < infos.size(); ++i) {
50 auto& deviceInfo = infos[i];
51 if (deviceInfo.pid == pid) {
52 return lambda(deviceInfo);
53 }
54 }
55 LOGP(error, "Command received for pid {} which does not exists.", pid);
56 };
57 auto doToMatchingStatePid = [&](std::vector<DeviceInfo>& infos, std::vector<DataProcessingStates>& allStates, pid_t pid, auto lambda) {
58 assert(infos.size() == allStates.size());
59 for (size_t i = 0; i < infos.size(); ++i) {
60 auto& deviceInfo = infos[i];
61 auto& states = allStates[i];
62 if (deviceInfo.pid == pid) {
63 return lambda(deviceInfo, states);
64 }
65 }
66 LOGP(error, "Command received for pid {} which does not exists.", pid);
67 };
68 LOGP(debug2, "Found control command {} from pid {} with argument {}.", command, pid, arg);
69 if (command == "QUIT" && arg == "ALL") {
70 for (auto& deviceInfo : infos) {
71 deviceInfo.readyToQuit = true;
72 }
73 } else if (command == "QUIT" && arg == "ME") {
74 doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; });
75 } else if (command == "NOTIFY_STREAMING_STATE" && arg == "IDLE") {
76 // FIXME: this should really be a policy...
77 doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.readyToQuit = true; info.streamingState = StreamingState::Idle; });
78 } else if (command == "NOTIFY_STREAMING_STATE" && arg == "STREAMING") {
79 // FIXME: this should really be a policy...
80 doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::Streaming; });
81 } else if (command == "NOTIFY_STREAMING_STATE" && arg == "EOS") {
82 // FIXME: this should really be a policy...
83 doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
84 } else if (command == "NOTIFY_DEVICE_STATE") {
85 doToMatchingPid(infos, pid, [arg](DeviceInfo& info) { info.deviceState = arg; info.providedState++; });
86 } else if (command == "PUT") {
87 doToMatchingStatePid(infos, allStates, pid, [&arg](DeviceInfo& info, DataProcessingStates& states) {
89 // find the first space, that is the beginning of the key.
90 // Find the position of the fist space in beginKey.
91 auto beginKey = 0;
92 // If we did not find it complain and return.
93 if (beginKey == std::string::npos) {
94 LOGP(error, "Cannot parse key in PUT command with arg {} for device {}", arg, info.pid);
95 return;
96 }
97 auto endKey = arg.find(' ', beginKey + 1);
98 if (endKey == std::string::npos) {
99 LOGP(error, "Cannot parse timestamp in PUT command with arg {}", arg);
100 return;
101 }
102 auto beginTimestamp = endKey + 1;
103 auto endTimestamp = arg.find(' ', beginTimestamp + 1);
104 if (endTimestamp == std::string::npos) {
105 LOGP(error, "Cannot parse value in PUT command with arg {}", arg);
106 return;
107 }
108 auto beginValue = endTimestamp + 1;
109 auto endValue = arg.size();
110
111 std::string_view key(arg.data() + beginKey, endKey - beginKey);
112 std::string_view timestamp(arg.data() + beginTimestamp, endTimestamp - beginTimestamp);
113 std::string_view value(arg.data() + beginValue, endValue - beginValue);
114 // Find the assocaiated StateSpec and get the id.
115 auto spec = std::find_if(states.stateSpecs.begin(), states.stateSpecs.end(), [&key](auto const& spec) {
116 return spec.name == key;
117 });
118 if (spec == states.stateSpecs.end()) {
119 LOGP(warn, "Cannot find state {}", key.data());
120 return;
121 }
122 if (value.data() == nullptr) {
123 LOGP(debug, "State {} value is null skipping", key.data());
124 return;
125 }
128 states.updateState(DataProcessingStates::CommandSpec{.id = spec->stateId, .size = (int)value.size(), .data = value.data()});
129 states.processCommandQueue();
130 });
131 } else {
132 LOGP(error, "Unknown command {} with argument {}", command, arg);
133 }
134};
135
136} // namespace o2::framework
int32_t i
uint16_t pos
Definition RawData.h:3
uint16_t pid
Definition RawData.h:2
std::ostringstream debug
StringRef key
bool match(const std::vector< std::string > &queries, const char *pattern)
Definition dcs-ccdb.cxx:229
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ EndOfStreaming
End of streaming requested, but not notified.
@ Streaming
Data is being processed.
@ Idle
End of streaming notified.
static bool parseControl(std::string_view const &s, std::match_results< std::string_view::const_iterator > &match)
static void processCommand(std::vector< DeviceInfo > &infos, std::vector< DataProcessingStates > &allStates, pid_t pid, std::string const &command, std::string const &arg)
pid_t pid
The pid of the device associated to this device.
Definition DeviceInfo.h:36
bool readyToQuit
Whether the device is ready to quit.
Definition DeviceInfo.h:67
int providedState
An incremental number for the state of the device.
Definition DeviceInfo.h:88
StreamingState streamingState
The current state of the device, as reported by it.
Definition DeviceInfo.h:69