Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/libstore/filetransfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
#include <thread>
#include <regex>

using namespace std::string_literals;

namespace nix {

const unsigned int RETRY_TIME_MS_DEFAULT = 250;
Expand Down Expand Up @@ -77,8 +75,9 @@ struct curlFileTransfer : public FileTransfer
CURL * req = 0;
// buffer to accompany the `req` above
char errbuf[CURL_ERROR_SIZE];
bool active = false; // whether the handle has been added to the multi object
bool paused = false; // whether the request has been paused previously
bool active = false; // whether the handle has been added to the multi object
bool paused = false; // whether the request has been paused previously
bool enqueued = false; // whether the request has been added the incoming queue
std::string statusMsg;

unsigned int attempt = 0;
Expand Down Expand Up @@ -176,7 +175,7 @@ struct curlFileTransfer : public FileTransfer
curl_easy_cleanup(req);
}
try {
if (!done)
if (!done && enqueued)
fail(FileTransferError(
Interrupted, {}, "%s of '%s' was interrupted", Uncolored(request.noun()), request.uri));
} catch (...) {
Expand Down Expand Up @@ -887,6 +886,7 @@ struct curlFileTransfer : public FileTransfer
if (state->isQuitting())
throw nix::Error("cannot enqueue download request because the download thread is shutting down");
state->incoming.push(item);
item->enqueued = true; /* Now any exceptions should be reported via the callback. */
}

wakeupMulti();
Expand Down
32 changes: 9 additions & 23 deletions src/libstore/misc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ void Store::computeFSClosure(
bool includeOutputs,
bool includeDerivers)
{
std::function<std::set<StorePath>(const StorePath & path, std::future<ref<const ValidPathInfo>> &)> queryDeps;
std::function<asio::awaitable<StorePathSet>(const StorePath & path)> queryDeps;
if (flipDirection)
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
StorePathSet res;
StorePathSet referrers;
queryReferrers(path, referrers);
Expand All @@ -41,12 +41,14 @@ void Store::computeFSClosure(
for (auto & [_, maybeOutPath] : queryPartialDerivationOutputMap(path))
if (maybeOutPath && isValidPath(*maybeOutPath))
res.insert(*maybeOutPath);
return res;
co_return res;
};
else
queryDeps = [&](const StorePath & path, std::future<ref<const ValidPathInfo>> & fut) {
queryDeps = [this, includeOutputs, includeDerivers](const StorePath & path) -> asio::awaitable<StorePathSet> {
StorePathSet res;
auto info = fut.get();
auto info = co_await callbackToAwaitable<ref<const ValidPathInfo>>(
[this, path](Callback<ref<const ValidPathInfo>> cb) { queryPathInfo(path, std::move(cb)); });
Comment on lines +49 to +50
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The special sauce that makes this work is this. The callback is invoked on the libcurl thread and the processing gets suspended until it's marshalled to the executor. This only works this way for the binary cache store at the moment, all other stores implement this callback synchronously


for (auto & ref : info->references)
if (ref != path)
res.insert(ref);
Expand All @@ -58,25 +60,9 @@ void Store::computeFSClosure(

if (includeDerivers && info->deriver && isValidPath(*info->deriver))
res.insert(*info->deriver);
return res;
co_return res;
};

computeClosure<StorePath>(
startPaths,
paths_,
[&](const StorePath & path, std::function<void(std::promise<std::set<StorePath>> &)> processEdges) {
std::promise<std::set<StorePath>> promise;
std::function<void(std::future<ref<const ValidPathInfo>>)> getDependencies =
[&](std::future<ref<const ValidPathInfo>> fut) {
try {
promise.set_value(queryDeps(path, fut));
} catch (...) {
promise.set_exception(std::current_exception());
}
};
queryPathInfo(path, getDependencies);
processEdges(promise);
});
computeClosure<StorePath>(startPaths, paths_, queryDeps);
}

void Store::computeFSClosure(
Expand Down
32 changes: 3 additions & 29 deletions src/libutil-tests/closure.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ TEST(closure, correctClosure)
set<string> aClosure;
set<string> expectedClosure = {"A", "B", "C", "F", "G"};
computeClosure<string>(
{"A"}, aClosure, [&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promisedNodes;
promisedNodes.set_value(testGraph[currentNode]);
processEdges(promisedNodes);
{"A"}, aClosure, [&](const std::string & currentNode) -> asio::awaitable<std::set<std::string>> {
co_return testGraph[currentNode];
});

ASSERT_EQ(aClosure, expectedClosure);
Expand All @@ -37,31 +35,7 @@ TEST(closure, properlyHandlesDirectExceptions)
set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) { throw TestExn(); }),
TestExn);
}

