Project
Loading...
Searching...
No Matches
CCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "CCDBHelpers.h"
15#include "Framework/Logger.h"
21#include "CCDB/CcdbApi.h"
23#include "Framework/Signpost.h"
24#include <TError.h>
25#include <TMemFile.h>
26
28
29namespace o2::framework
30{
31
32namespace
33{
// State shared by the CCDB fetching callbacks: per-path cache bookkeeping,
// the CcdbApi instances in use, the path -> backend remappings and the
// condition routes to serve.
// NOTE(review): other blocks in this file read members of this struct that
// do not appear in this listing (e.g. queryPeriodGlo, lastCheckedTF);
// the listing seems to have lost some lines -- confirm against the real file.
34struct CCDBFetcherHelper {
 // Bookkeeping kept for every CCDB path which has been requested.
35 struct CCDBCacheInfo {
 // ETag of the cached object, taken from the "ETag" response header and
 // handed back to loadFileToMemory on later requests.
36 std::string etag;
 // Parsed from the "Cache-Valid-Until" response header (0 when the header is empty).
37 size_t cacheValidUntil = 0;
 // Timestamp which was used for the request that populated the cache.
38 size_t cachePopulatedAt = 0;
 // Number of times a payload was actually received for this path.
39 size_t cacheMiss = 0;
 // Number of times the previously cached payload was reused.
40 size_t cacheHit = 0;
 // Size in bytes of the last payload received.
41 size_t size = 0;
 // Running minimum of the payload sizes; starts at the largest value
 // so that the first std::min picks the real size.
42 size_t minSize = -1ULL;
 // Running maximum of the payload sizes.
43 size_t maxSize = 0;
46 };
47
 // Not referenced anywhere in the visible part of this file.
48 struct RemapMatcher {
49 std::string path;
50 };
51
 // Not referenced anywhere in the visible part of this file.
52 struct RemapTarget {
53 std::string url;
54 };
55
 // CCDB path -> cache bookkeeping (ETag, validity, statistics).
58 std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
 // CCDB path -> id of the message currently held in the DPL cache.
59 std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
 // Creation-time window forwarded to loadFileToMemory; initialiseHelper
 // overwrites both from the "condition-not-before" / "condition-not-after" options.
60 std::string createdNotBefore = "0";
61 std::string createdNotAfter = "3385078236000";
 // Backend URL -> API instance. The empty key holds the default backend.
62 std::unordered_map<std::string, o2::ccdb::CcdbApi> apis;
 // Output routes with Lifetime::Condition, filled by initialiseHelper.
63 std::vector<OutputRoute> routes;
 // CCDB path -> URL of the backend which should serve it.
64 std::unordered_map<std::string, std::string> remappings;
65 uint32_t lastCheckedTFCounterOrbReset = 0; // last checked TFcounter for bulk check
69 int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TFcounter|<=useTFSlice
70
 // Return the API instance which should serve @a path.
 //
 // The path is looked up in `remappings`; when it is not remapped the default
 // backend (empty key of `apis`) is returned. If the path contains a '=', the
 // lookup uses only the part of the path preceding the last '/' found before
 // that '='.
 //
 // Throws (via runtime_error_f) when a '=' is present but there is no '/'
 // before it, the '/' immediately precedes the '=', or that '/' is the first
 // character of the path.
 //
 // Note that `apis[...]` uses operator[], so a key which was never initialised
 // would be default-inserted rather than reported.
71 o2::ccdb::CcdbApi& getAPI(const std::string& path)
72 {
73 // find the first = sign in the string. If present drop everything after it
74 // and between it and the previous /.
75 auto pos = path.find('=');
76 if (pos == std::string::npos) {
 // Plain path: look it up as is.
77 auto entry = remappings.find(path);
78 return apis[entry == remappings.end() ? "" : entry->second];
79 }
 // Last '/' at or before the '='.
80 auto pos2 = path.rfind('/', pos);
81 if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
82 throw runtime_error_f("Malformed path %s", path.c_str());
83 }
 // Look up only the leading part of the path, up to (excluding) that '/'.
84 auto entry = remappings.find(path.substr(0, pos2));
85 return apis[entry == remappings.end() ? "" : entry->second];
86 }
87};
88} // namespace
89
/// Tell whether @a full begins with the characters of @a prefix.
/// An empty @a prefix matches every string, including the empty one.
bool isPrefix(std::string_view prefix, std::string_view full)
{
  const auto wanted = prefix.size();
  // A prefix longer than the string itself can never match.
  if (wanted > full.size()) {
    return false;
  }
  return full.compare(0, wanted, prefix) == 0;
}
94
96{
97 std::unordered_map<std::string, std::string> remappings;
98 std::string currentUrl = "";
99
100 enum ParsingStates {
101 IN_BEGIN,
102 IN_BEGIN_URL,
103 IN_BEGIN_TARGET,
104 IN_END_TARGET,
105 IN_END_URL
106 };
107 ParsingStates state = IN_BEGIN;
108
109 while (true) {
110 switch (state) {
111 case IN_BEGIN: {
112 if (*str == 0) {
113 return {remappings, ""};
114 }
115 state = IN_BEGIN_URL;
116 }
117 case IN_BEGIN_URL: {
118 if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
119 return {remappings, "URL should start with either http:// or https:// or file://"};
120 }
121 state = IN_END_URL;
122 } break;
123 case IN_END_URL: {
124 char const* c = strchr(str, '=');
125 if (c == nullptr) {
126 return {remappings, "Expecting at least one target path, missing `='?"};
127 }
128 if ((c - str) == 0) {
129 return {remappings, "Empty url"};
130 }
131 currentUrl = std::string_view(str, c - str);
132 state = IN_BEGIN_TARGET;
133 str = c + 1;
134 } break;
135 case IN_BEGIN_TARGET: {
136 if (*str == 0) {
137 return {remappings, "Empty target"};
138 }
139 state = IN_END_TARGET;
140 } break;
141 case IN_END_TARGET: {
142 char const* c = strpbrk(str, ",;");
143 if (c == nullptr) {
144 if (remappings.count(str)) {
145 return {remappings, fmt::format("Path {} requested more than once.", str)};
146 }
147 remappings[std::string(str)] = currentUrl;
148 return {remappings, ""};
149 }
150 if ((c - str) == 0) {
151 return {remappings, "Empty target"};
152 }
153 auto key = std::string(str, c - str);
154 if (remappings.count(str)) {
155 return {remappings, fmt::format("Path {} requested more than once.", key)};
156 }
157 remappings[key] = currentUrl;
158 if (*c == ';') {
159 state = IN_BEGIN_URL;
160 } else {
161 state = IN_BEGIN_TARGET;
162 }
163 str = c + 1;
164 } break;
165 }
166 }
167}
168
// Configure @a helper from the device options and collect the condition routes.
//
// Options read: "condition-backend", "condition-tf-per-query",
// "condition-tf-per-query-multiplier", "condition-use-slice-for-prescaling",
// "condition-time-tolerance", "condition-remap", "condition-not-before" and
// "condition-not-after".
//
// Throws (via runtime_error_f) when the remapping string cannot be parsed.
169void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options, std::vector<o2::framework::OutputRoute> const& outputRoutes)
170{
 // Textual descriptions of the routes already registered, used to skip duplicates.
171 std::unordered_map<std::string, bool> accountedSpecs;
172 auto defHost = options.get<std::string>("condition-backend");
173 auto checkRate = options.get<int>("condition-tf-per-query");
174 auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
175 helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
176 helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
 // A non-positive rate is replaced by the largest int.
177 helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
 // A multiplier of 0 is treated as 1; negative values are kept as they are.
178 helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
 // Optional suffix of the log message describing the slice-based prescaling.
179 std::string extraCond{};
180 if (helper.useTFSlice) {
181 extraCond = ". Use TFSlice";
182 if (helper.useTFSlice > 0) {
183 extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
184 }
185 }
186 LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
187 helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
188 extraCond);
189 LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
190 auto remapString = options.get<std::string>("condition-remap");
 // NOTE(review): `result` is not declared in the visible listing; presumably it
 // is the outcome of parsing `remapString` -- confirm against the real file.
