16#include <fairmq/Device.h>
23void customize(std::vector<o2::framework::CompletionPolicy>& policies)
36 .outputs = {
OutputSpec{{
"a"},
"CLP",
"D0", 0, Lifetime::Timeframe},
37 OutputSpec{{
"b"},
"CLW",
"D0", 0, Lifetime::Timeframe}},
46 device.
device()->WaitFor(std::chrono::milliseconds(100));
48 LOGP(info,
"Done sending 100 messages on the fast path");
54 .outputs = {
OutputSpec{{
"a"},
"CLP",
"D1", 1, Lifetime::Timeframe},
55 OutputSpec{{
"b"},
"CLW",
"D1", 1, Lifetime::Timeframe}},
64 device.
device()->WaitFor(std::chrono::milliseconds(1000));
66 LOGP(info,
"Done sending 10 messages on the slow path");
72 .inputs = {{
"x",
"CLP", Lifetime::Sporadic},
73 {
"y",
"CLW", Lifetime::Sporadic}},
76 static int a1Count = 0;
77 static int a2Count = 0;
79 auto&
x = inputs.
get<
int>(
"x");
81 LOGP(info,
"Received from A1 {}", a1Count++);
83 LOGP(info,
"Received from A2 {}", a2Count++);
85 LOGP(fatal,
"Unexpected value {}",
x);
87 LOGP(info,
"Count is {} {}", a1Count, a2Count);
88 if (a1Count == 101 && a2Count == 11) {
89 LOGP(info,
"Done receiving all messages");
void readyToQuit(bool all)
Compatibility with old API.
void endOfStream()
Signal that we are done with the current stream.
decltype(auto) make(const Output &spec, Args... args)
virtual fair::mq::Device * device()=0
typedef void(APIENTRYP PFNGLCULLFACEPROC)(GLenum mode)
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
WorkflowSpec defineDataProcessing(ConfigContext const &specs)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< o2::framework::CompletionPolicy > &policies)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"