Project
Loading...
Searching...
No Matches
benchmark_DataRelayer.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include <benchmark/benchmark.h>
12
13#include "Headers/DataHeader.h"
14#include "Headers/Stack.h"
18#include <Monitoring/Monitoring.h>
19#include <fairmq/TransportFactory.h>
20#include <cstring>
21#include <vector>
22
23using Monitoring = o2::monitoring::Monitoring;
24using namespace o2::framework;
28
29// a simple benchmark of the contribution of the pure message creation
30// this was important when the benchmarks below included the message
31// creation inside the benchmark loop; it is somewhat obsolete now, but
32// we keep it for reference
//
// Each iteration asks the "zeromq" transport factory for two messages (one
// sized to the serialized header stack, one created with size argument 1000)
// and copies the stack bytes into the first one. No DataRelayer is involved.
33static void BM_RelayMessageCreation(benchmark::State& state)
34{
  // Header content is fixed and built once, outside the timed loop.
35 DataHeader dh;
36 dh.dataDescription = "CLUSTERS";
37 dh.dataOrigin = "TPC";
38 dh.subSpecification = 0;
39
40 DataProcessingHeader dph{0, 1};
41 Stack stack{dh, dph};
42 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
43
  // Timed region: two CreateMessage calls plus one memcpy of the stack.
  // Both message pointers go out of scope at the end of every iteration.
44 for (auto _ : state) {
45 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
46 fair::mq::MessagePtr payload = transport->CreateMessage(1000);
47 memcpy(header->GetData(), stack.data(), stack.size());
48 }
49}
50
  // Register the benchmark without arguments.
