From d5ffee2ec801274ac271273289084b7251b4ce89 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Sun, 27 Nov 2016 17:50:06 -0800 Subject: [PATCH] fix(client): close Pooled streams on sockopt error --- src/client/pool.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/client/pool.rs b/src/client/pool.rs index 357afd523f..ecf6db22e6 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -5,6 +5,7 @@ use std::fmt; use std::io::{self, Read, Write}; use std::net::{SocketAddr, Shutdown}; use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; @@ -130,7 +131,7 @@ impl, S: NetworkStream + Send> NetworkConnector fo }; Ok(PooledStream { inner: Some(inner), - is_closed: false, + is_closed: AtomicBool::new(false), pool: self.inner.clone(), }) } @@ -139,7 +140,7 @@ impl, S: NetworkStream + Send> NetworkConnector fo /// A Stream that will try to be returned to the Pool when dropped. pub struct PooledStream { inner: Option>, - is_closed: bool, + is_closed: AtomicBool, pool: Arc>>, } @@ -148,7 +149,7 @@ impl fmt::Debug for PooledStream where S: fmt::Debug + 'static { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("PooledStream") .field("inner", &self.inner) - .field("is_closed", &self.is_closed) + .field("is_closed", &self.is_closed.load(Ordering::Relaxed)) .field("pool", &self.pool) .finish() } @@ -176,7 +177,7 @@ impl Read for PooledStream { // if the wrapped stream returns EOF (Ok(0)), that means the // server has closed the stream. we must be sure this stream // is dropped and not put back into the pool. - self.is_closed = true; + self.is_closed.store(true, Ordering::Relaxed); Ok(0) }, r => r @@ -200,21 +201,33 @@ impl NetworkStream for PooledStream { #[inline] fn peer_addr(&mut self) -> io::Result { self.inner.as_mut().unwrap().stream.peer_addr() + .map_err(|e| { + self.is_closed.store(true, Ordering::Relaxed); + e + }) } #[inline] fn set_read_timeout(&self, dur: Option) -> io::Result<()> { self.inner.as_ref().unwrap().stream.set_read_timeout(dur) + .map_err(|e| { + self.is_closed.store(true, Ordering::Relaxed); + e + }) } #[inline] fn set_write_timeout(&self, dur: Option) -> io::Result<()> { self.inner.as_ref().unwrap().stream.set_write_timeout(dur) + .map_err(|e| { + self.is_closed.store(true, Ordering::Relaxed); + e + }) } #[inline] fn close(&mut self, how: Shutdown) -> io::Result<()> { - self.is_closed = true; + self.is_closed.store(true, Ordering::Relaxed); self.inner.as_mut().unwrap().stream.close(how) } @@ -234,8 +247,9 @@ impl NetworkStream for PooledStream { impl Drop for PooledStream { fn drop(&mut self) { - trace!("PooledStream.drop, is_closed={}", self.is_closed); - if !self.is_closed { + let is_closed = self.is_closed.load(Ordering::Relaxed); + trace!("PooledStream.drop, is_closed={}", is_closed); + if !is_closed { self.inner.take().map(|inner| { if let Ok(mut pool) = self.pool.lock() { pool.reuse(inner.key.clone(), inner);