test_ParallelProducer.cxx
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/ConfigParamSpec.h"
#include "Framework/ControlService.h"
#include "Framework/DataSpecUtils.h"
#include "Framework/ParallelContext.h"
#include "Framework/Logger.h"

#include <chrono>
#include <thread>
#include <vector>

using namespace o2::framework;

void customize(std::vector<ConfigParamSpec>& options)
{
  options.push_back(o2::framework::ConfigParamSpec{"jobs", VariantType::Int, 4, {"number of producer jobs"}});
}
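// The ConfigParamSpec registered above is exposed by DPL as a --jobs command-line
// option of the workflow executable. A hypothetical invocation (the executable
// name below is illustrative, not taken from the original file) would look like:
//
//   ./test_ParallelProducer --jobs 8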

#include "Framework/runDataProcessing.h"

DataProcessorSpec templateProducer()
{
  return DataProcessorSpec{"some-producer", Inputs{}, {
                             OutputSpec{"TST", "A", 0, Lifetime::Timeframe},
                           },
                           // The producer is stateful; we use a static for the state in this
                           // particular case, but a Singleton or a captured new object would
                           // work as well (see the sketch after this function).
                           AlgorithmSpec{[](InitContext& setup) {
                             return [](ProcessingContext& ctx) {
                               // Create a single output.
                               size_t index = ctx.services().get<ParallelContext>().index1D();
                               std::this_thread::sleep_for(std::chrono::seconds(1));
                               auto& aData = ctx.outputs().make<int>(
                                 Output{"TST", "A", static_cast<o2::header::DataHeader::SubSpecificationType>(index)}, 1);
                               ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
                             };
                           }}};
}
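// A minimal sketch of the "captured object" variant mentioned in the comment inside
// templateProducer: the state is created once in the init lambda and captured by value
// in the processing lambda, so it persists across invocations of that device instance.
// The function name, the "TST/B" output and the counter are illustrative assumptions,
// not part of the original test.
DataProcessorSpec capturedStateProducerSketch()
{
  return DataProcessorSpec{"captured-state-producer", Inputs{}, {
                             OutputSpec{"TST", "B", 0, Lifetime::Timeframe},
                           },
                           AlgorithmSpec{[](InitContext& setup) {
                             // State is created once per device instance at init time...
                             size_t counter = 0;
                             // ...and captured by value; "mutable" lets the processing
                             // callback update it on every invocation.
                             return [counter](ProcessingContext& ctx) mutable {
                               ++counter;
                               LOG(info) << "invocation " << counter;
                               ctx.outputs().make<int>(Output{"TST", "B", 0}, 1);
                             };
                           }}};
}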

// This is a simple consumer / producer workflow where both are
// stateful, i.e. they have context which comes from their initialization.
WorkflowSpec defineDataProcessing(ConfigContext const& context)
{
  // This is an example of how we can parallelize by subSpec.
  // templateProducer will be instantiated "jobs" times (4 by default, settable
  // with --jobs) and the lambda passed to parallel() will be applied to each
  // instance in order to modify it. parallel() will also make sure the name of
  // each instance is amended from "some-producer" to "some-producer-<index>".
  auto jobs = context.options().get<int>("jobs");
  WorkflowSpec workflow = parallel(templateProducer(), jobs, [](DataProcessorSpec& spec, size_t index) {
    DataSpecUtils::updateMatchingSubspec(spec.outputs[0], index);
  });
  workflow.push_back(DataProcessorSpec{
    "merger",
    mergeInputs(InputSpec{"x", "TST", "A", 0, Lifetime::Timeframe},
                jobs,
                [](InputSpec& input, size_t index) {
                  DataSpecUtils::updateMatchingSubspec(input, index);
                }),
    {},
    AlgorithmSpec{[](InitContext& setup) {
      return [](ProcessingContext& ctx) {
        // Simply report that the merger was invoked.
        LOG(debug) << "Invoked" << std::endl;
      };
    }}});

  return workflow;
}
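// Sketch of the resulting topology for --jobs 2 (device names and subSpecs follow
// the amendment rules described above; the drawing itself is illustrative):
//
//   some-producer-0 -- TST/A/0 --\
//                                 >-- merger
//   some-producer-1 -- TST/A/1 --/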