Project
Loading...
Searching...
No Matches
dcs-proxy.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12// example to run:
13// o2-dcs-proxy --dcs-proxy '--channel-config "name=dcs-proxy,type=pull,method=connect,address=tcp://10.11.28.22:60000,rateLogging=1,transport=zeromq"' -b
14
19#include "Framework/Logger.h"
20#include "Framework/Lifetime.h"
26#include "DCStoDPLconverter.h"
29#include "CCDB/CcdbApi.h"
31#include <vector>
32#include <unordered_map>
33#include <regex>
34#include <string>
35#include <unistd.h>
36
37using namespace o2::framework;
42
43// we need to add workflow options before including Framework/runDataProcessing
44void customize(std::vector<ConfigParamSpec>& workflowOptions)
45{
46 workflowOptions.push_back(ConfigParamSpec{"verbose", VariantType::Bool, false, {"verbose output"}});
47 workflowOptions.push_back(ConfigParamSpec{"fbi-report-rate", VariantType::Int, 6, {"report pet N FBI received"}});
48 workflowOptions.push_back(ConfigParamSpec{"test-mode", VariantType::Bool, false, {"test mode"}});
49 workflowOptions.push_back(ConfigParamSpec{"may-send-delta-first", VariantType::Bool, false, {"if true, do not wait for FBI before sending 1st output"}});
50 workflowOptions.push_back(ConfigParamSpec{"ccdb-url", VariantType::String, "http://ccdb-test.cern.ch:8080", {"url of CCDB to get the detectors DPs configuration"}});
51 workflowOptions.push_back(ConfigParamSpec{"detector-list", VariantType::String, "TOF, MCH", {"list of detectors for which to process DCS"}});
52 workflowOptions.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}});
53}
54
56
58{
59
60 bool verbose = config.options().get<bool>("verbose");
61 bool testMode = config.options().get<bool>("test-mode");
62 bool fbiFirst = !config.options().get<bool>("may-send-delta-first");
63 int repRate = std::max(1, config.options().get<int>("fbi-report-rate"));
64 std::string detectorList = config.options().get<std::string>("detector-list");
65 o2::conf::ConfigurableParam::updateFromString(config.options().get<std::string>("configKeyValues"));
66 std::string url = config.options().get<std::string>("ccdb-url");
67
68 std::unordered_map<DPID, std::vector<o2h::DataDescription>> dpid2DataDesc;
69
70 if (testMode) {
71 DPID dpidtmp;
72 DPID::FILL(dpidtmp, "ADAPOS_LG/TEST_000100", DeliveryType::DPVAL_STRING);
73 dpid2DataDesc[dpidtmp] = {"COMMON"}; // i.e. this will go to {DCS/COMMON/0} OutputSpec
74 DPID::FILL(dpidtmp, "ADAPOS_LG/TEST_000110", DeliveryType::DPVAL_STRING);
75 dpid2DataDesc[dpidtmp] = {"COMMON"};
76 DPID::FILL(dpidtmp, "ADAPOS_LG/TEST_000200", DeliveryType::DPVAL_STRING);
77 dpid2DataDesc[dpidtmp] = {"COMMON1"};
78 DPID::FILL(dpidtmp, "ADAPOS_LG/TEST_000240", DeliveryType::DPVAL_INT);
79 dpid2DataDesc[dpidtmp] = {"COMMON1"};
80 }
81
82 else {
83 auto& mgr = CcdbManager::instance();
84 mgr.setURL(url); // http://ccdb-test.cern.ch:8080 or http://localhost:8080 for a local installation
85 long ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
86 std::regex re("[\\s,-]+");
87 std::sregex_token_iterator it(detectorList.begin(), detectorList.end(), re, -1);
88 std::sregex_token_iterator reg_end;
89 for (; it != reg_end; ++it) {
90 std::string detStr = it->str();
92 if (!detStr.empty()) {
93 LOG(info) << "DCS DPs configured for detector " << detStr;
94 std::unordered_map<DPID, std::string>* dpid2Det = mgr.getForTimeStamp<std::unordered_map<DPID, std::string>>(detStr + "/Config/DCSDPconfig", ts);
95 for (auto& el : *dpid2Det) {
97 tmpd.runtimeInit(el.second.c_str(), el.second.size());
98 dpid2DataDesc[el.first].push_back(tmpd);
99 }
100 }
101 }
102 }
103
104 // RS: here we should complete the attribution of different DPs to different outputs
105 // ...
106
107 // now collect all required outputs to define OutputSpecs for specifyExternalFairMQDeviceProxy
108 std::unordered_map<o2h::DataDescription, int, std::hash<o2h::DataDescription>> outMap;
109 for (auto itdp : dpid2DataDesc) {
110 for (const auto& ds : itdp.second) {
111 outMap[ds]++;
112 }
113 }
114
115 Outputs dcsOutputs;
116 for (auto itout : outMap) {
117 dcsOutputs.emplace_back("DCS", itout.first, 0, Lifetime::Timeframe);
118 }
119
121 "dcs-proxy",
122 std::move(dcsOutputs),
123 "type=pull,method=connect,address=tcp://aldcsadaposactor:60000,rateLogging=1,transport=zeromq",
124 dcs2dpl(dpid2DataDesc, fbiFirst, verbose, repRate));
125 dcsProxy.labels.emplace_back(DataProcessorLabel{"input-proxy"});
126
127 WorkflowSpec workflow;
128 workflow.emplace_back(dcsProxy);
129 return workflow;
130}
static BasicCCDBManager & instance()
static void updateFromString(std::string const &)
static void FILL(const DataPointIdentifier &dpid, const std::string &alias, const DeliveryType type) noexcept
ConfigParamRegistry & options() const
InjectorFunction dcs2dpl()
WorkflowSpec defineDataProcessing(ConfigContext const &config)
Definition dcs-proxy.cxx:57
void customize(std::vector< ConfigParamSpec > &workflowOptions)
Definition dcs-proxy.cxx:44
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
DataProcessorSpec specifyExternalFairMQDeviceProxy(char const *label, std::vector< OutputSpec > const &outputs, const char *defaultChannelConfig, InjectorFunction converter, uint64_t minSHM=0, bool sendTFcounter=false, bool doInjectMissingData=false, unsigned int doPrintSizes=0)
std::vector< DataProcessorSpec > WorkflowSpec
std::vector< OutputSpec > Outputs
A label that can be associated to a DataProcessorSpec.
std::vector< DataProcessorLabel > labels
void runtimeInit(const char *string, short length=-1)
Definition DataHeader.h:261
static void trim(std::string &s)
Definition StringUtils.h:71
o2::mch::DsIndex ds
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"