24void customize(std::vector<ConfigParamSpec>& workflowOptions)
26 workflowOptions.push_back(
28 "proxy-name", VariantType::String,
"dpl-output-proxy", {
"name of the proxy processor, will be the default output channel name as well"}});
30 workflowOptions.push_back(
32 "proxy-channel-name", VariantType::String,
"downstream", {
"output channel name of the proxy"}});
34 workflowOptions.push_back(
36 "dataspec", VariantType::String,
"dpl-output-proxy:TST/CLUSTERS;dpl-output-proxy:TST/TRACKS", {
"selection string for the data to be proxied"}});
38 workflowOptions.push_back(
40 "output-proxy-method", VariantType::String,
"bind", {
"proxy socket method: bind, connect"}});
42 workflowOptions.push_back(
44 "sporadic-inputs", VariantType::Bool,
false, {
"consider all the inputs as sporadic"}});
46 workflowOptions.push_back(
48 "output-proxy-address", VariantType::String,
"0.0.0.0", {
"address to connect / bind to"}});
50 workflowOptions.push_back(
52 "default-transport", VariantType::String,
"shmem", {
"default transport: shmem, zeromq"}});
54 workflowOptions.push_back(
56 "default-port", VariantType::Int, 4200, {
"default port number"}});
78 auto processorName = config.
options().
get<std::string>(
"proxy-name");
79 auto inputConfig = config.
options().
get<std::string>(
"dataspec");
80 int defaultPort = config.
options().
get<
int>(
"default-port");
81 bool sporadicInputs = config.
options().
get<
bool>(
"sporadic-inputs");
82 auto defaultTransportConfig = config.
options().
get<std::string>(
"default-transport");
83 if (defaultTransportConfig ==
"zeromq") {
85 }
else if (defaultTransportConfig ==
"shmem") {
88 throw std::runtime_error(
"invalid argument for option --default-transport : '" + defaultTransportConfig +
"'");
91 std::vector<InputSpec> inputs =
select(inputConfig.c_str());
92 if (inputs.size() == 0) {
93 throw std::runtime_error(
"invalid dataspec '" + inputConfig +
"'");
97 for (
auto& input : inputs) {
98 input.lifetime = Lifetime::Sporadic;
107 externalChannelSpec.
name = config.
options().
get<std::string>(
"proxy-channel-name");
108 externalChannelSpec.
type = ChannelType::Push;
109 if (config.
options().
get<std::string>(
"output-proxy-method") ==
"bind") {
110 externalChannelSpec.
method = ChannelMethod::Bind;
111 }
else if (config.
options().
get<std::string>(
"output-proxy-method") ==
"connect") {
112 externalChannelSpec.
method = ChannelMethod::Connect;
114 externalChannelSpec.
hostname = config.
options().
get<std::string>(
"output-proxy-address");
115 externalChannelSpec.
port = defaultPort;
121 if (!defaultTransportConfig.empty()) {
122 if (defaultTransportConfig ==
"zeromq") {
123 externalChannelSpec.
protocol = ChannelProtocol::Network;
124 }
else if (defaultTransportConfig ==
"shmem") {
125 externalChannelSpec.
protocol = ChannelProtocol::IPC;
130 if (!defaultTransportConfig.empty() && defaultTransportConfig.find(
"transport=") == std::string::npos) {
131 defaultChannelConfig +=
",transport=" + defaultTransportConfig;
134 std::vector<DataProcessorSpec> workflow;
ConfigParamRegistry & options() const