diff --git a/src/proto/conn.rs b/src/proto/conn.rs index 0d3980718d..13805acb16 100644 --- a/src/proto/conn.rs +++ b/src/proto/conn.rs @@ -237,20 +237,17 @@ where I: AsyncRead + AsyncWrite, Reading::Body(ref mut decoder) => { match decoder.decode(&mut self.io) { Ok(Async::Ready(slice)) => { - let chunk = if !slice.is_empty() { - Some(super::Chunk::from(slice)) - } else { - None - }; - let reading = if decoder.is_eof() { + let (reading, chunk) = if !slice.is_empty() { + return Ok(Async::Ready(Some(super::Chunk::from(slice)))); + } else if decoder.is_eof() { debug!("incoming body completed"); - Reading::KeepAlive - } else if chunk.is_some() { - Reading::Body(decoder.clone()) + (Reading::KeepAlive, None) } else { trace!("decode stream unexpectedly ended"); - //TODO: Should this return an UnexpectedEof? - Reading::Closed + // this should actually be unreachable: + // the decoder will return an UnexpectedEof if there were + // no bytes to read and it isn't eof yet... + (Reading::Closed, None) }; (reading, Ok(Async::Ready(chunk))) }, @@ -1078,6 +1075,37 @@ mod tests { }).wait(); } + #[test] + fn test_conn_read_body_end() { + let _: Result<(), ()> = future::lazy(|| { + let io = AsyncIo::new_buf(b"POST / HTTP/1.1\r\nContent-Length: 5\r\n\r\n12345".to_vec(), 1024); + let mut conn = Conn::<_, proto::Chunk, ServerTransaction>::new(io, Default::default()); + conn.state.busy(); + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Message { body: true, .. }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + match conn.poll() { + Ok(Async::Ready(Some(Frame::Body { chunk: Some(_) }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + // When the body is done, `poll` MUST return a `Body` frame with chunk set to `None` + match conn.poll() { + Ok(Async::Ready(Some(Frame::Body { chunk: None }))) => (), + other => panic!("unexpected frame: {:?}", other) + } + + match conn.poll() { + Ok(Async::NotReady) => (), + other => panic!("unexpected frame: {:?}", other) + } + Ok(()) + }).wait(); + } + #[test] fn test_conn_closed_read() { let io = AsyncIo::new_buf(vec![], 0); diff --git a/src/proto/h1/decode.rs b/src/proto/h1/decode.rs index 462cc96762..a8fd0fb9e3 100644 --- a/src/proto/h1/decode.rs +++ b/src/proto/h1/decode.rs @@ -75,7 +75,6 @@ impl Decoder { // methods pub fn is_eof(&self) -> bool { - trace!("is_eof? {:?}", self); match self.kind { Length(0) | Chunked(ChunkedState::End, _) | @@ -85,16 +84,15 @@ impl Decoder { } pub fn decode(&mut self, body: &mut R) -> Poll { + trace!("decode; state={:?}", self.kind); match self.kind { Length(ref mut remaining) => { - trace!("Sized read, remaining={:?}", remaining); if *remaining == 0 { Ok(Async::Ready(Bytes::new())) } else { let to_read = *remaining as usize; let buf = try_ready!(body.read_mem(to_read)); let num = buf.as_ref().len() as u64; - trace!("Length read: {}", num); if num > *remaining { *remaining = 0; } else if num == 0 {