Project
Loading...
Searching...
No Matches
multinodeBenchmarkProducers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
17
19#include <vector>
20
21using namespace o2::framework;
22
23// we need to add workflow options before including Framework/runDataProcessing
24void customize(std::vector<ConfigParamSpec>& options)
25{
26 options.push_back({"obj-bins", VariantType::Int, 100, {"Number of bins in a histogram"}});
27 options.push_back({"obj-rate", VariantType::Double, 1.0, {"Number of objects per second sent by one producer"}});
28 options.push_back({"obj-producers", VariantType::Int, 4, {"Number of objects producers"}});
29 options.push_back({"obj-per-message", VariantType::Int, 1, {"Number objects per message (in one TCollection)"}});
30 options.push_back({"output-channel-config", VariantType::String, "", {"Proxy output FMQ channel configuration"}});
31 options.push_back({"first-subspec", VariantType::Int, 1, {"First subSpec of the parallel producers, the rest will be incremental"}});
32}
33
35#include "Framework/Logger.h"
37#include <TRandomGen.h>
38#include <TObjArray.h>
39#include <TH1F.h>
40
41using namespace std::chrono;
43
// NOTE(review): in this extracted listing the function signature (listing
// line 44) and listing lines 60-61 and 114 are not visible; the comments below
// describe only what the visible lines establish.
45{
// Read the workflow options that parameterize the producers and proxies.
46 int objectsBins = config.options().get<int>("obj-bins");
47 double objectsRate = config.options().get<double>("obj-rate");
48 int objectsProducers = config.options().get<int>("obj-producers");
49 int objectsPerMessage = config.options().get<int>("obj-per-message");
50 std::string outputChannelConfig = config.options().get<std::string>("output-channel-config");
51 SubSpec subSpec = static_cast<SubSpec>(config.options().get<int>("first-subspec"));
52 WorkflowSpec specs;
53 // clang-format off
54 // one 1D histo
// One iteration per producer; subSpec is incremented together with p, so every
// producer (and its proxy) uses its own subSpec starting at "first-subspec".
// NOTE(review): p is size_t while objectsProducers is int, so a negative
// "obj-producers" would be converted to a large unsigned bound — confirm inputs.
55 for (size_t p = 0; p < objectsProducers; p++, subSpec++) {
// Producer spec: no inputs, a single output TST/HISTO with this iteration's subSpec.
56 DataProcessorSpec producer{
57 "producer-histo" + std::to_string(subSpec),
58 Inputs{},
59 Outputs{ { { "histo" }, "TST", "HISTO", subSpec} },
// Number of random values used for each histogram fill.
62 const size_t randoms = 10000;
// Period between two messages of this producer in microseconds (10^6 / obj-rate).
// NOTE(review): "obj-rate" is not checked; a rate of 0 divides by zero before
// the cast to int — confirm that callers never pass it.
63 int periodus = static_cast<int>(1000000 / objectsRate);
64 TRandom gen;
// The generator is seeded with the producer index.
// NOTE(review): ROOT documents a seed of 0 as "choose a random seed", which
// would apply to the first producer (p == 0) — confirm this is intended.
65 gen.SetSeed(p);
66
67 double randomsArray[randoms];
// NOTE(review): allocated with new and no matching delete is visible in this
// listing; the pointer is copied into the callback below by the [=] capture.
68 TObjArray* collection = new TObjArray();
69 collection->SetOwner(true);
70
// Create "obj-per-message" histograms named "0", "1", ... with "obj-bins" bins
// on the range [0, 1] and store them in the collection.
71 for (size_t i = 0; i < objectsPerMessage; i++) {
72 TH1I* h = new TH1I(std::to_string(i).c_str(), "uni", objectsBins, 0, 1);
73 collection->Add(h);
74 }
75
// Processing callback: publishes a snapshot once more than one period has elapsed.
76 return (AlgorithmSpec::ProcessCallback)[=](ProcessingContext& pctx) mutable {
77
// Function-local static, so it is initialized on the first invocation only.
// The start time is moved into the past by the fraction p / objectsProducers
// of one period, giving each producer index a different initial phase.
78 static auto lastTime = steady_clock::now() - std::chrono::microseconds(periodus * p / objectsProducers);
79 auto now = steady_clock::now();
80 if (duration_cast<microseconds>(now - lastTime).count() > periodus) {
// Advance by exactly one period instead of resetting to the current time.
81 lastTime += microseconds(periodus);
82
83 if (objectsPerMessage > 1) {
// Several objects per message: refill every histogram, then send the whole collection.
84 for (auto o : *collection) {
85 gen.RndmArray(randoms, randomsArray);
86
87 TH1I* h = dynamic_cast<TH1I*>(o);
88 h->Reset();
89 h->FillN(randoms, randomsArray, nullptr);
90 }
91
// Ownership is switched off only for the duration of the snapshot call.
// NOTE(review): the reason is not visible here; presumably it affects how the
// collection is handled while being snapshotted — confirm.
92 collection->SetOwner(false);
93 pctx.outputs().snapshot({"TST", "HISTO", subSpec}, *collection);
94 collection->SetOwner(true);
95 } else {
// Otherwise: refill the first histogram and send it on its own, not the collection.
96 gen.RndmArray(randoms, randomsArray);
97
98 TH1I* h = dynamic_cast<TH1I*>(collection->At(0));
99 h->Reset();
100 h->FillN(randoms, randomsArray, nullptr);
101
102 pctx.outputs().snapshot({"TST", "HISTO", subSpec}, *h);
103 }
104 }
105 };
106 }
107 }
108 };
109 specs.push_back(producer);
110
111 // We spawn one proxy per each producer to simulate the real scenario.
// The proxy is labelled "histo-proxy-<p>", consumes this producer's
// TST/HISTO/subSpec output and receives the "output-channel-config" string.
// NOTE(review): the name of the called factory (listing line 114) is not
// visible in this listing.
112 specs.emplace_back(
113 std::move(
115 ("histo-proxy-" + std::to_string(p)).c_str(),
116 {{"histo", "TST", "HISTO", subSpec }}, outputChannelConfig.c_str())));
117 }
118
// The result holds one producer spec followed by one proxy spec per iteration.
119 return specs;
120}
121// clang-format on
default_random_engine gen(dev())
int32_t i
bool o
Class for time synchronization of RawReader instances.
ConfigParamRegistry & options() const
o2::header::DataHeader::SubSpecificationType SubSpec
GLint GLsizei count
Definition glcorearb.h:399
WorkflowSpec defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
void customize(std::vector< ConfigParamSpec > &options)
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< InputSpec > Inputs
std::vector< OutputSpec > Outputs
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::function< ProcessCallback(InitContext &)> InitCallback
std::function< void(ProcessingContext &)> ProcessCallback
uint32_t SubSpecificationType
Definition DataHeader.h:620