154 std::vector<CCDBFetcherHelper::FetchOp>
const& ops,
157 DataAllocator& allocator) -> std::vector<CCDBFetcherHelper::Response>
165 O2_SIGNPOST_START(ccdb, sid,
"populateCacheWith",
"Starting to populate cache with CCDB objects");
166 std::vector<Response> responses;
167 for (
auto&
op : ops) {
168 int64_t timestampToUse =
op.timestamp;
173 auto&&
v = allocator.makeVector<
char>(
output);
174 std::map<std::string, std::string> metadata;
175 std::map<std::string, std::string> headers;
176 std::string
path =
op.url;
177 std::string
etag =
"";
178 int chRate = helper->queryPeriodGlo;
179 bool checkValidity =
false;
180 if (
op.runDependent > 0) {
181 if (
op.runDependent == 1) {
182 metadata[
"runNumber"] = std::format(
"{}",
op.runNumber);
183 }
else if (
op.runDependent == 2) {
184 timestampToUse =
op.runNumber;
186 LOGP(fatal,
"Undefined ccdb-run-dependent option {} for spec {}/{}/{}",
op.runDependent,
187 concrete.origin.as<std::string>(), concrete.description.as<std::string>(),
int(concrete.subSpec));
190 for (
auto m :
op.metadata) {
191 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Adding metadata %{public}s: %{public}s to the request",
m.key.data(),
m.value.data());
192 metadata[
m.key] =
m.value;
194 if (
op.queryRate != 0) {
195 chRate =
op.queryRate * helper->queryPeriodFactor;
198 const auto url2uuid = helper->mapURL2UUID.find(
path);
199 if (url2uuid != helper->mapURL2UUID.end()) {
200 etag = url2uuid->second.etag;
202 uint64_t validUntil = url2uuid->second.cacheValidUntil;
207 bool cacheExpired = (validUntil <= timestampToUse) || (
op.timestamp <
cachePopulatedAt);
208 checkValidity = (std::abs(
int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
210 checkValidity =
true;
213 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ?
"true" :
"false", timingInfo.tfCounter,
path.data());
215 const auto& api = helper->getAPI(
path);
216 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
217 LOGP(detail,
"Loading {} for timestamp {}",
path, timestampToUse);
218 api.loadFileToMemory(
v,
path, metadata, timestampToUse, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
219 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
220 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timestampToUse);
225 if (headers.find(
"default") != headers.end()) {
226 LOGP(detail,
"******** Default entry used for {} ********",
path);
228 helper->mapURL2UUID[
path].lastCheckedTF = timingInfo.tfCounter;
230 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
231 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
232 helper->mapURL2UUID[
path].cacheMiss++;
233 helper->mapURL2UUID[
path].size =
v.size();
234 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
235 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
236 auto size =
v.size();
237 api.appendFlatHeader(
v, headers);
239 helper->mapURL2DPLCache[
path] = cacheId;
240 responses.emplace_back(
Response{.
id = cacheId, .size =
size, .request =
nullptr});
241 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
", size %zu)",
path.data(), headers[
"ETag"].data(), cacheId.value,
size);
246 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
247 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
248 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
249 helper->mapURL2UUID[
path].cacheMiss++;
250 helper->mapURL2UUID[
path].size =
v.size();
251 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
252 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
253 auto size =
v.size();
254 api.appendFlatHeader(
v, headers);
256 helper->mapURL2DPLCache[
path] = cacheId;
257 responses.emplace_back(
Response{.
id = cacheId, .size =
size, .request =
nullptr});
258 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
264 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
268 auto cacheId = helper->mapURL2DPLCache[
path];
269 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
270 helper->mapURL2UUID[
path].cacheHit++;
271 responses.emplace_back(
Response{.
id = cacheId, .size = helper->mapURL2UUID[
path].size, .request =
nullptr});
275 O2_SIGNPOST_END(ccdb, sid,
"populateCacheWith",
"Finished populating cache with CCDB objects");
#define O2_DECLARE_DYNAMIC_LOG(name)
#define O2_SIGNPOST_END(log, id, name, format,...)
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
#define O2_SIGNPOST_START(log, id, name, format,...)
DeploymentMode deploymentMode
Where we thing this is running.