Skip to content

Commit 2a2549c

Browse files
committed
Merge #11: Replace ProxyServer connection pointer with reference
e7687db Replace ProxyServer connection pointer with reference (Russell Yanofsky) Pull request description: This is just cleanup. Using a pointer instead of reference used to be necessary before the Connection constructor took a make_server option. But now there is no reason to have a pointer and unnecessary null handling. Also, Connection add_client option and AddClient RemoveClient ServeStream wrapper functions are removed. These were originally meant to simplify usages but cause more trouble than they are worth. Top commit has no ACKs. Tree-SHA512: a566d5b7b228a412c6bb9184f24e72a288719ea6f28bdc39bbbfe2c05436fa7f9ef8301988781374db765be846e2a5ad38de73e118115b4bce5dfabc2efc3f9e
2 parents bd8ee26 + e7687db commit 2a2549c

File tree

5 files changed

+31
-52
lines changed

5 files changed

+31
-52
lines changed

include/mp/proxy-io.h

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ struct ServerInvokeContext : InvokeContext
4444
int req;
4545

4646
ServerInvokeContext(ProxyServer& proxy_server, CallContext& call_context, int req)
47-
: InvokeContext{*proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
47+
: InvokeContext{proxy_server.m_connection}, proxy_server{proxy_server}, call_context{call_context}, req{req}
4848
{
4949
}
5050
};
@@ -207,9 +207,6 @@ class EventLoop
207207
LogFn m_log_fn;
208208
};
209209

210-
void AddClient(EventLoop& loop);
211-
void RemoveClient(EventLoop& loop);
212-
213210
//! Single element task queue used to handle recursive capnp calls. (If server
214211
//! makes an callback into the client in the middle of a request, while client
215212
//! thread is blocked waiting for server response, this is what allows the
@@ -263,15 +260,13 @@ struct Waiter
263260
class Connection
264261
{
265262
public:
266-
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_, bool add_client)
263+
Connection(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream_)
267264
: m_loop(loop), m_stream(kj::mv(stream_)),
268265
m_network(*m_stream, ::capnp::rpc::twoparty::Side::CLIENT, ::capnp::ReaderOptions()),
269266
m_rpc_system(::capnp::makeRpcClient(m_network))
270267
{
271-
if (add_client) {
272-
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
273-
m_loop.addClient(lock);
274-
}
268+
std::unique_lock<std::mutex> lock(m_loop.m_mutex);
269+
m_loop.addClient(lock);
275270
}
276271
Connection(EventLoop& loop,
277272
kj::Own<kj::AsyncIoStream>&& stream_,
@@ -381,7 +376,7 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
381376
// down while external code is still holding client references.
382377
//
383378
// The first case is handled here in destructor when m_loop is not null. The
384-
// second case is handled by the m_cleanup function, which sets m_loop to
379+
// second case is handled by the m_cleanup function, which sets m_connection to
385380
// null so nothing happens here.
386381
if (m_connection) {
387382
// Remove m_cleanup callback so it doesn't run and try to access
@@ -412,10 +407,11 @@ ProxyClientBase<Interface, Impl>::~ProxyClientBase() noexcept
412407

413408
template <typename Interface, typename Impl>
414409
ProxyServerBase<Interface, Impl>::ProxyServerBase(Impl* impl, bool owned, Connection& connection)
415-
: m_impl(impl), m_owned(owned), m_connection(&connection)
410+
: m_impl(impl), m_owned(owned), m_connection(connection)
416411
{
417412
assert(impl != nullptr);
418-
AddClient(connection.m_loop);
413+
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
414+
m_connection.m_loop.addClient(lock);
419415
}
420416

421417
template <typename Interface, typename Impl>
@@ -428,12 +424,13 @@ ProxyServerBase<Interface, Impl>::~ProxyServerBase()
428424
// (event loop) thread since destructors could be making IPC calls or
429425
// doing expensive cleanup.
430426
if (m_owned) {
431-
m_connection->addAsyncCleanup([impl] { delete impl; });
427+
m_connection.addAsyncCleanup([impl] { delete impl; });
432428
}
433429
m_impl = nullptr;
434430
m_owned = false;
435431
}
436-
RemoveClient(m_connection->m_loop); // FIXME: Broken when connection is null?
432+
std::unique_lock<std::mutex> lock(m_connection.m_loop.m_mutex);
433+
m_connection.m_loop.removeClient(lock);
437434
}
438435