192 if (!result.error.empty()) {
193 throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
194 }
195 helper.remappings = result.remappings;
196 helper.apis[""].init(defHost); // default backend
197 LOGP(info, "Initialised default CCDB host {}", defHost);
198 //
199 for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping
 // Several paths may share a host: initialise each host only once.
200 if (helper.apis.find(entry.second) == helper.apis.end()) {
201 helper.apis[entry.second].init(entry.second);
202 LOGP(info, "Initialised custom CCDB host {}", entry.second);
203 }
204 LOGP(info, "{} is remapped to {}", entry.first, entry.second);
205 }
206 helper.createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
207 helper.createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
208
 // Keep one route per distinct condition spec.
209 for (auto& route : outputRoutes) {
210 if (route.matcher.lifetime != Lifetime::Condition) {
211 continue;
212 }
213 auto specStr = DataSpecUtils::describe(route.matcher);
214 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
215 continue;
216 }
217 accountedSpecs[specStr] = true;
218 helper.routes.push_back(route);
219 LOGP(info, "The following route is a condition {}", DataSpecUtils::describe(route.matcher));
 // Only string-valued metadata is reported in the log.
220 for (auto& metadata : route.matcher.metadata) {
221 if (metadata.type == VariantType::String) {
222 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
223 }
224 }
225 }
226}
227
229{
230 Int_t previousErrorLevel = gErrorIgnoreLevel;
231 gErrorIgnoreLevel = kFatal;
232 TMemFile memFile("name", const_cast<char*>(v.data()), v.size(), "READ");
233 gErrorIgnoreLevel = previousErrorLevel;
234 if (memFile.IsZombie()) {
235 throw runtime_error("CTP is Zombie");
236 }
237 TClass* tcl = TClass::GetClass(typeid(std::vector<Long64_t>));
238 void* result = ccdb::CcdbApi::extractFromTFile(memFile, tcl);
239 if (!result) {
240 throw runtime_error_f("Couldn't retrieve object corresponding to %s from TFile", tcl->GetName());
241 }
242 memFile.Close();
243 auto* ctp = (std::vector<Long64_t>*)result;
244 return (*ctp)[0];
245};
246
251
253 DataAllocator& allocator,
254 std::unordered_map<std::string, DataAllocator::CacheId> const& cache,
255 std::string const& path,
256 Output const& output,
259{
 // Adopt the new payload `v` into the allocator's cache for `output` and drop
 // the message previously cached for `path`, if there was one.
 // Returns the cache id of the newly adopted message; `cache` itself is
 // const here, so recording the new id under `path` is left to the caller.
 // NOTE(review): `v` and `method` are used below but their declarations (and
 // the head of the signature) are missing from this listing -- confirm
 // against the real file.
 //
 // Remember the previous entry before adopting the new container.
260 auto oldIt = cache.find(path);
261 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, method);
 // The old message is pruned only after the new one has been adopted.
