Project
Loading...
Searching...
No Matches
CCDBFetcherHelper.cxx
Go to the documentation of this file.
1// Copyright 2019-2025 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11#include "CCDBFetcherHelper.h"
13#include "Framework/Signpost.h"
16#include <TError.h>
17#include <TMemFile.h>
18
20
21namespace o2::framework
22{
23
25{
26 // find the first = sign in the string. If present drop everything after it
27 // and between it and the previous /.
28 auto pos = path.find('=');
29 if (pos == std::string::npos) {
30 auto entry = remappings.find(path);
31 return apis[entry == remappings.end() ? "" : entry->second];
32 }
33 auto pos2 = path.rfind('/', pos);
34 if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
35 throw runtime_error_f("Malformed path %s", path.c_str());
36 }
37 auto entry = remappings.find(path.substr(0, pos2));
38 return apis[entry == remappings.end() ? "" : entry->second];
39}
40
41namespace
42{
43bool isOnlineRun(DataTakingContext const& dtc)
44{
46}
47} // namespace
48
50{
51 auto defHost = options.get<std::string>("condition-backend");
52 auto checkRate = options.get<int>("condition-tf-per-query");
53 auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
54 helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
55 helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
56 helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
57 helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
58 std::string extraCond{};
59 if (helper.useTFSlice) {
60 extraCond = ". Use TFSlice";
61 if (helper.useTFSlice > 0) {
62 extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
63 }
64 }
65 LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
66 helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
67 extraCond);
68 LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
69 auto remapString = options.get<std::string>("condition-remap");
70 ParserResult result = parseRemappings(remapString.c_str());
71 if (!result.error.empty()) {
72 throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
73 }
74 helper.remappings = result.remappings;
75 helper.apis[""].init(defHost); // default backend
76 LOGP(info, "Initialised default CCDB host {}", defHost);
77 //
78 for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping
79 if (helper.apis.find(entry.second) == helper.apis.end()) {
80 helper.apis[entry.second].init(entry.second);
81 LOGP(info, "Initialised custom CCDB host {}", entry.second);
82 }
83 LOGP(info, "{} is remapped to {}", entry.first, entry.second);
84 }
85 helper.createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
86 helper.createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
87}
88
90{
91 std::unordered_map<std::string, std::string> remappings;
92 std::string currentUrl = "";
93
94 enum ParsingStates {
95 IN_BEGIN,
96 IN_BEGIN_URL,
97 IN_BEGIN_TARGET,
98 IN_END_TARGET,
99 IN_END_URL
100 };
101 ParsingStates state = IN_BEGIN;
102
103 while (true) {
104 switch (state) {
105 case IN_BEGIN: {
106 if (*str == 0) {
107 return {remappings, ""};
108 }
109 state = IN_BEGIN_URL;
110 }
111 case IN_BEGIN_URL: {
112 if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
113 return {remappings, "URL should start with either http:// or https:// or file://"};
114 }
115 state = IN_END_URL;
116 } break;
117 case IN_END_URL: {
118 char const* c = strchr(str, '=');
119 if (c == nullptr) {
120 return {remappings, "Expecting at least one target path, missing `='?"};
121 }
122 if ((c - str) == 0) {
123 return {remappings, "Empty url"};
124 }
125 currentUrl = std::string_view(str, c - str);
126 state = IN_BEGIN_TARGET;
127 str = c + 1;
128 } break;
129 case IN_BEGIN_TARGET: {
130 if (*str == 0) {
131 return {remappings, "Empty target"};
132 }
133 state = IN_END_TARGET;
134 } break;
135 case IN_END_TARGET: {
136 char const* c = strpbrk(str, ",;");
137 if (c == nullptr) {
138 if (remappings.count(str)) {
139 return {remappings, fmt::format("Path {} requested more than once.", str)};
140 }
141 remappings[std::string(str)] = currentUrl;
142 return {remappings, ""};
143 }
144 if ((c - str) == 0) {
145 return {remappings, "Empty target"};
146 }
147 auto key = std::string(str, c - str);
148 if (remappings.count(str)) {
149 return {remappings, fmt::format("Path {} requested more than once.", key)};
150 }
151 remappings[key] = currentUrl;
152 if (*c == ';') {
153 state = IN_BEGIN_URL;
154 } else {
155 state = IN_BEGIN_TARGET;
156 }
157 str = c + 1;
158 } break;
159 }
160 }
161}
162
163auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
164 std::vector<CCDBFetcherHelper::FetchOp> const& ops,
165 TimingInfo& timingInfo,
167 DataAllocator& allocator) -> std::vector<CCDBFetcherHelper::Response>
168{
169 int objCnt = -1;
170 // We use the timeslice, so that we hook into the same interval as the rest of the
171 // callback.
172 static bool isOnline = isOnlineRun(dtc);
173
174 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
175 O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
176 std::vector<Response> responses;
177 for (auto& op : ops) {
178 int64_t timestampToUse = op.timestamp;
179 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(op.spec).data());
180 objCnt++;
181 auto concrete = DataSpecUtils::asConcreteDataMatcher(op.spec);
182 Output output{concrete.origin, concrete.description, concrete.subSpec};
183 auto&& v = allocator.makeVector<char>(output);
184 std::map<std::string, std::string> metadata;
185 std::map<std::string, std::string> headers;
186 std::string path = op.url;
187 std::string etag = "";
188 int chRate = helper->queryPeriodGlo;
189 bool checkValidity = false;
190 if (op.runDependent > 0) {
191 if (op.runDependent == 1) {
192 metadata["runNumber"] = std::format("{}", op.runNumber);
193 } else if (op.runDependent == 2) {
194 timestampToUse = op.runNumber;
195 } else {
196 LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", op.runDependent,
197 concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
198 }
199 }
200 for (auto m : op.metadata) {
201 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", m.key.data(), m.value.data());
202 metadata[m.key] = m.value;
203 }
204 if (op.queryRate != 0) {
205 chRate = op.queryRate * helper->queryPeriodFactor;
206 }
207
208 const auto url2uuid = helper->mapURL2UUID.find(path);
209 if (url2uuid != helper->mapURL2UUID.end()) {
210 etag = url2uuid->second.etag;
211 // We check validity every chRate timeslices or if the cache is expired
212 uint64_t validUntil = url2uuid->second.cacheValidUntil;
213 // When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
214 uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
215 // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
216 // when online.
217 bool cacheExpired = (validUntil <= timestampToUse) || (op.timestamp < cachePopulatedAt);
218 if (isOnline || cacheExpired) {
219 if (!helper->useTFSlice) {
220 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
221 } else {
222 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
223 if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
224 checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
225 }
226 }
227 }
228 } else {
229 checkValidity = true; // never skip check if the cache is empty
230 }
231
232 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %zu of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());
233
234 const auto& api = helper->getAPI(path);
235 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
236 LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
237 api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
238 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
239 LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse);
240 // FIXME: I should send a dummy message.
241 continue;
242 }
243 // printing in case we find a default entry
244 if (headers.find("default") != headers.end()) {
245 LOGP(detail, "******** Default entry used for {} ********", path);
246 }
247 helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
248 helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
249 if (etag.empty()) {
250 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
251 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
252 helper->mapURL2UUID[path].cacheMiss++;
253 helper->mapURL2UUID[path].size = v.size();
254 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
255 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
256 auto size = v.size();
257 api.appendFlatHeader(v, headers);
258 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
259 helper->mapURL2DPLCache[path] = cacheId;
260 responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
261 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size);
262 continue;
263 }
264 if (v.size()) { // but should be overridden by fresh object
265 // somewhere here pruneFromCache should be called
266 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
267 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
268 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
269 helper->mapURL2UUID[path].cacheMiss++;
270 helper->mapURL2UUID[path].size = v.size();
271 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
272 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
273 auto size = v.size();
274 api.appendFlatHeader(v, headers);
275 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
276 helper->mapURL2DPLCache[path] = cacheId;
277 responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
278 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
279 // one could modify the adoptContainer to take optional old cacheID to clean:
280 // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
281 continue;
282 } else {
283 // Only once the etag is actually used, we get the information on how long the object is valid
284 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
285 }
286 }
287 // cached object is fine
288 auto cacheId = helper->mapURL2DPLCache[path];
289 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
290 helper->mapURL2UUID[path].cacheHit++;
291 responses.emplace_back(Response{.id = cacheId, .size = helper->mapURL2UUID[path].size, .request = nullptr});
292 allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
293 // the outputBuffer was not used, can we destroy it?
294 }
295 O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
296 return responses;
297};
298
299} // namespace o2::framework
benchmark::State & state
size_t cachePopulatedAt
std::string etag
uint32_t op
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
StringRef key
const GLfloat * m
Definition glcorearb.h:4066
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
bool isOnlineRun(DataTakingContext const &dtc)
RuntimeErrorRef runtime_error_f(const char *,...)
constexpr o2::header::SerializationMethod gSerializationMethodCCDB
Definition DataHeader.h:329
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
o2::ccdb::CcdbApi & getAPI(const std::string &path)
std::unordered_map< std::string, std::string > remappings
static auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, std::vector< FetchOp > const &ops, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> std::vector< Response >
std::unordered_map< std::string, o2::ccdb::CcdbApi > apis
static void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options)
static ParserResult parseRemappings(char const *)
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
DeploymentMode deploymentMode
Where we think this is running.
header::DataOrigin origin
Definition Output.h:28
const std::string str