Skip to content

Commit d519e18

Browse files
committed
Add ListenConnections function
Handles accepting connections from a listening socket and serving requests. (Existing ServeStream function just serves requests on a single connection.)
1 parent d24cae6 commit d519e18

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

include/mp/proxy-io.h

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -499,27 +499,60 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
499499
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
500500
}
501501

502-
//! Given stream file descriptor and an init object, construct a new ProxyServer
503-
//! object that handles requests from the stream calling the init object. Embed
504-
//! the ProxyServer in a Connection object that is stored and erased if
502+
//! Given stream and init objects, construct a new ProxyServer object that
503+
//! handles requests from the stream by calling the init object. Embed the
504+
//! ProxyServer in a Connection object that is stored and erased if
505505
//! disconnected. This should be called from the event loop thread.
506506
template <typename InitInterface, typename InitImpl>
507-
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
507+
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
508508
{
509-
loop.m_incoming_connections.emplace_front(loop,
510-
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
511-
[&](Connection& connection) {
512-
// Set owned to false so proxy object doesn't attempt to delete init
513-
// object on disconnect/close.
514-
return kj::heap<mp::ProxyServer<InitInterface>>(&init, false, connection);
515-
});
509+
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
510+
// Set owned to false so proxy object doesn't attempt to delete init
511+
// object on disconnect/close.
512+
return kj::heap<mp::ProxyServer<InitInterface>>(&init, false, connection);
513+
});
516514
auto it = loop.m_incoming_connections.begin();
517515
it->onDisconnect([&loop, it] {
518516
loop.log() << "IPC server: socket disconnected.";
519517
loop.m_incoming_connections.erase(it);
520518
});
521519
}
522520

521+
//! Given connection receiver and an init object, handle incoming connections by
522+
//! calling _Serve, to create ProxyServer objects and forward requests to the
523+
//! init object.
524+
template <typename InitInterface, typename InitImpl>
525+
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
526+
{
527+
auto* ptr = listener.get();
528+
loop.m_task_set->add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
529+
[&loop, &init](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& stream) {
530+
_Serve<InitInterface>(loop, kj::mv(stream), init);
531+
_Listen<InitInterface>(loop, kj::mv(listener), init);
532+
})));
533+
}
534+
535+
//! Given stream file descriptor and an init object, handle requests on the
536+
//! stream by calling methods on the Init object.
537+
template <typename InitInterface, typename InitImpl>
538+
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
539+
{
540+
_Serve<InitInterface>(
541+
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
542+
}
543+
544+
//! Given listening socket file descriptor and an init object, handle incoming
545+
//! connections and requests by calling methods on the Init object.
546+
template <typename InitInterface, typename InitImpl>
547+
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
548+
{
549+
loop.sync([&]() {
550+
_Listen<InitInterface>(loop,
551+
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
552+
init);
553+
});
554+
}
555+
523556
extern thread_local ThreadContext g_thread_context;
524557

525558
} // namespace mp

0 commit comments

Comments
 (0)