12#ifndef O2_CALIBRATION_CCDBPOPULATOR_H
13#define O2_CALIBRATION_CCDBPOPULATOR_H
32#include <unordered_map>
49 using BLOB = std::vector<char>;
50 using TBLOB = std::pair<long, BLOB>;
51 using OBJCACHE = std::map<CcdbObjectInfo, TBLOB>;
64 long mThrottlingDelayMS = 0;
65 int mOrderingLatencyMS = -1;
66 bool mFatalOnFailure = true;
67 bool mValidateUpload = false;
69 std::unordered_map<
std::
string,
std::pair<
long,
int>> mThrottling;
71 std::int64_t mSSpecMin = -1;
72 std::int64_t mSSpecMax = -1;
73 std::
string mCCDBpath = "http:
75 std::
string mRunNoStr = {};
80 mCCDBpath = ic.
options().
get<std::string>(
"ccdb-path");
81 mSSpecMin = ic.
options().
get<std::int64_t>(
"sspec-min");
82 mSSpecMax = ic.
options().
get<std::int64_t>(
"sspec-max");
83 mFatalOnFailure = ic.
options().
get<
bool>(
"fatal-on-failure");
84 mValidateUpload = ic.
options().
get<
bool>(
"validate-upload");
85 mThrottlingDelayMS = ic.
options().
get<std::int64_t>(
"throttling-delay");
86 mOrderingLatencyMS = ic.
options().
get<
int>(
"ordering-latency");
94 LOGP(alarm,
"Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.
inputs().
getNofParts(1));
96 }
else if (nSlots == 0) {
97 LOG(alarm) <<
"0 slots received";
101 if (mRunNoFromDH > 0) {
104 auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
105 for (
int isl = 0; isl < nSlots; isl++) {
106 auto refWrp = pc.
inputs().
get(
"clbWrapper", isl);
107 auto refPld = pc.
inputs().
get(
"clbPayload", isl);
109 LOGP(alarm,
"Wrapper is not valid for slot {}", isl);
113 LOGP(alarm,
"Payload is not valid for slot {}", isl);
116 if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) {
117 auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
118 if (ss < mSSpecMin || ss > mSSpecMax) {
123 const auto pld = pc.
inputs().
get<gsl::span<char>>(refPld);
125 LOGP(alarm,
"No CcdbObjectInfo info for {} at slot {}",
126 o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
129 if (mOrderingLatencyMS <= 0) {
132 auto& pathCache = mOrdCache[wrp->getPath()];
133 auto stt = pathCache.emplace(*wrp, std::make_pair(nowMS, std::vector<char>(pld.size())));
135 stt.first->second.second.assign(pld.begin(), pld.end());
136 std::string
msg = fmt::format(
"Bufferizing for ordering ccdb object {}/{} of size {} valid for {} : {}",
140 bool v = stt.first != pathCache.end();
141 LOGP(error,
"failed to bufferize a {} object with SOV={}/EOV={} received at {}, conflicting with previously bufferized one SOV={}/EOV={} received at {}",
142 wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp(), nowMS,
143 v ?
std::to_string(stt.first->first.getStartValidityTimestamp()) : std::string{
"N/A"},
144 v ?
std::to_string(stt.first->first.getEndValidityTimestamp()) : std::string{
"N/A"},
149 if (mOrderingLatencyMS > 0) {
157 auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
158 for (
auto& pathCache : mOrdCache) {
159 if (delay < 0 && pathCache.second.size()) {
160 LOGP(important,
"Uploading {} cached objects for path {}", pathCache.second.size(), pathCache.first);
162 for (
auto it = pathCache.second.begin(); it != pathCache.second.end();) {
163 if (nowMS - it->second.first > delay) {
164 doUpload(it->first, {it->second.second.data(), it->second.second.size()},
true);
165 it = pathCache.second.erase(it);
178 std::map<std::string, std::string> metadata;
187 if (mFatalOnFailure) {
195 constexpr long MAXDESYNC = 3;
197 if (headers.empty() ||
198 std::atol(headers[
"Created"].c_str()) < uploadTS - MAXDESYNC ||
201 if (mFatalOnFailure) {
217 auto& lastLog = mThrottling[
path];
218 if (lastLog.first + mThrottlingDelayMS < nowMS) {
219 if (lastLog.second) {
220 msg += fmt::format(
" ({} uploads were logged as INFO)", lastLog.second);
223 lastLog.first = nowMS;
237 LOG(info) <<
"EndOfStream received";
238 if (mOrderingLatencyMS > 0) {
249 LOG(info) <<
"Forced stop";
250 if (mOrderingLatencyMS > 0) {
260DataProcessorSpec getCCDBPopulatorDeviceSpec(
const std::string& defCCDB,
const std::string& nameExt)
263 std::vector<InputSpec> inputs = {{
"clbPayload",
"CLP", Lifetime::Sporadic}, {
"clbWrapper",
"CLW", Lifetime::Sporadic}};
264 std::string devName =
"ccdb-populator";
270 AlgorithmSpec{adaptFromTask<o2::calibration::CCDBPopulator>()},
275 {
"ordering-latency",
VariantType::Int, -1, {
"if enabled (positive) bufferize object and upload it if no object with smaller SOV received in given waiting time (ms)"}},
276 {
"throttling-delay",
VariantType::Int64, 300000L, {
"produce important type log at most once per this period in ms for each CCDB path"}},
277 {
"validate-upload",
VariantType::Bool,
false, {
"valider upload by querying its headers"}},
278 {
"fatal-on-failure",
VariantType::Bool,
false, {
"do not produce fatal on failed upload"}}}};
Utils and constants for calibration and related workflows.
Definition of the Names Generator class.
static constexpr std::string_view CCDBRunTag
void init(o2::framework::InitContext &ic) final
void logAsNeeded(long nowMS, const std::string &path, std::string &msg)
std::map< CcdbObjectInfo, TBLOB > OBJCACHE
void doUpload(const CcdbObjectInfo &wrp, const gsl::span< const char > &pld, bool cached=false)
std::pair< long, BLOB > TBLOB
void checkCache(long delay)
void run(o2::framework::ProcessingContext &pc) final
void stop() final
This is invoked on stop.
void endOfStream(o2::framework::EndOfStreamContext &ec) final
This is invoked whenever we have an EndOfStream event.
void init(std::string const &hosts)
std::string const & getURL() const
bool isSnapshotMode() const
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
bool getValidateUpload() const
long getEndValidityTimestamp() const
const std::string & getPath() const
bool isAdjustableEOV() const
const std::string & getObjectType() const
const std::map< std::string, std::string > & getMetaData() const
long getStartValidityTimestamp() const
const std::string & getFileName() const
T get(const char *key) const
ConfigParamRegistry const & options()
InputRecord & inputs()
The inputs associated with this processing context.
ServiceRegistryRef services()
The services registry associated with this processing context.
GLsizei const GLchar *const * path
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
int adjustOverriddenEOV(CcdbApi &api, const CcdbObjectInfo &infoNew)
set EOV of overridden objects to SOV-1 of overriding one if it is allowed
std::vector< OutputSpec > Outputs
a couple of static helper functions to create timestamp values for CCDB queries or override obsolete ...
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
static bool isValid(DataRef const &ref)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
uint64_t const void const *restrict const msg