Project
Loading...
Searching...
No Matches
DataInspectorService.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12#include "DataInspector.h"
16#include "DIMessages.h"
18#include <cstdlib>
19
20namespace o2::framework
21{
22static DIMessages::RegisterDevice::Specs::Input toRegisterMessageSpec(const InputRoute& input)
23{
24 boost::optional<std::string> origin;
25 boost::optional<std::string> description;
26 boost::optional<uint32_t> subSpec;
27
28 if (std::holds_alternative<ConcreteDataMatcher>(input.matcher.matcher)) {
29 origin = std::get<ConcreteDataMatcher>(input.matcher.matcher).origin.str;
30 description = std::get<ConcreteDataMatcher>(input.matcher.matcher).description.str;
31 subSpec = std::get<ConcreteDataMatcher>(input.matcher.matcher).subSpec;
32 }
33
35 .binding = input.matcher.binding,
36 .sourceChannel = input.sourceChannel,
37 .timeslice = input.timeslice,
38 .origin = origin,
39 .description = description,
40 .subSpec = subSpec};
41}
42
43static DIMessages::RegisterDevice::Specs::Output toRegisterMessageSpec(const OutputRoute& output)
44{
45 std::string origin;
46 std::string description;
47 boost::optional<uint32_t> subSpec;
48
49 if (std::holds_alternative<ConcreteDataMatcher>(output.matcher.matcher)) {
50 origin = std::get<ConcreteDataMatcher>(output.matcher.matcher).origin.str;
51 description = std::get<ConcreteDataMatcher>(output.matcher.matcher).description.str;
52 subSpec = std::get<ConcreteDataMatcher>(output.matcher.matcher).subSpec;
53 } else {
54 origin = std::get<ConcreteDataTypeMatcher>(output.matcher.matcher).origin.str;
55 description = std::get<ConcreteDataTypeMatcher>(output.matcher.matcher).description.str;
56 }
57
59 .binding = output.matcher.binding.value,
60 .channel = output.channel,
61 .timeslice = output.timeslice,
62 .maxTimeslices = output.maxTimeslices,
63 .origin = origin,
64 .description = description,
65 .subSpec = subSpec};
66}
67
68static DIMessages::RegisterDevice::Specs::Forward toRegisterMessageSpec(const ForwardRoute& forward)
69{
70 boost::optional<std::string> origin;
71 boost::optional<std::string> description;
72 boost::optional<uint32_t> subSpec;
73
74 if (std::holds_alternative<ConcreteDataMatcher>(forward.matcher.matcher)) {
75 origin = std::get<ConcreteDataMatcher>(forward.matcher.matcher).origin.str;
76 description = std::get<ConcreteDataMatcher>(forward.matcher.matcher).description.str;
77 subSpec = std::get<ConcreteDataMatcher>(forward.matcher.matcher).subSpec;
78 }
79
81 .binding = forward.matcher.binding,
82 .timeslice = forward.timeslice,
83 .maxTimeslices = forward.maxTimeslices,
84 .channel = forward.channel,
85 .origin = origin,
86 .description = description,
87 .subSpec = subSpec};
88}
89
90static DIMessages::RegisterDevice createRegisterMessage(DeviceSpec const& spec, const std::string& runId)
91{
93 msg.name = spec.name;
94 msg.runId = runId;
95
96 msg.specs.inputs = std::vector<DIMessages::RegisterDevice::Specs::Input>{};
97 std::transform(spec.inputs.begin(), spec.inputs.end(), std::back_inserter(msg.specs.inputs), [](const InputRoute& input) -> DIMessages::RegisterDevice::Specs::Input {
98 return toRegisterMessageSpec(input);
99 });
100
101 msg.specs.outputs = std::vector<DIMessages::RegisterDevice::Specs::Output>{};
102 std::transform(spec.outputs.begin(), spec.outputs.end(), std::back_inserter(msg.specs.outputs), [](const OutputRoute& output) -> DIMessages::RegisterDevice::Specs::Output {
103 return toRegisterMessageSpec(output);
104 });
105
106 msg.specs.forwards = std::vector<DIMessages::RegisterDevice::Specs::Forward>{};
107 std::transform(spec.forwards.begin(), spec.forwards.end(), std::back_inserter(msg.specs.forwards), [](const ForwardRoute& forward) -> DIMessages::RegisterDevice::Specs::Forward {
108 return toRegisterMessageSpec(forward);
109 });
110
111 msg.specs.maxInputTimeslices = spec.maxInputTimeslices;
112 msg.specs.inputTimesliceId = spec.inputTimesliceId;
113 msg.specs.nSlots = spec.nSlots;
114 msg.specs.rank = spec.rank;
115
116 return msg;
117}
118
120 DeviceSpec const& spec,
121 const std::string& address,
122 int port,
123 const std::string& runId) : serviceRegistry(serviceRegistry),
124 deviceName(spec.name),
125 socket(address, port),
126 runId(runId)
127{
128 try {
129 socket.send(DIMessage{DIMessage::Header::Type::DEVICE_ON, createRegisterMessage(spec, runId).toJson()});
130 } catch (const std::runtime_error& error) {
131 LOG(error) << error.what();
132 terminate();
133 }
134}
135
137{
138 try {
139 socket.send(DIMessage{DIMessage::Header::Type::DEVICE_OFF, std::string{deviceName}});
140 } catch (const std::runtime_error& error) {
141 LOG(error) << error.what();
142 terminate();
143 }
144}
145
147{
148 try {
149 if (socket.isMessageAvailable()) {
150 DIMessage msg = socket.receive();
151 handleMessage(msg);
152 }
153 } catch (const std::runtime_error& error) {
154 LOG(error) << error.what();
155 terminate();
156 }
157}
158
160{
161 try {
162 socket.send(std::move(msg));
163 } catch (const std::runtime_error& error) {
164 LOG(error) << error.what();
165 terminate();
166 }
167}
168
169void DataInspectorProxyService::handleMessage(const DIMessage& msg)
170{
171 switch (msg.header.type()) {
172 case DIMessage::Header::Type::INSPECT_ON: {
173 LOG(info) << "DIService - INSPECT ON";
174 _isInspected = true;
175 break;
176 }
177 case DIMessage::Header::Type::INSPECT_OFF: {
178 LOG(info) << "DIService - INSPECT OFF";
179 _isInspected = false;
180 break;
181 }
182 case DIMessage::Header::Type::TERMINATE: {
183 LOG(info) << "DIService - TERMINATE";
184 terminate();
185 break;
186 }
187 default: {
188 LOG(info) << "DIService - Wrong msg type: " << static_cast<uint32_t>(msg.header.type());
189 }
190 }
191}
192
193void DataInspectorProxyService::terminate()
194{
195 serviceRegistry.get<ControlService>().readyToQuit(QuitRequest::All);
196}
197
199{
200 return new ServiceSpec{
201 .name = "data-inspector-proxy",
202 .init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
203 std::string proxyAddress = std::getenv("O2_DATAINSPECTOR_ADDRESS");
204 auto proxyPort = std::stoi(std::getenv("O2_DATAINSPECTOR_PORT"));
205 std::string runId = std::getenv("O2_DATAINSPECTOR_ID");
206
207 const auto& spec = services.get<const DeviceSpec>();
209 auto* diService = new DataInspectorProxyService(services, spec, proxyAddress, proxyPort, runId);
210 return ServiceHandle{TypeIdHelpers::uniqueId<DataInspectorProxyService>(), diService};
211 } else {
212 return ServiceHandle{0, nullptr};
213 }
214 },
215 .configure = CommonServices::noConfiguration(),
216 .exit = [](ServiceRegistryRef, void* service) {
217 auto *diService = (DataInspectorProxyService *) service;
218 delete diService; },
219 .preSendingMessages = [](ServiceRegistryRef registry, fair::mq::Parts& parts, ChannelIndex channelIndex) {
220 auto &diService = registry.get<DataInspectorProxyService>();
221 diService.receive(); // Check for messages from proxy
222
223 // Check if message is inspected and prepare DataRefs for processing
224 if (diService.isInspected()) {
225 std::vector <DataRef> refs{};
226 int i = 0;
227 while (i < parts.Size()) {
228 auto header = o2::header::get<o2::header::DataHeader *>((char *) parts.At(i)->GetData());
229
230 int payloadParts = (int) header->splitPayloadParts;
231 int lastPart = i + payloadParts;
232 while (i < lastPart) {
233 i++;
234 refs.push_back(DataRef{nullptr, (char *) parts.At(0)->GetData(), (char *) parts.At(i)->GetData(),
235 parts.At(i)->GetSize()});
236 }
237 i++;
238 }
239
240 // Send copy to proxy
241 auto proxyMessages = data_inspector::serializeO2Messages(refs, registry.get<DeviceSpec const>().name);
242 for (auto &proxyMessage: proxyMessages) {
243 diService.send(std::move(proxyMessage));
244 }
245 } },
246 .kind = ServiceKind::Global};
247};
248} // namespace o2::framework
benchmark::State & state
int32_t i
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
bool isMessageAvailable()
Definition DISocket.cxx:87
void send(const DIMessage &message)
Definition DISocket.cxx:92
DIMessage receive()
Definition DISocket.cxx:102
DataInspectorProxyService(ServiceRegistryRef serviceRegistry, DeviceSpec const &spec, const std::string &address, int port, const std::string &runId)
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLuint const GLchar * name
Definition glcorearb.h:781
consteval header::DataOrigin origin()
Definition ASoA.h:345
std::vector< DIMessage > serializeO2Messages(const std::vector< DataRef > &refs, const std::string &deviceName)
bool isNonInternalDevice(const DeviceSpec &spec)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
@ All
Quit all data processor, regardless of their state.
static ServiceConfigureCallback noConfiguration()
auto create() -> ServiceSpec *final
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
Running state information of a given device.
Definition DeviceState.h:34
std::string name
Name of the service.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153