40#include <fairmq/Device.h>
41#include <fairmq/Parts.h>
46#include "BookkeepingApi/BkpClient.h"
// dcs2dpl: builds the InjectorFunction used by the CTP proxy.
//
// Visible behaviour:
//  - creates a shared o2::ctp::CTPRunManager and configures it with the CCDB,
//    BK and QCDB hosts, the QC write period and the ctp.cfg directory passed in;
//  - for a received multi-part message, turns part 0 into a header string and
//    part 1 into a data string, logs them, and forwards both to
//    CTPRunManager::processMessage.
//
// Parameters:
//  ccdbhost      - value forwarded to setCCDBHost
//  bkhost        - value forwarded to setBKHost
//  qchost        - value forwarded to setQCDBHost
//  qcwriteperiod - value forwarded to setQCWritePeriod
//  ctpcfgdir     - value forwarded to setCtpCfgDir
//
// NOTE(review): the string parameters are taken by non-const reference although
// the visible code only reads them — confirm whether `const std::string&` is possible.
49InjectorFunction dcs2dpl(std::string& ccdbhost, std::string& bkhost, std::string& qchost,
int qcwriteperiod, std::string& ctpcfgdir)
// Run manager is held in a shared_ptr; presumably so the returned callable can
// keep it alive after this function returns — the capture is not visible here, TODO confirm.
51 auto runMgr = std::make_shared<o2::ctp::CTPRunManager>();
52 runMgr->setCCDBHost(ccdbhost);
53 runMgr->setBKHost(bkhost);
54 runMgr->setQCDBHost(qchost);
55 runMgr->setQCWritePeriod(qcwriteperiod);
56 runMgr->setCtpCfgDir(ctpcfgdir);
// `parts` is declared outside the lines visible here (presumably the
// fair::mq::Parts argument of the injector callable — TODO confirm).
// NOTE(review): no check of parts.Size() is visible before At(0)/At(1) are
// dereferenced — confirm a guard exists for messages with fewer than two parts.
// Part 0 is copied verbatim (pointer + size) into the header string.
66 std::string messageHeader{
static_cast<const char*
>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
// Size of part 1 is kept separately only for the log line below.
67 size_t dataSize = parts.At(1)->GetSize();
// Part 1 is copied verbatim (pointer + size) into the payload string.
68 std::string messageData{
static_cast<const char*
>(parts.At(1)->GetData()), parts.At(1)->GetSize()};
69 LOG(info) <<
"received message " << messageHeader <<
" of size " <<
dataSize <<
" # parts:" << parts.Size();
// Hand header and payload to the run manager for processing.
70 runMgr->processMessage(messageHeader, messageData);
// customize: appends the workflow-level options of this proxy to workflowOptions.
//
// Options registered (name, type, default):
//  subscribe-to    String  channel configuration string (zeromq sub, connect)
//  ccdb-host       String  "http://o2-ccdb.internal:8080"
//  bk-host         String  "none"
//  qc-host         String  "none"
//  ctpcfg-dir      String  "none"
//  qc-writeperiod  Int     30
76void customize(std::vector<ConfigParamSpec>& workflowOptions)
// NOTE(review): the default channel address below hard-codes an IP literal and
// port; confirm this is intended as a default rather than deployment configuration.
78 workflowOptions.push_back(
ConfigParamSpec{
"subscribe-to", VariantType::String,
"type=sub,method=connect,address=tcp://188.184.30.57:5556,rateLogging=10,transport=zeromq", {
"channel subscribe to"}});
79 workflowOptions.push_back(
ConfigParamSpec{
"ccdb-host", VariantType::String,
"http://o2-ccdb.internal:8080", {
"ccdb host"}});
// "none" is the default for the following three string options; how "none"
// is interpreted is decided by the consumer of these values, not shown here.
80 workflowOptions.push_back(
ConfigParamSpec{
"bk-host", VariantType::String,
"none", {
"bk host"}});
81 workflowOptions.push_back(
ConfigParamSpec{
"qc-host", VariantType::String,
"none", {
"qc host"}});
82 workflowOptions.push_back(
ConfigParamSpec{
"ctpcfg-dir", VariantType::String,
"none", {
"ctp.cfg file directory"}});
// Per the help text, the period is expressed in units of 10 seconds (30 -> 5 minutes).
83 workflowOptions.push_back(
ConfigParamSpec{
"qc-writeperiod", VariantType::Int, 30, {
"Period of writing to QCDB in units of 10secs, default = 30 (5 mins)"}});
// Body of the workflow definition (the function signature is outside the lines
// visible here). Visible steps: define a channel-name helper, read the workflow
// options, declare the CTP/CTP_COUNTERS output, create the proxy with the
// dcs2dpl injector and append it to `workflow`.
90 LOG(info) <<
"Defining data processing";
// setChanName(chan, name): helper operating on a channel configuration string.
// Only part of its body is visible; the return statement is not shown.
91 auto setChanName = [](
const std::string& chan,
const std::string&
name) {
// Branch taken when the configuration string already contains "name=".
// NOTE(review): `chan` is already a const std::string&, so std::string(chan)
// creates a temporary copy for each find() call.
93 if (std::string(chan).find(
"name=") != std::string::npos) {
// `n` is declared outside the visible lines; it receives the position of the first comma.
94 n = std::string(chan).find(
",");
// A channel string with "name=" but no comma is rejected as malformed.
95 if (
n == std::string::npos) {
96 throw std::runtime_error(fmt::format(
"wrongly formatted channel: {}", chan));
100 LOG(info) <<
"===>inside:" <<
name <<
" " << chan;
// Device name passed to setChanName below.
103 const std::string devName =
"ctp-proxy";
// Read every workflow option registered in customize().
104 auto chan = config.
options().
get<std::string>(
"subscribe-to");
105 std::string ccdbhost = config.
options().
get<std::string>(
"ccdb-host");
106 std::string bkhost = config.
options().
get<std::string>(
"bk-host");
107 std::string qchost = config.
options().
get<std::string>(
"qc-host");
108 int qcwriteperiod = config.
options().
get<
int>(
"qc-writeperiod");
109 std::string ctpcfgdir = config.
options().
get<std::string>(
"ctpcfg-dir");
// The guarding condition is not visible; presumably thrown when `chan` is empty — TODO confirm.
111 throw std::runtime_error(
"input channel is not provided");
// Replace the raw option value with the result of the helper.
113 chan = setChanName(chan, devName);
114 LOG(info) <<
"name:" << devName <<
" chan:" << chan;
115 LOG(info) <<
"Channels setup: " << chan;
// Single output: origin "CTP", description "CTP_COUNTERS", subspec 0, Timeframe lifetime.
// `ctpCountersOutputs` is declared outside the visible lines.
117 ctpCountersOutputs.emplace_back(
"CTP",
"CTP_COUNTERS", 0, Lifetime::Timeframe);
118 LOG(info) <<
"===> Proxy to be set";
// Arguments of the proxy construction call (the call itself is only partly
// visible): the outputs are moved in and dcs2dpl(...) supplies the injector.
121 std::move(ctpCountersOutputs),
124 dcs2dpl(ccdbhost, bkhost, qchost, qcwriteperiod, ctpcfgdir));
126 LOG(info) <<
"===> Proxy done";
// Register the proxy in the workflow being built.
128 workflow.emplace_back(ctpProxy);
Managing runs for config and scalers.
Static class with identifiers, bitmasks and names for ALICE detectors.
ConfigParamRegistry & options() const
T get(const char *key) const
WorkflowSpec defineDataProcessing(ConfigContext const &config)
void customize(std::vector< ConfigParamSpec > &workflowOptions)
InjectorFunction dcs2dpl()
GLuint const GLchar * name
Defining PrimaryVertex explicitly as messageable.
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
std::vector< DataProcessorSpec > WorkflowSpec
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
std::vector< OutputSpec > Outputs
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"