diff --git a/src/body/body.rs b/src/body/body.rs index 09f36b9b45..0f4a1d0220 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -2,14 +2,14 @@ use std::borrow::Cow; use std::fmt; use bytes::Bytes; -use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; +use futures::{Async, Future, Poll, Stream}; use h2; use http::HeaderMap; use common::Never; -use super::{Chunk, Payload}; use super::internal::{FullDataArg, FullDataRet}; +use super::{Chunk, Payload}; use upgrade::OnUpgrade; type BodySender = mpsc::Sender>; @@ -34,8 +34,11 @@ enum Kind { abort_rx: oneshot::Receiver<()>, rx: mpsc::Receiver>, }, - H2(h2::RecvStream), - Wrapped(Box> + Send>), + H2 { + content_length: Option, + recv: h2::RecvStream, + }, + Wrapped(Box> + Send>), } struct Extra { @@ -140,9 +143,7 @@ impl Body { S::Error: Into>, Chunk: From, { - let mapped = stream - .map(Chunk::from) - .map_err(Into::into); + let mapped = stream.map(Chunk::from).map_err(Into::into); Body::new(Kind::Wrapped(Box::new(mapped))) } @@ -163,8 +164,11 @@ impl Body { } } - pub(crate) fn h2(recv: h2::RecvStream) -> Self { - Body::new(Kind::H2(recv)) + pub(crate) fn h2(recv: h2::RecvStream, content_length: Option) -> Self { + Body::new(Kind::H2 { + content_length, + recv, + }) } pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { @@ -235,7 +239,11 @@ impl Body { fn poll_inner(&mut self) -> Poll, ::Error> { match self.kind { Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), - Kind::Chan { content_length: ref mut len, ref mut rx, ref mut abort_rx } => { + Kind::Chan { + content_length: ref mut len, + ref mut rx, + ref mut abort_rx, + } => { if let Ok(Async::Ready(())) = abort_rx.poll() { return Err(::Error::new_body_write("body write aborted")); } @@ -252,19 +260,20 @@ impl Body { Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } - }, - Kind::H2(ref mut h2) => { - h2.poll() - .map(|async| { - async.map(|opt| { - opt.map(|bytes| { - let _ = h2.release_capacity().release_capacity(bytes.len()); - Chunk::from(bytes) - }) + } + Kind::H2 { + recv: ref mut h2, .. + } => h2 + .poll() + .map(|async| { + async.map(|opt| { + opt.map(|bytes| { + let _ = h2.release_capacity().release_capacity(bytes.len()); + Chunk::from(bytes) }) }) - .map_err(::Error::new_body) - }, + }) + .map_err(::Error::new_body), Kind::Wrapped(ref mut s) => s.poll().map_err(::Error::new_body), } } @@ -288,7 +297,9 @@ impl Payload for Body { fn poll_trailers(&mut self) -> Poll, Self::Error> { match self.kind { - Kind::H2(ref mut h2) => h2.poll_trailers().map_err(::Error::new_h2), + Kind::H2 { + recv: ref mut h2, .. + } => h2.poll_trailers().map_err(::Error::new_h2), _ => Ok(Async::Ready(None)), } } @@ -296,8 +307,8 @@ impl Payload for Body { fn is_end_stream(&self) -> bool { match self.kind { Kind::Once(ref val) => val.is_none(), - Kind::Chan { content_length: len, .. } => len == Some(0), - Kind::H2(ref h2) => h2.is_end_stream(), + Kind::Chan { content_length, .. } => content_length == Some(0), + Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), Kind::Wrapped(..) => false, } } @@ -306,9 +317,8 @@ impl Payload for Body { match self.kind { Kind::Once(Some(ref val)) => Some(val.len() as u64), Kind::Once(None) => Some(0), - Kind::Chan { content_length: len, .. } => len, - Kind::H2(..) => None, Kind::Wrapped(..) => None, + Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => content_length, } } @@ -333,8 +343,7 @@ impl Stream for Body { impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Body") - .finish() + f.debug_struct("Body").finish() } } @@ -357,7 +366,8 @@ impl Sender { /// Returns `Err(Chunk)` if the channel could not (currently) accept /// another `Chunk`. pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { - self.tx.try_send(Ok(chunk)) + self.tx + .try_send(Ok(chunk)) .map_err(|err| err.into_inner().expect("just sent Ok")) } @@ -398,38 +408,38 @@ impl impl From for Body { #[inline] - fn from (bytes: Bytes) -> Body { + fn from(bytes: Bytes) -> Body { Body::from(Chunk::from(bytes)) } } impl From> for Body { #[inline] - fn from (vec: Vec) -> Body { + fn from(vec: Vec) -> Body { Body::from(Chunk::from(vec)) } } impl From<&'static [u8]> for Body { #[inline] - fn from (slice: &'static [u8]) -> Body { + fn from(slice: &'static [u8]) -> Body { Body::from(Chunk::from(slice)) } } impl From> for Body { #[inline] - fn from (cow: Cow<'static, [u8]>) -> Body { + fn from(cow: Cow<'static, [u8]>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), - Cow::Owned(o) => Body::from(o) + Cow::Owned(o) => Body::from(o), } } } impl From for Body { #[inline] - fn from (s: String) -> Body { + fn from(s: String) -> Body { Body::from(Chunk::from(s.into_bytes())) } } @@ -446,7 +456,7 @@ impl From> for Body { fn from(cow: Cow<'static, str>) -> Body { match cow { Cow::Borrowed(b) => Body::from(b), - Cow::Owned(o) => Body::from(o) + Cow::Owned(o) => Body::from(o), } } } @@ -455,10 +465,6 @@ impl From> for Body { fn test_body_stream_concat() { let body = Body::from("hello world"); - let total = body - .concat2() - .wait() - .unwrap(); + let total = body.concat2().wait().unwrap(); assert_eq!(total.as_ref(), b"hello world"); } - diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index f8824518d2..a445eb4124 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -5,6 +5,7 @@ use futures::sync::mpsc; use h2::client::{Builder, Handshake, SendRequest}; use tokio_io::{AsyncRead, AsyncWrite}; +use headers::content_length_parse_all; use body::Payload; use ::common::{Exec, Never}; use headers; @@ -135,7 +136,9 @@ where .then(move |result| { match result { Ok(res) => { - let res = res.map(::Body::h2); + let content_length = content_length_parse_all(res.headers()); + let res = res.map(|stream| + ::Body::h2(stream, content_length)); let _ = cb.send(Ok(res)); }, Err(err) => { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 07400eadeb..ad5e2d2e6c 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -3,6 +3,7 @@ use h2::Reason; use h2::server::{Builder, Connection, Handshake, SendResponse}; use tokio_io::{AsyncRead, AsyncWrite}; +use ::headers::content_length_parse_all; use ::body::Payload; use ::common::Exec; use ::headers; @@ -126,7 +127,10 @@ where { while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) { trace!("incoming request"); - let req = req.map(::Body::h2); + let content_length = content_length_parse_all(req.headers()); + let req = req.map(|stream| { + ::Body::h2(stream, content_length) + }); let fut = H2Stream::new(service.call(req), respond); exec.execute(fut); } diff --git a/tests/server.rs b/tests/server.rs index 7979b65e5f..71f66ec04a 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -372,9 +372,7 @@ mod response_body_lengths { .get(uri) .and_then(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "13"); - // TODO: enable this after #1546 - let _ = res.body().content_length(); - // assert_eq!(res.body().content_length(), Some(13)); + assert_eq!(res.body().content_length(), Some(13)); Ok(()) }) .map(|_| ()) @@ -403,9 +401,7 @@ mod response_body_lengths { .get(uri) .and_then(|res| { assert_eq!(res.headers().get("content-length").unwrap(), "10"); - // TODO: enable or remove this after #1546 - let _ = res.body().content_length(); - // assert_eq!(res.body().content_length(), Some(10)); + assert_eq!(res.body().content_length(), Some(10)); Ok(()) }) .map(|_| ())