From 8b644c1a2a1a629be9b263d8fae5963a61af91cd Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sat, 5 May 2018 01:17:05 -0700 Subject: [PATCH] feat(server): add Server::run_threads to run on multiple threads --- Cargo.toml | 1 + src/lib.rs | 1 + src/server/mod.rs | 89 +++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 905486ef41..8bca6e6b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,7 @@ iovec = "0.1" language-tags = "0.2" log = "0.4" mime = "0.3.2" +net2 = "0.2" percent-encoding = "1.0" relay = "0.1" time = "0.1" diff --git a/src/lib.rs b/src/lib.rs index 89ab897262..9bc61b5160 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,6 +28,7 @@ extern crate iovec; extern crate language_tags; #[macro_use] extern crate log; pub extern crate mime; +extern crate net2; #[macro_use] extern crate percent_encoding; extern crate relay; extern crate time; diff --git a/src/server/mod.rs b/src/server/mod.rs index b7d6f8a54f..d17c06da1f 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -14,11 +14,14 @@ use std::io; use std::marker::PhantomData; use std::net::SocketAddr; use std::rc::{Rc, Weak}; +use std::sync::Arc; +use std::thread; use std::time::Duration; use futures::task::{self, Task}; use futures::future::{self}; use futures::{Future, Stream, Poll, Async}; +use net2; #[cfg(feature = "compat")] use http; @@ -60,7 +63,7 @@ pub struct Http { keep_alive: bool, pipeline: bool, sleep_on_errors: bool, - _marker: PhantomData, + _marker: PhantomData B>, } /// An instance of a server created through `Http::bind`. @@ -178,7 +181,7 @@ impl + 'static> Http { { let core = try!(Core::new()); let handle = core.handle(); - let listener = try!(TcpListener::bind(addr, &handle)); + let listener = try!(thread_listener(addr, &handle)); Ok(Server { new_service: new_service, @@ -445,6 +448,88 @@ impl Server } } + +impl Server + where S: NewService, Error = ::Error> + Send + Sync + 'static, + B: Stream + 'static, + B::Item: AsRef<[u8]>, +{ + /// Run the server on multiple threads. + #[cfg(unix)] + pub fn run_threads(self, threads: usize) { + assert!(threads > 0, "threads must be more than 0"); + + let Server { + protocol, + new_service, + reactor, + listener, + shutdown_timeout, + } = self; + + let new_service = Arc::new(new_service); + let addr = listener.local_addr().unwrap(); + + let threads = (1..threads).map(|i| { + let protocol = protocol.clone(); + let new_service = new_service.clone(); + thread::Builder::new() + .name(format!("hyper-server-thread-{}", i)) + .spawn(move || { + let reactor = Core::new().unwrap(); + let listener = thread_listener(&addr, &reactor.handle()).unwrap(); + let srv = Server { + protocol, + new_service, + reactor, + listener, + shutdown_timeout, + }; + srv.run().unwrap(); + }) + .unwrap() + }).collect::>(); + + let srv = Server { + protocol, + new_service, + reactor, + listener, + shutdown_timeout, + }; + srv.run().unwrap(); + + for thread in threads { + thread.join().unwrap(); + } + } +} + +fn thread_listener(addr: &SocketAddr, handle: &Handle) -> io::Result { + let listener = match *addr { + SocketAddr::V4(_) => try!(net2::TcpBuilder::new_v4()), + SocketAddr::V6(_) => try!(net2::TcpBuilder::new_v6()), + }; + try!(reuse_port(&listener)); + try!(listener.reuse_address(true)); + try!(listener.bind(addr)); + listener.listen(1024).and_then(|l| { + TcpListener::from_listener(l, addr, handle) + }) +} + +#[cfg(unix)] +fn reuse_port(tcp: &net2::TcpBuilder) -> io::Result<()> { + use net2::unix::*; + try!(tcp.reuse_port(true)); + Ok(()) +} + +#[cfg(not(unix))] +fn reuse_port(_tcp: &net2::TcpBuilder) -> io::Result<()> { + Ok(()) +} + impl> fmt::Debug for Server where B::Item: AsRef<[u8]> {