Skip to content

Commit

Permalink
fix(client): fix connection leak when Response finishes before Reques…
Browse files Browse the repository at this point in the history
…t body

If the Response was received and the body finished while the Request
body was still streaming, the connection could get into a state where it
was never polled again, thus never re-inserting into the connection pool
or being dropped.

Closes #1717
  • Loading branch information
seanmonstar committed Nov 22, 2018
1 parent 2e7250b commit e455fa2
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 3 deletions.
15 changes: 13 additions & 2 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ impl State {
match (&self.reading, &self.writing) {
(&Reading::KeepAlive, &Writing::KeepAlive) => {
if let KA::Busy = self.keep_alive.status() {
self.idle();
self.idle::<T>();
} else {
trace!("try_keep_alive({}): could keep-alive, but status = {:?}", T::LOG, self.keep_alive);
self.close();
Expand All @@ -819,12 +819,23 @@ impl State {
self.keep_alive.busy();
}

fn idle(&mut self) {
fn idle<T: Http1Transaction>(&mut self) {
debug_assert!(!self.is_idle(), "State::idle() called while idle");

self.method = None;
self.keep_alive.idle();
if self.is_idle() {
self.reading = Reading::Init;
self.writing = Writing::Init;

// !T::should_read_first() means Client.
//
// If Client connection has just gone idle, the Dispatcher
// should try the poll loop one more time, so as to poll the
// pending requests stream.
if !T::should_read_first() {
self.notify_read = true;
}
} else {
self.close();
}
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ where
// user has dropped sender handle
Ok(Async::Ready(None))
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(never) => match never {},
}
}
Expand Down
77 changes: 77 additions & 0 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,83 @@ mod dispatch_impl {
assert_eq!(connects.load(Ordering::Relaxed), 2);
}

#[test]
fn client_keep_alive_when_response_before_request_body_ends() {
use futures_timer::Delay;
let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let mut rt = Runtime::new().unwrap();

let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder()
.build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let (tx3, rx3) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5))).unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 1");
// after writing the response, THEN stream the body
let _ = tx1.send(());

sock.read(&mut buf).expect("read 2");
let _ = tx2.send(());

let n2 = sock.read(&mut buf).expect("read 3");
assert_ne!(n2, 0);
let second_get = "GET /b HTTP/1.1\r\n";
assert_eq!(s(&buf[..second_get.len()]), second_get);
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n").expect("write 2");
let _ = tx3.send(());
});


assert_eq!(connects.load(Ordering::Relaxed), 0);

let delayed_body = rx1
.map_err(|_| -> hyper::Error { panic!("rx1") })
.and_then(|_| Delay::new(Duration::from_millis(200)).map_err(|_| panic!("delay")))
.into_stream()
.map(|_| "hello a");

let rx = rx2.expect("thread panicked");
let req = Request::builder()
.method("POST")
.uri(&*format!("http://{}/a", addr))
.body(Body::wrap_stream(delayed_body))
.unwrap();
let client2 = client.clone();

// req 1
let fut = client.request(req)
.join(rx)
.and_then(|_| Delay::new(Duration::from_millis(200)).expect("delay"))
// req 2
.and_then(move |()| {
let rx = rx3.expect("thread panicked");
let req = Request::builder()
.uri(&*format!("http://{}/b", addr))
.body(Body::empty())
.unwrap();
client2
.request(req)
.join(rx)
.map(|_| ())
});

rt.block_on(fut).unwrap();

assert_eq!(connects.load(Ordering::Relaxed), 1);
}

#[test]
fn connect_proxy_sends_absolute_uri() {
let _ = pretty_env_logger::try_init();
Expand Down

0 comments on commit e455fa2

Please sign in to comment.