51BENCHMARK(BM_RelayMessageCreation);
52
53// A simple test where an input is provided
54// and the subsequent InputRecord is immediately requested.
//
// The same header/payload message pair is relayed on every iteration with an
// unchanged header (DataProcessingHeader{0, 1}); the messages handed back by
// consumeAllInputsForTimeslice are moved into inflightMessages for reuse.
55static void BM_RelaySingleSlot(benchmark::State& state)
56{
58 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
59
60 std::vector<InputRoute> inputs = {
61 InputRoute{spec, 0, "Fake", 0}};
62
  // NOTE(review): `forwards` is not referenced again in this function.
63 std::vector<ForwardRoute> forwards;
64 std::vector<InputChannelInfo> infos{1};
65 TimesliceIndex index{1, infos};
  // NOTE(review): `policy` is used below but its declaration is not visible
  // in this listing (the embedded numbering skips a line here); presumably a
  // CompletionPolicy is created at that point -- confirm against the real file.
67 ServiceRegistry registry;
68 DataRelayer relayer(policy, inputs, index, {registry});
  // Tunes the maximum number of in-flight timeslices the relayer can handle.
69 relayer.setPipelineLength(4);
70
71 // Let's create a dummy O2 Message with two headers in the stack:
72 // - DataHeader matching the one provided in the input
73 DataHeader dh;
74 dh.dataDescription = "CLUSTERS";
75 dh.dataOrigin = "TPC";
76 dh.subSpecification = 0;
77
78 DataProcessingHeader dph{0, 1};
79 Stack stack{dh, dph};
80 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
81 // we are creating the inflight messages once outside the benchmark
82 // loop and make sure that they are moved back to the original vector
83 // when processed by the relayer
84 std::vector<fair::mq::MessagePtr> inflightMessages;
85 inflightMessages.emplace_back(transport->CreateMessage(stack.size()));
86 inflightMessages.emplace_back(transport->CreateMessage(1000));
87 memcpy(inflightMessages[0]->GetData(), stack.data(), stack.size());
88
  // Built once: inflightMessages holds two entries (header, payload) here.
89 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
  // Timed region: relay, query ready records, consume, recycle the messages.
  // The assert checks are only active when NDEBUG is not defined.
90 for (auto _ : state) {
91 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
92 std::vector<RecordAction> ready;
93 relayer.getReadyToProcess(ready);
94 assert(ready.size() == 1);
95 assert(ready[0].slot.index == 0);
96 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
97 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
98 assert(result.size() == 1);
99 assert(result.at(0).size() == 1);
  // Take the messages back so the next iteration can relay them again.
100 inflightMessages = std::move(result[0].messages);
101 }
102}
103
104BENCHMARK(BM_RelaySingleSlot);
105
106// This one will simulate a single input.
//
// Unlike BM_RelaySingleSlot, the header is rewritten on every iteration with
// an incremented timeslice value before the message pair is relayed, and the
// slot index of the ready record is not asserted.
107static void BM_RelayMultipleSlots(benchmark::State& state)
108{
110 InputSpec spec{"clusters", "TPC", "CLUSTERS"};
111
112 std::vector<InputRoute> inputs = {
113 InputRoute{spec, 0, "Fake", 0}};
114
  // NOTE(review): `forwards` is not referenced again in this function.
115 std::vector<ForwardRoute> forwards;
116 std::vector<InputChannelInfo> infos{1};
117 TimesliceIndex index{1, infos};
118
  // NOTE(review): `policy` is used below but its declaration is not visible
  // in this listing (the embedded numbering skips a line here) -- confirm
  // against the real file.
120 ServiceRegistry registry;
121 DataRelayer relayer(policy, inputs, index, {registry});
  // Tunes the maximum number of in-flight timeslices the relayer can handle.
122 relayer.setPipelineLength(4);
123
124 // Let's create a dummy O2 Message with two headers in the stack:
125 // - DataHeader matching the one provided in the input
126 DataHeader dh;
127 dh.dataDescription = "CLUSTERS";
128 dh.dataOrigin = "TPC";
129 dh.subSpecification = 0;
130
131 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
132 size_t timeslice = 0;
133
  // `placeholder` is only used to size the header message created below; the
  // actual header bytes are written inside the loop.
134 DataProcessingHeader dph{timeslice, 1};
135 Stack placeholder{dh, dph};
136
137 // we are creating the inflight messages once outside the benchmark
138 // loop and make sure that they are moved back to the original vector
139 // when processed by the relayer
140 std::vector<fair::mq::MessagePtr> inflightMessages;
141 inflightMessages.emplace_back(transport->CreateMessage(placeholder.size()));
142 inflightMessages.emplace_back(transport->CreateMessage(1000));
143
  // Timed region. Note that building the Stack, the memcpy and the InputInfo
  // construction are all inside the measured loop here.
144 for (auto _ : state) {
  // Fresh header stack carrying the next timeslice value (post-increment).
145 Stack stack{dh, DataProcessingHeader{timeslice++, 1}};
146 memcpy(inflightMessages[0]->GetData(), stack.data(), stack.size());
147
148 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
149 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
150 std::vector<RecordAction> ready;
151 relayer.getReadyToProcess(ready);
  // The assert checks are only active when NDEBUG is not defined.
152 assert(ready.size() == 1);
153 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
154 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
155 assert(result.size() == 1);
156 assert(result.at(0).size() == 1);
  // Take the messages back so the next iteration can relay them again.
157 inflightMessages = std::move(result[0].messages);
158 }
159}
160
161BENCHMARK(BM_RelayMultipleSlots);
162
// Benchmark with two input routes (TPC/CLUSTERS and TPC/TRACKS). Four messages
// are created once (header + payload per route); each iteration relays the two
// pairs in two separate relay calls, consumes the result, and reassembles the
// four messages into inflightMessages for the next iteration.
164static void BM_RelayMultipleRoutes(benchmark::State& state)
165{
167 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
168 InputSpec spec2{"tracks", "TPC", "TRACKS"};
169
170 std::vector<InputRoute> inputs = {
171 InputRoute{spec1, 0, "Fake1", 0},
172 InputRoute{spec2, 1, "Fake2", 0}};
173
  // NOTE(review): `forwards` is not referenced again in this function.
174 std::vector<ForwardRoute> forwards;
175 std::vector<InputChannelInfo> infos{1};
176 TimesliceIndex index{1, infos};
177
  // NOTE(review): `policy` is used below but its declaration is not visible
  // in this listing (the embedded numbering skips a line here) -- confirm
  // against the real file.
179 ServiceRegistry registry;
180 DataRelayer relayer(policy, inputs, index, {registry});
  // Tunes the maximum number of in-flight timeslices the relayer can handle.
181 relayer.setPipelineLength(4);
182
183 // Let's create a dummy O2 Message with two headers in the stack:
184 // - DataHeader matching the one provided in the input
185 DataHeader dh1;
186 dh1.dataDescription = "CLUSTERS";
187 dh1.dataOrigin = "TPC";
188 dh1.subSpecification = 0;
189
190 DataHeader dh2;
191 dh2.dataDescription = "TRACKS";
192 dh2.dataOrigin = "TPC";
193 dh2.subSpecification = 0;
194
195 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
196 size_t timeslice = 0;
197
  // `timeslice` is never modified in this function, so both headers carry 0.
198 DataProcessingHeader dph1{timeslice, 1};
199 Stack stack1{dh1, dph1};
200
  // Layout of inflightMessages: [0] header 1, [1] payload 1,
  //                             [2] header 2, [3] payload 2.
201 std::vector<fair::mq::MessagePtr> inflightMessages;
202 inflightMessages.emplace_back(transport->CreateMessage(stack1.size()));
203 inflightMessages.emplace_back(transport->CreateMessage(1000));
204
205 memcpy(inflightMessages[0]->GetData(), stack1.data(), stack1.size());
206
207 DataProcessingHeader dph2{timeslice, 1};
208 Stack stack2{dh2, dph2};
209
210 inflightMessages.emplace_back(transport->CreateMessage(stack2.size()));
211 inflightMessages.emplace_back(transport->CreateMessage(1000));
212
213 memcpy(inflightMessages[2]->GetData(), stack2.data(), stack2.size());
214
  // Timed region. The assert checks are only active when NDEBUG is not defined.
215 for (auto _ : state) {
  // NOTE(review): the InputInfo objects are built with inflightMessages.size()
  // while each relay call below is passed a message count of 2 -- confirm that
  // this mismatch is intended.
216 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
  // First route: messages [0] and [1].
217 relayer.relay(inflightMessages[0]->GetData(), &inflightMessages[0], fakeInfo, 2);
218 std::vector<RecordAction> ready;
219 relayer.getReadyToProcess(ready);
  // A ready record is expected already after the first route; it is not
  // consumed here, only after the second relay below.
220 assert(ready.size() == 1);
221 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
222
223 DataRelayer::InputInfo fakeInfo2{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
  // Second route: messages [2] and [3].
224 relayer.relay(inflightMessages[2]->GetData(), &inflightMessages[2], fakeInfo2, 2);
225 ready.clear();
226 relayer.getReadyToProcess(ready);
227 assert(ready.size() == 1);
228 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
229 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
230 assert(result.size() == 2);
231 assert(result.at(0).size() == 1);
232 assert(result.at(1).size() == 1);
  // Rebuild the four-entry vector: route 0 messages first, then the two
  // messages of route 1, restoring the layout used by the relay calls above.
233 inflightMessages = std::move(result[0].messages);
234 inflightMessages.emplace_back(std::move(result[1].messages[0]));
235 inflightMessages.emplace_back(std::move(result[1].messages[1]));
236 }
237}
238
239BENCHMARK(BM_RelayMultipleRoutes);
240
// Benchmark relaying a sequence of header/payload pairs in one relay call.
// The number of pairs is taken from state.range(0); every pair gets its own
// header with splitPayloadIndex = i and splitPayloadParts = nSplitParts.
242static void BM_RelaySplitParts(benchmark::State& state)
243{
245 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
246
247 std::vector<InputRoute> inputs = {
248 InputRoute{spec1, 0, "Fake1", 0},
249 };
250
  // NOTE(review): `forwards` is not referenced again in this function.
251 std::vector<ForwardRoute> forwards;
252 std::vector<InputChannelInfo> infos{1};
253 TimesliceIndex index{1, infos};
254
  // NOTE(review): `policy` is used below but its declaration is not visible
  // in this listing (the embedded numbering skips a line here) -- confirm
  // against the real file.
256 ServiceRegistry registry;
257 DataRelayer relayer(policy, inputs, index, {registry});
  // Tunes the maximum number of in-flight timeslices the relayer can handle.
258 relayer.setPipelineLength(4);
259
260 // Let's create a dummy O2 Message with two headers in the stack:
261 // - DataHeader matching the one provided in the input
262 DataHeader dh;
263 dh.dataDescription = "CLUSTERS";
264 dh.dataOrigin = "TPC";
265 dh.subSpecification = 0;
266 dh.payloadSize = 100;
267
268 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
269 size_t timeslice = 0;
  // Benchmark argument (10, 100 or 1000 per the registration below).
270 const int nSplitParts = state.range(0);
271
272 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
  // Two messages (header + payload) per split part.
273 inflightMessages.reserve(2 * nSplitParts);
274
  // NOTE(review): `i` is size_t while nSplitParts is a const int, so the loop
  // condition is a signed/unsigned comparison.
275 for (size_t i = 0; i < nSplitParts; ++i) {
276 DataProcessingHeader dph{timeslice, 1};
277 dh.splitPayloadIndex = i;
278 dh.splitPayloadParts = nSplitParts;
279 Stack stack{dh, dph};
280
281 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
282 fair::mq::MessagePtr payload = transport->CreateMessage(dh.payloadSize);
283
284 memcpy(header->GetData(), stack.data(), stack.size());
285 inflightMessages.emplace_back(std::move(header));
286 inflightMessages.emplace_back(std::move(payload));
287 }
288
289 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
  // Timed region: one relay call covering all messages, then consume and
  // recycle. The assert checks are only active when NDEBUG is not defined.
290 for (auto _ : state) {
291 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
292 std::vector<RecordAction> ready;
293 relayer.getReadyToProcess(ready);
294 assert(ready.size() == 1);
295 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
  // Move the messages of the first consumed entry back for the next iteration.
296 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0].messages);
297 }
298}
299
  // Run with 10, 100 and 1000 split parts.
300BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
301
// Benchmark relaying one header message followed by nPayloads payload
// messages in a single relay call. nPayloads is taken from state.range(0) and
// is also passed to relay as an additional trailing argument.
302static void BM_RelayMultiplePayloads(benchmark::State& state)
303{
305 InputSpec spec1{"clusters", "TPC", "CLUSTERS"};
306
307 std::vector<InputRoute> inputs = {
308 InputRoute{spec1, 0, "Fake1", 0},
309 };
310
  // NOTE(review): `forwards` is not referenced again in this function.
311 std::vector<ForwardRoute> forwards;
312 std::vector<InputChannelInfo> infos{1};
313 TimesliceIndex index{1, infos};
314
  // NOTE(review): `policy` is used below but its declaration is not visible
  // in this listing (the embedded numbering skips a line here) -- confirm
  // against the real file.
316 ServiceRegistry registry;
317 DataRelayer relayer(policy, inputs, index, {registry});
  // Tunes the maximum number of in-flight timeslices the relayer can handle.
318 relayer.setPipelineLength(4);
319
320 // DataHeader matching the one provided in the input
321 DataHeader dh;
322 dh.dataDescription = "CLUSTERS";
323 dh.dataOrigin = "TPC";
324 dh.subSpecification = 0;
325 dh.payloadSize = 100;
326
327 auto transport = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
328 size_t timeslice = 0;
  // Benchmark argument (10, 100 or 1000 per the registration below).
329 const int nPayloads = state.range(0);
330 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
  // One header plus nPayloads payload messages.
331 inflightMessages.reserve(nPayloads + 1);
332
333 DataProcessingHeader dph{timeslice, 1};
  // Index and parts are deliberately set to the same value here.
  // NOTE(review): presumably splitPayloadIndex == splitPayloadParts marks a
  // single header followed by several payloads -- confirm in DataHeader.h.
334 dh.splitPayloadIndex = nPayloads;
335 dh.splitPayloadParts = nPayloads;
336 Stack stack{dh, dph};
337 fair::mq::MessagePtr header = transport->CreateMessage(stack.size());
338 memcpy(header->GetData(), stack.data(), stack.size());
339 inflightMessages.emplace_back(std::move(header));
  // NOTE(review): `i` is size_t while nPayloads is a const int, so the loop
  // condition is a signed/unsigned comparison.
340 for (size_t i = 0; i < nPayloads; ++i) {
341 inflightMessages.emplace_back(transport->CreateMessage(dh.payloadSize));
342 }
343
344 DataRelayer::InputInfo fakeInfo{0, inflightMessages.size(), DataRelayer::InputType::Data, {ChannelIndex::INVALID}};
  // Timed region: one relay call covering all messages, then consume and
  // recycle. The assert checks are only active when NDEBUG is not defined.
345 for (auto _ : state) {
346 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size(), nPayloads);
347 std::vector<RecordAction> ready;
348 relayer.getReadyToProcess(ready);
349 assert(ready.size() == 1);
350 assert(ready[0].op == CompletionPolicy::CompletionOp::Consume);
  // Move the messages of the first consumed entry back for the next iteration.
351 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0].messages);
352 }
353}
354
  // Run with 10, 100 and 1000 payloads.
355BENCHMARK(BM_RelayMultiplePayloads)->Arg(10)->Arg(100)->Arg(1000);
356
benchmark::State & state
int32_t i
uint32_t op
uint32_t stack
Definition RawData.h:1
BENCHMARK_MAIN()
o2::monitoring::Monitoring Monitoring
BENCHMARK(BM_RelayMessageCreation)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint index
Definition glcorearb.h:781
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Definition glcorearb.h:5500
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
static constexpr int INVALID
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.
the main header struct
Definition DataHeader.h:618
SplitPayloadPartsType splitPayloadParts
Definition DataHeader.h:646
DataDescription dataDescription
Definition DataHeader.h:636
SubSpecificationType subSpecification
Definition DataHeader.h:656
PayloadSizeType payloadSize
Definition DataHeader.h:666
SplitPayloadIndexType splitPayloadIndex
Definition DataHeader.h:661
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36