32struct CCDBFetcherHelper {
33 struct CCDBCacheInfo {
56 std::unordered_map<std::string, o2::ccdb::CcdbApi>
apis;
69 if (
pos == std::string::npos) {
73 auto pos2 =
path.rfind(
'/',
pos);
74 if (pos2 == std::string::npos || pos2 ==
pos - 1 || pos2 == 0) {
/// Tells whether @a full starts with @a prefix.
///
/// @param prefix the candidate leading sequence of characters
/// @param full   the string that is inspected
/// @return true when the first prefix.size() characters of @a full are equal
///         to @a prefix; an empty @a prefix always matches.
bool isPrefix(std::string_view prefix, std::string_view full)
{
  // substr() clamps the requested length to full.size(), so a prefix that is
  // longer than full simply compares unequal instead of reading out of range.
  return prefix == full.substr(0, prefix.size());
}
90 std::unordered_map<std::string, std::string>
remappings;
91 std::string currentUrl =
"";
100 ParsingStates
state = IN_BEGIN;
108 state = IN_BEGIN_URL;
111 if ((strncmp(
"http://",
str, 7) != 0) && (strncmp(
"https://",
str, 8) != 0 && (strncmp(
"file://",
str, 7) != 0))) {
112 return {
remappings,
"URL should start with either http:// or https:// or file://"};
117 char const*
c = strchr(
str,
'=');
119 return {
remappings,
"Expecting at least one target path, missing `='?"};
121 if ((
c -
str) == 0) {
124 currentUrl = std::string_view(
str,
c -
str);
125 state = IN_BEGIN_TARGET;
128 case IN_BEGIN_TARGET: {
132 state = IN_END_TARGET;
134 case IN_END_TARGET: {
135 char const*
c = strpbrk(
str,
",;");
138 return {
remappings, fmt::format(
"Path {} requested more than once.",
str)};
143 if ((
c -
str) == 0) {
148 return {
remappings, fmt::format(
"Path {} requested more than once.",
key)};
152 state = IN_BEGIN_URL;
154 state = IN_BEGIN_TARGET;
164 std::unordered_map<std::string, bool> accountedSpecs;
165 auto defHost = options.
get<std::string>(
"condition-backend");
166 auto checkRate = options.
get<
int>(
"condition-tf-per-query");
167 auto checkMult = options.
get<
int>(
"condition-tf-per-query-multiplier");
168 helper.timeToleranceMS = options.
get<int64_t>(
"condition-time-tolerance");
169 helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
170 helper.queryPeriodFactor = checkMult > 0 ? checkMult : 1;
171 LOGP(info,
"CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper.queryPeriodGlo, helper.queryPeriodFactor == 1 ? std::string{} : fmt::format(
", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor));
172 LOGP(info,
"Hook to enable signposts for CCDB messages at {}", (
void*)&private_o2_log_ccdb->stacktrace);
173 auto remapString = options.
get<std::string>(
"condition-remap");
175 if (!
result.error.empty()) {
178 helper.remappings =
result.remappings;
179 helper.apis[
""].init(defHost);
180 LOGP(info,
"Initialised default CCDB host {}", defHost);
182 for (
auto&
entry : helper.remappings) {
183 if (helper.apis.find(
entry.second) == helper.apis.end()) {
185 LOGP(info,
"Initialised custom CCDB host {}",
entry.second);
187 LOGP(info,
"{} is remapped to {}",
entry.first,
entry.second);
189 helper.createdNotBefore =
std::to_string(options.
get<int64_t>(
"condition-not-before"));
190 helper.createdNotAfter =
std::to_string(options.
get<int64_t>(
"condition-not-after"));
192 for (
auto& route : outputRoutes) {
193 if (route.matcher.lifetime != Lifetime::Condition) {
197 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
200 accountedSpecs[specStr] =
true;
201 helper.routes.push_back(route);
203 for (
auto& metadata : route.matcher.metadata) {
205 LOGP(info,
"- {}: {}", metadata.name, metadata.defaultValue.asString());
213 Int_t previousErrorLevel = gErrorIgnoreLevel;
214 gErrorIgnoreLevel = kFatal;
215 TMemFile memFile(
"name",
const_cast<char*
>(
v.data()),
v.size(),
"READ");
216 gErrorIgnoreLevel = previousErrorLevel;
217 if (memFile.IsZombie()) {
220 TClass* tcl = TClass::GetClass(
typeid(std::vector<Long64_t>));
223 throw runtime_error_f(
"Couldn't retrieve object corresponding to %s from TFile", tcl->GetName());
226 auto* ctp = (std::vector<Long64_t>*)
result;
241 std::string ccdbMetadataPrefix =
"ccdb-metadata-";
248 O2_SIGNPOST_START(ccdb, sid,
"populateCacheWith",
"Starting to populate cache with CCDB objects");
249 for (
auto& route : helper->routes) {
250 int64_t timestampToUse = timestamp;
255 auto&&
v = allocator.makeVector<
char>(
output);
256 std::map<std::string, std::string> metadata;
257 std::map<std::string, std::string> headers;
258 std::string
path =
"";
259 std::string
etag =
"";
260 int chRate = helper->queryPeriodGlo;
261 bool checkValidity =
false;
262 for (
auto& meta : route.matcher.metadata) {
263 if (meta.name ==
"ccdb-path") {
264 path = meta.defaultValue.get<std::string>();
265 }
else if (meta.name ==
"ccdb-run-dependent" && meta.defaultValue.get<
int>() > 0) {
266 if (meta.defaultValue.get<
int>() == 1) {
267 metadata[
"runNumber"] = dtc.runNumber;
268 }
else if (meta.defaultValue.get<
int>() == 2) {
269 timestampToUse = std::stoi(dtc.runNumber);
271 LOGP(fatal,
"Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<
int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(),
int(concrete.subSpec));
273 }
else if (
isPrefix(ccdbMetadataPrefix, meta.name)) {
274 std::string
key = meta.name.substr(ccdbMetadataPrefix.size());
275 auto value = meta.defaultValue.get<std::string>();
278 }
else if (meta.name ==
"ccdb-query-rate") {
279 chRate = meta.defaultValue.get<
int>() * helper->queryPeriodFactor;
282 const auto url2uuid = helper->mapURL2UUID.find(
path);
283 if (url2uuid != helper->mapURL2UUID.end()) {
284 etag = url2uuid->second.etag;
286 uint64_t validUntil = url2uuid->second.cacheValidUntil;
291 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp <
cachePopulatedAt);
292 checkValidity = (std::abs(
int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
294 checkValidity =
true;
297 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ?
"true" :
"false", timingInfo.tfCounter,
path.data());
299 const auto& api = helper->getAPI(
path);
300 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
301 LOGP(detail,
"Loading {} for timestamp {}",
path, timestampToUse);
302 api.loadFileToMemory(
v,
path, metadata, timestampToUse, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
303 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
304 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timestampToUse);
309 if (headers.find(
"default") != headers.end()) {
310 LOGP(detail,
"******** Default entry used for {} ********",
path);
312 helper->mapURL2UUID[
path].lastCheckedTF = timingInfo.tfCounter;
314 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
315 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
316 helper->mapURL2UUID[
path].cacheMiss++;
317 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
318 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
319 api.appendFlatHeader(
v, headers);
321 helper->mapURL2DPLCache[
path] = cacheId;
322 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
327 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
328 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
329 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
330 helper->mapURL2UUID[
path].cacheMiss++;
331 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
332 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
333 api.appendFlatHeader(
v, headers);
335 helper->mapURL2DPLCache[
path] = cacheId;
336 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
342 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
346 auto cacheId = helper->mapURL2DPLCache[
path];
347 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
348 helper->mapURL2UUID[
path].cacheHit++;
352 O2_SIGNPOST_END(ccdb, sid,
"populateCacheWith",
"Finished populating cache with CCDB objects");
358 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
363 LOGP(info,
"CCDB cache miss/hit ratio:");
364 for (
auto&
entry : helper->mapURL2UUID) {
365 LOGP(info,
" {}: {}/{} ({}-{} bytes)",
entry.first,
entry.second.cacheMiss,
entry.second.cacheHit,
entry.second.minSize,
entry.second.maxSize);
371 O2_SIGNPOST_START(ccdb, sid,
"fetchFromCCDB",
"Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.
timeslice);
372 static Long64_t orbitResetTime = -1;
373 static size_t lastTimeUsed = -1;
375 LOGP(info,
"Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
381 const std::string
path =
"CTP/Calib/OrbitReset";
382 std::map<std::string, std::string> metadata;
383 std::map<std::string, std::string> headers;
385 bool checkValidity = std::abs(
int(timingInfo.
tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
386 const auto url2uuid = helper->mapURL2UUID.find(
path);
387 if (url2uuid != helper->mapURL2UUID.end()) {
388 etag = url2uuid->second.etag;
390 checkValidity =
true;
392 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"checkValidity is %{public}s for tfID %d of %{public}s",
393 checkValidity ?
"true" :
"false", timingInfo.
tfCounter,
path.data());
395 Long64_t newOrbitResetTime = orbitResetTime;
397 const auto& api = helper->getAPI(
path);
398 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
399 helper->lastCheckedTFCounterOrbReset = timingInfo.
tfCounter;
400 api.loadFileToMemory(
v,
path, metadata, timingInfo.
creation, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
401 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
402 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timingInfo.
creation);
407 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
408 helper->mapURL2UUID[
path].cacheMiss++;
409 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
410 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
412 api.appendFlatHeader(
v, headers);
414 helper->mapURL2DPLCache[
path] = cacheId;
415 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
416 }
else if (
v.size()) {
418 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
419 helper->mapURL2UUID[
path].cacheMiss++;
420 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
421 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
423 api.appendFlatHeader(
v, headers);
425 helper->mapURL2DPLCache[
path] = cacheId;
426 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
432 auto cacheId = helper->mapURL2DPLCache[
path];
433 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
434 helper->mapURL2UUID[
path].cacheHit++;
437 if (newOrbitResetTime != orbitResetTime) {
438 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
439 orbitResetTime = newOrbitResetTime;
445 if (std::abs(int64_t(timingInfo.
creation) - timestamp) > helper->timeToleranceMS) {
446 static bool notWarnedYet =
true;
448 LOGP(warn,
"timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.
firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.
creation);
449 notWarnedYet =
false;
456 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"fetchFromCCDB",
"Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
std::unordered_map< std::string, std::string > remappings
Header to collect LHC related constants.
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
T get(const char *key) const
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
constexpr double LHCOrbitNS
Defining PrimaryVertex explicitly as messageable.
RuntimeErrorRef runtime_error(const char *)
auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, int64_t timestamp, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> void
bool isOnlineRun(DataTakingContext const &dtc)
bool isPrefix(std::string_view prefix, std::string_view full)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
auto getOrbitResetTime(o2::pmr::vector< char > const &v) -> Long64_t
void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options, std::vector< o2::framework::OutputRoute > const &outputRoutes)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
std::string to_string(gsl::span< T, Size > span)
o2::ccdb::CcdbApi & getAPI(const std::string &path)
std::vector< OutputRoute > routes
uint32_t lastCheckedTFCounterOrbReset
std::unordered_map< std::string, std::string > remappings
std::unordered_map< std::string, DataAllocator::CacheId > mapURL2DPLCache
std::unordered_map< std::string, o2::ccdb::CcdbApi > apis
std::string createdNotBefore
std::string createdNotAfter
std::unordered_map< std::string, CCDBCacheInfo > mapURL2UUID
static ParserResult parseRemappings(char const *)
static AlgorithmSpec fetchFromCCDB()
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we think this is running.
std::string runNumber
The current run number.
std::vector< OutputRoute > outputs
header::DataOrigin origin
uint32_t tfCounter
the orbit the TF begins