28#include <fairmq/Device.h>
65void customize(std::vector<ConfigParamSpec>& workflowOptions)
67 workflowOptions.push_back(
69 "default-transport", VariantType::String,
"shmem", {
"default transport: shmem, zeromq"}});
70 workflowOptions.push_back(
72 "number-of-events,n", VariantType::Int, 10, {
"number of events to process"}});
73 workflowOptions.push_back(
75 "proxy-mode", VariantType::String,
"skip-output", {
"proxy mode: all, skip-output, only-output, skip-all"}});
84#define ASSERT_ERROR(condition) \
85 if ((condition) == false) { \
86 LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
89#define ASSERT_EQUAL(left, right) \
90 if ((left == right) == false) { \
91 LOGP(fatal, R"(Test condition {} ({}) == {} ({}) failed")", #left, left, #right, right); \
95T readConfig(ConfigContext const& config, const char* key)
97 auto p = config.options().get<std::string>(key);
98 std::stringstream cs(p);
102 throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key);
107std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
109 using ProxyMode = test_config::ProxyMode;
110 auto proxyMode = readConfig<ProxyMode>(config, "proxy-mode");
111 std::string defaultTransportConfig = config.options().get<std::string>("default-transport");
112 int nRolls = config.options().get<int>("number-of-events");
113 if (defaultTransportConfig == "zeromq") {
114 // nothing to do for the moment
115 } else if (defaultTransportConfig == "shmem") {
116 // nothing to do for the moment
118 throw std::runtime_error("invalid argument for option --default-transport : '" + defaultTransportConfig + "'");
120 std::vector<DataProcessorSpec> workflow;
123 // configuration of the out-of-band proxy channel
125 // used either in the output proxy ('dpl-sink') or as a direct channel of the producer
126 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
127 OutputChannelSpec externalChannelSpec;
128 // Note: the name is hardcoded for now
129 externalChannelSpec.name = "downstream";
130 externalChannelSpec.type = ChannelType::Push;
131 externalChannelSpec.method = ChannelMethod::Bind;
132 externalChannelSpec.hostname = "localhost";
133 externalChannelSpec.port = 42042;
134 externalChannelSpec.listeners = 0;
135 externalChannelSpec.rateLogging = 10;
136 externalChannelSpec.sendBufferSize = 1;
137 externalChannelSpec.recvBufferSize = 1;
138 if (!defaultTransportConfig.empty()) {
139 if (defaultTransportConfig == "zeromq") {
140 externalChannelSpec.protocol = ChannelProtocol::Network;
141 } else if (defaultTransportConfig == "shmem") {
142 externalChannelSpec.protocol = ChannelProtocol::IPC;
145 std::string channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
146 // at some point the formatting tool might add the transport as well so we have to check
147 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
148 channelConfig += ",transport=" + defaultTransportConfig;
152 // a producer process steered by a timer
154 auto producerInitCallback = [nRolls, proxyMode, externalChannelSpec](CallbackService& callbacks, RawDeviceService& rds) {
156 auto channelName = std::make_shared<std::string>();
157 auto producerChannelInit = [channelName, outputRoutes = rds.spec().outputs]() {
158 // find the output channel name, we expect all output messages to be
159 // sent over the same channel
160 if (channelName->empty()) {
161 OutputSpec const query{"TST", "DATA", 0};
162 for (auto& route : outputRoutes) {
163 if (DataSpecUtils::match(route.matcher, query)) {
164 *channelName = route.channel;
169 ASSERT_ERROR(channelName->length() > 0);
171 if (proxyMode == ProxyMode::SkipOutput) {
172 *channelName = externalChannelSpec.name;
174 callbacks.set<CallbackService::Id::Start>(producerChannelInit);
176 // the compute callback of the producer
177 auto producerCallback = [nRolls, channelName, proxyMode, counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control, RawDeviceService& rds, MessageContext& messageContext) {
179 // outputs.make<int>(OutputRef{"data", 0}) = data;
181 fair::mq::Device& device = *(rds.device());
182 auto transport = device.GetChannel(*channelName, 0).Transport();
183 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
185 DataProcessingHeader dph{*counter, 0};
187 auto msgMode = rand() % 2 ? test_header::MsgModeHeader::MsgMode::Pair : test_header::MsgModeHeader::MsgMode::Sequence;
188 size_t nPayloads = rand() % 10 + 1;
190 test_header::MsgModeHeader mmh{msgMode, nPayloads};
191 fair::mq::Parts messages;
192 auto insertHeader = [&dph, &mmh, &channelAlloc, &messages](DataHeader const& dh) -> void {
193 fair::mq::MessagePtr header = o2::pmr::getMessage(Stack{channelAlloc, dh, dph, mmh});
194 messages.AddPart(std::move(header));
196 auto insertPayload = [&transport, &messages, &data](size_t size) -> void {
197 fair::mq::MessagePtr payload = transport->CreateMessage(size);
198 memcpy(payload->GetData(), &data, sizeof(data));
199 messages.AddPart(std::move(payload));
201 auto createSequence = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void {
202 // one header with index set to the number of split parts indicates sequence
203 // of payloads without additional headers
204 dh.payloadSize = sizeof(data);
205 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
206 dh.splitPayloadIndex = nPayloads;
207 dh.splitPayloadParts = nPayloads;
210 for (size_t i = 0; i < nPayloads; ++i) {
211 insertPayload(dh.payloadSize);
215 auto createPairs = [&insertHeader, &insertPayload, &data](size_t nPayloads, DataHeader dh) -> void {
216 // one header with index set to the number of split parts indicates sequence
217 // of payloads without additional headers
218 dh.payloadSize = sizeof(data);
219 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
220 dh.splitPayloadIndex = 0;
221 dh.splitPayloadParts = nPayloads;
222 for (size_t i = 0; i < nPayloads; ++i) {
223 dh.splitPayloadIndex = i;
225 insertPayload(dh.payloadSize);
229 if (msgMode == test_header::MsgModeHeader::MsgMode::Pair) {
230 createPairs(nPayloads, DataHeader{"DATA", "TST", 0});
232 createSequence(nPayloads, DataHeader{"DATA", "TST", 0});
234 // using utility from ExternalFairMQDeviceProxy
235 o2::framework::sendOnChannel(device, messages, *channelName, (size_t)-1);
236 messageContext.fakeDispatch();
238 if (++(*counter) >= nRolls) {
239 // send the end of stream signal, this is transferred by the proxies
240 // and allows to properly terminate downstream devices
241 control.endOfStream();
242 if (proxyMode == ProxyMode::SkipOutput) {
243 // since we are sending on the bare channel, also the EOS message needs to be created.
245 dhEOS.dataOrigin = "DPL";
246 dhEOS.dataDescription = "EOS";
247 dhEOS.subSpecification = 0;
248 dhEOS.payloadSize = 0;
249 dhEOS.payloadSerializationMethod = o2::header::gSerializationMethodNone;
251 dhEOS.firstTForbit = 0;
252 SourceInfoHeader sih;
253 sih.state = InputChannelState::Completed;
254 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dhEOS, dph, sih});
256 out.AddPart(std::move(headerMessage));
257 // add empty payload message
258 out.AddPart(std::move(device.NewMessageFor(*channelName, 0, 0)));
259 o2::framework::sendOnChannel(device, out, *channelName, (size_t)-1);
260 messageContext.fakeDispatch();
264 return adaptStateless(producerCallback);
267 workflow.emplace_back(DataProcessorSpec{"producer",
268 {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
269 {OutputSpec{{"data"}, "TST", "DATA", 0, Lifetime::Timeframe}},
270 AlgorithmSpec{adaptStateful(producerInitCallback)},
271 {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"period of timer"}}}});
273 if (proxyMode == ProxyMode::SkipOutput) {
274 // create the out-of-band channel in the producer if the output proxy is bypassed
275 const char* d = strdup(channelConfig.c_str());
276 workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}});
280 // the dpl sink proxy process
282 Inputs sinkInputs = {InputSpec{"external", "TST", "DATA", 0, Lifetime::Timeframe}};
283 auto channelSelector = [](InputSpec const&, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>&) -> std::string {
286 if (proxyMode == ProxyMode::All || proxyMode == ProxyMode::OnlyOutput) {
287 workflow.emplace_back(std::move(specifyFairMQDeviceMultiOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str(), channelSelector)));
291 // a simple checker process subscribing to the output of the input proxy
293 // the compute callback of the checker
294 auto counter = std::make_shared<int>(0);
295 auto checkerCallback = [counter](InputRecord& inputs, ControlService& control) {
296 auto const* dh = DataRefUtils::getHeader<o2::header::DataHeader*>(inputs.get("datain"));
297 auto const* mmh = DataRefUtils::getHeader<test_header::MsgModeHeader*>(inputs.get("datain"));
298 ASSERT_ERROR(dh != nullptr);
299 ASSERT_ERROR(mmh != nullptr);
300 LOGP(info, "{} input slots(s), data {}, parts {}, mode {}", inputs.size(), inputs.get<int>("datain"), mmh->nPayloadParts, (int)mmh->mode);
301 if (mmh->mode == test_header::MsgModeHeader::MsgMode::Pair) {
302 ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts);
303 ASSERT_ERROR(dh->splitPayloadIndex == 0);
305 ASSERT_ERROR(dh->splitPayloadParts == mmh->nPayloadParts);
306 ASSERT_ERROR(dh->splitPayloadIndex == mmh->nPayloadParts);
308 size_t nPayloads = 0;
309 for (auto const& ref : InputRecordWalker(inputs)) {
310 auto data = inputs.get<int>(ref);
311 ASSERT_ERROR(data == *counter);
314 ASSERT_ERROR(nPayloads == mmh->nPayloadParts);
317 auto checkCounter = [counter, nRolls](EndOfStreamContext&) {
318 ASSERT_EQUAL(*counter, nRolls);
319 if (*counter == nRolls) {
320 LOG(info) << "checker has received " << nRolls << " successful event(s)";
323 auto checkerInit = [checkerCallback, checkCounter](CallbackService& callbacks) {
324 callbacks.set<CallbackService::Id::EndOfStream>(checkCounter);
325 return adaptStateless(checkerCallback);
328 // the checker process connects to the proxy
329 Inputs checkerInputs;
330 if (proxyMode != ProxyMode::All) {
331 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe});
332 // for (unsigned int i = 0; i < pState->nChannels; i++) {
333 // checkerInputs.emplace_back(InputSpec{{"datain"}, "TST", "DATA", i, Lifetime::Timeframe});
336 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe});
337 // for (unsigned int i = 0; i < pState->nChannels; i++) {
338 // checkerInputs.emplace_back(InputSpec{{"datain"}, "PRX", "DATA", i, Lifetime::Timeframe});
341 if (proxyMode != ProxyMode::OnlyOutput) {
342 // the checker is not added if the input proxy is skipped
343 workflow.emplace_back(DataProcessorSpec{"checker",
344 std::move(checkerInputs),
346 AlgorithmSpec{adaptStateful(checkerInit)}});
350 // the input proxy process
351 // reads the messages from the output proxy via the out-of-band channel
353 // converter callback for the external FairMQ device proxy ProcessorSpec generator
354 InjectorFunction converter = [](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& inputs, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) -> bool {
355 auto* device = services.get<RawDeviceService>().device();
356 ASSERT_ERROR(inputs.Size() >= 2);
357 if (inputs.Size() < 2) {
361 auto dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
363 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
366 auto dph = o2::header::get<DataProcessingHeader*>(inputs.At(msgidx)->GetData());
368 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
371 // Note: we want to run both the output and input proxy in the same workflow and thus we need
372 // different data identifiers and change the data origin in the forwarding
373 OutputSpec query{"PRX", dh->dataDescription, dh->subSpecification};
374 auto channelName = channelRetriever(query, dph->startTime);
375 bool isData = DataSpecUtils::match(OutputSpec{"TST", "DATA", 0}, dh->dataOrigin, dh->dataDescription, dh->subSpecification);
376 // for the configured data channel we require the channel name, the EOS message containing
377 // the forwarded SourceInfoHeader created by the output proxy will be skipped here since the
378 // input proxy handles this internally
379 ASSERT_ERROR(!isData || !channelName.empty());
380 LOG(debug) << "using channel '" << channelName << "' for " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification});
381 if (channelName.empty()) {
384 fair::mq::Parts output;
385 for (; msgidx < inputs.Size(); ++msgidx) {
386 auto const* dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
388 LOGP(debug, "{}/{}/{} with {} part(s), index {}",
389 dh->dataOrigin.as<std::string>(),
390 dh->dataDescription.as<std::string>(),
391 dh->subSpecification,
392 dh->splitPayloadParts,
393 dh->splitPayloadIndex);
394 // make a copy of the header message, get the data header and change origin
395 auto outHeaderMessage = device->NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
396 memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
397 // this we obviously need to fix in the get API, const'ness of the returned header pointer
398 // should depend on const'ness of the buffer
399 auto odh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
400 odh->dataOrigin = o2::header::DataOrigin("PRX");
401 output.AddPart(std::move(outHeaderMessage));
403 output.AddPart(std::move(inputs.At(msgidx)));
406 auto& messageContext = services.get<MessageContext>();
407 o2::framework::sendOnChannel(*device, output, channelName, (size_t)-1);
408 messageContext.fakeDispatch();
409 return output.Size() != 0;
412 // we use the same spec to build the configuration string, ideally we would have some helpers
413 // which convert an OutputChannelSpec to an InputChannelSpec replacing 'bind' <--> 'connect'
414 // and 'push' <--> 'pull'
416 // skip the name in the configuration string as it is added in specifyExternalFairMQDeviceProxy
417 externalChannelSpec.name = "";
418 externalChannelSpec.type = ChannelType::Pull;
419 externalChannelSpec.method = ChannelMethod::Connect;
420 channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
421 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
422 channelConfig += ",transport=" + defaultTransportConfig;
425 if (proxyMode == ProxyMode::All) {
426 // Note: in order to make the DPL output proxy and an input proxy working in the same
427 // workflow, we use different data description
428 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}};
429 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
431 std::move(inputProxyOutputs),
432 channelConfig.c_str(),
434 } else if (proxyMode == ProxyMode::SkipOutput) {
435 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}};
436 // we use the same specs as filters in the dpl adaptor
437 auto filterSpecs = inputProxyOutputs;
438 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
440 std::move(inputProxyOutputs),
441 channelConfig.c_str(),
442 o2::framework::dplModelAdaptor(filterSpecs, true)));
444 workflow.back().labels.emplace_back(DataProcessorLabel{"input-proxy"});
449std::istream& operator>>(std::istream& in, enum test_config::ProxyMode& val)
453 if (token == "all" || token == "a") {
454 val = test_config::ProxyMode::All;
455 } else if (token == "skip-output") {
456 val = test_config::ProxyMode::SkipOutput;
457 } else if (token == "only-output") {
458 val = test_config::ProxyMode::OnlyOutput;
459 } else if (token == "skip-all" || token == "skip-proxies") {
460 val = test_config::ProxyMode::NoProxies;
462 in.setstate(std::ios_base::failbit);
467std::ostream& operator<<(std::ostream& out, const enum test_config::ProxyMode& val)
469 if (val == test_config::ProxyMode::All) {
471 } else if (val == test_config::ProxyMode::SkipOutput) {
472 out << "skip-output";
473 } else if (val == test_config::ProxyMode::OnlyOutput) {
474 out << "only-output";
475 } else if (val == test_config::ProxyMode::NoProxies) {
478 out.setstate(std::ios_base::failbit);
483std::istream& operator>>(std::istream& in, enum test_header::MsgModeHeader::MsgMode& val)
487 if (token == "pair") {
488 val = test_header::MsgModeHeader::MsgMode::Pair;
489 } else if (token == "sequence") {
490 val = test_header::MsgModeHeader::MsgMode::Sequence;
492 in.setstate(std::ios_base::failbit);
497std::ostream& operator<<(std::ostream& out, const enum test_header::MsgModeHeader::MsgMode& val)
499 if (val == test_header::MsgModeHeader::MsgMode::Pair) {
501 } else if (val == test_header::MsgModeHeader::MsgMode::Sequence) {
504 out.setstate(std::ios_base::failbit);
Defining PrimaryVertex explicitly as messageable.
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::istream & operator>>(std::istream &in, enum TerminationPolicy &policy)
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
void customize(std::vector< ConfigParamSpec > &workflowOptions)