192 std::string ccdbMetadataPrefix =
"ccdb-metadata-";
199 O2_SIGNPOST_START(ccdb, sid,
"populateCacheWith",
"Starting to populate cache with CCDB objects");
200 for (
auto& route : helper->routes) {
201 int64_t timestampToUse = timestamp;
206 auto&&
v = allocator.makeVector<
char>(
output);
207 std::map<std::string, std::string> metadata;
208 std::map<std::string, std::string> headers;
209 std::string
path =
"";
210 std::string etag =
"";
211 int chRate = helper->queryPeriodGlo;
212 bool checkValidity =
false;
213 for (
auto& meta : route.matcher.metadata) {
214 if (meta.name ==
"ccdb-path") {
215 path = meta.defaultValue.get<std::string>();
216 }
else if (meta.name ==
"ccdb-run-dependent" && meta.defaultValue.get<
int>() > 0) {
217 if (meta.defaultValue.get<
int>() == 1) {
218 metadata[
"runNumber"] = dtc.runNumber;
219 }
else if (meta.defaultValue.get<
int>() == 2) {
220 timestampToUse = std::stoi(dtc.runNumber);
222 LOGP(fatal,
"Undefined run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<
int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(),
int(concrete.subSpec));
224 }
else if (
isPrefix(ccdbMetadataPrefix, meta.name)) {
225 std::string
key = meta.name.substr(ccdbMetadataPrefix.size());
226 auto value = meta.defaultValue.get<std::string>();
229 }
else if (meta.name ==
"ccdb-query-rate") {
230 chRate = meta.defaultValue.get<
int>() * helper->queryPeriodFactor;
233 const auto url2uuid = helper->mapURL2UUID.find(
path);
234 if (url2uuid != helper->mapURL2UUID.end()) {
235 etag = url2uuid->second.etag;
237 uint64_t validUntil = url2uuid->second.cacheValidUntil;
239 uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
242 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
243 checkValidity = (std::abs(
int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
245 checkValidity =
true;
248 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ?
"true" :
"false", timingInfo.tfCounter,
path.data());
250 const auto& api = helper->getAPI(
path);
251 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) {
252 LOGP(detail,
"Loading {} for timestamp {}",
path, timestampToUse);
253 api.loadFileToMemory(
v,
path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
254 if ((headers.count(
"Error") != 0) || (etag.empty() &&
v.empty())) {
255 LOGP(fatal,
"Unable to find object {}/{}",
path, timestampToUse);
260 if (headers.find(
"default") != headers.end()) {
261 LOGP(detail,
"******** Default entry used for {} ********",
path);
263 helper->mapURL2UUID[
path].lastCheckedTF = timingInfo.tfCounter;
265 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
266 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
267 helper->mapURL2UUID[
path].cacheMiss++;
268 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
269 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
270 api.appendFlatHeader(
v, headers);
272 helper->mapURL2DPLCache[
path] = cacheId;
273 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
278 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
279 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
280 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
281 helper->mapURL2UUID[
path].cacheMiss++;
282 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
283 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
284 api.appendFlatHeader(
v, headers);
286 helper->mapURL2DPLCache[
path] = cacheId;
287 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
293 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
297 auto cacheId = helper->mapURL2DPLCache[
path];
298 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
299 helper->mapURL2UUID[
path].cacheHit++;
303 O2_SIGNPOST_END(ccdb, sid,
"populateCacheWith",
"Finished populating cache with CCDB objects");
309 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
310 std::unordered_map<std::string, bool> accountedSpecs;
311 auto defHost = options.
get<std::string>(
"condition-backend");
312 auto checkRate = options.
get<
int>(
"condition-tf-per-query");
313 auto checkMult = options.
get<
int>(
"condition-tf-per-query-multiplier");
314 helper->timeToleranceMS = options.
get<int64_t>(
"condition-time-tolerance");
315 helper->queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
316 helper->queryPeriodFactor = checkMult > 0 ? checkMult : 1;
317 LOGP(info,
"CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper->queryPeriodGlo, helper->queryPeriodFactor == 1 ? std::string{} : fmt::format(
", (query for high-rate objects downscaled by {})", helper->queryPeriodFactor));
318 LOGP(info,
"Hook to enable signposts for CCDB messages at {}", (
void*)&private_o2_log_ccdb->stacktrace);
319 auto remapString = options.
get<std::string>(
"condition-remap");
321 if (!
result.error.empty()) {
324 helper->remappings =
result.remappings;
325 helper->apis[
""].init(defHost);
326 LOGP(info,
"Initialised default CCDB host {}", defHost);
328 for (
auto&
entry : helper->remappings) {
329 if (helper->apis.find(
entry.second) == helper->apis.end()) {
330 helper->apis[
entry.second].init(
entry.second);
331 LOGP(info,
"Initialised custom CCDB host {}",
entry.second);
333 LOGP(info,
"{} is remapped to {}",
entry.first,
entry.second);
335 helper->createdNotBefore =
std::to_string(options.
get<int64_t>(
"condition-not-before"));
336 helper->createdNotAfter =
std::to_string(options.
get<int64_t>(
"condition-not-after"));
338 for (
auto &route : spec.
outputs) {
339 if (route.matcher.lifetime != Lifetime::Condition) {
343 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
346 accountedSpecs[specStr] =
true;
347 helper->routes.push_back(route);
349 for (
auto& metadata : route.matcher.metadata) {
351 LOGP(info,
"- {}: {}", metadata.name, metadata.defaultValue.asString());
358 LOGP(info,
"CCDB cache miss/hit ratio:");
359 for (
auto&
entry : helper->mapURL2UUID) {
360 LOGP(info,
" {}: {}/{} ({}-{} bytes)",
entry.first,
entry.second.cacheMiss,
entry.second.cacheHit,
entry.second.minSize,
entry.second.maxSize);
366 O2_SIGNPOST_START(ccdb, sid,
"fetchFromCCDB",
"Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.
timeslice);
367 static Long64_t orbitResetTime = -1;
368 static size_t lastTimeUsed = -1;
370 LOGP(info,
"Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
376 const std::string
path =
"CTP/Calib/OrbitReset";
377 std::map<std::string, std::string> metadata;
378 std::map<std::string, std::string> headers;
380 bool checkValidity = std::abs(
int(timingInfo.
tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
381 const auto url2uuid = helper->mapURL2UUID.find(
path);
382 if (url2uuid != helper->mapURL2UUID.end()) {
383 etag = url2uuid->second.etag;
385 checkValidity =
true;
387 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"checkValidity is %{public}s for tfID %d of %{public}s",
388 checkValidity ?
"true" :
"false", timingInfo.
tfCounter,
path.data());
390 Long64_t newOrbitResetTime = orbitResetTime;
392 const auto& api = helper->getAPI(
path);
393 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) {
394 helper->lastCheckedTFCounterOrbReset = timingInfo.
tfCounter;
395 api.loadFileToMemory(
v,
path, metadata, timingInfo.
creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
396 if ((headers.count(
"Error") != 0) || (etag.empty() &&
v.empty())) {
397 LOGP(fatal,
"Unable to find object {}/{}",
path, timingInfo.
creation);
402 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
403 helper->mapURL2UUID[
path].cacheMiss++;
404 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
405 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
407 api.appendFlatHeader(
v, headers);
409 helper->mapURL2DPLCache[
path] = cacheId;
410 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
411 }
else if (
v.size()) {
413 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
414 helper->mapURL2UUID[
path].cacheMiss++;
415 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
416 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
418 api.appendFlatHeader(
v, headers);
420 helper->mapURL2DPLCache[
path] = cacheId;
421 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
427 auto cacheId = helper->mapURL2DPLCache[
path];
428 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
429 helper->mapURL2UUID[
path].cacheHit++;
432 if (newOrbitResetTime != orbitResetTime) {
433 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
434 orbitResetTime = newOrbitResetTime;
440 if (std::abs(int64_t(timingInfo.
creation) - timestamp) > helper->timeToleranceMS) {
441 static bool notWarnedYet =
true;
443 LOGP(warn,
"timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.
firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.
creation);
444 notWarnedYet =
false;
451 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we think this is running.