-
Notifications
You must be signed in to change notification settings - Fork 8
Limit the number of active curl handles #315
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -53,7 +53,7 @@ struct curlFileTransfer : public FileTransfer | |
| curlFileTransfer & fileTransfer; | ||
| FileTransferRequest request; | ||
| FileTransferResult result; | ||
| Activity act; | ||
| std::unique_ptr<Activity> _act; | ||
| bool done = false; // whether either the success or failure function has been called | ||
| Callback<FileTransferResult> callback; | ||
| CURL * req = 0; | ||
|
|
@@ -98,12 +98,6 @@ struct curlFileTransfer : public FileTransfer | |
| Callback<FileTransferResult> && callback) | ||
| : fileTransfer(fileTransfer) | ||
| , request(request) | ||
| , act(*logger, | ||
| lvlTalkative, | ||
| actFileTransfer, | ||
| fmt("%s '%s'", request.verb(/*continuous=*/true), request.uri), | ||
| {request.uri.to_string()}, | ||
| request.parentAct) | ||
| , callback(std::move(callback)) | ||
| , finalSink([this](std::string_view data) { | ||
| if (errorSink) { | ||
|
|
@@ -301,9 +295,29 @@ struct curlFileTransfer : public FileTransfer | |
| return ((TransferItem *) userp)->headerCallback(contents, size, nmemb); | ||
| } | ||
|
|
||
| /** | ||
| * Lazily start an `Activity`. We don't do this in the `TransferItem` constructor to avoid showing downloads | ||
| * that are only enqueued but not actually started. | ||
| */ | ||
| Activity & act() | ||
| { | ||
| if (!_act) { | ||
| _act = std::make_unique<Activity>( | ||
| *logger, | ||
| lvlTalkative, | ||
| actFileTransfer, | ||
| fmt("%s '%s'", request.verb(/*continuous=*/true), request.uri), | ||
| Logger::Fields{request.uri.to_string()}, | ||
| request.parentAct); | ||
| // Reset the start time to when we actually started the download. | ||
| startTime = std::chrono::steady_clock::now(); | ||
| } | ||
| return *_act; | ||
| } | ||
|
|
||
| int progressCallback(curl_off_t dltotal, curl_off_t dlnow) noexcept | ||
| try { | ||
| act.progress(dlnow, dltotal); | ||
| act().progress(dlnow, dltotal); | ||
| return getInterrupted(); | ||
| } catch (nix::Interrupted &) { | ||
| assert(getInterrupted()); | ||
|
|
@@ -380,6 +394,15 @@ struct curlFileTransfer : public FileTransfer | |
| return ((TransferItem *) clientp)->seekCallback(offset, origin); | ||
| } | ||
|
|
||
| static int resolverCallbackWrapper(void *, void *, void * clientp) noexcept | ||
| try { | ||
| // Create the `Activity` associated with this download. | ||
| ((TransferItem *) clientp)->act(); | ||
| return 0; | ||
| } catch (...) { | ||
| return 1; | ||
| } | ||
|
|
||
| void unpause() | ||
| { | ||
| /* Unpausing an already unpaused transfer is a no-op. */ | ||
|
|
@@ -497,6 +520,11 @@ struct curlFileTransfer : public FileTransfer | |
| } | ||
| #endif | ||
|
|
||
| // This seems to be the earliest libcurl callback that signals that the download is happening, so we can | ||
| // call act(). | ||
| curl_easy_setopt(req, CURLOPT_RESOLVER_START_FUNCTION, resolverCallbackWrapper); | ||
| curl_easy_setopt(req, CURLOPT_RESOLVER_START_DATA, this); | ||
|
Comment on lines
+523
to
+526
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: CURLOPT_RESOLVER_START_FUNCTION was added in libcurl 7.59.0 (released March 14, 2018). [1][2] Sources: 🏁 Script executed: # First, let's examine the file at the specified lines and context
cd src/libstore && wc -l filetransfer.ccRepository: DeterminateSystems/nix-src Length of output: 90 🏁 Script executed: # Get lines 521-524 and surrounding context
sed -n '510,530p' src/libstore/filetransfer.ccRepository: DeterminateSystems/nix-src Length of output: 972 🏁 Script executed: # Check the version guards mentioned (lines 435-443 and 745-747)
sed -n '430,450p' src/libstore/filetransfer.ccRepository: DeterminateSystems/nix-src Length of output: 1223 🏁 Script executed: # Check lines 745-747
sed -n '740,755p' src/libstore/filetransfer.ccRepository: DeterminateSystems/nix-src Length of output: 689 🏁 Script executed: # Search for CURL_VERSION_* guards in the file to understand the pattern
rg -n "CURL_VERSION|HAVE_LIBCURL" src/libstore/filetransfer.cc | head -20Repository: DeterminateSystems/nix-src Length of output: 570 🏁 Script executed: # Check project configuration for minimum libcurl version
find . -type f \( -name "*.nix" -o -name "CMakeLists.txt" -o -name "meson.build" -o -name "configure.ac" \) | xargs grep -l libcurl 2>/dev/null | head -5Repository: DeterminateSystems/nix-src Length of output: 119 Add version guard for
🤖 Prompt for AI Agents |
||
|
|
||
| result.data.clear(); | ||
| result.bodySize = 0; | ||
| } | ||
|
|
@@ -545,7 +573,7 @@ struct curlFileTransfer : public FileTransfer | |
| if (httpStatus == 304 && result.etag == "") | ||
| result.etag = request.expectedETag; | ||
|
|
||
| act.progress(result.bodySize, result.bodySize); | ||
| act().progress(result.bodySize, result.bodySize); | ||
| done = true; | ||
| callback(std::move(result)); | ||
| } | ||
|
|
@@ -703,6 +731,8 @@ struct curlFileTransfer : public FileTransfer | |
|
|
||
| std::thread workerThread; | ||
|
|
||
| const size_t maxQueueSize = fileTransferSettings.httpConnections.get() * 5; | ||
|
|
||
| curlFileTransfer() | ||
| : mt19937(rd()) | ||
| { | ||
|
|
@@ -832,6 +862,13 @@ struct curlFileTransfer : public FileTransfer | |
| { | ||
| auto state(state_.lock()); | ||
| while (!state->incoming.empty()) { | ||
| /* Limit the number of active curl handles, since curl doesn't scale well. */ | ||
| if (items.size() + incoming.size() >= maxQueueSize) { | ||
| auto t = now + std::chrono::milliseconds(100); | ||
| if (nextWakeup == std::chrono::steady_clock::time_point() || t < nextWakeup) | ||
| nextWakeup = t; | ||
| break; | ||
| } | ||
| auto item = state->incoming.top(); | ||
| if (item->embargo <= now) { | ||
| incoming.push_back(item); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.