Project
Loading...
Searching...
No Matches
CCDBHelpers.cxx
Go to the documentation of this file.
1// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include "CCDBHelpers.h"
14#include "Framework/Logger.h"
20#include "CCDB/CcdbApi.h"
22#include "Framework/Signpost.h"
23#include <typeinfo>
24#include <TError.h>
25#include <TMemFile.h>
26#include <functional>
27
29
30namespace o2::framework
31{
32
35 std::string etag;
36 size_t cacheValidUntil = 0;
37 size_t cachePopulatedAt = 0;
38 size_t cacheMiss = 0;
39 size_t cacheHit = 0;
40 size_t minSize = -1ULL;
41 size_t maxSize = 0;
43 };
44
// Plain aggregate with a single string member, `path`.
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably the path side of a remapping rule -- confirm it is still needed.
45 struct RemapMatcher {
46 std::string path;
47 };
48
// Plain aggregate with a single string member, `url`.
// NOTE(review): not referenced anywhere in the visible part of this file;
// presumably the backend side of a remapping rule -- confirm it is still needed.
49 struct RemapTarget {
50 std::string url;
51 };
52
53 std::unordered_map<std::string, CCDBCacheInfo> mapURL2UUID;
54 std::unordered_map<std::string, DataAllocator::CacheId> mapURL2DPLCache;
55 std::string createdNotBefore = "0";
56 std::string createdNotAfter = "3385078236000";
57 std::unordered_map<std::string, o2::ccdb::CcdbApi> apis;
58 std::vector<OutputRoute> routes;
59 std::unordered_map<std::string, std::string> remappings;
60 uint32_t lastCheckedTFCounterOrbReset = 0; // last checked TFcounter for bulk check
63 int64_t timeToleranceMS = 5000;
64
65 o2::ccdb::CcdbApi& getAPI(const std::string& path)
66 {
67 // find the first = sign in the string. If present drop everything after it
68 // and between it and the previous /.
69 auto pos = path.find('=');
70 if (pos == std::string::npos) {
71 auto entry = remappings.find(path);
72 return apis[entry == remappings.end() ? "" : entry->second];
73 }
74 auto pos2 = path.rfind('/', pos);
75 if (pos2 == std::string::npos || pos2 == pos - 1 || pos2 == 0) {
76 throw runtime_error_f("Malformed path %s", path.c_str());
77 }
78 auto entry = remappings.find(path.substr(0, pos2));
79 return apis[entry == remappings.end() ? "" : entry->second];
80 }
81};
82
/// Tell whether \a full starts with \a prefix.
/// An empty \a prefix matches every string; a \a prefix longer than \a full
/// never matches.
bool isPrefix(std::string_view prefix, std::string_view full)
{
  if (prefix.size() > full.size()) {
    return false;
  }
  // Compare only the leading prefix.size() characters of full.
  return full.compare(0, prefix.size(), prefix) == 0;
}
87
89{
90 std::unordered_map<std::string, std::string> remappings;
91 std::string currentUrl = "";
92
93 enum ParsingStates {
94 IN_BEGIN,
95 IN_BEGIN_URL,
96 IN_BEGIN_TARGET,
97 IN_END_TARGET,
98 IN_END_URL
99 };
100 ParsingStates state = IN_BEGIN;
101
102 while (true) {
103 switch (state) {
104 case IN_BEGIN: {
105 if (*str == 0) {
106 return {remappings, ""};
107 }
108 state = IN_BEGIN_URL;
109 }
110 case IN_BEGIN_URL: {
111 if ((strncmp("http://", str, 7) != 0) && (strncmp("https://", str, 8) != 0 && (strncmp("file://", str, 7) != 0))) {
112 return {remappings, "URL should start with either http:// or https:// or file://"};
113 }
114 state = IN_END_URL;
115 } break;
116 case IN_END_URL: {
117 char const* c = strchr(str, '=');
118 if (c == nullptr) {
119 return {remappings, "Expecting at least one target path, missing `='?"};
120 }
121 if ((c - str) == 0) {
122 return {remappings, "Empty url"};
123 }
124 currentUrl = std::string_view(str, c - str);
125 state = IN_BEGIN_TARGET;
126 str = c + 1;
127 } break;
128 case IN_BEGIN_TARGET: {
129 if (*str == 0) {
130 return {remappings, "Empty target"};
131 }
132 state = IN_END_TARGET;
133 } break;
134 case IN_END_TARGET: {
135 char const* c = strpbrk(str, ",;");
136 if (c == nullptr) {
137 if (remappings.count(str)) {
138 return {remappings, fmt::format("Path {} requested more than once.", str)};
139 }
140 remappings[std::string(str)] = currentUrl;
141 return {remappings, ""};
142 }
143 if ((c - str) == 0) {
144 return {remappings, "Empty target"};
145 }
146 auto key = std::string(str, c - str);
147 if (remappings.count(str)) {
148 return {remappings, fmt::format("Path {} requested more than once.", key)};
149 }
150 remappings[key] = currentUrl;
151 if (*c == ';') {
152 state = IN_BEGIN_URL;
153 } else {
154 state = IN_BEGIN_TARGET;
155 }
156 str = c + 1;
157 } break;
158 }
159 }
160}
161
163{
164 Int_t previousErrorLevel = gErrorIgnoreLevel;
165 gErrorIgnoreLevel = kFatal;
166 TMemFile memFile("name", const_cast<char*>(v.data()), v.size(), "READ");
167 gErrorIgnoreLevel = previousErrorLevel;
168 if (memFile.IsZombie()) {
169 throw runtime_error("CTP is Zombie");
170 }
171 TClass* tcl = TClass::GetClass(typeid(std::vector<Long64_t>));
172 void* result = ccdb::CcdbApi::extractFromTFile(memFile, tcl);
173 if (!result) {
174 throw runtime_error_f("Couldn't retrieve object corresponding to %s from TFile", tcl->GetName());
175 }
176 memFile.Close();
177 auto* ctp = (std::vector<Long64_t>*)result;
178 return (*ctp)[0];
179};
180
185
// For every route in helper->routes, make the corresponding CCDB object
// available on the route's output: either download it and adopt it into the
// DPL cache, or re-adopt the message that was cached earlier.
//
// helper     - shared state: routes, per-path cache bookkeeping
//              (mapURL2UUID, mapURL2DPLCache), API handles, query periods.
// timestamp  - default timestamp used for the queries; a route may override
//              it through its "ccdb-run-dependent" metadata.
// timingInfo - provides the timeslice (used as signpost id) and tfCounter
//              (used to decide how often the backend is queried).
// allocator  - creates the payload buffer and adopts / re-adopts messages.
//
// NOTE(review): the body uses `dtc` (run number, isOnlineRun), but no such
// parameter is visible in the signature below, while the call in this file
// passes five arguments. A parameter line appears to be missing -- confirm.
186auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
187 int64_t timestamp,
188 TimingInfo& timingInfo,
190 DataAllocator& allocator) -> void
191{
192 std::string ccdbMetadataPrefix = "ccdb-metadata-";
// objCnt is only incremented per route; it is not read in this function.
193 int objCnt = -1;
194 // We use the timeslice, so that we hook into the same interval as the rest of the
195 // callback.
// Function-local static: isOnlineRun(dtc) is evaluated on the first call only.
196 static bool isOnline = isOnlineRun(dtc);
197
198 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
199 O2_SIGNPOST_START(ccdb, sid, "populateCacheWith", "Starting to populate cache with CCDB objects");
200 for (auto& route : helper->routes) {
201 int64_t timestampToUse = timestamp;
202 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Fetching object for route %{public}s", DataSpecUtils::describe(route.matcher).data());
203 objCnt++;
204 auto concrete = DataSpecUtils::asConcreteDataMatcher(route.matcher);
205 Output output{concrete.origin, concrete.description, concrete.subSpec};
// Buffer that receives the payload if the object has to be downloaded.
206 auto&& v = allocator.makeVector<char>(output);
207 std::map<std::string, std::string> metadata;
208 std::map<std::string, std::string> headers;
209 std::string path = "";
210 std::string etag = "";
// Query period for this route; starts from the global one and may be
// replaced by the route's "ccdb-query-rate" metadata below.
211 int chRate = helper->queryPeriodGlo;
212 bool checkValidity = false;
// Translate the route's metadata into query parameters:
//  - "ccdb-path": object path;
//  - "ccdb-run-dependent" 1: add the run number as "runNumber" metadata,
//    2: use the run number (converted with std::stoi) as the timestamp,
//    any other positive value: fatal log;
//  - "ccdb-metadata-<key>": forwarded to the request as metadata <key>;
//  - "ccdb-query-rate": per-route query period, scaled by queryPeriodFactor.
213 for (auto& meta : route.matcher.metadata) {
214 if (meta.name == "ccdb-path") {
215 path = meta.defaultValue.get<std::string>();
216 } else if (meta.name == "ccdb-run-dependent" && meta.defaultValue.get<int>() > 0) {
217 if (meta.defaultValue.get<int>() == 1) {
218 metadata["runNumber"] = dtc.runNumber;
219 } else if (meta.defaultValue.get<int>() == 2) {
220 timestampToUse = std::stoi(dtc.runNumber);
221 } else {
222 LOGP(fatal, "Undefined run-dependent option {} for spec {}/{}/{}", meta.defaultValue.get<int>(), concrete.origin.as<std::string>(), concrete.description.as<std::string>(), int(concrete.subSpec));
223 }
224 } else if (isPrefix(ccdbMetadataPrefix, meta.name)) {
225 std::string key = meta.name.substr(ccdbMetadataPrefix.size());
226 auto value = meta.defaultValue.get<std::string>();
227 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Adding metadata %{public}s: %{public}s to the request", key.data(), value.data());
228 metadata[key] = value;
229 } else if (meta.name == "ccdb-query-rate") {
230 chRate = meta.defaultValue.get<int>() * helper->queryPeriodFactor;
231 }
232 }
// Decide whether the backend must be asked again for this path.
233 const auto url2uuid = helper->mapURL2UUID.find(path);
234 if (url2uuid != helper->mapURL2UUID.end()) {
235 etag = url2uuid->second.etag;
236 // We check validity every chRate timeslices or if the cache is expired
237 uint64_t validUntil = url2uuid->second.cacheValidUntil;
238 // When the cache was populated. If the cache was populated after the timestamp, we need to check validity.
239 uint64_t cachePopulatedAt = url2uuid->second.cachePopulatedAt;
240 // If timestamp is before the time the element was cached or after the claimed validity, we need to check validity, again
241 // when online.
// NOTE(review): the first comparison uses timestampToUse, the second the
// unmodified `timestamp`, while cachePopulatedAt is stored from
// timestampToUse further down -- confirm the mix is intended.
242 bool cacheExpired = (validUntil <= timestampToUse) || (timestamp < cachePopulatedAt);
243 checkValidity = (std::abs(int(timingInfo.tfCounter - url2uuid->second.lastCheckedTF)) >= chRate) && (isOnline || cacheExpired);
244 } else {
245 checkValidity = true; // never skip check if the cache is empty
246 }
247
248 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "checkValidity is %{public}s for tfID %d of %{public}s", checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
249
250 const auto& api = helper->getAPI(path);
251 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
252 LOGP(detail, "Loading {} for timestamp {}", path, timestampToUse);
253 api.loadFileToMemory(v, path, metadata, timestampToUse, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
// Failure: an "Error" header, or nothing downloaded while nothing is cached.
254 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
255 LOGP(fatal, "Unable to find object {}/{}", path, timestampToUse);
256 // FIXME: I should send a dummy message.
257 continue;
258 }
259 // printing in case we find a default entry
260 if (headers.find("default") != headers.end()) {
261 LOGP(detail, "******** Default entry used for {} ********", path);
262 }
263 helper->mapURL2UUID[path].lastCheckedTF = timingInfo.tfCounter;
// Case 1: nothing was cached for this path yet (no ETag known). Record the
// bookkeeping, append the headers to the payload and adopt it into the
// cache. cacheValidUntil is not updated in this branch.
264 if (etag.empty()) {
265 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
266 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
267 helper->mapURL2UUID[path].cacheMiss++;
268 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
269 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
270 api.appendFlatHeader(v, headers);
271 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
272 helper->mapURL2DPLCache[path] = cacheId;
273 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
274 continue;
275 }
// Case 2: an ETag was known and a payload was downloaded anyway. Same
// bookkeeping as case 1, plus cacheValidUntil taken from the
// "Cache-Valid-Until" header (0 when that header is empty).
276 if (v.size()) { // but should be overridden by fresh object
277 // somewhere here pruneFromCache should be called
278 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
279 helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
280 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
281 helper->mapURL2UUID[path].cacheMiss++;
282 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
283 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
284 api.appendFlatHeader(v, headers);
285 auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
286 helper->mapURL2DPLCache[path] = cacheId;
287 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
288 // one could modify the adoptContainer to take optional old cacheID to clean:
289 // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
290 continue;
291 } else {
// Case 3: an ETag was known and no payload came back. Only the validity
// limit is refreshed; the cached message is re-adopted below.
292 // Only once the etag is actually used, we get the information on how long the object is valid
293 helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
294 }
295 }
// Reached when no query was made or the query returned no new payload:
// count a cache hit and re-adopt the message cached for this path.
296 // cached object is fine
297 auto cacheId = helper->mapURL2DPLCache[path];
298 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
299 helper->mapURL2UUID[path].cacheHit++;
300 allocator.adoptFromCache(output, cacheId, header::gSerializationMethodCCDB);
301 // the outputBuffer was not used, can we destroy it?
302 }
303 O2_SIGNPOST_END(ccdb, sid, "populateCacheWith", "Finished populating cache with CCDB objects");
304};
305
307{
308 return adaptStateful([](CallbackService& callbacks, ConfigParamRegistry const& options, DeviceSpec const& spec) {
309 std::shared_ptr<CCDBFetcherHelper> helper = std::make_shared<CCDBFetcherHelper>();
310 std::unordered_map<std::string, bool> accountedSpecs;
311 auto defHost = options.get<std::string>("condition-backend");
312 auto checkRate = options.get<int>("condition-tf-per-query");
313 auto checkMult = options.get<int>("condition-tf-per-query-multiplier");
314 helper->timeToleranceMS = options.get<int64_t>("condition-time-tolerance");
315 helper->queryPeriodGlo = checkRate > 0 ? checkRate : std::numeric_limits<int>::max();
316 helper->queryPeriodFactor = checkMult > 0 ? checkMult : 1;
317 LOGP(info, "CCDB Backend at: {}, validity check for every {} TF{}", defHost, helper->queryPeriodGlo, helper->queryPeriodFactor == 1 ? std::string{} : fmt::format(", (query for high-rate objects downscaled by {})", helper->queryPeriodFactor));
318 LOGP(info, "Hook to enable signposts for CCDB messages at {}", (void*)&private_o2_log_ccdb->stacktrace);
319 auto remapString = options.get<std::string>("condition-remap");
320 ParserResult result = CCDBHelpers::parseRemappings(remapString.c_str());
321 if (!result.error.empty()) {
322 throw runtime_error_f("Error while parsing remapping string %s", result.error.c_str());
323 }
324 helper->remappings = result.remappings;
325 helper->apis[""].init(defHost); // default backend
326 LOGP(info, "Initialised default CCDB host {}", defHost);
327 //
328 for (auto& entry : helper->remappings) { // init api instances for every host seen in the remapping
329 if (helper->apis.find(entry.second) == helper->apis.end()) {
330 helper->apis[entry.second].init(entry.second);
331 LOGP(info, "Initialised custom CCDB host {}", entry.second);
332 }
333 LOGP(info, "{} is remapped to {}", entry.first, entry.second);
334 }
335 helper->createdNotBefore = std::to_string(options.get<int64_t>("condition-not-before"));
336 helper->createdNotAfter = std::to_string(options.get<int64_t>("condition-not-after"));
337
338 for (auto &route : spec.outputs) {
339 if (route.matcher.lifetime != Lifetime::Condition) {
340 continue;
341 }
342 auto specStr = DataSpecUtils::describe(route.matcher);
343 if (accountedSpecs.find(specStr) != accountedSpecs.end()) {
344 continue;
345 }
346 accountedSpecs[specStr] = true;
347 helper->routes.push_back(route);
348 LOGP(info, "The following route is a condition {}", DataSpecUtils::describe(route.matcher));
349 for (auto& metadata : route.matcher.metadata) {
350 if (metadata.type == VariantType::String) {
351 LOGP(info, "- {}: {}", metadata.name, metadata.defaultValue.asString());
352 }
353 }
354 }
357 callbacks.set<CallbackService::Id::Stop>([helper]() {
358 LOGP(info, "CCDB cache miss/hit ratio:");
359 for (auto& entry : helper->mapURL2UUID) {
360 LOGP(info, " {}: {}/{} ({}-{} bytes)", entry.first, entry.second.cacheMiss, entry.second.cacheHit, entry.second.minSize, entry.second.maxSize);
361 }
362 });
363
364 return adaptStateless([helper](DataTakingContext& dtc, DataAllocator& allocator, TimingInfo& timingInfo) {
365 auto sid = _o2_signpost_id_t{(int64_t)timingInfo.timeslice};
366 O2_SIGNPOST_START(ccdb, sid, "fetchFromCCDB", "Fetching CCDB objects for timeslice %" PRIu64, (uint64_t)timingInfo.timeslice);
367 static Long64_t orbitResetTime = -1;
368 static size_t lastTimeUsed = -1;
370 LOGP(info, "Dummy creation time is not supported for CCDB objects. Setting creation to last one used {}.", lastTimeUsed);
371 timingInfo.creation = lastTimeUsed;
372 }
373 lastTimeUsed = timingInfo.creation;
374 // Fetch the CCDB object for the CTP
375 {
376 const std::string path = "CTP/Calib/OrbitReset";
377 std::map<std::string, std::string> metadata;
378 std::map<std::string, std::string> headers;
379 std::string etag;
380 bool checkValidity = std::abs(int(timingInfo.tfCounter - helper->lastCheckedTFCounterOrbReset)) >= helper->queryPeriodGlo;
381 const auto url2uuid = helper->mapURL2UUID.find(path);
382 if (url2uuid != helper->mapURL2UUID.end()) {
383 etag = url2uuid->second.etag;
384 } else {
385 checkValidity = true; // never skip check if the cache is empty
386 }
387 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "checkValidity is %{public}s for tfID %d of %{public}s",
388 checkValidity ? "true" : "false", timingInfo.tfCounter, path.data());
389 Output output{"CTP", "OrbitReset", 0};
390 Long64_t newOrbitResetTime = orbitResetTime;
391 auto&& v = allocator.makeVector<char>(output);
392 const auto& api = helper->getAPI(path);
393 if (checkValidity && (!api.isSnapshotMode() || etag.empty())) { // in the snapshot mode the object needs to be fetched only once
394 helper->lastCheckedTFCounterOrbReset = timingInfo.tfCounter;
395 api.loadFileToMemory(v, path, metadata, timingInfo.creation, &headers, etag, helper->createdNotAfter, helper->createdNotBefore);
396 if ((headers.count("Error") != 0) || (etag.empty() && v.empty())) {
397 LOGP(fatal, "Unable to find object {}/{}", path, timingInfo.creation);
398 // FIXME: I should send a dummy message.
399 return;
400 }
401 if (etag.empty()) {
402 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
403 helper->mapURL2UUID[path].cacheMiss++;
404 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
405 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
406 newOrbitResetTime = getOrbitResetTime(v);
407 api.appendFlatHeader(v, headers);
409 helper->mapURL2DPLCache[path] = cacheId;
410 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
411 } else if (v.size()) { // but should be overridden by fresh object
412 // somewhere here pruneFromCache should be called
413 helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
414 helper->mapURL2UUID[path].cacheMiss++;
415 helper->mapURL2UUID[path].minSize = std::min(v.size(), helper->mapURL2UUID[path].minSize);
416 helper->mapURL2UUID[path].maxSize = std::max(v.size(), helper->mapURL2UUID[path].maxSize);
417 newOrbitResetTime = getOrbitResetTime(v);
418 api.appendFlatHeader(v, headers);
420 helper->mapURL2DPLCache[path] = cacheId;
421 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
422 // one could modify the adoptContainer to take optional old cacheID to clean:
423 // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
424 }
425 // cached object is fine
426 }
427 auto cacheId = helper->mapURL2DPLCache[path];
428 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Reusing %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
429 helper->mapURL2UUID[path].cacheHit++;
431
432 if (newOrbitResetTime != orbitResetTime) {
433 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Orbit reset time changed from %lld to %lld", orbitResetTime, newOrbitResetTime);
434 orbitResetTime = newOrbitResetTime;
435 dtc.orbitResetTimeMUS = orbitResetTime;
436 }
437 }
438
439 int64_t timestamp = ceil((timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000 + orbitResetTime) / 1000); // RS ceilf precision is not enough
440 if (std::abs(int64_t(timingInfo.creation) - timestamp) > helper->timeToleranceMS) {
441 static bool notWarnedYet = true;
442 if (notWarnedYet) {
443 LOGP(warn, "timestamp {} for orbit {} and orbit reset time {} differs by >{} from the TF creation time {}, use the latter", timestamp, timingInfo.firstTForbit, orbitResetTime / 1000, helper->timeToleranceMS, timingInfo.creation);
444 notWarnedYet = false;
445 // apparently the orbit reset time from the CTP object makes no sense (i.e. orbit was reset for this run w/o create an object, as it happens for technical runs)
446 dtc.orbitResetTimeMUS = 1000 * timingInfo.creation - timingInfo.firstTForbit * o2::constants::lhc::LHCOrbitNS / 1000;
447 }
448 timestamp = timingInfo.creation;
449 }
450 // Fetch the rest of the objects.
451 O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Fetching objects. Run %{public}s. OrbitResetTime %lld. Creation %lld. Timestamp %lld. firstTForbit %" PRIu32,
452 dtc.runNumber.data(), orbitResetTime, timingInfo.creation, timestamp, timingInfo.firstTForbit);
453
454 populateCacheWith(helper, timestamp, timingInfo, dtc, allocator);
455 O2_SIGNPOST_END(ccdb, _o2_signpost_id_t{(int64_t)timingInfo.timeslice}, "fetchFromCCDB", "Fetching CCDB objects");
456 }); });
457}
458
459} // namespace o2::framework
benchmark::State & state
Header to collect LHC related constants.
void output(const std::map< std::string, ChannelStat > &channels)
Definition rawdump.cxx:197
uint16_t pos
Definition RawData.h:3
uint32_t c
Definition RawData.h:2
#define O2_DECLARE_DYNAMIC_LOG(name)
Definition Signpost.h:473
#define O2_SIGNPOST_END(log, id, name, format,...)
Definition Signpost.h:540
#define O2_SIGNPOST_EVENT_EMIT(log, id, name, format,...)
Definition Signpost.h:495
#define O2_SIGNPOST_START(log, id, name, format,...)
Definition Signpost.h:534
StringRef key
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
Definition CcdbApi.cxx:877
o2::pmr::vector< T > makeVector(const Output &spec, Args &&... args)
void adoptFromCache(Output const &spec, CacheId id, header::SerializationMethod method=header::gSerializationMethodNone)
Adopt an already cached message, using an already provided CacheId.
CacheId adoptContainer(const Output &, ContainerT &, CacheStrategy, o2::header::SerializationMethod)
GLuint64EXT * result
Definition glcorearb.h:5662
GLuint entry
Definition glcorearb.h:5735
const GLdouble * v
Definition glcorearb.h:832
GLsizei const GLfloat * value
Definition glcorearb.h:819
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
constexpr double LHCOrbitNS
Defining PrimaryVertex explicitly as messageable.
Definition TFIDInfo.h:20
RuntimeErrorRef runtime_error(const char *)
auto populateCacheWith(std::shared_ptr< CCDBFetcherHelper > const &helper, int64_t timestamp, TimingInfo &timingInfo, DataTakingContext &dtc, DataAllocator &allocator) -> void
bool isOnlineRun(DataTakingContext const &dtc)
bool isPrefix(std::string_view prefix, std::string_view full)
AlgorithmSpec::ProcessCallback adaptStateless(LAMBDA l)
RuntimeErrorRef runtime_error_f(const char *,...)
auto getOrbitResetTime(o2::pmr::vector< char > const &v) -> Long64_t
AlgorithmSpec::InitCallback adaptStateful(LAMBDA l)
constexpr o2::header::SerializationMethod gSerializationMethodNone
Definition DataHeader.h:327
constexpr o2::header::SerializationMethod gSerializationMethodCCDB
Definition DataHeader.h:329
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
o2::ccdb::CcdbApi & getAPI(const std::string &path)
std::vector< OutputRoute > routes
std::unordered_map< std::string, std::string > remappings
std::unordered_map< std::string, DataAllocator::CacheId > mapURL2DPLCache
std::unordered_map< std::string, o2::ccdb::CcdbApi > apis
std::unordered_map< std::string, CCDBCacheInfo > mapURL2UUID
static ParserResult parseRemappings(char const *)
static AlgorithmSpec fetchFromCCDB()
static constexpr uint64_t DUMMY_CREATION_TIME_OFFSET
static std::string describe(InputSpec const &spec)
static ConcreteDataMatcher asConcreteDataMatcher(InputSpec const &input)
long orbitResetTimeMUS
The start time of the first orbit in microseconds(!)
DeploymentMode deploymentMode
Where we thing this is running.
std::string runNumber
The current run number.
std::vector< OutputRoute > outputs
Definition DeviceSpec.h:63
header::DataOrigin origin
Definition Output.h:28
uint32_t tfCounter
the orbit the TF begins
Definition TimingInfo.h:32
const std::string str