32void customize(std::vector<ConfigParamSpec>& options)
34 options.push_back({
"obj-bins", VariantType::Int, 100, {
"Number of bins in a histogram"}});
35 options.push_back({
"obj-rate", VariantType::Double, 1.0, {
"Number of objects per second sent by one producer"}});
36 options.push_back({
"obj-producers", VariantType::Int, 4, {
"Number of objects producers"}});
38 options.push_back({
"mergers-layers", VariantType::Int, 2, {
"Number of layers in the merger topology"}});
39 options.push_back({
"mergers-publication-interval", VariantType::Double, 10.0, {
"Publication interval of merged object [s]. It takes effect with --mergers-publication-decision interval"}});
41 {
"mergers-input-timespan", VariantType::String,
"diffs", {
"Should the topology use 'diffs' or 'full' objects"}});
56 int objectsBins = config.
options().
get<
int>(
"obj-bins");
57 double objectsRate = config.
options().
get<
double>(
"obj-rate");
58 int objectsProducers = config.
options().
get<
int>(
"obj-producers");
60 int mergersLayers = config.
options().
get<
int>(
"mergers-layers");
62 double mergersPublicationInterval = config.
options().
get<
double>(
"mergers-publication-interval");
64 config.
options().
get<std::string>(
"mergers-input-timespan") ==
"full" ? InputObjectsTimespan::FullHistory : InputObjectsTimespan::LastDifference;
71 for (
size_t p = 0; p < objectsProducers; p++) {
72 mergersInputs.push_back({
"mo",
"TST",
74 Lifetime::Sporadic });
81 Lifetime::Sporadic } },
84 ProcessingContext& processingContext)
mutable {
static auto lastTime = steady_clock::now();
85 auto now = steady_clock::now();
87 if (duration_cast<microseconds>(now - lastTime).
count() > periodus) {
89 lastTime += microseconds(periodus);
92 TH1F& histo = processingContext.outputs().make<TH1F>(
Output{
"TST",
"HISTO", subspec },
"gauss",
"gauss", objectsBins, -3, 3);
93 histo.FillRandom(
"gaus", 10000);
98 specs.push_back(producer);
104 mergersBuilder.
setOutputSpec({{
"main" },
"TST",
"HISTO", 0 });
107 std::vector<std::pair<size_t, size_t>>
param = {{mergersPublicationInterval, 1}};
110 mergerConfig.
topologySize = { TopologySize::NumberOfLayers, mergersLayers };
118 {
"histo",
"TST",
"HISTO", 0, Lifetime::Sporadic }
124 auto histo = processingContext.inputs().get<TH1F*>(
"histo");
125 std::string
bins =
"BINS:";
126 for (
int i = 1;
i <= histo->GetNbinsX();
i++) {
129 LOG(info) <<
"Trimming the output to 100 entries, total is: " << histo->GetNbinsX();
138 specs.push_back(printer);
ConfigParamRegistry & options() const