32#include <TStreamerInfo.h>
36#include <fairlogger/Logger.h>
42#include <boost/algorithm/string.hpp>
43#include <boost/asio/ip/host_name.hpp>
46#include <boost/interprocess/sync/named_semaphore.hpp>
50#include <unordered_set>
51#include "rapidjson/document.h"
52#include "rapidjson/writer.h"
53#include "rapidjson/stringbuffer.h"
61unique_ptr<TJAlienCredentials> CcdbApi::mJAlienCredentials =
nullptr;
76 boost::interprocess::named_semaphore* mSem =
nullptr;
77 std::string mSemName{};
92 std::unordered_set<CCDBSemaphore const*> mStore;
102 mIsCCDBDownloaderPreferred = 0;
103 if (deploymentMode == DeploymentMode::OnlineDDS && deploymentMode == DeploymentMode::OnlineECS && deploymentMode == DeploymentMode::OnlineAUX && deploymentMode == DeploymentMode::FST) {
104 mIsCCDBDownloaderPreferred = 1;
106 if (getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) {
107 mIsCCDBDownloaderPreferred = atoi(getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI"));
114 curl_global_cleanup();
118void CcdbApi::setUniqueAgentID()
120 std::string host = boost::asio::ip::host_name();
121 char const* jobID = getenv(
"ALIEN_PROC_ID");
132 LOG(
debug) <<
"On macOS we simply rely on TGrid::Connect(\"alien\").";
135 if (getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"))) {
138 if (getenv(
"JALIEN_TOKEN_CERT")) {
141 auto returncode = system(
"LD_PRELOAD= alien-token-info &> /dev/null");
142 if (returncode == -1) {
145 return returncode == 0;
148void CcdbApi::curlInit()
151 curl_global_init(CURL_GLOBAL_DEFAULT);
152 CcdbApi::mJAlienCredentials = std::make_unique<TJAlienCredentials>();
153 CcdbApi::mJAlienCredentials->loadCredentials();
154 CcdbApi::mJAlienCredentials->selectPreferedCredentials();
157 if (getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT")) {
158 auto timeoutMS = atoi(getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT"));
159 if (timeoutMS >= 0) {
160 LOG(info) <<
"Setting socket timeout to " << timeoutMS <<
" milliseconds";
169 throw std::invalid_argument(
"Empty url passed CcdbApi, cannot initialize. Aborting.");
174 constexpr const char* SNAPSHOTPREFIX =
"file://";
177 if (host.substr(0, 7).compare(SNAPSHOTPREFIX) == 0) {
178 auto path = host.substr(7);
179 initInSnapshotMode(
path);
200 std::string snapshotReport{};
201 const char* cachedir = getenv(
"ALICEO2_CCDB_LOCALCACHE");
202 namespace fs = std::filesystem;
204 if (cachedir[0] == 0) {
205 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(
"."));
207 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(cachedir));
209 snapshotReport = fmt::format(
"(cache snapshots to dir={}", mSnapshotCachePath);
212 mPreferSnapshotCache =
true;
213 if (mSnapshotCachePath.empty()) {
214 LOGP(fatal,
"IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE is defined but the ALICEO2_CCDB_LOCALCACHE is not");
216 snapshotReport +=
", prefer if available";
218 if (!snapshotReport.empty()) {
219 snapshotReport +=
')';
222 mNeedAlienToken = (host.find(
"https://") != std::string::npos) || (host.find(
"alice-ccdb.cern.ch") != std::string::npos);
225 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD")) {
226 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD"));
228 mCurlTimeoutDownload =
timeout;
235 mCurlTimeoutDownload = 15;
238 mCurlTimeoutDownload = 15;
240 mCurlTimeoutDownload = 5;
244 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD")) {
245 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD"));
254 mCurlTimeoutUpload = 3;
257 mCurlTimeoutUpload = 20;
259 mCurlTimeoutUpload = 20;
266 LOGP(
debug,
"Curl timeouts are set to: download={:2}, upload={:2} seconds", mCurlTimeoutDownload, mCurlTimeoutUpload);
268 LOGP(info,
"Init CcdApi with UserAgentID: {}, Host: {}{}, Curl timeouts: upload:{} download:{}", mUniqueAgentID, host,
269 mInSnapshotMode ?
"(snapshot readonly mode)" : snapshotReport.c_str(), mCurlTimeoutUpload, mCurlTimeoutDownload);
278void CcdbApi::updateMetaInformationInLocalFile(std::string
const&
filename, std::map<std::string, std::string>
const* headers,
CCDBQuery const* querysummary)
280 std::lock_guard<std::mutex> guard(
gIOMutex);
281 auto oldlevel = gErrorIgnoreLevel;
282 gErrorIgnoreLevel = 6001;
283 TFile snapshotfile(
filename.c_str(),
"UPDATE");
285 if (!snapshotfile.IsZombie()) {
287 snapshotfile.WriteObjectAny(querysummary, TClass::GetClass(
typeid(*querysummary)),
CCDBQUERY_ENTRY);
290 snapshotfile.WriteObjectAny(headers, TClass::GetClass(
typeid(*headers)),
CCDBMETA_ENTRY);
292 snapshotfile.Write();
293 snapshotfile.Close();
295 gErrorIgnoreLevel = oldlevel;
305 std::string tmpObjectName = objectName;
306 tmpObjectName.erase(std::remove_if(tmpObjectName.begin(), tmpObjectName.end(),
307 [](
auto const&
c) ->
bool { return (!std::isalnum(c) && c !=
'_' && c !=
'/' && c !=
'.'); }),
308 tmpObjectName.end());
309 return tmpObjectName;
316 std::lock_guard<std::mutex> guard(
gIOMutex);
330 std::string className = rootObject->GetName();
336 std::lock_guard<std::mutex> guard(
gIOMutex);
341 std::map<std::string, std::string>
const& metadata,
342 long startValidityTimestamp,
long endValidityTimestamp,
343 std::vector<char>::size_type
maxSize)
const
347 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
353 path, metadata, startValidityTimestamp, endValidityTimestamp,
maxSize);
357 const std::string&
path,
const std::map<std::string, std::string>& metadata,
358 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
361 LOGP(alarm,
"Object will not be uploaded to {} since its size {} exceeds max allowed {}",
path,
size,
maxSize);
367 long sanitizedStartValidityTimestamp = startValidityTimestamp;
368 if (startValidityTimestamp == -1) {
369 LOGP(info,
"Start of Validity not set, current timestamp used.");
372 long sanitizedEndValidityTimestamp = endValidityTimestamp;
373 if (endValidityTimestamp == -1) {
374 LOGP(info,
"End of Validity not set, start of validity plus 1 day used.");
377 if (mInSnapshotMode) {
378 auto pthLoc = getSnapshotDir(mSnapshotTopPath,
path);
380 auto flLoc = getSnapshotFile(mSnapshotTopPath,
path,
filename);
382 auto pent = flLoc.find_last_of(
'.');
383 if (pent == std::string::npos) {
386 flLoc.insert(pent, fmt::format(
"_{}_{}", startValidityTimestamp, endValidityTimestamp));
387 ofstream outf(flLoc.c_str(), ios::out | ios::binary);
391 throw std::runtime_error(fmt::format(
"Failed to write local CCDB file {}", flLoc));
393 std::map<std::string, std::string> metaheader(metadata);
395 metaheader[
"Valid-From"] =
std::to_string(startValidityTimestamp);
397 updateMetaInformationInLocalFile(flLoc.c_str(), &metaheader);
398 std::string metaStr{};
399 for (
const auto& mentry : metadata) {
400 metaStr += fmt::format(
"{}={};", mentry.first, mentry.second);
402 metaStr +=
"$USER_META;";
403 LOGP(info,
"Created local snapshot {}", flLoc);
404 LOGP(info, R
"(Upload with: o2-ccdb-upload --host "$ccdbhost" -p {} -f {} -k {} --starttimestamp {} --endtimestamp {} -m "{}")",
411 CURL* curl =
nullptr;
412 curl = curl_easy_init();
415 checkMetadataKeys(metadata);
417 if (curl !=
nullptr) {
418 auto mime = curl_mime_init(curl);
419 auto field = curl_mime_addpart(mime);
420 curl_mime_name(field,
"send");
421 curl_mime_filedata(field,
filename.c_str());
424 struct curl_slist* headerlist =
nullptr;
425 static const char buf[] =
"Expect:";
426 headerlist = curl_slist_append(headerlist,
buf);
430 curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
431 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
432 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
433 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
434 curl_easy_setopt(curl, CURLOPT_TIMEOUT, mCurlTimeoutUpload);
436 CURLcode
res = CURL_LAST;
438 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res > 0; hostIndex++) {
439 std::string fullUrl = getFullUrlForStorage(curl,
path, objectType, metadata, sanitizedStartValidityTimestamp, sanitizedEndValidityTimestamp, hostIndex);
440 LOG(debug3) <<
"Full URL Encoded: " << fullUrl;
442 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
445 res = CURL_perform(curl);
447 if (
res != CURLE_OK) {
448 if (
res == CURLE_OPERATION_TIMEDOUT) {
449 LOGP(alarm,
"curl_easy_perform() timed out. Consider increasing the timeout using the env var `ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD` (seconds), current one is {}", mCurlTimeoutUpload);
451 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(
res));
458 curl_easy_cleanup(curl);
461 curl_slist_free_all(headerlist);
463 curl_mime_free(mime);
465 LOGP(alarm,
"curl initialization failure");
472 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
476 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
484std::string CcdbApi::getFullUrlForStorage(CURL* curl,
const std::string&
path,
const std::string& objtype,
485 const std::map<std::string, std::string>& metadata,
486 long startValidityTimestamp,
long endValidityTimestamp,
int hostIndex)
const
489 std::string startValidityString = getTimestampString(startValidityTimestamp < 0 ?
getCurrentTimestamp() : startValidityTimestamp);
490 std::string endValidityString = getTimestampString(endValidityTimestamp < 0 ?
getFutureTimestamp(60 * 60 * 24 * 1) : endValidityTimestamp);
492 std::string
url = getHostUrl(hostIndex);
494 std::string fullUrl =
url +
"/" +
path +
"/" + startValidityString +
"/" + endValidityString +
"/";
497 char* objtypeEncoded = curl_easy_escape(curl, objtype.c_str(), objtype.size());
498 fullUrl +=
"ObjectType=" + std::string(objtypeEncoded) +
"/";
499 curl_free(objtypeEncoded);
501 for (
auto& kv : metadata) {
502 std::string mfirst = kv.first;
503 std::string msecond = kv.second;
505 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
506 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
507 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
508 curl_free(mfirstEncoded);
509 curl_free(msecondEncoded);
515std::string CcdbApi::getFullUrlForRetrieval(CURL* curl,
const std::string&
path,
const std::map<std::string, std::string>& metadata,
long timestamp,
int hostIndex)
const
517 if (mInSnapshotMode) {
518 return getSnapshotFile(mSnapshotTopPath,
path);
524 std::string hostUrl = getHostUrl(hostIndex);
526 std::string fullUrl = hostUrl +
"/" +
path +
"/" + validityString +
"/";
528 for (
auto& kv : metadata) {
529 std::string mfirst = kv.first;
530 std::string msecond = kv.second;
532 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
533 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
534 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
535 curl_free(mfirstEncoded);
536 curl_free(msecondEncoded);
558static size_t WriteMemoryCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
560 size_t realsize =
size * nmemb;
563 mem->memory = (
char*)realloc(mem->memory, mem->size + realsize + 1);
564 if (mem->memory ==
nullptr) {
565 printf(
"not enough memory (realloc returned NULL)\n");
569 memcpy(&(mem->memory[mem->size]), contents, realsize);
570 mem->size += realsize;
571 mem->memory[mem->size] = 0;
587static size_t WriteToFileCallback(
void*
ptr,
size_t size,
size_t nmemb, FILE*
stream)
600static CURLcode ssl_ctx_callback(CURL*,
void*,
void* parm)
602 std::string
msg((
const char*)parm);
605 if (
msg.length() > 0 &&
end == -1) {
607 }
else if (
end > 0) {
619 CredentialsKind cmk = mJAlienCredentials->getPreferedCredentials();
622 if (cmk == cNOT_FOUND) {
626 TJAlienCredentialsObject cmo = mJAlienCredentials->get(cmk);
628 char* CAPath = getenv(
"X509_CERT_DIR");
630 curl_easy_setopt(curl_handle, CURLOPT_CAPATH, CAPath);
632 curl_easy_setopt(curl_handle, CURLOPT_CAINFO,
nullptr);
633 curl_easy_setopt(curl_handle, CURLOPT_SSLCERT, cmo.certpath.c_str());
634 curl_easy_setopt(curl_handle, CURLOPT_SSLKEY, cmo.keypath.c_str());
637 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
638 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_DATA, mJAlienCredentials->getMessages().c_str());
645void CcdbApi::initCurlOptionsForRetrieve(CURL* curlHandle,
void* chunk,
CurlWriteCallback writeCallback,
bool followRedirect)
const
647 curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, writeCallback);
648 curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, chunk);
649 curl_easy_setopt(curlHandle, CURLOPT_FOLLOWLOCATION, followRedirect ? 1L : 0L);
654template <
typename MapType = std::map<std::
string, std::
string>>
655size_t header_map_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
657 auto* headers =
static_cast<MapType*
>(userdata);
658 auto header = std::string(
buffer,
size * nitems);
659 std::string::size_type
index = header.find(
':', 0);
660 if (
index != std::string::npos) {
661 const auto key = boost::algorithm::trim_copy(header.substr(0,
index));
662 const auto value = boost::algorithm::trim_copy(header.substr(
index + 1));
663 LOGP(
debug,
"Adding #{} {} -> {}", headers->size(),
key,
value);
665 if (
key ==
"Content-Length") {
666 auto cl = headers->find(
"Content-Length");
667 if (cl != headers->end()) {
668 if (std::stol(cl->second) < stol(
value)) {
678 auto cl = headers->find(
"ETag");
679 if (cl != headers->end()) {
685 if (
key ==
"Content-Type") {
686 auto cl = headers->find(
"Content-Type");
687 if (cl != headers->end()) {
693 headers->insert(std::make_pair(
key,
value));
696 return size * nitems;
700void CcdbApi::initCurlHTTPHeaderOptionsForRetrieve(CURL* curlHandle, curl_slist*& option_list,
long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
705 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
etag).c_str());
709 option_list = curl_slist_append(option_list, (
"If-Not-After: " +
createdNotAfter).c_str());
713 option_list = curl_slist_append(option_list, (
"If-Not-Before: " +
createdNotBefore).c_str());
716 if (headers !=
nullptr) {
717 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
to_string(timestamp)).c_str());
718 curl_easy_setopt(curlHandle, CURLOPT_HEADERFUNCTION, header_map_callback<>);
719 curl_easy_setopt(curlHandle, CURLOPT_HEADERDATA, headers);
723 curl_easy_setopt(curlHandle, CURLOPT_HTTPHEADER, option_list);
726 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
729bool CcdbApi::receiveToFile(FILE* fileHandle, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
730 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
736bool CcdbApi::receiveToMemory(
void* chunk, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
737 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
743bool CcdbApi::receiveObject(
void* dataHolder, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
744 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
749 curlHandle = curl_easy_init();
750 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
752 if (curlHandle !=
nullptr) {
755 initCurlOptionsForRetrieve(curlHandle, dataHolder, writeCallback, followRedirect);
756 curl_slist* option_list =
nullptr;
759 long responseCode = 0;
760 CURLcode curlResultCode = CURL_LAST;
762 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (responseCode >= 400 || curlResultCode > 0); hostIndex++) {
763 std::string fullUrl = getFullUrlForRetrieval(curlHandle,
path, metadata, timestamp, hostIndex);
764 curl_easy_setopt(curlHandle, CURLOPT_URL, fullUrl.c_str());
766 curlResultCode = CURL_perform(curlHandle);
768 if (curlResultCode != CURLE_OK) {
769 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(curlResultCode));
771 curlResultCode = curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &responseCode);
772 if ((curlResultCode == CURLE_OK) && (responseCode < 300)) {
773 curl_slist_free_all(option_list);
774 curl_easy_cleanup(curlHandle);
777 if (curlResultCode != CURLE_OK) {
778 LOGP(alarm,
"invalid URL {}", fullUrl);
780 LOGP(alarm,
"not found under link {}", fullUrl);
786 curl_slist_free_all(option_list);
787 curl_easy_cleanup(curlHandle);
793 long timestamp)
const
801 bool res = receiveToMemory((
void*)&chunk,
path, metadata, timestamp);
804 std::lock_guard<std::mutex> guard(
gIOMutex);
806 mess.SetBuffer(chunk.
memory, chunk.
size, kFALSE);
811 LOGP(info,
"couldn't retrieve the object {}",
path);
823 std::string
str = inp;
824 str.erase(std::remove_if(
str.begin(),
str.end(), ::isspace),
str.end());
825 str = std::regex_replace(
str, std::regex(
"::"),
"-");
831 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
838 long timestamp,
bool preservePath, std::string
const& localFileName, std::string
const&
createdNotAfter, std::string
const&
createdNotBefore, std::map<std::string, std::string>* outHeaders)
const
842 std::string fulltargetdir = targetdir + (preservePath ? (
'/' +
path) :
"");
} catch (std::exception const& e) {
  // Catch by const reference: catching by value copy-slices any derived
  // exception down to std::exception, so what() would no longer report the
  // message of the exception that was actually thrown.
  LOGP(error, "Could not create local snapshot cache directory {}, reason: {}", fulltargetdir, e.what());
852 std::map<std::string, std::string> headers;
855 if ((headers.count(
"Error") != 0) || (buff.empty())) {
856 LOGP(error,
"Unable to find object {}/{}, Aborting",
path, timestamp);
860 auto getFileName = [&headers]() {
861 auto& s = headers[
"Content-Disposition"];
863 std::regex re(
"(.*;)filename=\"(.*)\"");
865 if (std::regex_match(s.c_str(),
m, re)) {
869 std::string backupname(
"ccdb-blob.bin");
870 LOG(error) <<
"Cannot determine original filename from Content-Disposition ... falling back to " << backupname;
873 auto filename = localFileName.size() > 0 ? localFileName : getFileName();
874 std::string targetpath = fulltargetdir +
"/" +
filename;
876 std::ofstream objFile(targetpath, std::ios::out | std::ofstream::binary);
877 std::copy(buff.begin(), buff.end(), std::ostreambuf_iterator<char>(objFile));
878 if (!objFile.good()) {
879 LOGP(error,
"Unable to open local file {}, Aborting", targetpath);
885 updateMetaInformationInLocalFile(targetpath.c_str(), &headers, &querysummary);
887 *outHeaders = std::move(headers);
892void CcdbApi::snapshot(std::string
const& ccdbrootpath, std::string
const& localDir,
long timestamp)
const
896 std::map<std::string, std::string> metadata;
897 for (
auto& folder : allfolders) {
907 auto object =
file.GetObjectChecked(what, cl);
911 std::string objectName(cl->GetName());
913 object =
file.GetObjectChecked(objectName.c_str(), cl);
914 LOG(warn) <<
"Did not find object under expected name " << what;
918 LOG(warn) <<
"Found object under deprecated name " << cl->GetName();
923 if (cl->InheritsFrom(
"TObject")) {
926 auto tree =
dynamic_cast<TTree*
>((
TObject*)
object);
928 tree->LoadBaskets(0x1L << 32);
929 tree->SetDirectory(
nullptr);
932 auto h =
dynamic_cast<TH1*
>((
TObject*)
object);
934 h->SetDirectory(
nullptr);
942void* CcdbApi::extractFromLocalFile(std::string
const&
filename, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
944 if (!std::filesystem::exists(
filename)) {
945 LOG(error) <<
"Local snapshot " <<
filename <<
" not found \n";
948 std::lock_guard<std::mutex> guard(
gIOMutex);
949 auto tcl = tinfo2TClass(tinfo);
954 *headers = *storedmeta;
957 if ((
isSnapshotMode() || mPreferSnapshotCache) && headers->find(
"ETag") == headers->end()) {
960 if (headers->find(
"fileSize") == headers->end()) {
961 (*headers)[
"fileSize"] = fmt::format(
"{}",
f.GetEND());
967bool CcdbApi::initTGrid()
const
969 if (mNeedAlienToken && !gGrid) {
970 static bool allowNoToken = getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"));
972 LOG(fatal) <<
"Alien Token Check failed - Please get an alien token before running with https CCDB endpoint, or alice-ccdb.cern.ch!";
974 TGrid::Connect(
"alien");
975 static bool errorShown =
false;
976 if (!gGrid && errorShown ==
false) {
978 LOG(error) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
980 LOG(fatal) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
985 return gGrid !=
nullptr;
988void* CcdbApi::downloadFilesystemContent(std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
990 if ((
url.find(
"alien:/", 0) != std::string::npos) && !initTGrid()) {
993 std::lock_guard<std::mutex> guard(
gIOMutex);
994 auto memfile = TMemFile::Open(
url.c_str(),
"OPEN");
996 auto cl = tinfo2TClass(tinfo);
998 if (headers && headers->find(
"fileSize") == headers->end()) {
999 (*headers)[
"fileSize"] = fmt::format(
"{}", memfile->GetEND());
1007void* CcdbApi::interpretAsTMemFileAndExtract(
char* contentptr,
size_t contentsize, std::type_info
const& tinfo)
1010 Int_t previousErrorLevel = gErrorIgnoreLevel;
1011 gErrorIgnoreLevel = kFatal;
1012 std::lock_guard<std::mutex> guard(
gIOMutex);
1013 TMemFile memFile(
"name", contentptr, contentsize,
"READ");
1014 gErrorIgnoreLevel = previousErrorLevel;
1015 if (!memFile.IsZombie()) {
1016 auto tcl = tinfo2TClass(tinfo);
1027void* CcdbApi::navigateURLsAndRetrieveContent(CURL* curl_handle, std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
1032 static thread_local std::multimap<std::string, std::string> headerData;
1035 if ((
url.find(
"alien:/", 0) != std::string::npos) || (
url.find(
"file:/", 0) != std::string::npos)) {
1036 return downloadFilesystemContent(
url, tinfo, headers);
1043 curl_easy_setopt(curl_handle, CURLOPT_URL,
url.c_str());
1045 MemoryStruct chunk{(
char*)malloc(1), 0};
1046 initCurlOptionsForRetrieve(curl_handle, (
void*)&chunk, WriteMemoryCallback,
false);
1048 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(headerData)>);
1050 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&headerData);
1054 auto res = CURL_perform(curl_handle);
1055 long response_code = -1;
1056 void* content =
nullptr;
1058 if (
res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) {
1060 for (
auto& p : headerData) {
1061 (*headers)[
p.first] =
p.second;
1064 if (200 <= response_code && response_code < 300) {
1066 content = interpretAsTMemFileAndExtract(chunk.memory, chunk.size, tinfo);
1067 if (headers && headers->find(
"fileSize") == headers->end()) {
1068 (*headers)[
"fileSize"] = fmt::format(
"{}", chunk.size);
1070 }
else if (response_code == 304) {
1075 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
1078 else if (300 <= response_code && response_code < 400) {
1084 auto complement_Location = [
this](std::string
const& loc) {
1085 if (loc[0] ==
'/') {
1092 std::vector<std::string> locs;
1093 auto iter = headerData.find(
"Location");
1094 if (iter != headerData.end()) {
1095 locs.push_back(complement_Location(iter->second));
1098 auto iter2 = headerData.find(
"Content-Location");
1099 if (iter2 != headerData.end()) {
1100 auto range = headerData.equal_range(
"Content-Location");
1101 for (
auto it =
range.first; it !=
range.second; ++it) {
1102 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
1103 locs.push_back(complement_Location(it->second));
1107 for (
auto& l : locs) {
1109 LOG(
debug) <<
"Trying content location " << l;
1110 content = navigateURLsAndRetrieveContent(curl_handle, l, tinfo, headers);
1116 }
else if (response_code == 404) {
1117 LOG(error) <<
"Requested resource does not exist: " <<
url;
1120 LOG(error) <<
"Error in fetching object " <<
url <<
", curl response code:" << response_code;
1124 if (chunk.memory !=
nullptr) {
1128 LOGP(alarm,
"Curl request to {} failed with result {}, response code: {}",
url,
int(
res), response_code);
1133 (*headers)[
"Error"] =
"An error occurred during retrieval";
1139 std::map<std::string, std::string>
const& metadata,
long timestamp,
1140 std::map<std::string, std::string>* headers, std::string
const&
etag,
1143 if (!mSnapshotCachePath.empty()) {
1145 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1146 std::string logfile = mSnapshotCachePath +
"/log";
1147 std::fstream out(logfile, ios_base::out | ios_base::app);
1148 if (out.is_open()) {
1149 out <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1151 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path);
1152 bool snapshoting =
false;
1153 if (!std::filesystem::exists(snapshotfile)) {
1155 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotfile <<
"\n";
1158 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" failed to create directory for " << snapshotfile <<
"\n";
1161 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1164 auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
1166 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1173 CURL* curl_handle = curl_easy_init();
1174 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1175 std::string fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp);
1177 if (mInSnapshotMode) {
1178 auto res = extractFromLocalFile(fullUrl, tinfo, headers);
1180 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1185 curl_slist* option_list =
nullptr;
1187 auto content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1189 for (
size_t hostIndex = 1; hostIndex < hostsPool.size() && !(content); hostIndex++) {
1190 fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp, hostIndex);
1191 content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1194 logReading(
path, timestamp, headers,
"retrieve");
1196 curl_slist_free_all(option_list);
1197 curl_easy_cleanup(curl_handle);
1203 size_t newLength =
size * nmemb;
1204 size_t oldLength = s->size();
1206 s->resize(oldLength + newLength);
1207 }
catch (std::bad_alloc& e) {
1208 LOG(error) <<
"memory error when getting data from CCDB";
1212 std::copy((
char*)contents, (
char*)contents + newLength, s->begin() + oldLength);
1213 return size * nmemb;
1219 CURLcode
res = CURL_LAST;
1222 curl = curl_easy_init();
1223 if (curl !=
nullptr) {
1225 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &
result);
1226 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1228 struct curl_slist* headers =
nullptr;
1229 headers = curl_slist_append(headers, (std::string(
"Accept: ") + returnFormat).c_str());
1230 headers = curl_slist_append(headers, (std::string(
"Content-Type: ") + returnFormat).c_str());
1237 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1241 std::string fullUrl;
1243 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1244 fullUrl = getHostUrl(hostIndex);
1245 fullUrl += latestOnly ?
"/latest/" :
"/browse/";
1247 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1249 res = CURL_perform(curl);
1250 if (
res != CURLE_OK) {
1251 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1254 curl_slist_free_all(headers);
1255 curl_easy_cleanup(curl);
1261std::string CcdbApi::getTimestampString(
long timestamp)
const
1272 stringstream fullUrl;
1275 curl = curl_easy_init();
1276 if (curl !=
nullptr) {
1277 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
1278 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1281 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1282 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestampLocal;
1283 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1286 res = CURL_perform(curl);
1287 if (
res != CURLE_OK) {
1288 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1290 curl_easy_cleanup(curl);
1299 stringstream fullUrl;
1300 for (
size_t i = 0;
i < hostsPool.size();
i++) {
1301 std::string
url = getHostUrl(
i);
1302 fullUrl <<
url <<
"/truncate/" <<
path;
1304 curl = curl_easy_init();
1305 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1306 if (curl !=
nullptr) {
1307 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1312 res = CURL_perform(curl);
1313 if (
res != CURLE_OK) {
1314 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1316 curl_easy_cleanup(curl);
1323 return size * nmemb;
1329 CURLcode
res = CURL_LAST;
1332 curl = curl_easy_init();
1333 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1335 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1336 curl_easy_setopt(curl, CURLOPT_URL, mUrl.data());
1337 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
write_data);
1339 res = CURL_perform(curl);
1344 curl_easy_cleanup(curl);
1353 std::stringstream ss(reply.c_str());
1355 std::vector<std::string> folders;
1357 size_t numberoflines = std::count(reply.begin(), reply.end(),
'\n');
1358 bool inSubFolderSection =
false;
1360 for (
size_t linenumber = 0; linenumber < numberoflines; ++linenumber) {
1361 std::getline(ss, line);
1362 if (inSubFolderSection && line.size() > 0) {
1367 if (line.compare(
"Subfolders:") == 0) {
1368 inSubFolderSection =
true;
1376size_t header_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
1378 auto* headers =
static_cast<std::vector<std::string>*
>(userdata);
1379 auto header = std::string(
buffer,
size * nitems);
1380 headers->emplace_back(std::string(header.data()));
1381 return size * nitems;
1389 auto p = std::filesystem::path(
filename).parent_path();
1390 if (!std::filesystem::exists(p)) {
1391 std::filesystem::create_directories(p);
1394 rapidjson::StringBuffer
buffer;
1395 rapidjson::Writer<rapidjson::StringBuffer> writer(
buffer);
1396 writer.StartObject();
1397 for (
const auto& pair : meta) {
1398 writer.Key(pair.first.c_str());
1399 writer.String(pair.second.c_str());
1405 if (
file.is_open()) {
1418 if (!
file.is_open()) {
1419 std::cerr <<
"Failed to open file for reading." << std::endl;
1423 std::string jsonStr((std::istreambuf_iterator<char>(
file)), std::istreambuf_iterator<char>());
1426 rapidjson::Document document;
1427 document.Parse(jsonStr.c_str());
1429 if (document.HasParseError()) {
1430 std::cerr <<
"Error parsing JSON" << std::endl;
1435 for (
auto itr = document.MemberBegin(); itr != document.MemberEnd(); ++itr) {
1436 meta[itr->name.GetString()] = itr->value.GetString();
1441std::map<std::string, std::string>
CcdbApi::retrieveHeaders(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp)
const
1444 auto do_remote_header_call = [
this, &
path, &metadata, timestamp]() -> std::map<std::string, std::string> {
1445 CURL* curl = curl_easy_init();
1446 CURLcode
res = CURL_LAST;
1447 std::string fullUrl = getFullUrlForRetrieval(curl,
path, metadata, timestamp);
1448 std::map<std::string, std::string> headers;
1450 if (curl !=
nullptr) {
1451 struct curl_slist*
list =
nullptr;
1454 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1457 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1458 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1459 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_map_callback<>);
1460 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1461 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1466 long httpCode = 404;
1467 CURLcode getCodeRes = CURL_LAST;
1468 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (httpCode >= 400 ||
res > 0 || getCodeRes > 0); hostIndex++) {
1469 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1470 res = CURL_perform(curl);
1471 if (
res != CURLE_OK &&
res != CURLE_UNSUPPORTED_PROTOCOL) {
1475 LOG(error) <<
"CURL_perform() failed: " << curl_easy_strerror(
res);
1477 getCodeRes = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
1479 if (httpCode == 404) {
1482 curl_easy_cleanup(curl);
1487 if (!mSnapshotCachePath.empty()) {
1489 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath + std::string(
"_headers"),
path);
1491 std::string logfile = mSnapshotCachePath +
"/log";
1492 std::fstream out(logfile, ios_base::out | ios_base::app);
1493 if (out.is_open()) {
1494 out <<
"CCDB-header-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1496 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path +
"/" +
std::to_string(timestamp),
"header.json");
1497 if (!std::filesystem::exists(snapshotfile)) {
1498 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" storing to snapshot " << snapshotfile <<
"\n";
1501 auto meta = do_remote_header_call();
1505 LOG(warn) <<
"Failed to cache the header information to disc";
1509 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1510 std::map<std::string, std::string> meta;
1512 LOG(warn) <<
"Failed to read cached information from disc";
1513 return do_remote_header_call();
1518 return do_remote_header_call();
1523 auto curl = curl_easy_init();
1529 struct curl_slist*
list =
nullptr;
1530 list = curl_slist_append(
list, (
"If-None-Match: " +
etag).c_str());
1532 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1534 curl_easy_setopt(curl, CURLOPT_URL,
url.c_str());
1536 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1537 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1538 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
1539 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1540 if (!agentID.empty()) {
1541 curl_easy_setopt(curl, CURLOPT_USERAGENT, agentID.c_str());
1547 curl_easy_perform(curl);
1548 long http_code = 404;
1549 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
1550 if (http_code == 304) {
1558 static std::string etagHeader =
"ETag: ";
1559 static std::string locationHeader =
"Content-Location: ";
1560 for (
auto h : headers) {
1561 if (
h.find(etagHeader) == 0) {
1562 etag = std::string(
h.data() + etagHeader.size());
1563 }
else if (
h.find(locationHeader) == 0) {
1564 pfns.emplace_back(std::string(
h.data() + locationHeader.size(),
h.size() - locationHeader.size()));
1580 auto object =
file.GetObjectChecked(
CCDBMETA_ENTRY, TClass::GetClass(
typeid(std::map<std::string, std::string>)));
1582 return static_cast<std::map<std::string, std::string>*
>(
object);
1589void traverseAndFillFolders(
CcdbApi const& api, std::string
const&
top, std::vector<std::string>& folders)
1593 folders.emplace_back(
top);
1596 if (subfolders.size() > 0) {
1598 for (
auto& sub : subfolders) {
1599 traverseAndFillFolders(api, sub, folders);
1609 std::vector<std::string> folders;
1610 traverseAndFillFolders(*
this,
top, folders);
1614TClass* CcdbApi::tinfo2TClass(std::type_info
const& tinfo)
1616 TClass* cl = TClass::GetClass(tinfo);
1618 throw std::runtime_error(fmt::format(
"Could not retrieve ROOT dictionary for type {}, aborting", tinfo.name()));
1624int CcdbApi::updateMetadata(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp, std::string
const&
id,
long newEOV)
1627 CURL* curl = curl_easy_init();
1628 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1629 if (curl !=
nullptr) {
1631 stringstream fullUrl;
1632 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1633 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestamp;
1635 fullUrl <<
"/" << newEOV;
1638 fullUrl <<
"/" <<
id;
1642 for (
auto& kv : metadata) {
1643 std::string mfirst = kv.first;
1644 std::string msecond = kv.second;
1646 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
1647 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
1648 fullUrl << std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"&";
1649 curl_free(mfirstEncoded);
1650 curl_free(msecondEncoded);
1653 if (curl !=
nullptr) {
1654 LOG(
debug) <<
"passing to curl: " << fullUrl.str();
1655 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1656 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"PUT");
1657 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1658 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1662 res = CURL_perform(curl);
1663 if (
res != CURLE_OK) {
1664 LOGP(alarm,
"CURL_perform() failed: {}, code: {}", curl_easy_strerror(
res),
int(
res));
1669 curl_easy_cleanup(curl);
1676void CcdbApi::initHostsPool(std::string hosts)
1679 auto splitted = hosts | std::views::transform([](
char c) {
return (
c ==
';') ?
',' :
c; }) | std::views::split(
',');
1680 for (
auto&& part : splitted) {
1681 hostsPool.emplace_back(part.begin(), part.end());
1685std::string CcdbApi::getHostUrl(
int hostIndex)
const
1687 return hostsPool.at(hostIndex);
1693 data->hoPair.object = &requestContext.
dest;
1695 std::function<bool(std::string)> localContentCallback = [
this, &requestContext](std::string
url) {
1699 auto writeCallback = [](
void* contents,
size_t size,
size_t nmemb,
void* chunkptr) {
1701 auto& chunk = *ho.
object;
1702 size_t realsize =
size * nmemb, sz = 0;
1705 if (chunk.capacity() < chunk.size() + realsize) {
1707 const char hannot[] =
"header";
1708 size_t hsize = getFlatHeaderSize(ho.header);
1709 auto cl = ho.header.find(
"Content-Length");
1710 if (cl != ho.header.end()) {
1711 size_t sizeFromHeader = std::stol(cl->second);
1712 sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1714 sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
1719 char* contC = (
char*)contents;
1720 chunk.insert(chunk.end(), contC, contC + realsize);
1721 }
catch (std::exception e) {
1728 CURL* curl_handle = curl_easy_init();
1729 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1730 std::string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.
path, requestContext.
metadata, requestContext.
timestamp);
1731 curl_slist* options_list =
nullptr;
1732 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.
timestamp, &requestContext.
headers,
1736 data->hosts = hostsPool;
1739 data->localContentCallback = localContentCallback;
1740 data->userAgent = mUniqueAgentID;
1741 data->optionsList = options_list;
1743 curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str());
1744 initCurlOptionsForRetrieve(curl_handle, (
void*)(&
data->hoPair), writeCallback,
false);
1745 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(
data->hoPair.header)>);
1746 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&(
data->hoPair.header));
1747 curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (
void*)
data);
1750 asynchPerform(curl_handle, requestCounter);
1755 std::hash<std::string> hasher;
1756 std::string semhashedstring =
"aliceccdb" +
std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
1757 return semhashedstring;
1765 return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
1766 }
catch (std::exception e) {
1767 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
1776 if (sem->try_wait()) {
1787 boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
1788 std::cout <<
"Found CCDB semaphore: " << semaname <<
"\n";
1790 auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
1792 std::cout <<
"Removed CCDB semaphore: " << semaname <<
"\n";
1797 }
catch (std::exception
const& e) {
1808 namespace fs = std::filesystem;
1809 std::string fileName{
"snapshot.root"};
1811 auto absolutesnapshotdir = fs::weakly_canonical(fs::absolute(snapshotdir));
1812 for (
const auto&
entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
1813 if (
entry.is_directory()) {
1814 const fs::path& currentDir = fs::canonical(fs::absolute(
entry.path()));
1815 fs::path filePath = currentDir / fileName;
1816 if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
1817 std::cout <<
"Directory with file '" << fileName <<
"': " << currentDir << std::endl;
1821 auto numtokens = pathtokens.size();
1822 if (numtokens < 3) {
1827 std::string
path = pathtokens[numtokens - 3] +
"/" + pathtokens[numtokens - 2] +
"/" + pathtokens[numtokens - 1];
1833 }
catch (std::exception
const& e) {
1834 LOG(info) <<
"Semaphore search had exception " << e.what();
1839 long timestamp, std::map<std::string, std::string>& headers,
1842 if (createSnapshot) {
1843 std::string logfile = mSnapshotCachePath +
"/log";
1844 std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app);
1845 if (logStream.is_open()) {
1846 logStream <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
" for load to memory\n";
1849 if (mInSnapshotMode) {
1854 }
else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) {
1866 if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) {
1867 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1869 auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.
path);
1870 std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path);
1872 std::fstream logStream;
1873 if (logStream.is_open()) {
1874 logStream <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotpath <<
" from memory\n";
1877 LOGP(
debug,
"creating snapshot {} -> {}", requestContext.
path, snapshotpath);
1880 std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary);
1881 std::copy(requestContext.
dest.begin(), requestContext.
dest.end(), std::ostreambuf_iterator<char>(objFile));
1884 updateMetaInformationInLocalFile(snapshotpath, &requestContext.
headers, &querysummary);
1890 std::map<std::string, std::string>
const& metadata,
long timestamp,
1891 std::map<std::string, std::string>* headers, std::string
const&
etag,
1895 destP.reserve(dest.size());
1898 dest.reserve(destP.size());
1899 for (
const auto c : destP) {
1905 std::map<std::string, std::string>
const& metadata,
long timestamp,
1906 std::map<std::string, std::string>* headers, std::string
const&
etag,
1918 std::vector<RequestContext> contexts = {requestContext};
1924 size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1925 dest.resize(cnt + hsize);
1926 auto addString = [&dest, &cnt](
const std::string& s) {
1933 for (
auto&
h : headers) {
1935 addString(
h.second);
1937 *
reinterpret_cast<int*
>(&dest[cnt]) = hsize;
1938 std::memcpy(&dest[cnt +
sizeof(
int)], FlatHeaderAnnot,
sizeof(FlatHeaderAnnot));
1943 LOGP(
debug,
"loadFileToMemory {} ETag=[{}]", requestContext.
path, requestContext.
etag);
1944 bool createSnapshot = requestContext.
considerSnapshot && !mSnapshotCachePath.empty();
1946 std::string snapshotpath;
1947 if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path))) {
1948 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1959 std::vector<int> fromSnapshots(requestContexts.size());
1960 size_t requestCounter = 0;
1963 for (
int i = 0;
i < requestContexts.size();
i++) {
1965 auto& requestContext = requestContexts.at(
i);
1970 while (requestCounter > 0) {
1975 for (
int i = 0;
i < requestContexts.size();
i++) {
1976 auto& requestContext = requestContexts.at(
i);
1977 if (!requestContext.dest.empty()) {
1978 logReading(requestContext.path, requestContext.timestamp, &requestContext.headers,
1979 fmt::format(
"{}{}", requestContext.considerSnapshot ?
"load to memory" :
"retrieve", fromSnapshots.at(
i) ?
" from snapshot" :
""));
1980 if (requestContext.considerSnapshot && fromSnapshots.at(
i) != 2) {
1989 if (
url.find(
"alien:/", 0) != std::string::npos) {
1990 std::map<std::string, std::string> localHeaders;
1992 auto it = localHeaders.find(
"Error");
1993 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
1999 if ((
url.find(
"file:/", 0) != std::string::npos)) {
2000 std::string
path =
url.substr(7);
2001 if (std::filesystem::exists(
path)) {
2002 std::map<std::string, std::string> localHeaders;
2004 auto it = localHeaders.find(
"Error");
2005 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2018 constexpr size_t MaxCopySize = 0x1L << 25;
2019 auto signalError = [&dest, localHeaders]() {
2023 (*localHeaders)[
"Error"] =
"An error occurred during retrieval";
2026 if (
path.find(
"alien:/") == 0 && !initTGrid()) {
2030 std::string fname(
path);
2031 if (fname.find(
"?filetype=raw") == std::string::npos) {
2032 fname +=
"?filetype=raw";
2034 std::unique_ptr<TFile> sfile{TFile::Open(fname.c_str())};
2035 if (!sfile || sfile->IsZombie()) {
2036 LOG(error) <<
"Failed to open file " << fname;
2040 size_t totalread = 0, fsize = sfile->GetSize(), b00 = sfile->GetBytesRead();
2042 char* dptr = dest.data();
2046 size_t b0 = sfile->GetBytesRead(), b1 = b0 - b00;
2047 size_t readsize = fsize - b1 > MaxCopySize ? MaxCopySize : fsize - b1;
2048 if (readsize == 0) {
2051 sfile->Seek(totalread, TFile::kBeg);
2052 bool failed = sfile->ReadBuffer(dptr, (Int_t)readsize);
2053 nread = sfile->GetBytesRead() - b0;
2054 if (
failed || nread < 0) {
2055 LOG(error) <<
"failed to copy file " << fname <<
" to memory buffer";
2061 }
while (nread == (
long)MaxCopySize);
2063 if (localHeaders && fetchLocalMetaData) {
2064 TMemFile memFile(
"name",
const_cast<char*
>(dest.data()), dest.size(),
"READ");
2065 auto storedmeta = (std::map<std::string, std::string>*)
extractFromTFile(memFile, TClass::GetClass(
"std::map<std::string, std::string>"),
CCDBMETA_ENTRY);
2067 *localHeaders = *storedmeta;
2070 if ((
isSnapshotMode() || mPreferSnapshotCache) && localHeaders->find(
"ETag") == localHeaders->end()) {
2071 (*localHeaders)[
"ETag"] =
path;
2073 if (localHeaders->find(
"fileSize") == localHeaders->end()) {
2074 (*localHeaders)[
"fileSize"] = fmt::format(
"{}", memFile.GetEND());
2080void CcdbApi::checkMetadataKeys(std::map<std::string, std::string>
const& metadata)
const
2086 const std::regex regexPatternSearch(R
"([ :;.,\\/'?!\(\)\{\}\[\]@<>=+*#$&`|~^%])");
2087 bool isInvalid =
false;
2089 for (
auto& el : metadata) {
2090 auto keyMd = el.first;
2092 std::smatch searchRes;
2093 while (std::regex_search(keyMd, searchRes, regexPatternSearch)) {
2095 LOG(error) <<
"Invalid character found in metadata key '" << tmp <<
"\': '" << searchRes.str() <<
"\'";
2096 keyMd = searchRes.suffix();
2100 LOG(fatal) <<
"Some metadata keys have invalid characters, please fix!";
2105void CcdbApi::logReading(
const std::string&
path,
long ts,
const std::map<std::string, std::string>* headers,
const std::string& comment)
const
2107 std::string upath{
path};
2109 auto ent = headers->find(
"Valid-From");
2110 if (ent != headers->end()) {
2111 upath +=
"/" + ent->second;
2113 ent = headers->find(
"ETag");
2114 if (ent != headers->end()) {
2115 upath +=
"/" + ent->second;
2118 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
2119 LOGP(info,
"ccdb reads {}{}{} for {} ({}, agent_id: {}), ", mUrl, mUrl.back() ==
'/' ?
"" :
"/", upath, ts < 0 ?
getCurrentTimestamp() : ts, comment, mUniqueAgentID);
2122void CcdbApi::asynchPerform(CURL* handle,
size_t* requestCounter)
const
2127CURLcode CcdbApi::CURL_perform(CURL* handle)
const
2129 if (mIsCCDBDownloaderPreferred) {
2130 return mDownloader->
perform(handle);
2133 for (
int i = 1;
i <= mCurlRetries && (
result = curl_easy_perform(handle)) != CURLE_OK;
i++) {
2134 usleep(mCurlDelayRetries *
i);
2145 LOG(
debug) <<
"Entering semaphore barrier";
2148 mSem =
new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, mSemName.c_str(), 1);
2149 }
catch (std::exception e) {
2150 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
2155 gSemaRegistry.
add(
this);
2162 LOG(
debug) <<
"Ending semaphore barrier";
2165 if (mSem->try_wait()) {
2167 boost::interprocess::named_semaphore::remove(mSemName.c_str());
2169 gSemaRegistry.
remove(
this);
2175 LOG(
debug) <<
"Cleaning up semaphore registry with count " << mStore.size();
2176 for (
auto& s : mStore) {
std::string createdNotBefore
std::string createdNotAfter
Class for time synchronization of RawReader instances.
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
CCDBSemaphore(std::string const &cachepath, std::string const &path)
static void curlSetSSLOptions(CURL *curl)
static std::string generateFileName(const std::string &inp)
std::string list(std::string const &path="", bool latestOnly=false, std::string const &returnFormat="text/plain", long createdNotAfter=-1, long createdNotBefore=-1) const
int storeAsTFile_impl(const void *obj1, std::type_info const &info, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
static bool checkAlienToken()
void runDownloaderLoop(bool noWait)
void releaseNamedSemaphore(boost::interprocess::named_semaphore *sem, std::string const &path) const
static std::map< std::string, std::string > * retrieveMetaInfo(TFile &)
void scheduleDownload(RequestContext &requestContext, size_t *requestCounter) const
TObject * retrieve(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp) const
void init(std::string const &hosts)
TObject * retrieveFromTFile(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore) const
std::string const & getURL() const
void getFromSnapshot(bool createSnapshot, std::string const &path, long timestamp, std::map< std::string, std::string > &headers, std::string &snapshotpath, o2::pmr::vector< char > &dest, int &fromSnapshot, std::string const &etag) const
bool loadLocalContentToMemory(o2::pmr::vector< char > &dest, std::string &url) const
static void removeLeakingSemaphores(std::string const &basedir, bool remove=false)
void saveSnapshot(RequestContext &requestContext) const
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
int storeAsTFile(const TObject *rootObject, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
void snapshot(std::string const &ccdbrootpath, std::string const &localDir, long timestamp) const
bool isSnapshotMode() const
bool isHostReachable() const
void loadFileToMemory(std::vector< char > &dest, std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore, bool considerSnapshot=true) const
static std::string determineSemaphoreName(std::string const &basedir, std::string const &objectpath)
void deleteObject(std::string const &path, long timestamp=-1) const
static void appendFlatHeader(o2::pmr::vector< char > &dest, const std::map< std::string, std::string > &headers)
std::vector< std::string > getAllFolders(std::string const &top) const
void vectoredLoadFileToMemory(std::vector< RequestContext > &requestContext) const
boost::interprocess::named_semaphore * createNamedSemaphore(std::string const &path) const
static bool removeSemaphore(std::string const &name, bool remove=false)
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
CcdbApi()
Default constructor.
static bool getCCDBEntryHeaders(std::string const &url, std::string const &etag, std::vector< std::string > &headers, const std::string &agentID="")
static CCDBQuery * retrieveQueryInfo(TFile &)
static constexpr const char * CCDBQUERY_ENTRY
void truncate(std::string const &path) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
static void parseCCDBHeaders(std::vector< std::string > const &headers, std::vector< std::string > &pfns, std::string &etag)
bool retrieveBlob(std::string const &path, std::string const &targetdir, std::map< std::string, std::string > const &metadata, long timestamp, bool preservePathStructure=true, std::string const &localFileName="snapshot.root", std::string const &createdNotAfter="", std::string const &createdNotBefore="", std::map< std::string, std::string > *headers=nullptr) const
int updateMetadata(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::string const &id="", long newEOV=0)
static constexpr const char * CCDBMETA_ENTRY
static constexpr const char * CCDBOBJECT_ENTRY
void navigateSourcesAndLoadFile(RequestContext &requestContext, int &fromSnapshot, size_t *requestCounter) const
std::vector< std::string > parseSubFolders(std::string const &reply) const
virtual ~CcdbApi()
Default destructor.
void setFileName(const std::string &nm)
const std::string & getObjectType() const
void setObjectType(const std::string &tp)
const std::string & getFileName() const
void add(CCDBSemaphore const *ptr)
void remove(CCDBSemaphore const *ptr)
SemaphoreRegistry()=default
GLdouble GLdouble GLdouble GLdouble top
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
bool stdmap_to_jsonfile(std::map< std::string, std::string > const &meta, std::string const &filename)
size_t write_data(void *, size_t size, size_t nmemb, void *)
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
size_t(*)(void *, size_t, size_t, void *) CurlWriteCallback
std::string sanitizeObjectName(const std::string &objectName)
bool jsonfile_to_stdmap(std::map< std::string, std::string > &meta, std::string const &filename)
long getFutureTimestamp(int secondsInFuture)
returns the timestamp in long corresponding to "now + secondsInFuture"
size_t CurlWrite_CallbackFunc_StdString2(void *contents, size_t size, size_t nmemb, std::string *s)
std::string timestamp() noexcept
Defining PrimaryVertex explicitly as messageable.
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
void createDirectoriesIfAbsent(std::string const &path)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::map< std::string, std::string > const & metadata
std::string createdNotAfter
std::map< std::string, std::string > & headers
o2::pmr::vector< char > & dest
std::string createdNotBefore
static DeploymentMode deploymentMode()
static std::string getClassName(const T &obj)
get the class name of the object
static std::unique_ptr< FileImage > createFileImage(const TObject &obj, const std::string &fileName, const std::string &objName)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string concat_string(Ts const &... ts)
static bool endsWith(const std::string &s, const std::string &ending)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
uint64_t const void const *restrict const msg