171 std::unordered_map<std::string, bool> accountedSpecs;
172 auto defHost = options.
get<std::string>(
"condition-backend");
173 auto checkRate = options.
get<
int>(
"condition-tf-per-query");
174 auto checkMult = options.
get<
int>(
"condition-tf-per-query-multiplier");
175 helper.useTFSlice = options.
get<
int>(
"condition-use-slice-for-prescaling");
176 helper.timeToleranceMS = options.
get<int64_t>(
"condition-time-tolerance");
177 helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
178 helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
179 std::string extraCond{};
180 if (helper.useTFSlice) {
181 extraCond =
". Use TFSlice";
182 if (helper.useTFSlice > 0) {
183 extraCond += fmt::format(
" + max TFcounter jump <= {}", helper.useTFSlice);
186 LOGP(info,
"CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
187 helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(
", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(
", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
189 LOGP(info,
"Hook to enable signposts for CCDB messages at {}", (
void*)&private_o2_log_ccdb->stacktrace);
190 auto remapString = options.
get<std::string>(
"condition-remap");
192 if (!
result.error.empty()) {
195 helper.remappings =
result.remappings;
196 helper.apis[
""].init(defHost);
197 LOGP(info,
"Initialised default CCDB host {}", defHost);
199 for (
auto&
entry : helper.remappings) {
200 if (helper.apis.find(
entry.second) == helper.apis.end()) {
202 LOGP(info,
"Initialised custom CCDB host {}",
entry.second);
204 LOGP(info,
"{} is remapped to {}",
entry.first,
entry.second);
206 helper.createdNotBefore =
std::to_string(options.
get<int64_t>(
"condition-not-before"));
207 helper.createdNotAfter =
std::to_string(options.
get<int64_t>(
"condition-not-after"));
209 for (
auto& route : outputRoutes) {
210 if (route.matcher.lifetime != Lifetime::Condition) {
214 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
217 accountedSpecs[specStr] =
true;
218 helper.routes.push_back(route);
220 for (
auto& metadata : route.matcher.metadata) {
222 LOGP(info,
"- {}: {}", metadata.name, metadata.defaultValue.asString());
274 std::string ccdbMetadataPrefix =
"ccdb-metadata-";
281 O2_SIGNPOST_START(ccdb, sid,
"populateCacheWith",
"Starting to populate cache with CCDB objects");
282 for (
auto& route : helper->routes) {
283 int64_t timestampToUse = timestamp;
288 auto&&
v = allocator.makeVector<
char>(
output);
289 std::map<std::string, std::string> metadata;
290 std::map<std::string, std::string> headers;
291 std::string
path =
"";
292 std::string
etag =
"";
293 int chRate = helper->queryPeriodGlo;
294 bool checkValidity =
false;
295 for (
auto& meta : route.matcher.metadata) {
296 if (meta.name ==
"ccdb-path") {
297 path = meta.defaultValue.get<std::string>();
298 }
else if (meta.name ==
"ccdb-run-dependent" && meta.defaultValue.get<
int>() > 0) {
299 if (meta.defaultValue.get<
int>() == 1) {
300 metadata[
"runNumber"] = dtc.runNumber;
301 }
else if (meta.defaultValue.get<
int>() == 2) {
302 timestampToUse = std::stoi(dtc.runNumber);
304 LOGP(fatal,
"Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<
int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(),
int(concrete.subSpec));
306 }
else if (
isPrefix(ccdbMetadataPrefix, meta.name)) {
307 std::string
key = meta.name.substr(ccdbMetadataPrefix.size());
308 auto value = meta.defaultValue.get<std::string>();
311 }
else if (meta.name ==
"ccdb-query-rate") {
312 chRate = std::max(1, meta.defaultValue.get<
int>()) * helper->queryPeriodFactor;
315 const auto url2uuid = helper->mapURL2UUID.find(
path);
316 if (url2uuid != helper->mapURL2UUID.end()) {
317 etag = url2uuid->second.etag;
319 uint64_t validUntil = url2uuid->second.cacheValidUntil;
324 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp <
cachePopulatedAt);
325 if (isOnline || cacheExpired) {
326 if (!helper->useTFSlice) {
327 checkValidity = chRate > 0 ? (std::abs(
int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
329 checkValidity = chRate > 0 ? (std::abs(
int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
330 if (!checkValidity && helper->useTFSlice > std::abs(chRate)) {
331 checkValidity = std::abs(
int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
336 checkValidity =
true;
339 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"checkValidity is %{public}s for tf%{public}s %zu of %{public}s", checkValidity ?
"true" :
"false", helper->useTFSlice ?
"ID" :
"Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter,
path.data());
341 const auto& api = helper->getAPI(
path);
342 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
343 LOGP(detail,
"Loading {} for timestamp {}",
path, timestampToUse);
344 api.loadFileToMemory(
v,
path, metadata, timestampToUse, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
345 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
346 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timestampToUse);
351 if (headers.find(
"default") != headers.end()) {
352 LOGP(detail,
"******** Default entry used for {} ********",
path);
354 helper->mapURL2UUID[
path].lastCheckedTF = timingInfo.tfCounter;
355 helper->mapURL2UUID[
path].lastCheckedSlice = timingInfo.timeslice;
357 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
358 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
359 helper->mapURL2UUID[
path].cacheMiss++;
360 helper->mapURL2UUID[
path].size =
v.size();
361 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
362 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
363 helper->totalFetchedBytes +=
v.size();
364 helper->totalRequestedBytes +=
v.size();
365 api.appendFlatHeader(
v, headers);
367 helper->mapURL2DPLCache[
path] = cacheId;
368 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
372 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
373 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
374 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
375 helper->mapURL2UUID[
path].cacheMiss++;
376 helper->mapURL2UUID[
path].size =
v.size();
377 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
378 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
379 helper->totalFetchedBytes +=
v.size();
380 helper->totalRequestedBytes +=
v.size();
381 api.appendFlatHeader(
v, headers);
383 helper->mapURL2DPLCache[
path] = cacheId;
384 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
388 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
392 auto cacheId = helper->mapURL2DPLCache[
path];
393 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
394 helper->mapURL2UUID[
path].cacheHit++;
395 helper->totalRequestedBytes += helper->mapURL2UUID[
path].size;
399 O2_SIGNPOST_END(ccdb, sid,
"populateCacheWith",
"Finished populating cache with CCDB objects");
405 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
410 LOGP(info,
"CCDB cache miss/hit ratio ({} fetched / {} requested bytes):", helper->totalFetchedBytes, helper->totalRequestedBytes);
411 for (
auto&
entry : helper->mapURL2UUID) {
412 LOGP(info,
" {}: {}/{} ({}-{} bytes)",
entry.first,
entry.second.cacheMiss,
entry.second.cacheHit,
entry.second.minSize,
entry.second.maxSize);
418 O2_SIGNPOST_START(ccdb, sid,
"fetchFromCCDB",
"Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.
timeslice);
419 static Long64_t orbitResetTime = -1;
420 static size_t lastTimeUsed = -1;
422 LOGP(info,
"Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
428 const std::string
path =
"CTP/Calib/OrbitReset";
429 std::map<std::string, std::string> metadata;
430 std::map<std::string, std::string> headers;
433 bool checkValidity = std::abs(
int(
counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
434 const auto url2uuid = helper->mapURL2UUID.find(
path);
435 if (url2uuid != helper->mapURL2UUID.end()) {
436 etag = url2uuid->second.etag;
438 checkValidity =
true;
440 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"checkValidity is %{public}s for tf%{public}s %d of %{public}s",
441 checkValidity ?
"true" :
"false", helper->useTFSlice ?
"ID" :
"Slice",
counter,
path.data());
443 Long64_t newOrbitResetTime = orbitResetTime;
445 const auto& api = helper->getAPI(
path);
446 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
447 helper->lastCheckedTFCounterOrbReset =
counter;
448 api.loadFileToMemory(
v,
path, metadata, timingInfo.
creation, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
449 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
450 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timingInfo.
creation);
455 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
456 helper->mapURL2UUID[
path].cacheMiss++;
457 helper->mapURL2UUID[
path].size =
v.size();
458 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
459 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
460 helper->totalFetchedBytes +=
v.size();
461 helper->totalRequestedBytes +=
v.size();
463 api.appendFlatHeader(
v, headers);
465 helper->mapURL2DPLCache[
path] = cacheId;
466 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
467 }
else if (
v.size()) {
468 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
469 helper->mapURL2UUID[
path].cacheMiss++;
470 helper->mapURL2UUID[
path].size =
v.size();
471 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
472 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
473 helper->totalFetchedBytes +=
v.size();
474 helper->totalRequestedBytes +=
v.size();
476 api.appendFlatHeader(
v, headers);
478 helper->mapURL2DPLCache[
path] = cacheId;
479 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
483 auto cacheId = helper->mapURL2DPLCache[
path];
484 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
485 helper->mapURL2UUID[
path].cacheHit++;
486 helper->totalRequestedBytes += helper->mapURL2UUID[
path].size;
489 if (newOrbitResetTime != orbitResetTime) {
490 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
491 orbitResetTime = newOrbitResetTime;
497 if (std::abs(int64_t(timingInfo.
creation) - timestamp) > helper->timeToleranceMS) {
498 static bool notWarnedYet =
true;
500 LOGP(warn,
"timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.
firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.
creation);
501 notWarnedYet =
false;
508 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we think this is running.