o2ParallelWorkflow.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/ConfigParamSpec.h"

#include <chrono>
#include <thread>
#include <vector>

using namespace o2::framework;

// we need to add workflow options before including Framework/runDataProcessing
void customize(std::vector<ConfigParamSpec>& workflowOptions)
{
  std::string spaceParallelHelp("Number of TPC processing lanes. A lane is a pipeline of algorithms.");
  workflowOptions.push_back(
    ConfigParamSpec{"2-layer-jobs", VariantType::Int, 1, {spaceParallelHelp}});

  std::string timeHelp("Time pipelining happening in the second layer");
  workflowOptions.push_back(
    ConfigParamSpec{"3-layer-pipelining", VariantType::Int, 1, {timeHelp}});
}
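
// Assuming the usual DPL handling of workflow options, the two options above
// should also be available as command line switches of the same name, so a run
// could look roughly like this (the executable name is purely illustrative):
//
//   o2-parallel-workflow-example --2-layer-jobs 4 --3-layer-pipelining 2
//
// i.e. four parallel processing lanes and two time-pipelined merger stages.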

#include "Framework/runDataProcessing.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ParallelContext.h"
#include "Framework/Logger.h"

#include <vector>

// Template for the processors which will be replicated in the space (subSpec) layer.
DataProcessorSpec templateProcessor()
{
  return DataProcessorSpec{"some-processor",
                           {
                             InputSpec{"x", "TST", "A", 0, Lifetime::Timeframe},
                           },
                           {
                             OutputSpec{"TST", "P", 0, Lifetime::Timeframe},
                           },
                           // The producer is stateful; here the state is simply the rand() seed,
                           // but a Singleton or a captured new object would work as well.
                           AlgorithmSpec{[](InitContext& setup) {
                             srand(setup.services().get<ParallelContext>().index1D());
                             return [](ProcessingContext& ctx) {
                               // Create a single output; its content is irrelevant for this example.
                               size_t index = ctx.services().get<ParallelContext>().index1D();
                               ctx.outputs().make<int>(
                                 Output{"TST", "P", static_cast<o2::header::DataHeader::SubSpecificationType>(index)}, 1);
                               // Simulate a variable processing time.
                               std::this_thread::sleep_for(std::chrono::seconds(rand() % 5));
                             };
                           }}};
}
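
// A minimal sketch of the "captured new object" alternative mentioned in the
// comment above: the per-instance state lives in a heap object owned by the
// lambdas rather than in static/global state. ProcessorState and its member
// are illustrative names, not part of this workflow.
//
// AlgorithmSpec{[](InitContext& setup) {
//   struct ProcessorState {
//     size_t index = 0;
//   };
//   auto state = std::make_shared<ProcessorState>();
//   state->index = setup.services().get<ParallelContext>().index1D();
//   return [state](ProcessingContext& ctx) {
//     // use state->index instead of querying ParallelContext again
//   };
// }}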

// This is a simple consumer / producer workflow where both are
// stateful, i.e. they have context which comes from their initialization.
WorkflowSpec defineDataProcessing(ConfigContext const& config)
{
  size_t jobs = config.options().get<int>("2-layer-jobs");
  size_t stages = config.options().get<int>("3-layer-pipelining");

  // This is an example of how we can parallelize by subSpec.
  // templateProcessor() will be instantiated `jobs` times, and the lambda
  // passed to parallel() will be applied to each instance in order to modify it.
  // parallel() also amends the instance names from "some-processor" to
  // "some-processor-<index>".
  WorkflowSpec workflow = parallel(templateProcessor(), jobs, [](DataProcessorSpec& spec, size_t index) {
    // Give every instance its own subSpec so that inputs and outputs do not clash.
    DataSpecUtils::updateMatchingSubspec(spec.inputs[0], index);
    DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
  });

  std::vector<OutputSpec> outputSpecs;
  for (size_t ssi = 0; ssi < jobs; ++ssi) {
    outputSpecs.emplace_back("TST", "A", ssi);
  }

  workflow.push_back(DataProcessorSpec{"reader", {}, outputSpecs, AlgorithmSpec{[jobs](InitContext& initCtx) {
    return [jobs](ProcessingContext& ctx) {
      for (size_t ji = 0; ji < jobs; ++ji) {
        ctx.outputs().make<int>(Output{"TST", "A", static_cast<o2::header::DataHeader::SubSpecificationType>(ji)}, 1);
      }
    };
  }}});
  workflow.push_back(timePipeline(
    DataProcessorSpec{
      "merger",
      mergeInputs(InputSpec{"x", "TST", "P"},
                  jobs,
                  [](InputSpec& input, size_t index) {
                    // Match the subSpec assigned to the corresponding parallel processor.
                    DataSpecUtils::updateMatchingSubspec(input, index);
                  }),
      {OutputSpec{{"out"}, "TST", "M"}},
      AlgorithmSpec{[](InitContext& setup) {
        return [](ProcessingContext& ctx) {
          ctx.outputs().make<int>(OutputRef("out", 0), 1);
        };
      }}},
    stages));
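  // Note: timePipeline(spec, stages) requests, as far as I understand the DPL
  // helpers, `stages` time-pipelined copies of the merger, each handling
  // different time slices; this is the "time" layer of parallelism that
  // complements the subSpec-based "space" split above.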

  workflow.push_back(DataProcessorSpec{
    "writer",
    {InputSpec{"x", "TST", "M"}},
    {},
    AlgorithmSpec{[](InitContext& setup) {
      return [](ProcessingContext& ctx) {
        // Sink: the merged message is consumed and dropped.
      };
    }}});
  return workflow;
}
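
// Resulting topology, reading off the specs above: "reader" fans out TST/A/0..jobs-1
// to the parallel "some-processor-<i>" instances, whose TST/P/<i> outputs are merged
// by the time-pipelined "merger" into a single TST/M message consumed by "writer".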