18#include <unordered_map>
25#include <condition_variable>
29#include <sys/socket.h>
30#include <fairlogger/Logger.h>
31#include <boost/asio/ip/host_name.hpp>
42 uv_strerror_r(code,
buf, 1000);
54 if (code != CURLE_OK) {
62 if (code != CURLM_OK) {
71 std::string host = boost::asio::ip::host_name();
72 char const* jobID = getenv(
"ALIEN_PROC_ID");
89 setupInternalUVLoop();
93 mTimeoutTimer = (
uv_timer_t*)malloc(
sizeof(*mTimeoutTimer));
94 mTimeoutTimer->data =
this;
98 initializeMultiHandle();
101void CCDBDownloader::setupInternalUVLoop()
107void CCDBDownloader::initializeMultiHandle()
109 mCurlMultiHandle = curl_multi_init();
110 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETFUNCTION, handleSocket));
111 auto socketData = &mSocketData;
112 socketData->curlm = mCurlMultiHandle;
113 socketData->CD =
this;
115 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERFUNCTION, startTimeout));
116 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERDATA, mTimeoutTimer));
127 while (uv_loop_alive(
mUVLoop) || (uv_loop_close(
mUVLoop) == UV_EBUSY)) {
139 if (CD->mHandleMap.find(handle) != CD->mHandleMap.end()) {
140 if (!uv_is_closing(handle)) {
148 if (handle !=
nullptr) {
153void CCDBDownloader::closesocketCallback(
void* clientp, curl_socket_t item)
155 auto CD = (CCDBDownloader*)clientp;
156 if (CD->mExternalLoop) {
158 if (CD->mSocketTimerMap.find(item) != CD->mSocketTimerMap.end()) {
159 auto timer = CD->mSocketTimerMap[item];
164 delete (DataForClosingSocket*)timer->data;
166 CD->mSocketTimerMap.erase(item);
167 if (close(item) == -1) {
173 if (
close(item) == -1) {
189 if (CD->mExternalLoop) {
190 CD->mSocketTimerMap[sock] = (
uv_timer_t*)malloc(
sizeof(*CD->mSocketTimerMap[sock]));
192 CD->mHandleMap[(
uv_handle_t*)CD->mSocketTimerMap[sock]] =
true;
197 CD->mSocketTimerMap[sock]->data =
data;
203void CCDBDownloader::closeSocketByTimer(
uv_timer_t* handle)
205 auto data = (DataForClosingSocket*)handle->data;
207 auto sock =
data->socket;
209 if (CD->mSocketTimerMap.find(sock) != CD->mSocketTimerMap.end()) {
211 CD->mSocketTimerMap.erase(sock);
212 if (close(sock) == -1) {
220void CCDBDownloader::curlTimeout(
uv_timer_t* handle)
224 curl_multi_socket_action(CD->mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles);
225 CD->checkMultiInfo();
228void CCDBDownloader::curlPerform(
uv_poll_t* handle,
int status,
int events)
233 if (events & UV_READABLE) {
234 flags |= CURL_CSELECT_IN;
236 if (events & UV_WRITABLE) {
237 flags |= CURL_CSELECT_OUT;
240 auto context = (CCDBDownloader::curl_context_t*)handle->data;
242 curlMultiErrorCheck(curl_multi_socket_action(context->CD->mCurlMultiHandle, context->sockfd,
flags, &running_handles));
243 context->CD->checkMultiInfo();
246int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s,
int action,
void* userp,
void* socketp)
248 auto socketData = (CCDBDownloader::DataForSocket*)userp;
250 CCDBDownloader::curl_context_t* curl_context;
256 case CURL_POLL_INOUT:
258 curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(
s);
261 if (action != CURL_POLL_IN) {
262 events |= UV_WRITABLE;
264 if (action != CURL_POLL_OUT) {
265 events |= UV_READABLE;
268 if (CD->mExternalLoop && CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
274 case CURL_POLL_REMOVE:
276 if (CD->mExternalLoop) {
278 if (CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
279 uvErrorCheck(uv_timer_start(CD->mSocketTimerMap[s], closeSocketByTimer, CD->mKeepaliveTimeoutMS, 0),
SEVERE);
282 uvErrorCheck(uv_poll_stop(((CCDBDownloader::curl_context_t*)socketp)->poll_handle),
SEVERE);
283 CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp);
333CCDBDownloader::curl_context_t* CCDBDownloader::createCurlContext(curl_socket_t sockfd)
335 curl_context_t* context;
337 context = (curl_context_t*)malloc(
sizeof(*context));
339 context->sockfd = sockfd;
340 context->poll_handle = (
uv_poll_t*)malloc(
sizeof(*context->poll_handle));
344 context->poll_handle->data = context;
349void CCDBDownloader::curlCloseCB(
uv_handle_t* handle)
351 auto* context = (curl_context_t*)handle->data;
352 free(context->poll_handle);
356void CCDBDownloader::destroyCurlContext(curl_context_t* context)
358 uv_close((
uv_handle_t*)context->poll_handle, curlCloseCB);
361void CCDBDownloader::tryNewHost(PerformData* performData, CURL* easy_handle)
363 auto requestData = performData->requestData;
364 std::string newUrl = requestData->hosts.at(performData->hostInd) +
"/" + requestData->path +
"/" +
std::to_string(requestData->timestamp);
365 LOG(
debug) <<
"Connecting to another host " << newUrl;
366 requestData->hoPair.header.clear();
367 curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str());
368 mHandlesToBeAdded.push_back(easy_handle);
371void CCDBDownloader::getLocalContent(PerformData* performData, std::string& newLocation,
bool& contentRetrieved, std::vector<std::string>&
locations)
373 auto requestData = performData->requestData;
374 LOG(
debug) <<
"Redirecting to local content " << newLocation <<
"\n";
375 if (requestData->localContentCallback(newLocation)) {
376 contentRetrieved =
true;
379 newLocation = getNewLocation(performData,
locations);
383std::string CCDBDownloader::getNewLocation(PerformData* performData, std::vector<std::string>&
locations)
const
385 auto requestData = performData->requestData;
386 if (performData->locInd <
locations.size()) {
387 std::string newLocation =
locations.at(performData->locInd++);
388 std::string hostUrl = requestData->hosts.at(performData->hostInd);
389 std::string newUrl = prepareRedirectedURL(newLocation, hostUrl);
396void CCDBDownloader::httpRedirect(PerformData* performData, std::string& newLocation, CURL* easy_handle)
398 auto requestData = performData->requestData;
399 LOG(
debug) <<
"Trying content location " << newLocation;
400 curl_easy_setopt(easy_handle, CURLOPT_URL, newLocation.c_str());
401 mHandlesToBeAdded.push_back(easy_handle);
404void CCDBDownloader::followRedirect(PerformData* performData, CURL* easy_handle, std::vector<std::string>&
locations,
bool& rescheduled,
bool& contentRetrieved)
406 std::string newLocation = getNewLocation(performData,
locations);
407 if (newLocation.find(
"alien:/", 0) != std::string::npos || newLocation.find(
"file:/", 0) != std::string::npos) {
408 getLocalContent(performData, newLocation, contentRetrieved,
locations);
410 if (!contentRetrieved && newLocation !=
"") {
411 httpRedirect(performData, newLocation, easy_handle);
418 CURLU* host_url = curl_url();
419 curl_url_set(host_url, CURLUPART_URL, full_host_url.c_str(), 0);
423 CURLUcode host_result = curl_url_get(host_url, CURLUPART_HOST, &host, 0);
424 if (host_result != CURLUE_OK) {
425 LOG(error) <<
"CCDBDownloader: Malformed url detected when processing redirect, could not identify the host part: " << full_host_url;
426 curl_url_cleanup(host_url);
431 CURLUcode scheme_result = curl_url_get(host_url, CURLUPART_SCHEME, &scheme, 0);
434 CURLUcode port_result = curl_url_get(host_url, CURLUPART_PORT, &port, 0);
436 curl_url_cleanup(host_url);
439 std::string trimmed_url =
"";
440 if (scheme_result == CURLUE_OK) {
441 trimmed_url += scheme + std::string(
"://");
446 if (port_result == CURLUE_OK) {
447 trimmed_url += std::string(
":") + port;
453std::string CCDBDownloader::prepareRedirectedURL(std::string
address, std::string potentialHost)
const
456 if (
address.find(
"alien:/") != std::string::npos ||
address.find(
"file:/") != std::string::npos) {
460 CURLU* redirected_url = curl_url();
461 curl_url_set(redirected_url, CURLUPART_URL,
address.c_str(), 0);
463 CURLUcode scheme_result = curl_url_get(redirected_url, CURLUPART_SCHEME, &scheme, 0);
465 curl_url_cleanup(redirected_url);
466 if (scheme_result == CURLUE_OK) {
475void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode)
478 PerformData* performData;
482 *performData->codeDestination = curlCode;
484 bool rescheduled =
false;
485 bool contentRetrieved =
false;
488 LOG(error) <<
"CCDBDownloader CURL transfer error - " << curl_easy_strerror(curlCode) <<
"\n";
491 switch (performData->type) {
493 --(*performData->requestsLeft);
497 if (requestData->headers) {
498 for (
auto& p : requestData->hoPair.header) {
499 (*requestData->headers)[p.first] = p.second;
504 curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &httpCode);
506 curl_easy_getinfo(easy_handle, CURLINFO_EFFECTIVE_URL, &url);
507 LOG(
debug) <<
"Transfer for " << url <<
" finished with code " << httpCode <<
"\n";
508 std::string currentHost = requestData->hosts[performData->hostInd];
509 std::string loggingMessage =
prepareLogMessage(currentHost, requestData->userAgent, requestData->path, requestData->timestamp, requestData->headers, httpCode);
512 auto locations = getLocations(&(requestData->hoPair.header));
515 if (200 <= httpCode && httpCode < 400) {
517 if (304 == httpCode) {
518 LOGP(
debug,
"Object exists but I am not serving it since it's already in your possession");
519 contentRetrieved =
true;
520 }
else if (300 <= httpCode && httpCode < 400 && performData->locInd <
locations.size()) {
521 followRedirect(performData, easy_handle,
locations, rescheduled, contentRetrieved);
522 }
else if (200 <= httpCode && httpCode < 300) {
523 contentRetrieved =
true;
526 LOG(error) << loggingMessage;
531 contentRetrieved =
false;
535 if (!rescheduled && !contentRetrieved && performData->locInd ==
locations.size()) {
537 if (++performData->hostInd < requestData->hosts.size()) {
538 tryNewHost(performData, easy_handle);
541 LOG(error) <<
"File " << requestData->path <<
" could not be retrieved. No more hosts to try.";
547 if (!contentRetrieved) {
548 if (requestData->hoPair.object) {
549 requestData->hoPair.object->clear();
551 if (requestData->headers) {
552 (*requestData->headers)[
"Error"] =
"An error occurred during retrieval";
554 LOGP(alarm,
"Curl request to {}, response code: {}", url, httpCode);
556 if (requestData->headers && requestData->headers->find(
"fileSize") == requestData->headers->end()) {
557 (*requestData->headers)[
"fileSize"] = fmt::format(
"{}", requestData->hoPair.object ? requestData->hoPair.object->size() : 0);
560 --(*performData->requestsLeft);
561 curl_slist_free_all(*performData->options);
563 delete performData->codeDestination;
564 curl_easy_cleanup(easy_handle);
577 curlMultiErrorCheck(curl_multi_socket_action(mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles));
581void CCDBDownloader::checkMultiInfo()
586 while ((
message = curl_multi_info_read(mCurlMultiHandle, &pending))) {
589 CURLcode code =
message->data.result;
590 transferFinished(
message->easy_handle, code);
594 fprintf(stderr,
"CURLMSG default\n");
600int CCDBDownloader::startTimeout(CURLM* multi,
long timeout_ms,
void* userp)
604 if (timeout_ms < 0) {
607 if (timeout_ms == 0) {
615void CCDBDownloader::setHandleOptions(CURL* handle, PerformData*
data)
618 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION, closesocketCallback));
626 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_USERAGENT, mUserAgentId.c_str()));
629void CCDBDownloader::checkHandleQueue()
631 if (mHandlesToBeAdded.size() > 0) {
636 mHandlesToBeAdded.erase(mHandlesToBeAdded.begin());
643 uv_run(
mUVLoop, noWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
648 std::vector<CURL*> handleVector;
649 handleVector.push_back(handle);
653std::vector<std::string> CCDBDownloader::getLocations(std::multimap<std::string, std::string>* headerMap)
const
655 std::vector<std::string> locs;
656 auto iter = headerMap->find(
"Location");
657 if (iter != headerMap->end()) {
658 locs.push_back(iter->second);
661 auto iter2 = headerMap->find(
"Content-Location");
662 if (iter2 != headerMap->end()) {
663 auto range = headerMap->equal_range(
"Content-Location");
664 for (
auto it =
range.first; it !=
range.second; ++it) {
665 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
666 locs.push_back(it->second);
675 std::vector<CURLcode> codeVector(handleVector.size());
676 size_t requestsLeft = handleVector.size();
678 for (
int i = 0;
i < handleVector.size();
i++) {
679 auto*
data =
new CCDBDownloader::PerformData();
680 data->codeDestination = &codeVector[
i];
681 codeVector[
i] = CURLE_FAILED_INIT;
683 data->type = BLOCKING;
684 data->requestsLeft = &requestsLeft;
685 setHandleOptions(handleVector[
i],
data);
686 mHandlesToBeAdded.push_back(handleVector[
i]);
689 while (requestsLeft > 0) {
700 CURLcode* codeVector =
new CURLcode();
704 std::multimap<std::string, std::string>* headerMap;
705 std::vector<std::string>* hostsPool;
706 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestData);
708 hostsPool = &(requestData->
hosts);
712 auto*
data =
new CCDBDownloader::PerformData();
713 data->codeDestination = codeVector;
714 *codeVector = CURLE_FAILED_INIT;
716 data->type = ASYNCHRONOUS;
717 data->requestsLeft = requestCounter;
720 data->requestData = requestData;
721 data->options = options;
724 setHandleOptions(handle,
data);
725 mHandlesToBeAdded.push_back(handle);
734 std::string upath{
path};
736 auto ent = headers->find(
"Valid-From");
737 if (ent != headers->end()) {
738 upath +=
"/" + ent->second;
740 ent = headers->find(
"ETag");
741 if (ent != headers->end()) {
742 upath +=
"/" + ent->second;
745 upath.erase(remove(upath.begin(), upath.end(),
'\"'), upath.end());
746 return fmt::format(
"CcdbDownloader finished transfer {}{}{} for {} (agent_id: {}) with http code: {}", host_url, (host_url.back() ==
'/') ?
"" :
"/", upath, (ts < 0) ?
getCurrentTimestamp() : ts, userAgent, httpCode);
struct uv_timer_s uv_timer_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_STACKTRACE_LOG(name)
For the moment we do not support logs with a stacktrace.
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
void setRequestTimeoutTime(int timeoutMS)
void setOfflineTimeoutSettings()
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
std::string trimHostUrl(std::string full_host_url) const
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
std::vector< CURLcode > batchBlockingPerform(std::vector< CURL * > const &handleVector)
void setConnectionTimeoutTime(int timeoutMS)
void setHappyEyeballsHeadstartTime(int headstartMS)
std::string prepareLogMessage(std::string host_url, std::string userAgent, const std::string &path, long ts, const std::map< std::string, std::string > *headers, long httpCode) const
std::unordered_map< uv_handle_t *, bool > mHandleMap
int mHappyEyeballsHeadstartMS
void setMaxParallelConnections(int limit)
CCDBDownloader(uv_loop_t *uv_loop=nullptr)
void setOnlineTimeoutSettings()
GLuint const GLint * locations
GLuint GLuint64EXT address
GLsizei const GLchar *const * path
GLuint GLsizei const GLchar * message
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
void curlEasyErrorCheck(CURLcode code)
std::string uniqueAgentID()
void uvErrorCheck(int code, DownloaderErrorLevel level)
struct o2::ccdb::DownloaderRequestData DownloaderRequestData
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
void curlMultiErrorCheck(CURLMcode code)
void closeHandles(uv_handle_t *handle, void *arg)
curl_socket_t opensocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
void onUVClose(uv_handle_t *handle)
Polygon< T > close(Polygon< T > polygon)
std::string to_string(gsl::span< T, Size > span)
std::vector< std::string > hosts
HeaderObjectPair_t hoPair
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"