diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index cd11f19f8ef..ffbc55b3087 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -69,7 +69,7 @@ struct curlFileTransfer : public FileTransfer curlFileTransfer & fileTransfer; FileTransferRequest request; FileTransferResult result; - Activity act; + std::unique_ptr _act; bool done = false; // whether either the success or failure function has been called Callback callback; CURL * req = 0; @@ -124,12 +124,6 @@ struct curlFileTransfer : public FileTransfer Callback && callback) : fileTransfer(fileTransfer) , request(request) - , act(*logger, - lvlTalkative, - actFileTransfer, - fmt("%s '%s'", request.verb(/*continuous=*/true), request.uri), - {request.uri.to_string()}, - request.parentAct) , callback(std::move(callback)) , finalSink([this](std::string_view data) { if (errorSink) { @@ -325,9 +319,29 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) userp)->headerCallback(contents, size, nmemb); } + /** + * Lazily start an `Activity`. We don't do this in the `TransferItem` constructor to avoid showing downloads + * that are only enqueued but not actually started. + */ + Activity & act() + { + if (!_act) { + _act = std::make_unique( + *logger, + lvlTalkative, + actFileTransfer, + fmt("%s '%s'", request.verb(/*continuous=*/true), request.uri), + Logger::Fields{request.uri.to_string()}, + request.parentAct); + // Reset the start time to when we actually started the download. + startTime = std::chrono::steady_clock::now(); + } + return *_act; + } + int progressCallback(curl_off_t dltotal, curl_off_t dlnow) noexcept try { - act.progress(dlnow, dltotal); + act().progress(dlnow, dltotal); return getInterrupted(); } catch (nix::Interrupted &) { assert(getInterrupted()); @@ -404,6 +418,13 @@ struct curlFileTransfer : public FileTransfer return ((TransferItem *) clientp)->seekCallback(offset, origin); } + static int resolverCallbackWrapper(void *, void *, void * clientp) noexcept + { + // Create the `Activity` associated with this download. + ((TransferItem *) clientp)->act(); + return 0; + } + void unpause() { /* Unpausing an already unpaused transfer is a no-op. */ @@ -516,6 +537,11 @@ struct curlFileTransfer : public FileTransfer } #endif + // This seems to be the earliest libcurl callback that signals that the download is happening, so we can + // call act(). + curl_easy_setopt(req, CURLOPT_RESOLVER_START_FUNCTION, resolverCallbackWrapper); + curl_easy_setopt(req, CURLOPT_RESOLVER_START_DATA, this); + result.data.clear(); result.bodySize = 0; } @@ -564,7 +590,7 @@ struct curlFileTransfer : public FileTransfer if (httpStatus == 304 && result.etag == "") result.etag = request.expectedETag; - act.progress(result.bodySize, result.bodySize); + act().progress(result.bodySize, result.bodySize); done = true; callback(std::move(result)); } @@ -715,6 +741,8 @@ struct curlFileTransfer : public FileTransfer std::thread workerThread; + const size_t maxQueueSize = fileTransferSettings.httpConnections.get() * 5; + curlFileTransfer() : mt19937(rd()) { @@ -820,6 +848,13 @@ struct curlFileTransfer : public FileTransfer { auto state(state_.lock()); while (!state->incoming.empty()) { + /* Limit the number of active curl handles, since curl doesn't scale well. */ + if (items.size() + incoming.size() >= maxQueueSize) { + auto t = now + std::chrono::milliseconds(100); + if (nextWakeup == std::chrono::steady_clock::time_point() || t < nextWakeup) + nextWakeup = t; + break; + } auto item = state->incoming.top(); if (item->embargo <= now) { incoming.push_back(item);