Project
Loading...
Searching...
No Matches
emptyLoopBenchmark.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
16
22
23using namespace o2::framework;
24
25void customize(std::vector<CompletionPolicy>& policies)
26{
27 // consume always, even when there is no data.
28 policies.push_back(CompletionPolicyHelpers::defineByName("sink", CompletionPolicy::CompletionOp::Consume));
29 policies.push_back(CompletionPolicyHelpers::consumeWhenAny("exitConsumeAny", [](DeviceSpec const& device) {
30 return device.name == "heWhoRequestsExit";
31 }));
32}
33
34// we need to add workflow options before including Framework/runDataProcessing
35void customize(std::vector<ConfigParamSpec>& workflowOptions)
36{
37 workflowOptions.push_back(ConfigParamSpec{"producers", VariantType::Int, 1, {"number of producers"}});
38 workflowOptions.push_back(ConfigParamSpec{
39 "test-duration", VariantType::Int, 300, {"how long should the test run (in seconds, max. 2147)"}});
40}
41
44
45#include <Common/Timer.h>
46#include <Monitoring/Monitoring.h>
47
48using namespace AliceO2::Common;
49using namespace o2::monitoring;
51
52// We spawn fake data producers which do absolutely nothing.
53// In the receiver we measure how many cycles/second can it do,
54// but without taking into account the cost of receiving and sending data.
55
56// clang-format off
58{
59 size_t producers = config.options().get<int>("producers");
60 size_t testDuration = config.options().get<int>("test-duration");
61
62 WorkflowSpec specs;
63 for (size_t p = 0; p < producers; p++) {
64 specs.push_back(DataProcessorSpec{
65 "dataProducer" + std::to_string(p),
66 Inputs{},
67 Outputs{
68 OutputSpec{ "TST", "NODATA", static_cast<SubSpec>(p) }
69 },
72 usleep(1000000);
73 }
74 }
75 });
76 }
77
79 "sink",
80 Inputs{{ "test-data", { "TST", "NODATA" }},
81 { "sink-timer", "TST", "TIMER", 0, Lifetime::Timer }},
82 Outputs{{{ "output" }, "TST", "ALSONODATA" }},
85 auto timer = std::make_shared<Timer>();
86 timer->reset(10 * 1000000);
87 uint64_t loopCounter = 0;
88
89 return (AlgorithmSpec::ProcessCallback) [=](ProcessingContext& ctx) mutable {
90 loopCounter++;
91 if (timer->isTimeout()) {
92 timer->increment();
93 auto& monitoring = ctx.services().get<Monitoring>();
94 monitoring.send({ loopCounter, "loop_counter" });
95 }
96 };
97 }
98 },
99 Options{{"period-sink-timer", VariantType::Int, 0, { "timer period" }}}
100 };
101 specs.push_back(sink);
102
103 DataProcessorSpec heWhoRequestsExit{
104 "heWhoRequestsExit",
105 Inputs{{ "input", "TST", "ALSONODATA" },
106 { "test-timer", "TST", "TIMER2", 0, Lifetime::Timer }},
107 Outputs{},
110 if (ctx.inputs().isValid("test-timer")) {
111 LOG(info) << "Planned exit";
112 ctx.services().get<ControlService>().readyToQuit(QuitRequest::All);
113 }
114 }
115 },
116 Options{{"period-test-timer", VariantType::Int, static_cast<int>(testDuration * 1000000), { "timer period" }}}
117 };
118
119 specs.push_back(heWhoRequestsExit);
120 return specs;
121}
o2::monitoring::Monitoring Monitoring
ConfigParamRegistry & options() const
bool isValid(std::string const &s) const
Helper method to be used to check if a given part of the InputRecord is present.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
o2::header::DataHeader::SubSpecificationType SubSpec
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
static CompletionPolicy defineByName(std::string const &name, CompletionPolicy::CompletionOp op)
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
std::string name
The name of the associated DataProcessorSpec.
Definition DeviceSpec.h:50
uint32_t SubSpecificationType
Definition DataHeader.h:620
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"