262 if (oldIt != cache.end()) {
263 allocator.pruneFromCache(oldIt->second);
264 }
265 return cacheId;
266}
267
// For every condition route known to @a helper, either fetch the object from
// CCDB and adopt it into the DPL cache, or re-emit the message already cached
// for that path.
//
// @a timestamp is the default timestamp used for the queries; a route may
// replace it with the run number (see "ccdb-run-dependent" below).
// NOTE(review): `dtc` is used throughout the body but its declaration is
// missing from the parameter list in this listing -- confirm against the real file.
268auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
269 int64_t timestamp,
270 TimingInfo& timingInfo,
272 DataAllocator& allocator) -> void
273{
 // Route metadata whose name starts with this prefix is forwarded as request
 // metadata, with the prefix stripped.
274 std::string ccdbMetadataPrefix = "ccdb-metadata-";
275 int objCnt = -1;
276 // We use the timeslice, so that we hook into the same interval as the rest of the
277 // callback.
 // Function-local static: evaluated on the first call only.
278 static bool isOnline = isOnlineRun(dtc);
279
280 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
281 O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
282 for (auto& route : helper->routes) {
283 int64_t timestampToUse = timestamp;
284 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data());
285 objCnt++;
286 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
287 Output output{concrete.origin, concrete.description, concrete.subSpec};
 // Buffer which receives the payload if a fetch happens.
288 auto&& v = allocator.makeVector<char>(output);
289 std::map<std::string, std::string> metadata;
290 std::map<std::string, std::string> headers;
291 std::string path = "";
292 std::string etag = "";
 // Check rate for this route; starts from the global one and may be
 // overridden by "ccdb-query-rate" below.
293 int chRate = helper->queryPeriodGlo;
294 bool checkValidity = false;
 // Translate the route metadata into the parameters of the query.
295 for (auto& meta : route.matcher.metadata) {
296 if (meta.name == "ccdb-path") {
297 path = meta.defaultValue.get<std::string>();
298 } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get<int>() > 0) {
 // 1: add the run number to the request metadata.
 // 2: use the run number as timestamp of the query.
 // Any other positive value is reported as fatal.
299 if (meta.defaultValue.get<int>() == 1) {
300 metadata["runNumber"] = dtc.runNumber;
301 } else if (meta.defaultValue.get<int>() == 2) {
302 timestampToUse = std::stoi(dtc.runNumber);
303 } else {
304 LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
305 }
306 } else if (isPrefix(ccdbMetadataPrefix, meta.name)) {
307 std::string key = meta.name.substr(ccdbMetadataPrefix.size());
308 auto value = meta.defaultValue.get<std::string>();
309 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
310 metadata[key] = value;
311 } else if (meta.name == "ccdb-query-rate") {
 // The per-route rate is clamped to at least 1 and scaled by the global
 // factor; a negative factor therefore yields a negative chRate, which
 // selects the modulo-based check further down.
312 chRate = std::max(1, meta.defaultValue.get<int>()) * helper->queryPeriodFactor;
313 }
314 }
315 const auto url2uuid = helper->mapURL2UUID.find(path);
316 if (url2uuid != helper->mapURL2UUID.end()) {
317 etag = url2uuid->second.etag;
318 // We check validity every chRate timeslices or if the cache is expired
319 uint64_t validUntil = url2uuid->second.cacheValidUntil;
320 // When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
321 uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
322 // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
323 // when online.
 // NOTE(review): the first comparison uses timestampToUse while the second
 // uses timestamp -- confirm this asymmetry is intended.
324 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
 // When neither online nor expired, checkValidity stays false and the
 // cached object is reused without contacting the backend.
325 if (isOnline || cacheExpired) {
326 if (!helper->useTFSlice) {
 // Positive rate: check when at least chRate TFs passed since the last
 // check. Negative rate: check when tfCounter is a multiple of -chRate.
327 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
328 } else {
 // Same logic, but based on the timeslice instead of the TF counter.
329 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
330 if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
331 checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
332 }
333 }
334 }
335 } else {
336 checkValidity = true; // never skip check if the cache is empty
337 }
338
 // NOTE(review): the "ID"/"Slice" label looks swapped with respect to the
 // value printed next to it (timeslice is printed when useTFSlice is set) -- confirm.
