Project
Loading...
Searching...
No Matches
benchmark_ExternalFairMQDeviceProxies.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
12
13using namespace o2::framework;
14
25#include "Framework/Logger.h"
27#include "Headers/DataHeader.h"
28#include <fairmq/Channel.h>
29#include <fairmq/Device.h>
30#include <fairmq/Message.h>
31#include <fairmq/Parts.h>
32#include <chrono>
33#include <sstream>
34
namespace benchmark_config
{
/// Selects which of the two proxy devices are left out of the benchmark workflow.
enum struct ProxyBypass {
  /// Both the output proxy ('dpl-sink') and the input proxy are part of the workflow.
  None,
  /// Neither proxy is created; the checker subscribes directly to the producer data.
  All,
  /// Only the output proxy is left out; the producer sends on the out-of-band
  /// channel itself and the input proxy is still created.
  Output,
};
} // namespace benchmark_config

/// Parse a ProxyBypass value from its textual form; sets failbit on unknown tokens.
std::istream& operator>>(std::istream& in, enum benchmark_config::ProxyBypass& val);
/// Write the textual form of a ProxyBypass value.
std::ostream& operator<<(std::ostream& out, const enum benchmark_config::ProxyBypass& val);
45
46// we need to add workflow options before including Framework/runDataProcessing
47void customize(std::vector<ConfigParamSpec>& workflowOptions)
48{
49 workflowOptions.push_back(
51 "default-transport", VariantType::String, "shmem", {"default transport: shmem, zeromq"}});
52 workflowOptions.push_back(
54 "nChannels", VariantType::Int, 1, {"number of output channels of the producer"}});
55 workflowOptions.push_back(
57 "bypass-proxies", VariantType::String, "none", {"bypass proxies: none, all, output"}});
58 workflowOptions.push_back(
60 "runningTime", VariantType::Int, 30, {"time to run the workflow"}});
61}
62
64
65using namespace o2::framework;
68using benchclock = std::chrono::high_resolution_clock;
69
// Log a fatal message if the test condition does not hold.
// The body is wrapped in do { } while (false) so that the macro behaves like a
// single statement: it requires the trailing semicolon and can be used as the
// unbraced branch of an if/else without capturing the following 'else'.
#define ASSERT_ERROR(condition)                                     \
  do {                                                              \
    if (!(condition)) {                                             \
      LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
    }                                                               \
  } while (false)