439436
template <typename Interface, typename Impl>
@@ -479,14 +476,14 @@ struct ThreadContext
479476
//! over the stream. Also create a new Connection object embedded in the
480477
//! client that is freed when the client is closed.
481478
template <typename InitInterface>
482-
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd, bool add_client)
479+
std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int fd)
483480
{
484481
typename InitInterface::Client init_client(nullptr);
485482
std::unique_ptr<Connection> connection;
486483
loop.sync([&] {
487484
auto stream =
488485
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP);
489-
connection = std::make_unique<Connection>(loop, kj::mv(stream), add_client);
486+
connection = std::make_unique<Connection>(loop, kj::mv(stream));
490487
init_client = connection->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<InitInterface>();
491488
Connection* connection_ptr = connection.get();
492489
connection->onDisconnect([&loop, connection_ptr] {
@@ -507,9 +504,6 @@ void ServeStream(EventLoop& loop,
507504
kj::Own<kj::AsyncIoStream>&& stream,
508505
std::function<capnp::Capability::Client(Connection&)> make_server);
509506

510-
//! Same as above but accept file descriptor rather than stream object.
511-
void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server);
512-
513507
extern thread_local ThreadContext g_thread_context;
514508

515509
} // namespace mp

include/mp/proxy-types.h

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,12 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
122122
ServerContext server_context{server, call_context, req};
123123
{
124124
auto& request_threads = g_thread_context.request_threads;
125-
auto request_thread = request_threads.find(server.m_connection);
125+
auto request_thread = request_threads.find(&server.m_connection);
126126
if (request_thread == request_threads.end()) {
127127
request_thread =
128128
g_thread_context.request_threads
129-
.emplace(std::piecewise_construct, std::forward_as_tuple(server.m_connection),
130-
std::forward_as_tuple(context_arg.getCallbackThread(), server.m_connection,
129+
.emplace(std::piecewise_construct, std::forward_as_tuple(&server.m_connection),
130+
std::forward_as_tuple(context_arg.getCallbackThread(), &server.m_connection,
131131
/* destroy_connection= */ false))
132132
.first;
133133
} else {
@@ -139,33 +139,33 @@ auto PassField(TypeList<>, ServerContext& server_context, const Fn& fn, const Ar
139139
fn.invoke(server_context, args...);
140140
}
141141
KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
142-
server.m_connection->m_loop.sync([&] {
142+
server.m_connection.m_loop.sync([&] {
143143
auto fulfiller_dispose = kj::mv(fulfiller);
144144
fulfiller_dispose->fulfill(kj::mv(call_context));
145145
});
146146
}))
147147
{
148-
server.m_connection->m_loop.sync([&]() {
148+
server.m_connection.m_loop.sync([&]() {
149149
auto fulfiller_dispose = kj::mv(fulfiller);
150150
fulfiller_dispose->reject(kj::mv(*exception));
151151
});
152152
}
153153
})));
154154

155155
auto thread_client = context_arg.getThread();
156-
return JoinPromises(server.m_connection->m_threads.getLocalServer(thread_client)
156+
return JoinPromises(server.m_connection.m_threads.getLocalServer(thread_client)
157157
.then([&server, invoke, req](kj::Maybe<Thread::Server&> perhaps) {
158158
KJ_IF_MAYBE(thread_server, perhaps)
159159
{
160160
const auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
161-
server.m_connection->m_loop.log() << "IPC server post request #" << req << " {"
162-
<< thread.m_thread_context.thread_name << "}";
161+
server.m_connection.m_loop.log() << "IPC server post request #" << req << " {"
162+
<< thread.m_thread_context.thread_name << "}";
163163
thread.m_thread_context.waiter->post(std::move(invoke));
164164
}
165165
else
166166
{
167-
server.m_connection->m_loop.log() << "IPC server error request #" << req
168-
<< ", missing thread to execute request";
167+
server.m_connection.m_loop.log() << "IPC server error request #" << req
168+
<< ", missing thread to execute request";
169169
throw std::runtime_error("invalid thread handle");
170170
}
171171
}),
@@ -1327,7 +1327,7 @@ void clientDestroy(Client& client)
13271327
template <typename Server>
13281328
void serverDestroy(Server& server)
13291329
{
1330-
server.m_connection->m_loop.log() << "IPC server destroy" << typeid(server).name();
1330+
server.m_connection.m_loop.log() << "IPC server destroy" << typeid(server).name();
13311331
}
13321332

13331333
template <typename ProxyClient, typename GetRequest, typename... FieldObjs>
@@ -1418,8 +1418,8 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
14181418
using Results = typename decltype(call_context.getResults())::Builds;
14191419

14201420
int req = ++server_reqs;
1421-
server.m_connection->m_loop.log() << "IPC server recv request #" << req << " "
1422-
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
1421+
server.m_connection.m_loop.log() << "IPC server recv request #" << req << " "
1422+
<< TypeName<typename Params::Reads>() << " " << LogEscape(params.toString());
14231423

14241424
try {
14251425
using ServerContext = ServerInvokeContext<Server, CallContext>;
@@ -1428,11 +1428,11 @@ kj::Promise<void> serverInvoke(Server& server, CallContext& call_context, Fn fn)
14281428
return ReplaceVoid([&]() { return fn.invoke(server_context, ArgList()); },
14291429
[&]() { return kj::Promise<CallContext>(kj::mv(call_context)); })
14301430
.then([&server, req](CallContext call_context) {
1431-
server.m_connection->m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
1432-
<< " " << LogEscape(call_context.getResults().toString());
1431+
server.m_connection.m_loop.log() << "IPC server send response #" << req << " " << TypeName<Results>()
1432+
<< " " << LogEscape(call_context.getResults().toString());
14331433
});
14341434
} catch (...) {
1435-
server.m_connection->m_loop.log()
1435+
server.m_connection.m_loop.log()
14361436
<< "IPC server unhandled exception " << boost::current_exception_diagnostic_information();
14371437
throw;
14381438
}

include/mp/proxy.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,11 +101,7 @@ struct ProxyServerBase : public virtual Interface_::Server
101101
* appropriate times depending on semantics of the particular method being
102102
* wrapped. */
103103
bool m_owned;
104-
/**
105-
* Connection is a pointer rather than a reference because for the Init
106-
* server, the server object needs to be created before the connection.
107-
*/
108-
Connection* m_connection;
104+
Connection& m_connection;
109105
};
110106

111107
//! Customizable (through template specialization) base class used in generated ProxyServer implementations from

src/mp/proxy.cpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -236,17 +236,6 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
236236
}
237237
}
238238

239-
void AddClient(EventLoop& loop)
240-
{
241-
std::unique_lock<std::mutex> lock(loop.m_mutex);
242-
loop.addClient(lock);
243-
}
244-
void RemoveClient(EventLoop& loop)
245-
{
246-
std::unique_lock<std::mutex> lock(loop.m_mutex);
247-
loop.removeClient(lock);
248-
}
249-
250239
ProxyServer<Thread>::ProxyServer(ThreadContext& thread_context, std::thread&& thread)
251240
: m_thread_context(thread_context), m_thread(std::move(thread))
252241
{

src/mp/test/test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ KJ_TEST("Call FooInterface methods")
2323
EventLoop loop("mptest", [](bool raise, const std::string& log) {});
2424
auto pipe = loop.m_io_context.provider->newTwoWayPipe();
2525

26-
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]), true);
26+
auto connection_client = std::make_unique<Connection>(loop, kj::mv(pipe.ends[0]));
2727
auto foo_client = std::make_unique<ProxyClient<messages::FooInterface>>(
2828
connection_client->m_rpc_system.bootstrap(ServerVatId().vat_id).castAs<messages::FooInterface>(),
2929
connection_client.get(), /* destroy_connection= */ false);

0 commit comments

Comments
 (0)