Project
Loading...
Searching...
No Matches
ctp-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
13// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
14// All rights not expressly granted are reserved.
15//
16// This software is distributed under the terms of the GNU General Public
17// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
18//
19// In applying this license CERN does not waive the privileges and immunities
20// granted to it by virtue of its status as an Intergovernmental Organization
21// or submit itself to any jurisdiction.
22
23// example to run:
24// default: processing , intermal database
25// o2-ctp-proxy --ctp-proxy '--channel-config "name=ctp-proxy,type=sub,method=connect,address=tcp://10.161.64.100:50090,rateLogging=5,transport=zeromq"' -b
26// processing, test database
27// o2-ctp-proxy --ctp-proxy '--channel-config "name=ctp-proxy,type=sub,method=connect,address=tcp://10.161.64.100:50090,rateLogging=5,transport=zeromq"' '--ccdb-host=http://ccdb-test.cern.ch:8080' -b
28
34#include "Framework/Logger.h"
35#include "Framework/Lifetime.h"
40#include <fairmq/Device.h>
41#include <fairmq/Parts.h>
44#include <vector>
45#include <string>
46#include "BookkeepingApi/BkpClient.h"
47using namespace o2::framework;
49InjectorFunction dcs2dpl(std::string& ccdbhost, std::string& bkhost, std::string& qchost, int qcwriteperiod, std::string& ctpcfgdir)
50{
51 auto runMgr = std::make_shared<o2::ctp::CTPRunManager>();
52 runMgr->setCCDBHost(ccdbhost);
53 runMgr->setBKHost(bkhost);
54 runMgr->setQCDBHost(qchost);
55 runMgr->setQCWritePeriod(qcwriteperiod);
56 runMgr->setCtpCfgDir(ctpcfgdir);
57 runMgr->init();
58 // runMgr->setClient(client);
59 static int nprint = 0;
60 return [runMgr](TimingInfo&, ServiceRegistryRef const& services, fair::mq::Parts& parts, ChannelRetriever channelRetriever, size_t newTimesliceId, bool& stop) -> bool {
61 // FIXME: Why isn't this function using the timeslice index?
62 // make sure just 2 messages received
63 // if (parts.Size() != 2) {
64 // LOG(error) << "received " << parts.Size() << " instead of 2 expected";
65 // return;
66 //}
67 std::string messageHeader{static_cast<const char*>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
68 size_t dataSize = parts.At(1)->GetSize();
69 std::string messageData{static_cast<const char*>(parts.At(1)->GetData()), parts.At(1)->GetSize()};
70 nprint++;
71 int nlimit = 60;
72 int nrange = 8;
73 if (nprint > nlimit && nprint < (nlimit + nrange + 1)) {
74 LOG(info) << "received message " << messageHeader << " of size " << dataSize << " # parts:" << parts.Size(); // << " Payload:" << messageData;
75 if (nprint == (nlimit + nrange)) {
76 nprint = 0;
77 }
78 }
79 runMgr->processMessage(messageHeader, messageData);
80 return true;
81 };
82}
83
84// we need to add workflow options before including Framework/runDataProcessing
85void customize(std::vector<ConfigParamSpec>& workflowOptions)
86{
87 workflowOptions.push_back(ConfigParamSpec{"subscribe-to", VariantType::String, "type=sub,method=connect,address=tcp://188.184.30.57:5556,rateLogging=10,transport=zeromq", {"channel subscribe to"}});
88 workflowOptions.push_back(ConfigParamSpec{"ccdb-host", VariantType::String, "http://o2-ccdb.internal:8080", {"ccdb host"}});
89 workflowOptions.push_back(ConfigParamSpec{"bk-host", VariantType::String, "none", {"bk host"}});
90 workflowOptions.push_back(ConfigParamSpec{"qc-host", VariantType::String, "none", {"qc host"}});
91 workflowOptions.push_back(ConfigParamSpec{"ctpcfg-dir", VariantType::String, "none", {"ctp.cfg file directory"}});
92 workflowOptions.push_back(ConfigParamSpec{"qc-writeperiod", VariantType::Int, 30, {"Period of writing to QCDB in units of 10secs, default = 30 (5 mins)"}});
93}
94
96
98{
99 LOG(info) << "Defining data processing";
100 auto setChanName = [](const std::string& chan, const std::string& name) {
101 size_t n = 0;
102 if (std::string(chan).find("name=") != std::string::npos) {
103 n = std::string(chan).find(",");
104 if (n == std::string::npos) {
105 throw std::runtime_error(fmt::format("wrongly formatted channel: {}", chan));
106 }
107 n++;
108 }
109 LOG(info) << "===>inside:" << name << " " << chan;
110 return o2::utils::Str::concat_string("name=", name, ",", chan.substr(n, chan.size()));
111 };
112 const std::string devName = "ctp-proxy";
113 auto chan = config.options().get<std::string>("subscribe-to");
114 std::string ccdbhost = config.options().get<std::string>("ccdb-host");
115 std::string bkhost = config.options().get<std::string>("bk-host");
116 std::string qchost = config.options().get<std::string>("qc-host");
117 int qcwriteperiod = config.options().get<int>("qc-writeperiod");
118 std::string ctpcfgdir = config.options().get<std::string>("ctpcfg-dir");
119 if (chan.empty()) {
120 throw std::runtime_error("input channel is not provided");
121 }
122 chan = setChanName(chan, devName);
123 LOG(info) << "name:" << devName << " chan:" << chan;
124 LOG(info) << "Channels setup: " << chan;
125 Outputs ctpCountersOutputs;
126 ctpCountersOutputs.emplace_back("CTP", "CTP_COUNTERS", 0, Lifetime::Timeframe);
127 LOG(info) << "===> Proxy to be set";
129 devName.c_str(),
130 std::move(ctpCountersOutputs),
131 // this is just default, can be overriden by --ctp-config-proxy '--channel-config..'
132 chan.c_str(),
133 dcs2dpl(ccdbhost, bkhost, qchost, qcwriteperiod, ctpcfgdir));
134 ctpProxy.labels.emplace_back(DataProcessorLabel{"input-proxy"});
135 LOG(info) << "===> Proxy done";
136 WorkflowSpec workflow;
137 workflow.emplace_back(ctpProxy);
138 return workflow;
139}
Managing runs for config and scalers.
Static class with identifiers, bitmasks and names for ALICE detectors.
Definition DetID.h:58
ConfigParamRegistry & options() const
WorkflowSpec defineDataProcessing(ConfigContext const &config)
Definition ctp-proxy.cxx:97
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Definition ctp-proxy.cxx:85
InjectorFunction dcs2dpl()
GLdouble n
Definition glcorearb.h:1982
GLenum GLsizei dataSize
Definition glcorearb.h:3994
GLuint const GLchar * name
Definition glcorearb.h:781
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::function< bool(TimingInfo &, ServiceRegistryRef const &services, fair::mq::Parts &inputs, ChannelRetriever, size_t newTimesliceId, bool &stop)> InjectorFunction
std::vector< DataProcessorSpec > WorkflowSpec
std::function< std::string const &(OutputSpec const &, DataProcessingHeader::StartTime)> ChannelRetriever
std::vector< OutputSpec > Outputs
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
static std::string concat_string(Ts const &... ts)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"