32#include <TStreamerInfo.h>
36#include <fairlogger/Logger.h>
42#include <boost/algorithm/string.hpp>
45#include <boost/interprocess/sync/named_semaphore.hpp>
49#include <TAlienUserAgent.h>
50#include <unordered_set>
51#include "rapidjson/document.h"
52#include "rapidjson/writer.h"
53#include "rapidjson/stringbuffer.h"
61unique_ptr<TJAlienCredentials> CcdbApi::mJAlienCredentials =
nullptr;
76 boost::interprocess::named_semaphore* mSem =
nullptr;
77 std::string mSemName{};
92 std::unordered_set<CCDBSemaphore const*> mStore;
// Default: the multi-handle CCDB downloader is not preferred.
102 mIsCCDBDownloaderPreferred = 0;
// Prefer it for the OnlineDDS, OnlineECS, OnlineAUX and FST deployment modes.
// The comparisons must be OR-ed: one deploymentMode value can never equal four
// different enumerators at once, so the former &&-chain was always false and
// this branch could never run.
103 if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS || deploymentMode == DeploymentMode::OnlineAUX || deploymentMode == DeploymentMode::FST) {
104 mIsCCDBDownloaderPreferred = 1;
// When the environment variable is set, its integer value (atoi) replaces the
// value chosen above.
106 if (getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) {
107 mIsCCDBDownloaderPreferred = atoi(getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI"));
114 curl_global_cleanup();
// Sets mUniqueAgentID to the string form of the TAlienUserAgent built from the
// environment. Elsewhere in this file the value is passed to curl as
// CURLOPT_USERAGENT.
118void CcdbApi::setUniqueAgentID()
120 mUniqueAgentID = TAlienUserAgent::BasedOnEnvironment().ToString();
126 LOG(
debug) <<
"On macOS we simply rely on TGrid::Connect(\"alien\").";
129 if (getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"))) {
132 if (getenv(
"JALIEN_TOKEN_CERT")) {
135 auto returncode = system(
"LD_PRELOAD= alien-token-info &> /dev/null");
136 if (returncode == -1) {
139 return returncode == 0;
// Global libcurl initialisation plus loading of the JAlien credentials that are
// stored in the static mJAlienCredentials member.
142void CcdbApi::curlInit()
145 curl_global_init(CURL_GLOBAL_DEFAULT);
// Create the credentials holder, load what is available and select the
// preferred credential kind.
146 CcdbApi::mJAlienCredentials = std::make_unique<TJAlienCredentials>();
147 CcdbApi::mJAlienCredentials->loadCredentials();
148 CcdbApi::mJAlienCredentials->selectPreferedCredentials();
// Optional socket timeout override, parsed with atoi; only non-negative values
// are accepted. The unit reported in the log message is milliseconds.
// NOTE(review): where the value is applied is not visible in this excerpt.
151 if (getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT")) {
152 auto timeoutMS = atoi(getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT"));
153 if (timeoutMS >= 0) {
154 LOG(info) <<
"Setting socket timeout to " << timeoutMS <<
" milliseconds";
163 throw std::invalid_argument(
"Empty url passed CcdbApi, cannot initialize. Aborting.");
168 constexpr const char* SNAPSHOTPREFIX =
"file://";
171 if (host.substr(0, 7).compare(SNAPSHOTPREFIX) == 0) {
172 auto path = host.substr(7);
173 initInSnapshotMode(
path);
194 std::string snapshotReport{};
195 const char* cachedir = getenv(
"ALICEO2_CCDB_LOCALCACHE");
196 namespace fs = std::filesystem;
198 if (cachedir[0] == 0) {
199 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(
"."));
201 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(cachedir));
203 snapshotReport = fmt::format(
"(cache snapshots to dir={}", mSnapshotCachePath);
206 mPreferSnapshotCache =
true;
207 if (mSnapshotCachePath.empty()) {
208 LOGP(fatal,
"IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE is defined but the ALICEO2_CCDB_LOCALCACHE is not");
210 snapshotReport +=
", prefer if available";
212 if (!snapshotReport.empty()) {
213 snapshotReport +=
')';
216 mNeedAlienToken = (host.find(
"https://") != std::string::npos) || (host.find(
"alice-ccdb.cern.ch") != std::string::npos);
219 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD")) {
220 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD"));
222 mCurlTimeoutDownload =
timeout;
229 mCurlTimeoutDownload = 15;
232 mCurlTimeoutDownload = 15;
234 mCurlTimeoutDownload = 5;
238 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD")) {
239 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD"));
248 mCurlTimeoutUpload = 3;
251 mCurlTimeoutUpload = 20;
253 mCurlTimeoutUpload = 20;
260 LOGP(
debug,
"Curl timeouts are set to: download={:2}, upload={:2} seconds", mCurlTimeoutDownload, mCurlTimeoutUpload);
262 LOGP(info,
"Init CcdApi with UserAgentID: {}, Host: {}{}, Curl timeouts: upload:{} download:{}", mUniqueAgentID, host,
263 mInSnapshotMode ?
"(snapshot readonly mode)" : snapshotReport.c_str(), mCurlTimeoutUpload, mCurlTimeoutDownload);
// Opens the ROOT file `filename` in UPDATE mode and writes the query summary and
// the headers map into it under the keys CCDBQUERY_ENTRY and CCDBMETA_ENTRY.
//
// filename     - path of the local file to update
// headers      - pointer to the metadata/header map to store
// querysummary - pointer to the CCDBQuery to store
// NOTE(review): both pointers are dereferenced inside typeid(); any null guards
// would sit on lines not visible in this excerpt - confirm.
272void CcdbApi::updateMetaInformationInLocalFile(std::string
const&
filename, std::map<std::string, std::string>
const* headers,
CCDBQuery const* querysummary)
// Serialise access through the file-wide I/O mutex.
274 std::lock_guard<std::mutex> guard(
gIOMutex);
// Temporarily raise ROOT's gErrorIgnoreLevel (presumably to silence messages
// while the file is opened); the previous level is restored at the end.
275 auto oldlevel = gErrorIgnoreLevel;
276 gErrorIgnoreLevel = 6001;
277 TFile snapshotfile(
filename.c_str(),
"UPDATE");
// Only write when the file could be opened.
279 if (!snapshotfile.IsZombie()) {
281 snapshotfile.WriteObjectAny(querysummary, TClass::GetClass(
typeid(*querysummary)),
CCDBQUERY_ENTRY);
284 snapshotfile.WriteObjectAny(headers, TClass::GetClass(
typeid(*headers)),
CCDBMETA_ENTRY);
286 snapshotfile.Write();
287 snapshotfile.Close();
289 gErrorIgnoreLevel = oldlevel;
299 std::string tmpObjectName = objectName;
300 tmpObjectName.erase(std::remove_if(tmpObjectName.begin(), tmpObjectName.end(),
301 [](
auto const&
c) ->
bool { return (!std::isalnum(c) && c !=
'_' && c !=
'/' && c !=
'.'); }),
302 tmpObjectName.end());
303 return tmpObjectName;
310 std::lock_guard<std::mutex> guard(
gIOMutex);
324 std::string className = rootObject->GetName();
330 std::lock_guard<std::mutex> guard(
gIOMutex);
335 std::map<std::string, std::string>
const& metadata,
336 long startValidityTimestamp,
long endValidityTimestamp,
337 std::vector<char>::size_type
maxSize)
const
341 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
347 path, metadata, startValidityTimestamp, endValidityTimestamp,
maxSize);
351 const std::string&
path,
const std::map<std::string, std::string>& metadata,
352 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
355 LOGP(alarm,
"Object will not be uploaded to {} since its size {} exceeds max allowed {}",
path,
size,
maxSize);
361 long sanitizedStartValidityTimestamp = startValidityTimestamp;
362 if (startValidityTimestamp == -1) {
363 LOGP(info,
"Start of Validity not set, current timestamp used.");
366 long sanitizedEndValidityTimestamp = endValidityTimestamp;
367 if (endValidityTimestamp == -1) {
368 LOGP(info,
"End of Validity not set, start of validity plus 1 day used.");
371 if (mInSnapshotMode) {
373 LOGP(alarm,
"Snapshot mode does not support headers-only upload");
376 auto pthLoc = getSnapshotDir(mSnapshotTopPath,
path);
378 auto flLoc = getSnapshotFile(mSnapshotTopPath,
path,
filename);
380 auto pent = flLoc.find_last_of(
'.');
381 if (pent == std::string::npos) {
384 flLoc.insert(pent, fmt::format(
"_{}_{}", startValidityTimestamp, endValidityTimestamp));
385 ofstream outf(flLoc.c_str(), ios::out | ios::binary);
389 throw std::runtime_error(fmt::format(
"Failed to write local CCDB file {}", flLoc));
391 std::map<std::string, std::string> metaheader(metadata);
393 metaheader[
"Valid-From"] =
std::to_string(startValidityTimestamp);
395 updateMetaInformationInLocalFile(flLoc.c_str(), &metaheader);
396 std::string metaStr{};
397 for (
const auto& mentry : metadata) {
398 metaStr += fmt::format(
"{}={};", mentry.first, mentry.second);
400 metaStr +=
"$USER_META;";
401 LOGP(info,
"Created local snapshot {}", flLoc);
402 LOGP(info, R
"(Upload with: o2-ccdb-upload --host "$ccdbhost" -p {} -f {} -k {} --starttimestamp {} --endtimestamp {} -m "{}")",
409 CURL* curl =
nullptr;
410 curl = curl_easy_init();
413 checkMetadataKeys(metadata);
415 if (curl !=
nullptr) {
416 auto mime = curl_mime_init(curl);
417 auto field = curl_mime_addpart(mime);
418 curl_mime_name(field,
"send");
420 curl_mime_filedata(field,
filename.c_str());
425 curl_mime_data(field,
"", 0);
428 struct curl_slist* headerlist =
nullptr;
429 static const char buf[] =
"Expect:";
430 headerlist = curl_slist_append(headerlist,
buf);
434 curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
435 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
436 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
437 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
438 curl_easy_setopt(curl, CURLOPT_TIMEOUT, mCurlTimeoutUpload);
440 CURLcode
res = CURL_LAST;
442 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res > 0; hostIndex++) {
443 std::string fullUrl = getFullUrlForStorage(curl,
path, objectType, metadata, sanitizedStartValidityTimestamp, sanitizedEndValidityTimestamp, hostIndex);
444 LOG(debug3) <<
"Full URL Encoded: " << fullUrl;
446 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
449 res = CURL_perform(curl);
451 if (
res != CURLE_OK) {
452 if (
res == CURLE_OPERATION_TIMEDOUT) {
453 LOGP(alarm,
"curl_easy_perform() timed out. Consider increasing the timeout using the env var `ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD` (seconds), current one is {}", mCurlTimeoutUpload);
455 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(
res));
462 curl_easy_cleanup(curl);
465 curl_slist_free_all(headerlist);
467 curl_mime_free(mime);
469 LOGP(alarm,
"curl initialization failure");
476 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
480 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
// Builds the upload URL for one host of the pool:
//   <host>/<path>/<start>/<end>/ObjectType=<type>/<key>=<value>/...
// The object type and every metadata key/value are URL-escaped with
// curl_easy_escape.
//
// curl      - handle used for escaping
// path      - CCDB path of the object
// objtype   - object type, appended as "ObjectType=<escaped>"
// metadata  - key/value pairs appended as "<key>=<value>/"
// startValidityTimestamp / endValidityTimestamp - validity bounds; negative
//             values are replaced (see below)
// hostIndex - index passed to getHostUrl
488std::string CcdbApi::getFullUrlForStorage(CURL* curl,
const std::string&
path,
const std::string& objtype,
489 const std::map<std::string, std::string>& metadata,
490 long startValidityTimestamp,
long endValidityTimestamp,
int hostIndex)
const
// A negative start is replaced by the current timestamp.
493 std::string startValidityString = getTimestampString(startValidityTimestamp < 0 ?
getCurrentTimestamp() : startValidityTimestamp);
// A negative end is replaced by getFutureTimestamp(86400), i.e. one day
// expressed in seconds.
494 std::string endValidityString = getTimestampString(endValidityTimestamp < 0 ?
getFutureTimestamp(60 * 60 * 24 * 1) : endValidityTimestamp);
496 std::string
url = getHostUrl(hostIndex);
498 std::string fullUrl =
url +
"/" +
path +
"/" + startValidityString +
"/" + endValidityString +
"/";
// NOTE(review): curl_easy_escape returns NULL on failure; building a
// std::string from a null pointer is undefined behaviour - consider a check.
501 char* objtypeEncoded = curl_easy_escape(curl, objtype.c_str(), objtype.size());
502 fullUrl +=
"ObjectType=" + std::string(objtypeEncoded) +
"/";
503 curl_free(objtypeEncoded);
// Append every metadata entry, escaped, as "<key>=<value>/".
505 for (
auto& kv : metadata) {
506 std::string mfirst = kv.first;
507 std::string msecond = kv.second;
509 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
510 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
511 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
// The escaped strings are owned by libcurl and must be released with curl_free.
512 curl_free(mfirstEncoded);
513 curl_free(msecondEncoded);
// Builds the location an object is fetched from.
// In snapshot mode this is the snapshot file below mSnapshotTopPath; otherwise
// it is <host>/<path>/<validity>/<key>=<value>/... with URL-escaped metadata.
//
// curl      - handle used for escaping
// path      - CCDB path of the object
// metadata  - key/value pairs appended as "<key>=<value>/"
// timestamp - requested validity time (the line deriving validityString from it
//             is not visible in this excerpt)
// hostIndex - index passed to getHostUrl
519std::string CcdbApi::getFullUrlForRetrieval(CURL* curl,
const std::string&
path,
const std::map<std::string, std::string>& metadata,
long timestamp,
int hostIndex)
const
// Snapshot mode short-circuits: no host or metadata is involved.
521 if (mInSnapshotMode) {
522 return getSnapshotFile(mSnapshotTopPath,
path);
528 std::string hostUrl = getHostUrl(hostIndex);
530 std::string fullUrl = hostUrl +
"/" +
path +
"/" + validityString +
"/";
// Append every metadata entry, escaped, as "<key>=<value>/".
// NOTE(review): curl_easy_escape returns NULL on failure; building a
// std::string from a null pointer is undefined behaviour - consider a check.
532 for (
auto& kv : metadata) {
533 std::string mfirst = kv.first;
534 std::string msecond = kv.second;
536 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
537 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
538 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
// The escaped strings are owned by libcurl and must be released with curl_free.
539 curl_free(mfirstEncoded);
540 curl_free(msecondEncoded);
// Write callback with the libcurl signature: appends the received bytes to a
// growing, zero-terminated heap buffer reached through `userp`.
//
// contents - received data
// size     - element size
// nmemb    - number of elements; size * nmemb is the byte count
// userp    - user data; the line that turns it into `mem` is not visible in
//            this excerpt
562static size_t WriteMemoryCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
564 size_t realsize =
size * nmemb;
// Grow the buffer by the new chunk plus one byte for the terminating zero.
// NOTE(review): the realloc result is assigned straight to mem->memory, so on
// failure the only pointer to the previous buffer is overwritten with NULL and
// that buffer can no longer be freed - consider a temporary.
567 mem->memory = (
char*)realloc(mem->memory, mem->size + realsize + 1);
568 if (mem->memory ==
nullptr) {
569 printf(
"not enough memory (realloc returned NULL)\n");
// Copy the chunk behind the existing data, update the size and re-terminate.
573 memcpy(&(mem->memory[mem->size]), contents, realsize);
574 mem->size += realsize;
575 mem->memory[mem->size] = 0;
591static size_t WriteToFileCallback(
void*
ptr,
size_t size,
size_t nmemb, FILE*
stream)
604static CURLcode ssl_ctx_callback(CURL*,
void*,
void* parm)
606 std::string
msg((
const char*)parm);
609 if (
msg.length() > 0 &&
end == -1) {
611 }
else if (
end > 0) {
623 CredentialsKind cmk = mJAlienCredentials->getPreferedCredentials();
626 if (cmk == cNOT_FOUND) {
630 TJAlienCredentialsObject cmo = mJAlienCredentials->get(cmk);
632 char* CAPath = getenv(
"X509_CERT_DIR");
634 curl_easy_setopt(curl_handle, CURLOPT_CAPATH, CAPath);
636 curl_easy_setopt(curl_handle, CURLOPT_CAINFO,
nullptr);
637 curl_easy_setopt(curl_handle, CURLOPT_SSLCERT, cmo.certpath.c_str());
638 curl_easy_setopt(curl_handle, CURLOPT_SSLKEY, cmo.keypath.c_str());
641 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
642 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_DATA, mJAlienCredentials->getMessages().c_str());
// Configures a curl handle for a download.
//
// curlHandle     - handle to configure
// chunk          - opaque pointer handed to the write callback (CURLOPT_WRITEDATA)
// writeCallback  - function receiving the body data (CURLOPT_WRITEFUNCTION)
// followRedirect - whether curl follows HTTP redirects itself
649void CcdbApi::initCurlOptionsForRetrieve(CURL* curlHandle,
void* chunk,
CurlWriteCallback writeCallback,
bool followRedirect)
const
651 curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, writeCallback);
652 curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, chunk);
// CURLOPT_FOLLOWLOCATION expects a long, hence the explicit 1L / 0L.
653 curl_easy_setopt(curlHandle, CURLOPT_FOLLOWLOCATION, followRedirect ? 1L : 0L);
// Header callback with the libcurl signature: splits one header line at the
// first ':' and stores the trimmed key/value pair in the map passed through
// `userdata`.
//
// MapType  - map-like container of string pairs; defaults to
//            std::map<std::string, std::string>
// buffer   - one raw header line of size * nitems bytes
// userdata - pointer to a MapType
// Returns size * nitems.
658template <
typename MapType = std::map<std::
string, std::
string>>
659size_t header_map_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
661 auto* headers =
static_cast<MapType*
>(userdata);
662 auto header = std::string(
buffer,
size * nitems);
// Lines without a ':' are not treated as key/value headers.
663 std::string::size_type
index = header.find(
':', 0);
664 if (
index != std::string::npos) {
665 const auto key = boost::algorithm::trim_copy(header.substr(0,
index));
666 const auto value = boost::algorithm::trim_copy(header.substr(
index + 1));
667 LOGP(
debug,
"Adding #{} {} -> {}", headers->size(),
key,
value);
// Content-Length, ETag and Content-Type get special treatment when the key is
// already present in the map.
// NOTE(review): the bodies of these branches are not visible in this excerpt;
// for Content-Length the stored and new values are compared numerically.
669 if (
key ==
"Content-Length") {
670 auto cl = headers->find(
"Content-Length");
671 if (cl != headers->end()) {
672 if (std::stol(cl->second) < stol(
value)) {
682 auto cl = headers->find(
"ETag");
683 if (cl != headers->end()) {
689 if (
key ==
"Content-Type") {
690 auto cl = headers->find(
"Content-Type");
691 if (cl != headers->end()) {
697 headers->insert(std::make_pair(
key,
value));
700 return size * nitems;
704void CcdbApi::initCurlHTTPHeaderOptionsForRetrieve(CURL* curlHandle, curl_slist*& option_list,
long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
709 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
etag).c_str());
713 option_list = curl_slist_append(option_list, (
"If-Not-After: " +
createdNotAfter).c_str());
717 option_list = curl_slist_append(option_list, (
"If-Not-Before: " +
createdNotBefore).c_str());
720 if (headers !=
nullptr) {
721 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
to_string(timestamp)).c_str());
722 curl_easy_setopt(curlHandle, CURLOPT_HEADERFUNCTION, header_map_callback<>);
723 curl_easy_setopt(curlHandle, CURLOPT_HEADERDATA, headers);
727 curl_easy_setopt(curlHandle, CURLOPT_HTTPHEADER, option_list);
730 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
733bool CcdbApi::receiveToFile(FILE* fileHandle, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
734 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
740bool CcdbApi::receiveToMemory(
void* chunk, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
741 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
747bool CcdbApi::receiveObject(
void* dataHolder, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
748 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
753 curlHandle = curl_easy_init();
754 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
756 if (curlHandle !=
nullptr) {
759 initCurlOptionsForRetrieve(curlHandle, dataHolder, writeCallback, followRedirect);
760 curl_slist* option_list =
nullptr;
763 long responseCode = 0;
764 CURLcode curlResultCode = CURL_LAST;
766 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (responseCode >= 400 || curlResultCode > 0); hostIndex++) {
767 std::string fullUrl = getFullUrlForRetrieval(curlHandle,
path, metadata, timestamp, hostIndex);
768 curl_easy_setopt(curlHandle, CURLOPT_URL, fullUrl.c_str());
770 curlResultCode = CURL_perform(curlHandle);
772 if (curlResultCode != CURLE_OK) {
773 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(curlResultCode));
775 curlResultCode = curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &responseCode);
776 if ((curlResultCode == CURLE_OK) && (responseCode < 300)) {
777 curl_slist_free_all(option_list);
778 curl_easy_cleanup(curlHandle);
781 if (curlResultCode != CURLE_OK) {
782 LOGP(alarm,
"invalid URL {}", fullUrl);
784 LOGP(alarm,
"not found under link {}", fullUrl);
790 curl_slist_free_all(option_list);
791 curl_easy_cleanup(curlHandle);
797 long timestamp)
const
805 bool res = receiveToMemory((
void*)&chunk,
path, metadata, timestamp);
808 std::lock_guard<std::mutex> guard(
gIOMutex);
810 mess.SetBuffer(chunk.
memory, chunk.
size, kFALSE);
815 LOGP(info,
"couldn't retrieve the object {}",
path);
827 std::string
str = inp;
828 str.erase(std::remove_if(
str.begin(),
str.end(), ::isspace),
str.end());
829 str = std::regex_replace(
str, std::regex(
"::"),
"-");
835 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
842 long timestamp,
bool preservePath, std::string
const& localFileName, std::string
const&
createdNotAfter, std::string
const&
createdNotBefore, std::map<std::string, std::string>* outHeaders)
const
846 std::string fulltargetdir = targetdir + (preservePath ? (
'/' +
path) :
"");
850 }
// Catch by const reference. Catching std::exception by value copies only the
// base-class part of the thrown object (slicing), so e.what() would no longer
// return the derived exception's message in the log line below.
catch (std::exception const& e) {
851 LOGP(error,
"Could not create local snapshot cache directory {}, reason: {}", fulltargetdir, e.what());
856 std::map<std::string, std::string> headers;
859 if ((headers.count(
"Error") != 0) || (buff.empty())) {
860 LOGP(error,
"Unable to find object {}/{}, Aborting",
path, timestamp);
864 auto getFileName = [&headers]() {
865 auto& s = headers[
"Content-Disposition"];
867 std::regex re(
"(.*;)filename=\"(.*)\"");
869 if (std::regex_match(s.c_str(),
m, re)) {
873 std::string backupname(
"ccdb-blob.bin");
874 LOG(error) <<
"Cannot determine original filename from Content-Disposition ... falling back to " << backupname;
877 auto filename = localFileName.size() > 0 ? localFileName : getFileName();
878 std::string targetpath = fulltargetdir +
"/" +
filename;
880 std::ofstream objFile(targetpath, std::ios::out | std::ofstream::binary);
881 std::copy(buff.begin(), buff.end(), std::ostreambuf_iterator<char>(objFile));
882 if (!objFile.good()) {
883 LOGP(error,
"Unable to open local file {}, Aborting", targetpath);
889 updateMetaInformationInLocalFile(targetpath.c_str(), &headers, &querysummary);
891 *outHeaders = std::move(headers);
896void CcdbApi::snapshot(std::string
const& ccdbrootpath, std::string
const& localDir,
long timestamp)
const
900 std::map<std::string, std::string> metadata;
901 for (
auto& folder : allfolders) {
911 auto object =
file.GetObjectChecked(what, cl);
915 std::string objectName(cl->GetName());
917 object =
file.GetObjectChecked(objectName.c_str(), cl);
918 LOG(warn) <<
"Did not find object under expected name " << what;
922 LOG(warn) <<
"Found object under deprecated name " << cl->GetName();
927 if (cl->InheritsFrom(
"TObject")) {
930 auto tree =
dynamic_cast<TTree*
>((
TObject*)
object);
932 tree->LoadBaskets(0x1L << 32);
933 tree->SetDirectory(
nullptr);
936 auto h =
dynamic_cast<TH1*
>((
TObject*)
object);
938 h->SetDirectory(
nullptr);
946void* CcdbApi::extractFromLocalFile(std::string
const&
filename, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
948 if (!std::filesystem::exists(
filename)) {
949 LOG(error) <<
"Local snapshot " <<
filename <<
" not found \n";
952 std::lock_guard<std::mutex> guard(
gIOMutex);
953 auto tcl = tinfo2TClass(tinfo);
958 *headers = *storedmeta;
961 if ((
isSnapshotMode() || mPreferSnapshotCache) && headers->find(
"ETag") == headers->end()) {
964 if (headers->find(
"fileSize") == headers->end()) {
965 (*headers)[
"fileSize"] = fmt::format(
"{}",
f.GetEND());
// Makes sure a TGrid connection exists when this API instance needs an alien
// token. Returns true when gGrid is non-null afterwards.
971bool CcdbApi::initTGrid()
const
// Nothing to do when no token is needed or a connection already exists.
973 if (mNeedAlienToken && !gGrid) {
// Function-local static: the environment variable is read once, on first use.
974 static bool allowNoToken = getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"));
// NOTE(review): the condition guarding this fatal message is not visible in
// this excerpt.
976 LOG(fatal) <<
"Alien Token Check failed - Please get an alien token before running with https CCDB endpoint, or alice-ccdb.cern.ch!";
978 TGrid::Connect(
"alien");
// errorShown is static, so the failure report below is issued at most once per
// process (the assignment itself is not visible in this excerpt - presumably
// on the missing line between the check and the log statements).
979 static bool errorShown =
false;
980 if (!gGrid && errorShown ==
false) {
982 LOG(error) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
984 LOG(fatal) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
989 return gGrid !=
nullptr;
// Opens `url` through TMemFile::Open and extracts an object of the type
// described by `tinfo`.
//
// url     - location to open; "alien:/" locations need a TGrid connection
// tinfo   - type of the object to extract (mapped to a TClass)
// headers - optional map; receives "fileSize" when that key is not yet present
992void* CcdbApi::downloadFilesystemContent(std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
// For alien locations, a failed TGrid initialisation is handled first (the
// body of this branch is not visible in this excerpt).
994 if ((
url.find(
"alien:/", 0) != std::string::npos) && !initTGrid()) {
// Serialise access through the file-wide I/O mutex.
997 std::lock_guard<std::mutex> guard(
gIOMutex);
998 auto memfile = TMemFile::Open(
url.c_str(),
"OPEN");
1000 auto cl = tinfo2TClass(tinfo);
// Report the size of the opened file unless the caller already has a value.
1002 if (headers && headers->find(
"fileSize") == headers->end()) {
1003 (*headers)[
"fileSize"] = fmt::format(
"{}", memfile->GetEND());
// Interprets an in-memory buffer as a ROOT file and extracts an object of the
// type described by `tinfo`.
//
// contentptr  - start of the buffer
// contentsize - number of bytes in the buffer
// tinfo       - type of the object to extract (mapped to a TClass)
1011void* CcdbApi::interpretAsTMemFileAndExtract(
char* contentptr,
size_t contentsize, std::type_info
const& tinfo)
// Raise ROOT's gErrorIgnoreLevel to kFatal while the TMemFile is constructed
// and restore the previous level right after.
1014 Int_t previousErrorLevel = gErrorIgnoreLevel;
1015 gErrorIgnoreLevel = kFatal;
// Serialise access through the file-wide I/O mutex.
1016 std::lock_guard<std::mutex> guard(
gIOMutex);
1017 TMemFile memFile(
"name", contentptr, contentsize,
"READ");
1018 gErrorIgnoreLevel = previousErrorLevel;
// Only a buffer that ROOT accepted as a file is searched for the object.
1019 if (!memFile.IsZombie()) {
1020 auto tcl = tinfo2TClass(tinfo);
1031void* CcdbApi::navigateURLsAndRetrieveContent(CURL* curl_handle, std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
1036 static thread_local std::multimap<std::string, std::string> headerData;
1039 if ((
url.find(
"alien:/", 0) != std::string::npos) || (
url.find(
"file:/", 0) != std::string::npos)) {
1040 return downloadFilesystemContent(
url, tinfo, headers);
1047 curl_easy_setopt(curl_handle, CURLOPT_URL,
url.c_str());
1049 MemoryStruct chunk{(
char*)malloc(1), 0};
1050 initCurlOptionsForRetrieve(curl_handle, (
void*)&chunk, WriteMemoryCallback,
false);
1052 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(headerData)>);
1054 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&headerData);
1058 auto res = CURL_perform(curl_handle);
1059 long response_code = -1;
1060 void* content =
nullptr;
1062 if (
res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) {
1064 for (
auto& p : headerData) {
1065 (*headers)[
p.first] =
p.second;
1068 if (200 <= response_code && response_code < 300) {
1070 content = interpretAsTMemFileAndExtract(chunk.memory, chunk.size, tinfo);
1071 if (headers && headers->find(
"fileSize") == headers->end()) {
1072 (*headers)[
"fileSize"] = fmt::format(
"{}", chunk.size);
1074 }
else if (response_code == 304) {
1079 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
1082 else if (300 <= response_code && response_code < 400) {
1088 auto complement_Location = [
this](std::string
const& loc) {
1089 if (loc[0] ==
'/') {
1096 std::vector<std::string> locs;
1097 auto iter = headerData.find(
"Location");
1098 if (iter != headerData.end()) {
1099 locs.push_back(complement_Location(iter->second));
1102 auto iter2 = headerData.find(
"Content-Location");
1103 if (iter2 != headerData.end()) {
1104 auto range = headerData.equal_range(
"Content-Location");
1105 for (
auto it =
range.first; it !=
range.second; ++it) {
1106 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
1107 locs.push_back(complement_Location(it->second));
1111 for (
auto& l : locs) {
1113 LOG(
debug) <<
"Trying content location " << l;
1114 content = navigateURLsAndRetrieveContent(curl_handle, l, tinfo, headers);
1120 }
else if (response_code == 404) {
1121 LOG(error) <<
"Requested resource does not exist: " <<
url;
1124 LOG(error) <<
"Error in fetching object " <<
url <<
", curl response code:" << response_code;
1128 if (chunk.memory !=
nullptr) {
1132 LOGP(alarm,
"Curl request to {} failed with result {}, response code: {}",
url,
int(
res), response_code);
1137 (*headers)[
"Error"] =
"An error occurred during retrieval";
1143 std::map<std::string, std::string>
const& metadata,
long timestamp,
1144 std::map<std::string, std::string>* headers, std::string
const&
etag,
1147 if (!mSnapshotCachePath.empty()) {
1149 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1150 std::string logfile = mSnapshotCachePath +
"/log";
1151 std::fstream out(logfile, ios_base::out | ios_base::app);
1152 if (out.is_open()) {
1153 out <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1155 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path);
1156 bool snapshoting =
false;
1157 if (!std::filesystem::exists(snapshotfile)) {
1159 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotfile <<
"\n";
1162 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" failed to create directory for " << snapshotfile <<
"\n";
1165 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1168 auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
1170 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1177 CURL* curl_handle = curl_easy_init();
1178 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1179 std::string fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp);
1181 if (mInSnapshotMode) {
1182 auto res = extractFromLocalFile(fullUrl, tinfo, headers);
1184 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1189 curl_slist* option_list =
nullptr;
1191 auto content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1193 for (
size_t hostIndex = 1; hostIndex < hostsPool.size() && !(content); hostIndex++) {
1194 fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp, hostIndex);
1195 content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1198 logReading(
path, timestamp, headers,
"retrieve");
1200 curl_slist_free_all(option_list);
1201 curl_easy_cleanup(curl_handle);
1207 size_t newLength =
size * nmemb;
1208 size_t oldLength = s->size();
1210 s->resize(oldLength + newLength);
1211 }
catch (std::bad_alloc& e) {
1212 LOG(error) <<
"memory error when getting data from CCDB";
1216 std::copy((
char*)contents, (
char*)contents + newLength, s->begin() + oldLength);
1217 return size * nmemb;
1223 CURLcode
res = CURL_LAST;
1226 curl = curl_easy_init();
1227 if (curl !=
nullptr) {
1229 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &
result);
1230 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1232 struct curl_slist* headers =
nullptr;
1233 headers = curl_slist_append(headers, (std::string(
"Accept: ") + returnFormat).c_str());
1234 headers = curl_slist_append(headers, (std::string(
"Content-Type: ") + returnFormat).c_str());
1241 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1245 std::string fullUrl;
1247 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1248 fullUrl = getHostUrl(hostIndex);
1249 fullUrl += latestOnly ?
"/latest/" :
"/browse/";
1251 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1253 res = CURL_perform(curl);
1254 if (
res != CURLE_OK) {
1255 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1258 curl_slist_free_all(headers);
1259 curl_easy_cleanup(curl);
1265std::string CcdbApi::getTimestampString(
long timestamp)
const
1276 stringstream fullUrl;
1279 curl = curl_easy_init();
1280 if (curl !=
nullptr) {
// Issue an HTTP DELETE instead of the default request method.
1281 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
1282 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
// Send the request to every host of the pool.
1285 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
// Start from an empty URL for every host. fullUrl lives outside the loop, so
// without this reset the second and later hosts received the concatenation of
// all previously built URLs.
fullUrl.str(std::string{});
1286 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestampLocal;
1287 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1290 res = CURL_perform(curl);
1291 if (
res != CURLE_OK) {
1292 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
// NOTE(review): judging by the original line numbering this cleanup sits inside
// the host loop, so a second iteration would reuse an already released handle -
// confirm against the full file and move it behind the loop if so.
1294 curl_easy_cleanup(curl);
1303 stringstream fullUrl;
// Send a truncate request to every host of the pool.
1304 for (
size_t i = 0;
i < hostsPool.size();
i++) {
// Start from an empty URL for every host. fullUrl lives outside the loop, so
// without this reset the second and later hosts received the concatenation of
// all previously built URLs.
fullUrl.str(std::string{});
1305 std::string
url = getHostUrl(
i);
1306 fullUrl <<
url <<
"/truncate/" <<
path;
// A fresh handle is created per host and released at the end of the branch
// below.
1308 curl = curl_easy_init();
1309 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1310 if (curl !=
nullptr) {
1311 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1316 res = CURL_perform(curl);
1317 if (
res != CURLE_OK) {
1318 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1320 curl_easy_cleanup(curl);
1327 return size * nmemb;
1333 CURLcode
res = CURL_LAST;
1336 curl = curl_easy_init();
1337 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1339 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1340 curl_easy_setopt(curl, CURLOPT_URL, mUrl.data());
1341 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
write_data);
1343 res = CURL_perform(curl);
1348 curl_easy_cleanup(curl);
1357 std::stringstream ss(reply.c_str());
1359 std::vector<std::string> folders;
1361 size_t numberoflines = std::count(reply.begin(), reply.end(),
'\n');
1362 bool inSubFolderSection =
false;
1364 for (
size_t linenumber = 0; linenumber < numberoflines; ++linenumber) {
1365 std::getline(ss, line);
1366 if (inSubFolderSection && line.size() > 0) {
1371 if (line.compare(
"Subfolders:") == 0) {
1372 inSubFolderSection =
true;
// Header callback with the libcurl signature: appends every received header
// line to the std::vector<std::string> passed through `userdata`.
//
// buffer   - one raw header line of size * nitems bytes
// userdata - pointer to a std::vector<std::string>
// Returns size * nitems.
1380size_t header_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
1382 auto* headers =
static_cast<std::vector<std::string>*
>(userdata);
1383 auto header = std::string(
buffer,
size * nitems);
// Re-building the string from data() stops at the first zero byte, so anything
// after an embedded NUL is dropped from the stored line.
1384 headers->emplace_back(std::string(header.data()));
1385 return size * nitems;
1393 auto p = std::filesystem::path(
filename).parent_path();
1394 if (!std::filesystem::exists(p)) {
1395 std::filesystem::create_directories(p);
1398 rapidjson::StringBuffer
buffer;
1399 rapidjson::Writer<rapidjson::StringBuffer> writer(
buffer);
1400 writer.StartObject();
1401 for (
const auto& pair : meta) {
1402 writer.Key(pair.first.c_str());
1403 writer.String(pair.second.c_str());
1409 if (
file.is_open()) {
1422 if (!
file.is_open()) {
1423 std::cerr <<
"Failed to open file for reading." << std::endl;
1427 std::string jsonStr((std::istreambuf_iterator<char>(
file)), std::istreambuf_iterator<char>());
1430 rapidjson::Document document;
1431 document.Parse(jsonStr.c_str());
1433 if (document.HasParseError()) {
1434 std::cerr <<
"Error parsing JSON" << std::endl;
1439 for (
auto itr = document.MemberBegin(); itr != document.MemberEnd(); ++itr) {
1440 meta[itr->name.GetString()] = itr->value.GetString();
// Returns the response headers for the object addressed by `path`, `metadata` and
// `timestamp` (all three are forwarded to getFullUrlForRetrieval) as a key/value map.
// When mSnapshotCachePath is non-empty the visible code works with a "header.json"
// snapshot file for path/timestamp and appends access records to "<cache>/log";
// otherwise it goes straight to the remote call.
// NOTE(review): this listing has gaps (several source lines between the numbered
// statements are missing), so the comments below only describe what is visible.
1445std::map<std::string, std::string>
CcdbApi::retrieveHeaders(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp)
const
// Helper performing the remote header request; `path` and `metadata` are captured by
// reference, `timestamp` by value.
1448 auto do_remote_header_call = [
this, &
path, &metadata, timestamp]() -> std::map<std::string, std::string> {
1449 CURL* curl = curl_easy_init();
1450 CURLcode
res = CURL_LAST;
// NOTE(review): the URL is built before `curl` is checked against nullptr, so a null
// handle is passed to getFullUrlForRetrieval when curl_easy_init() fails - confirm
// that the helper tolerates this.
1451 std::string fullUrl = getFullUrlForRetrieval(curl,
path, metadata, timestamp);
1452 std::map<std::string, std::string> headers;
1454 if (curl !=
nullptr) {
1455 struct curl_slist*
list =
nullptr;
1458 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
// CURLOPT_NOBODY: only the headers are requested, no body is downloaded.
1461 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1462 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
// Received header lines are handed to header_map_callback with `headers` as user data.
1463 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_map_callback<>);
1464 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1465 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
// Pessimistic defaults so that the loop below is entered at least once.
1470 long httpCode = 404;
1471 CURLcode getCodeRes = CURL_LAST;
// Iterate over hostsPool while the last attempt failed: HTTP status >= 400, or a
// non-zero CURLcode from the transfer or from the status query.
// NOTE(review): `fullUrl` was computed once above and the visible loop body never uses
// `hostIndex`, so every iteration appears to request the same URL - confirm against
// the lines missing from this listing.
1472 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (httpCode >= 400 ||
res > 0 || getCodeRes > 0); hostIndex++) {
1473 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1474 res = CURL_perform(curl);
// CURLE_UNSUPPORTED_PROTOCOL is excluded from the error report below.
1475 if (
res != CURLE_OK &&
res != CURLE_UNSUPPORTED_PROTOCOL) {
1479 LOG(error) <<
"CURL_perform() failed: " << curl_easy_strerror(
res);
1481 getCodeRes = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
1483 if (httpCode == 404) {
1486 curl_easy_cleanup(curl);
// Snapshot-cache branch.
1491 if (!mSnapshotCachePath.empty()) {
// A CCDBSemaphore keyed on "<cache>_headers" and `path` is created here and kept in a
// unique_ptr (presumably to serialise access to the cached header file - see
// CCDBSemaphore).
1493 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath + std::string(
"_headers"),
path);
// Access records are appended to "<cache>/log".
1495 std::string logfile = mSnapshotCachePath +
"/log";
1496 std::fstream out(logfile, ios_base::out | ios_base::app);
1497 if (out.is_open()) {
1498 out <<
"CCDB-header-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
// The cached headers are looked up under "<path>/<timestamp>" with file name
// "header.json".
1500 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path +
"/" +
std::to_string(timestamp),
"header.json");
// No cached file yet: fetch remotely (the lines that store the result are not part
// of this listing; only the warning for a failed store is visible).
1501 if (!std::filesystem::exists(snapshotfile)) {
1502 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" storing to snapshot " << snapshotfile <<
"\n";
1505 auto meta = do_remote_header_call();
1509 LOG(warn) <<
"Failed to cache the header information to disc";
// Cached file exists: serve from it, falling back to the remote call when reading fails.
// NOTE(review): unlike " storing to snapshot " above, the message below has no leading
// space, so the agent id and "serving" run together in the log.
1513 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1514 std::map<std::string, std::string> meta;
1516 LOG(warn) <<
"Failed to read cached information from disc";
1517 return do_remote_header_call();
// No snapshot cache configured: always query the server.
1522 return do_remote_header_call();
1527 auto curl = curl_easy_init();
1533 struct curl_slist*
list =
nullptr;
1534 list = curl_slist_append(
list, (
"If-None-Match: " +
etag).c_str());
1536 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1538 curl_easy_setopt(curl, CURLOPT_URL,
url.c_str());
1540 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1541 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1542 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
1543 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1544 if (!agentID.empty()) {
1545 curl_easy_setopt(curl, CURLOPT_USERAGENT, agentID.c_str());
1551 curl_easy_perform(curl);
1552 long http_code = 404;
1553 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
1554 if (http_code == 304) {
1562 static std::string etagHeader =
"ETag: ";
1563 static std::string locationHeader =
"Content-Location: ";
1564 for (
auto h : headers) {
1565 if (
h.find(etagHeader) == 0) {
1566 etag = std::string(
h.data() + etagHeader.size());
1567 }
else if (
h.find(locationHeader) == 0) {
1568 pfns.emplace_back(std::string(
h.data() + locationHeader.size(),
h.size() - locationHeader.size()));
1584 auto object =
file.GetObjectChecked(
CCDBMETA_ENTRY, TClass::GetClass(
typeid(std::map<std::string, std::string>)));
1586 return static_cast<std::map<std::string, std::string>*
>(
object);
// Recursive helper that collects folder paths: it records `top` in `folders` and then
// calls itself for every entry of `subfolders`, so `folders` ends up holding `top`
// followed by everything reachable below it.
// NOTE(review): the statements that obtain `subfolders` (and any use of `api` other
// than passing it on) are not part of this listing - verify against the full source.
// NOTE(review): recursion depth follows the folder nesting; no depth limit is visible.
1593void traverseAndFillFolders(
CcdbApi const& api, std::string
const&
top, std::vector<std::string>& folders)
// The folder itself is always recorded, even when it has no subfolders.
1597 folders.emplace_back(
top);
1600 if (subfolders.size() > 0) {
1602 for (
auto& sub : subfolders) {
1603 traverseAndFillFolders(api, sub, folders);
1613 std::vector<std::string> folders;
1614 traverseAndFillFolders(*
this,
top, folders);
1618TClass* CcdbApi::tinfo2TClass(std::type_info
const& tinfo)
1620 TClass* cl = TClass::GetClass(tinfo);
1622 throw std::runtime_error(fmt::format(
"Could not retrieve ROOT dictionary for type {}, aborting", tinfo.name()));
// Sends a PUT request that updates the metadata of an existing object. The request URL
// is assembled from the host URL, `path`, `timestamp`, optionally `newEOV` and `id`,
// and the URL-escaped key/value pairs of `metadata` joined as "key=value&".
// NOTE(review): this listing has gaps - the conditions guarding the `newEOV` / `id`
// parts, the declaration of `res` and the returned value are not visible here.
1628int CcdbApi::updateMetadata(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp, std::string
const&
id,
long newEOV)
1631 CURL* curl = curl_easy_init();
// NOTE(review): this option is set before `curl` is checked against nullptr, and it is
// set again inside the loop below, which makes this call look redundant.
1632 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1633 if (curl !=
nullptr) {
// NOTE(review): `fullUrl` is declared outside the host loop and only ever appended to
// in the visible lines; unless it is reset in a line missing from this listing, the
// URL used for the second and later hosts still contains the text of the earlier ones.
1635 stringstream fullUrl;
1636 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1637 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestamp;
1639 fullUrl <<
"/" << newEOV;
1642 fullUrl <<
"/" <<
id;
// Append every metadata entry URL-escaped.
1646 for (
auto& kv : metadata) {
1647 std::string mfirst = kv.first;
1648 std::string msecond = kv.second;
// NOTE(review): curl_easy_escape returns NULL on failure; the results are passed to
// std::string without a check.
1650 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
1651 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
1652 fullUrl << std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"&";
// Strings returned by curl_easy_escape are released with curl_free.
1653 curl_free(mfirstEncoded);
1654 curl_free(msecondEncoded);
// This check repeats the enclosing `curl != nullptr` test.
1657 if (curl !=
nullptr) {
1658 LOG(
debug) <<
"passing to curl: " << fullUrl.str();
1659 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
// Force the HTTP method to PUT.
1660 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"PUT");
1661 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1662 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
// Run the request through CURL_perform and report a failure as an alarm.
1666 res = CURL_perform(curl);
1667 if (
res != CURLE_OK) {
1668 LOGP(alarm,
"CURL_perform() failed: {}, code: {}", curl_easy_strerror(
res),
int(
res));
1673 curl_easy_cleanup(curl);
// Fills hostsPool from a list of hosts given as one string. Both ';' and ',' act as
// separators: every ';' is first mapped to ',', then the text is split on ','.
// NOTE(review): the line between the signature and the first statement is missing from
// this listing, so it cannot be seen here whether hostsPool is cleared beforehand.
// NOTE(review): no filtering of empty pieces is visible (e.g. for "a,,b" or a trailing
// separator) - confirm whether empty host entries can end up in the pool.
1680void CcdbApi::initHostsPool(std::string hosts)
// Lazy view pipeline: normalise the separator, then split.
1683 auto splitted = hosts | std::views::transform([](
char c) {
return (
c ==
';') ?
',' :
c; }) | std::views::split(
',');
// Each piece is a character range; a new hostsPool element is built from its bounds.
1684 for (
auto&& part : splitted) {
1685 hostsPool.emplace_back(part.begin(), part.end());
1689std::string CcdbApi::getHostUrl(
int hostIndex)
const
1691 return hostsPool.at(hostIndex);
1697 data->hoPair.object = &requestContext.
dest;
1699 std::function<bool(std::string)> localContentCallback = [
this, &requestContext](std::string
url) {
1703 auto writeCallback = [](
void* contents,
size_t size,
size_t nmemb,
void* chunkptr) {
1705 auto& chunk = *ho.
object;
1706 size_t realsize =
size * nmemb, sz = 0;
1709 if (chunk.capacity() < chunk.size() + realsize) {
1711 const char hannot[] =
"header";
1712 size_t hsize = getFlatHeaderSize(ho.header);
1713 auto cl = ho.header.find(
"Content-Length");
1714 if (cl != ho.header.end()) {
1715 size_t sizeFromHeader = std::stol(cl->second);
1716 sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1718 sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
1723 char* contC = (
char*)contents;
1724 chunk.insert(chunk.end(), contC, contC + realsize);
1725 }
catch (std::exception e) {
1732 CURL* curl_handle = curl_easy_init();
1733 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1734 std::string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.
path, requestContext.
metadata, requestContext.
timestamp);
1735 curl_slist* options_list =
nullptr;
1736 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.
timestamp, &requestContext.
headers,
1740 data->hosts = hostsPool;
1743 data->localContentCallback = localContentCallback;
1744 data->userAgent = mUniqueAgentID;
1745 data->optionsList = options_list;
1747 curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str());
1748 initCurlOptionsForRetrieve(curl_handle, (
void*)(&
data->hoPair), writeCallback,
false);
1749 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(
data->hoPair.header)>);
1750 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&(
data->hoPair.header));
1751 curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (
void*)
data);
1754 asynchPerform(curl_handle, requestCounter);
1759 std::hash<std::string> hasher;
1760 std::string semhashedstring =
"aliceccdb" +
std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
1761 return semhashedstring;
1769 return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
1770 }
catch (std::exception e) {
1771 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
1780 if (sem->try_wait()) {
1791 boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
1792 std::cout <<
"Found CCDB semaphore: " << semaname <<
"\n";
1794 auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
1796 std::cout <<
"Removed CCDB semaphore: " << semaname <<
"\n";
1801 }
catch (std::exception
const& e) {
1812 namespace fs = std::filesystem;
1813 std::string fileName{
"snapshot.root"};
1815 auto absolutesnapshotdir = fs::weakly_canonical(fs::absolute(snapshotdir));
1816 for (
const auto&
entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
1817 if (
entry.is_directory()) {
1818 const fs::path& currentDir = fs::canonical(fs::absolute(
entry.path()));
1819 fs::path filePath = currentDir / fileName;
1820 if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
1821 std::cout <<
"Directory with file '" << fileName <<
"': " << currentDir << std::endl;
1825 auto numtokens = pathtokens.size();
1826 if (numtokens < 3) {
1831 std::string
path = pathtokens[numtokens - 3] +
"/" + pathtokens[numtokens - 2] +
"/" + pathtokens[numtokens - 1];
1837 }
catch (std::exception
const& e) {
1838 LOG(info) <<
"Semaphore search had exception " << e.what();
1843 long timestamp, std::map<std::string, std::string>& headers,
1846 if (createSnapshot) {
1847 std::string logfile = mSnapshotCachePath +
"/log";
1848 std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app);
1849 if (logStream.is_open()) {
1850 logStream <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
" for load to memory\n";
1853 if (mInSnapshotMode) {
1858 }
else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) {
1870 if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) {
1871 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1873 auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.
path);
1874 std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path);
1876 std::fstream logStream;
1877 if (logStream.is_open()) {
1878 logStream <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotpath <<
" from memory\n";
1881 LOGP(
debug,
"creating snapshot {} -> {}", requestContext.
path, snapshotpath);
1884 std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary);
1885 std::copy(requestContext.
dest.begin(), requestContext.
dest.end(), std::ostreambuf_iterator<char>(objFile));
1888 updateMetaInformationInLocalFile(snapshotpath, &requestContext.
headers, &querysummary);
1894 std::map<std::string, std::string>
const& metadata,
long timestamp,
1895 std::map<std::string, std::string>* headers, std::string
const&
etag,
1899 destP.reserve(dest.size());
1902 dest.reserve(destP.size());
1903 for (
const auto c : destP) {
1909 std::map<std::string, std::string>
const& metadata,
long timestamp,
1910 std::map<std::string, std::string>* headers, std::string
const&
etag,
1922 std::vector<RequestContext> contexts = {requestContext};
1928 size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1929 dest.resize(cnt + hsize);
1930 auto addString = [&dest, &cnt](
const std::string& s) {
1937 for (
auto&
h : headers) {
1939 addString(
h.second);
1941 *
reinterpret_cast<int*
>(&dest[cnt]) = hsize;
1942 std::memcpy(&dest[cnt +
sizeof(
int)], FlatHeaderAnnot,
sizeof(FlatHeaderAnnot));
1947 LOGP(
debug,
"loadFileToMemory {} ETag=[{}]", requestContext.
path, requestContext.
etag);
1948 bool createSnapshot = requestContext.
considerSnapshot && !mSnapshotCachePath.empty();
1950 std::string snapshotpath;
1951 if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path))) {
1952 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1963 std::vector<int> fromSnapshots(requestContexts.size());
1964 size_t requestCounter = 0;
1967 for (
int i = 0;
i < requestContexts.size();
i++) {
1969 auto& requestContext = requestContexts.at(
i);
1974 while (requestCounter > 0) {
1979 for (
int i = 0;
i < requestContexts.size();
i++) {
1980 auto& requestContext = requestContexts.at(
i);
1981 if (!requestContext.dest.empty()) {
1982 logReading(requestContext.path, requestContext.timestamp, &requestContext.headers,
1983 fmt::format(
"{}{}", requestContext.considerSnapshot ?
"load to memory" :
"retrieve", fromSnapshots.at(
i) ?
" from snapshot" :
""));
1984 if (requestContext.considerSnapshot && fromSnapshots.at(
i) != 2) {
1993 if (
url.find(
"alien:/", 0) != std::string::npos) {
1994 std::map<std::string, std::string> localHeaders;
1996 auto it = localHeaders.find(
"Error");
1997 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2003 if ((
url.find(
"file:/", 0) != std::string::npos)) {
2004 std::string
path =
url.substr(7);
2005 if (std::filesystem::exists(
path)) {
2006 std::map<std::string, std::string> localHeaders;
2008 auto it = localHeaders.find(
"Error");
2009 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2022 constexpr size_t MaxCopySize = 0x1L << 25;
2023 auto signalError = [&dest, localHeaders]() {
2027 (*localHeaders)[
"Error"] =
"An error occurred during retrieval";
2030 if (
path.find(
"alien:/") == 0 && !initTGrid()) {
2034 std::string fname(
path);
2035 if (fname.find(
"?filetype=raw") == std::string::npos) {
2036 fname +=
"?filetype=raw";
2038 std::unique_ptr<TFile> sfile{TFile::Open(fname.c_str())};
2039 if (!sfile || sfile->IsZombie()) {
2040 LOG(error) <<
"Failed to open file " << fname;
2044 size_t totalread = 0, fsize = sfile->GetSize(), b00 = sfile->GetBytesRead();
2046 char* dptr = dest.data();
2050 size_t b0 = sfile->GetBytesRead(), b1 = b0 - b00;
2051 size_t readsize = fsize - b1 > MaxCopySize ? MaxCopySize : fsize - b1;
2052 if (readsize == 0) {
2055 sfile->Seek(totalread, TFile::kBeg);
2056 bool failed = sfile->ReadBuffer(dptr, (Int_t)readsize);
2057 nread = sfile->GetBytesRead() - b0;
2058 if (
failed || nread < 0) {
2059 LOG(error) <<
"failed to copy file " << fname <<
" to memory buffer";
2065 }
while (nread == (
long)MaxCopySize);
2067 if (localHeaders && fetchLocalMetaData) {
2068 TMemFile memFile(
"name",
const_cast<char*
>(dest.data()), dest.size(),
"READ");
2069 auto storedmeta = (std::map<std::string, std::string>*)
extractFromTFile(memFile, TClass::GetClass(
"std::map<std::string, std::string>"),
CCDBMETA_ENTRY);
2071 *localHeaders = *storedmeta;
2074 if ((
isSnapshotMode() || mPreferSnapshotCache) && localHeaders->find(
"ETag") == localHeaders->end()) {
2075 (*localHeaders)[
"ETag"] =
path;
2077 if (localHeaders->find(
"fileSize") == localHeaders->end()) {
2078 (*localHeaders)[
"fileSize"] = fmt::format(
"{}", memFile.GetEND());
// Validates the keys of `metadata`: each key is scanned for characters from the
// forbidden set in the regular expression below, every hit is reported with
// LOG(error), and a final LOG(fatal) message exists for the case that bad keys were
// found.
// NOTE(review): this listing has gaps - the definition of `tmp`, the place where
// `isInvalid` is set and the condition guarding the LOG(fatal) are not visible here.
2084void CcdbApi::checkMetadataKeys(std::map<std::string, std::string>
const& metadata)
const
// One-character class of forbidden symbols (blank, punctuation, brackets, operators).
// NOTE(review): the std::regex object is rebuilt on every call.
2090 const std::regex regexPatternSearch(R
"([ :;.,\\/'?!\(\)\{\}\[\]@<>=+*#$&`|~^%])");
2091 bool isInvalid =
false;
2093 for (
auto& el : metadata) {
// Work on a copy of the key, because it is shortened while searching.
2094 auto keyMd = el.first;
2096 std::smatch searchRes;
// Report every offending character: after each match, continue with the text that
// follows it.
2097 while (std::regex_search(keyMd, searchRes, regexPatternSearch)) {
2099 LOG(error) <<
"Invalid character found in metadata key '" << tmp <<
"\': '" << searchRes.str() <<
"\'";
2100 keyMd = searchRes.suffix();
2104 LOG(fatal) <<
"Some metadata keys have invalid characters, please fix!";
2109void CcdbApi::logReading(
const std::string&
path,
long ts,
const std::map<std::string, std::string>* headers,
const std::string& comment)
const
2111 std::string upath{
path};
2113 auto ent = headers->find(
"Valid-From");
2114 if (ent != headers->end()) {
2115 upath +=
"/" + ent->second;
2117 ent = headers->find(
"ETag");
2118 if (ent != headers->end()) {
2119 upath +=
"/" + ent->second;
2122 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
2123 LOGP(info,
"ccdb reads {}{}{} for {} ({}, agent_id: {}), ", mUrl, mUrl.back() ==
'/' ?
"" :
"/", upath, ts < 0 ?
getCurrentTimestamp() : ts, comment, mUniqueAgentID);
2126void CcdbApi::asynchPerform(CURL* handle,
size_t* requestCounter)
const
// Executes the transfer configured on `handle` and returns its CURLcode.
// Two paths are visible: delegation to mDownloader when mIsCCDBDownloaderPreferred is
// set, otherwise a retry loop around curl_easy_perform.
// NOTE(review): the declaration of `result` and the final return statement are missing
// from this listing.
2131CURLcode CcdbApi::CURL_perform(CURL* handle)
const
2133 if (mIsCCDBDownloaderPreferred) {
2134 return mDownloader->
perform(handle);
// Up to mCurlRetries attempts; the loop stops at the first CURLE_OK. After a failed
// attempt number i the code sleeps mCurlDelayRetries * i (usleep takes microseconds),
// so the pause grows linearly with the attempt number.
// NOTE(review): the sleep also runs after the last failed attempt, and no attempt is
// made at all when mCurlRetries < 1 - `result` then keeps whatever value it was
// declared with (declaration not visible here).
2137 for (
int i = 1;
i <= mCurlRetries && (
result = curl_easy_perform(handle)) != CURLE_OK;
i++) {
2138 usleep(mCurlDelayRetries *
i);
2149 LOG(
debug) <<
"Entering semaphore barrier";
2152 mSem =
new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, mSemName.c_str(), 1);
2153 }
catch (std::exception e) {
2154 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
2159 gSemaRegistry.
add(
this);
2166 LOG(
debug) <<
"Ending semaphore barrier";
2169 if (mSem->try_wait()) {
2171 boost::interprocess::named_semaphore::remove(mSemName.c_str());
2173 gSemaRegistry.
remove(
this);
2179 LOG(
debug) <<
"Cleaning up semaphore registry with count " << mStore.size();
2180 for (
auto& s : mStore) {
std::string createdNotBefore
std::string createdNotAfter
Class for time synchronization of RawReader instances.
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
CCDBSemaphore(std::string const &cachepath, std::string const &path)
static void curlSetSSLOptions(CURL *curl)
static std::string generateFileName(const std::string &inp)
std::string list(std::string const &path="", bool latestOnly=false, std::string const &returnFormat="text/plain", long createdNotAfter=-1, long createdNotBefore=-1) const
int storeAsTFile_impl(const void *obj1, std::type_info const &info, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
static bool checkAlienToken()
void runDownloaderLoop(bool noWait)
void releaseNamedSemaphore(boost::interprocess::named_semaphore *sem, std::string const &path) const
static std::map< std::string, std::string > * retrieveMetaInfo(TFile &)
void scheduleDownload(RequestContext &requestContext, size_t *requestCounter) const
TObject * retrieve(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp) const
void init(std::string const &hosts)
TObject * retrieveFromTFile(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore) const
std::string const & getURL() const
void getFromSnapshot(bool createSnapshot, std::string const &path, long timestamp, std::map< std::string, std::string > &headers, std::string &snapshotpath, o2::pmr::vector< char > &dest, int &fromSnapshot, std::string const &etag) const
bool loadLocalContentToMemory(o2::pmr::vector< char > &dest, std::string &url) const
static void removeLeakingSemaphores(std::string const &basedir, bool remove=false)
void saveSnapshot(RequestContext &requestContext) const
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
int storeAsTFile(const TObject *rootObject, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
void snapshot(std::string const &ccdbrootpath, std::string const &localDir, long timestamp) const
bool isSnapshotMode() const
bool isHostReachable() const
void loadFileToMemory(std::vector< char > &dest, std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore, bool considerSnapshot=true) const
static std::string determineSemaphoreName(std::string const &basedir, std::string const &objectpath)
void deleteObject(std::string const &path, long timestamp=-1) const
static void appendFlatHeader(o2::pmr::vector< char > &dest, const std::map< std::string, std::string > &headers)
std::vector< std::string > getAllFolders(std::string const &top) const
void vectoredLoadFileToMemory(std::vector< RequestContext > &requestContext) const
boost::interprocess::named_semaphore * createNamedSemaphore(std::string const &path) const
static bool removeSemaphore(std::string const &name, bool remove=false)
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
CcdbApi()
Default constructor.
static bool getCCDBEntryHeaders(std::string const &url, std::string const &etag, std::vector< std::string > &headers, const std::string &agentID="")
static CCDBQuery * retrieveQueryInfo(TFile &)
static constexpr const char * CCDBQUERY_ENTRY
void truncate(std::string const &path) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
static void parseCCDBHeaders(std::vector< std::string > const &headers, std::vector< std::string > &pfns, std::string &etag)
bool retrieveBlob(std::string const &path, std::string const &targetdir, std::map< std::string, std::string > const &metadata, long timestamp, bool preservePathStructure=true, std::string const &localFileName="snapshot.root", std::string const &createdNotAfter="", std::string const &createdNotBefore="", std::map< std::string, std::string > *headers=nullptr) const
int updateMetadata(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::string const &id="", long newEOV=0)
static constexpr const char * CCDBMETA_ENTRY
static constexpr const char * CCDBOBJECT_ENTRY
void navigateSourcesAndLoadFile(RequestContext &requestContext, int &fromSnapshot, size_t *requestCounter) const
std::vector< std::string > parseSubFolders(std::string const &reply) const
virtual ~CcdbApi()
Default destructor.
void setFileName(const std::string &nm)
const std::string & getObjectType() const
void setObjectType(const std::string &tp)
const std::string & getFileName() const
void add(CCDBSemaphore const *ptr)
void remove(CCDBSemaphore const *ptr)
SemaphoreRegistry()=default
GLdouble GLdouble GLdouble GLdouble top
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
bool stdmap_to_jsonfile(std::map< std::string, std::string > const &meta, std::string const &filename)
size_t write_data(void *, size_t size, size_t nmemb, void *)
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
size_t(*)(void *, size_t, size_t, void *) CurlWriteCallback
std::string sanitizeObjectName(const std::string &objectName)
bool jsonfile_to_stdmap(std::map< std::string, std::string > &meta, std::string const &filename)
long getFutureTimestamp(int secondsInFuture)
returns the timestamp in long corresponding to "now + secondsInFuture"
size_t CurlWrite_CallbackFunc_StdString2(void *contents, size_t size, size_t nmemb, std::string *s)
std::string timestamp() noexcept
Defining PrimaryVertex explicitly as messageable.
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
void createDirectoriesIfAbsent(std::string const &path)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::map< std::string, std::string > const & metadata
std::string createdNotAfter
std::map< std::string, std::string > & headers
o2::pmr::vector< char > & dest
std::string createdNotBefore
static DeploymentMode deploymentMode()
static std::string getClassName(const T &obj)
get the class name of the object
static std::unique_ptr< FileImage > createFileImage(const TObject &obj, const std::string &fileName, const std::string &objName)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string concat_string(Ts const &... ts)
static bool endsWith(const std::string &s, const std::string &ending)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
uint64_t const void const *restrict const msg