19#include <fairmq/Device.h>
23void customize(std::vector<ConfigParamSpec>& workflowOptions)
25 workflowOptions.emplace_back(
26 ConfigParamSpec{
"dataspec", VariantType::String,
"", {
"DataSpec for the outputs"}});
27 workflowOptions.emplace_back(
28 ConfigParamSpec{
"name", VariantType::String,
"test-sink", {
"Name of the source"}});
37 LOG(info) <<
"Callback invoked";
39 device.
device()->WaitFor(std::chrono::seconds(minDelay));
48 auto dataspec = ctx.
options().
get<std::string>(
"dataspec");
49 std::vector<InputSpec> inputs =
select(dataspec.c_str());
53 .name = ctx.
options().
get<std::string>(
"name"),
57 LOG(info) <<
"Received " << inputs.
size() <<
" messages";
ConfigParamRegistry & options() const
T get(const char *key) const
decltype(auto) make(const Output &spec, Args... args)
virtual fair::mq::Device * device()=0
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > select(char const *matcher="")
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &ctx)
This function hooks up the workflow specifications into the DPL driver.
AlgorithmSpec simplePipe(std::string const &what, int minDelay)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"