From 710238c551f0ec4acbdabe45dfbb23aee4b217ec Mon Sep 17 00:00:00 2001 From: Russell Yanofsky Date: Thu, 5 Sep 2019 11:13:40 -0400 Subject: [PATCH] Add simpler ServeStream function Be more consistent with ConnectStream and just require Init message type instead of ProxyServer constructing callback, and file descriptor number instead of a stream object. --- include/mp/proxy-io.h | 28 ++++++++++++++++++++-------- src/mp/proxy.cpp | 19 ------------------- 2 files changed, 20 insertions(+), 27 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 5e0baf7d..c5752ee7 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -499,14 +499,26 @@ std::unique_ptr> ConnectStream(EventLoop& loop, int f kj::mv(init_client), connection.release(), /* destroy_connection= */ true); } -//! Given stream and a callback to construct a new ProxyServer object that -//! handles requests from the stream, create a new Connection callback, pass it -//! to the callback, use the returned ProxyServer to handle requests, and delete -//! the proxyserver if the connection is disconnected. -//! This should be called from the event loop thread. -void ServeStream(EventLoop& loop, - kj::Own&& stream, - std::function make_server); +//! Given stream file descriptor and an init object, construct a new ProxyServer +//! object that handles requests from the stream calling the init object. Embed +//! the ProxyServer in a Connection object that is stored and erased if +//! disconnected. This should be called from the event loop thread. +template +void ServeStream(EventLoop& loop, int fd, InitImpl& init) +{ + loop.m_incoming_connections.emplace_front(loop, + loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), + [&](Connection& connection) { + // Set owned to false so proxy object doesn't attempt to delete init + // object on disconnect/close. + return kj::heap>(&init, false, connection); + }); + auto it = loop.m_incoming_connections.begin(); + it->onDisconnect([&loop, it] { + loop.log() << "IPC server: socket disconnected."; + loop.m_incoming_connections.erase(it); + }); +} extern thread_local ThreadContext g_thread_context; diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 2c3c557e..69231562 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -305,23 +305,4 @@ std::string LongThreadName(const char* exe_name) return g_thread_context.thread_name.empty() ? ThreadName(exe_name) : g_thread_context.thread_name; } -void ServeStream(EventLoop& loop, - kj::Own&& stream, - std::function make_server) -{ - loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), make_server); - auto it = loop.m_incoming_connections.begin(); - it->onDisconnect([&loop, it] { - loop.log() << "IPC server: socket disconnected."; - loop.m_incoming_connections.erase(it); - }); -} - -void ServeStream(EventLoop& loop, int fd, std::function make_server) -{ - ServeStream(loop, - loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), - std::move(make_server)); -} - } // namespace mp