Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,14 +499,26 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
}

//! Given stream and a callback to construct a new ProxyServer object that
//! handles requests from the stream, create a new Connection callback, pass it
//! to the callback, use the returned ProxyServer to handle requests, and delete
//! the proxyserver if the connection is disconnected.
//! This should be called from the event loop thread.
void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server);
//! Given stream file descriptor and an init object, construct a new ProxyServer
//! object that handles requests from the stream calling the init object. Embed
//! the ProxyServer in a Connection object that is stored and erased if
//! disconnected. This should be called from the event loop thread.
template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
{
loop.m_incoming_connections.emplace_front(loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
[&](Connection& connection) {
// Set owned to false so proxy object doesn't attempt to delete init
// object on disconnect/close.
return kj::heap<mp::ProxyServer<InitInterface>>(&init, false, connection);
});
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}

extern thread_local ThreadContext g_thread_context;

Expand Down
19 changes: 0 additions & 19 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,23 +305,4 @@ std::string LongThreadName(const char* exe_name)
return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name;
}

void ServeStream(EventLoop& loop,
kj::Own<kj::AsyncIoStream>&& stream,
std::function<capnp::Capability::Client(Connection&)> make_server)
{
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server);
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}

void ServeStream(EventLoop& loop, int fd, std::function<capnp::Capability::Client(Connection&)> make_server)
{
ServeStream(loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
std::move(make_server));
}

} // namespace mp