Project
Loading...
Searching...
No Matches
benchmark_DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <benchmark/benchmark.h>
12
13#include "Headers/DataHeader.h"
14#include "Headers/Stack.h"
25#include <Monitoring/Monitoring.h>
26#include <fairmq/TransportFactory.h>
27#include <cstring>
28#include <vector>
29#include <uv.h>
30
31using Monitoring = o2::monitoring::Monitoring;
32using namespace o2::framework;
36
49
51 {
52 using MetricSpec = DataProcessingStats::MetricSpec;
53 int quickUpdateInterval = 1;
54 std::vector<MetricSpec> specs{
55 MetricSpec{.name = "malformed_inputs", .metricId = static_cast<short>(ProcessingStatsId::MALFORMED_INPUTS), .minPublishInterval = quickUpdateInterval},
56 MetricSpec{.name = "dropped_computations", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_COMPUTATIONS), .minPublishInterval = quickUpdateInterval},
57 MetricSpec{.name = "dropped_incoming_messages", .metricId = static_cast<short>(ProcessingStatsId::DROPPED_INCOMING_MESSAGES), .minPublishInterval = quickUpdateInterval},
58 MetricSpec{.name = "relayed_messages", .metricId = static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), .minPublishInterval = quickUpdateInterval}};
59 for (auto& spec : specs) {
61 }
62
64 r.registerService(ServiceRegistryHelpers::handleForService<Monitoring>(&monitoring));
65 r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStates>(&states));
66 r.registerService(ServiceRegistryHelpers::handleForService<DataProcessingStats>(&stats));
67 r.registerService(ServiceRegistryHelpers::handleForService<DriverConfig const>(&driverConfig));
68 r.registerService(ServiceRegistryHelpers::handleForService<DeviceState>(&deviceState));
69 return r;
70 }
71};
72
73// a simple benchmark of the contribution of the pure message creation
74// this was important when the benchmarks below included the message
75// creation inside the benchmark loop, its somewhat obsolete now but
76// we keep it for reference
77static void BM_RelayMessageCreation(benchmark::State& state)
78{
79 DataHeader dh;
80 dh.dataDescription = "CLUSTERS";
81 dh.dataOrigin = "TPC";
82 dh.subSpecification = 0;
83
84 DataProcessingHeader dph{0, 1};
85 Stack stack{dh, dph};
86 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
87
88 for (auto _ : state) {
89 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
90 fair::mq::MessagePtr payload = transport->CreateMessage(1000);
91 memcpy(header->GetData(), stack.data(), stack.size());
92 }
93}
94
95BENCHMARK(BM_RelayMessageCreation);
96
97// A simple test where an input is provided
98// and the subsequent InputRecord is immediately requested.
//
// Setup (outside the timed loop): a single InputRoute for TPC/CLUSTERS, a
// DataRelayer built on it, and one header + payload message pair whose
// header carries the same DataProcessingHeader{0, 1} on every iteration.
// Timed loop: relay the pair, ask for the ready records, consume them and
// move the returned messages back into inflightMessages for the next pass.
99static void BM_RelaySingleSlot(benchmark::State& state)
100{
101 BenchmarkServices services;
102 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
103
104 std::vector<InputRoute> inputs = {
105 InputRoute{spec, 0, "Fake", 0}};
106
    // NOTE(review): `forwards` is declared but not used in the visible code.
107 std::vector<ForwardRoute> forwards;
108 std::vector<InputChannelInfo> infos{1};
109 TimesliceIndex index{1, infos};
    // NOTE(review): `policy` is not declared in the visible listing (listing
    // line 110 is missing) -- its definition appears to have been lost in
    // extraction; confirm against the real source file.
111 DataRelayer relayer(policy, inputs, index, services.ref(), -1);
    // Maximum number of in-flight timeslices the relayer can handle.
112 relayer.setPipelineLength(4);
113
114 // Let's create a dummy O2 Message with two headers in the stack:
115 // - DataHeader matching the one provided in the input
    // - DataProcessingHeader (`dph` below)
116 DataHeader dh;
117 dh.dataDescription = "CLUSTERS";
118 dh.dataOrigin = "TPC";
119 dh.subSpecification = 0;
120
121 DataProcessingHeader dph{0, 1};
122 Stack stack{dh, dph};
123 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
124 // we are creating the inflight messages once outside the benchmark
125 // loop and make sure that they are moved back to the original vector
126 // when processed by the relayer
127 std::vector<fair::mq::MessagePtr> inflightMessages;
    // [0] = header message sized to the stack, [1] = payload created with size 1000.
128 inflightMessages.emplace_back(transport->CreateMessage(stack.size()));
129 inflightMessages.emplace_back(transport->CreateMessage(1000));
130 memcpy(inflightMessages[0]->GetData(), stack.data(), stack.size());
131
    // Built once: inflightMessages.size() is 2 here and is expected to be 2
    // again after each iteration hands the messages back.
132 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
133 for (auto _ : state) {
134 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
135 std::vector<RecordAction> ready;
136 relayer.getReadyToProcess(ready);
    // NOTE(review): these checks use assert(), so they are compiled out when
    // NDEBUG is defined (typical for optimized benchmark builds).
137 assert(ready.size() == 1);
138 assert(ready[0].slot.index == 0);
139 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
140 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
141 assert(result.size() == 1);
142 assert((result.at(0) | count_parts{}) == 1);
    // Take the messages back so the next iteration can relay them again.
143 inflightMessages = std::move(result[0]);
144 }
145}
146
147BENCHMARK(BM_RelaySingleSlot);
148
149// This one will simulate a single input.
//
// Same single TPC/CLUSTERS route as BM_RelaySingleSlot, but every iteration
// rewrites the header message with a fresh DataProcessingHeader{timeslice++, 1},
// so the value passed as first DataProcessingHeader argument increases by one
// per iteration. The message objects themselves are created once and reused.
150static void BM_RelayMultipleSlots(benchmark::State& state)
151{
152 BenchmarkServices services;
153 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
154
155 std::vector<InputRoute> inputs = {
156 InputRoute{spec, 0, "Fake", 0}};
157
    // NOTE(review): `forwards` is declared but not used in the visible code.
158 std::vector<ForwardRoute> forwards;
159 std::vector<InputChannelInfo> infos{1};
160 TimesliceIndex index{1, infos};
161
    // NOTE(review): `policy` is not declared in the visible listing (listing
    // line 162 is missing) -- its definition appears to have been lost in
    // extraction; confirm against the real source file.
163 DataRelayer relayer(policy, inputs, index, services.ref(), -1);
    // Maximum number of in-flight timeslices the relayer can handle.
164 relayer.setPipelineLength(4);
165
166 // Let's create a dummy O2 Message with two headers in the stack:
167 // - DataHeader matching the one provided in the input
    // - DataProcessingHeader (rebuilt inside the loop)
168 DataHeader dh;
169 dh.dataDescription = "CLUSTERS";
170 dh.dataOrigin = "TPC";
171 dh.subSpecification = 0;
172
173 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
174 size_t timeslice = 0;
175
    // `dph` / `placeholder` are only used to size the header message below;
    // the actual header content is written inside the loop.
176 DataProcessingHeader dph{timeslice, 1};
177 Stack placeholder{dh, dph};
178
179 // we are creating the inflight messages once outside the benchmark
180 // loop and make sure that they are moved back to the original vector
181 // when processed by the relayer
182 std::vector<fair::mq::MessagePtr> inflightMessages;
183 inflightMessages.emplace_back(transport->CreateMessage(placeholder.size()));
184 inflightMessages.emplace_back(transport->CreateMessage(1000));
185
186 for (auto _ : state) {
    // Note that the Stack construction and the memcpy are part of the timed
    // region. Assumes stack.size() equals placeholder.size(), since the header
    // message was sized from `placeholder` -- TODO confirm.
187 Stack stack{dh, DataProcessingHeader{timeslice++, 1}};
188 memcpy(inflightMessages[0]->GetData(), stack.data(), stack.size());
189
190 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
191 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
192 std::vector<RecordAction> ready;
193 relayer.getReadyToProcess(ready);
    // NOTE(review): assert()-based checks vanish when NDEBUG is defined.
194 assert(ready.size() == 1);
195 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
196 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
197 assert(result.size() == 1);
198 assert((result.at(0) | count_parts{}) == 1);
    // Take the messages back so the next iteration can relay them again.
199 inflightMessages = std::move(result[0]);
200 }
201}
202
203BENCHMARK(BM_RelayMultipleSlots);
204
// Benchmark with two input routes (TPC/CLUSTERS and TPC/TRACKS).
// Four messages are created once: [0]/[1] are header/payload for CLUSTERS,
// [2]/[3] are header/payload for TRACKS. Each iteration relays the two pairs
// one after the other, queries the ready records after each relay, consumes
// the slot after the second one and rebuilds inflightMessages from the result.
206static void BM_RelayMultipleRoutes(benchmark::State& state)
207{
208 BenchmarkServices services;
209 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
210 InputSpec spec2{"tracks", "TPC", "TRACKS"};
211
212 std::vector<InputRoute> inputs = {
213 InputRoute{spec1, 0, "Fake1", 0},
214 InputRoute{spec2, 1, "Fake2", 0}};
215
    // NOTE(review): `forwards` is declared but not used in the visible code.
216 std::vector<ForwardRoute> forwards;
217 std::vector<InputChannelInfo> infos{1};
218 TimesliceIndex index{1, infos};
219
    // NOTE(review): `policy` is not declared in the visible listing (listing
    // line 220 is missing) -- its definition appears to have been lost in
    // extraction; confirm against the real source file.
221 DataRelayer relayer(policy, inputs, index, services.ref(), -1);
    // Maximum number of in-flight timeslices the relayer can handle.
222 relayer.setPipelineLength(4);
223
224 // Let's create a dummy O2 Message with two headers in the stack:
225 // - DataHeader matching the one provided in the input
    // - DataProcessingHeader (`dph1` / `dph2` below)
    // One such header is built per route.
226 DataHeader dh1;
227 dh1.dataDescription = "CLUSTERS";
228 dh1.dataOrigin = "TPC";
229 dh1.subSpecification = 0;
230
231 DataHeader dh2;
232 dh2.dataDescription = "TRACKS";
233 dh2.dataOrigin = "TPC";
234 dh2.subSpecification = 0;
235
236 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
    // `timeslice` is never modified in this function, so both headers carry
    // the same DataProcessingHeader{0, 1} on every iteration.
237 size_t timeslice = 0;
238
239 DataProcessingHeader dph1{timeslice, 1};
240 Stack stack1{dh1, dph1};
241
242 std::vector<fair::mq::MessagePtr> inflightMessages;
243 inflightMessages.emplace_back(transport->CreateMessage(stack1.size()));
244 inflightMessages.emplace_back(transport->CreateMessage(1000));
245
246 memcpy(inflightMessages[0]->GetData(), stack1.data(), stack1.size());
247
248 DataProcessingHeader dph2{timeslice, 1};
249 Stack stack2{dh2, dph2};
250
251 inflightMessages.emplace_back(transport->CreateMessage(stack2.size()));
252 inflightMessages.emplace_back(transport->CreateMessage(1000));
253
254 memcpy(inflightMessages[2]->GetData(), stack2.data(), stack2.size());
255
256 for (auto _ : state) {
    // NOTE(review): the InputInfo is built with inflightMessages.size() (4 at
    // this point) while relay() is given 2 messages -- confirm this mismatch
    // is intended.
257 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
    // First pair: CLUSTERS header + payload.
258 relayer.relay(inflightMessages[0]->GetData(), &inflightMessages[0], fakeInfo, 2);
259 std::vector<RecordAction> ready;
260 relayer.getReadyToProcess(ready);
    // NOTE(review): assert()-based checks vanish when NDEBUG is defined.
261 assert(ready.size() == 1);
262 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
263
264 DataRelayer::InputInfo fakeInfo2{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
    // Second pair: TRACKS header + payload.
265 relayer.relay(inflightMessages[2]->GetData(), &inflightMessages[2], fakeInfo2, 2);
266 ready.clear();
267 relayer.getReadyToProcess(ready);
268 assert(ready.size() == 1);
269 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
270 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
271 assert(result.size() == 2);
272 assert((result.at(0) | count_parts{}) == 1);
273 assert((result.at(1) | count_parts{}) == 1);
    // Rebuild the 4-element vector: result[0] becomes elements [0]/[1],
    // the two messages of result[1] are appended as [2]/[3].
274 inflightMessages = std::move(result[0]);
275 inflightMessages.emplace_back(std::move(result[1][0]));
276 inflightMessages.emplace_back(std::move(result[1][1]));
277 }
278}
279
280BENCHMARK(BM_RelayMultipleRoutes);
281
// Benchmark relaying a split payload: state.range(0) header/payload pairs
// are created once, each header carrying splitPayloadIndex = i and
// splitPayloadParts = nSplitParts. Every iteration relays all pairs in one
// call, consumes the ready slot and takes the messages back.
// Registered below for 10, 100 and 1000 parts.
283static void BM_RelaySplitParts(benchmark::State& state)
284{
285 BenchmarkServices services;
286 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
287
288 std::vector<InputRoute> inputs = {
289 InputRoute{spec1, 0, "Fake1", 0},
290 };
291
    // NOTE(review): `forwards` is declared but not used in the visible code.
292 std::vector<ForwardRoute> forwards;
293 std::vector<InputChannelInfo> infos{1};
294 TimesliceIndex index{1, infos};
295
    // NOTE(review): `policy` is not declared in the visible listing (listing
    // line 296 is missing) -- its definition appears to have been lost in
    // extraction; confirm against the real source file.
297 DataRelayer relayer(policy, inputs, index, services.ref(), -1);
    // Maximum number of in-flight timeslices the relayer can handle.
298 relayer.setPipelineLength(4);
299
300 // Let's create a dummy O2 Message with two headers in the stack:
301 // - DataHeader matching the one provided in the input
    // - DataProcessingHeader (`dph`, built per part in the loop below)
302 DataHeader dh;
303 dh.dataDescription = "CLUSTERS";
304 dh.dataOrigin = "TPC";
305 dh.subSpecification = 0;
306 dh.payloadSize = 100;
307
308 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
    // `timeslice` is never modified in this function.
309 size_t timeslice = 0;
310 const int nSplitParts = state.range(0);
311
312 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
    // Two messages (header + payload) per split part.
313 inflightMessages.reserve(2 * nSplitParts);
314
    // NOTE(review): `i` is size_t while nSplitParts is int, so this is a
    // signed/unsigned comparison.
315 for (size_t i = 0; i < nSplitParts; ++i) {
316 DataProcessingHeader dph{timeslice, 1};
317 dh.splitPayloadIndex = i;
318 dh.splitPayloadParts = nSplitParts;
319 Stack stack{dh, dph};
320
321 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
322 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
323
324 memcpy(header->GetData(), stack.data(), stack.size());
325 inflightMessages.emplace_back(std::move(header));
326 inflightMessages.emplace_back(std::move(payload));
327 }
328
329 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
330 for (auto _ : state) {
    // The first header is passed as rawHeader; all 2 * nSplitParts messages
    // are handed over in a single relay() call.
331 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
332 std::vector<RecordAction> ready;
333 relayer.getReadyToProcess(ready);
    // NOTE(review): assert()-based checks vanish when NDEBUG is defined.
334 assert(ready.size() == 1);
335 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
    // NOTE(review): unlike the other benchmarks in this file, the result is
    // indexed with [0] without first asserting its size.
336 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0]);
337 }
338}
339
340BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
341
// Benchmark relaying one header followed by state.range(0) payload messages.
// The single header is built with splitPayloadIndex == splitPayloadParts ==
// nPayloads, and relay() is called with nPayloads as its nPayloads argument.
// Messages are created once and taken back from the relayer every iteration.
// Registered below for 10, 100 and 1000 payloads.
342static void BM_RelayMultiplePayloads(benchmark::State& state)
343{
344 BenchmarkServices services;
345 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
346
347 std::vector<InputRoute> inputs = {
348 InputRoute{spec1, 0, "Fake1", 0},
349 };
350
    // NOTE(review): `forwards` is declared but not used in the visible code.
351 std::vector<ForwardRoute> forwards;
352 std::vector<InputChannelInfo> infos{1};
353 TimesliceIndex index{1, infos};
354
    // NOTE(review): `policy` is not declared in the visible listing (listing
    // line 355 is missing) -- its definition appears to have been lost in
    // extraction; confirm against the real source file.
356 DataRelayer relayer(policy, inputs, index, services.ref(), -1);
    // Maximum number of in-flight timeslices the relayer can handle.
357 relayer.setPipelineLength(4);
358
359 // DataHeader matching the one provided in the input
360 DataHeader dh;
361 dh.dataDescription = "CLUSTERS";
362 dh.dataOrigin = "TPC";
363 dh.subSpecification = 0;
364 dh.payloadSize = 100;
365
366 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
    // `timeslice` is never modified in this function.
367 size_t timeslice = 0;
368 const int nPayloads = state.range(0);
369 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
    // One header message plus nPayloads payload messages.
370 inflightMessages.reserve(nPayloads + 1);
371
372 DataProcessingHeader dph{timeslice, 1};
    // Index and parts are both set to nPayloads for the single header.
    // NOTE(review): presumably this is how a header followed by several
    // payloads is flagged -- confirm against the DataHeader documentation.
373 dh.splitPayloadIndex = nPayloads;
374 dh.splitPayloadParts = nPayloads;
375 Stack stack{dh, dph};
376 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
377 memcpy(header->GetData(), stack.data(), stack.size());
378 inflightMessages.emplace_back(std::move(header));
    // NOTE(review): `i` is size_t while nPayloads is int, so this is a
    // signed/unsigned comparison.
379 for (size_t i = 0; i < nPayloads; ++i) {
380 inflightMessages.emplace_back(transport->CreateMessage(dh.payloadSize));
381 }
382
383 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
384 for (auto _ : state) {
    // nMessages = nPayloads + 1 (header included), nPayloads passed explicitly.
385 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size(), nPayloads);
386 std::vector<RecordAction> ready;
387 relayer.getReadyToProcess(ready);
    // NOTE(review): assert()-based checks vanish when NDEBUG is defined.
388 assert(ready.size() == 1);
389 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
    // NOTE(review): the result is indexed with [0] without first asserting
    // its size.
390 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0]);
391 }
392}
393
394BENCHMARK(BM_RelayMultiplePayloads)->Arg(10)->Arg(100)->Arg(1000);
395
benchmark::State & state
int32_t i
uint32_t op
uint32_t stack
Definition RawData.h:1
BENCHMARK_MAIN()
o2::monitoring::Monitoring Monitoring
BENCHMARK(BM_RelayMessageCreation)
void getReadyToProcess(std::vector< RecordAction > &completed)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
std::vector< std::vector< fair::mq::MessagePtr > > consumeAllInputsForTimeslice(TimesliceSlot id)
RelayChoice relay(void const *rawHeader, std::unique_ptr< fair::mq::Message > *messages, InputInfo const &info, size_t nMessages, size_t nPayloads=1, OnInsertionCallback onInsertion=nullptr, OnDropCallback onDrop=nullptr)
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint index
Definition glcorearb.h:781
GLboolean r
Definition glcorearb.h:1233
GLuint * states
Definition glcorearb.h:4932
Defining PrimaryVertex explicitly as messageable.
Definition Cartesian.h:288
const DriverConfig driverConfig
ServiceRegistryRef ref()
DataProcessingStats stats
static constexpr int INVALID
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
Helper struct to hold statistics about the data processing happening.
void registerMetric(MetricSpec const &spec)
Running state information of a given device.
Definition DeviceState.h:34
bool batch
Whether the driver was started in batch mode or not.
void registerService(ServiceTypeHash typeHash, void *service, ServiceKind kind, Salt salt, char const *name=nullptr, ServiceRegistry::SpecIndex specIndex=SpecIndex{-1}) const
static std::function< int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator(uv_loop_t *loop)
static std::function< void(int64_t &base, int64_t &offset)> defaultRealtimeBaseConfigurator(uint64_t offset, uv_loop_t *loop)
the main header struct
Definition DataHeader.h:620
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:648
DataDescription dataDescription
Definition DataHeader.h:638
SubSpecificationType subSpecification
Definition DataHeader.h:658
PayloadSizeType payloadSize
Definition DataHeader.h:668
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:663
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:33