Project
Loading...
Searching...
No Matches
test_ParallelProducer.cxx
Go to the documentation of this file.
1
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3
// All rights not expressly granted are reserved.
4
//
5
// This software is distributed under the terms of the GNU General Public
6
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7
//
8
// In applying this license CERN does not waive the privileges and immunities
9
// granted to it by virtue of its status as an Intergovernmental Organization
10
// or submit itself to any jurisdiction.
11
#include "
Framework/ConfigContext.h
"
12
#include "
Framework/ControlService.h
"
13
#include "
Framework/DataProcessorSpec.h
"
14
#include "
Framework/DataSpecUtils.h
"
15
#include "
Framework/Logger.h
"
16
#include "
Framework/ParallelContext.h
"
17
18
#include <chrono>
19
#include <thread>
20
#include <vector>
21
22
using namespace
o2::framework
;
23
24
void
customize
(std::vector<ConfigParamSpec>& options)
25
{
26
options.push_back(
o2::framework::ConfigParamSpec
{
"jobs"
, VariantType::Int, 4, {
"number of producer jobs"
}});
27
};
28
29
#include "
Framework/runDataProcessing.h
"
30
31
// File-local shorthand for the o2::header::DataHeader type.
using
DataHeader
=
o2::header::DataHeader
;
32
33
// Builds the template DataProcessorSpec for a producer.
//
// The returned spec is named "some-producer", declares no inputs and exactly
// one output, TST/A with sub-specification 0 and Timeframe lifetime.
//
// Its processing callback, on every invocation:
//   1. reads the 1D index of this instance from the ParallelContext service,
//   2. sleeps for one second,
//   3. creates an `int` output on TST/A whose sub-specification is that index,
//   4. calls ControlService::readyToQuit(QuitRequest::All).
DataProcessorSpec templateProducer()
{
  // Work performed each time the device is asked to process.
  auto produce = [](ProcessingContext& pc) {
    // Index of this instance as reported by the parallel context.
    const size_t parallelIndex = pc.services().get<ParallelContext>().index1D();
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // The output is addressed by using the parallel index as sub-specification.
    const auto subSpec =
      static_cast<o2::header::DataHeader::SubSpecificationType>(parallelIndex);
    // NOTE(review): the trailing 1 is presumably the number of elements to
    // allocate — confirm against DataAllocator::make. The result is not used
    // any further here.
    auto& payload = pc.outputs().make<int>(Output{"TST", "A", subSpec}, 1);

    pc.services().get<ControlService>().readyToQuit(QuitRequest::All);
  };

  // Initialisation does nothing besides handing out the processing callback.
  auto init = [produce](InitContext&) { return produce; };

  return DataProcessorSpec{
    "some-producer",
    Inputs{},
    {
      OutputSpec{"TST", "A", 0, Lifetime::Timeframe},
    },
    AlgorithmSpec{init}};
}
52
53
// This is a simple consumer / producer workflow where both are
54
// stateful, i.e. they have context which comes from their initialization.
55
WorkflowSpec
defineDataProcessing
(
ConfigContext
const
& context)
56
{
57
// This is an example of how we can parallelize by subSpec.
58
// templatedProducer will be instanciated 32 times and the lambda function
59
// passed to the parallel statement will be applied to each one of the
60
// instances in order to modify it. Parallel will also make sure the name of
61
// the instance is amended from "some-producer" to "some-producer-<index>".
62
auto
jobs = context.
options
().
get
<
int
>(
"jobs"
);
63
WorkflowSpec
workflow =
parallel
(
templateProducer
(), jobs, [](
DataProcessorSpec
& spec,
size_t
index
) {
64
DataSpecUtils::updateMatchingSubspec
(spec.
outputs
[0],
index
);
65
});
66
workflow.push_back(
DataProcessorSpec
{
67
"merger"
,
68
mergeInputs
(
InputSpec
{
"x"
,
"TST"
,
"A"
, 0, Lifetime::Timeframe},
69
jobs,
70
[](
InputSpec
& input,
size_t
index
) {
71
DataSpecUtils::updateMatchingSubspec
(input,
index
);
72
}),
73
{},
74
AlgorithmSpec
{[](
InitContext
& setup) {
75
return
[](
ProcessingContext
& ctx) {
76
// Create a single output.
77
LOG
(
debug
) <<
"Invoked"
<< std::endl;
78
};
79
}}});
80
81
return
workflow;
82
}
ConfigContext.h
ControlService.h
DataProcessorSpec.h
DataSpecUtils.h
Logger.h
ParallelContext.h
debug
std::ostringstream debug
Definition
VariantJSONHelpers.cxx:305
o2::framework::ConfigContext
Definition
ConfigContext.h:24
o2::framework::ConfigContext::options
ConfigParamRegistry & options() const
Definition
ConfigContext.h:28
o2::framework::ConfigParamRegistry::get
T get(const char *key) const
Definition
ConfigParamRegistry.h:98
o2::framework::ControlService
Definition
ControlService.h:40
o2::framework::InitContext
Definition
InitContext.h:25
o2::framework::ParallelContext
Definition
ParallelContext.h:33
o2::framework::ProcessingContext
Definition
ProcessingContext.h:27
index
GLuint index
Definition
glcorearb.h:781
o2::framework
Defining PrimaryVertex explicitly as messageable.
Definition
TFIDInfo.h:20
o2::framework::parallel
WorkflowSpec parallel(DataProcessorSpec original, size_t maxIndex, std::function< void(DataProcessorSpec &, size_t id)> amendCallback)
o2::framework::mergeInputs
Inputs mergeInputs(InputSpec original, size_t maxIndex, std::function< void(InputSpec &, size_t)> amendCallback)
Definition
WorkflowSpec.cxx:112
o2::framework::WorkflowSpec
std::vector< DataProcessorSpec > WorkflowSpec
Definition
HBFUtilsInitializer.h:39
o2::framework::Inputs
std::vector< InputSpec > Inputs
Definition
DataProcessorSpec.h:29
runDataProcessing.h
o2::framework::AlgorithmSpec
Definition
AlgorithmSpec.h:43
o2::framework::ConfigParamSpec
Definition
ConfigParamSpec.h:31
o2::framework::DataProcessorSpec
Definition
DataProcessorSpec.h:41
o2::framework::DataProcessorSpec::outputs
Outputs outputs
Definition
DataProcessorSpec.h:44
o2::framework::DataSpecUtils::updateMatchingSubspec
static void updateMatchingSubspec(InputSpec &in, header::DataHeader::SubSpecificationType subSpec)
Definition
DataSpecUtils.cxx:150
o2::framework::InputSpec
Definition
InputSpec.h:31
o2::framework::OutputSpec
Definition
OutputSpec.h:33
o2::framework::Output
Definition
Output.h:27
o2::header::DataHeader
the main header struct
Definition
DataHeader.h:619
o2::header::DataHeader::SubSpecificationType
uint32_t SubSpecificationType
Definition
DataHeader.h:621
templateProducer
DataProcessorSpec templateProducer()
Definition
test_ParallelProducer.cxx:33
defineDataProcessing
WorkflowSpec defineDataProcessing(ConfigContext const &context)
This function hooks up the workflow specifications into the DPL driver.
Definition
test_ParallelProducer.cxx:55
customize
void customize(std::vector< ConfigParamSpec > &options)
Definition
test_ParallelProducer.cxx:24
LOG
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
Framework
Core
test
test_ParallelProducer.cxx
Generated on Fri Oct 24 2025 01:01:50 for Project by
1.9.8