32#include <TStreamerInfo.h>
36#include <fairlogger/Logger.h>
42#include <boost/algorithm/string.hpp>
43#include <boost/asio/ip/host_name.hpp>
46#include <boost/interprocess/sync/named_semaphore.hpp>
50#include <unordered_set>
51#include "rapidjson/document.h"
52#include "rapidjson/writer.h"
53#include "rapidjson/stringbuffer.h"
61unique_ptr<TJAlienCredentials> CcdbApi::mJAlienCredentials =
nullptr;
76 boost::interprocess::named_semaphore* mSem =
nullptr;
77 std::string mSemName{};
92 std::unordered_set<CCDBSemaphore const*> mStore;
102 mIsCCDBDownloaderPreferred = 0;
// Prefer the multi-handle downloader for the OnlineDDS / OnlineECS / OnlineAUX / FST deployment modes.
// deploymentMode holds a single value, so these comparisons must be OR-ed: the previous AND chain
// could never be true, which left the assignment below unreachable.
// NOTE(review): if the always-false chain was a deliberate way to keep the downloader off, remove this
// if-block instead of keeping the fix.
103 if (deploymentMode == DeploymentMode::OnlineDDS || deploymentMode == DeploymentMode::OnlineECS || deploymentMode == DeploymentMode::OnlineAUX || deploymentMode == DeploymentMode::FST) {
104 mIsCCDBDownloaderPreferred = 1;
// When ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI is set, its integer value (atoi) is assigned directly.
106 if (getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) {
107 mIsCCDBDownloaderPreferred = atoi(getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI"));
114 curl_global_cleanup();
118void CcdbApi::setUniqueAgentID()
120 std::string host = boost::asio::ip::host_name();
121 char const* jobID = getenv(
"ALIEN_PROC_ID");
132 LOG(
debug) <<
"On macOS we simply rely on TGrid::Connect(\"alien\").";
135 if (getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"))) {
138 if (getenv(
"JALIEN_TOKEN_CERT")) {
141 auto returncode = system(
"LD_PRELOAD= alien-token-info &> /dev/null");
142 if (returncode == -1) {
145 return returncode == 0;
148void CcdbApi::curlInit()
151 curl_global_init(CURL_GLOBAL_DEFAULT);
152 CcdbApi::mJAlienCredentials = std::make_unique<TJAlienCredentials>();
153 CcdbApi::mJAlienCredentials->loadCredentials();
154 CcdbApi::mJAlienCredentials->selectPreferedCredentials();
157 if (getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT")) {
158 auto timeoutMS = atoi(getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT"));
159 if (timeoutMS >= 0) {
160 LOG(info) <<
"Setting socket timeout to " << timeoutMS <<
" milliseconds";
170 constexpr const char* SNAPSHOTPREFIX =
"file://";
173 if (host.substr(0, 7).compare(SNAPSHOTPREFIX) == 0) {
174 auto path = host.substr(7);
175 initInSnapshotMode(
path);
196 std::string snapshotReport{};
197 const char* cachedir = getenv(
"ALICEO2_CCDB_LOCALCACHE");
198 namespace fs = std::filesystem;
200 if (cachedir[0] == 0) {
201 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(
"."));
203 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(cachedir));
205 snapshotReport = fmt::format(
"(cache snapshots to dir={}", mSnapshotCachePath);
208 mPreferSnapshotCache =
true;
209 if (mSnapshotCachePath.empty()) {
210 LOGP(fatal,
"IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE is defined but the ALICEO2_CCDB_LOCALCACHE is not");
212 snapshotReport +=
", prefer if available";
214 if (!snapshotReport.empty()) {
215 snapshotReport +=
')';
218 mNeedAlienToken = (host.find(
"https://") != std::string::npos) || (host.find(
"alice-ccdb.cern.ch") != std::string::npos);
221 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD")) {
222 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD"));
224 mCurlTimeoutDownload =
timeout;
231 mCurlTimeoutDownload = 15;
234 mCurlTimeoutDownload = 15;
236 mCurlTimeoutDownload = 5;
240 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD")) {
241 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD"));
250 mCurlTimeoutUpload = 3;
253 mCurlTimeoutUpload = 20;
255 mCurlTimeoutUpload = 20;
262 LOGP(
debug,
"Curl timeouts are set to: download={:2}, upload={:2} seconds", mCurlTimeoutDownload, mCurlTimeoutUpload);
264 LOGP(info,
"Init CcdApi with UserAgentID: {}, Host: {}{}, Curl timeouts: upload:{} download:{}", mUniqueAgentID, host,
265 mInSnapshotMode ?
"(snapshot readonly mode)" : snapshotReport.c_str(), mCurlTimeoutUpload, mCurlTimeoutDownload);
274void CcdbApi::updateMetaInformationInLocalFile(std::string
const&
filename, std::map<std::string, std::string>
const* headers,
CCDBQuery const* querysummary)
276 std::lock_guard<std::mutex> guard(
gIOMutex);
277 auto oldlevel = gErrorIgnoreLevel;
278 gErrorIgnoreLevel = 6001;
279 TFile snapshotfile(
filename.c_str(),
"UPDATE");
281 if (!snapshotfile.IsZombie()) {
283 snapshotfile.WriteObjectAny(querysummary, TClass::GetClass(
typeid(*querysummary)),
CCDBQUERY_ENTRY);
286 snapshotfile.WriteObjectAny(headers, TClass::GetClass(
typeid(*headers)),
CCDBMETA_ENTRY);
288 snapshotfile.Write();
289 snapshotfile.Close();
291 gErrorIgnoreLevel = oldlevel;
301 std::string tmpObjectName = objectName;
302 tmpObjectName.erase(std::remove_if(tmpObjectName.begin(), tmpObjectName.end(),
303 [](
auto const&
c) ->
bool { return (!std::isalnum(c) && c !=
'_' && c !=
'/' && c !=
'.'); }),
304 tmpObjectName.end());
305 return tmpObjectName;
312 std::lock_guard<std::mutex> guard(
gIOMutex);
326 std::string className = rootObject->GetName();
332 std::lock_guard<std::mutex> guard(
gIOMutex);
337 std::map<std::string, std::string>
const& metadata,
338 long startValidityTimestamp,
long endValidityTimestamp,
339 std::vector<char>::size_type maxSize)
const
343 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
349 path, metadata, startValidityTimestamp, endValidityTimestamp, maxSize);
353 const std::string&
path,
const std::map<std::string, std::string>& metadata,
354 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type maxSize)
const
356 if (maxSize > 0 &&
size > maxSize) {
357 LOGP(alarm,
"Object will not be uploaded to {} since its size {} exceeds max allowed {}",
path,
size, maxSize);
363 long sanitizedStartValidityTimestamp = startValidityTimestamp;
364 if (startValidityTimestamp == -1) {
365 LOGP(info,
"Start of Validity not set, current timestamp used.");
368 long sanitizedEndValidityTimestamp = endValidityTimestamp;
369 if (endValidityTimestamp == -1) {
370 LOGP(info,
"End of Validity not set, start of validity plus 1 day used.");
373 if (mInSnapshotMode) {
374 auto pthLoc = getSnapshotDir(mSnapshotTopPath,
path);
376 auto flLoc = getSnapshotFile(mSnapshotTopPath,
path,
filename);
378 auto pent = flLoc.find_last_of(
'.');
379 if (pent == std::string::npos) {
382 flLoc.insert(pent, fmt::format(
"_{}_{}", startValidityTimestamp, endValidityTimestamp));
383 ofstream outf(flLoc.c_str(), ios::out | ios::binary);
387 throw std::runtime_error(fmt::format(
"Failed to write local CCDB file {}", flLoc));
389 std::map<std::string, std::string> metaheader(metadata);
391 metaheader[
"Valid-From"] =
std::to_string(startValidityTimestamp);
393 updateMetaInformationInLocalFile(flLoc.c_str(), &metaheader);
394 std::string metaStr{};
395 for (
const auto& mentry : metadata) {
396 metaStr += fmt::format(
"{}={};", mentry.first, mentry.second);
398 metaStr +=
"$USER_META;";
399 LOGP(info,
"Created local snapshot {}", flLoc);
400 LOGP(info, R
"(Upload with: o2-ccdb-upload --host "$ccdbhost" -p {} -f {} -k {} --starttimestamp {} --endtimestamp {} -m "{}")",
407 CURL* curl =
nullptr;
408 curl = curl_easy_init();
411 checkMetadataKeys(metadata);
413 if (curl !=
nullptr) {
414 auto mime = curl_mime_init(curl);
415 auto field = curl_mime_addpart(mime);
416 curl_mime_name(field,
"send");
417 curl_mime_filedata(field,
filename.c_str());
420 struct curl_slist* headerlist =
nullptr;
421 static const char buf[] =
"Expect:";
422 headerlist = curl_slist_append(headerlist,
buf);
426 curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
427 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
428 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
429 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
430 curl_easy_setopt(curl, CURLOPT_TIMEOUT, mCurlTimeoutUpload);
432 CURLcode
res = CURL_LAST;
434 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res > 0; hostIndex++) {
435 std::string fullUrl = getFullUrlForStorage(curl,
path, objectType, metadata, sanitizedStartValidityTimestamp, sanitizedEndValidityTimestamp, hostIndex);
436 LOG(debug3) <<
"Full URL Encoded: " << fullUrl;
438 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
441 res = CURL_perform(curl);
443 if (
res != CURLE_OK) {
444 if (
res == CURLE_OPERATION_TIMEDOUT) {
445 LOGP(alarm,
"curl_easy_perform() timed out. Consider increasing the timeout using the env var `ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD` (seconds), current one is {}", mCurlTimeoutUpload);
447 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(
res));
454 curl_easy_cleanup(curl);
457 curl_slist_free_all(headerlist);
459 curl_mime_free(mime);
461 LOGP(alarm,
"curl initialization failure");
468 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type maxSize)
const
472 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
480std::string CcdbApi::getFullUrlForStorage(CURL* curl,
const std::string&
path,
const std::string& objtype,
481 const std::map<std::string, std::string>& metadata,
482 long startValidityTimestamp,
long endValidityTimestamp,
int hostIndex)
const
485 std::string startValidityString = getTimestampString(startValidityTimestamp < 0 ?
getCurrentTimestamp() : startValidityTimestamp);
486 std::string endValidityString = getTimestampString(endValidityTimestamp < 0 ?
getFutureTimestamp(60 * 60 * 24 * 1) : endValidityTimestamp);
488 std::string url = getHostUrl(hostIndex);
490 std::string fullUrl = url +
"/" +
path +
"/" + startValidityString +
"/" + endValidityString +
"/";
493 char* objtypeEncoded = curl_easy_escape(curl, objtype.c_str(), objtype.size());
494 fullUrl +=
"ObjectType=" + std::string(objtypeEncoded) +
"/";
495 curl_free(objtypeEncoded);
497 for (
auto& kv : metadata) {
498 std::string mfirst = kv.first;
499 std::string msecond = kv.second;
501 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
502 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
503 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
504 curl_free(mfirstEncoded);
505 curl_free(msecondEncoded);
511std::string CcdbApi::getFullUrlForRetrieval(CURL* curl,
const std::string&
path,
const std::map<std::string, std::string>& metadata,
long timestamp,
int hostIndex)
const
513 if (mInSnapshotMode) {
514 return getSnapshotFile(mSnapshotTopPath,
path);
520 std::string hostUrl = getHostUrl(hostIndex);
522 std::string fullUrl = hostUrl +
"/" +
path +
"/" + validityString +
"/";
524 for (
auto& kv : metadata) {
525 std::string mfirst = kv.first;
526 std::string msecond = kv.second;
528 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
529 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
530 fullUrl += std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"/";
531 curl_free(mfirstEncoded);
532 curl_free(msecondEncoded);
554static size_t WriteMemoryCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
556 size_t realsize =
size * nmemb;
559 mem->memory = (
char*)realloc(mem->memory, mem->size + realsize + 1);
560 if (mem->memory ==
nullptr) {
561 printf(
"not enough memory (realloc returned NULL)\n");
565 memcpy(&(mem->memory[mem->size]), contents, realsize);
566 mem->size += realsize;
567 mem->memory[mem->size] = 0;
583static size_t WriteToFileCallback(
void*
ptr,
size_t size,
size_t nmemb, FILE*
stream)
596static CURLcode ssl_ctx_callback(CURL*,
void*,
void* parm)
598 std::string
msg((
const char*)parm);
601 if (
msg.length() > 0 &&
end == -1) {
603 }
else if (
end > 0) {
615 CredentialsKind cmk = mJAlienCredentials->getPreferedCredentials();
618 if (cmk == cNOT_FOUND) {
622 TJAlienCredentialsObject cmo = mJAlienCredentials->get(cmk);
624 char* CAPath = getenv(
"X509_CERT_DIR");
626 curl_easy_setopt(curl_handle, CURLOPT_CAPATH, CAPath);
628 curl_easy_setopt(curl_handle, CURLOPT_CAINFO,
nullptr);
629 curl_easy_setopt(curl_handle, CURLOPT_SSLCERT, cmo.certpath.c_str());
630 curl_easy_setopt(curl_handle, CURLOPT_SSLKEY, cmo.keypath.c_str());
633 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
634 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_DATA, mJAlienCredentials->getMessages().c_str());
641void CcdbApi::initCurlOptionsForRetrieve(CURL* curlHandle,
void* chunk,
CurlWriteCallback writeCallback,
bool followRedirect)
const
643 curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, writeCallback);
644 curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, chunk);
645 curl_easy_setopt(curlHandle, CURLOPT_FOLLOWLOCATION, followRedirect ? 1L : 0L);
650template <
typename MapType = std::map<std::
string, std::
string>>
651size_t header_map_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
653 auto* headers =
static_cast<MapType*
>(userdata);
654 auto header = std::string(
buffer,
size * nitems);
655 std::string::size_type
index = header.find(
':', 0);
656 if (
index != std::string::npos) {
657 const auto key = boost::algorithm::trim_copy(header.substr(0,
index));
658 const auto value = boost::algorithm::trim_copy(header.substr(
index + 1));
659 LOGP(
debug,
"Adding #{} {} -> {}", headers->size(),
key,
value);
661 if (
key ==
"Content-Length") {
662 auto cl = headers->find(
"Content-Length");
663 if (cl != headers->end()) {
664 if (std::stol(cl->second) < stol(
value)) {
674 auto cl = headers->find(
"ETag");
675 if (cl != headers->end()) {
681 if (
key ==
"Content-Type") {
682 auto cl = headers->find(
"Content-Type");
683 if (cl != headers->end()) {
689 headers->insert(std::make_pair(
key,
value));
692 return size * nitems;
696void CcdbApi::initCurlHTTPHeaderOptionsForRetrieve(CURL* curlHandle, curl_slist*& option_list,
long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
697 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
701 option_list = curl_slist_append(option_list, (
"If-None-Match: " + etag).c_str());
704 if (!createdNotAfter.empty()) {
705 option_list = curl_slist_append(option_list, (
"If-Not-After: " + createdNotAfter).c_str());
708 if (!createdNotBefore.empty()) {
709 option_list = curl_slist_append(option_list, (
"If-Not-Before: " + createdNotBefore).c_str());
712 if (headers !=
nullptr) {
713 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
to_string(timestamp)).c_str());
714 curl_easy_setopt(curlHandle, CURLOPT_HEADERFUNCTION, header_map_callback<>);
715 curl_easy_setopt(curlHandle, CURLOPT_HEADERDATA, headers);
719 curl_easy_setopt(curlHandle, CURLOPT_HTTPHEADER, option_list);
722 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
725bool CcdbApi::receiveToFile(FILE* fileHandle, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
726 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
727 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect)
const
729 return receiveObject((
void*)fileHandle,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, followRedirect, (CurlWriteCallback)&WriteToFileCallback);
732bool CcdbApi::receiveToMemory(
void* chunk, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
733 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
734 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect)
const
736 return receiveObject((
void*)chunk,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, followRedirect, (CurlWriteCallback)&WriteMemoryCallback);
739bool CcdbApi::receiveObject(
void* dataHolder, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
740 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
741 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect,
CurlWriteCallback writeCallback)
const
745 curlHandle = curl_easy_init();
746 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
748 if (curlHandle !=
nullptr) {
751 initCurlOptionsForRetrieve(curlHandle, dataHolder, writeCallback, followRedirect);
752 curl_slist* option_list =
nullptr;
753 initCurlHTTPHeaderOptionsForRetrieve(curlHandle, option_list, timestamp, headers, etag, createdNotAfter, createdNotBefore);
755 long responseCode = 0;
756 CURLcode curlResultCode = CURL_LAST;
758 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (responseCode >= 400 || curlResultCode > 0); hostIndex++) {
759 std::string fullUrl = getFullUrlForRetrieval(curlHandle,
path, metadata, timestamp, hostIndex);
760 curl_easy_setopt(curlHandle, CURLOPT_URL, fullUrl.c_str());
762 curlResultCode = CURL_perform(curlHandle);
764 if (curlResultCode != CURLE_OK) {
765 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(curlResultCode));
767 curlResultCode = curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &responseCode);
768 if ((curlResultCode == CURLE_OK) && (responseCode < 300)) {
769 curl_slist_free_all(option_list);
770 curl_easy_cleanup(curlHandle);
773 if (curlResultCode != CURLE_OK) {
774 LOGP(alarm,
"invalid URL {}", fullUrl);
776 LOGP(alarm,
"not found under link {}", fullUrl);
782 curl_slist_free_all(option_list);
783 curl_easy_cleanup(curlHandle);
789 long timestamp)
const
797 bool res = receiveToMemory((
void*)&chunk,
path, metadata, timestamp);
800 std::lock_guard<std::mutex> guard(
gIOMutex);
802 mess.SetBuffer(chunk.
memory, chunk.
size, kFALSE);
807 LOGP(info,
"couldn't retrieve the object {}",
path);
819 std::string
str = inp;
820 str.erase(std::remove_if(
str.begin(),
str.end(), ::isspace),
str.end());
821 str = std::regex_replace(
str, std::regex(
"::"),
"-");
827 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
828 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
834 long timestamp,
bool preservePath, std::string
const& localFileName, std::string
const& createdNotAfter, std::string
const& createdNotBefore)
const
838 std::string fulltargetdir = targetdir + (preservePath ? (
'/' +
path) :
"");
842 }
catch (std::exception const& e) {
// Catch by reference: catching std::exception by value slices whatever derived exception the
// directory creation threw, so e.what() would no longer report the actual failure reason.
843 LOGP(error,
"Could not create local snapshot cache directory {}, reason: {}", fulltargetdir, e.what());
847 std::pmr::vector<char> buff;
848 std::map<std::string, std::string> headers;
850 loadFileToMemory(buff,
path, metadata, timestamp, &headers,
"", createdNotAfter, createdNotBefore,
false);
851 if ((headers.count(
"Error") != 0) || (buff.empty())) {
852 LOGP(error,
"Unable to find object {}/{}, Aborting",
path, timestamp);
856 auto getFileName = [&headers]() {
857 auto& s = headers[
"Content-Disposition"];
859 std::regex re(
"(.*;)filename=\"(.*)\"");
861 if (std::regex_match(s.c_str(),
m, re)) {
865 std::string backupname(
"ccdb-blob.bin");
866 LOG(error) <<
"Cannot determine original filename from Content-Disposition ... falling back to " << backupname;
869 auto filename = localFileName.size() > 0 ? localFileName : getFileName();
870 std::string targetpath = fulltargetdir +
"/" +
filename;
872 std::ofstream objFile(targetpath, std::ios::out | std::ofstream::binary);
873 std::copy(buff.begin(), buff.end(), std::ostreambuf_iterator<char>(objFile));
874 if (!objFile.good()) {
875 LOGP(error,
"Unable to open local file {}, Aborting", targetpath);
881 updateMetaInformationInLocalFile(targetpath.c_str(), &headers, &querysummary);
885void CcdbApi::snapshot(std::string
const& ccdbrootpath, std::string
const& localDir,
long timestamp)
const
889 std::map<std::string, std::string> metadata;
890 for (
auto& folder : allfolders) {
900 auto object =
file.GetObjectChecked(what, cl);
904 std::string objectName(cl->GetName());
906 object =
file.GetObjectChecked(objectName.c_str(), cl);
907 LOG(warn) <<
"Did not find object under expected name " << what;
911 LOG(warn) <<
"Found object under deprecated name " << cl->GetName();
916 if (cl->InheritsFrom(
"TObject")) {
919 auto tree =
dynamic_cast<TTree*
>((
TObject*)
object);
921 tree->LoadBaskets(0x1L << 32);
922 tree->SetDirectory(
nullptr);
925 auto h =
dynamic_cast<TH1*
>((
TObject*)
object);
927 h->SetDirectory(
nullptr);
935void* CcdbApi::extractFromLocalFile(std::string
const&
filename, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
937 if (!std::filesystem::exists(
filename)) {
938 LOG(error) <<
"Local snapshot " <<
filename <<
" not found \n";
941 std::lock_guard<std::mutex> guard(
gIOMutex);
942 auto tcl = tinfo2TClass(tinfo);
947 *headers = *storedmeta;
950 if ((
isSnapshotMode() || mPreferSnapshotCache) && headers->find(
"ETag") == headers->end()) {
953 if (headers->find(
"fileSize") == headers->end()) {
954 (*headers)[
"fileSize"] = fmt::format(
"{}",
f.GetEND());
960bool CcdbApi::initTGrid()
const
962 if (mNeedAlienToken && !gGrid) {
963 static bool allowNoToken = getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"));
965 LOG(fatal) <<
"Alien Token Check failed - Please get an alien token before running with https CCDB endpoint, or alice-ccdb.cern.ch!";
967 TGrid::Connect(
"alien");
968 static bool errorShown =
false;
969 if (!gGrid && errorShown ==
false) {
971 LOG(error) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
973 LOG(fatal) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
978 return gGrid !=
nullptr;
981void* CcdbApi::downloadFilesystemContent(std::string
const& url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
983 if ((url.find(
"alien:/", 0) != std::string::npos) && !initTGrid()) {
986 std::lock_guard<std::mutex> guard(
gIOMutex);
987 auto memfile = TMemFile::Open(url.c_str(),
"OPEN");
989 auto cl = tinfo2TClass(tinfo);
991 if (headers && headers->find(
"fileSize") == headers->end()) {
992 (*headers)[
"fileSize"] = fmt::format(
"{}", memfile->GetEND());
1000void* CcdbApi::interpretAsTMemFileAndExtract(
char* contentptr,
size_t contentsize, std::type_info
const& tinfo)
1003 Int_t previousErrorLevel = gErrorIgnoreLevel;
1004 gErrorIgnoreLevel = kFatal;
1005 std::lock_guard<std::mutex> guard(
gIOMutex);
1006 TMemFile memFile(
"name", contentptr, contentsize,
"READ");
1007 gErrorIgnoreLevel = previousErrorLevel;
1008 if (!memFile.IsZombie()) {
1009 auto tcl = tinfo2TClass(tinfo);
1020void* CcdbApi::navigateURLsAndRetrieveContent(CURL* curl_handle, std::string
const& url, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
1025 static thread_local std::multimap<std::string, std::string> headerData;
1028 if ((url.find(
"alien:/", 0) != std::string::npos) || (url.find(
"file:/", 0) != std::string::npos)) {
1029 return downloadFilesystemContent(url, tinfo, headers);
1036 curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1038 MemoryStruct chunk{(
char*)malloc(1), 0};
1039 initCurlOptionsForRetrieve(curl_handle, (
void*)&chunk, WriteMemoryCallback,
false);
1041 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(headerData)>);
1043 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&headerData);
1047 auto res = CURL_perform(curl_handle);
1048 long response_code = -1;
1049 void* content =
nullptr;
1051 if (
res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) {
1053 for (
auto& p : headerData) {
1054 (*headers)[
p.first] =
p.second;
1057 if (200 <= response_code && response_code < 300) {
1059 content = interpretAsTMemFileAndExtract(chunk.memory, chunk.size, tinfo);
1060 if (headers && headers->find(
"fileSize") == headers->end()) {
1061 (*headers)[
"fileSize"] = fmt::format(
"{}", chunk.size);
1063 }
else if (response_code == 304) {
1068 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
1071 else if (300 <= response_code && response_code < 400) {
1077 auto complement_Location = [
this](std::string
const& loc) {
1078 if (loc[0] ==
'/') {
1085 std::vector<std::string> locs;
1086 auto iter = headerData.find(
"Location");
1087 if (iter != headerData.end()) {
1088 locs.push_back(complement_Location(iter->second));
1091 auto iter2 = headerData.find(
"Content-Location");
1092 if (iter2 != headerData.end()) {
1093 auto range = headerData.equal_range(
"Content-Location");
1094 for (
auto it =
range.first; it !=
range.second; ++it) {
1095 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
1096 locs.push_back(complement_Location(it->second));
1100 for (
auto& l : locs) {
1102 LOG(
debug) <<
"Trying content location " << l;
1103 content = navigateURLsAndRetrieveContent(curl_handle, l, tinfo, headers);
1109 }
else if (response_code == 404) {
1110 LOG(error) <<
"Requested resource does not exist: " << url;
1113 LOG(error) <<
"Error in fetching object " << url <<
", curl response code:" << response_code;
1117 if (chunk.memory !=
nullptr) {
1121 LOGP(alarm,
"Curl request to {} failed with result {}, response code: {}", url,
int(
res), response_code);
1126 (*headers)[
"Error"] =
"An error occurred during retrieval";
1132 std::map<std::string, std::string>
const& metadata,
long timestamp,
1133 std::map<std::string, std::string>* headers, std::string
const& etag,
1134 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
1136 if (!mSnapshotCachePath.empty()) {
1138 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1139 std::string logfile = mSnapshotCachePath +
"/log";
1140 std::fstream out(logfile, ios_base::out | ios_base::app);
1141 if (out.is_open()) {
1142 out <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1144 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path);
1145 bool snapshoting =
false;
1146 if (!std::filesystem::exists(snapshotfile)) {
1148 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotfile <<
"\n";
1151 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" failed to create directory for " << snapshotfile <<
"\n";
1154 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1157 auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
1159 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1166 CURL* curl_handle = curl_easy_init();
1167 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1168 std::string fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp);
1170 if (mInSnapshotMode) {
1171 auto res = extractFromLocalFile(fullUrl, tinfo, headers);
1173 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1178 curl_slist* option_list =
nullptr;
1179 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, option_list, timestamp, headers, etag, createdNotAfter, createdNotBefore);
1180 auto content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1182 for (
size_t hostIndex = 1; hostIndex < hostsPool.size() && !(content); hostIndex++) {
1183 fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp, hostIndex);
1184 content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1187 logReading(
path, timestamp, headers,
"retrieve");
1189 curl_slist_free_all(option_list);
1190 curl_easy_cleanup(curl_handle);
1196 size_t newLength =
size * nmemb;
1197 size_t oldLength = s->size();
1199 s->resize(oldLength + newLength);
1200 }
catch (std::bad_alloc& e) {
1201 LOG(error) <<
"memory error when getting data from CCDB";
1205 std::copy((
char*)contents, (
char*)contents + newLength, s->begin() + oldLength);
1206 return size * nmemb;
1209std::string
CcdbApi::list(std::string
const&
path,
bool latestOnly, std::string
const& returnFormat,
long createdNotAfter,
long createdNotBefore)
const
1212 CURLcode
res = CURL_LAST;
1215 curl = curl_easy_init();
1216 if (curl !=
nullptr) {
1218 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &
result);
1219 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1221 struct curl_slist* headers =
nullptr;
1222 headers = curl_slist_append(headers, (std::string(
"Accept: ") + returnFormat).c_str());
1223 headers = curl_slist_append(headers, (std::string(
"Content-Type: ") + returnFormat).c_str());
1224 if (createdNotAfter >= 0) {
1225 headers = curl_slist_append(headers, (
"If-Not-After: " +
std::to_string(createdNotAfter)).c_str());
1227 if (createdNotBefore >= 0) {
1228 headers = curl_slist_append(headers, (
"If-Not-Before: " +
std::to_string(createdNotBefore)).c_str());
1230 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1234 std::string fullUrl;
1236 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1237 fullUrl = getHostUrl(hostIndex);
1238 fullUrl += latestOnly ?
"/latest/" :
"/browse/";
1240 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1242 res = CURL_perform(curl);
1243 if (
res != CURLE_OK) {
1244 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1247 curl_slist_free_all(headers);
1248 curl_easy_cleanup(curl);
1254std::string CcdbApi::getTimestampString(
long timestamp)
const
1265 stringstream fullUrl;
1268 curl = curl_easy_init();
1269 if (curl !=
nullptr) {
1270 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
1271 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1274 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
// fullUrl is declared outside the host loop: clear it so that each host gets its own URL instead of
// having it appended to the URL built for the previous host.
fullUrl.str("");
fullUrl.clear();
1275 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestampLocal;
1276 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1279 res = CURL_perform(curl);
1280 if (
res != CURLE_OK) {
1281 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1283 curl_easy_cleanup(curl);
1292 stringstream fullUrl;
1293 for (
size_t i = 0;
i < hostsPool.size();
i++) {
1294 std::string url = getHostUrl(
i);
// fullUrl is declared outside the host loop: clear it so that each host's truncate URL is built
// from scratch rather than appended to the previous one.
fullUrl.str("");
fullUrl.clear();
1295 fullUrl << url <<
"/truncate/" <<
path;
1297 curl = curl_easy_init();
1298 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1299 if (curl !=
nullptr) {
1300 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1305 res = CURL_perform(curl);
1306 if (
res != CURLE_OK) {
1307 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1309 curl_easy_cleanup(curl);
1316 return size * nmemb;
1322 CURLcode
res = CURL_LAST;
1325 curl = curl_easy_init();
1326 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1328 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1329 curl_easy_setopt(curl, CURLOPT_URL, mUrl.data());
1330 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
write_data);
1332 res = CURL_perform(curl);
1337 curl_easy_cleanup(curl);
1346 std::stringstream ss(reply.c_str());
1348 std::vector<std::string> folders;
1350 size_t numberoflines = std::count(reply.begin(), reply.end(),
'\n');
1351 bool inSubFolderSection =
false;
1353 for (
size_t linenumber = 0; linenumber < numberoflines; ++linenumber) {
1354 std::getline(ss, line);
1355 if (inSubFolderSection && line.size() > 0) {
1360 if (line.compare(
"Subfolders:") == 0) {
1361 inSubFolderSection =
true;
1369size_t header_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
1371 auto* headers =
static_cast<std::vector<std::string>*
>(userdata);
1372 auto header = std::string(
buffer,
size * nitems);
1373 headers->emplace_back(std::string(header.data()));
1374 return size * nitems;
1382 auto p = std::filesystem::path(
filename).parent_path();
1383 if (!std::filesystem::exists(p)) {
1384 std::filesystem::create_directories(p);
1387 rapidjson::StringBuffer
buffer;
1388 rapidjson::Writer<rapidjson::StringBuffer> writer(
buffer);
1389 writer.StartObject();
1390 for (
const auto& pair : meta) {
1391 writer.Key(pair.first.c_str());
1392 writer.String(pair.second.c_str());
1398 if (
file.is_open()) {
1411 if (!
file.is_open()) {
1412 std::cerr <<
"Failed to open file for reading." << std::endl;
1416 std::string jsonStr((std::istreambuf_iterator<char>(
file)), std::istreambuf_iterator<char>());
1419 rapidjson::Document document;
1420 document.Parse(jsonStr.c_str());
1422 if (document.HasParseError()) {
1423 std::cerr <<
"Error parsing JSON" << std::endl;
1428 for (
auto itr = document.MemberBegin(); itr != document.MemberEnd(); ++itr) {
1429 meta[itr->name.GetString()] = itr->value.GetString();
1434std::map<std::string, std::string>
CcdbApi::retrieveHeaders(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp)
const
1437 auto do_remote_header_call = [
this, &
path, &metadata, timestamp]() -> std::map<std::string, std::string> {
1438 CURL* curl = curl_easy_init();
1439 CURLcode
res = CURL_LAST;
1440 std::string fullUrl = getFullUrlForRetrieval(curl,
path, metadata, timestamp);
1441 std::map<std::string, std::string> headers;
1443 if (curl !=
nullptr) {
1444 struct curl_slist*
list =
nullptr;
1447 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1450 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1451 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1452 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_map_callback<>);
1453 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1454 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1459 long httpCode = 404;
1460 CURLcode getCodeRes = CURL_LAST;
1461 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (httpCode >= 400 ||
res > 0 || getCodeRes > 0); hostIndex++) {
1462 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1463 res = CURL_perform(curl);
1464 if (
res != CURLE_OK &&
res != CURLE_UNSUPPORTED_PROTOCOL) {
1468 LOG(error) <<
"CURL_perform() failed: " << curl_easy_strerror(
res);
1470 getCodeRes = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
1472 if (httpCode == 404) {
1475 curl_easy_cleanup(curl);
1480 if (!mSnapshotCachePath.empty()) {
1482 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath + std::string(
"_headers"),
path);
1484 std::string logfile = mSnapshotCachePath +
"/log";
1485 std::fstream out(logfile, ios_base::out | ios_base::app);
1486 if (out.is_open()) {
1487 out <<
"CCDB-header-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1489 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path +
"/" +
std::to_string(timestamp),
"header.json");
1490 if (!std::filesystem::exists(snapshotfile)) {
1491 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" storing to snapshot " << snapshotfile <<
"\n";
1494 auto meta = do_remote_header_call();
1498 LOG(warn) <<
"Failed to cache the header information to disc";
1502 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1503 std::map<std::string, std::string> meta;
1505 LOG(warn) <<
"Failed to read cached information from disc";
1506 return do_remote_header_call();
1511 return do_remote_header_call();
1516 auto curl = curl_easy_init();
1522 struct curl_slist*
list =
nullptr;
1523 list = curl_slist_append(
list, (
"If-None-Match: " + etag).c_str());
1525 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1527 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
1529 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1530 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1531 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
1532 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1533 if (!agentID.empty()) {
1534 curl_easy_setopt(curl, CURLOPT_USERAGENT, agentID.c_str());
1540 curl_easy_perform(curl);
1541 long http_code = 404;
1542 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
1543 if (http_code == 304) {
1551 static std::string etagHeader =
"ETag: ";
1552 static std::string locationHeader =
"Content-Location: ";
1553 for (
auto h : headers) {
1554 if (
h.find(etagHeader) == 0) {
1555 etag = std::string(
h.data() + etagHeader.size());
1556 }
else if (
h.find(locationHeader) == 0) {
1557 pfns.emplace_back(std::string(
h.data() + locationHeader.size(),
h.size() - locationHeader.size()));
1573 auto object =
file.GetObjectChecked(
CCDBMETA_ENTRY, TClass::GetClass(
typeid(std::map<std::string, std::string>)));
1575 return static_cast<std::map<std::string, std::string>*
>(
object);
1582void traverseAndFillFolders(
CcdbApi const& api, std::string
const&
top, std::vector<std::string>& folders)
1586 folders.emplace_back(
top);
1589 if (subfolders.size() > 0) {
1591 for (
auto& sub : subfolders) {
1592 traverseAndFillFolders(api, sub, folders);
1602 std::vector<std::string> folders;
1603 traverseAndFillFolders(*
this,
top, folders);
1607TClass* CcdbApi::tinfo2TClass(std::type_info
const& tinfo)
1609 TClass* cl = TClass::GetClass(tinfo);
1611 throw std::runtime_error(fmt::format(
"Could not retrieve ROOT dictionary for type {}, aborting", tinfo.name()));
1617int CcdbApi::updateMetadata(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp, std::string
const&
id,
long newEOV)
1620 CURL* curl = curl_easy_init();
1621 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1622 if (curl !=
nullptr) {
1624 stringstream fullUrl;
1625 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1626 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestamp;
1628 fullUrl <<
"/" << newEOV;
1631 fullUrl <<
"/" <<
id;
1635 for (
auto& kv : metadata) {
1636 std::string mfirst = kv.first;
1637 std::string msecond = kv.second;
1639 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
1640 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
1641 fullUrl << std::string(mfirstEncoded) +
"=" + std::string(msecondEncoded) +
"&";
1642 curl_free(mfirstEncoded);
1643 curl_free(msecondEncoded);
1646 if (curl !=
nullptr) {
1647 LOG(
debug) <<
"passing to curl: " << fullUrl.str();
1648 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1649 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"PUT");
1650 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1651 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1655 res = CURL_perform(curl);
1656 if (
res != CURLE_OK) {
1657 LOGP(alarm,
"CURL_perform() failed: {}, code: {}", curl_easy_strerror(
res),
int(
res));
1662 curl_easy_cleanup(curl);
1669void CcdbApi::initHostsPool(std::string hosts)
1672 auto splitted = hosts | std::views::transform([](
char c) {
return (
c ==
';') ?
',' :
c; }) | std::views::split(
',');
1673 for (
auto&& part : splitted) {
1674 hostsPool.emplace_back(part.begin(), part.end());
1678std::string CcdbApi::getHostUrl(
int hostIndex)
const
1680 return hostsPool.at(hostIndex);
1686 data->hoPair.object = &requestContext.
dest;
1688 std::function<bool(std::string)> localContentCallback = [
this, &requestContext](std::string url) {
1692 auto writeCallback = [](
void* contents,
size_t size,
size_t nmemb,
void* chunkptr) {
1694 auto& chunk = *ho.
object;
1695 size_t realsize =
size * nmemb, sz = 0;
1698 if (chunk.capacity() < chunk.size() + realsize) {
1700 const char hannot[] =
"header";
1701 size_t hsize = getFlatHeaderSize(ho.header);
1702 auto cl = ho.header.find(
"Content-Length");
1703 if (cl != ho.header.end()) {
1704 size_t sizeFromHeader = std::stol(cl->second);
1705 sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1707 sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
1712 char* contC = (
char*)contents;
1713 chunk.insert(chunk.end(), contC, contC + realsize);
1714 }
catch (std::exception e) {
1721 CURL* curl_handle = curl_easy_init();
1722 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1723 std::string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.
path, requestContext.
metadata, requestContext.
timestamp);
1724 curl_slist* options_list =
nullptr;
1725 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.
timestamp, &requestContext.
headers,
1729 data->hosts = hostsPool;
1732 data->localContentCallback = localContentCallback;
1733 data->userAgent = mUniqueAgentID;
1734 data->optionsList = options_list;
1736 curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str());
1737 initCurlOptionsForRetrieve(curl_handle, (
void*)(&
data->hoPair), writeCallback,
false);
1738 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(
data->hoPair.header)>);
1739 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&(
data->hoPair.header));
1740 curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (
void*)
data);
1743 asynchPerform(curl_handle, requestCounter);
1748 std::hash<std::string> hasher;
1749 std::string semhashedstring =
"aliceccdb" +
std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
1750 return semhashedstring;
1758 return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
1759 }
catch (std::exception e) {
1760 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
1769 if (sem->try_wait()) {
1780 boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
1781 std::cout <<
"Found CCDB semaphore: " << semaname <<
"\n";
1783 auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
1785 std::cout <<
"Removed CCDB semaphore: " << semaname <<
"\n";
1790 }
catch (std::exception
const& e) {
1801 namespace fs = std::filesystem;
1802 std::string fileName{
"snapshot.root"};
1804 auto absolutesnapshotdir = fs::weakly_canonical(fs::absolute(snapshotdir));
1805 for (
const auto&
entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
1806 if (
entry.is_directory()) {
1807 const fs::path& currentDir = fs::canonical(fs::absolute(
entry.path()));
1808 fs::path filePath = currentDir / fileName;
1809 if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
1810 std::cout <<
"Directory with file '" << fileName <<
"': " << currentDir << std::endl;
1814 auto numtokens = pathtokens.size();
1815 if (numtokens < 3) {
1820 std::string
path = pathtokens[numtokens - 3] +
"/" + pathtokens[numtokens - 2] +
"/" + pathtokens[numtokens - 1];
1826 }
catch (std::exception
const& e) {
1827 LOG(info) <<
"Semaphore search had exception " << e.what();
1832 long timestamp, std::map<std::string, std::string>& headers,
1833 std::string& snapshotpath, std::pmr::vector<char>& dest,
int& fromSnapshot, std::string
const& etag)
const
1835 if (createSnapshot) {
1836 std::string logfile = mSnapshotCachePath +
"/log";
1837 std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app);
1838 if (logStream.is_open()) {
1839 logStream <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
" for load to memory\n";
1842 if (mInSnapshotMode) {
1847 }
else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) {
1859 if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) {
1860 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1862 auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.
path);
1863 std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path);
1865 std::fstream logStream;
1866 if (logStream.is_open()) {
1867 logStream <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotpath <<
" from memory\n";
1870 LOGP(
debug,
"creating snapshot {} -> {}", requestContext.
path, snapshotpath);
1873 std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary);
1874 std::copy(requestContext.
dest.begin(), requestContext.
dest.end(), std::ostreambuf_iterator<char>(objFile));
1877 updateMetaInformationInLocalFile(snapshotpath, &requestContext.
headers, &querysummary);
1883 std::map<std::string, std::string>
const& metadata,
long timestamp,
1884 std::map<std::string, std::string>* headers, std::string
const& etag,
1885 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool considerSnapshot)
const
1887 std::pmr::vector<char> destP;
1888 destP.reserve(dest.size());
1889 loadFileToMemory(destP,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, considerSnapshot);
1891 dest.reserve(destP.size());
1892 for (
const auto c : destP) {
1898 std::map<std::string, std::string>
const& metadata,
long timestamp,
1899 std::map<std::string, std::string>* headers, std::string
const& etag,
1900 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool considerSnapshot)
const
1907 requestContext.
etag = etag;
1911 std::vector<RequestContext> contexts = {requestContext};
1917 size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1918 dest.resize(cnt + hsize);
1919 auto addString = [&dest, &cnt](
const std::string& s) {
1926 for (
auto&
h : headers) {
1928 addString(
h.second);
1930 *
reinterpret_cast<int*
>(&dest[cnt]) = hsize;
1931 std::memcpy(&dest[cnt +
sizeof(
int)], FlatHeaderAnnot,
sizeof(FlatHeaderAnnot));
1936 LOGP(
debug,
"loadFileToMemory {} ETag=[{}]", requestContext.
path, requestContext.
etag);
1937 bool createSnapshot = requestContext.
considerSnapshot && !mSnapshotCachePath.empty();
1939 std::string snapshotpath;
1940 if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path))) {
1941 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1952 std::vector<int> fromSnapshots(requestContexts.size());
1953 size_t requestCounter = 0;
1956 for (
int i = 0;
i < requestContexts.size();
i++) {
1958 auto& requestContext = requestContexts.at(
i);
1963 while (requestCounter > 0) {
1968 for (
int i = 0;
i < requestContexts.size();
i++) {
1969 auto& requestContext = requestContexts.at(
i);
1970 if (!requestContext.dest.empty()) {
1971 logReading(requestContext.path, requestContext.timestamp, &requestContext.headers,
1972 fmt::format(
"{}{}", requestContext.considerSnapshot ?
"load to memory" :
"retrieve", fromSnapshots.at(
i) ?
" from snapshot" :
""));
1973 if (requestContext.considerSnapshot && fromSnapshots.at(
i) != 2) {
1982 if (url.find(
"alien:/", 0) != std::string::npos) {
1983 std::map<std::string, std::string> localHeaders;
1985 auto it = localHeaders.find(
"Error");
1986 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
1992 if ((url.find(
"file:/", 0) != std::string::npos)) {
1993 std::string
path = url.substr(7);
1994 if (std::filesystem::exists(
path)) {
1995 std::map<std::string, std::string> localHeaders;
1997 auto it = localHeaders.find(
"Error");
1998 if (it != localHeaders.end() && it->second ==
"An error occurred during retrieval") {
2008void CcdbApi::loadFileToMemory(std::pmr::vector<char>& dest,
const std::string&
path, std::map<std::string, std::string>* localHeaders,
bool fetchLocalMetaData)
const
2011 constexpr size_t MaxCopySize = 0x1L << 25;
2012 auto signalError = [&dest, localHeaders]() {
2016 (*localHeaders)[
"Error"] =
"An error occurred during retrieval";
2019 if (
path.find(
"alien:/") == 0 && !initTGrid()) {
2023 std::string fname(
path);
2024 if (fname.find(
"?filetype=raw") == std::string::npos) {
2025 fname +=
"?filetype=raw";
2027 std::unique_ptr<TFile> sfile{TFile::Open(fname.c_str())};
2028 if (!sfile || sfile->IsZombie()) {
2029 LOG(error) <<
"Failed to open file " << fname;
2033 size_t totalread = 0, fsize = sfile->GetSize(), b00 = sfile->GetBytesRead();
2035 char* dptr = dest.data();
2039 size_t b0 = sfile->GetBytesRead(),
b1 = b0 - b00;
2040 size_t readsize = fsize -
b1 > MaxCopySize ? MaxCopySize : fsize -
b1;
2041 if (readsize == 0) {
2044 sfile->Seek(totalread, TFile::kBeg);
2045 bool failed = sfile->ReadBuffer(dptr, (Int_t)readsize);
2046 nread = sfile->GetBytesRead() - b0;
2047 if (
failed || nread < 0) {
2048 LOG(error) <<
"failed to copy file " << fname <<
" to memory buffer";
2054 }
while (nread == (
long)MaxCopySize);
2056 if (localHeaders && fetchLocalMetaData) {
2057 TMemFile memFile(
"name",
const_cast<char*
>(dest.data()), dest.size(),
"READ");
2058 auto storedmeta = (std::map<std::string, std::string>*)
extractFromTFile(memFile, TClass::GetClass(
"std::map<std::string, std::string>"),
CCDBMETA_ENTRY);
2060 *localHeaders = *storedmeta;
2063 if ((
isSnapshotMode() || mPreferSnapshotCache) && localHeaders->find(
"ETag") == localHeaders->end()) {
2064 (*localHeaders)[
"ETag"] =
path;
2066 if (localHeaders->find(
"fileSize") == localHeaders->end()) {
2067 (*localHeaders)[
"fileSize"] = fmt::format(
"{}", memFile.GetEND());
2073void CcdbApi::checkMetadataKeys(std::map<std::string, std::string>
const& metadata)
const
2079 const std::regex regexPatternSearch(R
"([ :;.,\\/'?!\(\)\{\}\[\]@<>=+*#$&`|~^%])");
2080 bool isInvalid =
false;
2082 for (
auto& el : metadata) {
2083 auto keyMd = el.first;
2085 std::smatch searchRes;
2086 while (std::regex_search(keyMd, searchRes, regexPatternSearch)) {
2088 LOG(error) <<
"Invalid character found in metadata key '" << tmp <<
"\': '" << searchRes.str() <<
"\'";
2089 keyMd = searchRes.suffix();
2093 LOG(fatal) <<
"Some metadata keys have invalid characters, please fix!";
2098void CcdbApi::logReading(
const std::string&
path,
long ts,
const std::map<std::string, std::string>* headers,
const std::string& comment)
const
2100 std::string upath{
path};
2102 auto ent = headers->find(
"Valid-From");
2103 if (ent != headers->end()) {
2104 upath +=
"/" + ent->second;
2106 ent = headers->find(
"ETag");
2107 if (ent != headers->end()) {
2108 upath +=
"/" + ent->second;
2111 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
2112 LOGP(info,
"ccdb reads {}{}{} for {} ({}, agent_id: {}), ", mUrl, mUrl.back() ==
'/' ?
"" :
"/", upath, ts < 0 ?
getCurrentTimestamp() : ts, comment, mUniqueAgentID);
2115void CcdbApi::asynchPerform(CURL* handle,
size_t* requestCounter)
const
2120CURLcode CcdbApi::CURL_perform(CURL* handle)
const
2122 if (mIsCCDBDownloaderPreferred) {
2123 return mDownloader->
perform(handle);
2126 for (
int i = 1;
i <= mCurlRetries && (
result = curl_easy_perform(handle)) != CURLE_OK;
i++) {
2127 usleep(mCurlDelayRetries *
i);
2138 LOG(
debug) <<
"Entering semaphore barrier";
2141 mSem =
new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, mSemName.c_str(), 1);
2142 }
catch (std::exception e) {
2143 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
2148 gSemaRegistry.
add(
this);
2155 LOG(
debug) <<
"Ending semaphore barrier";
2158 if (mSem->try_wait()) {
2160 boost::interprocess::named_semaphore::remove(mSemName.c_str());
2162 gSemaRegistry.
remove(
this);
2168 LOG(
debug) <<
"Cleaning up semaphore registry with count " << mStore.size();
2169 for (
auto& s : mStore) {
const GPUTPCGMMerger::trackCluster & b1
Class for time synchronization of RawReader instances.
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
CCDBSemaphore(std::string const &cachepath, std::string const &path)
static void curlSetSSLOptions(CURL *curl)
static std::string generateFileName(const std::string &inp)
std::string list(std::string const &path="", bool latestOnly=false, std::string const &returnFormat="text/plain", long createdNotAfter=-1, long createdNotBefore=-1) const
int storeAsTFile_impl(const void *obj1, std::type_info const &info, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
static bool checkAlienToken()
void runDownloaderLoop(bool noWait)
void releaseNamedSemaphore(boost::interprocess::named_semaphore *sem, std::string const &path) const
bool retrieveBlob(std::string const &path, std::string const &targetdir, std::map< std::string, std::string > const &metadata, long timestamp, bool preservePathStructure=true, std::string const &localFileName="snapshot.root", std::string const &createdNotAfter="", std::string const &createdNotBefore="") const
static std::map< std::string, std::string > * retrieveMetaInfo(TFile &)
void scheduleDownload(RequestContext &requestContext, size_t *requestCounter) const
TObject * retrieve(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp) const
void init(std::string const &hosts)
TObject * retrieveFromTFile(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore) const
std::string const & getURL() const
static void removeLeakingSemaphores(std::string const &basedir, bool remove=false)
bool loadLocalContentToMemory(std::pmr::vector< char > &dest, std::string &url) const
void saveSnapshot(RequestContext &requestContext) const
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
void getFromSnapshot(bool createSnapshot, std::string const &path, long timestamp, std::map< std::string, std::string > &headers, std::string &snapshotpath, std::pmr::vector< char > &dest, int &fromSnapshot, std::string const &etag) const
static void appendFlatHeader(std::pmr::vector< char > &dest, const std::map< std::string, std::string > &headers)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
int storeAsTFile(const TObject *rootObject, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
void snapshot(std::string const &ccdbrootpath, std::string const &localDir, long timestamp) const
bool isSnapshotMode() const
bool isHostReachable() const
void loadFileToMemory(std::vector< char > &dest, std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore, bool considerSnapshot=true) const
static std::string determineSemaphoreName(std::string const &basedir, std::string const &objectpath)
void deleteObject(std::string const &path, long timestamp=-1) const
std::vector< std::string > getAllFolders(std::string const &top) const
void vectoredLoadFileToMemory(std::vector< RequestContext > &requestContext) const
boost::interprocess::named_semaphore * createNamedSemaphore(std::string const &path) const
static bool removeSemaphore(std::string const &name, bool remove=false)
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
CcdbApi()
Default constructor.
static bool getCCDBEntryHeaders(std::string const &url, std::string const &etag, std::vector< std::string > &headers, const std::string &agentID="")
static CCDBQuery * retrieveQueryInfo(TFile &)
static constexpr const char * CCDBQUERY_ENTRY
void truncate(std::string const &path) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
static void parseCCDBHeaders(std::vector< std::string > const &headers, std::vector< std::string > &pfns, std::string &etag)
int updateMetadata(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::string const &id="", long newEOV=0)
static constexpr const char * CCDBMETA_ENTRY
static constexpr const char * CCDBOBJECT_ENTRY
void navigateSourcesAndLoadFile(RequestContext &requestContext, int &fromSnapshot, size_t *requestCounter) const
std::vector< std::string > parseSubFolders(std::string const &reply) const
virtual ~CcdbApi()
Default destructor.
void setFileName(const std::string &nm)
const std::string & getObjectType() const
void setObjectType(const std::string &tp)
const std::string & getFileName() const
void add(CCDBSemaphore const *ptr)
void remove(CCDBSemaphore const *ptr)
SemaphoreRegistry()=default
GLdouble GLdouble GLdouble GLdouble top
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
bool stdmap_to_jsonfile(std::map< std::string, std::string > const &meta, std::string const &filename)
size_t write_data(void *, size_t size, size_t nmemb, void *)
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
size_t(*)(void *, size_t, size_t, void *) CurlWriteCallback
std::string sanitizeObjectName(const std::string &objectName)
bool jsonfile_to_stdmap(std::map< std::string, std::string > &meta, std::string const &filename)
long getFutureTimestamp(int secondsInFuture)
returns the timestamp in long corresponding to "now + secondsInFuture"
size_t CurlWrite_CallbackFunc_StdString2(void *contents, size_t size, size_t nmemb, std::string *s)
std::string timestamp() noexcept
Defining PrimaryVertex explicitly as messageable.
void createDirectoriesIfAbsent(std::string const &path)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::map< std::string, std::string > const & metadata
std::pmr::vector< char > & dest
std::string createdNotAfter
std::map< std::string, std::string > & headers
std::string createdNotBefore
static DeploymentMode deploymentMode()
static std::string getClassName(const T &obj)
get the class name of the object
static std::unique_ptr< FileImage > createFileImage(const TObject &obj, const std::string &fileName, const std::string &objName)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string concat_string(Ts const &... ts)
static bool endsWith(const std::string &s, const std::string &ending)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
uint64_t const void const *restrict const msg