339 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %zu of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());
340
341 const auto& api = helper->getAPI(path);
342 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
343 LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
344 api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
 // Failure: an explicit "Error" header, or nothing received while there
 // is no previously cached object to fall back to.
345 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
346 LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse);
347 // FIXME: I should send a dummy message.
348 continue;
349 }
350 // printing in case we find a default entry
351 if (headers.find("default") != headers.end()) {
352 LOGP(detail, "******** Default entry used for {} ********", path);
353 }
354 helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
355 helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
 // Case 1: no ETag was known, i.e. this is the first object for this path.
 // Unlike case 2 below, cacheValidUntil is not updated here.
356 if (etag.empty()) {
357 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
358 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
359 helper->mapURL2UUID[path].cacheMiss++;
360 helper->mapURL2UUID[path].size = v.size();
361 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
362 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
363 helper->totalFetchedBytes += v.size();
364 helper->totalRequestedBytes += v.size();
 // The byte counters above use the payload size before the flat header
 // is appended.
365 api.appendFlatHeader(v, headers);
366 auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
367 helper->mapURL2DPLCache[path] = cacheId;
368 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
369 continue;
370 }
 // Case 2: an ETag was known and a payload was received nevertheless.
371 if (v.size()) { // but should be overridden by fresh object
372 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
373 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
374 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
375 helper->mapURL2UUID[path].cacheMiss++;
376 helper->mapURL2UUID[path].size = v.size();
377 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
378 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
379 helper->totalFetchedBytes += v.size();
380 helper->totalRequestedBytes += v.size();
381 api.appendFlatHeader(v, headers);
382 auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB);
383 helper->mapURL2DPLCache[path] = cacheId;
384 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
385 continue;
386 } else {
 // Case 3: an ETag was known and no payload came back; only the validity
 // is refreshed and the cached message is reused below.
387 // Only once the etag is actually used, we get the information on how long the object is valid
388 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
389 }
390 }
391 // cached object is fine
392 auto cacheId = helper->mapURL2DPLCache[path];
393 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
394 helper->mapURL2UUID[path].cacheHit++;
395 helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
396 allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
397 // the outputBuffer was not used, can we destroy it?
398 }
399 O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
400};
401
403{
 // Build the algorithm which serves condition objects from CCDB.
 // The init callback below creates and configures the shared helper and
 // registers a Stop callback which logs the cache statistics; it then returns
 // the per-timeslice processing callback.
 // NOTE(review): the function signature and a few interior lines are missing
 // from this listing -- confirm against the real file.
404 return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
405 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
406 initialiseHelper(*helper, options, spec.outputs);
 // On Stop, log the overall byte counters and, per path, misses/hits and
 // the range of payload sizes.
409 callbacks.set<CallbackService::Id::Stop>([helper]() {
410 LOGP(info, "CCDB cache miss/hit ratio ({} fetched / {} requested bytes):", helper->totalFetchedBytes, helper->totalRequestedBytes);
411 for (auto& entry : helper->mapURL2UUID) {
412 LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize);
413 }
414 });
415
416 return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo, DataProcessingStats& stats) {
417 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
418 O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
 // Both statics persist across invocations of this callback.
419 static Long64_t orbitResetTime = -1;
420 static size_t lastTimeUsed = -1;
 // NOTE(review): the condition opening this block is missing from the
 // listing; from the log message it presumably detects a dummy creation time -- confirm.
422 LOGP(info, "Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
423 timingInfo.creation = lastTimeUsed;
424 }
425 lastTimeUsed = timingInfo.creation;
426 // Fetch the CCDB object for the CTP
427 {
428 const std::string path = "CTP/Calib/OrbitReset";
429 std::map<std::string, std::string> metadata;
430 std::map<std::string, std::string> headers;
431 std::string etag;
 // Counter used for prescaling the check: timeslice or TF counter,
 // depending on useTFSlice.
432 int32_t counter = helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter;
 // Check again once at least queryPeriodGlo counts passed since the last check.
433 bool checkValidity = std::abs(int(counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
434 const auto url2uuid = helper->mapURL2UUID.find(path);
435 if (url2uuid != helper->mapURL2UUID.end()) {
436 etag = url2uuid->second.etag;
437 } else {
438 checkValidity = true; // never skip check if the cache is empty
439 }
 // NOTE(review): the "ID"/"Slice" label looks swapped with respect to the
 // counter actually printed (timeslice when useTFSlice is set) -- confirm.
440 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tf%{public}s %d of %{public}s",
441 checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", counter, path.data());
442 Output output{"CTP", "OrbitReset", 0};
 // Stays equal to the current value unless a new object is fetched below.
443 Long64_t newOrbitResetTime = orbitResetTime;
444 auto&& v = allocator.makeVector<char>(output);
445 const auto& api = helper->getAPI(path);
446 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
447 helper->lastCheckedTFCounterOrbReset = counter;
448 api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
 // Failure: an explicit "Error" header, or nothing received while there
 // is no previously cached object to fall back to.
449 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
450 LOGP(fatal, "Unable to find CCDB object {}/{}", path, timingInfo.creation);
451 // FIXME: I should send a dummy message.
452 return;
453 }
 // First object for this path: no ETag was known before the request.
454 if (etag.empty()) {
455 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
456 helper->mapURL2UUID[path].cacheMiss++;
457 helper->mapURL2UUID[path].size = v.size();
458 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
459 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
460 helper->totalFetchedBytes += v.size();
461 helper->totalRequestedBytes += v.size();
 // The orbit reset time is decoded before the flat header is appended
 // to the payload.
462 newOrbitResetTime = getOrbitResetTime(v);
463 api.appendFlatHeader(v, headers);
464 auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone);
465 helper->mapURL2DPLCache[path] = cacheId;
466 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
467 } else if (v.size()) { // but should be overridden by fresh object
468 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
469 helper->mapURL2UUID[path].cacheMiss++;
470 helper->mapURL2UUID[path].size = v.size();
471 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
472 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
473 helper->totalFetchedBytes += v.size();
474 helper->totalRequestedBytes += v.size();
475 newOrbitResetTime = getOrbitResetTime(v);
476 api.appendFlatHeader(v, headers);
477 auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone);
478 helper->mapURL2DPLCache[path] = cacheId;
479 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
480 }
481 // cached object is fine
482 }
 // NOTE(review): as visible here, the statements below also run right after
 // a fresh fetch, so such a fetch is counted both as a miss and as a hit and
 // its size is added to totalRequestedBytes twice -- confirm this is intended.
 // A line following them is missing from the listing.
