o2ParallelWorkflow.cxx — ALICE O2 Data Processing Layer example: a subSpec-parallelized processor layer, a time-pipelined merger, and a sink.
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

17#include "Framework/Logger.h"
18
19#include <chrono>
20#include <thread>
21#include <vector>
22
23using namespace o2::framework;
24
25// we need to add workflow options before including Framework/runDataProcessing
26void customize(std::vector<ConfigParamSpec>& workflowOptions)
27{
28 std::string spaceParallelHelp("Number of tpc processing lanes. A lane is a pipeline of algorithms.");
29 workflowOptions.push_back(
30 ConfigParamSpec{"2-layer-jobs", VariantType::Int, 1, {spaceParallelHelp}});
31
32 std::string timeHelp("Time pipelining happening in the second layer");
33 workflowOptions.push_back(
34 ConfigParamSpec{"3-layer-pipelining", VariantType::Int, 1, {timeHelp}});
35}
36
37void customize(std::vector<CompletionPolicy>& policies)
38{
39 policies = {
40 CompletionPolicyHelpers::consumeWhenPastOldestPossibleTimeframe("merger-policy", [](auto const&) -> bool { return true; })};
41}
42
47
48#include <vector>
49
51
53{
54 return DataProcessorSpec{.name = "some-processor",
55 .inputs = {
56 InputSpec{"x", "TST", "A", 0, Lifetime::Timeframe},
57 },
58 .outputs = {
59 OutputSpec{"TST", "P", 0, Lifetime::Timeframe},
60 },
61 // The producer is stateful, we use a static for the state in this
62 // particular case, but a Singleton or a captured new object would
63 // work as well.
64 .algorithm = AlgorithmSpec{[](InitContext& setup) {
65 srand(setup.services().get<ParallelContext>().index1D());
66 return [](ProcessingContext& ctx) {
67 // Create a single output.
68 size_t index = ctx.services().get<ParallelContext>().index1D();
69 auto& i = ctx.outputs().make<int>(
70 Output{"TST", "P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)}, 1);
71 i[0] = index;
72 std::this_thread::sleep_for(std::chrono::seconds(rand() % 5));
73 };
74 }}};
75}
76
77// This is a simple consumer / producer workflow where both are
78// stateful, i.e. they have context which comes from their initialization.
80{
81 size_t jobs = config.options().get<int>("2-layer-jobs");
82 size_t stages = config.options().get<int>("3-layer-pipelining");
83
84 // This is an example of how we can parallelize by subSpec.
85 // templatedProducer will be instanciated 32 times and the lambda function
86 // passed to the parallel statement will be applied to each one of the
87 // instances in order to modify it. Parallel will also make sure the name of
88 // the instance is amended from "some-producer" to "some-producer-<index>".
89 WorkflowSpec workflow = parallel(templateProcessor(), jobs, [](DataProcessorSpec& spec, size_t index) {
92 });
93
94 std::vector<OutputSpec> outputSpecs;
95 for (size_t ssi = 0; ssi < jobs; ++ssi) {
96 outputSpecs.emplace_back("TST", "A", ssi);
97 }
98
99 workflow.push_back(DataProcessorSpec{
100 .name = "reader",
101 .outputs = outputSpecs,
102 .algorithm = AlgorithmSpec{[jobs](InitContext& initCtx) {
103 return [jobs](ProcessingContext& ctx) {
104 static int count = 0;
105 for (size_t ji = 0; ji < jobs; ++ji) {
106 int& i = ctx.outputs().make<int>(Output{"TST", "A", static_cast<o2::header::DataHeader::SubSpecificationType>(ji)});
107 i = count * 100 + ji;
108 }
109 count++;
110 };
111 }}});
112 workflow.push_back(timePipeline(DataProcessorSpec{
113 .name = "merger",
114 .inputs = {InputSpec{"all", ConcreteDataTypeMatcher{"TST", "P"}}},
115 .outputs = {OutputSpec{{"out"}, "TST", "M"}},
116 .algorithm = AlgorithmSpec{[](InitContext& setup) {
117 return [](ProcessingContext& ctx) {
118 LOGP(info, "Run");
119 for (const auto& input : o2::framework::InputRecordWalker(ctx.inputs())) {
120 if (input.header == nullptr) {
121 LOGP(error, "Missing header");
122 continue;
123 }
124 int record = *(int*)input.payload;
125 LOGP(info, "Record {}", record);
126 }
127 ctx.outputs().make<int>(OutputRef("out", 0), 1);
128 };
129 }}},
130 stages));
131
132 workflow.push_back(DataProcessorSpec{
133 .name = "writer",
134 .inputs = {InputSpec{"x", "TST", "M"}},
135 .algorithm = AlgorithmSpec{[](InitContext& setup) {
136 return [](ProcessingContext& ctx) {
137 };
138 }}});
139 return workflow;
140}
Cross-reference index (from the generated documentation):
- `int32_t i`
- `InputRecordWalker` — a helper class to iterate over all parts of all input routes.
- `ConfigParamRegistry& options() const`
- `GLbitfield stages` — glcorearb.h:1905
- `GLint GLsizei count` — glcorearb.h:399
- `GLuint index` — glcorearb.h:781
- Defining PrimaryVertex explicitly as messageable.
- `WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function<void(DataProcessorSpec&, size_t id)> amendCallback)`
- `std::vector<DataProcessorSpec> WorkflowSpec`
- `DataProcessorSpec timePipeline(DataProcessorSpec original, size_t count)`
- `WorkflowSpec defineDataProcessing(ConfigContext const& config)` — this function hooks up the workflow specifications into the DPL driver.
- `void customize(std::vector<ConfigParamSpec>& workflowOptions)`
- `DataProcessorSpec templateProcessor()`
- `static CompletionPolicy consumeWhenPastOldestPossibleTimeframe(const char* name, CompletionPolicy::Matcher matcher)`
- `static void updateMatchingSubspec(InputSpec& in, header::DataHeader::SubSpecificationType subSpec)`
- the main header struct — DataHeader.h:619
- `uint32_t SubSpecificationType` — DataHeader.h:621