24 boost::optional<std::string>
origin;
25 boost::optional<std::string> description;
26 boost::optional<uint32_t> subSpec;
28 if (std::holds_alternative<ConcreteDataMatcher>(input.matcher.matcher)) {
29 origin = std::get<ConcreteDataMatcher>(input.matcher.matcher).origin.
str;
30 description = std::get<ConcreteDataMatcher>(input.matcher.matcher).description.str;
31 subSpec = std::get<ConcreteDataMatcher>(input.matcher.matcher).subSpec;
35 .
binding = input.matcher.binding,
36 .sourceChannel = input.sourceChannel,
37 .timeslice = input.timeslice,
39 .description = description,
46 std::string description;
47 boost::optional<uint32_t> subSpec;
49 if (std::holds_alternative<ConcreteDataMatcher>(
output.matcher.matcher)) {
50 origin = std::get<ConcreteDataMatcher>(
output.matcher.matcher).origin.
str;
51 description = std::get<ConcreteDataMatcher>(
output.matcher.matcher).description.str;
52 subSpec = std::get<ConcreteDataMatcher>(
output.matcher.matcher).subSpec;
54 origin = std::get<ConcreteDataTypeMatcher>(
output.matcher.matcher).origin.
str;
55 description = std::get<ConcreteDataTypeMatcher>(
output.matcher.matcher).description.str;
61 .timeslice =
output.timeslice,
62 .maxTimeslices =
output.maxTimeslices,
64 .description = description,
70 boost::optional<std::string>
origin;
71 boost::optional<std::string> description;
72 boost::optional<uint32_t> subSpec;
74 if (std::holds_alternative<ConcreteDataMatcher>(forward.matcher.matcher)) {
75 origin = std::get<ConcreteDataMatcher>(forward.matcher.matcher).origin.
str;
76 description = std::get<ConcreteDataMatcher>(forward.matcher.matcher).description.str;
77 subSpec = std::get<ConcreteDataMatcher>(forward.matcher.matcher).subSpec;
81 .
binding = forward.matcher.binding,
82 .timeslice = forward.timeslice,
83 .maxTimeslices = forward.maxTimeslices,
84 .channel = forward.channel,
86 .description = description,
96 msg.specs.inputs = std::vector<DIMessages::RegisterDevice::Specs::Input>{};
98 return toRegisterMessageSpec(input);
101 msg.specs.outputs = std::vector<DIMessages::RegisterDevice::Specs::Output>{};
103 return toRegisterMessageSpec(output);
106 msg.specs.forwards = std::vector<DIMessages::RegisterDevice::Specs::Forward>{};
108 return toRegisterMessageSpec(forward);
111 msg.specs.maxInputTimeslices = spec.maxInputTimeslices;
112 msg.specs.inputTimesliceId = spec.inputTimesliceId;
113 msg.specs.nSlots = spec.nSlots;
114 msg.specs.rank = spec.rank;
123 const std::string& runId) : serviceRegistry(serviceRegistry),
124 deviceName(spec.
name),
129 socket.
send(
DIMessage{DIMessage::Header::Type::DEVICE_ON, createRegisterMessage(spec, runId).
toJson()});
130 }
catch (
const std::runtime_error& error) {
131 LOG(error) << error.what();
139 socket.
send(
DIMessage{DIMessage::Header::Type::DEVICE_OFF, std::string{deviceName}});
140 }
catch (
const std::runtime_error& error) {
141 LOG(error) << error.what();
153 }
catch (
const std::runtime_error& error) {
154 LOG(error) << error.what();
163 }
catch (
const std::runtime_error& error) {
164 LOG(error) << error.what();
169void DataInspectorProxyService::handleMessage(
const DIMessage&
msg)
171 switch (
msg.header.type()) {
172 case DIMessage::Header::Type::INSPECT_ON: {
173 LOG(info) <<
"DIService - INSPECT ON";
177 case DIMessage::Header::Type::INSPECT_OFF: {
178 LOG(info) <<
"DIService - INSPECT OFF";
179 _isInspected =
false;
182 case DIMessage::Header::Type::TERMINATE: {
183 LOG(info) <<
"DIService - TERMINATE";
188 LOG(info) <<
"DIService - Wrong msg type: " <<
static_cast<uint32_t
>(
msg.header.type());
193void DataInspectorProxyService::terminate()
201 .
name =
"data-inspector-proxy",
203 std::string proxyAddress = std::getenv(
"O2_DATAINSPECTOR_ADDRESS");
204 auto proxyPort = std::stoi(std::getenv(
"O2_DATAINSPECTOR_PORT"));
205 std::string runId = std::getenv(
"O2_DATAINSPECTOR_ID");
210 return ServiceHandle{TypeIdHelpers::uniqueId<DataInspectorProxyService>(), diService};
224 if (diService.isInspected()) {
225 std::vector <DataRef> refs{};
227 while (
i < parts.Size()) {
228 auto header = o2::header::get<o2::header::DataHeader *>((
char *) parts.At(
i)->GetData());
230 int payloadParts = (
int) header->splitPayloadParts;
231 int lastPart =
i + payloadParts;
232 while (
i < lastPart) {
234 refs.push_back(
DataRef{
nullptr, (
char *) parts.At(0)->GetData(), (
char *) parts.At(
i)->GetData(),
235 parts.At(
i)->GetSize()});
242 for (
auto &proxyMessage: proxyMessages) {
243 diService.send(std::move(proxyMessage));
bool isMessageAvailable()
void send(const DIMessage &message)
DataInspectorProxyService(ServiceRegistryRef serviceRegistry, DeviceSpec const &spec, const std::string &address, int port, const std::string &runId)
void send(DIMessage &&message)
~DataInspectorProxyService()
GLuint GLuint64EXT address
GLuint const GLchar * name
consteval header::DataOrigin origin()
std::vector< DIMessage > serializeO2Messages(const std::vector< DataRef > &refs, const std::string &deviceName)
bool isNonInternalDevice(const DeviceSpec &spec)
Defining PrimaryVertex explicitly as messageable.
@ All
Quit all data processors, regardless of their state.
static ServiceConfigureCallback noConfiguration()
auto create() -> ServiceSpec *final
std::string name
The name of the associated DataProcessorSpec.
Running state information of a given device.
std::string name
Name of the service.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg