Project
Loading...
Searching...
No Matches
rct-updater-workflow.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "Framework/Logger.h"
15#include "Framework/InputSpec.h"
16#include "Framework/Task.h"
19
20using namespace o2::framework;
21
22void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
23{
24 std::vector<o2::framework::ConfigParamSpec> options{{"configKeyValues", o2::framework::VariantType::String, "", {"Semicolon separated key=value strings ..."}}};
25 std::swap(workflowOptions, options);
26}
27
29#include "CCDB/CcdbApi.h"
31
32namespace o2::rct
33{
35{
36 public:
37 RCTUpdaterSpec(std::shared_ptr<o2::base::GRPGeomRequest> gr) : mGGCCDBRequest(gr) {}
38 ~RCTUpdaterSpec() final = default;
39
40 void init(InitContext& ic) final
41 {
43 mUpdateInterval = std::max(0.1f, ic.options().get<float>("update-interval"));
44 auto ccdb = ic.options().get<std::string>("ccdb-server");
45 if (!ccdb.empty() && ccdb != "none") {
46 mCCDBApi = std::make_unique<o2::ccdb::CcdbApi>();
47 mCCDBApi->init(ic.options().get<std::string>("ccdb-server"));
48 } else {
49 LOGP(warn, "No ccdb server provided, no RCT update will be done");
50 }
51 mTimeToleranceMS = ic.options().get<int>("max-diff-orbit-creationtime");
52 mMaxWarnings = ic.options().get<int>("max-warn-tf-discard");
53 }
54
55 void run(ProcessingContext& pc) final
56 {
58 auto tinfo = pc.services().get<o2::framework::TimingInfo>();
59 if (tinfo.globalRunNumberChanged) { // do we have RCT object?
60 const auto* grp = o2::base::GRPGeomHelper::instance().getGRPECS();
61 mNHBFPerTF = grp->getNHBFPerTF();
62 if (mNHBFPerTF < 1) {
63 mNHBFPerTF = 32;
64 }
65 mRunNumber = tinfo.runNumber;
66 mUpdateIntervalTF = uint32_t(mUpdateInterval / (mNHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-6)); // convert update interval in seconds to interval in TFs
67 LOGP(info, "Will update RCT after {} TFs of {} HBFs ({}s was requested)", mUpdateIntervalTF, mNHBFPerTF, mUpdateInterval);
69 mMinOrbit = 0xffffffff;
70 mMaxOrbit = 0;
71 if (grp->getRunType() == o2::parameters::GRPECS::PHYSICS || grp->getRunType() == o2::parameters::GRPECS::COSMICS) {
72 mEnabled = true;
73 } else {
74 LOGP(warning, "Run {} type is {}, disabling RCT update", mRunNumber, o2::parameters::GRPECS::RunTypeNames[grp->getRunType()]);
75 mEnabled = false;
76 }
77 if (mEnabled) {
78 if (mCCDBApi) {
79 auto md = mCCDBApi->retrieveHeaders("RCT/Info/RunInformation", {}, grp->getRun());
80 if (md.empty()) {
81 mEnabled = false;
82 LOGP(alarm, "RCT object is missing for {} run {}, disabling RCT updater", o2::parameters::GRPECS::RunTypeNames[grp->getRunType()], grp->getRun());
83 }
84 }
85 }
86 }
87 if (mEnabled) {
88 // make sure that the orbit makes sense, since sometimes at the EOR bogus TFs are sent.
89 long ts = mOrbitReset + long(tinfo.firstTForbit * o2::constants::lhc::LHCOrbitMUS * 1e-3);
90 if (mTimeToleranceMS > 0 && std::abs(int64_t(tinfo.creation) - ts) < mTimeToleranceMS) {
91 if (tinfo.firstTForbit < mMinOrbit) {
92 mMinOrbit = tinfo.firstTForbit;
93 }
94 if (tinfo.firstTForbit > mMaxOrbit) {
95 mMaxOrbit = tinfo.firstTForbit;
96 }
97 if (tinfo.tfCounter > mLastTFUpdate + mUpdateIntervalTF) { // need to update
98 mLastTFUpdate = tinfo.tfCounter;
99 updateRCT();
100 }
101 } else {
102 static int nWarn = 0;
103 if (nWarn < mMaxWarnings) {
104 nWarn++;
105 LOGP(warn, "timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, ignore TF {}", ts, tinfo.firstTForbit, mOrbitReset / 1000, mTimeToleranceMS, tinfo.creation, tinfo.tfCounter);
106 }
107 }
108 }
109 }
110
112 {
113 if (mEnabled) {
114 updateRCT();
115 mEnabled = false;
116 }
117 }
118
119 void stop() final
120 {
121 if (mEnabled) {
122 updateRCT();
123 mEnabled = false;
124 }
125 }
126
127 void finaliseCCDB(framework::ConcreteDataMatcher& matcher, void* obj) final
128 {
130 return;
131 }
132 }
133
135 {
136 std::map<std::string, std::string> mdRCT;
137 if (mMinOrbit > mMaxOrbit) {
138 return;
139 }
140 mdRCT["STF"] = std::to_string(long(mMinOrbit * o2::constants::lhc::LHCOrbitMUS * 1e-3) + mOrbitReset);
141 mdRCT["ETF"] = std::to_string(long((mMaxOrbit + mNHBFPerTF - 1) * o2::constants::lhc::LHCOrbitMUS * 1e-3) + mOrbitReset);
142 long startValRCT = (long)mRunNumber;
143 long endValRCT = (long)(mRunNumber + 1);
144 if (mCCDBApi) {
145 int retValRCT = mCCDBApi->updateMetadata("RCT/Info/RunInformation", mdRCT, startValRCT);
146 if (retValRCT == 0) {
147 LOGP(info, "Updated {}/RCT/Info/RunInformation object for run {} with TF start:{} end:{}", mCCDBApi->getURL(), mRunNumber, mdRCT["STF"], mdRCT["ETF"]);
148 } else {
149 LOGP(alarm, "Update of RCT object for run {} with TF start:{} end:{} FAILED, returned with code {}", mRunNumber, mdRCT["STF"], mdRCT["ETF"], retValRCT);
150 }
151 } else {
152 LOGP(info, "CCDB update disabled, TF timestamp range is {}:{}", mdRCT["STF"], mdRCT["ETF"]);
153 }
154 }
155
156 private:
157 bool mEnabled = true;
158 float mUpdateInterval = 1.;
159 int mUpdateIntervalTF = 1;
160 uint32_t mMinOrbit = 0xffffffff;
161 uint32_t mMaxOrbit = 0;
162 uint32_t mLastTFUpdate = 0;
163 long mTimeToleranceMS = 5000;
164 long mOrbitReset = 0;
165 int mRunNumber = 0;
166 int mNHBFPerTF = 32;
167 int mMaxWarnings = 0;
168 std::shared_ptr<o2::base::GRPGeomRequest> mGGCCDBRequest;
169 std::unique_ptr<o2::ccdb::CcdbApi> mCCDBApi;
170};
171} // namespace o2::rct
172
173// ------------------------------------------------------------------
176
178{
179 WorkflowSpec specs;
180 o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
181 std::vector<InputSpec> inputs{{"ctfdone", "CTF", "DONE", 0, Lifetime::Timeframe}};
182 std::vector<OutputSpec> outputs;
183 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBPayload, "RCTUPD_DUMMY"}, Lifetime::Sporadic);
184 outputs.emplace_back(ConcreteDataTypeMatcher{o2::calibration::Utils::gDataOriginCDBWrapper, "RCTUPD_DUMMY"}, Lifetime::Sporadic);
185 auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
186 true, // GRPECS=true
187 false, // GRPLHCIF
188 false, // GRPMagField
189 false, // askMatLUT
191 inputs,
192 true); // query only once all objects except mag.field
193 specs.push_back(DataProcessorSpec{
194 "rct-updater",
195 inputs,
196 outputs,
197 AlgorithmSpec{adaptFromTask<o2::rct::RCTUpdaterSpec>(ggRequest)},
198 Options{
199 {"update-interval", VariantType::Float, 1.f, {"update every ... seconds"}},
200 {"max-diff-orbit-creationtime", VariantType::Int, -1, {"max difference between TF creation-time and orbit-time to discard TF, do not check if negative"}},
201 {"max-warn-tf-discard", VariantType::Int, 10, {"max N warnings about discarding bad TFs"}},
202 {"ccdb-server", VariantType::String, "http://ccdb-test.cern.ch:8080", {"CCDB to update"}}}});
203 return specs;
204}
Utils and constants for calibration and related workflows.
Header of the AggregatedRunInfo struct.
Helper for geometry and GRP related CCDB requests.
auto getOrbitResetTimeMS() const
void checkUpdates(o2::framework::ProcessingContext &pc)
static GRPGeomHelper & instance()
void setRequest(std::shared_ptr< GRPGeomRequest > req)
static void updateFromString(std::string const &)
ConfigParamRegistry & options() const
~RCTUpdaterSpec() final=default
void run(ProcessingContext &pc) final
void init(InitContext &ic) final
void endOfStream(framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void stop() final
This is invoked on stop.
void finaliseCCDB(framework::ConcreteDataMatcher &matcher, void *obj) final
RCTUpdaterSpec(std::shared_ptr< o2::base::GRPGeomRequest > gr)
constexpr double LHCOrbitMUS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
std::vector< DataProcessorSpec > WorkflowSpec
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
void customize(std::vector< o2::framework::ConfigParamSpec > &workflowOptions)
WorkflowSpec defineDataProcessing(ConfigContext const &configcontext)
static constexpr o2::header::DataOrigin gDataOriginCDBWrapper
Definition Utils.h:44
static constexpr o2::header::DataOrigin gDataOriginCDBPayload
Definition Utils.h:43