2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -11,7 +11,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
        config: [default, llvm, gnu32]
        config: [default, llvm, gnu32, sanitize]

    name: build • ${{ matrix.config }}

7 changes: 4 additions & 3 deletions ci/README.md
@@ -16,9 +16,10 @@ All CI is just bash and nix.
To run jobs locally:

```bash
CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/default.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/llvm.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/gnu32.bash ci/scripts/run.sh
CI_CONFIG=ci/configs/sanitize.bash ci/scripts/run.sh
```

By default CI jobs will reuse their build directories. `CI_CLEAN=1` can be specified to delete them before running instead.
7 changes: 7 additions & 0 deletions ci/configs/sanitize.bash
@@ -0,0 +1,7 @@
CI_DESC="CI job running ThreadSanitizer"
CI_DIR=build-sanitize
export CXX=clang++
export CXXFLAGS="-ggdb -Werror -Wall -Wextra -Wpedantic -Wthread-safety-analysis -Wno-unused-parameter -fsanitize=thread"
CMAKE_ARGS=()
BUILD_ARGS=(-k -j4)
BUILD_TARGETS=(mptest)
5 changes: 4 additions & 1 deletion ci/scripts/ci.sh
@@ -11,9 +11,12 @@ set -o errexit -o nounset -o pipefail -o xtrace
[ "${CI_CONFIG+x}" ] && source "$CI_CONFIG"

: "${CI_DIR:=build}"
if ! [ -v BUILD_TARGETS ]; then
    BUILD_TARGETS=(all tests mpexamples)
fi

[ -n "${CI_CLEAN-}" ] && rm -rf "${CI_DIR}"

cmake -B "$CI_DIR" "${CMAKE_ARGS[@]+"${CMAKE_ARGS[@]}"}"
cmake --build "$CI_DIR" -t all tests mpexamples -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
cmake --build "$CI_DIR" -t "${BUILD_TARGETS[@]}" -- "${BUILD_ARGS[@]+"${BUILD_ARGS[@]}"}"
ctest --test-dir "$CI_DIR" --output-on-failure
52 changes: 33 additions & 19 deletions include/mp/proxy-io.h
@@ -66,16 +66,18 @@ struct ProxyClient<Thread> : public ProxyClientBase<Thread, ::capnp::Void>
ProxyClient(const ProxyClient&) = delete;
~ProxyClient();

void setCleanup(const std::function<void()>& fn);

//! Cleanup function to run when the connection is closed. If the Connection
//! gets destroyed before this ProxyClient<Thread> object, this cleanup
//! callback lets it destroy this object and remove its entry in the
//! thread's request_threads or callback_threads map (after resetting
//! m_cleanup_it so the destructor does not try to access it). But if this
//! object gets destroyed before the Connection, there's no need to run the
//! cleanup function and the destructor will unregister it.
std::optional<CleanupIt> m_cleanup_it;
void setDisconnectCallback(const std::function<void()>& fn);

//! Reference to callback function that is run if there is a sudden
//! disconnect and the Connection object is destroyed before this
//! ProxyClient<Thread> object. The callback will destroy this object and
//! remove its entry from the thread's request_threads or callback_threads
//! map. It will also reset m_disconnect_cb so the destructor does not
//! access it. In the normal case where there is no sudden disconnect, the
//! destructor will unregister m_disconnect_cb so the callback is never run.
//! Since this variable is accessed from multiple threads, accesses should
//! be guarded with the associated Waiter::m_mutex.
std::optional<CleanupIt> m_disconnect_cb;
};
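A minimal standalone sketch of the ownership pattern described in the new comment, using hypothetical `Conn`/`Client` types rather than the library's real classes: whichever object is destroyed first is responsible for making sure the callback neither dangles nor runs twice.

```cpp
#include <functional>
#include <list>
#include <mutex>
#include <optional>

// Hypothetical stand-ins for Connection and ProxyClient<Thread>.
struct Conn {
    std::mutex m; // plays the role of Waiter::m_mutex in the real code
    std::list<std::function<void()>> cleanups;
    using It = std::list<std::function<void()>>::iterator;

    It add(std::function<void()> fn) { return cleanups.insert(cleanups.end(), std::move(fn)); }
    void remove(It it) { cleanups.erase(it); }

    ~Conn() // "sudden disconnect": run any callbacks still registered
    {
        std::unique_lock<std::mutex> lock{m};
        while (!cleanups.empty()) {
            auto fn = std::move(cleanups.front());
            cleanups.pop_front();
            fn(); // callback clears the client's handle so ~Client skips remove()
        }
    }
};

struct Client {
    Conn* conn;
    std::optional<Conn::It> disconnect_cb; // like m_disconnect_cb

    explicit Client(Conn& c) : conn{&c}
    {
        disconnect_cb = conn->add([this] {
            disconnect_cb.reset(); // runs under conn->m, held by ~Conn
            conn = nullptr;
        });
    }
    ~Client() // normal case: unregister so the callback never runs on a dead object
    {
        if (disconnect_cb) {
            std::unique_lock<std::mutex> lock{conn->m};
            conn->remove(*disconnect_cb);
        }
    }
};
```

Destroying the `Conn` first runs the callback and clears `disconnect_cb`; destroying the `Client` first unregisters the callback, matching the two cases the comment distinguishes.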

template <>
@@ -298,6 +300,13 @@ struct Waiter
});
}

//! Mutex mainly used internally by waiter class, but also used externally
//! to guard access to related state. Specifically, since the thread_local
//! ThreadContext struct owns a Waiter, the Waiter::m_mutex is used to guard
//! access to other parts of the struct to avoid needing to deal with more
//! mutexes than necessary. This mutex can be held at the same time as
//! EventLoop::m_mutex as long as Waiter::m_mutex is locked first and
//! EventLoop::m_mutex is locked second.
std::mutex m_mutex;
std::condition_variable m_cv;
std::optional<kj::Function<void()>> m_fn;
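The lock-ordering rule in the new comment (Waiter mutex first, EventLoop mutex second) boils down to always nesting the two acquisitions the same way; a sketch with placeholder mutexes:

```cpp
#include <mutex>

std::mutex waiter_mutex;     // stands in for Waiter::m_mutex
std::mutex event_loop_mutex; // stands in for EventLoop::m_mutex

void allowed()
{
    // Both mutexes may be held at once, but only in this order.
    std::unique_lock<std::mutex> w{waiter_mutex};
    std::unique_lock<std::mutex> e{event_loop_mutex};
    // ... touch state guarded by either mutex ...
}

// Locking event_loop_mutex first and waiter_mutex second anywhere else could
// deadlock against a thread running allowed(), so that nesting is avoided.
```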
@@ -393,11 +402,12 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli

{
// Handler for the connection getting destroyed before this client object.
auto cleanup_it = m_context.connection->addSyncCleanup([this]() {
auto disconnect_cb = m_context.connection->addSyncCleanup([this]() {
// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
}
Lock lock{m_context.loop->m_mutex};
m_context.connection = nullptr;
});

@@ -410,14 +420,10 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli
// down while external code is still holding client references.
//
// The first case is handled here when m_context.connection is not null. The
// second case is handled by the cleanup function, which sets m_context.connection to
// null so nothing happens here.
m_context.cleanup_fns.emplace_front([this, destroy_connection, cleanup_it]{
if (m_context.connection) {
// Remove cleanup callback so it doesn't run and try to access
// this object after it's already destroyed.
m_context.connection->removeSyncCleanup(cleanup_it);

// second case is handled by the disconnect_cb function, which sets
// m_context.connection to null so nothing happens here.
m_context.cleanup_fns.emplace_front([this, destroy_connection, disconnect_cb]{
{
// If the capnp interface defines a destroy method, call it to destroy
// the remote object, waiting for it to be deleted server side. If the
// capnp interface does not define a destroy method, this will just call
Expand All @@ -426,6 +432,14 @@ ProxyClientBase<Interface, Impl>::ProxyClientBase(typename Interface::Client cli

// FIXME: Could just invoke removed addCleanup fn here instead of duplicating code
m_context.loop->sync([&]() {
// Remove disconnect callback on cleanup so it doesn't run and try
// to access this object after it's destroyed. This call needs to
// run inside loop->sync() on the event loop thread because
// otherwise, if there were an ill-timed disconnect, the
// onDisconnect handler could fire and delete the Connection object
// before the removeSyncCleanup call.
if (m_context.connection) m_context.connection->removeSyncCleanup(disconnect_cb);

// Release client capability by move-assigning to temporary.
{
typename Interface::Client(std::move(m_client));
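The `typename Interface::Client(std::move(m_client))` statements above use a move-into-a-temporary idiom to drop the held capability immediately; the same trick with `std::unique_ptr` as a stand-in:

```cpp
#include <cassert>
#include <memory>

int main()
{
    std::unique_ptr<int> cap = std::make_unique<int>(42);
    {
        // Move-construct a temporary from `cap`: the temporary is destroyed at
        // the end of this statement, releasing the resource and leaving `cap`
        // empty, without waiting for `cap` itself to go out of scope.
        std::unique_ptr<int>(std::move(cap));
    }
    assert(!cap);
}
```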
30 changes: 16 additions & 14 deletions include/mp/proxy-types.h
@@ -609,59 +609,61 @@ void clientInvoke(ProxyClient& proxy_client, const GetRequest& get_request, Fiel
<< "{" << g_thread_context.thread_name
<< "} IPC client first request from current thread, constructing waiter";
}
ClientInvokeContext invoke_context{*proxy_client.m_context.connection, g_thread_context};
ThreadContext& thread_context{g_thread_context};
std::optional<ClientInvokeContext> invoke_context; // Must outlive waiter->wait() call below
std::exception_ptr exception;
std::string kj_exception;
bool done = false;
const char* disconnected = nullptr;
proxy_client.m_context.loop->sync([&]() {
if (!proxy_client.m_context.connection) {
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
disconnected = "IPC client method called after disconnect.";
invoke_context.thread_context.waiter->m_cv.notify_all();
thread_context.waiter->m_cv.notify_all();
return;
}

auto request = (proxy_client.m_client.*get_request)(nullptr);
using Request = CapRequestTraits<decltype(request)>;
using FieldList = typename ProxyClientMethodTraits<typename Request::Params>::Fields;
IterateFields().handleChain(invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
invoke_context.emplace(*proxy_client.m_context.connection, thread_context);
IterateFields().handleChain(*invoke_context, request, FieldList(), typename FieldObjs::BuildParams{&fields}...);
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client send "
<< "{" << thread_context.thread_name << "} IPC client send "
<< TypeName<typename Request::Params>() << " " << LogEscape(request.toString());

proxy_client.m_context.loop->m_task_set->add(request.send().then(
[&](::capnp::Response<typename Request::Results>&& response) {
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client recv "
<< "{" << thread_context.thread_name << "} IPC client recv "
<< TypeName<typename Request::Results>() << " " << LogEscape(response.toString());
try {
IterateFields().handleChain(
invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
*invoke_context, response, FieldList(), typename FieldObjs::ReadResults{&fields}...);
} catch (...) {
exception = std::current_exception();
}
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
invoke_context.thread_context.waiter->m_cv.notify_all();
thread_context.waiter->m_cv.notify_all();
},
[&](const ::kj::Exception& e) {
if (e.getType() == ::kj::Exception::Type::DISCONNECTED) {
disconnected = "IPC client method call interrupted by disconnect.";
} else {
kj_exception = kj::str("kj::Exception: ", e).cStr();
proxy_client.m_context.loop->logPlain()
<< "{" << invoke_context.thread_context.thread_name << "} IPC client exception " << kj_exception;
<< "{" << thread_context.thread_name << "} IPC client exception " << kj_exception;
}
const std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
const std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
done = true;
invoke_context.thread_context.waiter->m_cv.notify_all();
thread_context.waiter->m_cv.notify_all();
}));
});

std::unique_lock<std::mutex> lock(invoke_context.thread_context.waiter->m_mutex);
invoke_context.thread_context.waiter->wait(lock, [&done]() { return done; });
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
thread_context.waiter->wait(lock, [&done]() { return done; });
if (exception) std::rethrow_exception(exception);
if (!kj_exception.empty()) proxy_client.m_context.loop->raise() << kj_exception;
if (disconnected) proxy_client.m_context.loop->raise() << disconnected;
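The `std::optional<ClientInvokeContext>` change above defers construction until the connection has been checked inside `loop->sync()`, while still declaring the object early enough that it outlives the `waiter->wait()` call that the response callbacks depend on. A reduced sketch of that lifetime pattern, with placeholder types rather than the real invoke machinery:

```cpp
#include <condition_variable>
#include <mutex>
#include <optional>
#include <string>
#include <thread>

int main()
{
    // Declared before the worker is started so it outlives the wait below,
    // even though it is only constructed (emplaced) later.
    std::optional<std::string> context;
    std::mutex m;
    std::condition_variable cv;
    bool done = false;

    std::thread worker{[&] {
        context.emplace("built late, used by the completion callback");
        std::unique_lock<std::mutex> lock{m};
        done = true;
        cv.notify_all();
    }};

    std::unique_lock<std::mutex> lock{m};
    cv.wait(lock, [&] { return done; }); // `context` is still alive here
    worker.join();
}
```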
19 changes: 2 additions & 17 deletions include/mp/type-context.h
@@ -69,7 +69,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
bool disconnected{false};
{
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
@@ -101,7 +100,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER({
KJ_DEFER(if (erase_thread) {
std::unique_lock<std::mutex> lock(thread_context.waiter->m_mutex);
// Call erase here with a Connection* argument instead
// of an iterator argument, because the `request_thread`
@@ -112,24 +111,10 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// erases the thread from the map, and also because the
// ProxyServer<Thread> destructor calls
// request_threads.clear().
if (erase_thread) {
disconnected = !request_threads.erase(server.m_context.connection);
} else {
disconnected = !request_threads.count(server.m_context.connection);
}
request_threads.erase(server.m_context.connection);
});
fn.invoke(server_context, args...);
}
if (disconnected) {
// If disconnected is true, the Connection object was
// destroyed during the method call. Deal with this by
// returning without ever fulfilling the promise, which will
// cause the ProxyServer object to leak. This is not ideal,
// but fixing the leak will require nontrivial code changes
// because there is a lot of code assuming ProxyServer
// objects are destroyed before Connection objects.
return;
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_context.loop->sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
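`KJ_DEFER` runs its argument when the enclosing scope exits, so the change above makes the map erase conditional on this call being the one that inserted the entry. A sketch of the same idea with a hand-rolled scope guard in standard C++; the guard type and names are illustrative only:

```cpp
#include <iostream>
#include <map>
#include <mutex>
#include <string>

// Minimal scope guard, similar in spirit to KJ_DEFER.
template <typename F>
struct ScopeExit {
    F fn;
    ~ScopeExit() { fn(); }
};
template <typename F> ScopeExit(F) -> ScopeExit<F>;

int main()
{
    std::map<int, std::string> request_threads;
    std::mutex mutex;

    const bool inserted = request_threads.emplace(1, "request thread").second;
    const bool erase_thread = inserted; // only erase if this scope created the entry

    ScopeExit guard{[&] {
        if (erase_thread) {
            std::lock_guard<std::mutex> lock{mutex};
            request_threads.erase(1); // erase by key, like erasing by Connection*
        }
    }};

    std::cout << request_threads.size() << "\n"; // 1 while the "call" runs
} // guard fires here and removes the entry
```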
24 changes: 13 additions & 11 deletions src/mp/proxy.cpp
@@ -132,9 +132,11 @@ Connection::~Connection()
// on clean and unclean shutdowns. In the unclean shutdown case, when the
// connection is broken, the sync and async cleanup lists will be filled
// with callbacks. In the clean shutdown case both lists will be empty.
Lock lock{m_loop->m_mutex};
while (!m_sync_cleanup_fns.empty()) {
m_sync_cleanup_fns.front()();
m_sync_cleanup_fns.pop_front();
CleanupList fn;
fn.splice(fn.begin(), m_sync_cleanup_fns, m_sync_cleanup_fns.begin());
Unlock(lock, fn.front());
}
}
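The destructor's new loop detaches one callback from the shared list under the lock and runs it via `Unlock`, that is, with the lock dropped. A reduced sketch of that pattern using `std::unique_lock` directly:

```cpp
#include <functional>
#include <iostream>
#include <list>
#include <mutex>

using CleanupList = std::list<std::function<void()>>;

void run_cleanups(CleanupList& cleanup_fns, std::mutex& m)
{
    std::unique_lock<std::mutex> lock{m};
    while (!cleanup_fns.empty()) {
        // Detach the front callback into a local list while holding the lock...
        CleanupList fn;
        fn.splice(fn.begin(), cleanup_fns, cleanup_fns.begin());
        // ...then invoke it with the lock released, so the callback can take
        // this or other locks without deadlocking.
        lock.unlock();
        fn.front()();
        lock.lock();
    }
}

int main()
{
    CleanupList fns;
    std::mutex m;
    fns.emplace_back([] { std::cout << "cleanup 1\n"; });
    fns.emplace_back([] { std::cout << "cleanup 2\n"; });
    run_cleanups(fns, m);
}
```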

@@ -311,18 +313,18 @@ std::tuple<ConnThread, bool> SetThread(ConnThreads& threads, std::mutex& mutex,
thread = threads.emplace(
std::piecewise_construct, std::forward_as_tuple(connection),
std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first;
thread->second.setCleanup([&threads, &mutex, thread] {
thread->second.setDisconnectCallback([&threads, &mutex, thread] {
// Note: it is safe to use the `thread` iterator in this cleanup
// function, because the iterator would only be invalid if the map entry
// was removed, and if the map entry is removed the ProxyClient<Thread>
// destructor unregisters the cleanup.

// Connection is being destroyed before thread client is, so reset
// thread client m_cleanup_it member so thread client destructor does not
// try unregister this callback after connection is destroyed.
thread->second.m_cleanup_it.reset();
// thread client m_disconnect_cb member so thread client destructor does not
// try to unregister this callback after connection is destroyed.
// Remove connection pointer about to be destroyed from the map
const std::unique_lock<std::mutex> lock(mutex);
thread->second.m_disconnect_cb.reset();
threads.erase(thread);
});
return {thread, true};
@@ -333,16 +335,16 @@ ProxyClient<Thread>::~ProxyClient()
// If thread is being destroyed before connection is destroyed, remove the
// cleanup callback that was registered to handle the connection being
// destroyed before the thread being destroyed.
if (m_cleanup_it) {
m_context.connection->removeSyncCleanup(*m_cleanup_it);
if (m_disconnect_cb) {
m_context.connection->removeSyncCleanup(*m_disconnect_cb);
}
}

void ProxyClient<Thread>::setCleanup(const std::function<void()>& fn)
void ProxyClient<Thread>::setDisconnectCallback(const std::function<void()>& fn)
{
assert(fn);
assert(!m_cleanup_it);
m_cleanup_it = m_context.connection->addSyncCleanup(fn);
assert(!m_disconnect_cb);
m_disconnect_cb = m_context.connection->addSyncCleanup(fn);
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
11 changes: 9 additions & 2 deletions test/mp/test/test.cpp
@@ -49,12 +49,14 @@ namespace test {
class TestSetup
{
public:
std::thread thread;
std::function<void()> server_disconnect;
std::function<void()> client_disconnect;
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
ProxyServer<messages::FooInterface>* server{nullptr};
//! Thread variable should be after other struct members so the thread does
//! not start until the other members are initialized.
std::thread thread;

TestSetup(bool client_owns_connection = true)
: thread{[&] {
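The member-ordering comment above relies on the rule that non-static data members are constructed in declaration order, so a `std::thread` member declared last starts only after everything its lambda may touch already exists. A minimal sketch with a hypothetical `Setup` type:

```cpp
#include <future>
#include <thread>

struct Setup {
    std::promise<int> result; // constructed before the thread below
    // Declared last: by the time the lambda runs, `result` is already valid.
    std::thread thread{[this] { result.set_value(42); }};
    ~Setup() { thread.join(); }
};

int main()
{
    Setup setup;
    return setup.result.get_future().get() == 42 ? 0 : 1;
}
```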
@@ -266,12 +268,12 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
// ProxyServer objects associated with the connection. Having an in-progress
// RPC call requires keeping the ProxyServer longer.

std::promise<void> signal;
TestSetup setup{/*client_owns_connection=*/false};
ProxyClient<messages::FooInterface>* foo = setup.client.get();
KJ_EXPECT(foo->add(1, 2) == 3);

foo->initThreadMap();
std::promise<void> signal;
setup.server->m_impl->m_fn = [&] {
EventLoopRef loop{*setup.server->m_context.loop};
setup.client_disconnect();
Expand All @@ -287,6 +289,11 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
}
KJ_EXPECT(disconnected);

// Now that the disconnect has been detected, set signal allowing the
// callFnAsync() IPC call to return. Since signalling may not wake up the
// thread right away, it is important for the signal variable to be declared
// *before* the TestSetup variable so it is not destroyed while
// signal.get_future().get() is called.
signal.set_value();
}

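The comment about declaring `signal` before the `TestSetup` variable relies on locals being destroyed in reverse declaration order: whatever a joining destructor may still depend on must be declared first. A reduced sketch with a hypothetical `Worker` type:

```cpp
#include <future>
#include <thread>

struct Worker {
    std::thread thread;
    explicit Worker(std::future<void> f)
        : thread{[f = std::move(f)]() mutable { f.get(); }} {}
    ~Worker() { thread.join(); } // blocks until the future is ready
};

int main()
{
    std::promise<void> signal; // declared first, so destroyed *after* worker
    Worker worker{signal.get_future()};

    signal.set_value(); // lets the worker's thread finish
} // destruction order: worker (joins its thread), then signal
```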