21void customize(std::vector<ConfigParamSpec>& workflowOptions)
23 workflowOptions.emplace_back(
24 ConfigParamSpec{
"in-dataspec", VariantType::String,
"", {
"DataSpec for the outputs"}});
25 workflowOptions.emplace_back(
26 ConfigParamSpec{
"out-dataspec", VariantType::String,
"", {
"DataSpec for the outputs"}});
27 workflowOptions.emplace_back(
28 ConfigParamSpec{
"eos-dataspec", VariantType::String,
"", {
"DataSpec for the outputs during EoS"}});
29 workflowOptions.emplace_back(
30 ConfigParamSpec{
"processing-delay", VariantType::Int, 0, {
"How long the processing takes"}});
31 workflowOptions.emplace_back(
32 ConfigParamSpec{
"eos-delay", VariantType::Int, 0, {
"How long the takes to do eos"}});
33 workflowOptions.emplace_back(
34 ConfigParamSpec{
"name", VariantType::String,
"test-processor", {
"Name of the processor"}});
42 auto inDataspec = ctx.
options().
get<std::string>(
"in-dataspec");
43 auto outDataspec = ctx.
options().
get<std::string>(
"out-dataspec");
45 auto eosDataspec = ctx.
options().
get<std::string>(
"eos-dataspec");
47 auto processingDelay = ctx.
options().
get<
int>(
"processing-delay");
48 auto eosDelay = ctx.
options().
get<
int>(
"eos-delay");
50 std::vector<InputSpec> inputs =
select(inDataspec.c_str());
52 for (
auto& input : inputs) {
56 std::vector<InputSpec> matchers =
select(outDataspec.c_str());
57 std::vector<std::string> outputRefs;
58 std::vector<OutputSpec> outputs;
60 for (
auto const& matcher : matchers) {
61 outputRefs.emplace_back(matcher.binding);
65 std::vector<InputSpec> eosMatchers =
select(eosDataspec.c_str());
66 std::vector<std::string> eosRefs;
67 std::vector<OutputSpec> eosOutputs;
69 for (
auto const& matcher : eosMatchers) {
70 eosRefs.emplace_back(matcher.binding);
72 eosOut.lifetime = Lifetime::Sporadic;
73 outputs.emplace_back(eosOut);
78 LOG(info) <<
"Creating objects on end of stream reception.";
79 std::this_thread::sleep_for(std::chrono::seconds(eosDelay));
84 LOG(info) <<
"Received " << inputs.
size() <<
" messages. Converting.";
86 std::this_thread::sleep_for(std::chrono::milliseconds(processingDelay));
87 for (
auto&
ref : outputRefs) {
88 LOGP(info,
"Creating {}.",
ref);
95 {.name = ctx.
options().
get<std::string>(
"name"),
ConfigParamRegistry & options() const