Project
Loading...
Searching...
No Matches
CCDBDownloader.cxx
Go to the documentation of this file.
1// Copyright 2019-2023 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <CCDB/CCDBDownloader.h>
15#include "Framework/Signpost.h"
16
17#include <curl/curl.h>
18#include <unordered_map>
19#include <cstdio>
20#include <cstdlib>
21#include <uv.h>
22#include <string>
23#include <thread>
24#include <vector>
25#include <condition_variable>
26#include <mutex>
27#include <unistd.h>
28#include <sys/types.h>
29#include <sys/socket.h>
30#include <fairlogger/Logger.h>
31#include <boost/asio/ip/host_name.hpp>
32
34
35namespace o2::ccdb
36{
37
39{
40 if (code != 0) {
41 char buf[1000];
42 uv_strerror_r(code, buf, 1000);
43 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
44 if (level == SEVERE) {
45 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "UV error - %{public}s", buf);
46 } else {
47 O2_SIGNPOST_EVENT_EMIT_WARN(ccdb_downloader, sid, "CCDBDownloader", "UV minor error - %{public}s", buf);
48 }
49 }
50}
51
52void curlEasyErrorCheck(CURLcode code)
53{
54 if (code != CURLE_OK) {
55 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
56 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CURL error - %{public}s", curl_easy_strerror(code));
57 }
58}
59
60void curlMultiErrorCheck(CURLMcode code)
61{
62 if (code != CURLM_OK) {
63 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
64 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CURL error - %{public}s", curl_multi_strerror(code));
65 }
66}
67namespace
68{
69std::string uniqueAgentID()
70{
71 std::string host = boost::asio::ip::host_name();
72 char const* jobID = getenv("ALIEN_PROC_ID");
73 if (jobID) {
74 return fmt::format("{}-{}-{}-{}", host, getCurrentTimestamp() / 1000, o2::utils::Str::getRandomString(6), jobID);
75 } else {
76 return fmt::format("{}-{}-{}", host, getCurrentTimestamp() / 1000, o2::utils::Str::getRandomString(6));
77 }
78}
79} // namespace
80
82 : mUserAgentId(uniqueAgentID())
83{
84 if (uv_loop) {
85 mExternalLoop = true;
86 mUVLoop = uv_loop;
87 } else {
88 mExternalLoop = false;
89 setupInternalUVLoop();
90 }
91
92 // Preparing timer to be used by curl
93 mTimeoutTimer = (uv_timer_t*)malloc(sizeof(*mTimeoutTimer));
94 mTimeoutTimer->data = this;
95 uvErrorCheck(uv_timer_init(mUVLoop, mTimeoutTimer), SEVERE);
96 mHandleMap[(uv_handle_t*)mTimeoutTimer] = true;
97
98 initializeMultiHandle();
99}
100
101void CCDBDownloader::setupInternalUVLoop()
102{
103 mUVLoop = new uv_loop_t();
104 uvErrorCheck(uv_loop_init(mUVLoop), SEVERE);
105}
106
107void CCDBDownloader::initializeMultiHandle()
108{
109 mCurlMultiHandle = curl_multi_init();
110 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETFUNCTION, handleSocket));
111 auto socketData = &mSocketData;
112 socketData->curlm = mCurlMultiHandle;
113 socketData->CD = this;
114 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETDATA, socketData));
115 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERFUNCTION, startTimeout));
116 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERDATA, mTimeoutTimer));
117 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_MAX_TOTAL_CONNECTIONS, mMaxHandlesInUse));
118}
119
121{
122 // Loop has been ordered to stop via signalToClose()
123 curlMultiErrorCheck(curl_multi_cleanup(mCurlMultiHandle));
124
125 if (!mExternalLoop) {
126 // Schedule all handles to close. Execute loop to allow them to execute their destructors.
127 while (uv_loop_alive(mUVLoop) || (uv_loop_close(mUVLoop) == UV_EBUSY)) {
128 uv_walk(mUVLoop, closeHandles, this);
129 uv_run(mUVLoop, UV_RUN_ONCE);
130 }
131 delete mUVLoop;
132 }
133}
134
135void closeHandles(uv_handle_t* handle, void* arg)
136{
137 auto CD = (CCDBDownloader*)arg;
138 // Close only handles belonging to the Downloader
139 if (CD->mHandleMap.find(handle) != CD->mHandleMap.end()) {
140 if (!uv_is_closing(handle)) {
141 uv_close(handle, onUVClose);
142 }
143 }
144}
145
147{
148 if (handle != nullptr) {
149 free(handle);
150 }
151}
152
153void CCDBDownloader::closesocketCallback(void* clientp, curl_socket_t item)
154{
155 auto CD = (CCDBDownloader*)clientp;
156 if (CD->mExternalLoop) {
157 // If external uv loop is used then the keepalive mechanism is active.
158 if (CD->mSocketTimerMap.find(item) != CD->mSocketTimerMap.end()) {
159 auto timer = CD->mSocketTimerMap[item];
160 uvErrorCheck(uv_timer_stop(timer), SEVERE);
161 // we are getting rid of the uv_timer_t pointer ... so we need
162 // to free possibly attached user data pointers as well. Counteracts action of opensocketCallback
163 if (timer->data) {
164 delete (DataForClosingSocket*)timer->data;
165 }
166 CD->mSocketTimerMap.erase(item);
167 if (close(item) == -1) {
168 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
169 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
170 }
171 }
172 } else {
173 if (close(item) == -1) {
174 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
175 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
176 }
177 }
178}
179
180curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address)
181{
182 auto CD = (CCDBDownloader*)clientp;
183 auto sock = socket(address->family, address->socktype, address->protocol);
184 if (sock == -1) {
185 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
186 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to open");
187 }
188
189 if (CD->mExternalLoop) {
190 CD->mSocketTimerMap[sock] = (uv_timer_t*)malloc(sizeof(*CD->mSocketTimerMap[sock]));
191 uvErrorCheck(uv_timer_init(CD->mUVLoop, CD->mSocketTimerMap[sock]), SEVERE);
192 CD->mHandleMap[(uv_handle_t*)CD->mSocketTimerMap[sock]] = true;
193
194 auto data = new DataForClosingSocket();
195 data->CD = CD;
196 data->socket = sock;
197 CD->mSocketTimerMap[sock]->data = data;
198 }
199
200 return sock;
201}
202
203void CCDBDownloader::closeSocketByTimer(uv_timer_t* handle)
204{
205 auto data = (DataForClosingSocket*)handle->data;
206 auto CD = data->CD;
207 auto sock = data->socket;
208
209 if (CD->mSocketTimerMap.find(sock) != CD->mSocketTimerMap.end()) {
210 uvErrorCheck(uv_timer_stop(CD->mSocketTimerMap[sock]), SEVERE);
211 CD->mSocketTimerMap.erase(sock);
212 if (close(sock) == -1) {
213 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
214 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
215 }
216 delete data;
217 }
218}
219
220void CCDBDownloader::curlTimeout(uv_timer_t* handle)
221{
222 auto CD = (CCDBDownloader*)handle->data;
223 int running_handles;
224 curl_multi_socket_action(CD->mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles);
225 CD->checkMultiInfo();
226}
227
228void CCDBDownloader::curlPerform(uv_poll_t* handle, int status, int events)
229{
230 uvErrorCheck(status, MINOR);
231 int running_handles;
232 int flags = 0;
233 if (events & UV_READABLE) {
234 flags |= CURL_CSELECT_IN;
235 }
236 if (events & UV_WRITABLE) {
237 flags |= CURL_CSELECT_OUT;
238 }
239
240 auto context = (CCDBDownloader::curl_context_t*)handle->data;
241
242 curlMultiErrorCheck(curl_multi_socket_action(context->CD->mCurlMultiHandle, context->sockfd, flags, &running_handles));
243 context->CD->checkMultiInfo();
244}
245
246int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
247{
248 auto socketData = (CCDBDownloader::DataForSocket*)userp;
249 auto CD = (CCDBDownloader*)socketData->CD;
250 CCDBDownloader::curl_context_t* curl_context;
251 int events = 0;
252
253 switch (action) {
254 case CURL_POLL_IN:
255 case CURL_POLL_OUT:
256 case CURL_POLL_INOUT:
257
258 curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(s);
259 curlMultiErrorCheck(curl_multi_assign(socketData->curlm, s, (void*)curl_context));
260
261 if (action != CURL_POLL_IN) {
262 events |= UV_WRITABLE;
263 }
264 if (action != CURL_POLL_OUT) {
265 events |= UV_READABLE;
266 }
267
268 if (CD->mExternalLoop && CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
269 uvErrorCheck(uv_timer_stop(CD->mSocketTimerMap[s]), SEVERE);
270 }
271
272 uvErrorCheck(uv_poll_start(curl_context->poll_handle, events, curlPerform), SEVERE);
273 break;
274 case CURL_POLL_REMOVE:
275 if (socketp) {
276 if (CD->mExternalLoop) {
277 // If external loop is used then start the keepalive timeout.
278 if (CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
279 uvErrorCheck(uv_timer_start(CD->mSocketTimerMap[s], closeSocketByTimer, CD->mKeepaliveTimeoutMS, 0), SEVERE);
280 }
281 }
282 uvErrorCheck(uv_poll_stop(((CCDBDownloader::curl_context_t*)socketp)->poll_handle), SEVERE);
283 CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp);
284 curlMultiErrorCheck(curl_multi_assign(socketData->curlm, s, nullptr));
285 }
286 break;
287 default:
288 abort();
289 }
290
291 return 0;
292}
293
295{
296 mMaxHandlesInUse = limit;
297}
298
300{
301 mKeepaliveTimeoutMS = timeoutMS;
302}
303
305{
306 mConnectionTimeoutMS = timeoutMS;
307}
308
310{
311 mRequestTimeoutMS = timeoutMS;
312}
313
315{
316 mHappyEyeballsHeadstartMS = headstartMS;
317}
318
325
332
333CCDBDownloader::curl_context_t* CCDBDownloader::createCurlContext(curl_socket_t sockfd)
334{
335 curl_context_t* context;
336
337 context = (curl_context_t*)malloc(sizeof(*context));
338 context->CD = this;
339 context->sockfd = sockfd;
340 context->poll_handle = (uv_poll_t*)malloc(sizeof(*context->poll_handle));
341
342 uvErrorCheck(uv_poll_init_socket(mUVLoop, context->poll_handle, sockfd), SEVERE);
343 mHandleMap[(uv_handle_t*)(context->poll_handle)] = true;
344 context->poll_handle->data = context;
345
346 return context;
347}
348
349void CCDBDownloader::curlCloseCB(uv_handle_t* handle)
350{
351 auto* context = (curl_context_t*)handle->data;
352 free(context->poll_handle);
353 free(context);
354}
355
356void CCDBDownloader::destroyCurlContext(curl_context_t* context)
357{
358 uv_close((uv_handle_t*)context->poll_handle, curlCloseCB);
359}
360
361void CCDBDownloader::tryNewHost(PerformData* performData, CURL* easy_handle)
362{
363 auto requestData = performData->requestData;
364 std::string newUrl = requestData->hosts.at(performData->hostInd) + "/" + requestData->path + "/" + std::to_string(requestData->timestamp);
365 LOG(debug) << "Connecting to another host " << newUrl;
366 requestData->hoPair.header.clear();
367 curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str());
368 mHandlesToBeAdded.push_back(easy_handle);
369}
370
371void CCDBDownloader::getLocalContent(PerformData* performData, std::string& newLocation, bool& contentRetrieved, std::vector<std::string>& locations)
372{
373 auto requestData = performData->requestData;
374 LOG(debug) << "Redirecting to local content " << newLocation << "\n";
375 if (requestData->localContentCallback(newLocation)) {
376 contentRetrieved = true;
377 } else {
378 // Prepare next redirect url
379 newLocation = getNewLocation(performData, locations);
380 }
381}
382
383std::string CCDBDownloader::getNewLocation(PerformData* performData, std::vector<std::string>& locations) const
384{
385 auto requestData = performData->requestData;
386 if (performData->locInd < locations.size()) {
387 std::string newLocation = locations.at(performData->locInd++);
388 std::string hostUrl = requestData->hosts.at(performData->hostInd);
389 std::string newUrl = prepareRedirectedURL(newLocation, hostUrl);
390 return newUrl;
391 } else {
392 return "";
393 }
394}
395
396void CCDBDownloader::httpRedirect(PerformData* performData, std::string& newLocation, CURL* easy_handle)
397{
398 auto requestData = performData->requestData;
399 LOG(debug) << "Trying content location " << newLocation;
400 curl_easy_setopt(easy_handle, CURLOPT_URL, newLocation.c_str());
401 mHandlesToBeAdded.push_back(easy_handle);
402}
403
404void CCDBDownloader::followRedirect(PerformData* performData, CURL* easy_handle, std::vector<std::string>& locations, bool& rescheduled, bool& contentRetrieved)
405{
406 std::string newLocation = getNewLocation(performData, locations);
407 if (newLocation.find("alien:/", 0) != std::string::npos || newLocation.find("file:/", 0) != std::string::npos) {
408 getLocalContent(performData, newLocation, contentRetrieved, locations);
409 }
410 if (!contentRetrieved && newLocation != "") {
411 httpRedirect(performData, newLocation, easy_handle);
412 rescheduled = true;
413 }
414}
415
416std::string CCDBDownloader::trimHostUrl(std::string full_host_url) const
417{
418 CURLU* host_url = curl_url();
419 curl_url_set(host_url, CURLUPART_URL, full_host_url.c_str(), 0);
420
421 // Get host part (the only critical part)
422 char* host;
423 CURLUcode host_result = curl_url_get(host_url, CURLUPART_HOST, &host, 0);
424 if (host_result != CURLUE_OK) {
425 LOG(error) << "CCDBDownloader: Malformed url detected when processing redirect, could not identify the host part: " << full_host_url;
426 curl_url_cleanup(host_url);
427 return "";
428 }
429 // Get scheme (protocol) part
430 char* scheme;
431 CURLUcode scheme_result = curl_url_get(host_url, CURLUPART_SCHEME, &scheme, 0);
432 // Get port
433 char* port;
434 CURLUcode port_result = curl_url_get(host_url, CURLUPART_PORT, &port, 0);
435
436 curl_url_cleanup(host_url);
437
438 // Assemble parts
439 std::string trimmed_url = "";
440 if (scheme_result == CURLUE_OK) {
441 trimmed_url += scheme + std::string("://");
442 free(scheme);
443 }
444 trimmed_url += host;
445 free(host);
446 if (port_result == CURLUE_OK) {
447 trimmed_url += std::string(":") + port;
448 free(port);
449 }
450 return trimmed_url;
451}
452
453std::string CCDBDownloader::prepareRedirectedURL(std::string address, std::string potentialHost) const
454{
455 // If it is an alien or local address it does not need preparation
456 if (address.find("alien:/") != std::string::npos || address.find("file:/") != std::string::npos) {
457 return address;
458 }
459 // Check if URL contains a scheme (protocol)
460 CURLU* redirected_url = curl_url();
461 curl_url_set(redirected_url, CURLUPART_URL, address.c_str(), 0);
462 char* scheme;
463 CURLUcode scheme_result = curl_url_get(redirected_url, CURLUPART_SCHEME, &scheme, 0);
464 curl_free(scheme);
465 curl_url_cleanup(redirected_url);
466 if (scheme_result == CURLUE_OK) {
467 // The redirected_url contains a scheme (protocol) so there is no need for preparation
468 return address;
469 }
470 // If the address doesn't contain a scheme it means it is a relative url. We need to append it to the trimmed host url
471 // The host url must be trimmed from it's path (if it ends in one) as otherwise the redirection url would be appended after said path
472 return trimHostUrl(potentialHost) + address;
473}
474
475void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode)
476{
477 mHandlesInUse--;
478 PerformData* performData;
479 curlEasyErrorCheck(curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &performData));
480
481 curlMultiErrorCheck(curl_multi_remove_handle(mCurlMultiHandle, easy_handle));
482 *performData->codeDestination = curlCode;
483
484 bool rescheduled = false;
485 bool contentRetrieved = false;
486
487 if (curlCode != 0) {
488 LOG(error) << "CCDBDownloader CURL transfer error - " << curl_easy_strerror(curlCode) << "\n";
489 }
490
491 switch (performData->type) {
492 case BLOCKING: {
493 --(*performData->requestsLeft);
494 } break;
495 case ASYNCHRONOUS: {
496 DownloaderRequestData* requestData = performData->requestData;
497 if (requestData->headers) {
498 for (auto& p : requestData->hoPair.header) {
499 (*requestData->headers)[p.first] = p.second;
500 }
501 }
502 // Log that transfer finished
503 long httpCode;
504 curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &httpCode);
505 char* url;
506 curl_easy_getinfo(easy_handle, CURLINFO_EFFECTIVE_URL, &url);
507 LOG(debug) << "Transfer for " << url << " finished with code " << httpCode << "\n";
508 std::string currentHost = requestData->hosts[performData->hostInd];
509 std::string loggingMessage = prepareLogMessage(currentHost, requestData->userAgent, requestData->path, requestData->timestamp, requestData->headers, httpCode);
510
511 // Get alternative locations for the same host
512 auto locations = getLocations(&(requestData->hoPair.header));
513
514 // React to received http code
515 if (200 <= httpCode && httpCode < 400) {
516 LOG(debug) << loggingMessage;
517 if (304 == httpCode) {
518 LOGP(debug, "Object exists but I am not serving it since it's already in your possession");
519 contentRetrieved = true;
520 } else if (300 <= httpCode && httpCode < 400 && performData->locInd < locations.size()) {
521 followRedirect(performData, easy_handle, locations, rescheduled, contentRetrieved);
522 } else if (200 <= httpCode && httpCode < 300) {
523 contentRetrieved = true; // Can be overruled by following error check
524 }
525 } else {
526 LOG(error) << loggingMessage;
527 }
528
529 // Check for errors
530 if (curlCode != 0) {
531 contentRetrieved = false;
532 }
533
534 // Check if content was retrieved, or scheduled to be retrieved
535 if (!rescheduled && !contentRetrieved && performData->locInd == locations.size()) {
536 // Ran out of locations to redirect, try new host
537 if (++performData->hostInd < requestData->hosts.size()) {
538 tryNewHost(performData, easy_handle);
539 rescheduled = true;
540 } else {
541 LOG(error) << "File " << requestData->path << " could not be retrieved. No more hosts to try.";
542 }
543 }
544
545 if (!rescheduled) {
546 // No more transfers will be done for this request, do cleanup specific for ASYNCHRONOUS calls
547 if (!contentRetrieved) {
548 if (requestData->hoPair.object) {
549 requestData->hoPair.object->clear();
550 }
551 if (requestData->headers) {
552 (*requestData->headers)["Error"] = "An error occurred during retrieval";
553 }
554 LOGP(alarm, "Curl request to {}, response code: {}", url, httpCode);
555 } else {
556 if (requestData->headers && requestData->headers->find("fileSize") == requestData->headers->end()) {
557 (*requestData->headers)["fileSize"] = fmt::format("{}", requestData->hoPair.object ? requestData->hoPair.object->size() : 0);
558 }
559 }
560 --(*performData->requestsLeft);
561 curl_slist_free_all(*performData->options);
562 delete requestData;
563 delete performData->codeDestination;
564 curl_easy_cleanup(easy_handle);
565 }
566 } break;
567 }
568 if (!rescheduled) {
569 // No more transfers will be done for this request, do general cleanup
570 delete performData;
571 }
572
573 checkHandleQueue();
574
575 // Calling timeout starts a new download if a new easy_handle was added.
576 int running_handles;
577 curlMultiErrorCheck(curl_multi_socket_action(mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles));
578 checkMultiInfo();
579}
580
581void CCDBDownloader::checkMultiInfo()
582{
583 CURLMsg* message;
584 int pending;
585
586 while ((message = curl_multi_info_read(mCurlMultiHandle, &pending))) {
587 switch (message->msg) {
588 case CURLMSG_DONE: {
589 CURLcode code = message->data.result;
590 transferFinished(message->easy_handle, code);
591 } break;
592
593 default:
594 fprintf(stderr, "CURLMSG default\n");
595 break;
596 }
597 }
598}
599
600int CCDBDownloader::startTimeout(CURLM* multi, long timeout_ms, void* userp)
601{
602 auto timeout = (uv_timer_t*)userp;
603
604 if (timeout_ms < 0) {
605 uvErrorCheck(uv_timer_stop(timeout), SEVERE);
606 } else {
607 if (timeout_ms == 0) {
608 timeout_ms = 1; // Calling curlTimeout when timeout = 0 could create an infinite loop
609 }
610 uvErrorCheck(uv_timer_start(timeout, curlTimeout, timeout_ms, 0), SEVERE);
611 }
612 return 0;
613}
614
615void CCDBDownloader::setHandleOptions(CURL* handle, PerformData* data)
616{
617 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_PRIVATE, data));
618 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION, closesocketCallback));
619 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETDATA, this));
620 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_OPENSOCKETFUNCTION, opensocketCallback));
621 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_OPENSOCKETDATA, this));
622
623 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, mRequestTimeoutMS));
624 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, mConnectionTimeoutMS));
625 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_HAPPY_EYEBALLS_TIMEOUT_MS, mHappyEyeballsHeadstartMS));
626 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_USERAGENT, mUserAgentId.c_str()));
627}
628
629void CCDBDownloader::checkHandleQueue()
630{
631 if (mHandlesToBeAdded.size() > 0) {
632 // Add handles without going over the limit
633 while (mHandlesToBeAdded.size() > 0 && mHandlesInUse < mMaxHandlesInUse) {
634 curlMultiErrorCheck(curl_multi_add_handle(mCurlMultiHandle, mHandlesToBeAdded.front()));
635 mHandlesInUse++;
636 mHandlesToBeAdded.erase(mHandlesToBeAdded.begin());
637 }
638 }
639}
640
641void CCDBDownloader::runLoop(bool noWait)
642{
643 uv_run(mUVLoop, noWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
644}
645
646CURLcode CCDBDownloader::perform(CURL* handle)
647{
648 std::vector<CURL*> handleVector;
649 handleVector.push_back(handle);
650 return batchBlockingPerform(handleVector).back();
651}
652
653std::vector<std::string> CCDBDownloader::getLocations(std::multimap<std::string, std::string>* headerMap) const
654{
655 std::vector<std::string> locs;
656 auto iter = headerMap->find("Location");
657 if (iter != headerMap->end()) {
658 locs.push_back(iter->second);
659 }
660 // add alternative locations (not yet included)
661 auto iter2 = headerMap->find("Content-Location");
662 if (iter2 != headerMap->end()) {
663 auto range = headerMap->equal_range("Content-Location");
664 for (auto it = range.first; it != range.second; ++it) {
665 if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) {
666 locs.push_back(it->second);
667 }
668 }
669 }
670 return locs;
671}
672
673std::vector<CURLcode> CCDBDownloader::batchBlockingPerform(std::vector<CURL*> const& handleVector)
674{
675 std::vector<CURLcode> codeVector(handleVector.size());
676 size_t requestsLeft = handleVector.size();
677
678 for (int i = 0; i < handleVector.size(); i++) {
679 auto* data = new CCDBDownloader::PerformData();
680 data->codeDestination = &codeVector[i];
681 codeVector[i] = CURLE_FAILED_INIT;
682
683 data->type = BLOCKING;
684 data->requestsLeft = &requestsLeft;
685 setHandleOptions(handleVector[i], data);
686 mHandlesToBeAdded.push_back(handleVector[i]);
687 }
688 checkHandleQueue();
689 while (requestsLeft > 0) {
690 uv_run(mUVLoop, UV_RUN_ONCE);
691 }
692
693 return codeVector;
694}
695
696void CCDBDownloader::asynchSchedule(CURL* handle, size_t* requestCounter)
697{
698 (*requestCounter)++;
699
700 CURLcode* codeVector = new CURLcode();
701
702 // Get data about request
703 DownloaderRequestData* requestData;
704 std::multimap<std::string, std::string>* headerMap;
705 std::vector<std::string>* hostsPool;
706 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestData);
707 headerMap = &(requestData->hoPair.header);
708 hostsPool = &(requestData->hosts);
709 auto* options = &(requestData->optionsList);
710
711 // Prepare temporary data about transfer
712 auto* data = new CCDBDownloader::PerformData(); // Freed in transferFinished
713 data->codeDestination = codeVector;
714 *codeVector = CURLE_FAILED_INIT;
715
716 data->type = ASYNCHRONOUS;
717 data->requestsLeft = requestCounter;
718 data->hostInd = 0;
719 data->locInd = 0;
720 data->requestData = requestData;
721 data->options = options;
722
723 // Prepare handle and schedule download
724 setHandleOptions(handle, data);
725 mHandlesToBeAdded.push_back(handle);
726
727 checkHandleQueue();
728
729 // return codeVector;
730}
731
732std::string CCDBDownloader::prepareLogMessage(std::string host_url, std::string userAgent, const std::string& path, long ts, const std::map<std::string, std::string>* headers, long httpCode) const
733{
734 std::string upath{path};
735 if (headers) {
736 auto ent = headers->find("Valid-From");
737 if (ent != headers->end()) {
738 upath += "/" + ent->second;
739 }
740 ent = headers->find("ETag");
741 if (ent != headers->end()) {
742 upath += "/" + ent->second;
743 }
744 }
745 upath.erase(remove(upath.begin(), upath.end(), '\"'), upath.end());
746 return fmt::format("CcdbDownloader finished transfer {}{}{} for {} (agent_id: {}) with http code: {}", host_url, (host_url.back() == '/') ? "" : "/", upath, (ts < 0) ? getCurrentTimestamp() : ts, userAgent, httpCode);
747}
748
749} // namespace o2::ccdb
struct uv_timer_s uv_timer_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
int32_t i
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:515
#define O2_DECLARE_DYNAMIC_STACKTRACE_LOG(name)
For the moment we do not support logs with a stacktrace.
Definition Signpost.h:475
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:490
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:525
std::ostringstream debug
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
std::string trimHostUrl(std::string full_host_url) const
void setKeepaliveTimeoutTime(int timeoutMS)
std::vector< CURLcode > batchBlockingPerform(std::vector< CURL * > const &handleVector)
void setConnectionTimeoutTime(int timeoutMS)
void setHappyEyeballsHeadstartTime(int headstartMS)
std::string prepareLogMessage(std::string host_url, std::string userAgent, const std::string &path, long ts, const std::map< std::string, std::string > *headers, long httpCode) const
std::unordered_map< uv_handle_t *, bool > mHandleMap
void setMaxParallelConnections(int limit)
CCDBDownloader(uv_loop_t *uv_loop=nullptr)
GLuint const GLint * locations
Definition glcorearb.h:5740
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLenum GLint * range
Definition glcorearb.h:1899
GLboolean * data
Definition glcorearb.h:298
GLbitfield flags
Definition glcorearb.h:1570
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
void curlEasyErrorCheck(CURLcode code)
std::string uniqueAgentID()
void uvErrorCheck(int code, DownloaderErrorLevel level)
struct o2::ccdb::DownloaderRequestData DownloaderRequestData
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
void curlMultiErrorCheck(CURLMcode code)
void closeHandles(uv_handle_t *handle, void *arg)
curl_socket_t opensocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
void onUVClose(uv_handle_t *handle)
Polygon< T > close(Polygon< T > polygon)
Definition Polygon.h:126
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::vector< std::string > hosts
std::multimap< std::string, std::string > header
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"