Project
Loading...
Searching...
No Matches
test_ExternalFairMQDeviceWorkflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12
13using namespace o2::framework;
14
25#include "Framework/Logger.h"
27#include "Headers/DataHeader.h"
28#include <fairmq/Device.h>
29
30namespace test_config
31{
32enum struct ProxyMode {
33 All,
35 OnlyOutput, // also excludes checker
37};
38}
39
40namespace test_header
41{
43 enum struct MsgMode {
44 Pair,
46 };
47
48 static constexpr uint32_t sVersion = 1;
49 static constexpr o2::header::HeaderType sHeaderType = "MsgMode";
50 MsgModeHeader(MsgMode _mode, size_t nParts)
51 : BaseHeader(sizeof(MsgModeHeader), sHeaderType, o2::header::gSerializationMethodNone, sVersion), mode(_mode), nPayloadParts(nParts)
52 {
53 }
54
57};
58} // namespace test_header
59std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val);
60std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val);
61std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode val);
62std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode val);
63
64// we need to add workflow options before including Framework/runDataProcessing
65void customize(std::vector<ConfigParamSpec>& workflowOptions)
66{
67 workflowOptions.push_back(
69 "default-transport", VariantType::String, "shmem", {"default transport: shmem, zeromq"}});
70 workflowOptions.push_back(
72 "number-of-events,n", VariantType::Int, 10, {"number of events to process"}});
73 workflowOptions.push_back(
75 "proxy-mode", VariantType::String, "skip-output", {"proxy mode: all, skip-output, only-output, skip-all"}});
76}
77
79
80using namespace o2::framework;
83
84#define ASSERT_ERROR(condition) \
85 if ((condition) == false) { \
86 LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
87 }
88
89#define ASSERT_EQUAL(left, right) \
90 if ((left == right) == false) { \
91 LOGP(fatal, R"(Test condition {} ({}) == {} ({}) failed")", #left, left, #right, right); \
92 }
93
94template <typename T>
95T readConfig(ConfigContext const& config, const char* key)
96{
97 auto p = config.options().get<std::string>(key);
98 std::stringstream cs(p);
99 T val;
100 cs >> val;
101 if (cs.fail()) {
102 throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key);
103 }
104 return val;
105}
106
107std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
108{
109 using ProxyMode = test_config::ProxyMode;
110 auto proxyMode = readConfig<ProxyMode>(config, "proxy-mode");
111 std::string defaultTransportConfig = config.options().get<std::string>("default-transport");
112 int nRolls = config.options().get<int>("number-of-events");
113 if (defaultTransportConfig == "zeromq") {
114 // nothing to do for the moment
115 } else if (defaultTransportConfig == "shmem") {
116 // nothing to do for the moment
117 } else {
118 throw std::runtime_error("invalid argument for option --default-transport : '" + defaultTransportConfig + "'");
119 }
120 std::vector<DataProcessorSpec> workflow;
121
123 // configuration of the out-of-band proxy channel
124 //
125 // used either in the output proxy ('dpl-sink') or as a direct channel of the producer
126 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
127 OutputChannelSpec externalChannelSpec;
128 // Note: the name is hardcoded for now
129 externalChannelSpec.name = "downstream";
130 externalChannelSpec.type = ChannelType::Push;
131 externalChannelSpec.method = ChannelMethod::Bind;
132 externalChannelSpec.hostname = "localhost";
133 externalChannelSpec.port = 42042;
134 externalChannelSpec.listeners = 0;
135 externalChannelSpec.rateLogging = 10;
136 externalChannelSpec.sendBufferSize = 1;
137 externalChannelSpec.recvBufferSize = 1;
138 if (!defaultTransportConfig.empty()) {
139 if (defaultTransportConfig == "zeromq") {
140 externalChannelSpec.protocol = ChannelProtocol::Network;
141 } else if (defaultTransportConfig == "shmem") {
142 externalChannelSpec.protocol = ChannelProtocol::IPC;
143 }
144 }
145 std::string channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
146 // at some point the formatting tool might add the transport as well so we have to check
147 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
148 channelConfig += ",transport=" + defaultTransportConfig;
149 }
150
152 // a producer process steered by a timer
153 //
154 auto producerInitCallback = [nRolls, proxyMode, externalChannelSpec](CallbackService& callbacks, RawDeviceService& rds) {
155 srand(getpid());
156 auto channelName = std::make_shared<std::string>();
157 auto producerChannelInit = [channelName, outputRoutes = rds.spec().outputs]() {
158 // find the output channel name, we expect all output messages to be
159 // sent over the same channel
160 if (channelName->empty()) {
161 OutputSpec const query{"TST", "DATA", 0};
162 for (auto& route : outputRoutes) {
163 if (DataSpecUtils::match(route.matcher, query)) {
164 *channelName = route.channel;
165 break;
166 }
167 }
168 }
169 ASSERT_ERROR(channelName->length() > 0);
170 };
171 if (proxyMode == ProxyMode::SkipOutput) {
172 *channelName = externalChannelSpec.name;
173 } else {
174 callbacks.set<CallbackService::Id::Start>(producerChannelInit);
175 }
176 // the compute callback of the producer
177 auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds, MessageContext& messageContext) {
178 int data = *counter;
179 // outputs.make<int>(OutputRef{"data", 0}) = data;
180
181 fair::mq::Device& device = *(rds.device());
182 auto transport = device.GetChannel(*channelName, 0).Transport();
183 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
184
185 DataProcessingHeader dph{*counter, 0};
186
187 auto msgMode = rand() % 2 ? test_header::MsgModeHeader::MsgMode::Pair : test_header::MsgModeHeader::MsgMode::Sequence;
188 size_t nPayloads = rand() % 10 + 1;
189
190 test_header::MsgModeHeader mmh{msgMode, nPayloads};
191 fair::mq::Parts messages;
192 auto insertHeader = [&dph, &mmh, &channelAlloc, &messages](DataHeader const& dh) -> void {
193 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, dph, mmh});
194 messages.AddPart(std::move(header));
195 };
196 auto insertPayload = [&transport, &messages, &data](size_t size) -> void {
197 fair::mq::MessagePtr payload = transport->CreateMessage(size);
198 memcpy(payload->GetData(), &data, sizeof(data));
199 messages.AddPart(std::move(payload));
200 };
201 auto createSequence = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void {
202 // one header with index set to the number of split parts indicates sequence
203 // of payloads without additional headers
204 dh.payloadSize = sizeof(data);
205 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
206 dh.splitPayloadIndex = nPayloads;
207 dh.splitPayloadParts = nPayloads;
208 insertHeader(dh);
209
210 for (size_t i = 0; i < nPayloads; ++i) {
211 insertPayload(dh.payloadSize);
212 }
213 };
214
215 auto createPairs = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void {
216 // one header with index set to the number of split parts indicates sequence
217 // of payloads without additional headers
218 dh.payloadSize = sizeof(data);
219 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
220 dh.splitPayloadIndex = 0;
221 dh.splitPayloadParts = nPayloads;
222 for (size_t i = 0; i < nPayloads; ++i) {
223 dh.splitPayloadIndex = i;
224 insertHeader(dh);
225 insertPayload(dh.payloadSize);
226 }
227 };
228
229 if (msgMode == test_header::MsgModeHeader::MsgMode::Pair) {
230 createPairs(nPayloads, DataHeader{"DATA", "TST", 0});
231 } else {
232 createSequence(nPayloads, DataHeader{"DATA", "TST", 0});
233 }
234 // using utility from ExternalFairMQDeviceProxy
235 o2::framework::sendOnChannel(device, messages, *channelName, (size_t)-1);
236 messageContext.fakeDispatch();
237
238 if (++(*counter) >= nRolls) {
239 // send the end of stream signal, this is transferred by the proxies
240 // and allows to properly terminate downstream devices
241 control.endOfStream();
242 if (proxyMode == ProxyMode::SkipOutput) {
243 // since we are sending on the bare channel, also the EOS message needs to be created.
244 DataHeader dhEOS;
245 dhEOS.dataOrigin = "DPL";
246 dhEOS.dataDescription = "EOS";
247 dhEOS.subSpecification = 0;
248 dhEOS.payloadSize = 0;
249 dhEOS.payloadSerializationMethod = o2::header::gSerializationMethodNone;
250 dhEOS.tfCounter = 0;
251 dhEOS.firstTForbit = 0;
252 SourceInfoHeader sih;
253 sih.state = InputChannelState::Completed;
254 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dhEOS, dph, sih});
255 fair::mq::Parts out;
256 out.AddPart(std::move(headerMessage));
257 // add empty payload message
258 out.AddPart(std::move(device.NewMessageFor(*channelName, 0, 0)));
259 o2::framework::sendOnChannel(device, out, *channelName, (size_t)-1);
260 messageContext.fakeDispatch();
261 }
262 }
263 };
264 return adaptStateless(producerCallback);
265 };
266
267 workflow.emplace_back(DataProcessorSpec{"producer",
268 {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
269 {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}},
270 AlgorithmSpec{adaptStateful(producerInitCallback)},
271 {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}});
272
273 if (proxyMode == ProxyMode::SkipOutput) {
274 // create the out-of-band channel in the producer if the output proxy is bypassed
275 const char* d = strdup(channelConfig.c_str());
276 workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}});
277 }
278
280 // the dpl sink proxy process
281
282 Inputs sinkInputs = {InputSpec{"external", "TST", "DATA", 0, Lifetime::Timeframe}};
283 auto channelSelector = [](InputSpec const&, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>&) -> std::string {
284 return "downstream";
285 };
286 if (proxyMode == ProxyMode::All || proxyMode == ProxyMode::OnlyOutput) {
287 workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector)));
288 }
289
291 // a simple checker process subscribing to the output of the input proxy
292 //
293 // the compute callback of the checker
294 auto counter = std::make_shared<int>(0);
295 auto checkerCallback = [counter](InputRecord& inputs, ControlService& control) {
296 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(inputs.get("datain"));
297 auto const* mmh = DataRefUtils::getHeader<test_header::MsgModeHeader*>(inputs.get("datain"));
298 ASSERT_ERROR(dh != nullptr);
299 ASSERT_ERROR(mmh != nullptr);
300 LOGP(info, "{} input slots(s), data {}, parts {}, mode {}", inputs.size(), inputs.get<int>("datain"), mmh->nPayloadParts, (int)mmh->mode);
301 if (mmh->mode == test_header::MsgModeHeader::MsgMode::Pair) {
302 ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts);
303 ASSERT_ERROR(dh->splitPayloadIndex == 0);
304 } else {
305 ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts);
306 ASSERT_ERROR(dh->splitPayloadIndex == mmh->nPayloadParts);
307 }
308 size_t nPayloads = 0;
309 for (auto const& ref : InputRecordWalker(inputs)) {
310 auto data = inputs.get<int>(ref);
311 ASSERT_ERROR(data == *counter);
312 ++nPayloads;
313 }
314 ASSERT_ERROR(nPayloads == mmh->nPayloadParts);
315 ++(*counter);
316 };
317 auto checkCounter = [counter, nRolls](EndOfStreamContext&) {
318 ASSERT_EQUAL(*counter, nRolls);
319 if (*counter == nRolls) {
320 LOG(info) << "checker has received " << nRolls << " successful event(s)";
321 }
322 };
323 auto checkerInit = [checkerCallback, checkCounter](CallbackService& callbacks) {
324 callbacks.set<CallbackService::Id::EndOfStream>(checkCounter);
325 return adaptStateless(checkerCallback);
326 };
327
328 // the checker process connects to the proxy
329 Inputs checkerInputs;
330 if (proxyMode != ProxyMode::All) {
331 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe});
332 // for (unsigned int i = 0; i < pState->nChannels; i++) {
333 // checkerInputs.emplace_back(InputSpec{{"datain"}, "TST", "DATA", i, Lifetime::Timeframe});
334 // }
335 } else {
336 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe});
337 // for (unsigned int i = 0; i < pState->nChannels; i++) {
338 // checkerInputs.emplace_back(InputSpec{{"datain"}, "PRX", "DATA", i, Lifetime::Timeframe});
339 // }
340 }
341 if (proxyMode != ProxyMode::OnlyOutput) {
342 // the checker is not added if the input proxy is skipped
343 workflow.emplace_back(DataProcessorSpec{"checker",
344 std::move(checkerInputs),
345 {},
346 AlgorithmSpec{adaptStateful(checkerInit)}});
347 }
348
350 // the input proxy process
351 // reads the messages from the output proxy via the out-of-band channel
352
353 // converter callback for the external FairMQ device proxy ProcessorSpec generator
354 InjectorFunction converter = [](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& inputs, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) -> bool {
355 auto* device = services.get<RawDeviceService>().device();
356 ASSERT_ERROR(inputs.Size() >= 2);
357 if (inputs.Size() < 2) {
358 return false;
359 }
360 int msgidx = 0;
361 auto dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
362 if (!dh) {
363 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
364 return false;
365 }
366 auto dph = o2::header::get<DataProcessingHeader*>(inputs.At(msgidx)->GetData());
367 if (!dph) {
368 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
369 return false;
370 }
371 // Note: we want to run both the output and input proxy in the same workflow and thus we need
372 // different data identifiers and change the data origin in the forwarding
373 OutputSpec query{"PRX", dh->dataDescription, dh->subSpecification};
374 auto channelName = channelRetriever(query, dph->startTime);
375 bool isData = DataSpecUtils::match(OutputSpec{"TST", "DATA", 0}, dh->dataOrigin, dh->dataDescription, dh->subSpecification);
376 // for the configured data channel we require the channel name, the EOS message containing
377 // the forwarded SourceInfoHeader created by the output proxy will be skipped here since the
378 // input proxy handles this internally
379 ASSERT_ERROR(!isData || !channelName.empty());
380 LOG(debug) << "using channel '" << channelName << "' for " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification});
381 if (channelName.empty()) {
382 return false;
383 }
384 fair::mq::Parts output;
385 for (; msgidx < inputs.Size(); ++msgidx) {
386 auto const* dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
387 if (dh) {
388 LOGP(debug, "{}/{}/{} with {} part(s), index {}",
389 dh->dataOrigin.as<std::string>(),
390 dh->dataDescription.as<std::string>(),
391 dh->subSpecification,
392 dh->splitPayloadParts,
393 dh->splitPayloadIndex);
394 // make a copy of the header message, get the data header and change origin
395 auto outHeaderMessage = device->NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
396 memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
397 // this we obviously need to fix in the get API, const'ness of the returned header pointer
398 // should depend on const'ness of the buffer
399 auto odh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
400 odh->dataOrigin = o2::header::DataOrigin("PRX");
401 output.AddPart(std::move(outHeaderMessage));
402 } else {
403 output.AddPart(std::move(inputs.At(msgidx)));
404 }
405 }
406 auto& messageContext = services.get<MessageContext>();
407 o2::framework::sendOnChannel(*device, output, channelName, (size_t)-1);
408 messageContext.fakeDispatch();
409 return output.Size() != 0;
410 };
411
412 // we use the same spec to build the configuration string, ideally we would have some helpers
413 // which convert an OutputChannelSpec to an InputChannelSpec replacing 'bind' <--> 'connect'
414 // and 'push' <--> 'pull'
415 //
416 // skip the name in the configuration string as it is added in specifyExternalFairMQDeviceProxy
417 externalChannelSpec.name = "";
418 externalChannelSpec.type = ChannelType::Pull;
419 externalChannelSpec.method = ChannelMethod::Connect;
420 channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
421 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
422 channelConfig += ",transport=" + defaultTransportConfig;
423 }
424
425 if (proxyMode == ProxyMode::All) {
426 // Note: in order to make the DPL output proxy and an input proxy working in the same
427 // workflow, we use different data description
428 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}};
429 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
430 "input-proxy",
431 std::move(inputProxyOutputs),
432 channelConfig.c_str(),
433 converter));
434 } else if (proxyMode == ProxyMode::SkipOutput) {
435 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}};
436 // we use the same specs as filters in the dpl adaptor
437 auto filterSpecs = inputProxyOutputs;
438 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
439 "input-proxy",
440 std::move(inputProxyOutputs),
441 channelConfig.c_str(),
442 o2::framework::dplModelAdaptor(filterSpecs, true)));
443 }
444 workflow.back().labels.emplace_back(DataProcessorLabel{"input-proxy"});
445
446 return workflow;
447}
448
449std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val)
450{
451 std::string token;
452 in >> token;
453 if (token == "all" || token == "a") {
454 val = test_config::ProxyMode::All;
455 } else if (token == "skip-output") {
456 val = test_config::ProxyMode::SkipOutput;
457 } else if (token == "only-output") {
458 val = test_config::ProxyMode::OnlyOutput;
459 } else if (token == "skip-all" || token == "skip-proxies") {
460 val = test_config::ProxyMode::NoProxies;
461 } else {
462 in.setstate(std::ios_base::failbit);
463 }
464 return in;
465}
466
467std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val)
468{
469 if (val == test_config::ProxyMode::All) {
470 out << "all";
471 } else if (val == test_config::ProxyMode::SkipOutput) {
472 out << "skip-output";
473 } else if (val == test_config::ProxyMode::OnlyOutput) {
474 out << "only-output";
475 } else if (val == test_config::ProxyMode::NoProxies) {
476 out << "skip-all";
477 } else {
478 out.setstate(std::ios_base::failbit);
479 }
480 return out;
481}
482
483std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode& val)
484{
485 std::string token;
486 in >> token;
487 if (token == "pair") {
488 val = test_header::MsgModeHeader::MsgMode::Pair;
489 } else if (token == "sequence") {
490 val = test_header::MsgModeHeader::MsgMode::Sequence;
491 } else {
492 in.setstate(std::ios_base::failbit);
493 }
494 return in;
495}
496
497std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode& val)
498{
499 if (val == test_header::MsgModeHeader::MsgMode::Pair) {
500 out << "pair";
501 } else if (val == test_header::MsgModeHeader::MsgMode::Sequence) {
502 out << "sequence";
503 } else {
504 out.setstate(std::ios_base::failbit);
505 }
506 return out;
507}
A helper class to iteratate over all parts of all input routes.
GLenum mode
Definition glcorearb.h:266
GLuint GLfloat * val
Definition glcorearb.h:1582
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::istream & operator>>(std::istream &in, enum TerminationPolicy &policy)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
the base header struct Every header type must begin (i.e. derive) with this. Don't use this struct di...
Definition DataHeader.h:351
the main header struct
Definition DataHeader.h:618
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
MsgModeHeader(MsgMode _mode, size_t nParts)
static constexpr o2::header::HeaderType sHeaderType
void customize(std::vector< ConfigParamSpec > &workflowOptions)