Project
Loading...
Searching...
No Matches
dpl-output-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
14#include "Framework/Logger.h"
19#include <vector>
20
21using namespace o2::framework;
22
23// we need to add workflow options before including Framework/runDataProcessing
24void customize(std::vector<ConfigParamSpec>& workflowOptions)
25{
26 workflowOptions.push_back(
28 "proxy-name", VariantType::String, "dpl-output-proxy", {"name of the proxy processor, will be the default output channel name as well"}});
29
30 workflowOptions.push_back(
32 "proxy-channel-name", VariantType::String, "downstream", {"output channel name of the proxy"}});
33
34 workflowOptions.push_back(
36 "dataspec", VariantType::String, "dpl-output-proxy:TST/CLUSTERS;dpl-output-proxy:TST/TRACKS", {"selection string for the data to be proxied"}});
37
38 workflowOptions.push_back(
40 "output-proxy-method", VariantType::String, "bind", {"proxy socket method: bind, connect"}});
41
42 workflowOptions.push_back(
44 "sporadic-inputs", VariantType::Bool, false, {"consider all the inputs as sporadic"}});
45
46 workflowOptions.push_back(
48 "output-proxy-address", VariantType::String, "0.0.0.0", {"address to connect / bind to"}});
49
50 workflowOptions.push_back(
52 "default-transport", VariantType::String, "shmem", {"default transport: shmem, zeromq"}});
53
54 workflowOptions.push_back(
56 "default-port", VariantType::Int, 4200, {"default port number"}});
57
58 /* workflowOptions.push_back( // Cannot read option in customize function, so using an env variable for now
59 ConfigParamSpec{
60 "ordered-completion-policy", VariantType::Bool, false, {"Use the ordered completion policy for the input"}});*/
61}
62
63void customize(std::vector<o2::framework::CompletionPolicy>& policies)
64{
65 static bool doOrdered = getenv("DPL_OUTPUT_PROXY_ORDERED") && atoi(getenv("DPL_OUTPUT_PROXY_ORDERED"));
66 static bool doAny = getenv("DPL_OUTPUT_PROXY_WHENANY") && atoi(getenv("DPL_OUTPUT_PROXY_WHENANY"));
67 if (doOrdered) {
68 policies.push_back(CompletionPolicyHelpers::consumeWhenAllOrdered(".*"));
69 } else if (doAny) {
70 policies.push_back(CompletionPolicyHelpers::consumeWhenAny(".*"));
71 }
72}
73
75
77{
78 auto processorName = config.options().get<std::string>("proxy-name");
79 auto inputConfig = config.options().get<std::string>("dataspec");
80 int defaultPort = config.options().get<int>("default-port");
81 bool sporadicInputs = config.options().get<bool>("sporadic-inputs");
82 auto defaultTransportConfig = config.options().get<std::string>("default-transport");
83 if (defaultTransportConfig == "zeromq") {
84 // nothing to do for the moment
85 } else if (defaultTransportConfig == "shmem") {
86 // nothing to do for the moment
87 } else {
88 throw std::runtime_error("invalid argument for option --default-transport : '" + defaultTransportConfig + "'");
89 }
90
91 std::vector<InputSpec> inputs = select(inputConfig.c_str());
92 if (inputs.size() == 0) {
93 throw std::runtime_error("invalid dataspec '" + inputConfig + "'");
94 }
95 // we need to set the lifetime of the inputs to sporadic if requested
96 if (sporadicInputs) {
97 for (auto& input : inputs) {
98 input.lifetime = Lifetime::Sporadic;
99 }
100 }
101
102 // we build the default channel configuration from the binding of the first input
103 // in order to have more than one we would need to possibility to have support for
104 // vectored options
105 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
106 OutputChannelSpec externalChannelSpec;
107 externalChannelSpec.name = config.options().get<std::string>("proxy-channel-name");
108 externalChannelSpec.type = ChannelType::Push;
109 if (config.options().get<std::string>("output-proxy-method") == "bind") {
110 externalChannelSpec.method = ChannelMethod::Bind;
111 } else if (config.options().get<std::string>("output-proxy-method") == "connect") {
112 externalChannelSpec.method = ChannelMethod::Connect;
113 }
114 externalChannelSpec.hostname = config.options().get<std::string>("output-proxy-address");
115 externalChannelSpec.port = defaultPort;
116 externalChannelSpec.listeners = 0;
117 // in principle, protocol and transport are two different things but fur simplicity
118 // we use ipc when shared memory is selected and the normal tcp url whith zeromq,
119 // this is for building the default configuration which can be simply changed from the
120 // command line
121 if (!defaultTransportConfig.empty()) {
122 if (defaultTransportConfig == "zeromq") {
123 externalChannelSpec.protocol = ChannelProtocol::Network;
124 } else if (defaultTransportConfig == "shmem") {
125 externalChannelSpec.protocol = ChannelProtocol::IPC;
126 }
127 }
128 std::string defaultChannelConfig = formatExternalChannelConfiguration(externalChannelSpec);
129 // at some point the formatting tool might add the transport as well so we have to check
130 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
131 defaultChannelConfig += ",transport=" + defaultTransportConfig;
132 }
133
134 std::vector<DataProcessorSpec> workflow;
135 workflow.emplace_back(std::move(specifyFairMQDeviceOutputProxy(processorName.c_str(), inputs, defaultChannelConfig.c_str())));
136 return workflow;
137}
ConfigParamRegistry & options() const
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::vector< DataProcessorSpec > WorkflowSpec
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
std::vector< InputSpec > select(char const *matcher="")
static CompletionPolicy consumeWhenAllOrdered(const char *name, CompletionPolicy::Matcher matcher)
as consumeWhenAll, but ensures that records are processed with incremental timeSlice (DataHeader::sta...
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.