Project
Loading...
Searching...
No Matches
SimPublishChannelHelper.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_SIMPUBLISHCHANNELHELPER_H
13#define O2_SIMPUBLISHCHANNELHELPER_H
14
#include <unistd.h>

#include <memory>
#include <string>

#include <fairmq/Channel.h>
#include <fairmq/Message.h>
#include <fairmq/TransportFactory.h>
19
21{
22
/// Build the IPC socket address of the form `ipc:///tmp/<base>-<pid>`.
///
/// @param base name stem placed after `ipc:///tmp/`, for example "o2sim-worker" or "o2sim-merger"
/// @param pid  number appended after the dash; defaults to the id of the calling process (getpid())
/// @return the assembled address string
///
/// Marked inline because the definition lives in a header: a non-inline definition
/// included from several translation units leads to multiple-definition link errors.
inline std::string getPublishAddress(std::string const& base, int pid = getpid())
{
  return "ipc:///tmp/" + base + "-" + std::to_string(pid);
}
32
/// Format a status line for pub-sub subscribers as `<origin>[<topic>] : <message>`.
///
/// @param origin  text placed in front of the opening bracket
/// @param topic   text placed between the square brackets
/// @param message text placed after the " : " separator
/// @return the concatenated status string
///
/// Marked inline because the definition lives in a header: a non-inline definition
/// included from several translation units leads to multiple-definition link errors.
inline std::string simStatusString(std::string const& origin, std::string const& topic, std::string const& message)
{
  return origin + "[" + topic + "] : " + message;
}
38
39// helper function to publish a message to an outside subscriber
40bool publishMessage(fair::mq::Channel& channel, std::string const& message)
41{
42 if (channel.IsValid()) {
43 auto text = new std::string(message);
44 std::unique_ptr<fair::mq::Message> payload(channel.NewMessage(
45 const_cast<char*>(text->c_str()),
46 text->length(), [](void* data, void* hint) { delete static_cast<std::string*>(hint); }, text));
47 if (channel.Send(payload) > 0) {
48 return true;
49 }
50 } else {
51 LOG(error) << "CHANNEL NOT VALID";
52 }
53 return false;
54}
55
56// make channel (transport factory needs to be injected)
57fair::mq::Channel createPUBChannel(std::string const& address,
58 std::string const& type = "pub")
59{
60 auto factory = fair::mq::TransportFactory::CreateTransportFactory("zeromq");
61 static int i = 0;
62 std::stringstream str;
63 str << "channel" << i++;
64 auto externalpublishchannel = fair::mq::Channel{str.str(), type, factory};
65 externalpublishchannel.Init();
66 if ((type.compare("pub") == 0) || (type.compare("pull") == 0)) {
67 externalpublishchannel.Bind(address);
68 LOG(info) << "BINDING TO ADDRESS " << address << " type " << type;
69 } else {
70 externalpublishchannel.Connect(address);
71 LOG(info) << "CONNECTING TO ADDRESS " << address << " type " << type;
72 }
73 externalpublishchannel.Validate();
74 return externalpublishchannel;
75}
76
77} // namespace o2::simpubsub
78
79#endif //O2_SIMPUBLISHCHANNELHELPER_H
int32_t i
uint16_t pid
Definition RawData.h:2
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLint GLint GLsizei GLint GLenum GLenum type
Definition glcorearb.h:275
GLboolean * data
Definition glcorearb.h:298
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
std::string simStatusString(std::string const &origin, std::string const &topic, std::string const &message)
std::string getPublishAddress(std::string const &base, int pid=getpid())
bool publishMessage(fair::mq::Channel &channel, std::string const &message)
fair::mq::Channel createPUBChannel(std::string const &address, std::string const &type="pub")
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
const std::string str