TEST(closure, properlyHandlesExceptionsInPromise)
{
struct TestExn
{};

set<string> aClosure;
EXPECT_THROW(
computeClosure<string>(
{"A"},
aClosure,
[&](const string currentNode, function<void(promise<set<string>> &)> processEdges) {
promise<set<string>> promise;
try {
throw TestExn();
} catch (...) {
promise.set_exception(std::current_exception());
}
processEdges(promise);
}),
{"A"}, aClosure, [&](const std::string &) -> asio::awaitable<std::set<std::string>> { throw TestExn(); }),
TestExn);
}

Expand Down
68 changes: 68 additions & 0 deletions src/libutil/include/nix/util/async.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#pragma once
///@file

#include "nix/util/callback.hh"
#include "nix/util/ref.hh"
#include "nix/util/signals.hh"

#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/associated_cancellation_slot.hpp>

#include <concepts>

namespace nix {

namespace asio = boost::asio;

template<typename T, std::invocable<Callback<T>> F, typename CompletionToken>
auto callbackToAwaitable(F && initiate, CompletionToken && token)
{
return asio::async_initiate<CompletionToken, void(std::future<T>)>(
[initiate = std::forward<F>(initiate)](auto handler) mutable {
auto executor = asio::get_associated_executor(handler);
auto done = std::make_shared<std::atomic<bool>>(false);
auto h = std::make_shared<decltype(handler)>(std::move(handler));

if (auto slot = asio::get_associated_cancellation_slot(*h); slot.is_connected()) {
std::weak_ptr wh = h; /* To handle the cyclic ownership. */
std::weak_ptr wdone = done;
slot.assign([executor, wh, wdone](asio::cancellation_type /*don't care*/) {
auto h = wh.lock();
auto done = wdone.lock();
if (!h || !done || done->exchange(true))
return; /* Gracefully die. */
/* Doesn't need to be kept alive for get_future() since it shares the ownership. */
std::promise<T> p;
p.set_exception(std::make_exception_ptr(Interrupted("interrupted by user")));
asio::post(executor, [h, fut = p.get_future()]() mutable { std::move (*h)(std::move(fut)); });
});
}

initiate(Callback<T>([executor, done, h](std::future<T> fut) mutable {
if (done->exchange(true))
/* Early return for cooperative cancellation. The callback has been caller
later than we've been cancelled. In practice we'll get an error, the handler
has already been posted by the cancellation handler. */
return;
asio::post(executor, [h, fut = std::move(fut)]() mutable { std::move (*h)(std::move(fut)); });
}));
},
std::forward<CompletionToken>(token));
}

/**
* Convert a completion handler callback into a stackless coroutine. The
* callback can be invoked on any thread and the completion handler will be
* marshalled to the coroutines executer.
*/
template<typename T, std::invocable<Callback<T>> F>
asio::awaitable<T> callbackToAwaitable(F && initiate)
{
auto fut = co_await callbackToAwaitable<T>(std::forward<F>(initiate), asio::use_awaitable);
co_return fut.get();
}

} // namespace nix
Loading
Loading