25void customize(std::vector<CompletionPolicy>& policies)
30 return device.
name ==
"heWhoRequestsExit";
35void customize(std::vector<ConfigParamSpec>& workflowOptions)
37 workflowOptions.push_back(
ConfigParamSpec{
"producers", VariantType::Int, 1, {
"number of producers"}});
39 "test-duration", VariantType::Int, 300, {
"how long should the test run (in seconds, max. 2147)"}});
45#include <Common/Timer.h>
46#include <Monitoring/Monitoring.h>
48using namespace AliceO2::Common;
59 size_t producers = config.
options().
get<
int>(
"producers");
60 size_t testDuration = config.
options().
get<
int>(
"test-duration");
63 for (
size_t p = 0; p < producers; p++) {
80 Inputs{{
"test-data", {
"TST",
"NODATA" }},
81 {
"sink-timer",
"TST",
"TIMER", 0, Lifetime::Timer }},
82 Outputs{{{
"output" },
"TST",
"ALSONODATA" }},
85 auto timer = std::make_shared<Timer>();
86 timer->reset(10 * 1000000);
87 uint64_t loopCounter = 0;
91 if (timer->isTimeout()) {
93 auto& monitoring = ctx.services().get<
Monitoring>();
94 monitoring.send({ loopCounter,
"loop_counter" });
99 Options{{
"period-sink-timer", VariantType::Int, 0, {
"timer period" }}}
101 specs.push_back(sink);
105 Inputs{{
"input",
"TST",
"ALSONODATA" },
106 {
"test-timer",
"TST",
"TIMER2", 0, Lifetime::Timer }},
111 LOG(info) <<
"Planned exit";
116 Options{{
"period-test-timer", VariantType::Int,
static_cast<int>(testDuration * 1000000), {
"timer period" }}}
119 specs.push_back(heWhoRequestsExit);
o2::monitoring::Monitoring Monitoring
ConfigParamRegistry & options() const
T get(const char *key) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
o2::header::DataHeader::SubSpecificationType SubSpec
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
std::string name
The name of the associated DataProcessorSpec.
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"