20#include <fairmq/Device.h>
37void customize(std::vector<CallbacksPolicy>& policies)
42 service.
set<CallbackService::Id::Start>([]() {
LOG(info) <<
"invoked at start"; });
52 LOG(info) <<
"A custom policy for sending invoked!";
54 channel->Send(parts, 0);
64 LOG(info) <<
"There are " << runningWorkflow.
devices.size() <<
" devices in the workflow";
66 device.
device()->WaitFor(std::chrono::milliseconds(minDelay));
83 device.
device()->WaitFor(std::chrono::seconds(
r));
90 {
ConfigParamSpec{
"some-device-param", VariantType::Int, 1, {
"Some device parameter"}}}},
92 {
InputSpec{
"x",
"TST",
"A1", Lifetime::Timeframe, {
ConfigParamSpec{
"somestring", VariantType::String,
"", {
"Some input param"}}}}},
107 auto ref = inputs.
get(
"b");
108 auto header = o2::header::get<const DataProcessingHeader*>(
ref.header);
109 LOG(
debug) <<
"Start time: " << header->startTime;
decltype(auto) make(const Output &spec, Args... args)
fair::mq::Channel * getOutputChannel(ChannelIndex channelIndex) const
virtual fair::mq::Device * device()=0
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< CallbacksPolicy > &policies)
AlgorithmSpec simplePipe(std::string const &what, int minDelay)
Configurable< int > anInt
Configurable< std::string > aString
Configurable< bool > aBool
Configurable< double > aDouble
Configurable< float > aFloat
static DeviceMatcher matchByName(const char *name)
static EdgeMatcher matchSourceByName(const char *name)
Information about the running workflow.
std::vector< DeviceSpec > devices
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"