164 std::vector<CCDBFetcherHelper::FetchOp>
const& ops,
167 DataAllocator& allocator) -> std::vector<CCDBFetcherHelper::Response>
175 O2_SIGNPOST_START(ccdb, sid,
"populateCacheWith",
"Starting to populate cache with CCDB objects");
176 std::vector<Response> responses;
177 for (
auto&
op : ops) {
178 int64_t timestampToUse =
op.timestamp;
183 auto&&
v = allocator.makeVector<
char>(
output);
184 std::map<std::string, std::string> metadata;
185 std::map<std::string, std::string> headers;
186 std::string
path =
op.url;
187 std::string
etag =
"";
188 int chRate = helper->queryPeriodGlo;
189 bool checkValidity =
false;
190 if (
op.runDependent > 0) {
191 if (
op.runDependent == 1) {
192 metadata[
"runNumber"] = std::format(
"{}",
op.runNumber);
193 }
else if (
op.runDependent == 2) {
194 timestampToUse =
op.runNumber;
196 LOGP(fatal,
"Undefined ccdb-run-dependent option {} for spec {}/{}/{}",
op.runDependent,
197 concrete.origin.as<std::string>(), concrete.description.as<std::string>(),
int(concrete.subSpec));
200 for (
auto m :
op.metadata) {
201 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Adding metadata %{public}s: %{public}s to the request",
m.key.data(),
m.value.data());
202 metadata[
m.key] =
m.value;
204 if (
op.queryRate != 0) {
205 chRate =
op.queryRate * helper->queryPeriodFactor;
208 const auto url2uuid = helper->mapURL2UUID.find(
path);
209 if (url2uuid != helper->mapURL2UUID.end()) {
210 etag = url2uuid->second.etag;
212 uint64_t validUntil = url2uuid->second.cacheValidUntil;
217 bool cacheExpired = (validUntil <= timestampToUse) || (
op.timestamp <
cachePopulatedAt);
218 if (isOnline || cacheExpired) {
219 if (!helper->useTFSlice) {
220 checkValidity = chRate > 0 ? (std::abs(
int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
222 checkValidity = chRate > 0 ? (std::abs(
int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
223 if (!checkValidity && helper->useTFSlice > std::abs(chRate)) {
224 checkValidity = std::abs(
int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
229 checkValidity =
true;
232 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"checkValidity is %{public}s for tf%{public}s %zu of %{public}s", checkValidity ?
"true" :
"false", helper->useTFSlice ?
"ID" :
"Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter,
path.data());
234 const auto& api = helper->getAPI(
path);
235 if (checkValidity && (!api.isSnapshotMode() ||
etag.empty())) {
236 LOGP(detail,
"Loading {} for timestamp {}",
path, timestampToUse);
237 api.loadFileToMemory(
v,
path, metadata, timestampToUse, &headers,
etag, helper->createdNotAfter, helper->createdNotBefore);
238 if ((headers.count(
"Error") != 0) || (
etag.empty() &&
v.empty())) {
239 LOGP(fatal,
"Unable to find CCDB object {}/{}",
path, timestampToUse);
244 if (headers.find(
"default") != headers.end()) {
245 LOGP(detail,
"******** Default entry used for {} ********",
path);
247 helper->mapURL2UUID[
path].lastCheckedTF = timingInfo.tfCounter;
248 helper->mapURL2UUID[
path].lastCheckedSlice = timingInfo.timeslice;
250 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
251 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
252 helper->mapURL2UUID[
path].cacheMiss++;
253 helper->mapURL2UUID[
path].size =
v.size();
254 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
255 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
256 auto size =
v.size();
257 api.appendFlatHeader(
v, headers);
259 helper->mapURL2DPLCache[
path] = cacheId;
260 responses.emplace_back(
Response{.
id = cacheId, .size =
size, .request =
nullptr});
261 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
", size %zu)",
path.data(), headers[
"ETag"].data(), cacheId.value,
size);
266 helper->mapURL2UUID[
path].etag = headers[
"ETag"];
267 helper->mapURL2UUID[
path].cachePopulatedAt = timestampToUse;
268 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
269 helper->mapURL2UUID[
path].cacheMiss++;
270 helper->mapURL2UUID[
path].size =
v.size();
271 helper->mapURL2UUID[
path].minSize = std::min(
v.size(), helper->mapURL2UUID[
path].minSize);
272 helper->mapURL2UUID[
path].maxSize = std::max(
v.size(), helper->mapURL2UUID[
path].maxSize);
273 auto size =
v.size();
274 api.appendFlatHeader(
v, headers);
276 helper->mapURL2DPLCache[
path] = cacheId;
277 responses.emplace_back(
Response{.
id = cacheId, .size =
size, .request =
nullptr});
278 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Caching %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
284 helper->mapURL2UUID[
path].cacheValidUntil = headers[
"Cache-Valid-Until"].empty() ? 0 : std::stoul(headers[
"Cache-Valid-Until"]);
288 auto cacheId = helper->mapURL2DPLCache[
path];
289 O2_SIGNPOST_EVENT_EMIT(ccdb, sid,
"populateCacheWith",
"Reusing %{public}s for %{public}s (DPL id %" PRIu64
")",
path.data(), headers[
"ETag"].data(), cacheId.value);
290 helper->mapURL2UUID[
path].cacheHit++;
291 responses.emplace_back(
Response{.
id = cacheId, .size = helper->mapURL2UUID[
path].size, .request =
nullptr});
295 O2_SIGNPOST_END(ccdb, sid,
"populateCacheWith",
"Finished populating cache with CCDB objects");
// NOTE(review): as listed here, each of the four macros below has an empty
// replacement list, so any use would expand to nothing. This looks like a
// reference stub rather than the full definition; confirm against the header
// that really defines the signpost macros before relying on it.

// Takes a single `name` argument; empty as shown.
#define O2_DECLARE_DYNAMIC_LOG(name)
// Takes (log, id, name, format, ...) with variadic trailing arguments; the
// populateCacheWith code in this file invokes it once, after the loop over ops.
#define O2_SIGNPOST_END(log, id, name, format,...)
// Takes (log, id, name, format, ...) with variadic trailing arguments; invoked
// in this file with printf-style format strings such as "%{public}s" and "%zu".
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
// Takes (log, id, name, format, ...) with variadic trailing arguments; the
// populateCacheWith code in this file invokes it once, before the loop over ops.
#define O2_SIGNPOST_START(log, id, name, format,...)
DeploymentMode deploymentMode
Where we think this is running.