31#include <TStreamerInfo.h>
35#include <fairlogger/Logger.h>
41#include <boost/algorithm/string.hpp>
42#include <boost/asio/ip/host_name.hpp>
45#include <boost/interprocess/sync/named_semaphore.hpp>
49#include <unordered_set>
50#include "rapidjson/document.h"
51#include "rapidjson/writer.h"
52#include "rapidjson/stringbuffer.h"
60unique_ptr<TJAlienCredentials> CcdbApi::mJAlienCredentials =
nullptr;
75 boost::interprocess::named_semaphore* mSem =
nullptr;
76 std::string mSemName{};
91 std::unordered_set<CCDBSemaphore const*> mStore;
101 mIsCCDBDownloaderPreferred = 0;
102 if (deploymentMode == DeploymentMode::OnlineDDS && deploymentMode == DeploymentMode::OnlineECS && deploymentMode == DeploymentMode::OnlineAUX && deploymentMode == DeploymentMode::FST) {
103 mIsCCDBDownloaderPreferred = 1;
105 if (getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) {
106 mIsCCDBDownloaderPreferred = atoi(getenv(
"ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI"));
113 curl_global_cleanup();
117void CcdbApi::setUniqueAgentID()
119 std::string host = boost::asio::ip::host_name();
120 char const* jobID = getenv(
"ALIEN_PROC_ID");
131 LOG(
debug) <<
"On macOS we simply rely on TGrid::Connect(\"alien\").";
134 if (getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"))) {
137 if (getenv(
"JALIEN_TOKEN_CERT")) {
140 auto returncode = system(
"LD_PRELOAD= alien-token-info &> /dev/null");
141 if (returncode == -1) {
144 return returncode == 0;
147void CcdbApi::curlInit()
150 curl_global_init(CURL_GLOBAL_DEFAULT);
151 CcdbApi::mJAlienCredentials = std::make_unique<TJAlienCredentials>();
152 CcdbApi::mJAlienCredentials->loadCredentials();
153 CcdbApi::mJAlienCredentials->selectPreferedCredentials();
156 if (getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT")) {
157 auto timeoutMS = atoi(getenv(
"ALICEO2_CCDB_SOCKET_TIMEOUT"));
158 if (timeoutMS >= 0) {
159 LOG(info) <<
"Setting socket timeout to " << timeoutMS <<
" milliseconds";
169 constexpr const char* SNAPSHOTPREFIX =
"file://";
172 if (host.substr(0, 7).compare(SNAPSHOTPREFIX) == 0) {
173 auto path = host.substr(7);
174 initInSnapshotMode(
path);
195 std::string snapshotReport{};
196 const char* cachedir = getenv(
"ALICEO2_CCDB_LOCALCACHE");
197 namespace fs = std::filesystem;
199 if (cachedir[0] == 0) {
200 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(
"."));
202 mSnapshotCachePath = fs::weakly_canonical(fs::absolute(cachedir));
204 snapshotReport = fmt::format(
"(cache snapshots to dir={}", mSnapshotCachePath);
207 mPreferSnapshotCache =
true;
208 if (mSnapshotCachePath.empty()) {
209 LOGP(fatal,
"IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE is defined but the ALICEO2_CCDB_LOCALCACHE is not");
211 snapshotReport +=
", prefer if available";
213 if (!snapshotReport.empty()) {
214 snapshotReport +=
')';
217 mNeedAlienToken = (host.find(
"https://") != std::string::npos) || (host.find(
"alice-ccdb.cern.ch") != std::string::npos);
220 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD")) {
221 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_DOWNLOAD"));
223 mCurlTimeoutDownload =
timeout;
230 mCurlTimeoutDownload = 15;
233 mCurlTimeoutDownload = 15;
235 mCurlTimeoutDownload = 1;
239 if (getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD")) {
240 auto timeout = atoi(getenv(
"ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD"));
249 mCurlTimeoutUpload = 3;
252 mCurlTimeoutUpload = 20;
254 mCurlTimeoutUpload = 20;
261 LOGP(
debug,
"Curl timeouts are set to: download={:2}, upload={:2} seconds", mCurlTimeoutDownload, mCurlTimeoutUpload);
263 LOGP(info,
"Init CcdApi with UserAgentID: {}, Host: {}{}, Curl timeouts: upload:{} download:{}", mUniqueAgentID, host,
264 mInSnapshotMode ?
"(snapshot readonly mode)" : snapshotReport.c_str(), mCurlTimeoutUpload, mCurlTimeoutDownload);
273void CcdbApi::updateMetaInformationInLocalFile(std::string
const&
filename, std::map<std::string, std::string>
const* headers,
CCDBQuery const* querysummary)
275 std::lock_guard<std::mutex> guard(
gIOMutex);
276 auto oldlevel = gErrorIgnoreLevel;
277 gErrorIgnoreLevel = 6001;
278 TFile snapshotfile(
filename.c_str(),
"UPDATE");
280 if (!snapshotfile.IsZombie()) {
282 snapshotfile.WriteObjectAny(querysummary, TClass::GetClass(
typeid(*querysummary)),
CCDBQUERY_ENTRY);
285 snapshotfile.WriteObjectAny(headers, TClass::GetClass(
typeid(*headers)),
CCDBMETA_ENTRY);
287 snapshotfile.Write();
288 snapshotfile.Close();
290 gErrorIgnoreLevel = oldlevel;
300 string tmpObjectName = objectName;
301 tmpObjectName.erase(std::remove_if(tmpObjectName.begin(), tmpObjectName.end(),
302 [](
auto const&
c) ->
bool { return (!std::isalnum(c) && c !=
'_' && c !=
'/' && c !=
'.'); }),
303 tmpObjectName.end());
304 return tmpObjectName;
311 std::lock_guard<std::mutex> guard(
gIOMutex);
325 std::string className = rootObject->GetName();
331 std::lock_guard<std::mutex> guard(
gIOMutex);
336 std::map<std::string, std::string>
const& metadata,
337 long startValidityTimestamp,
long endValidityTimestamp,
338 std::vector<char>::size_type maxSize)
const
342 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
348 path, metadata, startValidityTimestamp, endValidityTimestamp, maxSize);
352 const std::string&
path,
const std::map<std::string, std::string>& metadata,
353 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type maxSize)
const
355 if (maxSize > 0 &&
size > maxSize) {
356 LOGP(alarm,
"Object will not be uploaded to {} since its size {} exceeds max allowed {}",
path,
size, maxSize);
362 long sanitizedStartValidityTimestamp = startValidityTimestamp;
363 if (startValidityTimestamp == -1) {
364 LOGP(info,
"Start of Validity not set, current timestamp used.");
367 long sanitizedEndValidityTimestamp = endValidityTimestamp;
368 if (endValidityTimestamp == -1) {
369 LOGP(info,
"End of Validity not set, start of validity plus 1 day used.");
372 if (mInSnapshotMode) {
373 auto pthLoc = getSnapshotDir(mSnapshotTopPath,
path);
375 auto flLoc = getSnapshotFile(mSnapshotTopPath,
path,
filename);
377 auto pent = flLoc.find_last_of(
'.');
378 if (pent == std::string::npos) {
381 flLoc.insert(pent, fmt::format(
"_{}_{}", startValidityTimestamp, endValidityTimestamp));
382 ofstream outf(flLoc.c_str(), ios::out | ios::binary);
386 throw std::runtime_error(fmt::format(
"Failed to write local CCDB file {}", flLoc));
388 std::map<std::string, std::string> metaheader(metadata);
390 metaheader[
"Valid-From"] =
std::to_string(startValidityTimestamp);
392 updateMetaInformationInLocalFile(flLoc.c_str(), &metaheader);
393 std::string metaStr{};
394 for (
const auto& mentry : metadata) {
395 metaStr += fmt::format(
"{}={};", mentry.first, mentry.second);
397 metaStr +=
"$USER_META;";
398 LOGP(info,
"Created local snapshot {}", flLoc);
399 LOGP(info, R
"(Upload with: o2-ccdb-upload --host "$ccdbhost" -p {} -f {} -k {} --starttimestamp {} --endtimestamp {} -m "{}")",
406 CURL* curl =
nullptr;
407 curl = curl_easy_init();
410 checkMetadataKeys(metadata);
412 if (curl !=
nullptr) {
413 auto mime = curl_mime_init(curl);
414 auto field = curl_mime_addpart(mime);
415 curl_mime_name(field,
"send");
416 curl_mime_filedata(field,
filename.c_str());
419 struct curl_slist* headerlist =
nullptr;
420 static const char buf[] =
"Expect:";
421 headerlist = curl_slist_append(headerlist,
buf);
425 curl_easy_setopt(curl, CURLOPT_MIMEPOST, mime);
426 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headerlist);
427 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
428 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
429 curl_easy_setopt(curl, CURLOPT_TIMEOUT, mCurlTimeoutUpload);
431 CURLcode
res = CURL_LAST;
433 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res > 0; hostIndex++) {
434 string fullUrl = getFullUrlForStorage(curl,
path, objectType, metadata, sanitizedStartValidityTimestamp, sanitizedEndValidityTimestamp, hostIndex);
435 LOG(debug3) <<
"Full URL Encoded: " << fullUrl;
437 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
440 res = CURL_perform(curl);
442 if (
res != CURLE_OK) {
443 if (
res == CURLE_OPERATION_TIMEDOUT) {
444 LOGP(alarm,
"curl_easy_perform() timed out. Consider increasing the timeout using the env var `ALICEO2_CCDB_CURL_TIMEOUT_UPLOAD` (seconds), current one is {}", mCurlTimeoutUpload);
446 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(
res));
453 curl_easy_cleanup(curl);
456 curl_slist_free_all(headerlist);
458 curl_mime_free(mime);
460 LOGP(alarm,
"curl initialization failure");
467 long startValidityTimestamp,
long endValidityTimestamp, std::vector<char>::size_type maxSize)
const
471 LOGP(error,
"nullptr is provided for object {}/{}/{}",
path, startValidityTimestamp, endValidityTimestamp);
479string CcdbApi::getFullUrlForStorage(CURL* curl,
const string&
path,
const string& objtype,
480 const map<string, string>& metadata,
481 long startValidityTimestamp,
long endValidityTimestamp,
int hostIndex)
const
484 string startValidityString = getTimestampString(startValidityTimestamp < 0 ?
getCurrentTimestamp() : startValidityTimestamp);
485 string endValidityString = getTimestampString(endValidityTimestamp < 0 ?
getFutureTimestamp(60 * 60 * 24 * 1) : endValidityTimestamp);
487 string url = getHostUrl(hostIndex);
489 string fullUrl = url +
"/" +
path +
"/" + startValidityString +
"/" + endValidityString +
"/";
492 char* objtypeEncoded = curl_easy_escape(curl, objtype.c_str(), objtype.size());
493 fullUrl +=
"ObjectType=" +
string(objtypeEncoded) +
"/";
494 curl_free(objtypeEncoded);
496 for (
auto& kv : metadata) {
497 string mfirst = kv.first;
498 string msecond = kv.second;
500 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
501 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
502 fullUrl +=
string(mfirstEncoded) +
"=" +
string(msecondEncoded) +
"/";
503 curl_free(mfirstEncoded);
504 curl_free(msecondEncoded);
510string CcdbApi::getFullUrlForRetrieval(CURL* curl,
const string&
path,
const map<string, string>& metadata,
long timestamp,
int hostIndex)
const
512 if (mInSnapshotMode) {
513 return getSnapshotFile(mSnapshotTopPath,
path);
519 string hostUrl = getHostUrl(hostIndex);
521 string fullUrl = hostUrl +
"/" +
path +
"/" + validityString +
"/";
523 for (
auto& kv : metadata) {
524 string mfirst = kv.first;
525 string msecond = kv.second;
527 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
528 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
529 fullUrl +=
string(mfirstEncoded) +
"=" +
string(msecondEncoded) +
"/";
530 curl_free(mfirstEncoded);
531 curl_free(msecondEncoded);
553static size_t WriteMemoryCallback(
void* contents,
size_t size,
size_t nmemb,
void* userp)
555 size_t realsize =
size * nmemb;
558 mem->memory = (
char*)realloc(mem->memory, mem->size + realsize + 1);
559 if (mem->memory ==
nullptr) {
560 printf(
"not enough memory (realloc returned NULL)\n");
564 memcpy(&(mem->memory[mem->size]), contents, realsize);
565 mem->size += realsize;
566 mem->memory[mem->size] = 0;
582static size_t WriteToFileCallback(
void*
ptr,
size_t size,
size_t nmemb, FILE*
stream)
595static CURLcode ssl_ctx_callback(CURL*,
void*,
void* parm)
597 std::string
msg((
const char*)parm);
600 if (
msg.length() > 0 &&
end == -1) {
602 }
else if (
end > 0) {
614 CredentialsKind cmk = mJAlienCredentials->getPreferedCredentials();
617 if (cmk == cNOT_FOUND) {
621 TJAlienCredentialsObject cmo = mJAlienCredentials->get(cmk);
623 char* CAPath = getenv(
"X509_CERT_DIR");
625 curl_easy_setopt(curl_handle, CURLOPT_CAPATH, CAPath);
627 curl_easy_setopt(curl_handle, CURLOPT_CAINFO,
nullptr);
628 curl_easy_setopt(curl_handle, CURLOPT_SSLCERT, cmo.certpath.c_str());
629 curl_easy_setopt(curl_handle, CURLOPT_SSLKEY, cmo.keypath.c_str());
632 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_FUNCTION, ssl_ctx_callback);
633 curl_easy_setopt(curl_handle, CURLOPT_SSL_CTX_DATA, mJAlienCredentials->getMessages().c_str());
640void CcdbApi::initCurlOptionsForRetrieve(CURL* curlHandle,
void* chunk,
CurlWriteCallback writeCallback,
bool followRedirect)
const
642 curl_easy_setopt(curlHandle, CURLOPT_WRITEFUNCTION, writeCallback);
643 curl_easy_setopt(curlHandle, CURLOPT_WRITEDATA, chunk);
644 curl_easy_setopt(curlHandle, CURLOPT_FOLLOWLOCATION, followRedirect ? 1L : 0L);
649template <
typename MapType = std::map<std::
string, std::
string>>
650size_t header_map_callback(
char*
buffer,
size_t size,
size_t nitems,
void* userdata)
652 auto* headers =
static_cast<MapType*
>(userdata);
653 auto header = std::string(
buffer,
size * nitems);
654 std::string::size_type
index = header.find(
':', 0);
655 if (
index != std::string::npos) {
656 const auto key = boost::algorithm::trim_copy(header.substr(0,
index));
657 const auto value = boost::algorithm::trim_copy(header.substr(
index + 1));
658 LOGP(
debug,
"Adding #{} {} -> {}", headers->size(),
key,
value);
660 if (
key ==
"Content-Length") {
661 auto cl = headers->find(
"Content-Length");
662 if (cl != headers->end()) {
663 if (std::stol(cl->second) < stol(
value)) {
671 headers->insert(std::make_pair(
key,
value));
674 return size * nitems;
678void CcdbApi::initCurlHTTPHeaderOptionsForRetrieve(CURL* curlHandle, curl_slist*& option_list,
long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
679 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
683 option_list = curl_slist_append(option_list, (
"If-None-Match: " + etag).c_str());
686 if (!createdNotAfter.empty()) {
687 option_list = curl_slist_append(option_list, (
"If-Not-After: " + createdNotAfter).c_str());
690 if (!createdNotBefore.empty()) {
691 option_list = curl_slist_append(option_list, (
"If-Not-Before: " + createdNotBefore).c_str());
694 if (headers !=
nullptr) {
695 option_list = curl_slist_append(option_list, (
"If-None-Match: " +
to_string(timestamp)).c_str());
696 curl_easy_setopt(curlHandle, CURLOPT_HEADERFUNCTION, header_map_callback<>);
697 curl_easy_setopt(curlHandle, CURLOPT_HEADERDATA, headers);
701 curl_easy_setopt(curlHandle, CURLOPT_HTTPHEADER, option_list);
704 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
707bool CcdbApi::receiveToFile(FILE* fileHandle, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
708 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
709 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect)
const
711 return receiveObject((
void*)fileHandle,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, followRedirect, (CurlWriteCallback)&WriteToFileCallback);
714bool CcdbApi::receiveToMemory(
void* chunk, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
715 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
716 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect)
const
718 return receiveObject((
void*)chunk,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, followRedirect, (CurlWriteCallback)&WriteMemoryCallback);
721bool CcdbApi::receiveObject(
void* dataHolder, std::string
const&
path, std::map<std::string, std::string>
const& metadata,
722 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
723 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool followRedirect,
CurlWriteCallback writeCallback)
const
727 curlHandle = curl_easy_init();
728 curl_easy_setopt(curlHandle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
730 if (curlHandle !=
nullptr) {
733 initCurlOptionsForRetrieve(curlHandle, dataHolder, writeCallback, followRedirect);
734 curl_slist* option_list =
nullptr;
735 initCurlHTTPHeaderOptionsForRetrieve(curlHandle, option_list, timestamp, headers, etag, createdNotAfter, createdNotBefore);
737 long responseCode = 0;
738 CURLcode curlResultCode = CURL_LAST;
740 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (responseCode >= 400 || curlResultCode > 0); hostIndex++) {
741 string fullUrl = getFullUrlForRetrieval(curlHandle,
path, metadata, timestamp, hostIndex);
742 curl_easy_setopt(curlHandle, CURLOPT_URL, fullUrl.c_str());
744 curlResultCode = CURL_perform(curlHandle);
746 if (curlResultCode != CURLE_OK) {
747 LOGP(alarm,
"curl_easy_perform() failed: {}", curl_easy_strerror(curlResultCode));
749 curlResultCode = curl_easy_getinfo(curlHandle, CURLINFO_RESPONSE_CODE, &responseCode);
750 if ((curlResultCode == CURLE_OK) && (responseCode < 300)) {
751 curl_slist_free_all(option_list);
752 curl_easy_cleanup(curlHandle);
755 if (curlResultCode != CURLE_OK) {
756 LOGP(alarm,
"invalid URL {}", fullUrl);
758 LOGP(alarm,
"not found under link {}", fullUrl);
764 curl_slist_free_all(option_list);
765 curl_easy_cleanup(curlHandle);
771 long timestamp)
const
779 bool res = receiveToMemory((
void*)&chunk,
path, metadata, timestamp);
782 std::lock_guard<std::mutex> guard(
gIOMutex);
784 mess.SetBuffer(chunk.
memory, chunk.
size, kFALSE);
789 LOGP(info,
"couldn't retrieve the object {}",
path);
801 std::string
str = inp;
802 str.erase(std::remove_if(
str.begin(),
str.end(), ::isspace),
str.end());
803 str = std::regex_replace(
str, std::regex(
"::"),
"-");
809 long timestamp, std::map<std::string, std::string>* headers, std::string
const& etag,
810 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
816 long timestamp,
bool preservePath, std::string
const& localFileName, std::string
const& createdNotAfter, std::string
const& createdNotBefore)
const
820 std::string fulltargetdir = targetdir + (preservePath ? (
'/' +
path) :
"");
824 }
catch (std::exception e) {
825 LOGP(error,
"Could not create local snapshot cache directory {}, reason: {}", fulltargetdir, e.what());
830 std::map<std::string, std::string> headers;
832 loadFileToMemory(buff,
path, metadata, timestamp, &headers,
"", createdNotAfter, createdNotBefore,
false);
833 if ((headers.count(
"Error") != 0) || (buff.empty())) {
834 LOGP(error,
"Unable to find object {}/{}, Aborting",
path, timestamp);
838 auto getFileName = [&headers]() {
839 auto& s = headers[
"Content-Disposition"];
841 std::regex re(
"(.*;)filename=\"(.*)\"");
843 if (std::regex_match(s.c_str(),
m, re)) {
847 std::string backupname(
"ccdb-blob.bin");
848 LOG(error) <<
"Cannot determine original filename from Content-Disposition ... falling back to " << backupname;
851 auto filename = localFileName.size() > 0 ? localFileName : getFileName();
852 std::string targetpath = fulltargetdir +
"/" +
filename;
854 std::ofstream objFile(targetpath, std::ios::out | std::ofstream::binary);
855 std::copy(buff.begin(), buff.end(), std::ostreambuf_iterator<char>(objFile));
856 if (!objFile.good()) {
857 LOGP(error,
"Unable to open local file {}, Aborting", targetpath);
863 updateMetaInformationInLocalFile(targetpath.c_str(), &headers, &querysummary);
867void CcdbApi::snapshot(std::string
const& ccdbrootpath, std::string
const& localDir,
long timestamp)
const
871 std::map<string, string> metadata;
872 for (
auto& folder : allfolders) {
882 auto object =
file.GetObjectChecked(what, cl);
886 std::string objectName(cl->GetName());
888 object =
file.GetObjectChecked(objectName.c_str(), cl);
889 LOG(warn) <<
"Did not find object under expected name " << what;
893 LOG(warn) <<
"Found object under deprecated name " << cl->GetName();
898 if (cl->InheritsFrom(
"TObject")) {
901 auto tree =
dynamic_cast<TTree*
>((
TObject*)
object);
903 tree->LoadBaskets(0x1L << 32);
904 tree->SetDirectory(
nullptr);
907 auto h =
dynamic_cast<TH1*
>((
TObject*)
object);
909 h->SetDirectory(
nullptr);
917void* CcdbApi::extractFromLocalFile(std::string
const&
filename, std::type_info
const& tinfo, std::map<std::string, std::string>* headers)
const
919 if (!std::filesystem::exists(
filename)) {
920 LOG(error) <<
"Local snapshot " <<
filename <<
" not found \n";
923 std::lock_guard<std::mutex> guard(
gIOMutex);
924 auto tcl = tinfo2TClass(tinfo);
929 *headers = *storedmeta;
932 if ((
isSnapshotMode() || mPreferSnapshotCache) && headers->find(
"ETag") == headers->end()) {
935 if (headers->find(
"fileSize") == headers->end()) {
936 (*headers)[
"fileSize"] = fmt::format(
"{}",
f.GetEND());
942bool CcdbApi::initTGrid()
const
944 if (mNeedAlienToken && !mAlienInstance) {
945 static bool allowNoToken = getenv(
"ALICEO2_CCDB_NOTOKENCHECK") && atoi(getenv(
"ALICEO2_CCDB_NOTOKENCHECK"));
947 LOG(fatal) <<
"Alien Token Check failed - Please get an alien token before running with https CCDB endpoint, or alice-ccdb.cern.ch!";
949 mAlienInstance = TGrid::Connect(
"alien");
950 static bool errorShown =
false;
951 if (!mAlienInstance && errorShown ==
false) {
953 LOG(error) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
955 LOG(fatal) <<
"TGrid::Connect returned nullptr. May be due to missing alien token";
960 return mAlienInstance !=
nullptr;
963void* CcdbApi::downloadFilesystemContent(std::string
const& url, std::type_info
const& tinfo, std::map<string, string>* headers)
const
965 if ((url.find(
"alien:/", 0) != std::string::npos) && !initTGrid()) {
968 std::lock_guard<std::mutex> guard(
gIOMutex);
969 auto memfile = TMemFile::Open(url.c_str(),
"OPEN");
971 auto cl = tinfo2TClass(tinfo);
973 if (headers && headers->find(
"fileSize") == headers->end()) {
974 (*headers)[
"fileSize"] = fmt::format(
"{}", memfile->GetEND());
982void* CcdbApi::interpretAsTMemFileAndExtract(
char* contentptr,
size_t contentsize, std::type_info
const& tinfo)
985 Int_t previousErrorLevel = gErrorIgnoreLevel;
986 gErrorIgnoreLevel = kFatal;
987 std::lock_guard<std::mutex> guard(
gIOMutex);
988 TMemFile memFile(
"name", contentptr, contentsize,
"READ");
989 gErrorIgnoreLevel = previousErrorLevel;
990 if (!memFile.IsZombie()) {
991 auto tcl = tinfo2TClass(tinfo);
1002void* CcdbApi::navigateURLsAndRetrieveContent(CURL* curl_handle, std::string
const& url, std::type_info
const& tinfo, std::map<string, string>* headers)
const
1007 static thread_local std::multimap<std::string, std::string> headerData;
1010 if ((url.find(
"alien:/", 0) != std::string::npos) || (url.find(
"file:/", 0) != std::string::npos)) {
1011 return downloadFilesystemContent(url, tinfo, headers);
1018 curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str());
1020 MemoryStruct chunk{(
char*)malloc(1), 0};
1021 initCurlOptionsForRetrieve(curl_handle, (
void*)&chunk, WriteMemoryCallback,
false);
1023 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(headerData)>);
1025 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&headerData);
1029 auto res = CURL_perform(curl_handle);
1030 long response_code = -1;
1031 void* content =
nullptr;
1033 if (
res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) {
1035 for (
auto& p : headerData) {
1036 (*headers)[p.first] = p.second;
1039 if (200 <= response_code && response_code < 300) {
1041 content = interpretAsTMemFileAndExtract(chunk.memory, chunk.size, tinfo);
1042 if (headers && headers->find(
"fileSize") == headers->end()) {
1043 (*headers)[
"fileSize"] = fmt::format(
"{}", chunk.size);
1045 }
else if (response_code == 304) {
1050 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
1053 else if (300 <= response_code && response_code < 400) {
1059 auto complement_Location = [
this](std::string
const& loc) {
1060 if (loc[0] ==
'/') {
1067 std::vector<std::string> locs;
1068 auto iter = headerData.find(
"Location");
1069 if (iter != headerData.end()) {
1070 locs.push_back(complement_Location(iter->second));
1073 auto iter2 = headerData.find(
"Content-Location");
1074 if (iter2 != headerData.end()) {
1075 auto range = headerData.equal_range(
"Content-Location");
1076 for (
auto it =
range.first; it !=
range.second; ++it) {
1077 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
1078 locs.push_back(complement_Location(it->second));
1082 for (
auto& l : locs) {
1084 LOG(
debug) <<
"Trying content location " << l;
1085 content = navigateURLsAndRetrieveContent(curl_handle, l, tinfo, headers);
1091 }
else if (response_code == 404) {
1092 LOG(error) <<
"Requested resource does not exist: " << url;
1095 LOG(error) <<
"Error in fetching object " << url <<
", curl response code:" << response_code;
1099 if (chunk.memory !=
nullptr) {
1103 LOGP(alarm,
"Curl request to {} failed with result {}, response code: {}", url,
int(
res), response_code);
1108 (*headers)[
"Error"] =
"An error occurred during retrieval";
1114 std::map<std::string, std::string>
const& metadata,
long timestamp,
1115 std::map<std::string, std::string>* headers, std::string
const& etag,
1116 const std::string& createdNotAfter,
const std::string& createdNotBefore)
const
1118 if (!mSnapshotCachePath.empty()) {
1120 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1121 std::string logfile = mSnapshotCachePath +
"/log";
1122 std::fstream out(logfile, ios_base::out | ios_base::app);
1123 if (out.is_open()) {
1124 out <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1126 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path);
1127 bool snapshoting =
false;
1128 if (!std::filesystem::exists(snapshotfile)) {
1130 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotfile <<
"\n";
1133 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" failed to create directory for " << snapshotfile <<
"\n";
1136 out <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1139 auto res = extractFromLocalFile(snapshotfile, tinfo, headers);
1141 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1148 CURL* curl_handle = curl_easy_init();
1149 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1150 string fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp);
1152 if (mInSnapshotMode) {
1153 auto res = extractFromLocalFile(fullUrl, tinfo, headers);
1155 logReading(
path, timestamp, headers,
"retrieve from snapshot");
1160 curl_slist* option_list =
nullptr;
1161 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, option_list, timestamp, headers, etag, createdNotAfter, createdNotBefore);
1162 auto content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1164 for (
size_t hostIndex = 1; hostIndex < hostsPool.size() && !(content); hostIndex++) {
1165 fullUrl = getFullUrlForRetrieval(curl_handle,
path, metadata, timestamp, hostIndex);
1166 content = navigateURLsAndRetrieveContent(curl_handle, fullUrl, tinfo, headers);
1169 logReading(
path, timestamp, headers,
"retrieve");
1171 curl_slist_free_all(option_list);
1172 curl_easy_cleanup(curl_handle);
1178 size_t newLength =
size * nmemb;
1179 size_t oldLength = s->size();
1181 s->resize(oldLength + newLength);
1182 }
catch (std::bad_alloc& e) {
1183 LOG(error) <<
"memory error when getting data from CCDB";
1187 std::copy((
char*)contents, (
char*)contents + newLength, s->begin() + oldLength);
1188 return size * nmemb;
1191std::string
CcdbApi::list(std::string
const&
path,
bool latestOnly, std::string
const& returnFormat,
long createdNotAfter,
long createdNotBefore)
const
1194 CURLcode
res = CURL_LAST;
1197 curl = curl_easy_init();
1198 if (curl !=
nullptr) {
1200 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &
result);
1201 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1203 struct curl_slist* headers =
nullptr;
1204 headers = curl_slist_append(headers, (
string(
"Accept: ") + returnFormat).c_str());
1205 headers = curl_slist_append(headers, (
string(
"Content-Type: ") + returnFormat).c_str());
1206 if (createdNotAfter >= 0) {
1207 headers = curl_slist_append(headers, (
"If-Not-After: " +
std::to_string(createdNotAfter)).c_str());
1209 if (createdNotBefore >= 0) {
1210 headers = curl_slist_append(headers, (
"If-Not-Before: " +
std::to_string(createdNotBefore)).c_str());
1212 curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
1218 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1219 fullUrl = getHostUrl(hostIndex);
1220 fullUrl += latestOnly ?
"/latest/" :
"/browse/";
1222 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1224 res = CURL_perform(curl);
1225 if (
res != CURLE_OK) {
1226 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1229 curl_slist_free_all(headers);
1230 curl_easy_cleanup(curl);
1236std::string CcdbApi::getTimestampString(
long timestamp)
const
1247 stringstream fullUrl;
1250 curl = curl_easy_init();
1251 if (curl !=
nullptr) {
1252 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"DELETE");
1253 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1256 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1257 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestampLocal;
1258 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1261 res = CURL_perform(curl);
1262 if (
res != CURLE_OK) {
1263 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1265 curl_easy_cleanup(curl);
1274 stringstream fullUrl;
1275 for (
size_t i = 0;
i < hostsPool.size();
i++) {
1276 string url = getHostUrl(
i);
1277 fullUrl << url <<
"/truncate/" <<
path;
1279 curl = curl_easy_init();
1280 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1281 if (curl !=
nullptr) {
1282 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1287 res = CURL_perform(curl);
1288 if (
res != CURLE_OK) {
1289 LOGP(alarm,
"CURL_perform() failed: {}", curl_easy_strerror(
res));
1291 curl_easy_cleanup(curl);
1298 return size * nmemb;
1304 CURLcode
res = CURL_LAST;
1307 curl = curl_easy_init();
1308 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1310 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() &&
res != CURLE_OK; hostIndex++) {
1311 curl_easy_setopt(curl, CURLOPT_URL, mUrl.data());
1312 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION,
write_data);
1314 res = CURL_perform(curl);
1319 curl_easy_cleanup(curl);
1328 std::stringstream ss(reply.c_str());
1330 std::vector<std::string> folders;
1332 size_t numberoflines = std::count(reply.begin(), reply.end(),
'\n');
1333 bool inSubFolderSection =
false;
1335 for (
size_t linenumber = 0; linenumber < numberoflines; ++linenumber) {
1336 std::getline(ss, line);
1337 if (inSubFolderSection && line.size() > 0) {
1342 if (line.compare(
"Subfolders:") == 0) {
1343 inSubFolderSection =
true;
/// libcurl header callback that stores each received header line in a vector.
///
/// @param buffer   start of the header line; only size * nitems bytes are read from it
/// @param size     size of one element
/// @param nitems   number of elements
/// @param userdata pointer to the std::vector<std::string> that collects the lines
/// @return size * nitems, i.e. the whole line is reported as consumed
size_t header_callback(char* buffer, size_t size, size_t nitems, void* userdata)
{
  auto* headers = static_cast<std::vector<std::string>*>(userdata);
  // Bound the read by the reported length first; the stored entry is the text up to the first NUL, if any.
  const std::string header(buffer, size * nitems);
  headers->emplace_back(header.c_str());
  return size * nitems;
}
1364 auto p = std::filesystem::path(
filename).parent_path();
1365 if (!std::filesystem::exists(p)) {
1366 std::filesystem::create_directories(p);
1369 rapidjson::StringBuffer
buffer;
1370 rapidjson::Writer<rapidjson::StringBuffer> writer(
buffer);
1371 writer.StartObject();
1372 for (
const auto& pair : meta) {
1373 writer.Key(pair.first.c_str());
1374 writer.String(pair.second.c_str());
1380 if (
file.is_open()) {
1393 if (!
file.is_open()) {
1394 std::cerr <<
"Failed to open file for reading." << std::endl;
1398 std::string jsonStr((std::istreambuf_iterator<char>(
file)), std::istreambuf_iterator<char>());
1401 rapidjson::Document document;
1402 document.Parse(jsonStr.c_str());
1404 if (document.HasParseError()) {
1405 std::cerr <<
"Error parsing JSON" << std::endl;
1410 for (
auto itr = document.MemberBegin(); itr != document.MemberEnd(); ++itr) {
1411 meta[itr->name.GetString()] = itr->value.GetString();
1416std::map<std::string, std::string>
CcdbApi::retrieveHeaders(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp)
const
1419 auto do_remote_header_call = [
this, &
path, &metadata, timestamp]() -> std::map<std::string, std::string> {
1420 CURL* curl = curl_easy_init();
1421 CURLcode
res = CURL_LAST;
1422 string fullUrl = getFullUrlForRetrieval(curl,
path, metadata, timestamp);
1423 std::map<std::string, std::string> headers;
1425 if (curl !=
nullptr) {
1426 struct curl_slist*
list =
nullptr;
1429 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1432 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1433 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1434 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_map_callback<>);
1435 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1436 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1441 long httpCode = 404;
1442 CURLcode getCodeRes = CURL_LAST;
1443 for (
size_t hostIndex = 0; hostIndex < hostsPool.size() && (httpCode >= 400 ||
res > 0 || getCodeRes > 0); hostIndex++) {
1444 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.c_str());
1445 res = CURL_perform(curl);
1446 if (
res != CURLE_OK &&
res != CURLE_UNSUPPORTED_PROTOCOL) {
1450 LOG(error) <<
"CURL_perform() failed: " << curl_easy_strerror(
res);
1452 getCodeRes = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
1454 if (httpCode == 404) {
1457 curl_easy_cleanup(curl);
1462 if (!mSnapshotCachePath.empty()) {
1464 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath,
path);
1466 std::string logfile = mSnapshotCachePath +
"/log";
1467 std::fstream out(logfile, ios_base::out | ios_base::app);
1468 if (out.is_open()) {
1469 out <<
"CCDB-header-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
"\n";
1471 auto snapshotfile = getSnapshotFile(mSnapshotCachePath,
path +
"/" +
std::to_string(timestamp),
"header.json");
1472 if (!std::filesystem::exists(snapshotfile)) {
1473 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" storing to snapshot " << snapshotfile <<
"\n";
1476 auto meta = do_remote_header_call();
1480 LOG(warn) <<
"Failed to cache the header information to disc";
1484 out <<
"CCDB-header-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
"serving from local snapshot " << snapshotfile <<
"\n";
1485 std::map<std::string, std::string> meta;
1487 LOG(warn) <<
"Failed to read cached information from disc";
1488 return do_remote_header_call();
1493 return do_remote_header_call();
1498 auto curl = curl_easy_init();
1504 struct curl_slist*
list =
nullptr;
1505 list = curl_slist_append(
list, (
"If-None-Match: " + etag).c_str());
1507 curl_easy_setopt(curl, CURLOPT_HTTPHEADER,
list);
1509 curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
1511 curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
1512 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1513 curl_easy_setopt(curl, CURLOPT_HEADERFUNCTION, header_callback);
1514 curl_easy_setopt(curl, CURLOPT_HEADERDATA, &headers);
1515 if (!agentID.empty()) {
1516 curl_easy_setopt(curl, CURLOPT_USERAGENT, agentID.c_str());
1522 curl_easy_perform(curl);
1523 long http_code = 404;
1524 curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
1525 if (http_code == 304) {
1533 static std::string etagHeader =
"ETag: ";
1534 static std::string locationHeader =
"Content-Location: ";
1535 for (
auto h : headers) {
1536 if (
h.find(etagHeader) == 0) {
1537 etag = std::string(
h.data() + etagHeader.size());
1538 }
else if (
h.find(locationHeader) == 0) {
1539 pfns.emplace_back(std::string(
h.data() + locationHeader.size(),
h.size() - locationHeader.size()));
1555 auto object =
file.GetObjectChecked(
CCDBMETA_ENTRY, TClass::GetClass(
typeid(std::map<std::string, std::string>)));
1557 return static_cast<std::map<std::string, std::string>*
>(
object);
1564void traverseAndFillFolders(
CcdbApi const& api, std::string
const&
top, std::vector<std::string>& folders)
1568 folders.emplace_back(
top);
1571 if (subfolders.size() > 0) {
1573 for (
auto& sub : subfolders) {
1574 traverseAndFillFolders(api, sub, folders);
1584 std::vector<std::string> folders;
1585 traverseAndFillFolders(*
this,
top, folders);
1589TClass* CcdbApi::tinfo2TClass(std::type_info
const& tinfo)
1591 TClass* cl = TClass::GetClass(tinfo);
1593 throw std::runtime_error(fmt::format(
"Could not retrieve ROOT dictionary for type {}, aborting", tinfo.name()));
1599int CcdbApi::updateMetadata(std::string
const&
path, std::map<std::string, std::string>
const& metadata,
long timestamp, std::string
const&
id,
long newEOV)
1602 CURL* curl = curl_easy_init();
1603 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1604 if (curl !=
nullptr) {
1606 stringstream fullUrl;
1607 for (
size_t hostIndex = 0; hostIndex < hostsPool.size(); hostIndex++) {
1608 fullUrl << getHostUrl(hostIndex) <<
"/" <<
path <<
"/" << timestamp;
1610 fullUrl <<
"/" << newEOV;
1613 fullUrl <<
"/" <<
id;
1617 for (
auto& kv : metadata) {
1618 string mfirst = kv.first;
1619 string msecond = kv.second;
1621 char* mfirstEncoded = curl_easy_escape(curl, mfirst.c_str(), mfirst.size());
1622 char* msecondEncoded = curl_easy_escape(curl, msecond.c_str(), msecond.size());
1623 fullUrl <<
string(mfirstEncoded) +
"=" +
string(msecondEncoded) +
"&";
1624 curl_free(mfirstEncoded);
1625 curl_free(msecondEncoded);
1628 if (curl !=
nullptr) {
1629 LOG(
debug) <<
"passing to curl: " << fullUrl.str();
1630 curl_easy_setopt(curl, CURLOPT_URL, fullUrl.str().c_str());
1631 curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST,
"PUT");
1632 curl_easy_setopt(curl, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1633 curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
1637 res = CURL_perform(curl);
1638 if (
res != CURLE_OK) {
1639 LOGP(alarm,
"CURL_perform() failed: {}, code: {}", curl_easy_strerror(
res),
int(
res));
1644 curl_easy_cleanup(curl);
1651std::vector<std::string> CcdbApi::splitString(
const std::string&
str,
const char* delimiters)
1653 std::vector<std::string> tokens;
1654 char stringForStrTok[
str.length() + 1];
1655 strcpy(stringForStrTok,
str.c_str());
1656 char* token = strtok(stringForStrTok, delimiters);
1657 while (token !=
nullptr) {
1658 tokens.emplace_back(token);
1659 token = strtok(
nullptr, delimiters);
1664void CcdbApi::initHostsPool(std::string hosts)
1669std::string CcdbApi::getHostUrl(
int hostIndex)
const
1671 return hostsPool.at(hostIndex);
1677 data->hoPair.object = &requestContext.
dest;
1679 std::function<bool(std::string)> localContentCallback = [
this, &requestContext](std::string url) {
1683 auto writeCallback = [](
void* contents,
size_t size,
size_t nmemb,
void* chunkptr) {
1685 auto& chunk = *ho.
object;
1686 size_t realsize =
size * nmemb, sz = 0;
1689 if (chunk.capacity() < chunk.size() + realsize) {
1691 const char hannot[] =
"header";
1692 size_t hsize = getFlatHeaderSize(ho.header);
1693 auto cl = ho.header.find(
"Content-Length");
1694 if (cl != ho.header.end()) {
1695 size_t sizeFromHeader = std::stol(cl->second);
1696 sz = hsize + std::max(chunk.size() * (sizeFromHeader ? 1 : 2) + realsize, sizeFromHeader);
1698 sz = hsize + std::max(chunk.size() * 2, chunk.size() + realsize);
1703 char* contC = (
char*)contents;
1704 chunk.insert(chunk.end(), contC, contC + realsize);
1705 }
catch (std::exception e) {
1712 CURL* curl_handle = curl_easy_init();
1713 curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str());
1714 string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.
path, requestContext.
metadata, requestContext.
timestamp);
1715 curl_slist* options_list =
nullptr;
1716 initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.
timestamp, &requestContext.
headers,
1720 data->hosts = hostsPool;
1723 data->localContentCallback = localContentCallback;
1724 data->userAgent = mUniqueAgentID;
1725 data->optionsList = options_list;
1727 curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str());
1728 initCurlOptionsForRetrieve(curl_handle, (
void*)(&
data->hoPair), writeCallback,
false);
1729 curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback<
decltype(
data->hoPair.header)>);
1730 curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (
void*)&(
data->hoPair.header));
1731 curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (
void*)
data);
1734 asynchPerform(curl_handle, requestCounter);
1739 std::hash<std::string> hasher;
1740 std::string semhashedstring =
"aliceccdb" +
std::to_string(hasher(basedir + ccdbpath)).substr(0, 16);
1741 return semhashedstring;
1749 return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1);
1750 }
catch (std::exception e) {
1751 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
1760 if (sem->try_wait()) {
1771 boost::interprocess::named_semaphore semaphore(boost::interprocess::open_only, semaname.c_str());
1772 std::cout <<
"Found CCDB semaphore: " << semaname <<
"\n";
1774 auto success = boost::interprocess::named_semaphore::remove(semaname.c_str());
1776 std::cout <<
"Removed CCDB semaphore: " << semaname <<
"\n";
1781 }
catch (std::exception
const& e) {
1792 namespace fs = std::filesystem;
1793 std::string fileName{
"snapshot.root"};
1795 auto absolutesnapshotdir = fs::weakly_canonical(fs::absolute(snapshotdir));
1796 for (
const auto&
entry : fs::recursive_directory_iterator(absolutesnapshotdir)) {
1797 if (
entry.is_directory()) {
1798 const fs::path& currentDir = fs::canonical(fs::absolute(
entry.path()));
1799 fs::path filePath = currentDir / fileName;
1800 if (fs::exists(filePath) && fs::is_regular_file(filePath)) {
1801 std::cout <<
"Directory with file '" << fileName <<
"': " << currentDir << std::endl;
1805 auto numtokens = pathtokens.size();
1806 if (numtokens < 3) {
1811 std::string
path = pathtokens[numtokens - 3] +
"/" + pathtokens[numtokens - 2] +
"/" + pathtokens[numtokens - 1];
1817 }
catch (std::exception
const& e) {
1818 LOG(info) <<
"Semaphore search had exception " << e.what();
1823 long timestamp, std::map<std::string, std::string>& headers,
1824 std::string& snapshotpath,
o2::pmr::vector<char>& dest,
int& fromSnapshot, std::string
const& etag)
const
1826 if (createSnapshot) {
1827 std::string logfile = mSnapshotCachePath +
"/log";
1828 std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app);
1829 if (logStream.is_open()) {
1830 logStream <<
"CCDB-access[" << getpid() <<
"] of " << mUniqueAgentID <<
" to " <<
path <<
" timestamp " << timestamp <<
" for load to memory\n";
1833 if (mInSnapshotMode) {
1838 }
else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) {
1850 if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) {
1851 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1853 auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.
path);
1854 std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path);
1856 std::fstream logStream;
1857 if (logStream.is_open()) {
1858 logStream <<
"CCDB-access[" << getpid() <<
"] ... " << mUniqueAgentID <<
" downloading to snapshot " << snapshotpath <<
" from memory\n";
1861 LOGP(
debug,
"creating snapshot {} -> {}", requestContext.
path, snapshotpath);
1864 std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary);
1865 std::copy(requestContext.
dest.begin(), requestContext.
dest.end(), std::ostreambuf_iterator<char>(objFile));
1868 updateMetaInformationInLocalFile(snapshotpath, &requestContext.
headers, &querysummary);
1874 std::map<std::string, std::string>
const& metadata,
long timestamp,
1875 std::map<std::string, std::string>* headers, std::string
const& etag,
1876 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool considerSnapshot)
const
1879 destP.reserve(dest.size());
1880 loadFileToMemory(destP,
path, metadata, timestamp, headers, etag, createdNotAfter, createdNotBefore, considerSnapshot);
1882 dest.reserve(destP.size());
1883 for (
const auto c : destP) {
1889 std::map<std::string, std::string>
const& metadata,
long timestamp,
1890 std::map<std::string, std::string>* headers, std::string
const& etag,
1891 const std::string& createdNotAfter,
const std::string& createdNotBefore,
bool considerSnapshot)
const
1898 requestContext.
etag = etag;
1902 std::vector<RequestContext> contexts = {requestContext};
1908 size_t hsize = getFlatHeaderSize(headers), cnt = dest.size();
1909 dest.resize(cnt + hsize);
1910 auto addString = [&dest, &cnt](
const std::string& s) {
1917 for (
auto&
h : headers) {
1919 addString(
h.second);
1921 *
reinterpret_cast<int*
>(&dest[cnt]) = hsize;
1922 std::memcpy(&dest[cnt +
sizeof(
int)], FlatHeaderAnnot,
sizeof(FlatHeaderAnnot));
1927 LOGP(
debug,
"loadFileToMemory {} ETag=[{}]", requestContext.
path, requestContext.
etag);
1928 bool createSnapshot = requestContext.
considerSnapshot && !mSnapshotCachePath.empty();
1930 std::string snapshotpath;
1931 if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.
path))) {
1932 auto semaphore_barrier = std::make_unique<CCDBSemaphore>(mSnapshotCachePath, requestContext.
path);
1943 std::vector<int> fromSnapshots(requestContexts.size());
1944 size_t requestCounter = 0;
1947 for (
int i = 0;
i < requestContexts.size();
i++) {
1949 auto& requestContext = requestContexts.at(
i);
1954 while (requestCounter > 0) {
1959 for (
int i = 0;
i < requestContexts.size();
i++) {
1960 auto& requestContext = requestContexts.at(
i);
1961 if (!requestContext.dest.empty()) {
1962 logReading(requestContext.path, requestContext.timestamp, &requestContext.headers,
1963 fmt::format(
"{}{}", requestContext.considerSnapshot ?
"load to memory" :
"retrieve", fromSnapshots.at(
i) ?
" from snapshot" :
""));
1964 if (requestContext.considerSnapshot && fromSnapshots.at(
i) != 2) {
1973 if (url.find(
"alien:/", 0) != std::string::npos) {
1977 if ((url.find(
"file:/", 0) != std::string::npos)) {
1978 std::string
path = url.substr(7);
1979 if (std::filesystem::exists(
path)) {
1990 constexpr size_t MaxCopySize = 0x1L << 25;
1991 auto signalError = [&dest, localHeaders]() {
1995 (*localHeaders)[
"Error"] =
"An error occurred during retrieval";
1998 if (
path.find(
"alien:/") == 0 && !initTGrid()) {
2002 std::string fname(
path);
2003 if (fname.find(
"?filetype=raw") == std::string::npos) {
2004 fname +=
"?filetype=raw";
2006 std::unique_ptr<TFile> sfile{TFile::Open(fname.c_str())};
2007 if (!sfile || sfile->IsZombie()) {
2008 LOG(error) <<
"Failed to open file " << fname;
2012 size_t totalread = 0, fsize = sfile->GetSize(), b00 = sfile->GetBytesRead();
2014 char* dptr = dest.data();
2018 size_t b0 = sfile->GetBytesRead(),
b1 = b0 - b00;
2019 size_t readsize = fsize -
b1 > MaxCopySize ? MaxCopySize : fsize -
b1;
2020 if (readsize == 0) {
2023 sfile->Seek(totalread, TFile::kBeg);
2024 bool failed = sfile->ReadBuffer(dptr, (Int_t)readsize);
2025 nread = sfile->GetBytesRead() - b0;
2026 if (
failed || nread < 0) {
2027 LOG(error) <<
"failed to copy file " << fname <<
" to memory buffer";
2033 }
while (nread == (
long)MaxCopySize);
2036 TMemFile memFile(
"name",
const_cast<char*
>(dest.data()), dest.size(),
"READ");
2037 auto storedmeta = (std::map<std::string, std::string>*)
extractFromTFile(memFile, TClass::GetClass(
"std::map<std::string, std::string>"),
CCDBMETA_ENTRY);
2039 *localHeaders = *storedmeta;
2042 if ((
isSnapshotMode() || mPreferSnapshotCache) && localHeaders->find(
"ETag") == localHeaders->end()) {
2043 (*localHeaders)[
"ETag"] =
path;
2045 if (localHeaders->find(
"fileSize") == localHeaders->end()) {
2046 (*localHeaders)[
"fileSize"] = fmt::format(
"{}", memFile.GetEND());
2052void CcdbApi::checkMetadataKeys(std::map<std::string, std::string>
const& metadata)
const
2058 const std::regex regexPatternSearch(R
"([ :;.,\\/'?!\(\)\{\}\[\]@<>=+*#$&`|~^%])");
2059 bool isInvalid =
false;
2061 for (
auto& el : metadata) {
2062 auto keyMd = el.first;
2064 std::smatch searchRes;
2065 while (std::regex_search(keyMd, searchRes, regexPatternSearch)) {
2067 LOG(error) <<
"Invalid character found in metadata key '" << tmp <<
"\': '" << searchRes.str() <<
"\'";
2068 keyMd = searchRes.suffix();
2072 LOG(fatal) <<
"Some metadata keys have invalid characters, please fix!";
2077void CcdbApi::logReading(
const std::string&
path,
long ts,
const std::map<std::string, std::string>* headers,
const std::string& comment)
const
2079 std::string upath{
path};
2081 auto ent = headers->find(
"Valid-From");
2082 if (ent != headers->end()) {
2083 upath +=
"/" + ent->second;
2085 ent = headers->find(
"ETag");
2086 if (ent != headers->end()) {
2087 upath +=
"/" + ent->second;
2090 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
2091 LOGP(info,
"ccdb reads {}{}{} for {} ({}, agent_id: {}), ", mUrl, mUrl.back() ==
'/' ?
"" :
"/", upath, ts < 0 ?
getCurrentTimestamp() : ts, comment, mUniqueAgentID);
2094void CcdbApi::asynchPerform(CURL* handle,
size_t* requestCounter)
const
2099CURLcode CcdbApi::CURL_perform(CURL* handle)
const
2101 if (mIsCCDBDownloaderPreferred) {
2102 return mDownloader->
perform(handle);
2105 for (
int i = 1;
i <= mCurlRetries && (
result = curl_easy_perform(handle)) != CURLE_OK;
i++) {
2106 usleep(mCurlDelayRetries *
i);
2117 LOG(
debug) <<
"Entering semaphore barrier";
2120 mSem =
new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, mSemName.c_str(), 1);
2121 }
catch (std::exception e) {
2122 LOG(warn) <<
"Exception occurred during CCDB (cache) semaphore setup; Continuing without";
2127 gSemaRegistry.
add(
this);
2134 LOG(
debug) <<
"Ending semaphore barrier";
2137 if (mSem->try_wait()) {
2139 boost::interprocess::named_semaphore::remove(mSemName.c_str());
2141 gSemaRegistry.
remove(
this);
2147 LOG(
debug) <<
"Cleaning up semaphore registry with count " << mStore.size();
2148 for (
auto& s : mStore) {
const GPUTPCGMMerger::trackCluster & b1
Class for time synchronization of RawReader instances.
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
CCDBSemaphore(std::string const &cachepath, std::string const &path)
static void curlSetSSLOptions(CURL *curl)
static std::string generateFileName(const std::string &inp)
std::string list(std::string const &path="", bool latestOnly=false, std::string const &returnFormat="text/plain", long createdNotAfter=-1, long createdNotBefore=-1) const
int storeAsTFile_impl(const void *obj1, std::type_info const &info, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
static bool checkAlienToken()
void runDownloaderLoop(bool noWait)
void releaseNamedSemaphore(boost::interprocess::named_semaphore *sem, std::string const &path) const
bool retrieveBlob(std::string const &path, std::string const &targetdir, std::map< std::string, std::string > const &metadata, long timestamp, bool preservePathStructure=true, std::string const &localFileName="snapshot.root", std::string const &createdNotAfter="", std::string const &createdNotBefore="") const
static std::map< std::string, std::string > * retrieveMetaInfo(TFile &)
void scheduleDownload(RequestContext &requestContext, size_t *requestCounter) const
TObject * retrieve(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp) const
void init(std::string const &hosts)
TObject * retrieveFromTFile(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore) const
std::string const & getURL() const
void getFromSnapshot(bool createSnapshot, std::string const &path, long timestamp, std::map< std::string, std::string > &headers, std::string &snapshotpath, o2::pmr::vector< char > &dest, int &fromSnapshot, std::string const &etag) const
bool loadLocalContentToMemory(o2::pmr::vector< char > &dest, std::string &url) const
static void removeLeakingSemaphores(std::string const &basedir, bool remove=false)
void saveSnapshot(RequestContext &requestContext) const
static std::unique_ptr< std::vector< char > > createObjectImage(const T *obj, CcdbObjectInfo *info=nullptr)
static void * extractFromTFile(TFile &file, TClass const *cl, const char *what=CCDBOBJECT_ENTRY)
int storeAsTFile(const TObject *rootObject, std::string const &path, std::map< std::string, std::string > const &metadata, long startValidityTimestamp=-1, long endValidityTimestamp=-1, std::vector< char >::size_type maxSize=0) const
void snapshot(std::string const &ccdbrootpath, std::string const &localDir, long timestamp) const
bool isSnapshotMode() const
bool isHostReachable() const
void loadFileToMemory(std::vector< char > &dest, std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::map< std::string, std::string > *headers, std::string const &etag, const std::string &createdNotAfter, const std::string &createdNotBefore, bool considerSnapshot=true) const
static std::string determineSemaphoreName(std::string const &basedir, std::string const &objectpath)
void deleteObject(std::string const &path, long timestamp=-1) const
static void appendFlatHeader(o2::pmr::vector< char > &dest, const std::map< std::string, std::string > &headers)
std::vector< std::string > getAllFolders(std::string const &top) const
void vectoredLoadFileToMemory(std::vector< RequestContext > &requestContext) const
boost::interprocess::named_semaphore * createNamedSemaphore(std::string const &path) const
static bool removeSemaphore(std::string const &name, bool remove=false)
std::map< std::string, std::string > retrieveHeaders(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp=-1) const
CcdbApi()
Default constructor.
static bool getCCDBEntryHeaders(std::string const &url, std::string const &etag, std::vector< std::string > &headers, const std::string &agentID="")
static CCDBQuery * retrieveQueryInfo(TFile &)
static constexpr const char * CCDBQUERY_ENTRY
void truncate(std::string const &path) const
int storeAsBinaryFile(const char *buffer, size_t size, const std::string &fileName, const std::string &objectType, const std::string &path, const std::map< std::string, std::string > &metadata, long startValidityTimestamp, long endValidityTimestamp, std::vector< char >::size_type maxSize=0) const
static void parseCCDBHeaders(std::vector< std::string > const &headers, std::vector< std::string > &pfns, std::string &etag)
int updateMetadata(std::string const &path, std::map< std::string, std::string > const &metadata, long timestamp, std::string const &id="", long newEOV=0)
static constexpr const char * CCDBMETA_ENTRY
static constexpr const char * CCDBOBJECT_ENTRY
void navigateSourcesAndLoadFile(RequestContext &requestContext, int &fromSnapshot, size_t *requestCounter) const
std::vector< std::string > parseSubFolders(std::string const &reply) const
virtual ~CcdbApi()
Default destructor.
void setFileName(const std::string &nm)
const std::string & getObjectType() const
void setObjectType(const std::string &tp)
const std::string & getFileName() const
void add(CCDBSemaphore const *ptr)
void remove(CCDBSemaphore const *ptr)
SemaphoreRegistry()=default
GLsizei const GLchar *const * string
GLdouble GLdouble GLdouble GLdouble top
GLsizei const GLfloat * value
GLsizei const GLchar *const * path
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
bool stdmap_to_jsonfile(std::map< std::string, std::string > const &meta, std::string const &filename)
size_t write_data(void *, size_t size, size_t nmemb, void *)
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
size_t(*)(void *, size_t, size_t, void *) CurlWriteCallback
std::string sanitizeObjectName(const std::string &objectName)
bool jsonfile_to_stdmap(std::map< std::string, std::string > &meta, std::string const &filename)
long getFutureTimestamp(int secondsInFuture)
returns the timestamp in long corresponding to "now + secondsInFuture"
size_t CurlWrite_CallbackFunc_StdString2(void *contents, size_t size, size_t nmemb, std::string *s)
std::string timestamp() noexcept
Defining PrimaryVertex explicitly as messageable.
std::vector< T, o2::pmr::polymorphic_allocator< T > > vector
void createDirectoriesIfAbsent(std::string const &path)
Defining DataPointCompositeObject explicitly as copiable.
std::string to_string(gsl::span< T, Size > span)
std::map< std::string, std::string > const & metadata
std::string createdNotAfter
std::map< std::string, std::string > & headers
o2::pmr::vector< char > & dest
std::string createdNotBefore
static DeploymentMode deploymentMode()
static std::string getClassName(const T &obj)
get the class name of the object
static std::unique_ptr< FileImage > createFileImage(const TObject &obj, const std::string &fileName, const std::string &objName)
static void trim(std::string &s)
static std::vector< std::string > tokenize(const std::string &src, char delim, bool trimToken=true, bool skipEmpty=true)
static std::string concat_string(Ts const &... ts)
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"
std::unique_ptr< TTree > tree((TTree *) flIn.Get(std::string(o2::base::NameConf::CTFTREENAME).c_str()))
uint64_t const void const *restrict const msg