Project
Loading...
Searching...
No Matches
mergersBenchmarkTopology.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17
20
22
23using namespace o2::framework;
24using namespace o2::mergers;
25
// Receives the workflow's completion policies by mutable reference.
// NOTE(review): the body is empty in this listing because listing line 28 is absent.
// It presumably forwards `policies` to the mergers' customizeInfrastructure() helper
// -- confirm against the upstream file before relying on this.
26void customize(std::vector<CompletionPolicy>& policies)
27{
29}
30
31// we need to add workflow options before including Framework/runDataProcessing
32void customize(std::vector<ConfigParamSpec>& options)
33{
34 options.push_back({"obj-bins", VariantType::Int, 100, {"Number of bins in a histogram"}});
35 options.push_back({"obj-rate", VariantType::Double, 1.0, {"Number of objects per second sent by one producer"}});
36 options.push_back({"obj-producers", VariantType::Int, 4, {"Number of objects producers"}});
37
38 options.push_back({"mergers-layers", VariantType::Int, 2, {"Number of layers in the merger topology"}});
39 options.push_back({"mergers-publication-interval", VariantType::Double, 10.0, {"Publication interval of merged object [s]. It takes effect with --mergers-publication-decision interval"}});
40 options.push_back(
41 {"mergers-input-timespan", VariantType::String, "diffs", {"Should the topology use 'diffs' or 'full' objects"}});
42}
43
45
46#include <TH1F.h>
47#include <memory>
48#include <random>
49#include "Framework/Logger.h"
51
52using namespace std::chrono;
53
// Body of the workflow definition; the signature line (listing line 54) is absent from this listing.
// It reads the workflow options, creates one histogram producer per "obj-producers",
// lets MergerInfrastructureBuilder append the merger devices, adds a printer that logs
// the bin contents of the merged histogram, and returns the collected specs.
55{
56 int objectsBins = config.options().get<int>("obj-bins");
57 double objectsRate = config.options().get<double>("obj-rate");
58 int objectsProducers = config.options().get<int>("obj-producers");
59
60 int mergersLayers = config.options().get<int>("mergers-layers");
// The publication decision is fixed to EachNSeconds; only its interval is read from the options.
61 PublicationDecision mergersPublicationDecision = PublicationDecision::EachNSeconds;
62 double mergersPublicationInterval = config.options().get<double>("mergers-publication-interval");
// Only the exact value "full" selects FullHistory; any other string selects LastDifference.
63 InputObjectsTimespan mergersInputObjectTimespan =
64 config.options().get<std::string>("mergers-input-timespan") == "full" ? InputObjectsTimespan::FullHistory : InputObjectsTimespan::LastDifference;
65
66 WorkflowSpec specs;
67 // clang-format off
68 // one 1D histo, binwise
69 {
70 Inputs mergersInputs;
// NOTE(review): `p` is size_t while objectsProducers is int, so a negative option value
// is converted to a very large unsigned loop bound.
71 for (size_t p = 0; p < objectsProducers; p++) {
// Producer p is addressed by subspecification p + 1; the same value is used when the histogram is created below.
72 mergersInputs.push_back({ "mo", "TST",
73 "HISTO", static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1),
74 Lifetime::Sporadic });
75 DataProcessorSpec producer{
76 "producer-histo" + std::to_string(p), Inputs{},
77 Outputs{ { { "mo" },
78 "TST",
79 "HISTO",
// NOTE(review): listing lines 80 and 82 are absent here (presumably the output subspecification
// and the opening of the algorithm spec) -- confirm against the upstream file.
81 Lifetime::Sporadic } },
// periodus is the emission period in microseconds derived from "obj-rate".
// NOTE(review): a rate of 0 makes the division yield infinity, and converting that to int is undefined.
83 (AlgorithmSpec::ProcessCallback)[ p, periodus = int(1000000 / objectsRate), objectsBins, objectsProducers ](
// lastTime is a function-local static, so it keeps its value between calls of this callback.
84 ProcessingContext& processingContext) mutable { static auto lastTime = steady_clock::now();
85 auto now = steady_clock::now();
86
87 if (duration_cast<microseconds>(now - lastTime).count() > periodus) {
88
// Advance by exactly one period instead of resetting to `now`.
89 lastTime += microseconds(periodus);
90
91 auto subspec = static_cast<o2::header::DataHeader::SubSpecificationType>(p + 1);
// Create the output histogram with objectsBins bins over [-3, 3] and fill it with 10000 random "gaus" entries.
92 TH1F& histo = processingContext.outputs().make<TH1F>(Output{ "TST", "HISTO", subspec }, "gauss", "gauss", objectsBins, -3, 3);
93 histo.FillRandom("gaus", 10000);
94 }
95 }
96 }
97 };
98 specs.push_back(producer);
99 }
100
// Configure the merger infrastructure: all producer outputs in, one TST/HISTO/0 object out.
101 MergerInfrastructureBuilder mergersBuilder;
102 mergersBuilder.setInfrastructureName("histos");
103 mergersBuilder.setInputSpecs(mergersInputs);
104 mergersBuilder.setOutputSpec({{ "main" }, "TST", "HISTO", 0 });
105 MergerConfig mergerConfig;
106 mergerConfig.inputObjectTimespan = { mergersInputObjectTimespan };
// NOTE(review): the interval is a double but is stored as size_t here, so its fractional part is dropped.
107 std::vector<std::pair<size_t, size_t>> param = {{mergersPublicationInterval, 1}};
108 mergerConfig.publicationDecision = { mergersPublicationDecision, param };
109 mergerConfig.mergedObjectTimespan = { MergedObjectTimespan::FullHistory };
110 mergerConfig.topologySize = { TopologySize::NumberOfLayers, mergersLayers };
111 mergersBuilder.setConfig(mergerConfig);
112
// Appends the generated merger devices to `specs`.
113 mergersBuilder.generateInfrastructure(specs);
114
// The printer input uses the same origin, description and subspecification (TST/HISTO/0)
// as the output spec given to the builder above.
115 DataProcessorSpec printer{
116 "printer-bins",
117 Inputs{
118 { "histo", "TST", "HISTO", 0, Lifetime::Sporadic }
119 },
120 Outputs{},
// NOTE(review): listing lines 121-122 are absent here (presumably the algorithm spec and an
// init callback that returns the process callback below) -- confirm against the upstream file.
123 return (AlgorithmSpec::ProcessCallback) [](ProcessingContext& processingContext) mutable {
124 auto histo = processingContext.inputs().get<TH1F*>("histo");
125 std::string bins = "BINS:";
// Bins are visited starting from index 1 up to GetNbinsX() inclusive.
126 for (int i = 1; i <= histo->GetNbinsX(); i++) {
127 bins += " " + std::to_string((int) histo->GetBinContent(i));
// NOTE(review): the condition also holds when the histogram has exactly 100 bins (the default
// of "obj-bins"), so the trimming message is logged although nothing is cut off.
128 if (i >= 100) {
129 LOG(info) << "Trimming the output to 100 entries, total is: " << histo->GetNbinsX();
130 break;
131 }
132 }
133 LOG(info) << bins;
134 };
135 }
136 }
137 };
138 specs.push_back(printer);
139 }
140
141 return specs;
142}
143// clang-format on
const auto bins
Definition PID.cxx:49
int32_t i
Definition of O2 MergerInfrastructureBuilder, v0.1.
ConfigParamRegistry & options() const
static void customizeInfrastructure(std::vector< framework::CompletionPolicy > &)
Configures mergers to consume any data immediately.
void setOutputSpec(const framework::OutputSpec &outputSpec)
void setInputSpecs(const framework::Inputs &inputs)
GLint GLsizei count
Definition glcorearb.h:399
GLenum GLfloat param
Definition glcorearb.h:271
void customize(std::vector< CompletionPolicy > &policies)
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
uint32_t SubSpecificationType
Definition DataHeader.h:620
ConfigEntry< PublicationDecision, PublicationDecisionParameter > publicationDecision
ConfigEntry< TopologySize, std::variant< int, std::vector< size_t > > > topologySize
ConfigEntry< InputObjectsTimespan > inputObjectTimespan
ConfigEntry< MergedObjectTimespan, int > mergedObjectTimespan
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"