11#include <benchmark/benchmark.h>
18#include <Monitoring/Monitoring.h>
19#include <fairmq/TransportFactory.h>
// Benchmark: measures creating a header/payload message pair.
// Each iteration asks the "zeromq" transport factory for one message sized to
// `stack` and one message created with size argument 1000, then copies the
// contents of `stack` into the header message's buffer.
// NOTE(review): several original lines are missing from this listing (the
// opening brace, the definition of `stack`, the closing braces) — confirm
// against the full file before editing.
33static void BM_RelayMessageCreation(benchmark::State& state)
// The transport factory is created once, outside the timed loop.
42 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// Timed region: everything inside this range-for runs once per iteration.
44 for (
auto _ :
state) {
45 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
46 fair::mq::MessagePtr payload = transport->CreateMessage(1000);
// Fill the header buffer with the bytes of `stack`.
47 memcpy(header->GetData(),
stack.data(),
stack.size());
// Benchmark: relays one header/payload pair and consumes it again, repeatedly.
// Setup builds a single "clusters"/"TPC"/"CLUSTERS" input spec, creates a
// header message (sized to `stack`) plus a message of size 1000 via the
// "zeromq" transport, and copies `stack` into the header once. The timed loop
// then relays the pair, asks which records are ready, consumes the ready slot
// and moves the returned messages back into `inflightMessages` so the next
// iteration can relay them again.
// NOTE(review): the definitions of `relayer`, `fakeInfo` and `stack`, the
// contents of the `inputs` initializer and the closing braces are missing from
// this listing — confirm against the full file.
// NOTE(review): if `assert` is the standard <cassert> macro, the checks below
// are compiled out when NDEBUG is defined.
55static void BM_RelaySingleSlot(benchmark::State& state)
58 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
60 std::vector<InputRoute> inputs = {
63 std::vector<ForwardRoute> forwards;
64 std::vector<InputChannelInfo> infos{1};
80 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
// inflightMessages[0] is the header, inflightMessages[1] the second message.
84 std::vector<fair::mq::MessagePtr> inflightMessages;
85 inflightMessages.emplace_back(transport->CreateMessage(
stack.size()));
86 inflightMessages.emplace_back(transport->CreateMessage(1000));
// The header is filled once, before the timed loop.
87 memcpy(inflightMessages[0]->GetData(),
stack.data(),
stack.size());
// Timed region.
90 for (
auto _ :
state) {
// Hand the header pointer, the message array and its length to the relayer.
91 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
92 std::vector<RecordAction> ready;
93 relayer.getReadyToProcess(ready);
// Expect exactly one ready record, in slot index 0, flagged for consumption.
94 assert(ready.size() == 1);
95 assert(ready[0].slot.index == 0);
96 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
97 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// Expect one entry in the result, and that entry to report size 1.
98 assert(
result.size() == 1);
99 assert(
result.at(0).size() == 1);
// Take the messages back so the next iteration can relay them again.
100 inflightMessages = std::move(
result[0].messages);
// Benchmark: like the single-slot case, but the header buffer is rewritten
// from `stack` inside the timed loop on every iteration.
// The header message is allocated once with the size of `placeholder`
// (a Stack built from `dh` and `dph`); each iteration copies `stack` into it,
// relays the header/payload pair, consumes the ready slot and moves the
// returned messages back into `inflightMessages`.
// NOTE(review): the definitions of `relayer`, `fakeInfo`, `dh`, `dph` and the
// per-iteration `stack`, any use of `timeslice`, and the closing braces are
// missing from this listing — presumably `stack` is rebuilt per iteration with
// a changing timeslice; confirm against the full file.
// NOTE(review): if `assert` is the standard <cassert> macro, the checks below
// are compiled out when NDEBUG is defined.
107static void BM_RelayMultipleSlots(benchmark::State& state)
110 InputSpec spec{
"clusters",
"TPC",
"CLUSTERS"};
112 std::vector<InputRoute> inputs = {
115 std::vector<ForwardRoute> forwards;
116 std::vector<InputChannelInfo> infos{1};
131 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
132 size_t timeslice = 0;
// `placeholder` is only used here to size the header message.
135 Stack placeholder{dh, dph};
140 std::vector<fair::mq::MessagePtr> inflightMessages;
141 inflightMessages.emplace_back(transport->CreateMessage(placeholder.size()));
142 inflightMessages.emplace_back(transport->CreateMessage(1000));
// Timed region.
144 for (
auto _ :
state) {
// Overwrite the header buffer with the current `stack` before relaying.
146 memcpy(inflightMessages[0]->GetData(),
stack.data(),
stack.size());
149 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
150 std::vector<RecordAction> ready;
151 relayer.getReadyToProcess(ready);
// Expect exactly one ready record, flagged for consumption.
152 assert(ready.size() == 1);
153 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
154 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// Expect one entry in the result, and that entry to report size 1.
155 assert(
result.size() == 1);
156 assert(
result.at(0).size() == 1);
// Take the messages back so the next iteration can reuse them.
157 inflightMessages = std::move(
result[0].messages);
// Benchmark: relays two header/payload pairs, one per input spec
// ("clusters"/"TPC"/"CLUSTERS" and "tracks"/"TPC"/"TRACKS"), then consumes
// both from the ready slot.
// `inflightMessages` holds four messages: [0],[1] are the header (filled from
// `stack1`) and a size-1000 message; [2],[3] are the header (filled from
// `stack2`) and a size-1000 message. After consumption the four messages are
// moved back into `inflightMessages` in the same order for the next iteration.
// NOTE(review): the definitions of `relayer`, `fakeInfo`, `fakeInfo2`, `dh1`,
// `dph1`, `dh2`, `dph2`, any use of `timeslice`, and the closing braces are
// missing from this listing — confirm against the full file.
// NOTE(review): if `assert` is the standard <cassert> macro, the checks below
// are compiled out when NDEBUG is defined.
164static void BM_RelayMultipleRoutes(benchmark::State& state)
167 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
168 InputSpec spec2{
"tracks",
"TPC",
"TRACKS"};
170 std::vector<InputRoute> inputs = {
174 std::vector<ForwardRoute> forwards;
175 std::vector<InputChannelInfo> infos{1};
195 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
196 size_t timeslice = 0;
// First pair: header sized to and filled from `stack1`.
199 Stack stack1{dh1, dph1};
201 std::vector<fair::mq::MessagePtr> inflightMessages;
202 inflightMessages.emplace_back(transport->CreateMessage(stack1.size()));
203 inflightMessages.emplace_back(transport->CreateMessage(1000));
205 memcpy(inflightMessages[0]->GetData(), stack1.data(), stack1.size());
// Second pair: header sized to and filled from `stack2`, stored at index 2.
208 Stack stack2{dh2, dph2};
210 inflightMessages.emplace_back(transport->CreateMessage(stack2.size()));
211 inflightMessages.emplace_back(transport->CreateMessage(1000));
213 memcpy(inflightMessages[2]->GetData(), stack2.data(), stack2.size());
// Timed region.
215 for (
auto _ :
state) {
// Relay the first pair (2 messages starting at index 0).
217 relayer.relay(inflightMessages[0]->GetData(), &inflightMessages[0], fakeInfo, 2);
218 std::vector<RecordAction> ready;
219 relayer.getReadyToProcess(ready);
220 assert(ready.size() == 1);
221 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Relay the second pair (2 messages starting at index 2).
224 relayer.relay(inflightMessages[2]->GetData(), &inflightMessages[2], fakeInfo2, 2);
// NOTE(review): `ready` is refilled here; no clear() is visible between the
// two getReadyToProcess calls in this listing — a line may be missing.
226 relayer.getReadyToProcess(ready);
227 assert(ready.size() == 1);
228 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
229 auto result = relayer.consumeAllInputsForTimeslice(ready[0].slot);
// Expect two entries in the result, each reporting size 1.
230 assert(
result.size() == 2);
231 assert(
result.at(0).size() == 1);
232 assert(
result.at(1).size() == 1);
// Rebuild the four-message layout: entry 0's messages first, then the two
// messages of entry 1 appended behind them.
233 inflightMessages = std::move(
result[0].messages);
234 inflightMessages.emplace_back(std::move(
result[1].messages[0]));
235 inflightMessages.emplace_back(std::move(
result[1].messages[1]));
// Benchmark: relays `nSplitParts` header/payload pairs in a single relay call.
// `nSplitParts` is read from state.range(0). Setup creates one header message
// (sized to and filled from `stack`) and one payload message (created with
// `dh.payloadSize`) per part, appended pairwise to `inflightMessages`. The
// timed loop relays the whole vector, consumes the ready slot and moves the
// messages of the first result entry back into `inflightMessages`.
// NOTE(review): the definitions of `relayer`, `fakeInfo`, `dh` and `stack`,
// any use of `timeslice`, and the closing braces are missing from this
// listing — confirm against the full file.
// NOTE(review): if `assert` is the standard <cassert> macro, the checks below
// are compiled out when NDEBUG is defined.
242static void BM_RelaySplitParts(benchmark::State& state)
245 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
247 std::vector<InputRoute> inputs = {
251 std::vector<ForwardRoute> forwards;
252 std::vector<InputChannelInfo> infos{1};
268 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
269 size_t timeslice = 0;
// Number of parts comes from the benchmark argument.
270 const int nSplitParts =
state.range(0);
272 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
// Two messages (header + payload) are stored per part.
273 inflightMessages.reserve(2 * nSplitParts);
// NOTE(review): `i` is size_t while `nSplitParts` is a const int, so this
// comparison mixes signed and unsigned operands.
275 for (
size_t i = 0;
i < nSplitParts; ++
i) {
281 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
282 fair::mq::MessagePtr payload = transport->CreateMessage(dh.
payloadSize);
284 memcpy(header->GetData(),
stack.data(),
stack.size());
285 inflightMessages.emplace_back(std::move(header));
286 inflightMessages.emplace_back(std::move(payload));
// Timed region.
290 for (
auto _ :
state) {
291 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size());
292 std::vector<RecordAction> ready;
293 relayer.getReadyToProcess(ready);
// Expect exactly one ready record, flagged for consumption.
294 assert(ready.size() == 1);
295 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Consume the slot and take the first entry's messages back for reuse.
296 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0].messages);
// Register BM_RelaySplitParts with the arguments 10, 100 and 1000; the
// benchmark reads the argument through state.range(0).
300BENCHMARK(BM_RelaySplitParts)->Arg(10)->Arg(100)->Arg(1000);
// Benchmark: relays one header followed by `nPayloads` payload messages.
// `nPayloads` is read from state.range(0). Setup creates a single header
// message (sized to and filled from `stack`) and then `nPayloads` messages
// created with `dh.payloadSize`. The timed loop relays the whole vector,
// passing `nPayloads` as an extra trailing argument to relay(), consumes the
// ready slot and moves the messages of the first result entry back into
// `inflightMessages`.
// NOTE(review): the definitions of `relayer`, `fakeInfo`, `dh` and `stack`,
// any use of `timeslice`, and the closing braces are missing from this
// listing — confirm against the full file.
// NOTE(review): if `assert` is the standard <cassert> macro, the checks below
// are compiled out when NDEBUG is defined.
302static void BM_RelayMultiplePayloads(benchmark::State& state)
305 InputSpec spec1{
"clusters",
"TPC",
"CLUSTERS"};
307 std::vector<InputRoute> inputs = {
311 std::vector<ForwardRoute> forwards;
312 std::vector<InputChannelInfo> infos{1};
327 auto transport = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
328 size_t timeslice = 0;
// Number of payloads comes from the benchmark argument.
329 const int nPayloads =
state.range(0);
330 std::vector<std::unique_ptr<fair::mq::Message>> inflightMessages;
// One header plus nPayloads payload messages.
331 inflightMessages.reserve(nPayloads + 1);
337 fair::mq::MessagePtr header = transport->CreateMessage(
stack.size());
338 memcpy(header->GetData(),
stack.data(),
stack.size());
339 inflightMessages.emplace_back(std::move(header));
// NOTE(review): `i` is size_t while `nPayloads` is a const int, so this
// comparison mixes signed and unsigned operands.
340 for (
size_t i = 0;
i < nPayloads; ++
i) {
341 inflightMessages.emplace_back(transport->CreateMessage(dh.
payloadSize));
// Timed region.
345 for (
auto _ :
state) {
346 relayer.relay(inflightMessages[0]->GetData(), inflightMessages.data(), fakeInfo, inflightMessages.size(), nPayloads);
347 std::vector<RecordAction> ready;
348 relayer.getReadyToProcess(ready);
// Expect exactly one ready record, flagged for consumption.
349 assert(ready.size() == 1);
350 assert(ready[0].
op == CompletionPolicy::CompletionOp::Consume);
// Consume the slot and take the first entry's messages back for reuse.
351 inflightMessages = std::move(relayer.consumeAllInputsForTimeslice(ready[0].slot)[0].messages);
// Register BM_RelayMultiplePayloads with the arguments 10, 100 and 1000; the
// benchmark reads the argument through state.range(0).
355BENCHMARK(BM_RelayMultiplePayloads)->Arg(10)->Arg(100)->Arg(1000);
o2::monitoring::Monitoring Monitoring
BENCHMARK(BM_RelayMessageCreation)
void setPipelineLength(size_t s)
Tune the maximum number of in flight timeslices this can handle.
GLsizei GLenum const void GLuint GLsizei GLfloat * metrics
Defining PrimaryVertex explicitly as messageable.
static constexpr int INVALID
static CompletionPolicy consumeWhenAny(const char *name, CompletionPolicy::Matcher matcher)
When any of the parts of the record have been received, consume them.