11#include <benchmark/benchmark.h>
25#include <Monitoring/Monitoring.h>
26#include <fairmq/TransportFactory.h>
// Fragment of the metric/service setup; the enclosing function header and several
// intermediate lines are not visible in this listing.
// Four MetricSpec entries are declared (malformed_inputs, dropped_computations,
// dropped_incoming_messages, relayed_messages). Each takes its metricId from the
// matching ProcessingStatsId enumerator and uses quickUpdateInterval as its
// minPublishInterval.
53 int quickUpdateInterval = 1;
54 std::vector<MetricSpec> specs{
55 MetricSpec{.
name =
"malformed_inputs", .metricId =
static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
56 MetricSpec{.name =
"dropped_computations", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
57 MetricSpec{.name =
"dropped_incoming_messages", .metricId =
static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name =
"relayed_messages", .metricId =
static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
// Visits every spec; the loop body is elided here
// (presumably it registers each metric on `stats` — TODO confirm).
59 for (
auto& spec : specs) {
// Registers the states, stats, driverConfig and deviceState objects as services on `r`,
// each wrapped by ServiceRegistryHelpers::handleForService for its type.
65 r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&
states));
66 r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&
stats));
67 r.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&
driverConfig));
68 r.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&
deviceState));
// Benchmark of raw message creation. Each iteration creates a header message of
// stack.size() bytes and a 1000-byte payload message through the "zeromq" transport
// factory, then copies the contents of `stack` into the header buffer.
// `stack` is built on lines that are not visible in this listing.
77static void BM_RelayMessageCreation(benchmark::State& state)
86 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// Benchmark loop driven by `state`.
88 for (
auto _ :
state) {
89 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
90 fair::mq::MessagePtr payload = transport->CreateMessage(1000);
91 memcpy(header->GetData(),
stack.data(),
stack.size());
// Benchmark of relaying one header/payload pair for a single TPC/CLUSTERS input.
// The two messages are created once before the loop; each iteration relays them,
// consumes the completed record and takes the messages back for the next iteration.
// The relayer construction and the definitions of `stack` and `fakeInfo` are on lines
// not visible in this listing.
99static void BM_RelaySingleSlot(benchmark::State& state)
102 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
104 std::vector<InputRoute> inputs = {
107 std::vector<ForwardRoute> forwards;
108 std::vector<InputChannelInfo> infos{1};
112 relayer.setPipelineLength(4);
123 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// inflightMessages[0] is the header (stack.size() bytes), [1] the 1000-byte payload.
127 std::vector<fair::mq::MessagePtr> inflightMessages;
128 inflightMessages.emplace_back(transport->CreateMessage(
stack.size()));
129 inflightMessages.emplace_back(transport->CreateMessage(1000));
130 memcpy(inflightMessages[0]->GetData(),
stack.data(),
stack.size());
// Benchmark loop driven by `state`.
133 for (
auto _ :
state) {
134 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
135 std::vector<RecordAction> ready;
136 relayer.getReadyToProcess(ready);
// Expect exactly one ready record, in slot 0, with a Consume action.
// NOTE(review): assert() is compiled out when NDEBUG is defined, so these checks
// do not run in such builds.
137 assert(ready.size() == 1);
138 assert(ready[0].slot.index == 0);
139 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
140 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
141 assert(
result.size() == 1);
// Take the consumed messages back so the next iteration can relay them again.
143 inflightMessages = std::move(
result[0]);
// Benchmark of relaying a header/payload pair for a single TPC/CLUSTERS input where
// the header is rewritten on every iteration. The relayer construction and the
// definitions of `dh`, `dph`, `stack` and `fakeInfo` are on lines not visible here.
150static void BM_RelayMultipleSlots(benchmark::State& state)
153 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
155 std::vector<InputRoute> inputs = {
158 std::vector<ForwardRoute> forwards;
159 std::vector<InputChannelInfo> infos{1};
164 relayer.setPipelineLength(4);
173 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// NOTE(review): how `timeslice` is used or advanced is not visible in this listing.
174 size_t timeslice = 0;
// `placeholder` is only used below to size the header message.
177 Stack placeholder{dh, dph};
182 std::vector<fair::mq::MessagePtr> inflightMessages;
183 inflightMessages.emplace_back(transport->CreateMessage(placeholder.size()));
184 inflightMessages.emplace_back(transport->CreateMessage(1000));
// Benchmark loop driven by `state`.
186 for (
auto _ :
state) {
// Refresh the header buffer from `stack` (declared on a line not shown; presumably
// rebuilt per iteration — TODO confirm).
188 memcpy(inflightMessages[0]->GetData(),
stack.data(),
stack.size());
191 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
192 std::vector<RecordAction> ready;
193 relayer.getReadyToProcess(ready);
// Expect exactly one ready record with a Consume action.
// NOTE(review): assert() is compiled out when NDEBUG is defined.
194 assert(ready.size() == 1);
195 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
196 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
197 assert(
result.size() == 1);
// Take the consumed messages back so the next iteration can relay them again.
199 inflightMessages = std::move(
result[0]);
// Benchmark of relaying data for two inputs (TPC/CLUSTERS and TPC/TRACKS).
// Four messages are kept in inflightMessages: [0],[1] are the header/payload pair
// built from stack1, [2],[3] the pair built from stack2. The relayer construction and
// the definitions of dh1/dph1, dh2/dph2, fakeInfo and fakeInfo2 are on lines not
// visible in this listing.
206static void BM_RelayMultipleRoutes(benchmark::State& state)
209 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
210 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
212 std::vector<InputRoute> inputs = {
216 std::vector<ForwardRoute> forwards;
217 std::vector<InputChannelInfo> infos{1};
222 relayer.setPipelineLength(4);
236 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// NOTE(review): how `timeslice` is used or advanced is not visible in this listing.
237 size_t timeslice = 0;
240 Stack stack1{dh1, dph1};
242 std::vector<fair::mq::MessagePtr> inflightMessages;
243 inflightMessages.emplace_back(transport->CreateMessage(stack1.size()));
244 inflightMessages.emplace_back(transport->CreateMessage(1000));
246 memcpy(inflightMessages[0]->GetData(), stack1.data(), stack1.size());
249 Stack stack2{dh2, dph2};
251 inflightMessages.emplace_back(transport->CreateMessage(stack2.size()));
252 inflightMessages.emplace_back(transport->CreateMessage(1000));
254 memcpy(inflightMessages[2]->GetData(), stack2.data(), stack2.size());
// Benchmark loop driven by `state`.
256 for (
auto _ :
state) {
// Relay the first header/payload pair (two messages starting at index 0).
258 relayer.relay(inflightMessages[0]->GetData(), &inflightMessages[0], fakeInfo, 2);
259 std::vector<RecordAction> ready;
260 relayer.getReadyToProcess(ready);
// NOTE(review): assert() is compiled out when NDEBUG is defined.
261 assert(ready.size() == 1);
262 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Relay the second header/payload pair (two messages starting at index 2).
265 relayer.relay(inflightMessages[2]->GetData(), &inflightMessages[2], fakeInfo2, 2);
// NOTE(review): `ready` is reused here; whether it is cleared first is on a line
// not visible in this listing.
267 relayer.getReadyToProcess(ready);
268 assert(ready.size() == 1);
269 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
270 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// One entry per input is expected in the consumed result.
271 assert(
result.size() == 2);
// Rebuild the four-message layout: result[0] supplies the first pair, the two
// messages of result[1] are appended as the second pair.
274 inflightMessages = std::move(
result[0]);
275 inflightMessages.emplace_back(std::move(
result[1][0]));
276 inflightMessages.emplace_back(std::move(
result[1][1]));
// Benchmark of relaying a sequence of nSplitParts header/payload pairs for a single
// TPC/CLUSTERS input in one relay() call. nSplitParts comes from state.range(0); the
// registration below runs it with 10, 100 and 1000. The relayer construction and the
// definitions of `dh`, `stack` and `fakeInfo` are on lines not visible in this listing.
283static void BM_RelaySplitParts(benchmark::State& state)
286 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
288 std::vector<InputRoute> inputs = {
292 std::vector<ForwardRoute> forwards;
293 std::vector<InputChannelInfo> infos{1};
298 relayer.setPipelineLength(4);
308 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// NOTE(review): how `timeslice` is used or advanced is not visible in this listing.
309 size_t timeslice = 0;
// NOTE(review): state.range(0) is narrowed to int here — confirm the intended type.
310 const int nSplitParts =
state.range(0);
// Two messages (header + payload) are stored per split part.
312 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
313 inflightMessages.reserve(2 * nSplitParts);
// NOTE(review): `i` is size_t while nSplitParts is int, so the loop condition is a
// signed/unsigned comparison.
315 for (
size_t i = 0;
i < nSplitParts; ++
i) {
321 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
322 fair::mq::MessagePtr payload = transport->CreateMessage(dh.
payloadSize);
324 memcpy(header->GetData(),
stack.data(),
stack.size());
325 inflightMessages.emplace_back(std::move(header));
326 inflightMessages.emplace_back(std::move(payload));
// Benchmark loop driven by `state`.
330 for (
auto _ :
state) {
331 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
332 std::vector<RecordAction> ready;
333 relayer.getReadyToProcess(ready);
// Expect exactly one ready record with a Consume action.
// NOTE(review): assert() is compiled out when NDEBUG is defined.
334 assert(ready.size() == 1);
335 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Take the first entry of the consumed result back as the message set for the next iteration.
336 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0]);
340BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
// Benchmark of relaying one header followed by nPayloads payload messages for a
// single TPC/CLUSTERS input, passing nPayloads as the fifth relay() argument.
// nPayloads comes from state.range(0); the registration below runs it with 10, 100
// and 1000. The relayer construction and the definitions of `dh`, `stack` and
// `fakeInfo` are on lines not visible in this listing.
342static void BM_RelayMultiplePayloads(benchmark::State& state)
345 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
347 std::vector<InputRoute> inputs = {
351 std::vector<ForwardRoute> forwards;
352 std::vector<InputChannelInfo> infos{1};
366 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// NOTE(review): how `timeslice` is used or advanced is not visible in this listing.
367 size_t timeslice = 0;
// NOTE(review): state.range(0) is narrowed to int here — confirm the intended type.
368 const int nPayloads =
state.range(0);
// One header plus nPayloads payloads.
369 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
370 inflightMessages.reserve(nPayloads + 1);
376 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
377 memcpy(header->GetData(),
stack.data(),
stack.size());
378 inflightMessages.emplace_back(std::move(header));
// NOTE(review): `i` is size_t while nPayloads is int, so the loop condition is a
// signed/unsigned comparison.
379 for (
size_t i = 0;
i < nPayloads; ++
i) {
380 inflightMessages.emplace_back(transport->CreateMessage(dh.
payloadSize));
// Benchmark loop driven by `state`.
384 for (
auto _ :
state) {
385 relayer.
relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size(), nPayloads);
// NOTE(review): the call that fills `ready` is not visible between this declaration
// and the asserts below (the listing skips a line) — confirm against the real file.
386 std::vector<RecordAction> ready;
// NOTE(review): assert() is compiled out when NDEBUG is defined.
388 assert(ready.size() == 1);
389 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
394BENCHMARK(BM_RelayMultiplePayloads)->Arg(10)->Arg(100)->Arg(1000);
o2::monitoring::Monitoring Monitoring
BENCHMARK(BM_RelayMessageCreation)
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in-flight timeslices that this relayer can handle.
std::vector< std::vector< fair::mq::MessagePtr > > consumeAllInputsForTimeslice(TimesliceSlot id)
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
Defining PrimaryVertex explicitly as messageable.
const DriverConfig driverConfig
DataProcessingStats stats
static constexpr int INVALID
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
Helper struct to hold statistics about the data processing happening.
void registerMetric(MetricSpec const &spec)
Running state information of a given device.
bool batch
Whether the driver was started in batch mode or not.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, Salt salt, char const *name=nullptr, ServiceRegistry::SpecIndex specIndex=SpecIndex{-1}) const
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)