Project
Loading...
Searching...
No Matches
CCDBDownloader.cxx
Go to the documentation of this file.
1// Copyright 2019-2023 CERN and copyright holders of ALICE O2.
2// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3// All rights not expressly granted are reserved.
4//
5// This software is distributed under the terms of the GNU General Public
6// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7//
8// In applying this license CERN does not waive the privileges and immunities
9// granted to it by virtue of its status as an Intergovernmental Organization
10// or submit itself to any jurisdiction.
11
12#include <CCDB/CCDBDownloader.h>
15#include "Framework/Signpost.h"
16
17#include <curl/curl.h>
18#include <unordered_map>
19#include <cstdio>
20#include <cstdlib>
21#include <uv.h>
22#include <string>
23#include <thread>
24#include <vector>
25#include <condition_variable>
26#include <mutex>
27#include <unistd.h>
28#include <sys/types.h>
29#include <sys/socket.h>
30#include <fairlogger/Logger.h>
31#include <boost/asio/ip/host_name.hpp>
32
34
35namespace o2::ccdb
36{
37
39{
40 if (code != 0) {
41 char buf[1000];
42 uv_strerror_r(code, buf, 1000);
43 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
44 if (level == SEVERE) {
45 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "UV error - %{public}s", buf);
46 } else {
47 O2_SIGNPOST_EVENT_EMIT_WARN(ccdb_downloader, sid, "CCDBDownloader", "UV minor error - %{public}s", buf);
48 }
49 }
50}
51
52void curlEasyErrorCheck(CURLcode code)
53{
54 if (code != CURLE_OK) {
55 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
56 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CURL error - %{public}s", curl_easy_strerror(code));
57 }
58}
59
60void curlMultiErrorCheck(CURLMcode code)
61{
62 if (code != CURLM_OK) {
63 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
64 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CURL error - %{public}s", curl_multi_strerror(code));
65 }
66}
67namespace
68{
69std::string uniqueAgentID()
70{
71 std::string host = boost::asio::ip::host_name();
72 char const* jobID = getenv("ALIEN_PROC_ID");
73 if (jobID) {
74 return fmt::format("{}-{}-{}-{}", host, getCurrentTimestamp() / 1000, o2::utils::Str::getRandomString(6), jobID);
75 } else {
76 return fmt::format("{}-{}-{}", host, getCurrentTimestamp() / 1000, o2::utils::Str::getRandomString(6));
77 }
78}
79} // namespace
80
82 : mUserAgentId(uniqueAgentID())
83{
84 if (uv_loop) {
85 mExternalLoop = true;
86 mUVLoop = uv_loop;
87 } else {
88 mExternalLoop = false;
89 setupInternalUVLoop();
90 }
91
92 // Preparing timer to be used by curl
93 mTimeoutTimer = (uv_timer_t*)malloc(sizeof(*mTimeoutTimer));
94 mTimeoutTimer->data = this;
95 uvErrorCheck(uv_timer_init(mUVLoop, mTimeoutTimer), SEVERE);
96 mHandleMap[(uv_handle_t*)mTimeoutTimer] = true;
97
98 initializeMultiHandle();
99}
100
101void CCDBDownloader::setupInternalUVLoop()
102{
103 mUVLoop = new uv_loop_t();
104 uvErrorCheck(uv_loop_init(mUVLoop), SEVERE);
105}
106
107void CCDBDownloader::initializeMultiHandle()
108{
109 mCurlMultiHandle = curl_multi_init();
110 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETFUNCTION, handleSocket));
111 auto socketData = &mSocketData;
112 socketData->curlm = mCurlMultiHandle;
113 socketData->CD = this;
114 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_SOCKETDATA, socketData));
115 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERFUNCTION, startTimeout));
116 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_TIMERDATA, mTimeoutTimer));
117 curlMultiErrorCheck(curl_multi_setopt(mCurlMultiHandle, CURLMOPT_MAX_TOTAL_CONNECTIONS, mMaxHandlesInUse));
118}
119
121{
122 // Loop has been ordered to stop via signalToClose()
123 curlMultiErrorCheck(curl_multi_cleanup(mCurlMultiHandle));
124
125 if (!mExternalLoop) {
126 // Schedule all handles to close. Execute loop to allow them to execute their destructors.
127 while (uv_loop_alive(mUVLoop) || (uv_loop_close(mUVLoop) == UV_EBUSY)) {
128 uv_walk(mUVLoop, closeHandles, this);
129 uv_run(mUVLoop, UV_RUN_ONCE);
130 }
131 delete mUVLoop;
132 }
133}
134
135void closeHandles(uv_handle_t* handle, void* arg)
136{
137 auto CD = (CCDBDownloader*)arg;
138 // Close only handles belonging to the Downloader
139 if (CD->mHandleMap.find(handle) != CD->mHandleMap.end()) {
140 if (!uv_is_closing(handle)) {
141 uv_close(handle, onUVClose);
142 }
143 }
144}
145
147{
148 if (handle != nullptr) {
149 free(handle);
150 }
151}
152
153void CCDBDownloader::closesocketCallback(void* clientp, curl_socket_t item)
154{
155 auto CD = (CCDBDownloader*)clientp;
156 if (CD->mExternalLoop) {
157 // If external uv loop is used then the keepalive mechanism is active.
158 if (CD->mSocketTimerMap.find(item) != CD->mSocketTimerMap.end()) {
159 auto timer = CD->mSocketTimerMap[item];
160 uvErrorCheck(uv_timer_stop(timer), SEVERE);
161 // we are getting rid of the uv_timer_t pointer ... so we need
162 // to free possibly attached user data pointers as well. Counteracts action of opensocketCallback
163 if (timer->data) {
164 delete (DataForClosingSocket*)timer->data;
165 }
166 CD->mSocketTimerMap.erase(item);
167 if (close(item) == -1) {
168 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
169 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
170 }
171 }
172 } else {
173 if (close(item) == -1) {
174 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
175 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
176 }
177 }
178}
179
180curl_socket_t opensocketCallback(void* clientp, curlsocktype purpose, struct curl_sockaddr* address)
181{
182 auto CD = (CCDBDownloader*)clientp;
183 auto sock = socket(address->family, address->socktype, address->protocol);
184 if (sock == -1) {
185 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
186 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to open");
187 }
188
189 if (CD->mExternalLoop) {
190 CD->mSocketTimerMap[sock] = (uv_timer_t*)malloc(sizeof(*CD->mSocketTimerMap[sock]));
191 uvErrorCheck(uv_timer_init(CD->mUVLoop, CD->mSocketTimerMap[sock]), SEVERE);
192 CD->mHandleMap[(uv_handle_t*)CD->mSocketTimerMap[sock]] = true;
193
194 auto data = new DataForClosingSocket();
195 data->CD = CD;
196 data->socket = sock;
197 CD->mSocketTimerMap[sock]->data = data;
198 }
199
200 return sock;
201}
202
203void CCDBDownloader::closeSocketByTimer(uv_timer_t* handle)
204{
205 auto data = (DataForClosingSocket*)handle->data;
206 auto CD = data->CD;
207 auto sock = data->socket;
208
209 if (CD->mSocketTimerMap.find(sock) != CD->mSocketTimerMap.end()) {
210 uvErrorCheck(uv_timer_stop(CD->mSocketTimerMap[sock]), SEVERE);
211 CD->mSocketTimerMap.erase(sock);
212 if (close(sock) == -1) {
213 O2_SIGNPOST_ID_GENERATE(sid, ccdb_downloader);
214 O2_SIGNPOST_EVENT_EMIT_ERROR(ccdb_downloader, sid, "CCDBDownloader", "CCDBDownloader: Socket failed to close");
215 }
216 delete data;
217 }
218}
219
220void CCDBDownloader::curlTimeout(uv_timer_t* handle)
221{
222 auto CD = (CCDBDownloader*)handle->data;
223 int running_handles;
224 curl_multi_socket_action(CD->mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles);
225 CD->checkMultiInfo();
226}
227
228void CCDBDownloader::curlPerform(uv_poll_t* handle, int status, int events)
229{
230 uvErrorCheck(status, MINOR);
231 int running_handles;
232 int flags = 0;
233 if (events & UV_READABLE) {
234 flags |= CURL_CSELECT_IN;
235 }
236 if (events & UV_WRITABLE) {
237 flags |= CURL_CSELECT_OUT;
238 }
239
240 auto context = (CCDBDownloader::curl_context_t*)handle->data;
241
242 curlMultiErrorCheck(curl_multi_socket_action(context->CD->mCurlMultiHandle, context->sockfd, flags, &running_handles));
243 context->CD->checkMultiInfo();
244}
245
246int CCDBDownloader::handleSocket(CURL* easy, curl_socket_t s, int action, void* userp, void* socketp)
247{
248 auto socketData = (CCDBDownloader::DataForSocket*)userp;
249 auto CD = (CCDBDownloader*)socketData->CD;
250 CCDBDownloader::curl_context_t* curl_context;
251 int events = 0;
252
253 switch (action) {
254 case CURL_POLL_IN:
255 case CURL_POLL_OUT:
256 case CURL_POLL_INOUT:
257
258 curl_context = socketp ? (CCDBDownloader::curl_context_t*)socketp : CD->createCurlContext(s);
259 curlMultiErrorCheck(curl_multi_assign(socketData->curlm, s, (void*)curl_context));
260
261 if (action != CURL_POLL_IN) {
262 events |= UV_WRITABLE;
263 }
264 if (action != CURL_POLL_OUT) {
265 events |= UV_READABLE;
266 }
267
268 if (CD->mExternalLoop && CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
269 uvErrorCheck(uv_timer_stop(CD->mSocketTimerMap[s]), SEVERE);
270 }
271
272 uvErrorCheck(uv_poll_start(curl_context->poll_handle, events, curlPerform), SEVERE);
273 break;
274 case CURL_POLL_REMOVE:
275 if (socketp) {
276 if (CD->mExternalLoop) {
277 // If external loop is used then start the keepalive timeout.
278 if (CD->mSocketTimerMap.find(s) != CD->mSocketTimerMap.end()) {
279 uvErrorCheck(uv_timer_start(CD->mSocketTimerMap[s], closeSocketByTimer, CD->mKeepaliveTimeoutMS, 0), SEVERE);
280 }
281 }
282 uvErrorCheck(uv_poll_stop(((CCDBDownloader::curl_context_t*)socketp)->poll_handle), SEVERE);
283 CD->destroyCurlContext((CCDBDownloader::curl_context_t*)socketp);
284 curlMultiErrorCheck(curl_multi_assign(socketData->curlm, s, nullptr));
285 }
286 break;
287 default:
288 abort();
289 }
290
291 return 0;
292}
293
295{
296 mMaxHandlesInUse = limit;
297}
298
300{
301 mKeepaliveTimeoutMS = timeoutMS;
302}
303
305{
306 mConnectionTimeoutMS = timeoutMS;
307}
308
310{
311 mRequestTimeoutMS = timeoutMS;
312}
313
315{
316 mHappyEyeballsHeadstartMS = headstartMS;
317}
318
325
332
333CCDBDownloader::curl_context_t* CCDBDownloader::createCurlContext(curl_socket_t sockfd)
334{
335 curl_context_t* context;
336
337 context = (curl_context_t*)malloc(sizeof(*context));
338 context->CD = this;
339 context->sockfd = sockfd;
340 context->poll_handle = (uv_poll_t*)malloc(sizeof(*context->poll_handle));
341
342 uvErrorCheck(uv_poll_init_socket(mUVLoop, context->poll_handle, sockfd), SEVERE);
343 mHandleMap[(uv_handle_t*)(context->poll_handle)] = true;
344 context->poll_handle->data = context;
345
346 return context;
347}
348
349void CCDBDownloader::curlCloseCB(uv_handle_t* handle)
350{
351 auto* context = (curl_context_t*)handle->data;
352 free(context->poll_handle);
353 free(context);
354}
355
356void CCDBDownloader::destroyCurlContext(curl_context_t* context)
357{
358 uv_close((uv_handle_t*)context->poll_handle, curlCloseCB);
359}
360
361void CCDBDownloader::tryNewHost(PerformData* performData, CURL* easy_handle)
362{
363 auto requestData = performData->requestData;
364 std::string newUrl = requestData->hosts.at(performData->hostInd) + "/" + requestData->path + "/" + std::to_string(requestData->timestamp);
365 LOG(debug) << "Connecting to another host " << newUrl << "\n";
366 requestData->hoPair.header.clear();
367 curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str());
368 mHandlesToBeAdded.push_back(easy_handle);
369}
370
371void CCDBDownloader::getLocalContent(PerformData* performData, std::string& newLocation, bool& contentRetrieved, std::vector<std::string>& locations)
372{
373 auto requestData = performData->requestData;
374 LOG(debug) << "Redirecting to local content " << newLocation << "\n";
375 if (requestData->localContentCallback(newLocation)) {
376 contentRetrieved = true;
377 LOG(debug) << "Local content retrieved succesfully: " << newLocation << " n";
378 } else {
379 // Prepare next redirect url
380 newLocation = getNewLocation(performData, locations);
381 LOG(debug) << "Failed to retrieve local content: " << newLocation << "\n";
382 }
383}
384
385std::string CCDBDownloader::getNewLocation(PerformData* performData, std::vector<std::string>& locations) const
386{
387 auto requestData = performData->requestData;
388 if (performData->locInd < locations.size()) {
389 std::string newLocation = locations.at(performData->locInd++);
390 std::string hostUrl = requestData->hosts.at(performData->hostInd);
391 std::string newUrl = prepareRedirectedURL(newLocation, hostUrl);
392 return newUrl;
393 } else {
394 return "";
395 }
396}
397
398void CCDBDownloader::httpRedirect(PerformData* performData, std::string& newLocation, CURL* easy_handle)
399{
400 auto requestData = performData->requestData;
401 LOG(debug) << "Trying content location " << newLocation << "\n";
402 curl_easy_setopt(easy_handle, CURLOPT_URL, newLocation.c_str());
403 mHandlesToBeAdded.push_back(easy_handle);
404}
405
406void CCDBDownloader::followRedirect(PerformData* performData, CURL* easy_handle, std::vector<std::string>& locations, bool& rescheduled, bool& contentRetrieved)
407{
408 std::string newLocation = getNewLocation(performData, locations);
409 while (!contentRetrieved && (newLocation.find("alien:/", 0) != std::string::npos || newLocation.find("file:/", 0) != std::string::npos)) {
410 getLocalContent(performData, newLocation, contentRetrieved, locations);
411 }
412 if (!contentRetrieved && newLocation != "") {
413 httpRedirect(performData, newLocation, easy_handle);
414 rescheduled = true;
415 }
416}
417
418std::string CCDBDownloader::trimHostUrl(std::string full_host_url) const
419{
420 CURLU* host_url = curl_url();
421 curl_url_set(host_url, CURLUPART_URL, full_host_url.c_str(), 0);
422
423 // Get host part (the only critical part)
424 char* host;
425 CURLUcode host_result = curl_url_get(host_url, CURLUPART_HOST, &host, 0);
426 if (host_result != CURLUE_OK) {
427 LOG(error) << "CCDBDownloader: Malformed url detected when processing redirect, could not identify the host part: " << full_host_url;
428 curl_url_cleanup(host_url);
429 return "";
430 }
431 // Get scheme (protocol) part
432 char* scheme;
433 CURLUcode scheme_result = curl_url_get(host_url, CURLUPART_SCHEME, &scheme, 0);
434 // Get port
435 char* port;
436 CURLUcode port_result = curl_url_get(host_url, CURLUPART_PORT, &port, 0);
437
438 curl_url_cleanup(host_url);
439
440 // Assemble parts
441 std::string trimmed_url = "";
442 if (scheme_result == CURLUE_OK) {
443 trimmed_url += scheme + std::string("://");
444 free(scheme);
445 }
446 trimmed_url += host;
447 free(host);
448 if (port_result == CURLUE_OK) {
449 trimmed_url += std::string(":") + port;
450 free(port);
451 }
452 return trimmed_url;
453}
454
455std::string CCDBDownloader::prepareRedirectedURL(std::string address, std::string potentialHost) const
456{
457 // If it is an alien or local address it does not need preparation
458 if (address.find("alien:/") != std::string::npos || address.find("file:/") != std::string::npos) {
459 return address;
460 }
461 // Check if URL contains a scheme (protocol)
462 CURLU* redirected_url = curl_url();
463 curl_url_set(redirected_url, CURLUPART_URL, address.c_str(), 0);
464 char* scheme;
465 CURLUcode scheme_result = curl_url_get(redirected_url, CURLUPART_SCHEME, &scheme, 0);
466 curl_free(scheme);
467 curl_url_cleanup(redirected_url);
468 if (scheme_result == CURLUE_OK) {
469 // The redirected_url contains a scheme (protocol) so there is no need for preparation
470 return address;
471 }
472 // If the address doesn't contain a scheme it means it is a relative url. We need to append it to the trimmed host url
473 // The host url must be trimmed from it's path (if it ends in one) as otherwise the redirection url would be appended after said path
474 return trimHostUrl(potentialHost) + address;
475}
476
477void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode)
478{
479 mHandlesInUse--;
480 PerformData* performData;
481 curlEasyErrorCheck(curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &performData));
482
483 curlMultiErrorCheck(curl_multi_remove_handle(mCurlMultiHandle, easy_handle));
484 *performData->codeDestination = curlCode;
485
486 bool rescheduled = false;
487 bool contentRetrieved = false;
488
489 if (curlCode != 0) {
490 LOG(error) << "CCDBDownloader CURL transfer error - " << curl_easy_strerror(curlCode) << "\n";
491 }
492
493 switch (performData->type) {
494 case BLOCKING: {
495 --(*performData->requestsLeft);
496 } break;
497 case ASYNCHRONOUS: {
498 DownloaderRequestData* requestData = performData->requestData;
499 if (requestData->headers) {
500 for (auto& p : requestData->hoPair.header) {
501 (*requestData->headers)[p.first] = p.second;
502 }
503 }
504 // Log that transfer finished
505 long httpCode;
506 curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &httpCode);
507 char* url;
508 curl_easy_getinfo(easy_handle, CURLINFO_EFFECTIVE_URL, &url);
509 LOG(debug) << "Transfer for " << url << " finished with code " << httpCode << "\n";
510 std::string currentHost = requestData->hosts[performData->hostInd];
511 std::string loggingMessage = prepareLogMessage(currentHost, requestData->userAgent, requestData->path, requestData->timestamp, requestData->headers, httpCode);
512
513 // Get new locations based on received headers
514 updateLocations(&(requestData->hoPair.header), &requestData->locations, &performData->locInd);
515
516 // React to received http code
517 if (200 <= httpCode && httpCode < 400) {
518 LOG(debug) << loggingMessage;
519 if (304 == httpCode) {
520 LOGP(debug, "Object exists but I am not serving it since it's already in your possession");
521 contentRetrieved = true;
522 } else if (300 <= httpCode && httpCode < 400 && performData->locInd < requestData->locations.size()) {
523 followRedirect(performData, easy_handle, requestData->locations, rescheduled, contentRetrieved);
524 } else if (200 <= httpCode && httpCode < 300) {
525 contentRetrieved = true; // Can be overruled by following error check
526 }
527 } else {
528 LOG(error) << loggingMessage;
529 }
530
531 // Check for errors
532 if (curlCode != 0) {
533 contentRetrieved = false;
534 }
535
536 // Check if content was retrieved or scheduled to be retrieved
537 if (!rescheduled && !contentRetrieved) {
538 // Current location failed without providing 3xx http code, try next redirect for the same host
539 if (performData->locInd < requestData->locations.size()) {
540 followRedirect(performData, easy_handle, requestData->locations, rescheduled, contentRetrieved);
541 }
542 }
543
544 // Check again because content might have been retrieved or rescheduled via a redirect
545 if (!rescheduled && !contentRetrieved) {
546 // Ran out of locations to redirect, try new host
547 if (++performData->hostInd < requestData->hosts.size()) {
548 tryNewHost(performData, easy_handle);
549 rescheduled = true;
550 } else {
551 LOG(error) << "File " << requestData->path << " could not be retrieved. No more hosts to try.";
552 }
553 }
554
555 if (!rescheduled) {
556 // No more transfers will be done for this request, do cleanup specific for ASYNCHRONOUS calls
557 if (!contentRetrieved) {
558 if (requestData->hoPair.object) {
559 requestData->hoPair.object->clear();
560 }
561 if (requestData->headers) {
562 (*requestData->headers)["Error"] = "An error occurred during retrieval";
563 }
564 LOGP(alarm, "Curl request to {}, response code: {}", url, httpCode);
565 } else {
566 if (requestData->headers && requestData->headers->find("fileSize") == requestData->headers->end()) {
567 (*requestData->headers)["fileSize"] = fmt::format("{}", requestData->hoPair.object ? requestData->hoPair.object->size() : 0);
568 }
569 }
570 --(*performData->requestsLeft);
571 curl_slist_free_all(*performData->options);
572 delete requestData;
573 delete performData->codeDestination;
574 curl_easy_cleanup(easy_handle);
575 }
576 } break;
577 }
578 if (!rescheduled) {
579 // No more transfers will be done for this request, do general cleanup
580 delete performData;
581 }
582
583 checkHandleQueue();
584
585 // Calling timeout starts a new download if a new easy_handle was added.
586 int running_handles;
587 curlMultiErrorCheck(curl_multi_socket_action(mCurlMultiHandle, CURL_SOCKET_TIMEOUT, 0, &running_handles));
588 checkMultiInfo();
589}
590
591void CCDBDownloader::checkMultiInfo()
592{
593 CURLMsg* message;
594 int pending;
595
596 while ((message = curl_multi_info_read(mCurlMultiHandle, &pending))) {
597 switch (message->msg) {
598 case CURLMSG_DONE: {
599 CURLcode code = message->data.result;
600 transferFinished(message->easy_handle, code);
601 } break;
602
603 default:
604 fprintf(stderr, "CURLMSG default\n");
605 break;
606 }
607 }
608}
609
610int CCDBDownloader::startTimeout(CURLM* multi, long timeout_ms, void* userp)
611{
612 auto timeout = (uv_timer_t*)userp;
613
614 if (timeout_ms < 0) {
615 uvErrorCheck(uv_timer_stop(timeout), SEVERE);
616 } else {
617 if (timeout_ms == 0) {
618 timeout_ms = 1; // Calling curlTimeout when timeout = 0 could create an infinite loop
619 }
620 uvErrorCheck(uv_timer_start(timeout, curlTimeout, timeout_ms, 0), SEVERE);
621 }
622 return 0;
623}
624
625void CCDBDownloader::setHandleOptions(CURL* handle, PerformData* data)
626{
627 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_PRIVATE, data));
628 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETFUNCTION, closesocketCallback));
629 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CLOSESOCKETDATA, this));
630 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_OPENSOCKETFUNCTION, opensocketCallback));
631 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_OPENSOCKETDATA, this));
632
633 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, mRequestTimeoutMS));
634 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, mConnectionTimeoutMS));
635 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_HAPPY_EYEBALLS_TIMEOUT_MS, mHappyEyeballsHeadstartMS));
636 curlEasyErrorCheck(curl_easy_setopt(handle, CURLOPT_USERAGENT, mUserAgentId.c_str()));
637}
638
639void CCDBDownloader::checkHandleQueue()
640{
641 if (mHandlesToBeAdded.size() > 0) {
642 // Add handles without going over the limit
643 while (mHandlesToBeAdded.size() > 0 && mHandlesInUse < mMaxHandlesInUse) {
644 curlMultiErrorCheck(curl_multi_add_handle(mCurlMultiHandle, mHandlesToBeAdded.front()));
645 mHandlesInUse++;
646 mHandlesToBeAdded.erase(mHandlesToBeAdded.begin());
647 }
648 }
649}
650
651void CCDBDownloader::runLoop(bool noWait)
652{
653 uv_run(mUVLoop, noWait ? UV_RUN_NOWAIT : UV_RUN_ONCE);
654}
655
656CURLcode CCDBDownloader::perform(CURL* handle)
657{
658 std::vector<CURL*> handleVector;
659 handleVector.push_back(handle);
660 return batchBlockingPerform(handleVector).back();
661}
662
663void CCDBDownloader::updateLocations(std::multimap<std::string, std::string>* headerMap, std::vector<std::string>* locations, int* locIndex) const
664{
665 std::vector<std::string> newLocations;
666
667 auto iter = headerMap->find("Location");
668 if (iter != headerMap->end()) {
669 auto range = headerMap->equal_range("Location");
670 for (auto it = range.first; it != range.second; ++it) {
671 if (std::find(locations->begin(), locations->end(), it->second) == locations->end()) {
672 if (std::find(newLocations.begin(), newLocations.end(), it->second) == newLocations.end()) {
673 newLocations.push_back(it->second);
674 }
675 }
676 }
677 }
678
679 // add alternative locations (not yet included)
680 auto iter2 = headerMap->find("Content-Location");
681 if (iter2 != headerMap->end()) {
682 auto range = headerMap->equal_range("Content-Location");
683 for (auto it = range.first; it != range.second; ++it) {
684 if (std::find(locations->begin(), locations->end(), it->second) == locations->end()) {
685 if (std::find(newLocations.begin(), newLocations.end(), it->second) == newLocations.end()) {
686 newLocations.push_back(it->second);
687 }
688 }
689 }
690 }
691
692 // Insert location list at the current location index. This assures that the provided locations will be tried first.
693 locations->insert(locations->begin() + (*locIndex), newLocations.begin(), newLocations.end());
694}
695
696std::vector<CURLcode> CCDBDownloader::batchBlockingPerform(std::vector<CURL*> const& handleVector)
697{
698 std::vector<CURLcode> codeVector(handleVector.size());
699 size_t requestsLeft = handleVector.size();
700
701 for (int i = 0; i < handleVector.size(); i++) {
702 auto* data = new CCDBDownloader::PerformData();
703 data->codeDestination = &codeVector[i];
704 codeVector[i] = CURLE_FAILED_INIT;
705
706 data->type = BLOCKING;
707 data->requestsLeft = &requestsLeft;
708 setHandleOptions(handleVector[i], data);
709 mHandlesToBeAdded.push_back(handleVector[i]);
710 }
711 checkHandleQueue();
712 while (requestsLeft > 0) {
713 uv_run(mUVLoop, UV_RUN_ONCE);
714 }
715
716 return codeVector;
717}
718
719void CCDBDownloader::asynchSchedule(CURL* handle, size_t* requestCounter)
720{
721 (*requestCounter)++;
722
723 CURLcode* codeVector = new CURLcode();
724
725 // Get data about request
726 DownloaderRequestData* requestData;
727 std::multimap<std::string, std::string>* headerMap;
728 std::vector<std::string>* hostsPool;
729 curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestData);
730 headerMap = &(requestData->hoPair.header);
731 hostsPool = &(requestData->hosts);
732 auto* options = &(requestData->optionsList);
733
734 // Prepare temporary data about transfer
735 auto* data = new CCDBDownloader::PerformData(); // Freed in transferFinished
736 data->codeDestination = codeVector;
737 *codeVector = CURLE_FAILED_INIT;
738
739 data->type = ASYNCHRONOUS;
740 data->requestsLeft = requestCounter;
741 data->hostInd = 0;
742 data->locInd = 0;
743 data->requestData = requestData;
744 data->options = options;
745
746 // Prepare handle and schedule download
747 setHandleOptions(handle, data);
748 mHandlesToBeAdded.push_back(handle);
749
750 checkHandleQueue();
751
752 // return codeVector;
753}
754
755std::string CCDBDownloader::prepareLogMessage(std::string host_url, std::string userAgent, const std::string& path, long ts, const std::map<std::string, std::string>* headers, long httpCode) const
756{
757 std::string upath{path};
758 if (headers) {
759 auto ent = headers->find("Valid-From");
760 if (ent != headers->end()) {
761 upath += "/" + ent->second;
762 }
763 ent = headers->find("ETag");
764 if (ent != headers->end()) {
765 upath += "/" + ent->second;
766 }
767 }
768 upath.erase(remove(upath.begin(), upath.end(), '\"'), upath.end());
769 return fmt::format("CcdbDownloader finished transfer {}{}{} for {} (agent_id: {}) with http code: {}", host_url, (host_url.back() == '/') ? "" : "/", upath, (ts < 0) ? getCurrentTimestamp() : ts, userAgent, httpCode);
770}
771
772} // namespace o2::ccdb
struct uv_timer_s uv_timer_t
struct uv_handle_s uv_handle_t
struct uv_poll_s uv_poll_t
struct uv_loop_s uv_loop_t
int32_t i
#define O2_SIGNPOST_EVENT_EMIT_ERROR(log, id, name, format,...)
Definition Signpost.h:536
#define O2_DECLARE_DYNAMIC_STACKTRACE_LOG(name)
For the moment we do not support logs with a stacktrace.
Definition Signpost.h:485
#define O2_SIGNPOST_ID_GENERATE(name, log)
Definition Signpost.h:500
#define O2_SIGNPOST_EVENT_EMIT_WARN(log, id, name, format,...)
Definition Signpost.h:546
std::ostringstream debug
void setRequestTimeoutTime(int timeoutMS)
CURLcode perform(CURL *handle)
void asynchSchedule(CURL *handle, size_t *requestCounter)
std::string trimHostUrl(std::string full_host_url) const
void setKeepaliveTimeoutTime(int timeoutMS)
std::vector< CURLcode > batchBlockingPerform(std::vector< CURL * > const &handleVector)
void setConnectionTimeoutTime(int timeoutMS)
void setHappyEyeballsHeadstartTime(int headstartMS)
std::string prepareLogMessage(std::string host_url, std::string userAgent, const std::string &path, long ts, const std::map< std::string, std::string > *headers, long httpCode) const
std::unordered_map< uv_handle_t *, bool > mHandleMap
void setMaxParallelConnections(int limit)
CCDBDownloader(uv_loop_t *uv_loop=nullptr)
GLuint const GLint * locations
Definition glcorearb.h:5740
GLuint GLuint64EXT address
Definition glcorearb.h:5846
GLenum GLint * range
Definition glcorearb.h:1899
GLboolean * data
Definition glcorearb.h:298
GLbitfield flags
Definition glcorearb.h:1570
GLsizei const GLchar *const * path
Definition glcorearb.h:3591
GLuint GLsizei const GLchar * message
Definition glcorearb.h:2517
GLbitfield GLuint64 timeout
Definition glcorearb.h:1573
GLenum GLuint GLenum GLsizei const GLchar * buf
Definition glcorearb.h:2514
information complementary to a CCDB object (path, metadata, startTimeValidity, endTimeValidity etc)
void curlEasyErrorCheck(CURLcode code)
std::string uniqueAgentID()
void uvErrorCheck(int code, DownloaderErrorLevel level)
struct o2::ccdb::DownloaderRequestData DownloaderRequestData
long getCurrentTimestamp()
returns the timestamp in long corresponding to "now"
void curlMultiErrorCheck(CURLMcode code)
void closeHandles(uv_handle_t *handle, void *arg)
curl_socket_t opensocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address)
void onUVClose(uv_handle_t *handle)
Polygon< T > close(Polygon< T > polygon)
Definition Polygon.h:126
std::string to_string(gsl::span< T, Size > span)
Definition common.h:52
std::vector< std::string > hosts
std::multimap< std::string, std::string > header
static std::string getRandomString(int length)
LOG(info)<< "Compressed in "<< sw.CpuTime()<< " s"