Project
Loading...
Searching...
No Matches
o2SimpleSource.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
14#include "Framework/InputSpec.h"
16
17#include <chrono>
18#include <thread>
19#include <vector>
20
21using namespace o2::framework;
22// The dataspec is a workflow option, because it affects the
23// way the topology is built.
24void customize(std::vector<ConfigParamSpec>& workflowOptions)
25{
26 workflowOptions.emplace_back(
27 ConfigParamSpec{"dataspec", VariantType::String, "tst:TST/A/0", {"DataSpec for the outputs"}});
28 workflowOptions.emplace_back(
29 ConfigParamSpec{"name", VariantType::String, "test-source", {"Name of the source"}});
30 workflowOptions.emplace_back(
31 ConfigParamSpec{"timer", VariantType::String, "", {"What to use as timer intervals. Format is <period>:<validity since start>[, ...]"}});
32 workflowOptions.emplace_back(
33 ConfigParamSpec{"delay", VariantType::Int, 0, {"How long it takes to do the processing (in ms)"}});
34}
35
37
38// This is how you can define your processing in a declarative way
40{
41 // Get the dataspec option and creates OutputSpecs from it
42 auto dataspec = ctx.options().get<std::string>("dataspec");
43 auto timer = ctx.options().get<std::string>("timer");
44 auto delay = ctx.options().get<int>("delay");
45
46 std::vector<InputSpec> inputs;
47 std::vector<TimerSpec> timers;
48 if (timer.empty() == false) {
49 // Split timer at every comma, then split each part at every colon
50 // and create a TimerIntervalSpec from it.
51 while (true) {
52 auto comma = timer.find(',');
53 auto colon = timer.find(':');
54 if (colon == std::string::npos) {
55 break;
56 }
57 auto validity = std::stoull(timer.substr(0, colon));
58 auto period = std::stoull(timer.substr(colon + 1, comma - colon - 1));
59 timers.push_back(TimerSpec{.period = period, .validity = validity});
60 if (comma == std::string::npos) {
61 break;
62 }
63 timer = timer.substr(comma + 1);
64 }
65 inputs.emplace_back("timer", "TST", "TIMER", 0, Lifetime::Timer, timerSpecs(timers));
66 }
67
68 std::vector<InputSpec> matchers = select(dataspec.c_str());
69 std::vector<std::string> outputRefs;
70 std::vector<OutputSpec> outputSpecs;
71 for (auto const& matcher : matchers) {
72 outputRefs.emplace_back(matcher.binding);
73 outputSpecs.emplace_back(DataSpecUtils::asOutputSpec(matcher));
74 }
75
76 return WorkflowSpec{
77 {.name = ctx.options().get<std::string>("name"),
78 .inputs = inputs,
79 .outputs = outputSpecs,
80 .algorithm = AlgorithmSpec{adaptStateful(
81 [outputSpecs, delay](ConfigParamRegistry const& options) {
82 // the size of the messages is also a workflow option
83 auto dataSize = options.get<int64_t>("data-size");
84 return adaptStateless(
85 [outputSpecs, dataSize, delay](DataAllocator& outputs, ProcessingContext& ctx) {
86 for (auto const& output : outputSpecs) {
88 std::this_thread::sleep_for(std::chrono::milliseconds(delay));
89 outputs.make<char>(Output{concrete.origin, concrete.description, concrete.subSpec}, dataSize);
90 }
91 });
92 })},
93 .options = {ConfigParamSpec{"data-size", VariantType::Int64, 1LL, {"Size of the created messages"}}}}};
94}
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
ConfigParamRegistry & options() const
decltype(auto) make(const Output &spec, Args... args)
GLenum GLsizei dataSize
Definition glcorearb.h:3994
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > select(char const *matcher="")
std::vector< ConfigParamSpec > timerSpecs(std::vector< TimerSpec > intervals)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
WorkflowSpec defineDataProcessing(ConfigContext const &ctx)
This function hooks up the the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &workflowOptions)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
static OutputSpec asOutputSpec(InputSpec const &spec)
header::DataOrigin origin
Definition Output.h:28
size_t period
The period of the timer in microseconds.