30#include <TStopwatch.h>
42using Duration = std::chrono::duration<double, std::ratio<1, 1>>;
47 std::vector<DPID> vect;
48 mDPsUpdateInterval = ic.
options().
get<int64_t>(
"DPs-update-interval");
49 mWarnEmptyCycles = ic.
options().
get<
int>(
"warn-empty-cycles");
50 if (mDPsUpdateInterval == 0) {
51 LOG(error) <<
"GRP DPs update interval set to zero seconds --> changed to 60";
52 mDPsUpdateInterval = 60;
54 bool useCCDBtoConfigure = ic.
options().
get<
bool>(
"use-ccdb-to-configure");
55 if (useCCDBtoConfigure) {
56 LOG(info) <<
"Configuring via CCDB";
57 std::string ccdbpath = ic.
options().
get<std::string>(
"ccdb-path");
61 api.
init(mgr.getURL());
62 long ts = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
63 std::unordered_map<DPID, std::string>* dpid2DataDesc = mgr.getForTimeStamp<std::unordered_map<DPID, std::string>>(
"GRP/Config/DCSDPconfig", ts);
64 for (
auto&
i : *dpid2DataDesc) {
65 vect.push_back(
i.first);
68 LOG(info) <<
"Configuring via hardcoded strings";
69 std::vector<std::string> aliasesBFieldDouble = {
"L3Current",
"DipoleCurrent"};
70 std::vector<std::string> aliasesBFieldBool = {
"L3Polarity",
"DipolePolarity"};
71 std::vector<std::string> aliasesEnvVar = {
"CavernTemperature",
"CavernAtmosPressure",
"SurfaceAtmosPressure",
"CavernAtmosPressure2"};
72 std::vector<std::string> compactAliasesLHCIFDouble = {
"LHC_IntensityBeam[1..2]_totalIntensity",
"ALI_Background[1..3]",
73 "ALI_Lumi_Total_Inst",
74 "BPTX_deltaT_B1_B2",
"BPTX_deltaTRMS_B1_B2",
75 "BPTX_Phase_B[1..2]",
"BPTX_PhaseRMS_B[1..2]",
"BPTX_Phase_Shift_B[1..2]"};
77 std::vector<std::string> aliasesLHCIFString = {
"ALI_Lumi_Source_Name",
"MACHINE_MODE",
"BEAM_MODE"};
78 std::vector<std::string> aliasesLHCIFCollimators = {
"LHC_CollimatorPos_TCLIA_4R2_lvdt_gap_downstream",
"LHC_CollimatorPos_TCLIA_4R2_lvdt_gap_upstream",
79 "LHC_CollimatorPos_TCLIA_4R2_lvdt_left_downstream",
"LHC_CollimatorPos_TCLIA_4R2_lvdt_left_upstream",
80 "LHC_CollimatorPos_TCLIA_4R2_lvdt_right_downstream",
"LHC_CollimatorPos_TCLIA_4R2_lvdt_right_upstream"};
82 for (
const auto&
i : aliasesBFieldDouble) {
85 for (
const auto&
i : aliasesBFieldBool) {
88 for (
const auto&
i : aliasesEnvVar) {
91 for (
const auto&
i : aliasesLHCIFDouble) {
94 for (
const auto&
i : aliasesLHCIFCollimators) {
97 for (
const auto&
i : aliasesLHCIFString) {
102 LOG(info) <<
"Listing Data Points for GRP:";
103 for (
auto&
i : vect) {
107 mProcessor = std::make_unique<o2::grp::GRPDCSDPsProcessor>();
108 mVerbose = ic.
options().
get<
bool>(
"use-verbose-mode");
109 LOG(info) <<
" ************************* Verbose?" << mVerbose;
110 bool clearVectors = ic.
options().
get<
bool>(
"clear-vectors");
111 LOG(info) <<
" ************************* Clear vectors?" << clearVectors;
113 mProcessor->useVerboseMode();
116 mProcessor->clearVectors();
118 mProcessor->init(vect);
119 mTimer = HighResClock::now();
120 mReportTiming = ic.
options().
get<
bool>(
"report-timing") || mVerbose;
126 mLHCIFupdated =
false;
129 auto dps = pc.
inputs().
get<gsl::span<DPCOM>>(
"input");
130 auto timeNow = HighResClock::now();
131 if (startValidity == 0xffffffffffffffff) {
132 startValidity = std::chrono::duration_cast<std::chrono::milliseconds>(timeNow.time_since_epoch()).count();
134 mProcessor->setStartValidityMagFi(startValidity);
137 LOG(info) <<
"Change start validity for LHCIF to " << startValidity;
139 mProcessor->setStartValidityLHCIF(startValidity);
143 LOG(info) <<
"Change start validity for Env Variables to " << startValidity;
145 mProcessor->setStartValidityEnvVa(startValidity);
149 LOG(info) <<
"Change start validity for Collimators to " << startValidity;
151 mProcessor->setStartValidityColli(startValidity);
153 mProcessor->process(dps);
154 Duration elapsedTime = timeNow - mTimer;
155 if (elapsedTime.count() >= mDPsUpdateInterval || mProcessor->isLHCIFInfoUpdated()) {
157 if (elapsedTime.count() >= mDPsUpdateInterval) {
159 LOG(info) <<
"enough time passed (" << elapsedTime.count() <<
" s), sending to CCDB LHCIFDPs";
163 LOG(info) <<
"sending to CCDB LHCIFDPs since something changed";
166 mProcessor->updateLHCIFInfoCCDB();
167 sendLHCIFDPsoutput(pc.
outputs());
168 mProcessor->resetAndKeepLastLHCIFDPs();
169 mLHCIFupdated =
true;
170 mProcessor->resetPIDsLHCIF();
172 if (elapsedTime.count() >= mDPsUpdateInterval) {
176 LOG(info) <<
"enough time passed (" << elapsedTime.count() <<
" s), sending to CCDB Env and Coll";
178 mProcessor->updateCollimatorsCCDB();
179 sendCollimatorsDPsoutput(pc.
outputs());
180 mProcessor->resetAndKeepLast(mProcessor->getCollimatorsObj().mCollimators);
182 mProcessor->updateEnvVarsCCDB();
183 sendEnvVarsDPsoutput(pc.
outputs());
184 mProcessor->resetAndKeepLast(mProcessor->getEnvVarsObj().mEnvVars);
186 mProcessor->resetPIDs();
188 if (mProcessor->isMagFieldUpdated()) {
189 sendMagFieldDPsoutput(pc.
outputs());
201 LOG(info) <<
" ********** End of Stream **********";
204 if (!mLHCIFupdated) {
205 mProcessor->updateLHCIFInfoCCDB();
206 sendLHCIFDPsoutput(ec.
outputs());
208 mProcessor->updateCollimatorsCCDB();
209 sendCollimatorsDPsoutput(ec.
outputs());
211 mProcessor->updateEnvVarsCCDB();
212 sendEnvVarsDPsoutput(ec.
outputs());
221 const auto& payload = mProcessor->getLHCIFObj();
222 auto& info = mProcessor->getccdbLHCIFInfo();
224 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
225 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
228 mProcessor->resetStartValidityLHCIF();
236 const auto& payload = mProcessor->getMagFieldObj();
237 auto& info = mProcessor->getccdbMagFieldInfo();
239 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
240 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
251 const auto& payload = mProcessor->getCollimatorsObj();
252 if (payload.totalEntries() == 0) {
253 if ((mEmptyCyclesCollimators %
size_t(mWarnEmptyCycles)) == 0) {
254 LOGP(alarm,
"No Collimator DPs were received after {} {}-s cycles", mEmptyCyclesCollimators, mDPsUpdateInterval);
256 mEmptyCyclesCollimators++;
258 mEmptyCyclesCollimators = 0;
260 auto& info = mProcessor->getccdbCollimatorsInfo();
262 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
263 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
266 mProcessor->resetStartValidityColli();
275 const auto& payload = mProcessor->getEnvVarsObj();
276 if (payload.totalEntries() == 0) {
277 if ((mEmptyCyclesEnvVars %
size_t(mWarnEmptyCycles)) == 0) {
278 LOGP(alarm,
"No EnvVar DPs were received after {} {}-s cycles", mEmptyCyclesEnvVars, mDPsUpdateInterval);
280 mEmptyCyclesEnvVars++;
282 mEmptyCyclesEnvVars = 0;
284 auto& info = mProcessor->getccdbEnvVarsInfo();
286 LOG(info) <<
"Sending object " << info.getPath() <<
"/" << info.getFileName() <<
" of size " <<
image->size()
287 <<
" bytes, valid for " << info.getStartValidityTimestamp() <<
" : " << info.getEndValidityTimestamp();
290 mProcessor->resetStartValidityEnvVa();
302 std::vector<OutputSpec> outputs;
316 "grp-dcs-data-processor",
317 Inputs{{
"input",
"DCS",
"GRPDATAPOINTS"}},
319 AlgorithmSpec{adaptFromTask<o2::grp::GRPDCSDPsDataProcessor>()},
324 {
"DPs-update-interval",
VariantType::Int64, 600ll, {
"Interval (in s) after which to update the DPs CCDB entry"}},
325 {
"warn-empty-cycles",
VariantType::Int, 1, {
"Warn about empty object after this number of cycles"}},
326 {
"clear-vectors",
VariantType::Bool,
false, {
"Clear vectors when starting processing for a new CCDB entry (latest value will not be kept)"}}}};
Utils and constants for calibration and related workflows.
Header to collect definitions for different units.
static BasicCCDBManager & instance()
void init(std::string const &hosts)
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static constexpr long INFINITE_TIMESTAMP
T get(const char *key) const
DataAllocator & outputs()
ConfigParamRegistry const & options()
DataAllocator & outputs()
The data allocator is used to allocate memory for the output data.
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void run(o2::framework::ProcessingContext &pc) final
void init(o2::framework::InitContext &ic) final
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
std::vector< std::string > expandAliases(const std::vector< std::string > &patternedAliases)
std::vector< InputSpec > Inputs
std::chrono::duration< double, std::ratio< 1, 1 > > Duration
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"