Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 13 additions & 19 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct ServerInvokeContext : InvokeContext
int req;

ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
: InvokeContext{*proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
: InvokeContext{proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
{
}
};
Expand Down Expand Up @@ -207,9 +207,6 @@ class EventLoop
LogFn m_log_fn;
};

void AddClient(EventLoop& loop);
void RemoveClient(EventLoop& loop);

//! Single element task queue used to handle recursive capnp calls. (If server
//! makes an callback into the client in the middle of a request, while client
//! thread is blocked waiting for server response, this is what allows the
Expand Down Expand Up @@ -263,15 +260,13 @@ struct Waiter
class Connection
{
public:
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_, bool add_client)
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
: m_loop(loop), m_stream(kj::mv(stream_)),
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
m_rpc_system(::capnp::makeRpcClient(m_network))
{
if (add_client) {
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
m_loop.addClient(lock);
}
Connection(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream_,
Expand Down Expand Up @@ -381,7 +376,7 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
// down while external code is still holding client references.
//
// The first case is handled here in destructor when m_loop is not null. The
// second case is handled by the m_cleanup function, which sets m_loop to
// second case is handled by the m_cleanup function, which sets m_connection to
// null so nothing happens here.
if (m_connection) {
// Remove m_cleanup callback so it doesn't run and try to access
Expand Down Expand Up @@ -412,10 +407,11 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept

template <typename Interface, typename Impl>
ProxyServerBase<Interface, Impl>::ProxyServerBase(Impl* impl, bool owned, Connection& connection)
: m_impl(impl), m_owned(owned), m_connection(&connection)
: m_impl(impl), m_owned(owned), m_connection(connection)
{
assert(impl != nullptr);
AddClient(connection.m_loop);
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.addClient(lock);
}

template <typename Interface, typename Impl>
Expand All @@ -428,12 +424,13 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
// (event loop) thread since destructors could be making IPC calls or
// doing expensive cleanup.
if (m_owned) {
m_connection->addAsyncCleanup([impl] { delete impl; });
m_connection.addAsyncCleanup([impl] { delete impl; });
}
m_impl = nullptr;
m_owned = false;
}
RemoveClient(m_connection->m_loop); // FIXME: Broken when connection is null?
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
m_connection.m_loop.removeClient(lock);
}

template <typename Interface, typename Impl>
Expand Down Expand Up @@ -479,14 +476,14 @@ struct ThreadContext
//! over the stream. Also create a new Connection object embedded in the
//! client that is freed when the client is closed.
template <typename InitInterface>
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd, bool add_client)
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
{
typename InitInterface::Client init_client(nullptr);
std::unique_ptr<Connection> connection;
loop.sync([&] {
auto stream =
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
connection = std::make_unique<Connection>(loop, kj::mv(stream), add_client);
connection = std::make_unique<Connection>(loop, kj::mv(stream));
init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
Connection* connection_ptr = connection.get();
connection->onDisconnect([&loop, connection_ptr] {
Expand All @@ -507,9 +504,6 @@ void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server);

//! Same as above but accept file descriptor rather than stream object.
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server);

extern thread_local ThreadContext g_thread_context;

} // namespace mp
Expand Down
32 changes: 16 additions & 16 deletions include/mp/proxy-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,12 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
ServerContext server_context{server, call_context, req};
{
auto& request_threads = g_thread_context.request_threads;
auto request_thread = request_threads.find(server.m_connection);
auto request_thread = request_threads.find(&server.m_connection);
if (request_thread == request_threads.end()) {
request_thread =
g_thread_context.request_threads
.emplace(std::piecewise_construct, std::forward_as_tuple(server.m_connection),
std::forward_as_tuple(context_arg.getCallbackThread(), server.m_connection,
.emplace(std::piecewise_construct, std::forward_as_tuple(&server.m_connection),
std::forward_as_tuple(context_arg.getCallbackThread(), &server.m_connection,
/* destroy_connection= */ false))
.first;
} else {
Expand All @@ -139,33 +139,33 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
fn.invoke(server_context, args...);
}
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
server.m_connection->m_loop.sync([&] {
server.m_connection.m_loop.sync([&] {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->fulfill(kj::mv(call_context));
});
}))
{
server.m_connection->m_loop.sync([&]() {
server.m_connection.m_loop.sync([&]() {
auto fulfiller_dispose = kj::mv(fulfiller);
fulfiller_dispose->reject(kj::mv(*exception));
});
}
})));

auto thread_client = context_arg.getThread();
return JoinPromises(server.m_connection->m_threads.getLocalServer(thread_client)
return JoinPromises(server.m_connection.m_threads.getLocalServer(thread_client)
.then([&server, invoke, req](kj::Maybe<Thread::Server&> perhaps) {
KJ_IF_MAYBE(thread_server, perhaps)
{
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
server.m_connection->m_loop.log() << "IPC server post request #" << req << " {"
<< thread.m_thread_context.thread_name << "}";
server.m_connection.m_loop.log() << "IPC server post request #" << req << " {"
<< thread.m_thread_context.thread_name << "}";
thread.m_thread_context.waiter->post(std::move(invoke));
}
else
{
server.m_connection->m_loop.log() << "IPC server error request #" << req
<< ", missing thread to execute request";
server.m_connection.m_loop.log() << "IPC server error request #" << req
<< ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
}),
Expand Down Expand Up @@ -1327,7 +1327,7 @@ void clientDestroy(Client& client)
template <typename Server>
void serverDestroy(Server& server)
{
server.m_connection->m_loop.log() << "IPC server destroy" << typeid(server).name();
server.m_connection.m_loop.log() << "IPC server destroy" << typeid(server).name();
}

template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
Expand Down Expand Up @@ -1418,8 +1418,8 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
using Results = typename decltype(call_context.getResults())::Builds;

int req = ++server_reqs;
server.m_connection->m_loop.log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
server.m_connection.m_loop.log() << "IPC server recv request #" << req << " "
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());

try {
using ServerContext = ServerInvokeContext<Server, CallContext>;
Expand All @@ -1428,11 +1428,11 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
.then([&server, req](CallContext call_context) {
server.m_connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
server.m_connection.m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
<< " " << LogEscape(call_context.getResults().toString());
});
} catch (...) {
server.m_connection->m_loop.log()
server.m_connection.m_loop.log()
<< "IPC server unhandled exception " << boost::current_exception_diagnostic_information();
throw;
}
Expand Down
6 changes: 1 addition & 5 deletions include/mp/proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,7 @@ struct ProxyServerBase : public virtual Interface_::Server
* appropriate times depending on semantics of the particular method being
* wrapped. */
bool m_owned;
/**
* Connection is a pointer rather than a reference because for the Init
* server, the server object needs to be created before the connection.
*/
Connection* m_connection;
Connection& m_connection;
};

//! Customizable (through template specialization) base class used in generated ProxyServer implementations from
Expand Down
11 changes: 0 additions & 11 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,17 +236,6 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
}
}

void AddClient(EventLoop& loop)
{
std::unique_lock<std::mutex> lock(loop.m_mutex);
loop.addClient(lock);
}
void RemoveClient(EventLoop& loop)
{
std::unique_lock<std::mutex> lock(loop.m_mutex);
loop.removeClient(lock);
}

ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
: m_thread_context(thread_context), m_thread(std::move(thread))
{
Expand Down
2 changes: 1 addition & 1 deletion src/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ KJ_TEST("Call FooInterface methods")
EventLoop loop("mptest", [](bool raise, const std::string& log) {});
auto pipe = loop.m_io_context.provider->newTwoWayPipe();

auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), true);
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
connection_client->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
connection_client.get(), /* destroy_connection= */ false);
Expand Down