26void customize(std::vector<ConfigParamSpec>& workflowOptions)
28 std::string spaceParallelHelp(
"Number of tpc processing lanes. A lane is a pipeline of algorithms.");
29 workflowOptions.push_back(
30 ConfigParamSpec{
"2-layer-jobs", VariantType::Int, 1, {spaceParallelHelp}});
32 std::string timeHelp(
"Time pipelining happening in the second layer");
33 workflowOptions.push_back(
34 ConfigParamSpec{
"3-layer-pipelining", VariantType::Int, 1, {timeHelp}});
37void customize(std::vector<CompletionPolicy>& policies)
56 InputSpec{
"x",
"TST",
"A", 0, Lifetime::Timeframe},
59 OutputSpec{
"TST",
"P", 0, Lifetime::Timeframe},
69 auto&
i = ctx.outputs().make<
int>(
72 std::this_thread::sleep_for(std::chrono::seconds(rand() % 5));
81 size_t jobs = config.
options().
get<
int>(
"2-layer-jobs");
94 std::vector<OutputSpec> outputSpecs;
95 for (
size_t ssi = 0; ssi < jobs; ++ssi) {
96 outputSpecs.emplace_back(
"TST",
"A", ssi);
101 .outputs = outputSpecs,
104 static int count = 0;
105 for (
size_t ji = 0; ji < jobs; ++ji) {
120 if (input.header ==
nullptr) {
121 LOGP(error,
"Missing header");
124 int record = *(
int*)input.payload;
125 LOGP(info,
"Record {}", record);
127 ctx.outputs().make<
int>(
OutputRef(
"out", 0), 1);
ConfigParamRegistry & options() const
T get(const char *key) const
Defining PrimaryVertex explicitly as messageable.
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
std::vector< DataProcessorSpec > WorkflowSpec
DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
DataProcessorSpec templateProcessor()
static CompletionPolicy consumeWhenPastOldestPossibleTimeframe(const char *name, CompletionPolicy::Matcher matcher)
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)