18#include <unordered_map> 
   25#include <condition_variable> 
   29#include <sys/socket.h> 
   30#include <fairlogger/Logger.h> 
   31#include <boost/asio/ip/host_name.hpp> 
   42    uv_strerror_r(code, 
buf, 1000);
 
 
   54  if (code != CURLE_OK) {
 
 
   62  if (code != CURLM_OK) {
 
 
   71  std::string host = boost::asio::ip::host_name();
 
   72  char const* jobID = getenv(
"ALIEN_PROC_ID");
 
   89    setupInternalUVLoop();
 
   93  mTimeoutTimer = (
uv_timer_t*)malloc(
sizeof(*mTimeoutTimer));
 
   94  mTimeoutTimer->data = 
this;
 
   98  initializeMultiHandle();
 
 
  101void CCDBDownloader::setupInternalUVLoop()
 
  107void CCDBDownloader::initializeMultiHandle()
 
  109  mCurlMultiHandle = curl_multi_init();
 
  110  curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETFUNCTION, handleSocket));
 
  111  auto socketData = &mSocketData;
 
  112  socketData->curlm = mCurlMultiHandle;
 
  113  socketData->CD = 
this;
 
  115  curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERFUNCTION, startTimeout));
 
  116  curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERDATA, mTimeoutTimer));
 
  127    while (uv_loop_alive(
mUVLoop) || (uv_loop_close(
mUVLoop) == UV_EBUSY)) {
 
 
  139  if (CD->mHandleMap.find(handle) != CD->mHandleMap.end()) {
 
  140    if (!uv_is_closing(handle)) {
 
 
  148  if (handle != 
nullptr) {
 
 
  153void CCDBDownloader::closesocketCallback(
void* clientp, curl_socket_t item)
 
  155  auto CD = (CCDBDownloader*)clientp;
 
  156  if (CD->mExternalLoop) {
 
  158    if (CD->mSocketTimerMap.find(item) != CD->mSocketTimerMap.end()) {
 
  159      auto timer = CD->mSocketTimerMap[item];
 
  164        delete (DataForClosingSocket*)timer->data;
 
  166      CD->mSocketTimerMap.erase(item);
 
  167      if (close(item) == -1) {
 
  173    if (
close(item) == -1) {
 
  189  if (CD->mExternalLoop) {
 
  190    CD->mSocketTimerMap[sock] = (
uv_timer_t*)malloc(
sizeof(*CD->mSocketTimerMap[sock]));
 
  192    CD->mHandleMap[(
uv_handle_t*)CD->mSocketTimerMap[sock]] = 
true;
 
  197    CD->mSocketTimerMap[sock]->data = 
data;
 
 
  203void CCDBDownloader::closeSocketByTimer(
uv_timer_t* handle)
 
  205  auto data = (DataForClosingSocket*)handle->data;
 
  207  auto sock = 
data->socket;
 
  209  if (CD->mSocketTimerMap.find(sock) != CD->mSocketTimerMap.end()) {
 
  211    CD->mSocketTimerMap.erase(sock);
 
  212    if (close(sock) == -1) {
 
  220void CCDBDownloader::curlTimeout(
uv_timer_t* handle)
 
  224  curl_multi_socket_action(CD->mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles);
 
  225  CD->checkMultiInfo();
 
  228void CCDBDownloader::curlPerform(
uv_poll_t* handle, 
int status, 
int events)
 
  233  if (events & UV_READABLE) {
 
  234    flags |= CURL_CSELECT_IN;
 
  236  if (events & UV_WRITABLE) {
 
  237    flags |= CURL_CSELECT_OUT;
 
  240  auto context = (CCDBDownloader::curl_context_t*)handle->data;
 
  242  curlMultiErrorCheck(curl_multi_socket_action(context->CD->mCurlMultiHandle, context->sockfd, 
flags, &running_handles));
 
  243  context->CD->checkMultiInfo();
 
  246int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, 
int action, 
void* userp, 
void* socketp)
 
  248  auto socketData = (CCDBDownloader::DataForSocket*)userp;
 
  250  CCDBDownloader::curl_context_t* curl_context;
 
  256    case CURL_POLL_INOUT:
 
  258      curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(
s);
 
  261      if (action != CURL_POLL_IN) {
 
  262        events |= UV_WRITABLE;
 
  264      if (action != CURL_POLL_OUT) {
 
  265        events |= UV_READABLE;
 
  268      if (CD->mExternalLoop && CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
 
  274    case CURL_POLL_REMOVE:
 
  276        if (CD->mExternalLoop) {
 
  278          if (CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
 
  279            uvErrorCheck(uv_timer_start(CD->mSocketTimerMap[s], closeSocketByTimer, CD->mKeepaliveTimeoutMS, 0), 
SEVERE);
 
  282        uvErrorCheck(uv_poll_stop(((CCDBDownloader::curl_context_t*)socketp)->poll_handle), 
SEVERE);
 
  283        CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp);
 
  333CCDBDownloader::curl_context_t* CCDBDownloader::createCurlContext(curl_socket_t sockfd)
 
  335  curl_context_t* context;
 
  337  context = (curl_context_t*)malloc(
sizeof(*context));
 
  339  context->sockfd = sockfd;
 
  340  context->poll_handle = (
uv_poll_t*)malloc(
sizeof(*context->poll_handle));
 
  344  context->poll_handle->data = context;
 
  349void CCDBDownloader::curlCloseCB(
uv_handle_t* handle)
 
  351  auto* context = (curl_context_t*)handle->data;
 
  352  free(context->poll_handle);
 
  356void CCDBDownloader::destroyCurlContext(curl_context_t* context)
 
  358  uv_close((
uv_handle_t*)context->poll_handle, curlCloseCB);
 
  361void CCDBDownloader::tryNewHost(PerformData* performData, CURL* easy_handle)
 
  363  auto requestData = performData->requestData;
 
  364  std::string newUrl = requestData->hosts.at(performData->hostInd) + 
"/" + requestData->path + 
"/" + 
std::to_string(requestData->timestamp);
 
  365  LOG(
debug) << 
"Connecting to another host " << newUrl << 
"\n";
 
  366  requestData->hoPair.header.clear();
 
  367  curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str());
 
  368  mHandlesToBeAdded.push_back(easy_handle);
 
  371void CCDBDownloader::getLocalContent(PerformData* performData, std::string& newLocation, 
bool& contentRetrieved, std::vector<std::string>& 
locations)
 
  373  auto requestData = performData->requestData;
 
  374  LOG(
debug) << 
"Redirecting to local content " << newLocation << 
"\n";
 
  375  if (requestData->localContentCallback(newLocation)) {
 
  376    contentRetrieved = 
true;
 
  377    LOG(
debug) << 
"Local content retrieved succesfully: " << newLocation << 
" n";
 
  380    newLocation = getNewLocation(performData, 
locations);
 
  381    LOG(
debug) << 
"Failed to retrieve local content: " << newLocation << 
"\n";
 
  385std::string CCDBDownloader::getNewLocation(PerformData* performData, std::vector<std::string>& 
locations)
 const 
  387  auto requestData = performData->requestData;
 
  388  if (performData->locInd < 
locations.size()) {
 
  389    std::string newLocation = 
locations.at(performData->locInd++);
 
  390    std::string hostUrl = requestData->hosts.at(performData->hostInd);
 
  391    std::string newUrl = prepareRedirectedURL(newLocation, hostUrl);
 
  398void CCDBDownloader::httpRedirect(PerformData* performData, std::string& newLocation, CURL* easy_handle)
 
  400  auto requestData = performData->requestData;
 
  401  LOG(
debug) << 
"Trying content location " << newLocation << 
"\n";
 
  402  curl_easy_setopt(easy_handle, CURLOPT_URL, newLocation.c_str());
 
  403  mHandlesToBeAdded.push_back(easy_handle);
 
  406void CCDBDownloader::followRedirect(PerformData* performData, CURL* easy_handle, std::vector<std::string>& 
locations, 
bool& rescheduled, 
bool& contentRetrieved)
 
  408  std::string newLocation = getNewLocation(performData, 
locations);
 
  409  while (!contentRetrieved && (newLocation.find(
"alien:/", 0) != std::string::npos || newLocation.find(
"file:/", 0) != std::string::npos)) {
 
  410    getLocalContent(performData, newLocation, contentRetrieved, 
locations);
 
  412  if (!contentRetrieved && newLocation != 
"") {
 
  413    httpRedirect(performData, newLocation, easy_handle);
 
  420  CURLU* host_url = curl_url();
 
  421  curl_url_set(host_url, CURLUPART_URL, full_host_url.c_str(), 0);
 
  425  CURLUcode host_result = curl_url_get(host_url, CURLUPART_HOST, &host, 0);
 
  426  if (host_result != CURLUE_OK) {
 
  427    LOG(error) << 
"CCDBDownloader: Malformed url detected when processing redirect, could not identify the host part: " << full_host_url;
 
  428    curl_url_cleanup(host_url);
 
  433  CURLUcode scheme_result = curl_url_get(host_url, CURLUPART_SCHEME, &scheme, 0);
 
  436  CURLUcode port_result = curl_url_get(host_url, CURLUPART_PORT, &port, 0);
 
  438  curl_url_cleanup(host_url);
 
  441  std::string trimmed_url = 
"";
 
  442  if (scheme_result == CURLUE_OK) {
 
  443    trimmed_url += scheme + std::string(
"://");
 
  448  if (port_result == CURLUE_OK) {
 
  449    trimmed_url += std::string(
":") + port;
 
 
  455std::string CCDBDownloader::prepareRedirectedURL(std::string 
address, std::string potentialHost)
 const 
  458  if (
address.find(
"alien:/") != std::string::npos || 
address.find(
"file:/") != std::string::npos) {
 
  462  CURLU* redirected_url = curl_url();
 
  463  curl_url_set(redirected_url, CURLUPART_URL, 
address.c_str(), 0);
 
  465  CURLUcode scheme_result = curl_url_get(redirected_url, CURLUPART_SCHEME, &scheme, 0);
 
  467  curl_url_cleanup(redirected_url);
 
  468  if (scheme_result == CURLUE_OK) {
 
  477void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode)
 
  480  PerformData* performData;
 
  484  *performData->codeDestination = curlCode;
 
  486  bool rescheduled = 
false;
 
  487  bool contentRetrieved = 
false;
 
  490    LOG(error) << 
"CCDBDownloader CURL transfer error - " << curl_easy_strerror(curlCode) << 
"\n";
 
  493  switch (performData->type) {
 
  495      --(*performData->requestsLeft);
 
  499      if (requestData->headers) {
 
  500        for (
auto& p : requestData->hoPair.header) {
 
  501          (*requestData->headers)[
p.first] = 
p.second;
 
  506      curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &httpCode);
 
  508      curl_easy_getinfo(easy_handle, CURLINFO_EFFECTIVE_URL, &
url);
 
  509      LOG(
debug) << 
"Transfer for " << 
url << 
" finished with code " << httpCode << 
"\n";
 
  510      std::string currentHost = requestData->hosts[performData->hostInd];
 
  511      std::string loggingMessage = 
prepareLogMessage(currentHost, requestData->userAgent, requestData->path, requestData->timestamp, requestData->headers, httpCode);
 
  514      updateLocations(&(requestData->hoPair.header), &requestData->locations, &performData->locInd);
 
  517      if (200 <= httpCode && httpCode < 400) {
 
  519        if (304 == httpCode) {
 
  520          LOGP(
debug, 
"Object exists but I am not serving it since it's already in your possession");
 
  521          contentRetrieved = 
true;
 
  522        } 
else if (300 <= httpCode && httpCode < 400 && performData->locInd < requestData->
locations.size()) {
 
  523          followRedirect(performData, easy_handle, requestData->locations, rescheduled, contentRetrieved);
 
  524        } 
else if (200 <= httpCode && httpCode < 300) {
 
  525          contentRetrieved = 
true; 
 
  528        LOG(error) << loggingMessage;
 
  533        contentRetrieved = 
false;
 
  537      if (!rescheduled && !contentRetrieved) {
 
  539        if (performData->locInd < requestData->locations.size()) {
 
  540          followRedirect(performData, easy_handle, requestData->locations, rescheduled, contentRetrieved);
 
  545      if (!rescheduled && !contentRetrieved) {
 
  547        if (++performData->hostInd < requestData->hosts.size()) {
 
  548          tryNewHost(performData, easy_handle);
 
  551          LOG(error) << 
"File " << requestData->path << 
" could not be retrieved. No more hosts to try.";
 
  557        if (!contentRetrieved) {
 
  558          if (requestData->hoPair.object) {
 
  559            requestData->hoPair.object->clear();
 
  561          if (requestData->headers) {
 
  562            (*requestData->headers)[
"Error"] = 
"An error occurred during retrieval";
 
  564          LOGP(alarm, 
"Curl request to {}, response code: {}", 
url, httpCode);
 
  566          if (requestData->headers && requestData->headers->find(
"fileSize") == requestData->headers->end()) {
 
  567            (*requestData->headers)[
"fileSize"] = fmt::format(
"{}", requestData->hoPair.object ? requestData->hoPair.object->size() : 0);
 
  570        --(*performData->requestsLeft);
 
  571        curl_slist_free_all(*performData->options);
 
  573        delete performData->codeDestination;
 
  574        curl_easy_cleanup(easy_handle);
 
  587  curlMultiErrorCheck(curl_multi_socket_action(mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles));
 
  591void CCDBDownloader::checkMultiInfo()
 
  596  while ((
message = curl_multi_info_read(mCurlMultiHandle, &pending))) {
 
  599        CURLcode code = 
message->data.result;
 
  600        transferFinished(
message->easy_handle, code);
 
  604        fprintf(stderr, 
"CURLMSG default\n");
 
  610int CCDBDownloader::startTimeout(CURLM* multi, 
long timeout_ms, 
void* userp)
 
  614  if (timeout_ms < 0) {
 
  617    if (timeout_ms == 0) {
 
  625void CCDBDownloader::setHandleOptions(CURL* handle, PerformData* 
data)
 
  628  curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION, closesocketCallback));
 
  636  curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_USERAGENT, mUserAgentId.c_str()));
 
  639void CCDBDownloader::checkHandleQueue()
 
  641  if (mHandlesToBeAdded.size() > 0) {
 
  646      mHandlesToBeAdded.erase(mHandlesToBeAdded.begin());
 
  653  uv_run(
mUVLoop, noWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
 
 
  658  std::vector<CURL*> handleVector;
 
  659  handleVector.push_back(handle);
 
 
  663void CCDBDownloader::updateLocations(std::multimap<std::string, std::string>* headerMap, std::vector<std::string>* 
locations, 
int* locIndex)
 const 
  665  std::vector<std::string> newLocations;
 
  667  auto iter = headerMap->find(
"Location");
 
  668  if (iter != headerMap->end()) {
 
  669    auto range = headerMap->equal_range(
"Location");
 
  670    for (
auto it = 
range.first; it != 
range.second; ++it) {
 
  672        if (std::find(newLocations.begin(), newLocations.end(), it->second) == newLocations.end()) {
 
  673          newLocations.push_back(it->second);
 
  680  auto iter2 = headerMap->find(
"Content-Location");
 
  681  if (iter2 != headerMap->end()) {
 
  682    auto range = headerMap->equal_range(
"Content-Location");
 
  683    for (
auto it = 
range.first; it != 
range.second; ++it) {
 
  685        if (std::find(newLocations.begin(), newLocations.end(), it->second) == newLocations.end()) {
 
  686          newLocations.push_back(it->second);
 
  693  locations->insert(
locations->begin() + (*locIndex), newLocations.begin(), newLocations.end());
 
  698  std::vector<CURLcode> codeVector(handleVector.size());
 
  699  size_t requestsLeft = handleVector.size();
 
  701  for (
int i = 0; 
i < handleVector.size(); 
i++) {
 
  702    auto* 
data = 
new CCDBDownloader::PerformData();
 
  703    data->codeDestination = &codeVector[
i];
 
  704    codeVector[
i] = CURLE_FAILED_INIT;
 
  706    data->type = BLOCKING;
 
  707    data->requestsLeft = &requestsLeft;
 
  708    setHandleOptions(handleVector[
i], 
data);
 
  709    mHandlesToBeAdded.push_back(handleVector[
i]);
 
  712  while (requestsLeft > 0) {
 
 
  723  CURLcode* codeVector = 
new CURLcode();
 
  727  std::multimap<std::string, std::string>* headerMap;
 
  728  std::vector<std::string>* hostsPool;
 
  729  curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestData);
 
  731  hostsPool = &(requestData->
hosts);
 
  735  auto* 
data = 
new CCDBDownloader::PerformData(); 
 
  736  data->codeDestination = codeVector;
 
  737  *codeVector = CURLE_FAILED_INIT;
 
  739  data->type = ASYNCHRONOUS;
 
  740  data->requestsLeft = requestCounter;
 
  743  data->requestData = requestData;
 
  744  data->options = options;
 
  747  setHandleOptions(handle, 
data);
 
  748  mHandlesToBeAdded.push_back(handle);
 
 
  757  std::string upath{
path};
 
  759    auto ent = headers->find(
"Valid-From");
 
  760    if (ent != headers->end()) {
 
  761      upath += 
"/" + ent->second;
 
  763    ent = headers->find(
"ETag");
 
  764    if (ent != headers->end()) {
 
  765      upath += 
"/" + ent->second;
 
  768  upath.erase(remove(upath.begin(), upath.end(), 
'\"'), upath.end());
 
  769  return fmt::format(
"CcdbDownloader finished transfer {}{}{} for {} (agent_id: {}) with http code: {}", host_url, (host_url.back() == 
'/') ? 
"" : 
"/", upath, (ts < 0) ? 
getCurrentTimestamp() : ts, userAgent, httpCode);
 
 
struct uv_timer_s uv_timer_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
#define O2_DECLARE_DYNAMIC_STACKTRACE_LOG(name)
For the moment we do not support logs with a stacktrace.
#define O2_SIGNPOST_ID_GENERATE(name, log)
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
void setRequestTimeoutTime(int timeoutMS)
void setOfflineTimeoutSettings()
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
std::string trimHostUrl(std::string full_host_url) const
void setKeepaliveTimeoutTime(int timeoutMS)
void runLoop(bool noWait)
std::vector< CURLcode > batchBlockingPerform(std::vector< CURL * > const &handleVector)
void setConnectionTimeoutTime(int timeoutMS)
void setHappyEyeballsHeadstartTime(int headstartMS)
std::string prepareLogMessage(std::string host_url, std::string userAgent, const std::string &path, long ts, const std::map< std::string, std::string > *headers, long httpCode) const
std::unordered_map< uv_handle_t *, bool > mHandleMap
int mHappyEyeballsHeadstartMS
void setMaxParallelConnections(int limit)
CCDBDownloader(uv_loop_t *uv_loop=nullptr)
void setOnlineTimeoutSettings()
GLuint const GLint * locations
GLuint GLuint64EXT address
GLsizei const GLchar *const  * path
GLuint GLsizei const GLchar * message
GLbitfield GLuint64 timeout
GLenum GLuint GLenum GLsizei const GLchar * buf
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
void curlEasyErrorCheck(CURLcode code)
std::string uniqueAgentID()
void uvErrorCheck(int code, DownloaderErrorLevel level)
struct o2::ccdb::DownloaderRequestData DownloaderRequestData
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
void curlMultiErrorCheck(CURLMcode code)
void closeHandles(uv_handle_t *handle, void *arg)
curl_socket_t opensocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
void onUVClose(uv_handle_t *handle)
Polygon< T > close(Polygon< T > polygon)
std::string to_string(gsl::span< T, Size > span)
std::vector< std::string > hosts
HeaderObjectPair_t hoPair
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"