Project
Loading...
Searching...
No Matches
CCDBPopulatorSpec.h
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#ifndef O2_CALIBRATION_CCDBPOPULATOR_H
13#define O2_CALIBRATION_CCDBPOPULATOR_H
14
17
22#include "Framework/Task.h"
26#include "Headers/DataHeader.h"
28#include "CCDB/CcdbApi.h"
29#include "CCDB/CcdbObjectInfo.h"
32#include <unordered_map>
33#include <chrono>
34#include <vector>
35#include <utility>
36#include <map>
37
38namespace o2
39{
40namespace calibration
41{
42
44{
45 public:
48
49 using BLOB = std::vector<char>;
50 using TBLOB = std::pair<long, BLOB>; // pair of creation time and object to upload
51 using OBJCACHE = std::map<CcdbObjectInfo, TBLOB>;
52
53 void init(o2::framework::InitContext& ic) final;
56 void stop() final;
57
58 void checkCache(long delay);
59 void doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached = false);
60 void logAsNeeded(long nowMS, const std::string& path, std::string& msg);
61
62 private:
63 CcdbApi mAPI;
64 long mThrottlingDelayMS = 0; // LOG(important) at most once per this period for given path
65 int mOrderingLatencyMS = -1; // if >0, bufferize and upload if no object with smaller SOV was received in this time interval in ms
66 bool mFatalOnFailure = true; // produce fatal on failed upload
67 bool mValidateUpload = false; // validate upload by querying its headers
68 bool mEnded = false;
69 std::unordered_map<std::string, std::pair<long, int>> mThrottling;
70 std::unordered_map<std::string, OBJCACHE> mOrdCache;
71 std::int64_t mSSpecMin = -1; // min subspec to accept
72 std::int64_t mSSpecMax = -1; // max subspec to accept
73 std::string mCCDBpath = "http://ccdb-test.cern.ch:8080"; // CCDB path
74 int mRunNoFromDH = 0;
75 std::string mRunNoStr = {};
76};
77
79{
80 mCCDBpath = ic.options().get<std::string>("ccdb-path");
81 mSSpecMin = ic.options().get<std::int64_t>("sspec-min");
82 mSSpecMax = ic.options().get<std::int64_t>("sspec-max");
83 mFatalOnFailure = ic.options().get<bool>("fatal-on-failure");
84 mValidateUpload = ic.options().get<bool>("validate-upload");
85 mThrottlingDelayMS = ic.options().get<std::int64_t>("throttling-delay");
86 mOrderingLatencyMS = ic.options().get<int>("ordering-latency");
87 mAPI.init(mCCDBpath);
88}
89
91{
92 int nSlots = pc.inputs().getNofParts(0);
93 if (nSlots != pc.inputs().getNofParts(1)) {
94 LOGP(alarm, "Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.inputs().getNofParts(1));
95 return;
96 } else if (nSlots == 0) {
97 LOG(alarm) << "0 slots received";
98 return;
99 }
100 mRunNoFromDH = pc.services().get<o2::framework::TimingInfo>().runNumber;
101 if (mRunNoFromDH > 0) {
102 mRunNoStr = std::to_string(mRunNoFromDH);
103 }
104 auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
105 for (int isl = 0; isl < nSlots; isl++) {
106 auto refWrp = pc.inputs().get("clbWrapper", isl);
107 auto refPld = pc.inputs().get("clbPayload", isl);
109 LOGP(alarm, "Wrapper is not valid for slot {}", isl);
110 continue;
111 }
113 LOGP(alarm, "Payload is not valid for slot {}", isl);
114 continue;
115 }
116 if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) { // there is a selection
117 auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
118 if (ss < mSSpecMin || ss > mSSpecMax) {
119 continue;
120 }
121 }
122 const auto wrp = pc.inputs().get<CcdbObjectInfo*>(refWrp);
123 const auto pld = pc.inputs().get<gsl::span<char>>(refPld); // this is actually an image of TMemFile
124 if (!wrp) {
125 LOGP(alarm, "No CcdbObjectInfo info for {} at slot {}",
126 o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
127 continue;
128 }
129 if (mOrderingLatencyMS <= 0) { // ordering is not requested
130 doUpload(*wrp, pld);
131 } else {
132 auto& pathCache = mOrdCache[wrp->getPath()];
133 auto stt = pathCache.emplace(*wrp, std::make_pair(nowMS, std::vector<char>(pld.size())));
134 if (stt.second) { // insertion success
135 stt.first->second.second.assign(pld.begin(), pld.end());
136 std::string msg = fmt::format("Bufferizing for ordering ccdb object {}/{} of size {} valid for {} : {}",
137 wrp->getPath(), wrp->getFileName(), pld.size(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
138 logAsNeeded(nowMS, wrp->getPath(), msg);
139 } else {
140 bool v = stt.first != pathCache.end();
141 LOGP(error, "failed to bufferize a {} object with SOV={}/EOV={} received at {}, conflicting with previously bufferized one SOV={}/EOV={} received at {}",
142 wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp(), nowMS,
143 v ? std::to_string(stt.first->first.getStartValidityTimestamp()) : std::string{"N/A"},
144 v ? std::to_string(stt.first->first.getEndValidityTimestamp()) : std::string{"N/A"},
145 v ? std::to_string(stt.first->second.first) : std::string{"N/A"});
146 }
147 }
148 }
149 if (mOrderingLatencyMS > 0) {
150 checkCache(mOrderingLatencyMS);
151 }
152}
153
155{
156 // check if some entries in cache are ripe enough to upload
157 auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
158 for (auto& pathCache : mOrdCache) { // loop over paths
159 if (delay < 0 && pathCache.second.size()) {
160 LOGP(important, "Uploading {} cached objects for path {}", pathCache.second.size(), pathCache.first);
161 }
162 for (auto it = pathCache.second.begin(); it != pathCache.second.end();) { // loop over objects of the path
163 if (nowMS - it->second.first > delay) {
164 doUpload(it->first, {it->second.second.data(), it->second.second.size()}, true);
165 it = pathCache.second.erase(it);
166 } else {
167 break;
168 }
169 }
170 }
171}
172
173void CCDBPopulator::doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached)
174{
175 std::string msg = fmt::format("Storing in ccdb {}{}/{} of size {} valid for {} : {}", cached ? "cached " : "", wrp.getPath(), wrp.getFileName(), pld.size(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
176 auto uploadTS = o2::ccdb::getCurrentTimestamp();
177 logAsNeeded(uploadTS, wrp.getPath(), msg);
178 std::map<std::string, std::string> metadata;
179 const auto* md = &wrp.getMetaData();
180 if (mRunNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
181 metadata = *md; // clone since the md from the message is const
182 metadata[o2::base::NameConf::CCDBRunTag.data()] = mRunNoStr;
183 md = &metadata;
184 }
185 int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp.getFileName(), wrp.getObjectType(), wrp.getPath(), *md, wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
186 if (res) {
187 if (mFatalOnFailure) {
188 LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
189 } else {
190 LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
191 }
192 }
193 // if requested, make sure that the new object can be queried
194 if (mValidateUpload || wrp.getValidateUpload()) {
195 constexpr long MAXDESYNC = 3;
196 auto headers = mAPI.retrieveHeaders(wrp.getPath(), {}, wrp.getStartValidityTimestamp() + (wrp.getEndValidityTimestamp() - wrp.getStartValidityTimestamp()) / 2);
197 if (headers.empty() ||
198 std::atol(headers["Created"].c_str()) < uploadTS - MAXDESYNC ||
199 std::atol(headers["Valid-From"].c_str()) != wrp.getStartValidityTimestamp() ||
200 std::atol(headers["Valid-Until"].c_str()) != wrp.getEndValidityTimestamp()) {
201 if (mFatalOnFailure) {
202 LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
203 } else {
204 LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
205 }
206 } else {
207 LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
208 }
209 }
210 if (wrp.isAdjustableEOV() && !mAPI.isSnapshotMode()) {
212 }
213}
214
215void CCDBPopulator::logAsNeeded(long nowMS, const std::string& path, std::string& msg)
216{
217 auto& lastLog = mThrottling[path];
218 if (lastLog.first + mThrottlingDelayMS < nowMS) {
219 if (lastLog.second) {
220 msg += fmt::format(" ({} uploads were logged as INFO)", lastLog.second);
221 lastLog.second = 0;
222 }
223 lastLog.first = nowMS;
224 LOG(important) << msg;
225 } else {
226 lastLog.second++;
227 LOG(info) << msg;
228 }
229}
230
232{
233 if (mEnded) {
234 return;
235 }
236 mEnded = true;
237 LOG(info) << "EndOfStream received";
238 if (mOrderingLatencyMS > 0) {
239 checkCache(-mOrderingLatencyMS); // force
240 }
241}
242
244{
245 if (mEnded) {
246 return;
247 }
248 mEnded = true;
249 LOG(info) << "Forced stop";
250 if (mOrderingLatencyMS > 0) {
251 checkCache(-mOrderingLatencyMS); // force
252 }
253}
254
255} // namespace calibration
256
257namespace framework
258{
259
260DataProcessorSpec getCCDBPopulatorDeviceSpec(const std::string& defCCDB, const std::string& nameExt)
261{
263 std::vector<InputSpec> inputs = {{"clbPayload", "CLP", Lifetime::Sporadic}, {"clbWrapper", "CLW", Lifetime::Sporadic}};
264 std::string devName = "ccdb-populator";
265 devName += nameExt;
266 return DataProcessorSpec{
267 devName,
268 inputs,
269 Outputs{},
270 AlgorithmSpec{adaptFromTask<o2::calibration::CCDBPopulator>()},
271 Options{
272 {"ccdb-path", VariantType::String, defCCDB, {"Path to CCDB"}},
273 {"sspec-min", VariantType::Int64, -1L, {"min subspec to accept"}},
274 {"sspec-max", VariantType::Int64, -1L, {"max subspec to accept"}},
275 {"ordering-latency", VariantType::Int, -1, {"if enabled (positive) bufferize object and upload it if no object with smaller SOV received in given waiting time (ms)"}},
276 {"throttling-delay", VariantType::Int64, 300000L, {"produce important type log at most once per this period in ms for each CCDB path"}},
277 {"validate-upload", VariantType::Bool, false, {"valider upload by querying its headers"}},
278 {"fatal-on-failure", VariantType::Bool, false, {"do not produce fatal on failed upload"}}}};
279}
280
281} // namespace framework
282} // namespace o2
283
284#endif
Utils and constants for calibration and related workflows.
Definition of the Names Generator class.
uint32_t res
Definition RawData.h:0
static constexpr std::string_view CCDBRunTag
Definition NameConf.h:69
void init(o2::framework::InitContext &ic) final
void logAsNeeded(long nowMS, const std::string &path, std::string &msg)
std::map< CcdbObjectInfo, TBLOB > OBJCACHE
void doUpload(const CcdbObjectInfo &wrp, const gsl::span< const char > &pld, bool cached=false)
std::pair< long, BLOB > TBLOB
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(std::string const &hosts)
Definition CcdbApi.cxx:165
std::string const & getURL() const
Definition CcdbApi.h:87
bool isSnapshotMode() const
Definition CcdbApi.h:93
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
Definition CcdbApi.cxx:1416
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
Definition CcdbApi.cxx:351
long getEndValidityTimestamp() const
const std::string & getPath() const
const std::string & getObjectType() const
const std::map< std::string, std::string > & getMetaData() const
long getStartValidityTimestamp() const
const std::string & getFileName() const
ConfigParamRegistry const & options()
Definition InitContext.h:33
decltype(auto) get(R binding, int part=0) const
size_t getNofParts(int pos) const
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
int adjustOverriddenEOV(CcdbApi &api, const CcdbObjectInfo &infoNew)
set EOV of overriden objects to SOV-1 of overriding one if it is allowed
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
static bool isValid(DataRef const &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg
Definition x9.h:153