From 68458cde57a20f4b3c9c306eaf9801189262e0a6 Mon Sep 17 00:00:00 2001 From: Klaus Purer Date: Sat, 24 Feb 2018 16:03:46 +0100 Subject: [PATCH] fix(server): Sleep on socket IO errors --- src/server/mod.rs | 65 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 61 insertions(+), 4 deletions(-) diff --git a/src/server/mod.rs b/src/server/mod.rs index 7234da690a..b8abbbbd45 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -57,6 +57,7 @@ pub struct Http { max_buf_size: Option, keep_alive: bool, pipeline: bool, + sleep_on_errors: bool, _marker: PhantomData, } @@ -102,6 +103,9 @@ pub struct AddrIncoming { addr: SocketAddr, keep_alive_timeout: Option, listener: TcpListener, + handle: Handle, + sleep_on_errors: bool, + timeout: Option, } /// A future binding a connection with a Service. @@ -144,6 +148,7 @@ impl + 'static> Http { keep_alive: true, max_buf_size: None, pipeline: false, + sleep_on_errors: false, _marker: PhantomData, } } @@ -172,6 +177,18 @@ impl + 'static> Http { self } + /// Swallow connection accept errors. Instead of passing up IO errors when + /// the server is under heavy load the errors will be ignored. Some + /// connection accept errors (like "connection reset") can be ignored, some + /// (like "too many files open") may consume 100% CPU and a timout of 10ms + /// is used in that case. + /// + /// Default is false. + pub fn sleep_on_errors(&mut self, enabled: bool) -> &mut Self { + self.sleep_on_errors = enabled; + self + } + /// Bind the provided `addr` and return a server ready to handle /// connections. /// @@ -225,7 +242,7 @@ impl + 'static> Http { Bd: Stream, { let listener = TcpListener::bind(addr, &handle)?; - let mut incoming = AddrIncoming::new(listener)?; + let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; if self.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); } @@ -248,6 +265,7 @@ impl + 'static> Http { keep_alive: self.keep_alive, max_buf_size: self.max_buf_size, pipeline: self.pipeline, + sleep_on_errors: self.sleep_on_errors, _marker: PhantomData, }, } @@ -394,7 +412,7 @@ impl Server let handle = reactor.handle(); - let mut incoming = AddrIncoming::new(listener)?; + let mut incoming = AddrIncoming::new(listener, handle.clone(), protocol.sleep_on_errors)?; if protocol.keep_alive { incoming.set_keepalive(Some(Duration::from_secs(90))); @@ -619,11 +637,14 @@ mod unnameable { // ===== impl AddrIncoming ===== impl AddrIncoming { - fn new(listener: TcpListener) -> io::Result { + fn new(listener: TcpListener, handle: Handle, sleep_on_errors: bool) -> io::Result { Ok(AddrIncoming { addr: listener.local_addr()?, keep_alive_timeout: None, listener: listener, + handle: handle, + sleep_on_errors: sleep_on_errors, + timeout: None, }) } @@ -643,6 +664,13 @@ impl Stream for AddrIncoming { type Error = ::std::io::Error; fn poll(&mut self) -> Poll, Self::Error> { + if let Some(ref mut to) = self.timeout { + match to.poll().expect("timeout never fails") { + Async::Ready(_) => {} + Async::NotReady => return Ok(Async::NotReady), + } + } + self.timeout = None; loop { match self.listener.accept() { Ok((socket, addr)) => { @@ -654,12 +682,41 @@ impl Stream for AddrIncoming { return Ok(Async::Ready(Some(AddrStream::new(socket, addr)))); }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady), - Err(e) => return Err(e), + Err(ref e) if connection_error(e) => continue, + Err(e) => { + let delay = ::std::time::Duration::from_millis(10); + debug!("Accept error: {}. Sleeping {:?}...", + e, delay); + let mut timeout = Timeout::new(delay, &self.handle) + .expect("can always set a timeout"); + let result = timeout.poll() + .expect("timeout never fails"); + match result { + Async::Ready(()) => continue, + Async::NotReady => { + self.timeout = Some(timeout); + return Ok(Async::NotReady); + } + } + } } } } } +/// This function defines errors that are per-connection. Which basically +/// means that if we get this error from `accept()` system call it means +/// next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` is performed. +/// The timeout is useful to handle resource exhaustion errors like ENFILE +/// and EMFILE. Otherwise, could enter into tight loop. +fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused || + e.kind() == io::ErrorKind::ConnectionAborted || + e.kind() == io::ErrorKind::ConnectionReset +} + mod addr_stream { use std::io::{self, Read, Write}; use std::net::SocketAddr;