Project
Loading...
Searching...
No Matches
CCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "CCDBHelpers.h"
14#include "Framework/Logger.h"
20#include "CCDB/CcdbApi.h"
22#include "Framework/Signpost.h"
23#include <TError.h>
24#include <TMemFile.h>
25
27
28namespace o2::framework
29{
30
31namespace {
// State shared by the CCDB fetching callbacks of one device: per-path cache
// bookkeeping, the CCDB API instances (default backend plus remapped ones),
// the condition routes to serve and the options controlling how often the
// validity of cached objects is re-checked.
32struct CCDBFetcherHelper {
// Bookkeeping kept for every CCDB path which has been fetched at least once.
33 struct CCDBCacheInfo {
34 std::string etag; // value of the "ETag" header of the cached object
35 size_t cacheValidUntil = 0; // parsed from the "Cache-Valid-Until" header, 0 when that header is empty
36 size_t cachePopulatedAt = 0; // timestamp used for the request which populated the cache
37 size_t cacheMiss = 0; // number of times a freshly fetched payload was stored
38 size_t cacheHit = 0; // number of times the cached payload was reused
39 size_t minSize = -1ULL; // smallest payload size seen; all-ones start value so that the first std::min picks the real size
40 size_t maxSize = 0; // largest payload size seen
43 };
44
45 struct RemapMatcher {
46 std::string path;
47 };
48
49 struct RemapTarget {
50 std::string url;
51 };
52
// CCDB path -> cache bookkeeping for that path.
53 std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
// CCDB path -> id of the message cached by the DataAllocator for that path.
54 std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
// Creation-time limits forwarded to every loadFileToMemory call; overwritten
// from the "condition-not-before" / "condition-not-after" options.
55 std::string createdNotBefore = "0";
56 std::string createdNotAfter = "3385078236000";
// Backend URL -> API instance; the empty key holds the default backend.
57 std::unordered_map<std::string, o2::ccdb::CcdbApi> apis;
// Output routes whose lifetime is Lifetime::Condition, without duplicates.
58 std::vector<OutputRoute> routes;
// CCDB path -> backend URL which must be used instead of the default one.
59 std::unordered_map<std::string, std::string> remappings;
60 uint32_t lastCheckedTFCounterOrbReset = 0; // last checked TFcounter for bulk check
// Maximum tolerated difference between the orbit-derived timestamp and the TF creation time.
63 int64_t timeToleranceMS = 5000;
64 int useTFSlice = 0; // if non-zero, use TFslice instead of TFcounter for the validity check. If > requested checking rate, add additional check on |lastTFchecked - TFcounter|<=useTFSlice
65
// Returns the API instance to be used for `path`.
//
// If `path` contains a '=' (i.e. it ends with /key=value selectors), only the
// part preceding the '/' in front of the first '=' is looked up in
// `remappings`; otherwise the whole path is looked up. Paths without a
// remapping use the default backend stored under the empty key.
//
// Throws via runtime_error_f when there is no '/' before the '=', when the
// key between that '/' and the '=' is empty, or when that '/' is the first
// character of the path.
66 o2::ccdb::CcdbApi& getAPI(const std::string& path)
67 {
68 // Find the first = sign in the string. If present, drop everything after it
69 // and everything between it and the previous /.
70 auto pos = path.find('=');
71 if (pos == std::string::npos) {
72 auto entry = remappings.find(path);
73 return apis[entry == remappings.end() ? "" : entry->second];
74 }
75 auto pos2 = path.rfind('/', pos);
76 if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
77 throw runtime_error_f("Malformed path %s", path.c_str());
78 }
79 auto entry = remappings.find(path.substr(0, pos2));
80 return apis[entry == remappings.end() ? "" : entry->second];
81 }
82};
83}
84
/// Tells whether @a full starts with @a prefix.
/// An empty @a prefix matches every string; a @a prefix longer than @a full never matches.
bool isPrefix(std::string_view prefix, std::string_view full)
{
  if (prefix.size() > full.size()) {
    return false;
  }
  return full.compare(0, prefix.size(), prefix) == 0;
}
89
91{
92 std::unordered_map<std::string, std::string> remappings;
93 std::string currentUrl = "";
94
95 enum ParsingStates {
96 IN_BEGIN,
97 IN_BEGIN_URL,
98 IN_BEGIN_TARGET,
99 IN_END_TARGET,
100 IN_END_URL
101 };
102 ParsingStates state = IN_BEGIN;
103
104 while (true) {
105 switch (state) {
106 case IN_BEGIN: {
107 if (*str == 0) {
108 return {remappings, ""};
109 }
110 state = IN_BEGIN_URL;
111 }
112 case IN_BEGIN_URL: {
113 if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
114 return {remappings, "URL should start with either http:// or https:// or file://"};
115 }
116 state = IN_END_URL;
117 } break;
118 case IN_END_URL: {
119 char const* c = strchr(str, '=');
120 if (c == nullptr) {
121 return {remappings, "Expecting at least one target path, missing `='?"};
122 }
123 if ((c - str) == 0) {
124 return {remappings, "Empty url"};
125 }
126 currentUrl = std::string_view(str, c - str);
127 state = IN_BEGIN_TARGET;
128 str = c + 1;
129 } break;
130 case IN_BEGIN_TARGET: {
131 if (*str == 0) {
132 return {remappings, "Empty target"};
133 }
134 state = IN_END_TARGET;
135 } break;
136 case IN_END_TARGET: {
137 char const* c = strpbrk(str, ",;");
138 if (c == nullptr) {
139 if (remappings.count(str)) {
140 return {remappings, fmt::format("Path {} requested more than once.", str)};
141 }
142 remappings[std::string(str)] = currentUrl;
143 return {remappings, ""};
144 }
145 if ((c - str) == 0) {
146 return {remappings, "Empty target"};
147 }
148 auto key = std::string(str, c - str);
149 if (remappings.count(str)) {
150 return {remappings, fmt::format("Path {} requested more than once.", key)};
151 }
152 remappings[key] = currentUrl;
153 if (*c == ';') {
154 state = IN_BEGIN_URL;
155 } else {
156 state = IN_BEGIN_TARGET;
157 }
158 str = c + 1;
159 } break;
160 }
161 }
162}
163
// Fills `helper` from the device options and the output routes:
// - reads the condition-* options (backend, query rate and multiplier, slice
//   based prescaling, time tolerance, creation time limits, remappings),
// - initialises the API instance for the default backend and one for every
//   distinct backend found in the remappings,
// - stores every route with Lifetime::Condition once in helper.routes.
// Throws via runtime_error_f if the remapping string cannot be parsed.
164void initialiseHelper(CCDBFetcherHelper& helper, ConfigParamRegistry const& options, std::vector<o2::framework::OutputRoute> const& outputRoutes)
165{
// Descriptions of the routes already stored, used to skip duplicates (only the keys matter).
166 std::unordered_map<std::string, bool> accountedSpecs;
167 auto defHost = options.get<std::string>("condition-backend");
168 auto checkRate = options.get<int>("condition-tf-per-query");
169 auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
170 helper.useTFSlice = options.get<int>("condition-use-slice-for-prescaling");
171 helper.timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
// A non-positive rate is replaced by the largest int.
172 helper.queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
// A multiplier of 0 is replaced by 1; negative values are kept as they are.
173 helper.queryPeriodFactor = checkMult == 0 ? 1 : checkMult;
174 std::string extraCond{};
175 if (helper.useTFSlice) {
176 extraCond = ". Use TFSlice";
177 if (helper.useTFSlice > 0) {
178 extraCond += fmt::format(" + max TFcounter jump <= {}", helper.useTFSlice);
179 }
180 }
181 LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}{}", defHost, helper.queryPeriodGlo,
182 helper.queryPeriodFactor == 1 ? std::string{} : (helper.queryPeriodFactor > 0 ? fmt::format(", (query for high-rate objects downscaled by {})", helper.queryPeriodFactor) : fmt::format(", (query downscaled as TFcounter%{})", -helper.queryPeriodFactor)),
183 extraCond);
184 LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
185 auto remapString = options.get<std::string>("condition-remap");
// NOTE(review): `result` is not declared in the lines visible here; presumably it
// holds the outcome of parsing `remapString` (members `error` and `remappings`) - confirm.
187 if (!result.error.empty()) {
188 throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
189 }
190 helper.remappings = result.remappings;
191 helper.apis[""].init(defHost); // default backend
192 LOGP(info, "Initialised default CCDB host {}", defHost);
193 //
194 for (auto& entry : helper.remappings) { // init api instances for every host seen in the remapping
// Several paths may share one backend: initialise each backend only once.
195 if (helper.apis.find(entry.second) == helper.apis.end()) {
196 helper.apis[entry.second].init(entry.second);
197 LOGP(info, "Initialised custom CCDB host {}", entry.second);
198 }
199 LOGP(info, "{} is remapped to {}", entry.first, entry.second);
200 }
201 helper.createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
202 helper.createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
203
// Keep only the condition routes, each distinct description once.
204 for (auto& route : outputRoutes) {
205 if (route.matcher.lifetime != Lifetime::Condition) {
206 continue;
207 }
208 auto specStr = DataSpecUtils::describe(route.matcher);
209 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
210 continue;
211 }
212 accountedSpecs[specStr] = true;
213 helper.routes.push_back(route);
214 LOGP(info, "The following route is a condition {}", DataSpecUtils::describe(route.matcher));
// Only string-typed metadata entries are logged.
215 for (auto& metadata : route.matcher.metadata) {
216 if (metadata.type == VariantType::String) {
217 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
218 }
219 }
220 }
221}
222
224{
225 Int_t previousErrorLevel = gErrorIgnoreLevel;
226 gErrorIgnoreLevel = kFatal;
227 TMemFile memFile("name", const_cast<char*>(v.data()), v.size(), "READ");
228 gErrorIgnoreLevel = previousErrorLevel;
229 if (memFile.IsZombie()) {
230 throw runtime_error("CTP is Zombie");
231 }
232 TClass* tcl = TClass::GetClass(typeid(std::vector<Long64_t>));
233 void* result = ccdb::CcdbApi::extractFromTFile(memFile, tcl);
234 if (!result) {
235 throw runtime_error_f("Couldn't retrieve object corresponding to %s from TFile", tcl->GetName());
236 }
237 memFile.Close();
238 auto* ctp = (std::vector<Long64_t>*)result;
239 return (*ctp)[0];
240};
241
246
// For every condition route stored in `helper`, either fetches the object
// from CCDB and caches it in the DataAllocator, or re-sends the message which
// was cached previously.
//
// `timestamp` is the default timestamp of the requests; a route may replace it
// by the run number (ccdb-run-dependent == 2). Whether the validity of an
// already cached object is re-checked depends on the per-route query rate and
// on the TF counter / timeslice in `timingInfo`.
247auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
248 int64_t timestamp,
249 TimingInfo& timingInfo,
251 DataAllocator& allocator) -> void
252{
253 std::string ccdbMetadataPrefix = "ccdb-metadata-";
254 int objCnt = -1;
255 // We use the timeslice, so that we hook into the same interval as the rest of the
256 // callback.
// NOTE(review): `dtc` is used here and below but is not among the parameters
// visible in this listing - confirm the parameter list.
// Being a static local, isOnline is evaluated only on the first call.
257 static bool isOnline = isOnlineRun(dtc);
258
259 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
260 O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
261 for (auto& route : helper->routes) {
262 int64_t timestampToUse = timestamp;
263 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data());
264 objCnt++;
265 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
266 Output output{concrete.origin, concrete.description, concrete.subSpec};
267 auto&& v = allocator.makeVector<char>(output);
268 std::map<std::string, std::string> metadata;
269 std::map<std::string, std::string> headers;
270 std::string path = "";
271 std::string etag = "";
272 int chRate = helper->queryPeriodGlo;
273 bool checkValidity = false;
// Translate the metadata attached to the route into request parameters.
274 for (auto& meta : route.matcher.metadata) {
275 if (meta.name == "ccdb-path") {
276 path = meta.defaultValue.get<std::string>();
277 } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get<int>() > 0) {
// 1: add the run number to the request metadata; 2: use the run number as timestamp.
278 if (meta.defaultValue.get<int>() == 1) {
279 metadata["runNumber"] = dtc.runNumber;
280 } else if (meta.defaultValue.get<int>() == 2) {
281 timestampToUse = std::stoi(dtc.runNumber);
282 } else {
283 LOGP(fatal, "Undefined ccdb-run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
284 }
285 } else if (isPrefix(ccdbMetadataPrefix, meta.name)) {
// "ccdb-metadata-<key>" entries are forwarded as <key> in the request metadata.
286 std::string key = meta.name.substr(ccdbMetadataPrefix.size());
287 auto value = meta.defaultValue.get<std::string>();
288 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
289 metadata[key] = value;
290 } else if (meta.name == "ccdb-query-rate") {
// The sign of queryPeriodFactor is carried over to chRate: a negative chRate
// selects the modulo based check further down.
291 chRate = std::max(1, meta.defaultValue.get<int>()) * helper->queryPeriodFactor;
292 }
293 }
294 const auto url2uuid = helper->mapURL2UUID.find(path);
295 if (url2uuid != helper->mapURL2UUID.end()) {
296 etag = url2uuid->second.etag;
297 // We check validity every chRate timeslices or if the cache is expired
298 uint64_t validUntil = url2uuid->second.cacheValidUntil;
299 // When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
300 uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
301 // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
302 // when online.
// NOTE(review): the first comparison uses timestampToUse while the second uses
// timestamp, although cachePopulatedAt is stored from timestampToUse; both mix
// signed and unsigned 64-bit values - confirm this is intended.
303 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
304 if (isOnline || cacheExpired) {
305 if (!helper->useTFSlice) {
306 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) : (timingInfo.tfCounter % -chRate) == 0;
307 } else {
308 checkValidity = chRate > 0 ? (std::abs(int(timingInfo.timeslice - url2uuid->second.lastCheckedSlice)) >= chRate) : (timingInfo.timeslice % -chRate) == 0;
309 if (!checkValidity && helper->useTFSlice > std::abs(chRate)) { // make sure the interval is tolerated unless the check rate itself is too large
310 checkValidity = std::abs(int(timingInfo.tfCounter) - url2uuid->second.lastCheckedTF) > helper->useTFSlice;
311 }
312 }
313 }
314 } else {
315 checkValidity = true; // never skip check if the cache is empty
316 }
317
// NOTE(review): the label is "ID" when the timeslice is printed and "Slice"
// when the TF counter is printed; the two look swapped - confirm.
318 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tf%{public}s %zu of %{public}s", checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter, path.data());
319
320 const auto& api = helper->getAPI(path);
321 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
322 LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
323 api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
324 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
325 LOGP(fatal, "Unable to find CCDB object {}/{}", path, timestampToUse);
326 // FIXME: I should send a dummy message.
327 continue;
328 }
329 // printing in case we find a default entry
330 if (headers.find("default") != headers.end()) {
331 LOGP(detail, "******** Default entry used for {} ********", path);
332 }
333 helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
334 helper->mapURL2UUID[path].lastCheckedSlice = timingInfo.timeslice;
// No etag known yet: this is the first time the object is stored for this path.
// cacheValidUntil is not set in this branch.
335 if (etag.empty()) {
336 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
337 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
338 helper->mapURL2UUID[path].cacheMiss++;
339 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
340 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
341 api.appendFlatHeader(v, headers);
342 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
343 helper->mapURL2DPLCache[path] = cacheId;
344 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
345 continue;
346 }
// An etag was sent with the request: a non-empty payload replaces the cached object.
347 if (v.size()) { // but should be overridden by fresh object
348 // somewhere here pruneFromCache should be called
349 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
350 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
351 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
352 helper->mapURL2UUID[path].cacheMiss++;
353 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
354 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
355 api.appendFlatHeader(v, headers);
356 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
357 helper->mapURL2DPLCache[path] = cacheId;
358 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
359 // one could modify the adoptContainer to take optional old cacheID to clean:
360 // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
361 continue;
362 } else {
363 // Only once the etag is actually used, we get the information on how long the object is valid
364 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
365 }
366 }
367 // cached object is fine
368 auto cacheId = helper->mapURL2DPLCache[path];
369 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
370 helper->mapURL2UUID[path].cacheHit++;
371 allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
372 // the outputBuffer was not used, can we destroy it?
373 }
374 O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
375};
376
378{
// Builds the algorithm which serves condition objects from CCDB.
// The outer (init) callback creates and initialises the shared helper and
// registers a Stop callback printing the cache statistics. The returned
// processing callback first handles the "CTP/Calib/OrbitReset" object, derives
// the timestamp of the timeframe from it and then calls populateCacheWith for
// the remaining condition routes.
379 return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
380 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
381 initialiseHelper(*helper, options, spec.outputs);
// On stop, print for every path the miss/hit counters and the payload size range.
384 callbacks.set<CallbackService::Id::Stop>([helper]() {
385 LOGP(info, "CCDB cache miss/hit ratio:");
386 for (auto& entry : helper->mapURL2UUID) {
387 LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize);
388 }
389 });
390
391 return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
392 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
393 O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
// Static locals: their values persist across invocations of this callback.
394 static Long64_t orbitResetTime = -1;
395 static size_t lastTimeUsed = -1;
// NOTE(review): the condition opening the following block is not present in the
// lines visible here; presumably it detects a dummy creation time - confirm.
397 LOGP(info, "Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
398 timingInfo.creation = lastTimeUsed;
399 }
400 lastTimeUsed = timingInfo.creation;
401 // Fetch the CCDB object for the CTP
402 {
403 const std::string path = "CTP/Calib/OrbitReset";
404 std::map<std::string, std::string> metadata;
405 std::map<std::string, std::string> headers;
406 std::string etag;
407 int32_t counter = helper->useTFSlice ? timingInfo.timeslice : timingInfo.tfCounter;
// Re-check the validity once at least queryPeriodGlo TFs (or slices) have passed since the last check.
408 bool checkValidity = std::abs(int(counter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
409 const auto url2uuid = helper->mapURL2UUID.find(path);
410 if (url2uuid != helper->mapURL2UUID.end()) {
411 etag = url2uuid->second.etag;
412 } else {
413 checkValidity = true; // never skip check if the cache is empty
414 }
// NOTE(review): the label is "ID" when the timeslice is printed and "Slice"
// when the TF counter is printed; the two look swapped - confirm.
415 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tf%{public}s %d of %{public}s",
416 checkValidity ? "true" : "false", helper->useTFSlice ? "ID" : "Slice", counter, path.data());
417 Output output{"CTP", "OrbitReset", 0};
// Stays equal to the current value unless a fresh object is decoded below.
418 Long64_t newOrbitResetTime = orbitResetTime;
419 auto&& v = allocator.makeVector<char>(output);
420 const auto& api = helper->getAPI(path);
421 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
422 helper->lastCheckedTFCounterOrbReset = counter;
423 api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
424 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
425 LOGP(fatal, "Unable to find CCDB object {}/{}", path, timingInfo.creation);
426 // FIXME: I should send a dummy message.
// This return happens before the O2_SIGNPOST_END at the end of the callback.
427 return;
428 }
429 if (etag.empty()) {
430 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
431 helper->mapURL2UUID[path].cacheMiss++;
432 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
433 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
// The value is decoded before the flat header is appended to the payload.
434 newOrbitResetTime = getOrbitResetTime(v);
435 api.appendFlatHeader(v, headers);
// NOTE(review): `cacheId` is not declared in the lines visible here; presumably it
// results from handing `v` over to the allocator - confirm.
437 helper->mapURL2DPLCache[path] = cacheId;
438 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
439 } else if (v.size()) { // but should be overridden by fresh object
440 // somewhere here pruneFromCache should be called
441 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
442 helper->mapURL2UUID[path].cacheMiss++;
443 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
444 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
445 newOrbitResetTime = getOrbitResetTime(v);
446 api.appendFlatHeader(v, headers);
448 helper->mapURL2DPLCache[path] = cacheId;
449 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
450 // one could modify the adoptContainer to take optional old cacheID to clean:
451 // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
452 }
453 // cached object is fine
454 }
455 auto cacheId = helper->mapURL2DPLCache[path];
456 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
457 helper->mapURL2UUID[path].cacheHit++;
459
// Publish a changed orbit reset time to the data taking context.
460 if (newOrbitResetTime != orbitResetTime) {
461 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
462 orbitResetTime = newOrbitResetTime;
463 dtc.orbitResetTimeMUS = orbitResetTime;
464 }
465 }
466
// Timestamp of the TF computed from its first orbit and the orbit reset time
// (presumably microseconds converted to milliseconds - confirm units).
467 int64_t timestamp = ceil((timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000 + orbitResetTime) / 1000); // RS ceilf precision is not enough
// If it is further than the tolerance from the TF creation time, the creation time is used instead.
468 if (std::abs(int64_t(timingInfo.creation) - timestamp) > helper->timeToleranceMS) {
// The warning and the correction of dtc.orbitResetTimeMUS are done only once.
469 static bool notWarnedYet = true;
470 if (notWarnedYet) {
471 LOGP(warn, "timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.creation);
472 notWarnedYet = false;
473 // Apparently the orbit reset time from the CTP object makes no sense (i.e. the orbit was reset for this run without creating an object, as happens for technical runs)
474 dtc.orbitResetTimeMUS = 1000 * timingInfo.creation - timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000;
475 }
476 timestamp = timingInfo.creation;
477 }
478 // Fetch the rest of the objects.
479 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
480 dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
481
482 populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
483 O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
484 }); });
485}
486
487} // namespace o2::framework
benchmark::State & state
std::unordered_map< std::string, std::string > remappings
size_t cachePopulatedAt
std::string etag
Header to collect LHC related constants.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:489
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:608
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:522
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:602
StringRef key
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
Definition CcdbApi.cxx:895
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint counter
Definition glcorearb.h:3987
constexpr double LHCOrbitNS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, int64_t timestamp, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> void
bool isOnlineRun(DataTakingContext const &dtc)
bool isPrefix(std::string_view prefix, std::string_view full)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
auto getOrbitResetTime(o2::pmr::vector< char > const &v) -> Long64_t
void initialiseHelper(CCDBFetcherHelper &helper, ConfigParamRegistry const &options, std::vector< o2::framework::OutputRoute > const &outputRoutes)
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
constexpr o2::header::SerializationMethod gSerializationMethodCCDB
Definition DataHeader.h:329
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
o2::ccdb::CcdbApi & getAPI(const std::string &path)
std::vector< OutputRoute > routes
std::unordered_map< std::string, std::string > remappings
std::unordered_map< std::string, DataAllocator::CacheId > mapURL2DPLCache
std::unordered_map< std::string, o2::ccdb::CcdbApi > apis
std::unordered_map< std::string, CCDBCacheInfo > mapURL2UUID
static ParserResult parseRemappings(char const *)
static AlgorithmSpec fetchFromCCDB()
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we think this is running.
std::string runNumber
The current run number.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
header::DataOrigin origin
Definition Output.h:28
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
const std::string str