12#ifndef O2_SIMPUBLISHCHANNELHELPER_H
13#define O2_SIMPUBLISHCHANNELHELPER_H
16#include <fairmq/Channel.h>
17#include <fairmq/Message.h>
18#include <fairmq/TransportFactory.h>
28 std::stringstream publishsocketname;
29 publishsocketname <<
"ipc:///tmp/" << base <<
"-" <<
pid;
30 return publishsocketname.str();
36 return origin + std::string(
"[") + topic + std::string(
"] : ") +
message;
42 if (channel.IsValid()) {
43 auto text =
new std::string(
message);
44 std::unique_ptr<fair::mq::Message> payload(channel.NewMessage(
45 const_cast<char*
>(text->c_str()),
46 text->length(), [](
void*
data,
void* hint) { delete static_cast<std::string*>(hint); }, text));
47 if (channel.Send(payload) > 0) {
51 LOG(error) <<
"CHANNEL NOT VALID";
58 std::string
const&
type =
"pub")
60 auto factory = fair::mq::TransportFactory::CreateTransportFactory(
"zeromq");
62 std::stringstream
str;
63 str <<
"channel" <<
i++;
64 auto externalpublishchannel = fair::mq::Channel{
str.str(),
type, factory};
65 externalpublishchannel.Init();
66 if ((
type.compare(
"pub") == 0) || (
type.compare(
"pull") == 0)) {
67 externalpublishchannel.Bind(
address);
70 externalpublishchannel.Connect(
address);
71 LOG(info) <<
"CONNECTING TO ADDRESS " <<
address <<
" type " <<
type;
73 externalpublishchannel.Validate();
74 return externalpublishchannel;
GLuint GLuint64EXT address
GLint GLint GLsizei GLint GLenum GLenum type
GLuint GLsizei const GLchar * message
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
std::string getPublishAddress(std::string const &base, int pid=getpid())
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
fair::mq::Channel createPUBChannel(std::string const &address, std::string const &type="pub")
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"