483 auto cacheId = helper->mapURL2DPLCache[path];
484 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
485 helper->mapURL2UUID[path].cacheHit++;
486 helper->totalRequestedBytes += helper->mapURL2UUID[path].size;
488
 // Propagate a changed orbit reset time to the data taking context.
489 if (newOrbitResetTime != orbitResetTime) {
490 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
491 orbitResetTime = newOrbitResetTime;
492 dtc.orbitResetTimeMUS = orbitResetTime;
493 }
494 }
495
 // Timestamp derived from the first orbit of the TF and the orbit reset time.
 // NOTE(review): the two divisions by 1000 suggest ns -> us -> ms with
 // orbitResetTime in microseconds -- confirm the units.
496 int64_t timestamp = ceil((timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000 + orbitResetTime) / 1000); // RS ceilf precision is not enough
 // If it disagrees with the TF creation time by more than the tolerance,
 // the creation time is used instead.
497 if (std::abs(int64_t(timingInfo.creation) - timestamp) > helper->timeToleranceMS) {
498 static bool notWarnedYet = true;
 // The warning and the override of dtc.orbitResetTimeMUS happen only the
 // first time the mismatch is seen; the timestamp replacement below
 // happens every time.
499 if (notWarnedYet) {
500 LOGP(warn, "timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.creation);
501 notWarnedYet = false;
502 // apparently the orbit reset time from the CTP object makes no sense (i.e. orbit was reset for this run w/o create an object, as it happens for technical runs)
503 dtc.orbitResetTimeMUS = 1000 * timingInfo.creation - timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000;
504 }
505 timestamp = timingInfo.creation;
506 }
507 // Fetch the rest of the objects.
508 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
509 dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
510
511 populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
 // Publish the cumulative byte counters as processing statistics.
512 stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_FETCHED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalFetchedBytes});
513 stats.updateStats({(int)ProcessingStatsId::CCDB_CACHE_REQUESTED_BYTES, DataProcessingStats::Op::Set, (int64_t)helper->totalRequestedBytes});
514 O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
515 }); });
516}
517
518} // namespace o2::framework
benchmark::State & state
std::unordered_map< std::string, std::string > remappings
size_t cachePopulatedAt
std::string etag
Header to collect LHC related constants.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:490
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:609
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:523
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:603
StringRef key
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
Definition CcdbApi.cxx:905
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
GLsizeiptr size
Definition glcorearb.h:659
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint counter
Definition glcorearb.h:3987
constexpr double LHCOrbitNS
Defining ITS Vertex explicitly as messageable.
Definition Cartesian.h:288
RuntimeErrorRef runtime_error(const char *)
auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, int64_t timestamp, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> void
bool isOnlineRun(DataTakingContext const &dtc)
bool isPrefix(std::string_view prefix, std::string_view full)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
auto getOrbitResetTime(o2::pmr::vector< char > const &v) -> Long64_t
void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options, std::vector< o2::framework::OutputRoute > const &outputRoutes)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
constexpr o2::header::SerializationMethod gSerializationMethodCCDB
Definition DataHeader.h:329
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
o2::ccdb::CcdbApi & getAPI(const std::string &path)
std::vector< OutputRoute > routes
std::unordered_map< std::string, std::string > remappings
std::unordered_map< std::string, DataAllocator::CacheId > mapURL2DPLCache
std::unordered_map< std::string, o2::ccdb::CcdbApi > apis
std::unordered_map< std::string, CCDBCacheInfo > mapURL2UUID
static DataAllocator::CacheId adoptAndReplaceCachedMessage(DataAllocator &allocator, std::unordered_map< std::string, DataAllocator::CacheId > const &cache, std::string const &path, Output const &output, o2::pmr::vector< char > &&v, o2::header::SerializationMethod method)
static ParserResult parseRemappings(char const *)
static AlgorithmSpec fetchFromCCDB()
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET
Helper struct to hold statistics about the data processing happening.
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we thing this is running.
std::string runNumber
The current run number.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
header::DataOrigin origin
Definition Output.h:28
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
const std::string str