91 auto bypassProxies = readConfig<ProxyBypass>(config,
"bypass-proxies");
93 std::string defaultTransportConfig = config.
options().
get<std::string>(
"default-transport");
94 if (defaultTransportConfig ==
"zeromq") {
96 }
else if (defaultTransportConfig ==
"shmem") {
99 throw std::runtime_error(
"invalid argument for option --default-transport : '" + defaultTransportConfig +
"'");
101 std::vector<DataProcessorSpec> workflow;
103 struct BenchmarkState {
104 size_t logPeriod = 2;
105 size_t runningTime = 30;
106 size_t eventCount = 0;
107 size_t totalEventCount = 0;
110 size_t totalMsgCount = 0;
111 size_t totalMsgSize = 0;
112 benchclock::time_point startTime = benchclock::now();
113 benchclock::time_point idleTime = benchclock::now();
114 benchclock::time_point lastLogTime = benchclock::now();
115 float maxMsgPerSec = .0;
116 float maxDataRatePerSec = .0;
117 float totalIdleTime = .0;
118 float maxIdleTime = .0;
121 auto makeBenchmarkState = [&config]() -> std::shared_ptr<BenchmarkState> {
122 auto state = std::make_shared<BenchmarkState>();
127 auto loggerInit = [](BenchmarkState&
state) {
128 state.startTime = benchclock::now();
129 state.idleTime = benchclock::now();
130 state.lastLogTime = benchclock::now();
131 state.totalIdleTime = 0.;
134 auto loggerCycle = [](BenchmarkState&
state,
size_t msgCount,
size_t msgSize) {
136 state.msgCount += msgCount;
137 state.msgSize += msgSize;
138 auto secSinceLastLog = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() -
state.lastLogTime);
139 if (secSinceLastLog.count() >=
state.logPeriod) {
144 float eventRate =
state.eventCount / secSinceLastLog.count();
145 float msgPerSec =
state.msgCount / secSinceLastLog.count();
146 float kbPerSec =
state.msgSize / (1024 * secSinceLastLog.count());
147 auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() -
state.startTime);
148 LOG(info) << fmt::format(
149 "{: 3d}s: Total messages: {} - Event rate {:.2f} Hz {:.2f} msg/s {:.2f} MB/s, "
150 "Accumulated idle time {:.2f} ms",
151 elapsedTime.count(),
state.totalEventCount, eventRate, msgPerSec,
152 kbPerSec / 1024,
state.totalIdleTime / 1000);
153 if (
state.maxMsgPerSec < msgPerSec) {
154 state.maxMsgPerSec = msgPerSec;
156 if (
state.maxDataRatePerSec < kbPerSec) {
157 state.maxDataRatePerSec = kbPerSec;
159 state.eventCount = 0;
162 state.lastLogTime = benchclock::now();
167 ActiveGuard(BenchmarkState& _state) :
state(_state)
169 auto idleTime = std::chrono::duration_cast<std::chrono::microseconds>(benchclock::now() -
state.idleTime);
170 state.totalIdleTime += idleTime.count();
174 state.idleTime = benchclock::now();
176 BenchmarkState&
state;
179 auto loggerSummary = [](BenchmarkState&
state) {
180 auto totalTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() -
state.startTime);
181 if (totalTime.count() == 0 ||
state.totalEventCount == 0) {
184 float eventRate =
state.totalEventCount / totalTime.count();
185 float msgPerSec =
state.totalMsgCount / totalTime.count();
186 float kbPerSec =
state.totalMsgSize / (1024 * totalTime.count());
187 LOG(info) << fmt::format(
192 "for {} s: Avrg event rate {:.2f} Hz, "
193 "Avrg message rate {:.2f}/s (max {:.2f}/s), Avrg data rate {:.2f} MB/s (max {:.2f} MB/s), "
194 "Avrg idle time {:.2f} ms",
195 totalTime.count(), eventRate,
196 msgPerSec,
state.maxMsgPerSec,
197 kbPerSec / 1024,
state.maxDataRatePerSec / 1024,
198 state.totalIdleTime / (
state.totalEventCount * 1000));
201 struct ProducerAttributes {
209 size_t msgSize = 1024 * 1024;
211 size_t splitPayloadSize = 1;
212 size_t iteration = 0;
213 std::string channelName;
215 ProxyBypass bypassProxies = ProxyBypass::None;
225 externalChannelSpec.
name =
"downstream";
226 externalChannelSpec.
type = ChannelType::Push;
227 externalChannelSpec.
method = ChannelMethod::Bind;
228 externalChannelSpec.
hostname =
"localhost";
229 externalChannelSpec.
port = 42042;
234 if (!defaultTransportConfig.empty()) {
235 if (defaultTransportConfig ==
"zeromq") {
236 externalChannelSpec.
protocol = ChannelProtocol::Network;
237 }
else if (defaultTransportConfig ==
"shmem") {
238 externalChannelSpec.
protocol = ChannelProtocol::IPC;
243 if (!defaultTransportConfig.empty() && defaultTransportConfig.find(
"transport=") == std::string::npos) {
244 channelConfig +=
",transport=" + defaultTransportConfig;
251 auto pState = makeBenchmarkState();
252 auto attributes = std::make_shared<ProducerAttributes>();
253 if (bypassProxies == ProxyBypass::Output) {
255 attributes->channelName = externalChannelSpec.
name;
257 attributes->bypassProxies = bypassProxies;
259 auto producerInitCallback = [pState, loggerInit, loggerCycle, loggerSummary, attributes](
CallbackService& callbacks,
262 attributes->msgSize = 1024 * config.get<
int>(
"msgSize");
263 attributes->splitPayloadSize = config.get<
int>(
"splitPayloadSize");
264 auto producerBenchInit = [pState, loggerInit, attributes, outputRoutes = rds.spec().outputs]() {
267 if (attributes->channelName.empty()) {
269 for (
auto& route : outputRoutes) {
271 attributes->channelName = route.channel;
279 callbacks.set<CallbackService::Id::Start>(producerBenchInit);
282 auto&
state = *pState;
286 auto transport = device.GetChannel(attributes->channelName, 0).Transport();
287 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
290 fair::mq::Parts messages;
292 size_t totalPayload = 0;
293 size_t allocatedSize = 0;
294 auto createMessage = [&transport, &allocatedSize](
size_t size) -> fair::mq::MessagePtr {
295 auto msg = transport->CreateMessage(
size);
296 allocatedSize +=
size;
299 auto insertHeader = [&dph, &createMessage, &messages, &nHeaders](
DataHeader const& dh) ->
void {
301 fair::mq::MessagePtr header = createMessage(
stack.size());
302 memcpy(header->GetData(),
stack.data(),
stack.size());
303 messages.AddPart(std::move(header));
306 auto insertPayload = [&createMessage, &messages, &totalPayload](
size_t size) ->
void {
307 fair::mq::MessagePtr payload = createMessage(
size);
308 messages.AddPart(std::move(payload));
309 totalPayload +=
size;
311 auto createSequence = [&attributes, &insertHeader, &insertPayload](
size_t nPayloads,
DataHeader dh) ->
void {
314 dh.payloadSize = attributes->msgSize;
316 dh.splitPayloadIndex = nPayloads;
317 dh.splitPayloadParts = nPayloads;
320 for (
size_t i = 0;
i < nPayloads; ++
i) {
321 insertPayload(dh.payloadSize);
325 auto createPairs = [&attributes, &insertHeader, &insertPayload](
size_t nPayloads,
DataHeader dh) ->
void {
328 dh.payloadSize = attributes->msgSize;
330 dh.splitPayloadIndex = 0;
331 dh.splitPayloadParts = nPayloads;
332 for (
size_t i = 0;
i < nPayloads; ++
i) {
333 dh.splitPayloadIndex =
i;
335 insertPayload(dh.payloadSize);
339 bool forcedTermination =
false;
341 if (attributes->mode == ProducerAttributes::Mode::Transport) {
342 for (
unsigned int i = 0;
i < attributes->nChannels;
i++) {
343 createPairs(attributes->splitPayloadSize,
DataHeader{
"DATA",
"TST", i});
348 for (
unsigned int i = 0;
i < attributes->nChannels;
i++) {
349 outputs.make<
char>(
OutputRef{
"data",
i}, attributes->msgSize);
352 }
catch (
const std::exception& e) {
355 if (std::string(e.what()).find(
"shmem: could not create a message of size") == std::string::npos) {
358 LOG(error) << fmt::format(
"Exception {}\nallocated {} in cycle {} \nconsider increasing shared memory", e.what(), allocatedSize, attributes->iteration);
359 forcedTermination =
true;
361 ++attributes->iteration;
362 loggerCycle(*pState, nHeaders, totalPayload);
363 auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() -
state.startTime);
364 if (forcedTermination || elapsedTime.count() >=
state.runningTime) {
365 loggerSummary(*pState);
366 if (forcedTermination) {
367 LOG(error) <<
"termination was forced by earlier error";
371 control.endOfStream();
372 if (attributes->bypassProxies == ProxyBypass::Output) {
375 sih.
state = InputChannelState::Completed;
378 out.AddPart(std::move(headerMessage));
380 out.AddPart(std::move(device.NewMessageFor(attributes->channelName, 0, 0)));
391 outputs.emplace_back(
OutputSpec{{
"data"},
"TST",
"DATA",
i, Lifetime::Timeframe});
395 {std::move(outputs)},
397 {
ConfigParamSpec{
"splitPayloadSize", VariantType::Int, 1, {
"number of split payloads"}},
398 ConfigParamSpec{
"msgSize", VariantType::Int, 1024, {
"message size in kB"}}}});
400 if (bypassProxies == ProxyBypass::Output) {
402 const char* d = strdup(channelConfig.c_str());
403 workflow.back().options.push_back(
ConfigParamSpec{
"channel-config", VariantType::String, d, {
"proxy channel of producer"}});
411 sinkInputs.emplace_back(
InputSpec{{
"external"},
"TST",
"DATA",
i, Lifetime::Timeframe});
413 auto channelSelector = [](
InputSpec const&,
const std::unordered_map<std::string, std::vector<fair::mq::Channel>>&) -> std::string {
416 if (bypassProxies == ProxyBypass::None) {
424 auto cState = makeBenchmarkState();
425 auto checkerCallback = [cState, loggerCycle](
InputRecord& inputs) {
426 ActiveGuard
g(*cState);
427 LOG(
debug) <<
"got inputs " << inputs.size();
431 auto data = inputs.get<gsl::span<char>>(
ref);
433 msgSize +=
data.size();
435 loggerCycle(*cState, msgCount, msgSize);
437 auto checkerBenchInit = [cState, loggerInit]() {
441 loggerSummary(*cState);
443 auto checkerInit = [checkerCallback, checkerBenchInit, checkerBenchSummary](
CallbackService& callbacks) {
444 callbacks.set<CallbackService::Id::Start>(checkerBenchInit);
445 callbacks.set<CallbackService::Id::EndOfStream>(checkerBenchSummary);
451 if (bypassProxies != ProxyBypass::None) {
457 std::move(checkerInputs),
469 if (inputs.Size() < 2) {
473 auto dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
475 LOG(error) <<
"data on input " << msgidx <<
" does not follow the O2 data model, DataHeader missing";
478 auto dph = o2::header::get<DataProcessingHeader*>(inputs.At(msgidx)->GetData());
480 LOG(error) <<
"data on input " << msgidx <<
" does not follow the O2 data model, DataProcessingHeader missing";
485 OutputSpec query{
"PRX", dh->dataDescription, dh->subSpecification};
486 auto const& channelName = channelRetriever(query, dph->startTime);
493 if (channelName.empty()) {
497 auto outHeaderMessage = device->NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
498 memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
501 auto odh =
const_cast<o2::header::DataHeader*
>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
504 output.AddPart(std::move(outHeaderMessage));
505 output.AddPart(std::move(inputs.At(msgidx + 1)));
516 externalChannelSpec.
name =
"";
517 externalChannelSpec.
type = ChannelType::Pull;
518 externalChannelSpec.
method = ChannelMethod::Connect;
520 if (!defaultTransportConfig.empty() && defaultTransportConfig.find(
"transport=") == std::string::npos) {
521 channelConfig +=
",transport=" + defaultTransportConfig;
524 if (bypassProxies == ProxyBypass::None) {
530 std::move(inputProxyOutputs),
531 channelConfig.c_str(),
533 }
else if (bypassProxies == ProxyBypass::Output) {
536 auto filterSpecs = inputProxyOutputs;
539 std::move(inputProxyOutputs),
540 channelConfig.c_str(),