32#include <TStreamerInfo.h>
36#include <fairlogger/Logger.h>
42#include <boost/algorithm/string.hpp>
45#include <boost/interprocess/sync/named_semaphore.hpp>
49#include <TAlienUserAgent.h>
50#include <unordered_set>
51#include "rapidjson/document.h"
52#include "rapidjson/writer.h"
53#include "rapidjson/stringbuffer.h"
61unique_ptr<TJAlienCredentials> CcdbApi::mJAlienCredentials =
nullptr;
76 boost::interprocess::named_semaphore* mSem =
nullptr;
77 std::string mSemName{};
92 std::unordered_set<CCDBSemaphore const*> mStore;
102 mIsCCDBDownloaderPreferred = 0;
103 if (deploymentMode == DeploymentMode::OnlineDDS && deploymentMode == DeploymentMode::OnlineECS && deploymentMode == DeploymentMode::OnlineAUX && deploymentMode == DeploymentMode::FST) {
104 mIsCCDBDownloaderPreferred = 1;
106 if (getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) {
107 mIsCCDBDownloaderPreferred = atoi(getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI"));
114 curl_global_cleanup();
118void CcdbApi::setUniqueAgentID()
120 mUniqueAgentID = TAlienUserAgent::BasedOnEnvironment().ToString();
126 LOG(
debug) <<
"On macOS we simply rely on TGrid::Connect(\"alien\").";
129 if (getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"))) {
132 if (getenv(
"JALIEN_TOKEN_CERT")) {
135 auto returncode = system(
"LD_PRELOAD= alien-token-info &> /dev/null");
136 if (returncode == -1) {
139 return returncode == 0;
142void CcdbApi::curlInit()
145 curl_global_init(CURL_GLOBAL_DEFAULT);
146 CcdbApi::mJAlienCredentials = std::make_unique<TJAlienCredentials>();
147 CcdbApi::mJAlienCredentials->loadCredentials();
148 CcdbApi::mJAlienCredentials->selectPreferedCredentials();
151 if (getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT")) {
152 auto timeoutMS = atoi(getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT"));
153 if (timeoutMS >= 0) {
154 LOG(info) <<
"Setting socket timeout to " << timeoutMS <<
" milliseconds";
163 throw std::invalid_argument(
"Empty url passed CcdbApi, cannot initialize. Aborting.");
168 constexpr const char* SNAPSHOTPREFIX =
"file://";
171 if (host.substr(0, 7).compare(SNAPSHOTPREFIX) == 0) {
172 auto path = host.substr(7);
173 initInSnapshotMode(
path);
194 std::string snapshotReport{};
195 const char* cachedir = getenv(
"ALICEO2_CCDB_LOCALCACHE");
196 namespace fs = std::filesystem;
198 if (cachedir[0] == 0) {
199 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(
"."));
201 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(cachedir));
203 snapshotReport = fmt::format(
"(cache snapshots to dir={}", mSnapshotCachePath);
206 mPreferSnapshotCache =
true;
207 if (mSnapshotCachePath.empty()) {
208 LOGP(fatal,
"IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE is defined but the ALICEO2_CCDB_LOCALCACHE is not");
210 snapshotReport +=
", prefer if available";
212 if (!snapshotReport.empty()) {
213 snapshotReport +=
')';
216 mNeedAlienToken = (host.find(
"https://") != std::string::npos) || (host.find(
"alice-ccdb.cern.ch") != std::string::npos) || (host.find(
"ccdb-test.cern.ch") != std::string::npos);
218 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD")) {
219 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD"));
221 mCurlTimeoutDownload =
timeout;
228 mCurlTimeoutDownload = 15;
231 mCurlTimeoutDownload = 15;
233 mCurlTimeoutDownload = 5;
237 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD")) {
238 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD"));
247 mCurlTimeoutUpload = 3;
250 mCurlTimeoutUpload = 20;
252 mCurlTimeoutUpload = 20;
259 LOGP(
debug,
"Curl timeouts are set to: download={:2}, upload={:2} seconds", mCurlTimeoutDownload, mCurlTimeoutUpload);
261 LOGP(info,
"Init CcdApi with UserAgentID: {}, Host: {}{}, Curl timeouts: upload:{} download:{}", mUniqueAgentID, host,
262 mInSnapshotMode ?
"(snapshot readonly mode)" : snapshotReport.c_str(), mCurlTimeoutUpload, mCurlTimeoutDownload);
271void CcdbApi::updateMetaInformationInLocalFile(std::string
const&
filename, std::map<std::string, std::string>
const* headers,
CCDBQuery const* querysummary)
273 std::lock_guard<std::mutex> guard(
gIOMutex);
274 auto oldlevel = gErrorIgnoreLevel;
275 gErrorIgnoreLevel = 6001;
276 TFile snapshotfile(
filename.c_str(),
"UPDATE");
278 if (!snapshotfile.IsZombie()) {
280 snapshotfile.WriteObjectAny(querysummary, TClass::GetClass(
typeid(*querysummary)),
CCDBQUERY_ENTRY);
283 snapshotfile.WriteObjectAny(headers, TClass::GetClass(
typeid(*headers)),
CCDBMETA_ENTRY);
285 snapshotfile.Write();
286 snapshotfile.Close();
288 gErrorIgnoreLevel = oldlevel;
298 std::string tmpObjectName = objectName;
299 tmpObjectName.erase(std::remove_if(tmpObjectName.begin(), tmpObjectName.end(),
300 [](
auto const&
c) ->
bool { return (!std::isalnum(c) && c !=
'_' && c !=
'/' && c !=
'.'); }),
301 tmpObjectName.end());
302 return tmpObjectName;
309 std::lock_guard<std::mutex> guard(
gIOMutex);
313 info->setFileName(tmpFileName);
314 info->setObjectType(className);
323 std::string className = rootObject->GetName();
326 info->setFileName(tmpFileName);
327 info->setObjectType(
"TObject");
329 std::lock_guard<std::mutex> guard(
gIOMutex);
334 std::map<std::string, std::string>
const& metadata,
335 long startValidityTimestamp,
long endValidityTimestamp,
336 std::vector<char>::size_type
maxSize)
const
340 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
346 path, metadata, startValidityTimestamp, endValidityTimestamp,
maxSize);
350 const std::string&
path,
const std::map<std::string, std::string>& metadata,
351 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
354 LOGP(alarm,
"Object will not be uploaded to {} since its size {} exceeds max allowed {}",
path,
size,
maxSize);
360 long sanitizedStartValidityTimestamp = startValidityTimestamp;
361 if (startValidityTimestamp == -1) {
362 LOGP(info,
"Start of Validity not set, current timestamp used.");
365 long sanitizedEndValidityTimestamp = endValidityTimestamp;
366 if (endValidityTimestamp == -1) {
367 LOGP(info,
"End of Validity not set, start of validity plus 1 day used.");
370 if (mInSnapshotMode) {
372 LOGP(alarm,
"Snapshot mode does not support headers-only upload");
375 auto pthLoc = getSnapshotDir(mSnapshotTopPath,
path);
377 auto flLoc = getSnapshotFile(mSnapshotTopPath,
path,
filename);
379 auto pent = flLoc.find_last_of(
'.');
380 if (pent == std::string::npos) {
383 flLoc.insert(pent, fmt::format(
"_{}_{}", startValidityTimestamp, endValidityTimestamp));
384 ofstream outf(flLoc.c_str(), ios::out | ios::binary);
388 throw std::runtime_error(fmt::format(
"Failed to write local CCDB file {}", flLoc));
390 std::map<std::string, std::string> metaheader(metadata);
392 metaheader[
"Valid-From"] =
std::to_string(startValidityTimestamp);
394 updateMetaInformationInLocalFile(flLoc.c_str(), &metaheader);
395 std::string metaStr{};
396 for (
const auto& mentry : metadata) {
397 metaStr += fmt::format(
"{}={};", mentry.first, mentry.second);
399 metaStr +=
"$USER_META;";
400 LOGP(info,
"Created local snapshot {}", flLoc);
401 LOGP(info, R
"(Upload with: o2-ccdb-upload --host "$ccdbhost" -p {} -f {} -k {} --starttimestamp {} --endtimestamp {} -m "{}")",
408 CURL* curl =
nullptr;
409 curl = curl_easy_init();
412 checkMetadataKeys(metadata);
414 if (curl !=
nullptr) {
415 auto mime = curl_mime_init(curl);
416 auto field = curl_mime_addpart(mime);
417 curl_mime_name(field,
"send");
419 curl_mime_filedata(field,
filename.c_str());
424 curl_mime_data(field,
"", 0);
427 struct curl_slist* headerlist =
nullptr;
428 static const char buf[] =
"Expect:";
429 headerlist = curl_slist_append(headerlist,
buf);
433 curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
434 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
435 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
436 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
437 curl_easy_setopt(curl, CURLOPT_TIMEOUT, mCurlTimeoutUpload);
439 CURLcode
res = CURL_LAST;
441 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res > 0; hostIndex++) {
442 std::string fullUrl = getFullUrlForStorage(curl,
path, objectType, metadata, sanitizedStartValidityTimestamp, sanitizedEndValidityTimestamp, hostIndex);
443 LOG(debug3) <<
"Full URL Encoded: " << fullUrl;
445 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
448 res = CURL_perform(curl);
450 if (
res != CURLE_OK) {
451 if (
res == CURLE_OPERATION_TIMEDOUT) {
452 LOGP(alarm,
"curl_easy_perform() timed out. Consider increasing the timeout using the env var `ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD` (seconds), current one is {}", mCurlTimeoutUpload);
454 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(
res));
461 curl_easy_cleanup(curl);
464 curl_slist_free_all(headerlist);
466 curl_mime_free(mime);
468 LOGP(alarm,
"curl initialization failure");
475 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type
maxSize)
const
479 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
487std::string CcdbApi::getFullUrlForStorage(CURL* curl,
const std::string&
path,
const std::string& objtype,
488 const std::map<std::string, std::string>& metadata,
489 long startValidityTimestamp,
long endValidityTimestamp,
int hostIndex)
const
492 std::string startValidityString = getTimestampString(startValidityTimestamp < 0 ?
getCurrentTimestamp() : startValidityTimestamp);
493 std::string endValidityString = getTimestampString(endValidityTimestamp < 0 ?
getFutureTimestamp(60 * 60 * 24 * 1) : endValidityTimestamp);
495 std::string
url = getHostUrl(hostIndex);
497 std::string fullUrl =
url +
"/" +
path +
"/" + startValidityString +
"/" + endValidityString +
"/";
500 char* objtypeEncoded = curl_easy_escape(curl, objtype.c_str(), objtype.size());
501 fullUrl +=
"ObjectType=" + std::string(objtypeEncoded) +
"/";
502 curl_free(objtypeEncoded);
504 for (
auto& kv : metadata) {
505 std::string mfirst = kv.first;
506 std::string msecond = kv.second;
508 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
509 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
510 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
511 curl_free(mfirstEncoded);
512 curl_free(msecondEncoded);
518std::string CcdbApi::getFullUrlForRetrieval(CURL* curl,
const std::string&
path,
const std::map<std::string, std::string>& metadata,
long timestamp,
int hostIndex)
const
520 if (mInSnapshotMode) {
521 return getSnapshotFile(mSnapshotTopPath,
path);
527 std::string hostUrl = getHostUrl(hostIndex);
529 std::string fullUrl = hostUrl +
"/" +
path +
"/" + validityString +
"/";
531 for (
auto& kv : metadata) {
532 std::string mfirst = kv.first;
533 std::string msecond = kv.second;
535 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
536 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
537 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
538 curl_free(mfirstEncoded);
539 curl_free(msecondEncoded);
561static size_t WriteMemoryCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
563 size_t realsize =
size * nmemb;
566 mem->memory = (
char*)realloc(mem->memory, mem->size + realsize + 1);
567 if (mem->memory ==
nullptr) {
568 printf(
"not enough memory (realloc returned NULL)\n");
572 memcpy(&(mem->memory[mem->size]), contents, realsize);
573 mem->size += realsize;
574 mem->memory[mem->size] = 0;
590static size_t WriteToFileCallback(
void*
ptr,
size_t size,
size_t nmemb, FILE*
stream)
603static CURLcode ssl_ctx_callback(CURL*,
void*,
void* parm)
605 std::string
msg((
const char*)parm);
608 if (
msg.length() > 0 &&
end == -1) {
610 }
else if (
end > 0) {
622 CredentialsKind cmk = mJAlienCredentials->getPreferedCredentials();
625 if (cmk == cNOT_FOUND) {
629 TJAlienCredentialsObject cmo = mJAlienCredentials->get(cmk);
631 char* CAPath = getenv(
"X509_CERT_DIR");
633 curl_easy_setopt(curl_handle, CURLOPT_CAPATH, CAPath);
635 curl_easy_setopt(curl_handle, CURLOPT_CAINFO,
nullptr);
636 curl_easy_setopt(curl_handle, CURLOPT_SSLCERT, cmo.certpath.c_str());
637 curl_easy_setopt(curl_handle, CURLOPT_SSLKEY, cmo.keypath.c_str());
640 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
641 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_DATA, mJAlienCredentials->getMessages().c_str());
648void CcdbApi::initCurlOptionsForRetrieve(CURL* curlHandle,
void* chunk,
CurlWriteCallback writeCallback,
bool followRedirect)
const
650 curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, writeCallback);
651 curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, chunk);
652 curl_easy_setopt(curlHandle, CURLOPT_FOLLOWLOCATION, followRedirect ? 1L : 0L);
657template <
typename MapType = std::map<std::
string, std::
string>>
658size_t header_map_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
660 auto* headers =
static_cast<MapType*
>(userdata);
661 auto header = std::string(
buffer,
size * nitems);
662 std::string::size_type
index = header.find(
':', 0);
663 if (
index != std::string::npos) {
664 const auto key = boost::algorithm::trim_copy(header.substr(0,
index));
665 const auto value = boost::algorithm::trim_copy(header.substr(
index + 1));
666 LOGP(
debug,
"Adding #{} {} -> {}", headers->size(),
key,
value);
668 if (
key ==
"Content-Length") {
669 auto cl = headers->find(
"Content-Length");
670 if (cl != headers->end()) {
671 if (std::stol(cl->second) < stol(
value)) {
681 auto cl = headers->find(
"ETag");
682 if (cl != headers->end()) {
688 if (
key ==
"Content-Type") {
689 auto cl = headers->find(
"Content-Type");
690 if (cl != headers->end()) {
696 headers->insert(std::make_pair(
key,
value));
699 return size * nitems;
703void CcdbApi::initCurlHTTPHeaderOptionsForRetrieve(CURL* curlHandle, curl_slist*& option_list,
long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
708 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
etag).c_str());
712 option_list = curl_slist_append(option_list, (
"If-Not-After: " +
createdNotAfter).c_str());
716 option_list = curl_slist_append(option_list, (
"If-Not-Before: " +
createdNotBefore).c_str());
719 if (headers !=
nullptr) {
720 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
to_string(timestamp)).c_str());
721 curl_easy_setopt(curlHandle, CURLOPT_HEADERFUNCTION, header_map_callback<>);
722 curl_easy_setopt(curlHandle, CURLOPT_HEADERDATA, headers);
726 curl_easy_setopt(curlHandle, CURLOPT_HTTPHEADER, option_list);
729 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
732bool CcdbApi::receiveToFile(FILE* fileHandle, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
733 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
739bool CcdbApi::receiveToMemory(
void* chunk, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
740 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
746bool CcdbApi::receiveObject(
void* dataHolder, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
747 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
752 curlHandle = curl_easy_init();
753 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
755 if (curlHandle !=
nullptr) {
758 initCurlOptionsForRetrieve(curlHandle, dataHolder, writeCallback, followRedirect);
759 curl_slist* option_list =
nullptr;
762 long responseCode = 0;
763 CURLcode curlResultCode = CURL_LAST;
765 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (responseCode >= 400 || curlResultCode > 0); hostIndex++) {
766 std::string fullUrl = getFullUrlForRetrieval(curlHandle,
path, metadata, timestamp, hostIndex);
767 curl_easy_setopt(curlHandle, CURLOPT_URL, fullUrl.c_str());
769 curlResultCode = CURL_perform(curlHandle);
771 if (curlResultCode != CURLE_OK) {
772 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(curlResultCode));
774 curlResultCode = curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &responseCode);
775 if ((curlResultCode == CURLE_OK) && (responseCode < 300)) {
776 curl_slist_free_all(option_list);
777 curl_easy_cleanup(curlHandle);
780 if (curlResultCode != CURLE_OK) {
781 LOGP(alarm,
"invalid URL {}", fullUrl);
783 LOGP(alarm,
"not found under link {}", fullUrl);
789 curl_slist_free_all(option_list);
790 curl_easy_cleanup(curlHandle);
796 long timestamp)
const
804 bool res = receiveToMemory((
void*)&chunk,
path, metadata, timestamp);
807 std::lock_guard<std::mutex> guard(
gIOMutex);
809 mess.SetBuffer(chunk.
memory, chunk.
size, kFALSE);
814 LOGP(info,
"couldn't retrieve the object {}",
path);
826 std::string
str = inp;
827 str.erase(std::remove_if(
str.begin(),
str.end(), ::isspace),
str.end());
828 str = std::regex_replace(
str, std::regex(
"::"),
"-");
834 long timestamp, std::map<std::string, std::string>* headers, std::string
const&
etag,
841 long timestamp,
bool preservePath, std::string
const& localFileName, std::string
const&
createdNotAfter, std::string
const&
createdNotBefore, std::map<std::string, std::string>* outHeaders)
const
845 std::string fulltargetdir = targetdir + (preservePath ? (
'/' +
path) :
"");
849 }
catch (std::exception e) {
850 LOGP(error,
"Could not create local snapshot cache directory {}, reason: {}", fulltargetdir, e.what());
855 std::map<std::string, std::string> headers;
858 if ((headers.count(
"Error") != 0) || (buff.empty())) {
859 LOGP(error,
"Unable to find object {}/{}, Aborting",
path, timestamp);
863 auto getFileName = [&headers]() {
864 auto& s = headers[
"Content-Disposition"];
866 std::regex re(
"(.*;)filename=\"(.*)\"");
868 if (std::regex_match(s.c_str(),
m, re)) {
872 std::string backupname(
"ccdb-blob.bin");
873 LOG(error) <<
"Cannot determine original filename from Content-Disposition ... falling back to " << backupname;
876 auto filename = localFileName.size() > 0 ? localFileName : getFileName();
877 std::string targetpath = fulltargetdir +
"/" +
filename;
879 std::ofstream objFile(targetpath, std::ios::out | std::ofstream::binary);
880 std::copy(buff.begin(), buff.end(), std::ostreambuf_iterator<char>(objFile));
881 if (!objFile.good()) {
882 LOGP(error,
"Unable to open local file {}, Aborting", targetpath);
888 updateMetaInformationInLocalFile(targetpath.c_str(), &headers, &querysummary);
890 *outHeaders = std::move(headers);
895void CcdbApi::snapshot(std::string
const& ccdbrootpath, std::string
const& localDir,
long timestamp)
const
899 std::map<std::string, std::string> metadata;
900 for (
auto& folder : allfolders) {
910 auto object = file.GetObjectChecked(what, cl);
914 std::string objectName(cl->GetName());
916 object = file.GetObjectChecked(objectName.c_str(), cl);
917 LOG(warn) <<
"Did not find object under expected name " << what;
921 LOG(warn) <<
"Found object under deprecated name " << cl->GetName();
926 if (cl->InheritsFrom(
"TObject")) {
929 auto tree =
dynamic_cast<TTree*
>((
TObject*)
object);
931 tree->LoadBaskets(0x1L << 32);
932 tree->SetDirectory(
nullptr);
935 auto h =
dynamic_cast<TH1*
>((
TObject*)
object);
937 h->SetDirectory(
nullptr);
945void* CcdbApi::extractFromLocalFile(std::string
const&
filename, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
947 if (!std::filesystem::exists(
filename)) {
948 LOG(error) <<
"Local snapshot " <<
filename <<
" not found \n";
951 std::lock_guard<std::mutex> guard(
gIOMutex);
952 auto tcl = tinfo2TClass(tinfo);
957 *headers = *storedmeta;
960 if ((
isSnapshotMode() || mPreferSnapshotCache) && headers->find(
"ETag") == headers->end()) {
963 if (headers->find(
"fileSize") == headers->end()) {
964 (*headers)[
"fileSize"] = fmt::format(
"{}",
f.GetEND());
970bool CcdbApi::initTGrid()
const
972 if (mNeedAlienToken && !gGrid) {
973 static bool allowNoToken = getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"));
975 LOG(fatal) <<
"Alien Token Check failed - Please get an alien token before running with https CCDB endpoint, or alice-ccdb.cern.ch!";
977 TGrid::Connect(
"alien");
978 static bool errorShown =
false;
979 if (!gGrid && errorShown ==
false) {
981 LOG(error) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
983 LOG(fatal) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
988 return gGrid !=
nullptr;
991void* CcdbApi::downloadFilesystemContent(std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
993 if ((
url.find(
"alien:/", 0) != std::string::npos) && !initTGrid()) {
996 std::lock_guard<std::mutex> guard(
gIOMutex);
997 auto memfile = TMemFile::Open(
url.c_str(),
"OPEN");
999 auto cl = tinfo2TClass(tinfo);
1001 if (headers && headers->find(
"fileSize") == headers->end()) {
1002 (*headers)[
"fileSize"] = fmt::format(
"{}", memfile->GetEND());
1010void* CcdbApi::interpretAsTMemFileAndExtract(
char* contentptr,
size_t contentsize, std::type_info
const& tinfo)
1013 Int_t previousErrorLevel = gErrorIgnoreLevel;
1014 gErrorIgnoreLevel = kFatal;
1015 std::lock_guard<std::mutex> guard(
gIOMutex);
1016 TMemFile memFile(
"name", contentptr, contentsize,
"READ");
1017 gErrorIgnoreLevel = previousErrorLevel;
1018 if (!memFile.IsZombie()) {
1019 auto tcl = tinfo2TClass(tinfo);
1030void* CcdbApi::navigateURLsAndRetrieveContent(CURL* curl_handle, std::string
const&
url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
1035 static thread_local std::multimap<std::string, std::string> headerData;
1038 if ((
url.find(
"alien:/", 0) != std::string::npos) || (
url.find(
"file:/", 0) != std::string::npos)) {
1039 return downloadFilesystemContent(
url, tinfo, headers);
1046 curl_easy_setopt(curl_handle, CURLOPT_URL,
url.c_str());
1048 MemoryStruct chunk{(
char*)malloc(1), 0};
1049 initCurlOptionsForRetrieve(curl_handle, (
void*)&chunk, WriteMemoryCallback,
false);
1051 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(headerData)>);
1053 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&headerData);
1057 auto res = CURL_perform(curl_handle);
1058 long response_code = -1;
1059 void* content =
nullptr;
1061 if (
res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) {
1063 for (
auto& p : headerData) {
1064 (*headers)[
p.first] =
p.second;
1067 if (200 <= response_code && response_code < 300) {
1069 content = interpretAsTMemFileAndExtract(chunk.memory, chunk.size, tinfo);
1070 if (headers && headers->find(
"fileSize") == headers->end()) {
1071 (*headers)[
"fileSize"] = fmt::format(
"{}", chunk.size);
1073 }
else if (response_code == 304) {
1078 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
1081 else if (300 <= response_code && response_code < 400) {
1087 auto complement_Location = [
this](std::string
const& loc) {
1088 if (loc[0] ==
'/') {
1095 std::vector<std::string> locs;
1096 auto iter = headerData.find(
"Location");
1097 if (iter != headerData.end()) {
1098 locs.push_back(complement_Location(iter->second));
1101 auto iter2 = headerData.find(
"Content-Location");
1102 if (iter2 != headerData.end()) {
1103 auto range = headerData.equal_range(
"Content-Location");
1104 for (
auto it =
range.first; it !=
range.second; ++it) {
1105 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
1106 locs.push_back(complement_Location(it->second));
1110 for (
auto& l : locs) {
1112 LOG(
debug) <<
"Trying content location " << l;
1113 content = navigateURLsAndRetrieveContent(curl_handle, l, tinfo, headers);
1119 }
else if (response_code == 404) {
1120 LOG(error) <<
"Requested resource does not exist: " <<
url;
1123 LOG(error) <<
"Error in fetching object " <<
url <<
", curl response code:" << response_code;
1127 if (chunk.memory !=
nullptr) {
1131 LOGP(alarm,
"Curl request to {} failed with result {}, response code: {}",
url,
int(
res), response_code);
1136 (*headers)[
"Error"] =
"An error occurred during retrieval";
1142 std::map<std::string, std::string>
const& metadata,
long timestamp,
1143 std::map<std::string, std::string>* headers, std::string
const&
etag,
1146 if (!mSnapshotCachePath.empty()) {
1148 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1149 std::string logfile = mSnapshotCachePath +
"/log";
1150 std::fstream out(logfile, ios_base::out | ios_base::app);
1151 if (out.is_open()) {
1152 out <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1154 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path);
1155 bool snapshoting =
false;
1156 if (!std::filesystem::exists(snapshotfile)) {
1158 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotfile <<
"\n";
1161 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" failed to create directory for " << snapshotfile <<
"\n";
1164 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1167 auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
1169 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1176 CURL* curl_handle = curl_easy_init();
1177 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1178 std::string fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp);
1180 if (mInSnapshotMode) {
1181 auto res = extractFromLocalFile(fullUrl, tinfo, headers);
1183 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1188 curl_slist* option_list =
nullptr;
1190 auto content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1192 for (
size_t hostIndex = 1; hostIndex < hostsPool.size() && !(content); hostIndex++) {
1193 fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp, hostIndex);
1194 content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1197 logReading(
path, timestamp, headers,
"retrieve");
1199 curl_slist_free_all(option_list);
1200 curl_easy_cleanup(curl_handle);
1206 size_t newLength =
size * nmemb;
1207 size_t oldLength = s->size();
1209 s->resize(oldLength + newLength);
1210 }
catch (std::bad_alloc& e) {
1211 LOG(error) <<
"memory error when getting data from CCDB";
1215 std::copy((
char*)contents, (
char*)contents + newLength, s->begin() + oldLength);
1216 return size * nmemb;
1222 CURLcode
res = CURL_LAST;
1225 curl = curl_easy_init();
1226 if (curl !=
nullptr) {
1228 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &
result);
1229 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1231 struct curl_slist* headers =
nullptr;
1232 headers = curl_slist_append(headers, (std::string(
"Accept: ") + returnFormat).c_str());
1233 headers = curl_slist_append(headers, (std::string(
"Content-Type: ") + returnFormat).c_str());
1240 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1244 std::string fullUrl;
1246 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1247 fullUrl = getHostUrl(hostIndex);
1248 fullUrl += latestOnly ?
"/latest/" :
"/browse/";
1250 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1252 res = CURL_perform(curl);
1253 if (
res != CURLE_OK) {
1254 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1257 curl_slist_free_all(headers);
1258 curl_easy_cleanup(curl);
1264std::string CcdbApi::getTimestampString(
long timestamp)
const
1275 stringstream fullUrl;
1278 curl = curl_easy_init();
1279 if (curl !=
nullptr) {
1280 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
1281 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1284 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1285 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestampLocal;
1286 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1289 res = CURL_perform(curl);
1290 if (
res != CURLE_OK) {
1291 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1293 curl_easy_cleanup(curl);
1302 stringstream fullUrl;
1303 for (
size_t i = 0;
i < hostsPool.size();
i++) {
1304 std::string
url = getHostUrl(
i);
1305 fullUrl <<
url <<
"/truncate/" <<
path;
1307 curl = curl_easy_init();
1308 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1309 if (curl !=
nullptr) {
1310 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1315 res = CURL_perform(curl);
1316 if (
res != CURLE_OK) {
1317 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1319 curl_easy_cleanup(curl);
1326 return size * nmemb;
1332 CURLcode
res = CURL_LAST;
1335 curl = curl_easy_init();
1336 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1338 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1339 curl_easy_setopt(curl, CURLOPT_URL, mUrl.data());
1340 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
write_data);
1342 res = CURL_perform(curl);
1347 curl_easy_cleanup(curl);
1356 std::stringstream ss(reply.c_str());
1358 std::vector<std::string> folders;
1360 size_t numberoflines = std::count(reply.begin(), reply.end(),
'\n');
1361 bool inSubFolderSection =
false;
1363 for (
size_t linenumber = 0; linenumber < numberoflines; ++linenumber) {
1364 std::getline(ss, line);
1365 if (inSubFolderSection && line.size() > 0) {
1370 if (line.compare(
"Subfolders:") == 0) {
1371 inSubFolderSection =
true;
/// Header callback that appends each received header line to a vector of strings.
///
/// @param buffer    start of the header data; `size * nitems` bytes are readable
/// @param size      element size of the data in `buffer`
/// @param nitems    number of elements in `buffer`
/// @param userdata  must point to a `std::vector<std::string>`; one entry is appended per call
/// @return `size * nitems`, i.e. the full amount of data is always reported as consumed
///
/// The stored entry ends at the first NUL byte inside the buffer (or at the end of the
/// buffer when there is none), exactly as before. It is now built directly from the
/// buffer instead of going through two temporary std::string copies.
size_t header_callback(char* buffer, size_t size, size_t nitems, void* userdata)
{
  auto* headers = static_cast<std::vector<std::string>*>(userdata);
  const size_t total = size * nitems;
  // The buffer is not guaranteed to be NUL-terminated, so the search is bounded by `total`.
  char* lineEnd = std::find(buffer, buffer + total, '\0');
  headers->emplace_back(buffer, lineEnd);
  return total;
}
1392 auto p = std::filesystem::path(
filename).parent_path();
1393 if (!std::filesystem::exists(p)) {
1394 std::filesystem::create_directories(p);
1397 rapidjson::StringBuffer
buffer;
1398 rapidjson::Writer<rapidjson::StringBuffer> writer(
buffer);
1399 writer.StartObject();
1400 for (
const auto& pair : meta) {
1401 writer.Key(pair.first.c_str());
1402 writer.String(pair.second.c_str());
1408 if (file.is_open()) {
1409 file <<
buffer.GetString();
1421 if (!file.is_open()) {
1422 std::cerr <<
"Failed to open file for reading." << std::endl;
1426 std::string jsonStr((std::istreambuf_iterator<char>(file)), std::istreambuf_iterator<char>());
1429 rapidjson::Document document;
1430 document.Parse(jsonStr.c_str());
1432 if (document.HasParseError()) {
1433 std::cerr <<
"Error parsing JSON" << std::endl;
1438 for (
auto itr = document.MemberBegin(); itr != document.MemberEnd(); ++itr) {
1439 meta[itr->name.GetString()] = itr->value.GetString();
// Returns the header map for the object identified by path / metadata / timestamp.
// Two sources are visible here: a header-only remote query (the lambda below) and,
// when mSnapshotCachePath is set, a "header.json" snapshot file below that path.
1444std::map<std::string, std::string>
CcdbApi::retrieveHeaders(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp)
const
// Remote lookup: one curl handle, body download disabled (CURLOPT_NOBODY), redirects
// followed, received header lines collected into `headers` via header_map_callback.
1447 auto do_remote_header_call = [
this, &
path, &metadata, timestamp]() -> std::map<std::string, std::string> {
1448 CURL* curl = curl_easy_init();
1449 CURLcode
res = CURL_LAST;
// NOTE(review): curl is handed to getFullUrlForRetrieval before the nullptr check
// that follows - confirm the helper tolerates a null handle.
1450 std::string fullUrl = getFullUrlForRetrieval(curl,
path, metadata, timestamp);
1451 std::map<std::string, std::string> headers;
1453 if (curl !=
nullptr) {
1454 struct curl_slist*
list =
nullptr;
1457 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1460 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1461 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1462 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_map_callback<>);
1463 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1464 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
// Start from "failed" values so that the loop condition holds for the first host.
1469 long httpCode = 404;
1470 CURLcode getCodeRes = CURL_LAST;
// Keep iterating over hostsPool while the previous attempt reported a curl error
// or an HTTP status >= 400.
// NOTE(review): the visible body sets the same fullUrl on every iteration and does
// not use hostIndex - confirm whether the URL is meant to change per host.
1471 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (httpCode >= 400 ||
res > 0 || getCodeRes > 0); hostIndex++) {
1472 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1473 res = CURL_perform(curl);
// CURLE_UNSUPPORTED_PROTOCOL is deliberately excluded from the error report.
1474 if (
res != CURLE_OK &&
res != CURLE_UNSUPPORTED_PROTOCOL) {
1478 LOG(error) <<
"CURL_perform() failed: " << curl_easy_strerror(
res);
1480 getCodeRes = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
// NOTE(review): the statement executed for a 404 is not part of this listing.
1482 if (httpCode == 404) {
1485 curl_easy_cleanup(curl);
// Snapshot-cache branch: only taken when a cache path is configured.
1490 if (!mSnapshotCachePath.empty()) {
// CCDBSemaphore built from the cache path + "_headers" and the object path.
1492 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath + std::string(
"_headers"),
path);
// Every access is recorded by appending to <cache path>/log.
1494 std::string logfile = mSnapshotCachePath +
"/log";
1495 std::fstream out(logfile, ios_base::out | ios_base::app);
1496 if (out.is_open()) {
1497 out <<
"CCDB-header-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
// Cached headers live in a file named header.json, resolved by getSnapshotFile
// for <path>/<timestamp>.
1499 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path +
"/" +
std::to_string(timestamp),
"header.json");
// Cache miss: query the server.
// NOTE(review): the statement writing `meta` to snapshotfile is not visible in this
// listing; the warning below is presumably its failure branch.
1500 if (!std::filesystem::exists(snapshotfile)) {
1501 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" storing to snapshot " << snapshotfile <<
"\n";
1504 auto meta = do_remote_header_call();
1508 LOG(warn) <<
"Failed to cache the header information to disc";
// Cache hit. NOTE(review): the literal "serving from local snapshot " has no leading
// space, so the agent id and the word "serving" are written back to back in the log.
1512 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
// NOTE(review): the statement that fills `meta` from snapshotfile is not visible
// here; when reading fails the code falls back to the remote query.
1513 std::map<std::string, std::string> meta;
1515 LOG(warn) <<
"Failed to read cached information from disc";
1516 return do_remote_header_call();
// No cache path configured: always ask the server.
1521 return do_remote_header_call();
1526 auto curl = curl_easy_init();
1532 struct curl_slist*
list =
nullptr;
1533 list = curl_slist_append(
list, (
"If-None-Match: " +
etag).c_str());
1535 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1537 curl_easy_setopt(curl, CURLOPT_URL,
url.c_str());
1539 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1540 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1541 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
1542 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1543 if (!agentID.empty()) {
1544 curl_easy_setopt(curl, CURLOPT_USERAGENT, agentID.c_str());
1550 curl_easy_perform(curl);
1551 long http_code = 404;
1552 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
1553 if (http_code == 304) {
1561 static std::string etagHeader =
"ETag: ";
1562 static std::string locationHeader =
"Content-Location: ";
1563 for (
auto h : headers) {
1564 if (
h.find(etagHeader) == 0) {
1565 etag = std::string(
h.data() + etagHeader.size());
1566 }
else if (
h.find(locationHeader) == 0) {
1567 pfns.emplace_back(std::string(
h.data() + locationHeader.size(),
h.size() - locationHeader.size()));
1583 auto object = file.GetObjectChecked(
CCDBMETA_ENTRY, TClass::GetClass(
typeid(std::map<std::string, std::string>)));
1585 return static_cast<std::map<std::string, std::string>*
>(
object);
1592void traverseAndFillFolders(
CcdbApi const& api, std::string
const&
top, std::vector<std::string>& folders)
1596 folders.emplace_back(
top);
1599 if (subfolders.size() > 0) {
1601 for (
auto& sub : subfolders) {
1602 traverseAndFillFolders(api, sub, folders);
1612 std::vector<std::string> folders;
1613 traverseAndFillFolders(*
this,
top, folders);
1617TClass* CcdbApi::tinfo2TClass(std::type_info
const& tinfo)
1619 TClass* cl = TClass::GetClass(tinfo);
1621 throw std::runtime_error(fmt::format(
"Could not retrieve ROOT dictionary for type {}, aborting", tinfo.name()));
// Issues a PUT request whose URL is built from host, path and timestamp (plus newEOV
// and id segments) and carries the metadata as URL-escaped key=value pairs.
// NOTE(review): several statements of this function are not visible in this listing
// (declaration of `res`, the return value, the conditions guarding the newEOV and id
// segments); the comments below only describe what is shown.
1627int CcdbApi::updateMetadata(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp, std::string
const&
id,
long newEOV)
1630 CURL* curl = curl_easy_init();
// NOTE(review): this option is set before the nullptr check on the next statement.
1631 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1632 if (curl !=
nullptr) {
// NOTE(review): fullUrl is created once, outside the host loop, and is only ever
// appended to in the visible lines; with more than one host the second URL would be
// concatenated onto the first - confirm.
1634 stringstream fullUrl;
1635 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1636 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestamp;
1638 fullUrl <<
"/" << newEOV;
1641 fullUrl <<
"/" <<
id;
// Append every metadata pair as <escaped key>=<escaped value>&.
1645 for (
auto& kv : metadata) {
1646 std::string mfirst = kv.first;
1647 std::string msecond = kv.second;
// NOTE(review): the results of curl_easy_escape are turned into std::string without a
// null check.
1649 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
1650 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
1651 fullUrl << std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"&";
// The escaped copies are owned by curl and released right after use.
1652 curl_free(mfirstEncoded);
1653 curl_free(msecondEncoded);
1656 if (curl !=
nullptr) {
1657 LOG(
debug) <<
"passing to curl: " << fullUrl.str();
1658 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
// The request is sent with the PUT method.
1659 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"PUT");
1660 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1661 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1665 res = CURL_perform(curl);
1666 if (
res != CURLE_OK) {
1667 LOGP(alarm,
"CURL_perform() failed: {}, code: {}", curl_easy_strerror(
res),
int(
res));
// NOTE(review): braces are missing from this listing, so it cannot be determined here
// whether the cleanup runs inside the host loop (which would leave later iterations
// with a released handle) - confirm.
1672 curl_easy_cleanup(curl);
1679void CcdbApi::initHostsPool(std::string hosts)
1682 auto splitted = hosts | std::views::transform([](
char c) {
return (
c ==
';') ?
',' :
c; }) | std::views::split(
',');
1683 for (
auto&& part : splitted) {
1684 hostsPool.emplace_back(part.begin(), part.end());
1688std::string CcdbApi::getHostUrl(
int hostIndex)
const
1690 return hostsPool.at(hostIndex);
1696 data->hoPair.object = &requestContext.
dest;
1698 std::function<bool(std::string)> localContentCallback = [
this, &requestContext](std::string
url) {
1702 auto writeCallback = [](
void* contents,
size_t size,
size_t nmemb,
void* chunkptr) {
1704 auto& chunk = *ho.
object;
1705 size_t realsize =
size * nmemb, sz = 0;
1708 if (chunk.capacity() < chunk.size() + realsize) {
1710 const char hannot[] =
"header";
1711 size_t hsize = getFlatHeaderSize(ho.header);
1712 auto cl = ho.header.find(
"Content-Length");
1713 if (cl != ho.header.end()) {
1714 size_t sizeFromHeader = std::stol(cl->second);
1715 sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1717 sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
1722 char* contC = (
char*)contents;
1723 chunk.insert(chunk.end(), contC, contC + realsize);
1724 }
catch (std::exception e) {
1731 CURL* curl_handle = curl_easy_init();
1732 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1733 std::string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.
path, requestContext.
metadata, requestContext.
timestamp);
1734 curl_slist* options_list =
nullptr;
1735 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.
timestamp, &requestContext.
headers,
1739 data->hosts = hostsPool;
1742 data->localContentCallback = localContentCallback;
1743 data->userAgent = mUniqueAgentID;
1744 data->optionsList = options_list;
1746 curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str());
1747 initCurlOptionsForRetrieve(curl_handle, (
void*)(&
data->hoPair), writeCallback,
false);
1748 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(
data->hoPair.header)>);
1749 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&(
data->hoPair.header));
1750 curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (
void*)
data);
1753 asynchPerform(curl_handle, requestCounter);
1758 std::hash<std::string> hasher;
1759 std::string semhashedstring =
"aliceccdb" +
std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
1760 return semhashedstring;
1768 return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
1769 }
catch (std::exception e) {
1770 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
1779 if (sem->try_wait()) {
1790 boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
1791 std::cout <<
"Found CCDB semaphore: " << semaname <<
"\n";
1793 auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
1795 std::cout <<
"Removed CCDB semaphore: " << semaname <<
"\n";
1800 }
catch (std::exception
const& e) {
1811 namespace fs = std::filesystem;
1812 std::string fileName{
"snapshot.root"};
1814 auto absolutesnapshotdir = fs::weakly_canonical(fs::absolute(snapshotdir));
1815 for (
const auto&
entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
1816 if (
entry.is_directory()) {
1817 const fs::path& currentDir = fs::canonical(fs::absolute(
entry.path()));
1818 fs::path filePath = currentDir / fileName;
1819 if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
1820 std::cout <<
"Directory with file '" << fileName <<
"': " << currentDir << std::endl;
1824 auto numtokens = pathtokens.size();
1825 if (numtokens < 3) {
1830 std::string
path = pathtokens[numtokens - 3] +
"/" + pathtokens[numtokens - 2] +
"/" + pathtokens[numtokens - 1];
1836 }
catch (std::exception
const& e) {
1837 LOG(info) <<
"Semaphore search had exception " << e.what();
1842 long timestamp, std::map<std::string, std::string>& headers,
1845 if (createSnapshot) {
1846 std::string logfile = mSnapshotCachePath +
"/log";
1847 std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app);
1848 if (logStream.is_open()) {
1849 logStream <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
" for load to memory\n";
1852 if (mInSnapshotMode) {
1857 }
else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) {
1869 if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) {
1870 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1872 auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.
path);
1873 std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path);
1875 std::fstream logStream;
1876 if (logStream.is_open()) {
1877 logStream <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotpath <<
" from memory\n";
1880 LOGP(
debug,
"creating snapshot {} -> {}", requestContext.
path, snapshotpath);
1883 std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary);
1884 std::copy(requestContext.
dest.begin(), requestContext.
dest.end(), std::ostreambuf_iterator<char>(objFile));
1887 updateMetaInformationInLocalFile(snapshotpath, &requestContext.
headers, &querysummary);
1893 std::map<std::string, std::string>
const& metadata,
long timestamp,
1894 std::map<std::string, std::string>* headers, std::string
const&
etag,
1898 destP.reserve(dest.size());
1901 dest.reserve(destP.size());
1902 for (
const auto c : destP) {
1908 std::map<std::string, std::string>
const& metadata,
long timestamp,
1909 std::map<std::string, std::string>* headers, std::string
const&
etag,
1921 std::vector<RequestContext> contexts = {requestContext};
1927 size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1928 dest.resize(cnt + hsize);
1929 auto addString = [&dest, &cnt](
const std::string& s) {
1936 for (
auto&
h : headers) {
1938 addString(
h.second);
1940 *
reinterpret_cast<int*
>(&dest[cnt]) = hsize;
1941 std::memcpy(&dest[cnt +
sizeof(
int)], FlatHeaderAnnot,
sizeof(FlatHeaderAnnot));
1946 LOGP(
debug,
"loadFileToMemory {} ETag=[{}]", requestContext.
path, requestContext.
etag);
1947 bool createSnapshot = requestContext.
considerSnapshot && !mSnapshotCachePath.empty();
1949 std::string snapshotpath;
1950 if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path))) {
1951 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1962 std::vector<int> fromSnapshots(requestContexts.size());
1963 size_t requestCounter = 0;
1966 for (
int i = 0;
i < requestContexts.size();
i++) {
1968 auto& requestContext = requestContexts.at(
i);
1973 while (requestCounter > 0) {
1978 for (
int i = 0;
i < requestContexts.size();
i++) {
1979 auto& requestContext = requestContexts.at(
i);
1980 if (!requestContext.dest.empty()) {
1981 logReading(requestContext.path, requestContext.timestamp, &requestContext.headers,
1982 fmt::format(
"{}{}", requestContext.considerSnapshot ?
"load to memory" :
"retrieve", fromSnapshots.at(
i) ?
" from snapshot" :
""));
1983 if (requestContext.considerSnapshot && fromSnapshots.at(
i) != 2) {
1992 if (
url.find(
"alien:/", 0) != std::string::npos) {
1993 std::map<std::string, std::string> localHeaders;
1995 auto it = localHeaders.find(
"Error");
1996 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2002 if ((
url.find(
"file:/", 0) != std::string::npos)) {
2003 std::string
path =
url.substr(7);
2004 if (std::filesystem::exists(
path)) {
2005 std::map<std::string, std::string> localHeaders;
2007 auto it = localHeaders.find(
"Error");
2008 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2021 constexpr size_t MaxCopySize = 0x1L << 25;
2022 auto signalError = [&dest, localHeaders]() {
2026 (*localHeaders)[
"Error"] =
"An error occurred during retrieval";
2029 if (
path.find(
"alien:/") == 0 && !initTGrid()) {
2033 std::string fname(
path);
2034 if (fname.find(
"?filetype=raw") == std::string::npos) {
2035 fname +=
"?filetype=raw";
2037 std::unique_ptr<TFile> sfile{TFile::Open(fname.c_str())};
2038 if (!sfile || sfile->IsZombie()) {
2039 LOG(error) <<
"Failed to open file " << fname;
2043 size_t totalread = 0, fsize = sfile->GetSize(), b00 = sfile->GetBytesRead();
2045 char* dptr = dest.data();
2049 size_t b0 = sfile->GetBytesRead(), b1 = b0 - b00;
2050 size_t readsize = fsize - b1 > MaxCopySize ? MaxCopySize : fsize - b1;
2051 if (readsize == 0) {
2054 sfile->Seek(totalread, TFile::kBeg);
2055 bool failed = sfile->ReadBuffer(dptr, (Int_t)readsize);
2056 nread = sfile->GetBytesRead() - b0;
2057 if (
failed || nread < 0) {
2058 LOG(error) <<
"failed to copy file " << fname <<
" to memory buffer";
2064 }
while (nread == (
long)MaxCopySize);
2066 if (localHeaders && fetchLocalMetaData) {
2067 TMemFile memFile(
"name",
const_cast<char*
>(dest.data()), dest.size(),
"READ");
2068 auto storedmeta = (std::map<std::string, std::string>*)
extractFromTFile(memFile, TClass::GetClass(
"std::map<std::string, std::string>"),
CCDBMETA_ENTRY);
2070 *localHeaders = *storedmeta;
2073 if ((
isSnapshotMode() || mPreferSnapshotCache) && localHeaders->find(
"ETag") == localHeaders->end()) {
2074 (*localHeaders)[
"ETag"] =
path;
2076 if (localHeaders->find(
"fileSize") == localHeaders->end()) {
2077 (*localHeaders)[
"fileSize"] = fmt::format(
"{}", memFile.GetEND());
2083void CcdbApi::checkMetadataKeys(std::map<std::string, std::string>
const& metadata)
const
2089 const std::regex regexPatternSearch(R
"([ :;.,\\/'?!\(\)\{\}\[\]@<>=+*#$&`|~^%])");
2090 bool isInvalid =
false;
2092 for (
auto& el : metadata) {
2093 auto keyMd = el.first;
2095 std::smatch searchRes;
2096 while (std::regex_search(keyMd, searchRes, regexPatternSearch)) {
2098 LOG(error) <<
"Invalid character found in metadata key '" << tmp <<
"\': '" << searchRes.str() <<
"\'";
2099 keyMd = searchRes.suffix();
2103 LOG(fatal) <<
"Some metadata keys have invalid characters, please fix!";
2108void CcdbApi::logReading(
const std::string&
path,
long ts,
const std::map<std::string, std::string>* headers,
const std::string& comment)
const
2110 std::string upath{
path};
2112 auto ent = headers->find(
"Valid-From");
2113 if (ent != headers->end()) {
2114 upath +=
"/" + ent->second;
2116 ent = headers->find(
"ETag");
2117 if (ent != headers->end()) {
2118 upath +=
"/" + ent->second;
2121 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
2122 LOGP(info,
"ccdb reads {}{}{} for {} ({}, agent_id: {}), ", mUrl, mUrl.back() ==
'/' ?
"" :
"/", upath, ts < 0 ?
getCurrentTimestamp() : ts, comment, mUniqueAgentID);
// Takes a configured curl handle and a pointer to a request counter; the download
// scheduling code calls it as asynchPerform(curl_handle, requestCounter).
// NOTE(review): the body is not part of this listing; presumably it forwards both
// arguments to the downloader's asynchSchedule(handle, requestCounter) - confirm.
2125void CcdbApi::asynchPerform(CURL* handle,
size_t* requestCounter)
const
2130CURLcode CcdbApi::CURL_perform(CURL* handle)
const
2132 if (mIsCCDBDownloaderPreferred) {
2133 return mDownloader->
perform(handle);
2136 for (
int i = 1;
i <= mCurlRetries && (
result = curl_easy_perform(handle)) != CURLE_OK;
i++) {
2137 usleep(mCurlDelayRetries *
i);
2148 LOG(
debug) <<
"Entering semaphore barrier";
2151 mSem =
new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, mSemName.c_str(), 1);
2152 }
catch (std::exception e) {
2153 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
2158 gSemaRegistry.
add(
this);
2165 LOG(
debug) <<
"Ending semaphore barrier";
2168 if (mSem->try_wait()) {
2170 boost::interprocess::named_semaphore::remove(mSemName.c_str());
2172 gSemaRegistry.
remove(
this);
2178 LOG(
debug) <<
"Cleaning up semaphore registry with count " << mStore.size();
2179 for (
auto& s : mStore) {
std::string createdNotBefore
std::string createdNotAfter
Class for time synchronization of RawReader instances.
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
CCDBSemaphore(std::string const &cachepath, std::string const &path)
static void curlSetSSLOptions(CURL *curl)
static std::string generateFileName(const std::string &inp)
std::string list(std::string const &path="", bool latestOnly=false, std::string const &returnFormat="text/plain", long createdNotAfter=-1, long createdNotBefore=-1) const
int storeAsTFile_impl(const void *obj1, std::type_info const &info, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
static bool checkAlienToken()
void runDownloaderLoop(bool noWait)
void releaseNamedSemaphore(boost::interprocess::named_semaphore *sem, std::string const &path) const
static std::map< std::string, std::string > * retrieveMetaInfo(TFile &)
void scheduleDownload(RequestContext &requestContext, size_t *requestCounter) const
TObject * retrieve(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp) const
void init(std::string const &hosts)
TObject * retrieveFromTFile(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore) const
std::string const & getURL() const
void getFromSnapshot(bool createSnapshot, std::string const &path, long timestamp, std::map< std::string, std::string > &headers, std::string &snapshotpath, o2::pmr::vector< char > &dest, int &fromSnapshot, std::string const &etag) const
bool loadLocalContentToMemory(o2::pmr::vector< char > &dest, std::string &url) const
static void removeLeakingSemaphores(std::string const &basedir, bool remove=false)
void saveSnapshot(RequestContext &requestContext) const
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
int storeAsTFile(const TObject *rootObject, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
void snapshot(std::string const &ccdbrootpath, std::string const &localDir, long timestamp) const
bool isSnapshotMode() const
bool isHostReachable() const
void loadFileToMemory(std::vector< char > &dest, std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore, bool considerSnapshot=true) const
static std::string determineSemaphoreName(std::string const &basedir, std::string const &objectpath)
void deleteObject(std::string const &path, long timestamp=-1) const
static void appendFlatHeader(o2::pmr::vector< char > &dest, const std::map< std::string, std::string > &headers)
std::vector< std::string > getAllFolders(std::string const &top) const
void vectoredLoadFileToMemory(std::vector< RequestContext > &requestContext) const
boost::interprocess::named_semaphore * createNamedSemaphore(std::string const &path) const
static bool removeSemaphore(std::string const &name, bool remove=false)
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
CcdbApi()
Default constructor.
static bool getCCDBEntryHeaders(std::string const &url, std::string const &etag, std::vector< std::string > &headers, const std::string &agentID="")
static CCDBQuery * retrieveQueryInfo(TFile &)
static constexpr const char * CCDBQUERY_ENTRY
void truncate(std::string const &path) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
static void parseCCDBHeaders(std::vector< std::string > const &headers, std::vector< std::string > &pfns, std::string &etag)
bool retrieveBlob(std::string const &path, std::string const &targetdir, std::map< std::string, std::string > const &metadata, long timestamp, bool preservePathStructure=true, std::string const &localFileName="snapshot.root", std::string const &createdNotAfter="", std::string const &createdNotBefore="", std::map< std::string, std::string > *headers=nullptr) const
int updateMetadata(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::string const &id="", long newEOV=0)
static constexpr const char * CCDBMETA_ENTRY
static constexpr const char * CCDBOBJECT_ENTRY
void navigateSourcesAndLoadFile(RequestContext &requestContext, int &fromSnapshot, size_t *requestCounter) const
std::vector< std::string > parseSubFolders(std::string const &reply) const
virtual ~CcdbApi()
Default destructor.
void add(CCDBSemaphore const *ptr)
void remove(CCDBSemaphore const *ptr)
SemaphoreRegistry()=default
GLdouble GLdouble GLdouble GLdouble top
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
bool stdmap_to_jsonfile(std::map< std::string, std::string > const &meta, std::string const &filename)
size_t write_data(void *, size_t size, size_t nmemb, void *)
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
size_t(*)(void *, size_t, size_t, void *) CurlWriteCallback
std::string sanitizeObjectName(const std::string &objectName)
bool jsonfile_to_stdmap(std::map< std::string, std::string > &meta, std::string const &filename)
long getFutureTimestamp(int secondsInFuture)
returns the timestamp in long corresponding to "now + secondsInFuture"
size_t CurlWrite_CallbackFunc_StdString2(void *contents, size_t size, size_t nmemb, std::string *s)
std::string timestamp() noexcept
Defining ITS Vertex explicitly as messageable.
std::vector< T, fair::mq::pmr::polymorphic_allocator< T > > vector
void createDirectoriesIfAbsent(std::string const &path)
std::string to_string(gsl::span< T, Size > span)
std::map< std::string, std::string > const & metadata
std::string createdNotAfter
std::map< std::string, std::string > & headers
o2::pmr::vector< char > & dest
std::string createdNotBefore
static DeploymentMode deploymentMode()
static std::string getClassName(const T &obj)
get the class name of the object
static std::unique_ptr< FileImage > createFileImage(const TObject &obj, const std::string &fileName, const std::string &objName)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string concat_string(Ts const &... ts)
static bool endsWith(const std::string &s, const std::string &ending)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
uint64_t const void const *restrict const msg