74
75template <typename T>
76T readConfig(ConfigContext const& config, const char* key)
77{
78 auto p = config.options().get<std::string>(key);
79 std::stringstream cs(p);
80 T val;
81 cs >> val;
82 if (cs.fail()) {
83 throw std::runtime_error("invalid configuration parameter '" + p + "' for key " + key);
84 }
85 return val;
86}
87
88std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
89{
90 using ProxyBypass = benchmark_config::ProxyBypass;
91 auto bypassProxies = readConfig<ProxyBypass>(config, "bypass-proxies");
92 int nChannels = config.options().get<int>("nChannels");
93 std::string defaultTransportConfig = config.options().get<std::string>("default-transport");
94 if (defaultTransportConfig == "zeromq") {
95 // nothing to do for the moment
96 } else if (defaultTransportConfig == "shmem") {
97 // nothing to do for the moment
98 } else {
99 throw std::runtime_error("invalid argument for option --default-transport : '" + defaultTransportConfig + "'");
100 }
101 std::vector<DataProcessorSpec> workflow;
102
103 struct BenchmarkState {
104 size_t logPeriod = 2;
105 size_t runningTime = 30;
106 size_t eventCount = 0;
107 size_t totalEventCount = 0;
108 size_t msgCount = 0;
109 size_t msgSize = 0;
110 size_t totalMsgCount = 0;
111 size_t totalMsgSize = 0;
112 benchclock::time_point startTime = benchclock::now();
113 benchclock::time_point idleTime = benchclock::now();
114 benchclock::time_point lastLogTime = benchclock::now();
115 float maxMsgPerSec = .0;
116 float maxDataRatePerSec = .0;
117 float totalIdleTime = .0;
118 float maxIdleTime = .0;
119 };
120
121 auto makeBenchmarkState = [&config]() -> std::shared_ptr<BenchmarkState> {
122 auto state = std::make_shared<BenchmarkState>();
123 state->runningTime = config.options().get<int>("runningTime");
124 return state;
125 };
126
127 auto loggerInit = [](BenchmarkState& state) {
128 state.startTime = benchclock::now();
129 state.idleTime = benchclock::now();
130 state.lastLogTime = benchclock::now();
131 state.totalIdleTime = 0.;
132 };
133
134 auto loggerCycle = [](BenchmarkState& state, size_t msgCount, size_t msgSize) {
135 ++state.eventCount;
136 state.msgCount += msgCount;
137 state.msgSize += msgSize;
138 auto secSinceLastLog = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() - state.lastLogTime);
139 if (secSinceLastLog.count() >= state.logPeriod) {
140 // TODO: introduce real counters for accumulated number of messages and message size
141 state.totalEventCount += state.eventCount;
142 state.totalMsgCount += state.msgCount;
143 state.totalMsgSize += state.msgSize;
144 float eventRate = state.eventCount / secSinceLastLog.count();
145 float msgPerSec = state.msgCount / secSinceLastLog.count();
146 float kbPerSec = state.msgSize / (1024 * secSinceLastLog.count());
147 auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() - state.startTime);
148 LOG(info) << fmt::format(
149 "{: 3d}s: Total messages: {} - Event rate {:.2f} Hz {:.2f} msg/s {:.2f} MB/s, "
150 "Accumulated idle time {:.2f} ms",
151 elapsedTime.count(), state.totalEventCount, eventRate, msgPerSec,
152 kbPerSec / 1024, state.totalIdleTime / 1000);
153 if (state.maxMsgPerSec < msgPerSec) {
154 state.maxMsgPerSec = msgPerSec;
155 }
156 if (state.maxDataRatePerSec < kbPerSec) {
157 state.maxDataRatePerSec = kbPerSec;
158 }
159 state.eventCount = 0;
160 state.msgCount = 0;
161 state.msgSize = 0;
162 state.lastLogTime = benchclock::now();
163 }
164 };
165
166 struct ActiveGuard {
167 ActiveGuard(BenchmarkState& _state) : state(_state)
168 {
169 auto idleTime = std::chrono::duration_cast<std::chrono::microseconds>(benchclock::now() - state.idleTime);
170 state.totalIdleTime += idleTime.count();
171 }
172 ~ActiveGuard()
173 {
174 state.idleTime = benchclock::now();
175 }
176 BenchmarkState& state;
177 };
178
179 auto loggerSummary = [](BenchmarkState& state) {
180 auto totalTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() - state.startTime);
181 if (totalTime.count() == 0 || state.totalEventCount == 0) {
182 return;
183 }
184 float eventRate = state.totalEventCount / totalTime.count();
185 float msgPerSec = state.totalMsgCount / totalTime.count();
186 float kbPerSec = state.totalMsgSize / (1024 * totalTime.count());
187 LOG(info) << fmt::format(
188 "Benchmarking "
189#ifndef NDEBUG
190 "accumulated "
191#endif
192 "for {} s: Avrg event rate {:.2f} Hz, "
193 "Avrg message rate {:.2f}/s (max {:.2f}/s), Avrg data rate {:.2f} MB/s (max {:.2f} MB/s), "
194 "Avrg idle time {:.2f} ms",
195 totalTime.count(), eventRate,
196 msgPerSec, state.maxMsgPerSec,
197 kbPerSec / 1024, state.maxDataRatePerSec / 1024,
198 state.totalIdleTime / (state.totalEventCount * 1000));
199 };
200
201 struct ProducerAttributes {
202 enum struct Mode {
203 // create messages via transport allocator
204 Transport = 0,
205 // create messages via DPL allocator
206 Allocator,
207 };
208 size_t nRolls = 2;
209 size_t msgSize = 1024 * 1024;
210 size_t nChannels = 1;
211 size_t splitPayloadSize = 1;
212 size_t iteration = 0;
213 std::string channelName;
214 Mode mode = Mode::Transport;
215 ProxyBypass bypassProxies = ProxyBypass::None;
216 };
217
219 // configuration of the out-of-band proxy channel
220 //
221 // used either in the output proxy ('dpl-sink') or as a direct channel of the producer
222 // use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
223 OutputChannelSpec externalChannelSpec;
224 // Note: the name is hardcoded for now
225 externalChannelSpec.name = "downstream";
226 externalChannelSpec.type = ChannelType::Push;
227 externalChannelSpec.method = ChannelMethod::Bind;
228 externalChannelSpec.hostname = "localhost";
229 externalChannelSpec.port = 42042;
230 externalChannelSpec.listeners = 0;
231 externalChannelSpec.rateLogging = 10;
232 externalChannelSpec.sendBufferSize = 1;
233 externalChannelSpec.recvBufferSize = 1;
234 if (!defaultTransportConfig.empty()) {
235 if (defaultTransportConfig == "zeromq") {
236 externalChannelSpec.protocol = ChannelProtocol::Network;
237 } else if (defaultTransportConfig == "shmem") {
238 externalChannelSpec.protocol = ChannelProtocol::IPC;
239 }
240 }
241 std::string channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
242 // at some point the formatting tool might add the transport as well so we have to check
243 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
244 channelConfig += ",transport=" + defaultTransportConfig;
245 }
246
248 // producer process
249 //
250 // the compute callback of the producer
251 auto pState = makeBenchmarkState();
252 auto attributes = std::make_shared<ProducerAttributes>();
253 if (bypassProxies == ProxyBypass::Output) {
254 // if we bypass the output proxy, the producer needs the out-of-band channel
255 attributes->channelName = externalChannelSpec.name;
256 }
257 attributes->bypassProxies = bypassProxies;
258 attributes->nChannels = nChannels;
259 auto producerInitCallback = [pState, loggerInit, loggerCycle, loggerSummary, attributes](CallbackService& callbacks,
260 RawDeviceService& rds,
261 ConfigParamRegistry const& config) {
262 attributes->msgSize = 1024 * config.get<int>("msgSize");
263 attributes->splitPayloadSize = config.get<int>("splitPayloadSize");
264 auto producerBenchInit = [pState, loggerInit, attributes, outputRoutes = rds.spec().outputs]() {
265 // find the output channel name, we expect all output messages to be
266 // sent over the same channel
267 if (attributes->channelName.empty()) {
268 OutputSpec const query{"TST", "DATA", 0};
269 for (auto& route : outputRoutes) {
270 if (DataSpecUtils::match(route.matcher, query)) {
271 attributes->channelName = route.channel;
272 break;
273 }
274 }
275 }
276 ASSERT_ERROR(attributes->channelName.length() > 0);
277 loggerInit(*pState);
278 };
279 callbacks.set<CallbackService::Id::Start>(producerBenchInit);
280
281 auto producerCallback = [pState, loggerCycle, loggerSummary, attributes](InputRecord& inputs, DataAllocator& outputs, ControlService& control, RawDeviceService& rds) {
282 auto& state = *pState;
283 ActiveGuard g(state);
284
285 fair::mq::Device& device = *(rds.device());
286 auto transport = device.GetChannel(attributes->channelName, 0).Transport();
287 auto channelAlloc = o2::pmr::getTransportAllocator(transport);
288
289 DataProcessingHeader dph{attributes->iteration, 0};
290 fair::mq::Parts messages;
291 size_t nHeaders = 0;
292 size_t totalPayload = 0;
293 size_t allocatedSize = 0;
294 auto createMessage = [&transport, &allocatedSize](size_t size) -> fair::mq::MessagePtr {
295 auto msg = transport->CreateMessage(size);
296 allocatedSize += size;
297 return msg;
298 };
299 auto insertHeader = [&dph, &createMessage, &messages, &nHeaders](DataHeader const& dh) -> void {
300 Stack stack{dh, dph};
301 fair::mq::MessagePtr header = createMessage(stack.size());
302 memcpy(header->GetData(), stack.data(), stack.size());
303 messages.AddPart(std::move(header));
304 ++nHeaders;
305 };
306 auto insertPayload = [&createMessage, &messages, &totalPayload](size_t size) -> void {
307 fair::mq::MessagePtr payload = createMessage(size);
308 messages.AddPart(std::move(payload));
309 totalPayload += size;
310 };
311 auto createSequence = [&attributes, &insertHeader, &insertPayload](size_t nPayloads, DataHeader dh) -> void {
312 // one header with index set to the number of split parts indicates sequence
313 // of payloads without additional headers
314 dh.payloadSize = attributes->msgSize;
315 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
316 dh.splitPayloadIndex = nPayloads;
317 dh.splitPayloadParts = nPayloads;
318 insertHeader(dh);
319
320 for (size_t i = 0; i < nPayloads; ++i) {
321 insertPayload(dh.payloadSize);
322 }
323 };
324
325 auto createPairs = [&attributes, &insertHeader, &insertPayload](size_t nPayloads, DataHeader dh) -> void {
326 // one header with index set to the number of split parts indicates sequence
327 // of payloads without additional headers
328 dh.payloadSize = attributes->msgSize;
329 dh.payloadSerializationMethod = o2::header::gSerializationMethodNone;
330 dh.splitPayloadIndex = 0;
331 dh.splitPayloadParts = nPayloads;
332 for (size_t i = 0; i < nPayloads; ++i) {
333 dh.splitPayloadIndex = i;
334 insertHeader(dh);
335 insertPayload(dh.payloadSize);
336 }
337 };
338
339 bool forcedTermination = false;
340 try {
341 if (attributes->mode == ProducerAttributes::Mode::Transport) {
342 for (unsigned int i = 0; i < attributes->nChannels; i++) {
343 createPairs(attributes->splitPayloadSize, DataHeader{"DATA", "TST", i});
344 }
345 // using utility from ExternalFairMQDeviceProxy
346 o2::framework::sendOnChannel(device, messages, attributes->channelName, (size_t)-1);
347 } else {
348 for (unsigned int i = 0; i < attributes->nChannels; i++) {
349 outputs.make<char>(OutputRef{"data", i}, attributes->msgSize);
350 }
351 }
352 } catch (const std::exception& e) {
353 // we cracefully handle if no shared memory can be allocated, that's simply
354 // a matter of configuration
355 if (std::string(e.what()).find("shmem: could not create a message of size") == std::string::npos) {
356 throw e;
357 }
358 LOG(error) << fmt::format("Exception {}\nallocated {} in cycle {} \nconsider increasing shared memory", e.what(), allocatedSize, attributes->iteration);
359 forcedTermination = true;
360 }
361 ++attributes->iteration;
362 loggerCycle(*pState, nHeaders, totalPayload);
363 auto elapsedTime = std::chrono::duration_cast<std::chrono::seconds>(benchclock::now() - state.startTime);
364 if (forcedTermination || elapsedTime.count() >= state.runningTime) {
365 loggerSummary(*pState);
366 if (forcedTermination) {
367 LOG(error) << "termination was forced by earlier error";
368 }
369 // send the end of stream signal, this is transferred by the proxies
370 // and allows to properly terminate downstream devices
371 control.endOfStream();
372 if (attributes->bypassProxies == ProxyBypass::Output) {
373 // since we are sending on the bare channel, also the EOS message needs to be created.
375 sih.state = InputChannelState::Completed;
376 auto headerMessage = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dph, sih});
377 fair::mq::Parts out;
378 out.AddPart(std::move(headerMessage));
379 // add empty payload message
380 out.AddPart(std::move(device.NewMessageFor(attributes->channelName, 0, 0)));
381 o2::framework::sendOnChannel(device, out, attributes->channelName, (size_t)-1);
382 }
383 }
384 };
385
386 return adaptStateless(producerCallback);
387 };
388
389 Outputs outputs;
390 for (unsigned int i = 0; i < nChannels; i++) {
391 outputs.emplace_back(OutputSpec{{"data"}, "TST", "DATA", i, Lifetime::Timeframe});
392 }
393 workflow.emplace_back(DataProcessorSpec{"producer",
394 {},
395 {std::move(outputs)},
396 AlgorithmSpec{adaptStateful(producerInitCallback)},
397 {ConfigParamSpec{"splitPayloadSize", VariantType::Int, 1, {"number of split payloads"}},
398 ConfigParamSpec{"msgSize", VariantType::Int, 1024, {"message size in kB"}}}});
399
400 if (bypassProxies == ProxyBypass::Output) {
401 // create the out-of-band channel in the producer if the output proxy is bypassed
402 const char* d = strdup(channelConfig.c_str());
403 workflow.back().options.push_back(ConfigParamSpec{"channel-config", VariantType::String, d, {"proxy channel of producer"}});
404 }
405
407 // the dpl sink proxy process
408
409 Inputs sinkInputs;
410 for (unsigned int i = 0; i < nChannels; i++) {
411 sinkInputs.emplace_back(InputSpec{{"external"}, "TST", "DATA", i, Lifetime::Timeframe});
412 }
413 auto channelSelector = [](InputSpec const&, const std::unordered_map<std::string, std::vector<fair::mq::Channel>>&) -> std::string {
414 return "downstream";
415 };
416 if (bypassProxies == ProxyBypass::None) {
417 workflow.emplace_back(std::move(specifyFairMQDeviceOutputProxy("dpl-sink", sinkInputs, channelConfig.c_str())));
418 }
419
421 // a simple checker process subscribing to the output of the input proxy
422 //
423 // the compute callback of the checker
424 auto cState = makeBenchmarkState();
425 auto checkerCallback = [cState, loggerCycle](InputRecord& inputs) {
426 ActiveGuard g(*cState);
427 LOG(debug) << "got inputs " << inputs.size();
428 size_t msgCount = 0;
429 size_t msgSize = 0;
430 for (auto const& ref : InputRecordWalker(inputs)) {
431 auto data = inputs.get<gsl::span<char>>(ref);
432 ++msgCount;
433 msgSize += data.size();
434 }
435 loggerCycle(*cState, msgCount, msgSize);
436 };
437 auto checkerBenchInit = [cState, loggerInit]() {
438 loggerInit(*cState);
439 };
440 auto checkerBenchSummary = [cState, loggerSummary](EndOfStreamContext&) {
441 loggerSummary(*cState);
442 };
443 auto checkerInit = [checkerCallback, checkerBenchInit, checkerBenchSummary](CallbackService& callbacks) {
444 callbacks.set<CallbackService::Id::Start>(checkerBenchInit);
445 callbacks.set<CallbackService::Id::EndOfStream>(checkerBenchSummary);
446 return adaptStateless(checkerCallback);
447 };
448
449 // the checker process connects to the proxy
450 Inputs checkerInputs;
451 if (bypassProxies != ProxyBypass::None) {
452 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe});
453 } else {
454 checkerInputs.emplace_back(InputSpec{"datain", ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe});
455 }
456 workflow.emplace_back(DataProcessorSpec{"checker",
457 std::move(checkerInputs),
458 {},
459 AlgorithmSpec{adaptStateful(checkerInit)}});
460
462 // the input proxy process
463 // reads the messages from the output proxy via the out-of-band channel
464
465 // converter callback for the external FairMQ device proxy ProcessorSpec generator
466 InjectorFunction converter = [](TimingInfo&, ServiceRegistryRef const& ref, fair::mq::Parts& inputs, ChannelRetriever channelRetriever, size_t newTimesliceId, bool&) -> bool {
467 auto* device = ref.get<RawDeviceService>().device();
468 ASSERT_ERROR(inputs.Size() >= 2);
469 if (inputs.Size() < 2) {
470 return false;
471 }
472 int msgidx = 0;
473 auto dh = o2::header::get<o2::header::DataHeader*>(inputs.At(msgidx)->GetData());
474 if (!dh) {
475 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataHeader missing";
476 return false;
477 }
478 auto dph = o2::header::get<DataProcessingHeader*>(inputs.At(msgidx)->GetData());
479 if (!dph) {
480 LOG(error) << "data on input " << msgidx << " does not follow the O2 data model, DataProcessingHeader missing";
481 return false;
482 }
483 // Note: we want to run both the output and input proxy in the same workflow and thus we need
484 // different data identifiers and change the data origin in the forwarding
485 OutputSpec query{"PRX", dh->dataDescription, dh->subSpecification};
486 auto const& channelName = channelRetriever(query, dph->startTime);
487 bool isData = DataSpecUtils::match(OutputSpec{"TST", "DATA", 0}, dh->dataOrigin, dh->dataDescription, dh->subSpecification);
488 // for the configured data channel we require the channel name, the EOS message containing
489 // the forwarded SourceInfoHeader created by the output proxy will be skipped here since the
490 // input proxy handles this internally
491 ASSERT_ERROR(!isData || !channelName.empty());
492 LOG(debug) << "using channel '" << channelName << "' for " << DataSpecUtils::describe(OutputSpec{dh->dataOrigin, dh->dataDescription, dh->subSpecification});
493 if (channelName.empty()) {
494 return false;
495 }
496 // make a copy of the header message, get the data header and change origin
497 auto outHeaderMessage = device->NewMessageFor(channelName, 0, inputs.At(msgidx)->GetSize());
498 memcpy(outHeaderMessage->GetData(), inputs.At(msgidx)->GetData(), inputs.At(msgidx)->GetSize());
499 // this we obviously need to fix in the get API, const'ness of the returned header pointer
500 // should depend on const'ness of the buffer
501 auto odh = const_cast<o2::header::DataHeader*>(o2::header::get<o2::header::DataHeader*>(outHeaderMessage->GetData()));
503 fair::mq::Parts output;
504 output.AddPart(std::move(outHeaderMessage));
505 output.AddPart(std::move(inputs.At(msgidx + 1)));
506 LOG(debug) << "sending " << DataSpecUtils::describe(OutputSpec{odh->dataOrigin, odh->dataDescription, odh->subSpecification});
507 o2::framework::sendOnChannel(*device, output, channelName, (size_t)-1);
508 return output.Size() > 0;
509 };
510
511 // we use the same spec to build the configuration string, ideally we would have some helpers
512 // which convert an OutputChannelSpec to an InputChannelSpec replacing 'bind' <--> 'connect'
513 // and 'push' <--> 'pull'
514 //
515 // skip the name in the configuration string as it is added in specifyExternalFairMQDeviceProxy
516 externalChannelSpec.name = "";
517 externalChannelSpec.type = ChannelType::Pull;
518 externalChannelSpec.method = ChannelMethod::Connect;
519 channelConfig = formatExternalChannelConfiguration(externalChannelSpec);
520 if (!defaultTransportConfig.empty() && defaultTransportConfig.find("transport=") == std::string::npos) {
521 channelConfig += ",transport=" + defaultTransportConfig;
522 }
523
524 if (bypassProxies == ProxyBypass::None) {
525 // Note: in order to make the DPL output proxy and an input proxy working in the same
526 // workflow, we use different data description
527 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"PRX", "DATA"}, Lifetime::Timeframe}};
528 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
529 "input-proxy",
530 std::move(inputProxyOutputs),
531 channelConfig.c_str(),
532 converter));
533 } else if (bypassProxies == ProxyBypass::Output) {
534 Outputs inputProxyOutputs = {OutputSpec{ConcreteDataTypeMatcher{"TST", "DATA"}, Lifetime::Timeframe}};
535 // we use the same specs as filters in the dpl adaptor
536 auto filterSpecs = inputProxyOutputs;
537 workflow.emplace_back(specifyExternalFairMQDeviceProxy(
538 "input-proxy",
539 std::move(inputProxyOutputs),
540 channelConfig.c_str(),
541 o2::framework::dplModelAdaptor(filterSpecs, true)));
542 }
543
544 return workflow;
545}
546
547std::istream& operator>>(std::istream& in, enum benchmark_config::ProxyBypass& val)
548{
549 std::string token;
550 in >> token;
551 if (token == "none") {
553 } else if (token == "all" || token == "both" || token == "a") {
555 } else if (token == "output" || token == "out" || token == "o") {
557 } else {
558 in.setstate(std::ios_base::failbit);
559 }
560 return in;
561}
562
563std::ostream& operator<<(std::ostream& out, const enum benchmark_config::ProxyBypass& val)
564{
566 out << "none";
568 out << "all";
570 out << "output";
571 } else {
572 out.setstate(std::ios_base::failbit);
573 }
574 return out;
575}
benchmark::State & state
int32_t i
Mode
Definition Utils.h:89
A helper class to iterate over all parts of all input routes.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint32_t stack
Definition RawData.h:1
std::ostringstream debug
#define ASSERT_ERROR(condition)
std::chrono::high_resolution_clock benchclock
void customize(std::vector< ConfigParamSpec > &workflowOptions)
T readConfig(ConfigContext const &config, const char *key)
std::vector< DataProcessorSpec > defineDataProcessing(ConfigContext const &config)
This function hooks up the workflow specifications into the DPL driver.
StringRef key
ConfigParamRegistry & options() const
A helper class to iterate over all parts of all input routes.
The input API of the Data Processing Layer This class holds the inputs which are valid for processing...
GLenum mode
Definition glcorearb.h:266
GLsizeiptr size
Definition glcorearb.h:659
GLboolean * data
Definition glcorearb.h:298
GLboolean GLboolean g
Definition glcorearb.h:1233
GLuint GLfloat * val
Definition glcorearb.h:1582
GLint ref
Definition glcorearb.h:291
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::ostream & operator<<(std::ostream &s, ChannelType const &type)
Stream operators so that we can use ChannelType with Boost.Test.
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
InjectorFunction dplModelAdaptor(std::vector< OutputSpec > const &specs={{header::gDataOriginAny, header::gDataDescriptionAny}}, DPLModelAdapterConfig config=DPLModelAdapterConfig{})
DataProcessorSpec specifyFairMQDeviceOutputProxy(char const *label, Inputs const &inputSpecs, const char *defaultChannelConfig)
void sendOnChannel(fair::mq::Device &device, o2::header::Stack &&headerStack, fair::mq::MessagePtr &&payloadMessage, OutputSpec const &spec, ChannelRetriever &channelRetriever)
std::string formatExternalChannelConfiguration(InputChannelSpec const &)
helper method to format a configuration string for an external channel
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
std::vector< InputSpec > Inputs
std::istream & operator>>(std::istream &in, enum TerminationPolicy &policy)
std::vector< OutputSpec > Outputs
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
Descriptor< gSizeDataOriginString > DataOrigin
Definition DataHeader.h:550
fair::mq::MessagePtr getMessage(ContainerT &&container, FairMQMemoryResource *targetResource=nullptr)
static std::string describe(InputSpec const &spec)
static bool match(InputSpec const &spec, ConcreteDataMatcher const &target)
a BaseHeader with state information from the source
the main header struct
Definition DataHeader.h:618
a move-only header stack with serialized headers This is the flat buffer where all the headers in a m...
Definition Stack.h:36
size_t nRolls
constexpr size_t nChannels
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153