diff --git a/src/client/connect.rs b/src/client/connect.rs index ae90f94d5a..8e0559440b 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -403,12 +403,16 @@ mod http { use self::http_connector::HttpConnectorBlockingTask; - fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option) -> io::Result { + fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option, reuse_address: bool) -> io::Result { let builder = match addr { &SocketAddr::V4(_) => TcpBuilder::new_v4()?, &SocketAddr::V6(_) => TcpBuilder::new_v6()?, }; + if reuse_address { + builder.reuse_address(reuse_address)?; + } + if let Some(ref local_addr) = *local_addr { // Caller has requested this socket be bound before calling connect builder.bind(SocketAddr::new(local_addr.clone(), 0))?; @@ -446,6 +450,7 @@ mod http { nodelay: bool, local_address: Option, happy_eyeballs_timeout: Option, + reuse_address: bool, } impl HttpConnector { @@ -484,6 +489,7 @@ mod http { nodelay: false, local_address: None, happy_eyeballs_timeout: Some(Duration::from_millis(300)), + reuse_address: false, } } @@ -539,6 +545,15 @@ mod http { pub fn set_happy_eyeballs_timeout(&mut self, dur: Option) { self.happy_eyeballs_timeout = dur; } + + /// Set that all socket have `SO_REUSEADDR` set to the supplied value `reuse_address`. + /// + /// Default is `false`. + #[inline] + pub fn set_reuse_address(&mut self, reuse_address: bool) -> &mut Self { + self.reuse_address = reuse_address; + self + } } impl fmt::Debug for HttpConnector { @@ -585,6 +600,7 @@ mod http { keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, happy_eyeballs_timeout: self.happy_eyeballs_timeout, + reuse_address: self.reuse_address, } } } @@ -597,6 +613,7 @@ mod http { keep_alive_timeout: None, nodelay: false, happy_eyeballs_timeout: None, + reuse_address: false, } } @@ -630,6 +647,7 @@ mod http { keep_alive_timeout: Option, nodelay: bool, happy_eyeballs_timeout: Option, + reuse_address: bool, } enum State { @@ -652,7 +670,7 @@ mod http { // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, self.happy_eyeballs_timeout)); + local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } else { let host = mem::replace(host, String::new()); let work = dns::Work::new(host, port); @@ -664,7 +682,7 @@ mod http { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { state = State::Connecting(ConnectingTcp::new( - local_addr, addrs, self.happy_eyeballs_timeout)); + local_addr, addrs, self.happy_eyeballs_timeout, self.reuse_address)); } }; }, @@ -696,6 +714,7 @@ mod http { local_addr: Option, preferred: ConnectingTcpRemote, fallback: Option, + reuse_address: bool, } impl ConnectingTcp { @@ -703,6 +722,7 @@ mod http { local_addr: Option, remote_addrs: dns::IpAddrs, fallback_timeout: Option, + reuse_address: bool, ) -> ConnectingTcp { if let Some(fallback_timeout) = fallback_timeout { let (preferred_addrs, fallback_addrs) = remote_addrs.split_by_preference(); @@ -711,6 +731,7 @@ mod http { local_addr, preferred: ConnectingTcpRemote::new(preferred_addrs), fallback: None, + reuse_address, }; } @@ -721,12 +742,14 @@ mod http { delay: Delay::new(Instant::now() + fallback_timeout), remote: ConnectingTcpRemote::new(fallback_addrs), }), + reuse_address, } } else { ConnectingTcp { local_addr, preferred: ConnectingTcpRemote::new(remote_addrs), fallback: None, + reuse_address, } } } @@ -757,6 +780,7 @@ mod http { &mut self, local_addr: &Option, handle: &Option, + reuse_address: bool, ) -> Poll { let mut err = None; loop { @@ -768,14 +792,14 @@ mod http { err = Some(e); if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - *current = connect(&addr, local_addr, handle)?; + *current = connect(&addr, local_addr, handle, reuse_address)?; continue; } } } } else if let Some(addr) = self.addrs.next() { debug!("connecting to {}", addr); - self.current = Some(connect(&addr, local_addr, handle)?); + self.current = Some(connect(&addr, local_addr, handle, reuse_address)?); continue; } @@ -788,14 +812,14 @@ mod http { // not a Future, since passing a &Handle to poll fn poll(&mut self, handle: &Option) -> Poll { match self.fallback.take() { - None => self.preferred.poll(&self.local_addr, handle), - Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle) { + None => self.preferred.poll(&self.local_addr, handle, self.reuse_address), + Some(mut fallback) => match self.preferred.poll(&self.local_addr, handle, self.reuse_address) { Ok(Async::Ready(stream)) => { // Preferred successful - drop fallback. Ok(Async::Ready(stream)) } Ok(Async::NotReady) => match fallback.delay.poll() { - Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle) { + Ok(Async::Ready(_)) => match fallback.remote.poll(&self.local_addr, handle, self.reuse_address) { Ok(Async::Ready(stream)) => { // Fallback successful - drop current preferred, // but keep fallback as new preferred. @@ -825,7 +849,7 @@ mod http { Err(_) => { // Preferred failed - use fallback as new preferred. self.preferred = fallback.remote; - self.preferred.poll(&self.local_addr, handle) + self.preferred.poll(&self.local_addr, handle, self.reuse_address) } } } @@ -980,7 +1004,7 @@ mod http { } let addrs = hosts.iter().map(|host| (host.clone(), addr.port()).into()).collect(); - let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout)); + let connecting_tcp = ConnectingTcp::new(None, dns::IpAddrs::new(addrs), Some(fallback_timeout), false); let fut = ConnectingTcpFuture(connecting_tcp); let start = Instant::now();