From fbc449e49cc4a4f8319647dccfb288d3d83df2bd Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 14 Mar 2018 12:40:24 -0700 Subject: [PATCH] feat(body): introduce an `Entity` trait to represent bodies This dedicated `Entity` trait replaces the previous `Stream, Error=hyper::Error>`. This allows for several improvements immediately, and prepares for HTTP2 support. - The `Entity::is_end_stream` makes up for change away from `Option`, which was previously used to know if the body should be empty. Since `Request` and `Response` now require a body to be set, this method can be used to tell hyper that the body is actually empty. It also provides the possibility of slight optimizations when polling for data, by allowing to check `is_end_stream` before polling again. This can allow a consumer to know that a body stream has ended without polling for `None` afterwards. - The `Entity::content_length` method allows a body to automatically declare a size, in case a user doesn't set a `Content-Length` or `Transfer-Encoding` header. - It's now possible to send and receive trailers, though this will be for HTTP2 connections only. By being a trait owned by hyper, new methods can be added later as new features are wanted (with default implementations). The `hyper::Body` type now implements `Entity` instead of `Stream`, provides a better channel option, and is easier to use with custom streams via `Body::wrap_stream`. BREAKING CHANGE: All code that was assuming the body was a `Stream` must be adjusted to use an `Entity` instead. Using `hyper::Body` as a `Stream` can call `Body::into_stream` to get a stream wrapper. Passing a custom `impl Stream` will need to either implement `Entity`, or as an easier option, switch to `Body::wrap_stream`. `Body::pair` has been replaced with `Body::channel`, which returns a `hyper::body::Sender` instead of a `futures::sync::mpsc::Sender`. Closes #1438 --- examples/client.rs | 2 +- examples/params.rs | 2 +- examples/send_file.rs | 14 +- examples/server.rs | 2 +- examples/web_api.rs | 13 +- src/client/conn.rs | 62 +++---- src/client/mod.rs | 29 ++- src/headers.rs | 60 ++++++- src/lib.rs | 2 +- src/proto/body.rs | 344 ++++++++++++++++++++++++++--------- src/proto/h1/conn.rs | 6 +- src/proto/h1/dispatch.rs | 52 +++--- src/proto/h1/role.rs | 174 +++++++++++++++--- src/proto/mod.rs | 18 +- src/server/conn.rs | 19 +- src/server/mod.rs | 37 ++-- tests/client.rs | 77 ++------ tests/server.rs | 375 +++++++++++++++++++++------------------ 18 files changed, 807 insertions(+), 481 deletions(-) diff --git a/examples/client.rs b/examples/client.rs index bafb6ba7fe..b5df77e25d 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -40,7 +40,7 @@ fn main() { println!("Response: {}", res.status()); println!("Headers: {:#?}", res.headers()); - res.into_parts().1.for_each(|chunk| { + res.into_parts().1.into_stream().for_each(|chunk| { io::stdout().write_all(&chunk).map_err(From::from) }) }).map(|_| { diff --git a/examples/params.rs b/examples/params.rs index ff51b91e3c..c632daa88e 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -30,7 +30,7 @@ impl Service for ParamExample { Box::new(futures::future::ok(Response::new(INDEX.into()))) }, (&Method::POST, "/post") => { - Box::new(req.into_parts().1.concat2().map(|b| { + Box::new(req.into_parts().1.into_stream().concat2().map(|b| { // Parse the request body. form_urlencoded::parse // always succeeds, but in general parsing may // fail (for example, an invalid post of json), so diff --git a/examples/send_file.rs b/examples/send_file.rs index 71351ef975..7e507fe33f 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -3,15 +3,15 @@ extern crate futures; extern crate hyper; extern crate pretty_env_logger; -use futures::{Future, Sink}; +use futures::{Future/*, Sink*/}; use futures::sync::oneshot; -use hyper::{Body, Chunk, Method, Request, Response, StatusCode}; +use hyper::{Body, /*Chunk,*/ Method, Request, Response, StatusCode}; use hyper::error::Error; use hyper::server::{Http, Service}; use std::fs::File; -use std::io::{self, copy, Read}; +use std::io::{self, copy/*, Read*/}; use std::thread; static NOTFOUND: &[u8] = b"Not Found"; @@ -80,7 +80,7 @@ impl Service for ResponseExamples { // a small test file. let (tx, rx) = oneshot::channel(); thread::spawn(move || { - let mut file = match File::open(INDEX) { + let _file = match File::open(INDEX) { Ok(f) => f, Err(_) => { tx.send(Response::builder() @@ -91,9 +91,10 @@ impl Service for ResponseExamples { return; }, }; - let (mut tx_body, rx_body) = Body::pair(); + let (_tx_body, rx_body) = Body::channel(); let res = Response::new(rx_body.into()); tx.send(res).expect("Send error on successful file read"); + /* TODO: fix once we have futures 0.2 Sink working let mut buf = [0u8; 16]; loop { match file.read(&mut buf) { @@ -104,7 +105,7 @@ impl Service for ResponseExamples { break; } else { let chunk: Chunk = buf[0..n].to_vec().into(); - match tx_body.send(Ok(chunk)).wait() { + match tx_body.send_data(chunk).wait() { Ok(t) => { tx_body = t; }, Err(_) => { break; } }; @@ -113,6 +114,7 @@ impl Service for ResponseExamples { Err(_) => { break; } } } + */ }); Box::new(rx.map_err(|e| Error::from(io::Error::new(io::ErrorKind::Other, e)))) diff --git a/examples/server.rs b/examples/server.rs index 7c7b3c510b..4c8cba4229 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -24,7 +24,7 @@ impl Service for Echo { Response::new(INDEX.into()) }, (&Method::POST, "/echo") => { - Response::new(req.into_parts().1) + Response::new(req.into_body()) }, _ => { let mut res = Response::new(Body::empty()); diff --git a/examples/web_api.rs b/examples/web_api.rs index d7fdb46e7b..566dad4c49 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -7,7 +7,6 @@ extern crate tokio_core; use futures::{Future, Stream}; use hyper::{Body, Chunk, Client, Method, Request, Response, StatusCode}; -use hyper::error::Error; use hyper::server::{Http, Service}; #[allow(unused)] @@ -18,20 +17,18 @@ static URL: &str = "http://127.0.0.1:1337/web_api"; static INDEX: &[u8] = b"test.html"; static LOWERCASE: &[u8] = b"i am a lower case string"; -pub type ResponseStream = Box>; - struct ResponseExamples(tokio_core::reactor::Handle); impl Service for ResponseExamples { type Request = Request; - type Response = Response; + type Response = Response; type Error = hyper::Error; type Future = Box>; fn call(&self, req: Self::Request) -> Self::Future { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/index.html") => { - let body: ResponseStream = Box::new(Body::from(INDEX)); + let body = Body::from(INDEX); Box::new(futures::future::ok(Response::new(body))) }, (&Method::GET, "/test.html") => { @@ -45,7 +42,7 @@ impl Service for ResponseExamples { let web_res_future = client.request(req); Box::new(web_res_future.map(|web_res| { - let body: ResponseStream = Box::new(web_res.into_parts().1.map(|b| { + let body = Body::wrap_stream(web_res.into_body().into_stream().map(|b| { Chunk::from(format!("before: '{:?}'
after: '{:?}'", std::str::from_utf8(LOWERCASE).unwrap(), std::str::from_utf8(&b).unwrap())) @@ -55,7 +52,7 @@ impl Service for ResponseExamples { }, (&Method::POST, "/web_api") => { // A web api to run against. Simple upcasing of the body. - let body: ResponseStream = Box::new(req.into_parts().1.map(|chunk| { + let body = Body::wrap_stream(req.into_body().into_stream().map(|chunk| { let upper = chunk.iter().map(|byte| byte.to_ascii_uppercase()) .collect::>(); Chunk::from(upper) @@ -63,7 +60,7 @@ impl Service for ResponseExamples { Box::new(futures::future::ok(Response::new(body))) }, _ => { - let body: ResponseStream = Box::new(Body::from(NOTFOUND)); + let body = Body::from(NOTFOUND); Box::new(futures::future::ok(Response::builder() .status(StatusCode::NOT_FOUND) .body(body) diff --git a/src/client/conn.rs b/src/client/conn.rs index 91fdeeaa36..2a01d0135b 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -11,11 +11,12 @@ use std::fmt; use std::marker::PhantomData; use bytes::Bytes; -use futures::{Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use futures::future::{self, Either}; use tokio_io::{AsyncRead, AsyncWrite}; use proto; +use proto::body::Entity; use super::dispatch; use {Body, Request, Response, StatusCode}; @@ -44,14 +45,13 @@ pub struct SendRequest { pub struct Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { inner: proto::dispatch::Dispatcher< proto::dispatch::Client, B, T, - B::Item, + B::Data, proto::ClientUpgradeTransaction, >, } @@ -134,8 +134,7 @@ impl SendRequest impl SendRequest where - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Sends a `Request` on the associated connection. /// @@ -152,7 +151,7 @@ where /// the `Host` header based on it. You must add a `Host` header yourself /// before calling this method. /// - Since absolute-form `Uri`s are not required, if received, they will - /// be serialized as-is, irregardless of calling `Request::set_proxy`. + /// be serialized as-is. /// /// # Example /// @@ -185,19 +184,6 @@ where /// # fn main() {} /// ``` pub fn send_request(&mut self, req: Request) -> ResponseFuture { - /* TODO? - // The Connection API does less things automatically than the Client - // API does. For instance, right here, we always assume set_proxy, so - // that if an absolute-form URI is provided, it is serialized as-is. - // - // Part of the reason for this is to prepare for the change to `http` - // types, where there is no more set_proxy. - // - // It's important that this method isn't called directly from the - // `Client`, so that `set_proxy` there is still respected. - req.set_proxy(true); - */ - let inner = match self.dispatch.send(req) { Ok(rx) => { Either::A(rx.then(move |res| { @@ -269,8 +255,7 @@ impl fmt::Debug for SendRequest { impl Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Return the inner IO object, and additional information. pub fn into_parts(self) -> Parts { @@ -297,8 +282,7 @@ where impl Future for Connection where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -311,8 +295,7 @@ where impl fmt::Debug for Connection where T: AsyncRead + AsyncWrite + fmt::Debug, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") @@ -341,8 +324,7 @@ impl Builder { pub fn handshake(&self, io: T) -> Handshake where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { Handshake { inner: HandshakeInner { @@ -356,8 +338,7 @@ impl Builder { pub(super) fn handshake_no_upgrades(&self, io: T) -> HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { HandshakeNoUpgrades { inner: HandshakeInner { @@ -374,8 +355,7 @@ impl Builder { impl Future for Handshake where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (SendRequest, Connection); type Error = ::Error; @@ -400,14 +380,13 @@ impl fmt::Debug for Handshake { impl Future for HandshakeNoUpgrades where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (SendRequest, proto::dispatch::Dispatcher< proto::dispatch::Client, B, T, - B::Item, + B::Data, proto::ClientTransaction, >); type Error = ::Error; @@ -420,8 +399,7 @@ where impl Future for HandshakeInner where T: AsyncRead + AsyncWrite, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, R: proto::Http1Transaction< Incoming=StatusCode, Outgoing=proto::RequestLine, @@ -431,7 +409,7 @@ where proto::dispatch::Client, B, T, - B::Item, + B::Data, R, >); type Error = ::Error; @@ -485,16 +463,16 @@ impl AssertSendSync for SendRequest {} impl AssertSend for Connection where T: AsyncRead + AsyncWrite, - B: Stream, - B::Item: AsRef<[u8]> + Send, + B: Entity + 'static, + B::Data: Send + 'static, {} #[doc(hidden)] impl AssertSendSync for Connection where T: AsyncRead + AsyncWrite, - B: Stream, - B::Item: AsRef<[u8]> + Send + Sync, + B: Entity + 'static, + B::Data: Send + Sync + 'static, {} #[doc(hidden)] diff --git a/src/client/mod.rs b/src/client/mod.rs index e2245de403..d8a808e318 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -7,14 +7,15 @@ use std::rc::Rc; use std::sync::Arc; use std::time::Duration; -use futures::{Async, Future, Poll, Stream}; +use futures::{Async, Future, Poll}; use futures::future::{self, Executor}; use http::{Method, Request, Response, Uri, Version}; use http::header::{Entry, HeaderValue, HOST}; use tokio::reactor::Handle; pub use tokio_service::Service; -use proto::{self, Body}; +use proto::body::{Body, Entity}; +use proto; use self::pool::Pool; pub use self::connect::{HttpConnector, Connect}; @@ -101,8 +102,7 @@ impl Client { impl Client where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Send a `GET` request to the supplied `Uri`. @@ -181,13 +181,13 @@ where C: Connect, let client = self.clone(); //TODO: let is_proxy = req.is_proxy(); - //let uri = req.uri().clone(); + let uri = req.uri().clone(); let fut = RetryableSendRequest { client: client, future: self.send_request(req, &domain), domain: domain, //is_proxy: is_proxy, - //uri: uri, + uri: uri, }; FutureResponse(Box::new(fut)) } @@ -293,8 +293,7 @@ where C: Connect, impl Service for Client where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Request = Request; type Response = Response; @@ -350,14 +349,13 @@ struct RetryableSendRequest { domain: String, future: Box, Error=ClientError>>, //is_proxy: bool, - //uri: Uri, + uri: Uri, } impl Future for RetryableSendRequest where C: Connect, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = Response; type Error = ::Error; @@ -370,7 +368,7 @@ where Err(ClientError::Normal(err)) => return Err(err), Err(ClientError::Canceled { connection_reused, - req, + mut req, reason, }) => { if !self.client.retry_canceled_requests || !connection_reused { @@ -380,6 +378,7 @@ where } trace!("unstarted request canceled, trying again (reason={:?})", reason); + *req.uri_mut() = self.uri.clone(); self.future = self.client.send_request(req, &self.domain); } } @@ -547,8 +546,7 @@ impl Config { impl Config where C: Connect, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { /// Construct the Client with this configuration. #[inline] @@ -569,8 +567,7 @@ where C: Connect, } impl Config -where B: Stream, - B::Item: AsRef<[u8]>, +where B: Entity, { /// Construct the Client with this configuration. #[inline] diff --git a/src/headers.rs b/src/headers.rs index 1e98ee4ce2..1cad1ba9e0 100644 --- a/src/headers.rs +++ b/src/headers.rs @@ -1,7 +1,14 @@ +use std::fmt::Write; + +use bytes::BytesMut; use http::HeaderMap; -use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, HeaderValue, TRANSFER_ENCODING}; +use http::header::{CONNECTION, CONTENT_LENGTH, EXPECT, TRANSFER_ENCODING}; +use http::header::{HeaderValue, OccupiedEntry, ValueIter}; use unicase; +/// Maximum number of bytes needed to serialize a u64 into ASCII decimal. +const MAX_DECIMAL_U64_BYTES: usize = 20; + pub fn connection_keep_alive(headers: &HeaderMap) -> bool { for line in headers.get_all(CONNECTION) { if let Ok(s) = line.to_str() { @@ -31,13 +38,15 @@ pub fn connection_close(headers: &HeaderMap) -> bool { } pub fn content_length_parse(headers: &HeaderMap) -> Option { + content_length_parse_all(headers.get_all(CONTENT_LENGTH).into_iter()) +} + +pub fn content_length_parse_all(values: ValueIter) -> Option { // If multiple Content-Length headers were sent, everything can still // be alright if they all contain the same value, and all parse // correctly. If not, then it's an error. - let values = headers.get_all(CONTENT_LENGTH); let folded = values - .into_iter() .fold(None, |prev, line| match prev { Some(Ok(prev)) => { Some(line @@ -66,12 +75,25 @@ pub fn content_length_zero(headers: &mut HeaderMap) { headers.insert(CONTENT_LENGTH, HeaderValue::from_static("0")); } +pub fn content_length_value(len: u64) -> HeaderValue { + let mut len_buf = BytesMut::with_capacity(MAX_DECIMAL_U64_BYTES); + write!(len_buf, "{}", len) + .expect("BytesMut can hold a decimal u64"); + // safe because u64 Display is ascii numerals + unsafe { + HeaderValue::from_shared_unchecked(len_buf.freeze()) + } +} + pub fn expect_continue(headers: &HeaderMap) -> bool { Some(&b"100-continue"[..]) == headers.get(EXPECT).map(|v| v.as_bytes()) } pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool { - let mut encodings = headers.get_all(TRANSFER_ENCODING).into_iter(); + is_chunked(headers.get_all(TRANSFER_ENCODING).into_iter()) +} + +pub fn is_chunked(mut encodings: ValueIter) -> bool { // chunked must always be the last encoding, according to spec if let Some(line) = encodings.next_back() { if let Ok(s) = line.to_str() { @@ -83,3 +105,33 @@ pub fn transfer_encoding_is_chunked(headers: &HeaderMap) -> bool { false } + +pub fn add_chunked(mut entry: OccupiedEntry) { + const CHUNKED: &'static str = "chunked"; + + if let Some(line) = entry.iter_mut().next_back() { + // + 2 for ", " + let new_cap = line.as_bytes().len() + CHUNKED.len() + 2; + let mut buf = BytesMut::with_capacity(new_cap); + buf.copy_from_slice(line.as_bytes()); + buf.copy_from_slice(b", "); + buf.copy_from_slice(CHUNKED.as_bytes()); + + *line = HeaderValue::from_shared(buf.freeze()) + .expect("original header value plus ascii is valid"); + return; + } + + entry.insert(HeaderValue::from_static(CHUNKED)); +} + +#[cfg(test)] +mod tests { + #[test] + fn assert_max_decimal_u64_bytes() { + assert_eq!( + super::MAX_DECIMAL_U64_BYTES, + ::std::u64::MAX.to_string().len() + ); + } +} diff --git a/src/lib.rs b/src/lib.rs index 18d13c2bfc..66a0fa9a72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,7 +44,7 @@ pub use http::{ pub use client::Client; pub use error::{Result, Error}; -pub use proto::{Body, Chunk}; +pub use proto::{body, Body, Chunk}; pub use server::Server; mod common; diff --git a/src/proto/body.rs b/src/proto/body.rs index 28acd51f67..23eea59e7b 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -1,50 +1,236 @@ +//! Streaming bodies for Requests and Responses +use std::borrow::Cow; use std::fmt; use bytes::Bytes; -use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; +use futures::{Async, Future, Poll, Stream}; use futures::sync::{mpsc, oneshot}; -use std::borrow::Cow; +use http::HeaderMap; use super::Chunk; -pub type BodySender = mpsc::Sender>; +type BodySender = mpsc::Sender>; + +/// This trait represents a streaming body of a `Request` or `Response`. +pub trait Entity { + /// A buffer of bytes representing a single chunk of a body. + type Data: AsRef<[u8]>; + + /// The error type of this stream. + //TODO: add bounds Into<::error::User> (or whatever it is called) + type Error; + + /// Poll for a `Data` buffer. + /// + /// Similar to `Stream::poll_next`, this yields `Some(Data)` until + /// the body ends, when it yields `None`. + fn poll_data(&mut self) -> Poll, Self::Error>; + + /// Poll for an optional **single** `HeaderMap` of trailers. + /// + /// This should **only** be called after `poll_data` has ended. + /// + /// Note: Trailers aren't currently used for HTTP/1, only for HTTP/2. + fn poll_trailers(&mut self) -> Poll, Self::Error> { + Ok(Async::Ready(None)) + } + + /// A hint that the `Body` is complete, and doesn't need to be polled more. + /// + /// This can be useful to determine if the there is any body or trailers + /// without having to poll. An empty `Body` could return `true` and hyper + /// would be able to know that only the headers need to be sent. Or, it can + /// also be checked after each `poll_data` call, to allow hyper to try to + /// end the underlying stream with the last chunk, instead of needing to + /// send an extra `DATA` frame just to mark the stream as finished. + /// + /// As a hint, it is used to try to optimize, and thus is OK for a default + /// implementation to return `false`. + fn is_end_stream(&self) -> bool { + false + } + + /// Return a length of the total bytes that will be streamed, if known. + /// + /// If an exact size of bytes is known, this would allow hyper to send a + /// `Content-Length` header automatically, not needing to fall back to + /// `Transfer-Encoding: chunked`. + /// + /// This does not need to be kept updated after polls, it will only be + /// called once to create the headers. + fn content_length(&self) -> Option { + None + } +} + +impl Entity for Box { + type Data = E::Data; + type Error = E::Error; + + fn poll_data(&mut self) -> Poll, Self::Error> { + (**self).poll_data() + } + + fn poll_trailers(&mut self) -> Poll, Self::Error> { + (**self).poll_trailers() + } -/// A `Stream` for `Chunk`s used in requests and responses. + fn is_end_stream(&self) -> bool { + (**self).is_end_stream() + } + + fn content_length(&self) -> Option { + (**self).content_length() + } +} + +/// A wrapper to consume an `Entity` as a futures `Stream`. +#[must_use = "streams do nothing unless polled"] +#[derive(Debug)] +pub struct EntityStream { + is_data_eof: bool, + entity: E, +} + +impl Stream for EntityStream { + type Item = E::Data; + type Error = E::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if self.is_data_eof { + return self.entity.poll_trailers() + .map(|async| { + async.map(|_opt| { + // drop the trailers and return that Stream is done + None + }) + }); + } + + let opt = try_ready!(self.entity.poll_data()); + if let Some(data) = opt { + return Ok(Async::Ready(Some(data))); + } else { + self.is_data_eof = true; + } + } + } +} + +/// An `Entity` of `Chunk`s, used when receiving bodies. +/// +/// Also a good default `Entity` to use in many applications. #[must_use = "streams do nothing unless polled"] pub struct Body { kind: Kind, } -#[derive(Debug)] enum Kind { Chan { - close_tx: oneshot::Sender, + _close_tx: oneshot::Sender<()>, rx: mpsc::Receiver>, }, + Wrapped(Box + Send>), Once(Option), Empty, } -//pub(crate) +/// A sender half used with `Body::channel()`. #[derive(Debug)] -pub struct ChunkSender { - close_rx: oneshot::Receiver, - close_rx_check: bool, +pub struct Sender { + close_rx: oneshot::Receiver<()>, tx: BodySender, } impl Body { - /// Return an empty body stream + /// Create an empty `Body` stream. + /// + /// # Example + /// + /// ``` + /// use hyper::{Body, Request}; + /// + /// // create a `GET /` request + /// let get = Request::new(Body::empty()); + /// ``` #[inline] pub fn empty() -> Body { Body::new(Kind::Empty) } - /// Return a body stream with an associated sender half + /// Create a `Body` stream with an associated sender half. + #[inline] + pub fn channel() -> (Sender, Body) { + let (tx, rx) = mpsc::channel(0); + let (close_tx, close_rx) = oneshot::channel(); + + let tx = Sender { + close_rx: close_rx, + tx: tx, + }; + let rx = Body::new(Kind::Chan { + _close_tx: close_tx, + rx: rx, + }); + + (tx, rx) + } + + /// Wrap a futures `Stream` in a box inside `Body`. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate hyper; + /// # use hyper::Body; + /// # fn main() { + /// let chunks = vec![ + /// "hello", + /// " ", + /// "world", + /// ]; + /// let stream = futures::stream::iter_ok(chunks); + /// + /// let body = Body::wrap_stream(stream); + /// # } + /// ``` + pub fn wrap_stream(stream: S) -> Body + where + S: Stream + Send + 'static, + Chunk: From, + { + Body::new(Kind::Wrapped(Box::new(stream.map(Chunk::from)))) + } + + /// Convert this `Body` into a `Stream`. + /// + /// # Example + /// + /// ``` + /// # extern crate futures; + /// # extern crate hyper; + /// # use futures::{Future, Stream}; + /// # use hyper::{Body, Request}; + /// # fn request_concat(some_req: Request) { + /// let req: Request = some_req; + /// let body = req.into_body(); + /// + /// let stream = body.into_stream(); + /// stream.concat2() + /// .map(|buf| { + /// println!("body length: {}", buf.len()); + /// }); + /// # } + /// # fn main() {} + /// ``` #[inline] - pub fn pair() -> (mpsc::Sender>, Body) { - let (tx, rx) = channel(); - (tx.tx, rx) + pub fn into_stream(self) -> EntityStream { + EntityStream { + is_data_eof: false, + entity: self, + } } /// Returns if this body was constructed via `Body::empty()`. @@ -68,8 +254,20 @@ impl Body { kind: kind, } } +} + +impl Default for Body { + #[inline] + fn default() -> Body { + Body::empty() + } +} + +impl Entity for Body { + type Data = Chunk; + type Error = ::Error; - fn poll_inner(&mut self) -> Poll, ::Error> { + fn poll_data(&mut self) -> Poll, Self::Error> { match self.kind { Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), @@ -77,85 +275,75 @@ impl Body { Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), }, + Kind::Wrapped(ref mut s) => s.poll(), Kind::Once(ref mut val) => Ok(Async::Ready(val.take())), Kind::Empty => Ok(Async::Ready(None)), } } -} -impl Default for Body { - #[inline] - fn default() -> Body { - Body::empty() + fn is_end_stream(&self) -> bool { + match self.kind { + Kind::Chan { .. } => false, + Kind::Wrapped(..) => false, + Kind::Once(ref val) => val.is_none(), + Kind::Empty => true + } } -} -impl Stream for Body { - type Item = Chunk; - type Error = ::Error; - - #[inline] - fn poll(&mut self) -> Poll, ::Error> { - self.poll_inner() + fn content_length(&self) -> Option { + match self.kind { + Kind::Chan { .. } => None, + Kind::Wrapped(..) => None, + Kind::Once(Some(ref val)) => Some(val.len() as u64), + Kind::Once(None) => None, + Kind::Empty => Some(0) + } } } impl fmt::Debug for Body { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_tuple("Body") - .field(&self.kind) + f.debug_struct("Body") .finish() } } -//pub(crate) -pub fn channel() -> (ChunkSender, Body) { - let (tx, rx) = mpsc::channel(0); - let (close_tx, close_rx) = oneshot::channel(); - - let tx = ChunkSender { - close_rx: close_rx, - close_rx_check: true, - tx: tx, - }; - let rx = Body::new(Kind::Chan { - close_tx: close_tx, - rx: rx, - }); - - (tx, rx) -} - -impl ChunkSender { +impl Sender { + /// Check to see if this `Sender` can send more data. pub fn poll_ready(&mut self) -> Poll<(), ()> { - if self.close_rx_check { - match self.close_rx.poll() { - Ok(Async::Ready(true)) | Err(_) => return Err(()), - Ok(Async::Ready(false)) => { - // needed to allow converting into a plain mpsc::Receiver - // if it has been, the tx will send false to disable this check - self.close_rx_check = false; - } - Ok(Async::NotReady) => (), - } + match self.close_rx.poll() { + Ok(Async::Ready(())) | Err(_) => return Err(()), + Ok(Async::NotReady) => (), } self.tx.poll_ready().map_err(|_| ()) } - pub fn start_send(&mut self, msg: Result) -> StartSend<(), ()> { - match self.tx.start_send(msg) { - Ok(AsyncSink::Ready) => Ok(AsyncSink::Ready), - Ok(AsyncSink::NotReady(_)) => Ok(AsyncSink::NotReady(())), - Err(_) => Err(()), - } + /// Sends data on this channel. + /// + /// This should be called after `poll_ready` indicated the channel + /// could accept another `Chunk`. + /// + /// Returns `Err(Chunk)` if the channel could not (currently) accept + /// another `Chunk`. + pub fn send_data(&mut self, chunk: Chunk) -> Result<(), Chunk> { + self.tx.try_send(Ok(chunk)) + .map_err(|err| err.into_inner().expect("just sent Ok")) + } + + pub(crate) fn send_error(&mut self, err: ::Error) { + let _ = self.tx.try_send(Err(err)); } } impl From for Body { #[inline] - fn from (chunk: Chunk) -> Body { - Body::new(Kind::Once(Some(chunk))) + fn from(chunk: Chunk) -> Body { + if chunk.is_empty() { + Body::empty() + } else { + Body::new(Kind::Once(Some(chunk))) + } } } @@ -214,13 +402,6 @@ impl From> for Body { } } -impl From> for Body { - #[inline] - fn from (body: Option) -> Body { - body.unwrap_or_default() - } -} - fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} @@ -232,15 +413,14 @@ fn _assert_send_sync() { #[test] fn test_body_stream_concat() { - use futures::{Sink, Stream, Future}; - let (tx, body) = Body::pair(); + use futures::{Stream, Future}; - ::std::thread::spawn(move || { - let tx = tx.send(Ok("hello ".into())).wait().unwrap(); - tx.send(Ok("world".into())).wait().unwrap(); - }); + let body = Body::from("hello world"); - let total = body.concat2().wait().unwrap(); + let total = body.into_stream() + .concat2() + .wait() + .unwrap(); assert_eq!(total.as_ref(), b"hello world"); } diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index 30f9df87f3..9a3419f6a2 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -8,7 +8,7 @@ use futures::task::Task; use http::{Method, Version}; use tokio_io::{AsyncRead, AsyncWrite}; -use proto::{Chunk, Decode, Http1Transaction, MessageHead}; +use proto::{BodyLength, Chunk, Decode, Http1Transaction, MessageHead}; use super::io::{Cursor, Buffered}; use super::{EncodedBuf, Encoder, Decoder}; @@ -418,7 +418,7 @@ where I: AsyncRead + AsyncWrite, self.io.can_buffer() } - pub fn write_head(&mut self, mut head: MessageHead, body: bool) { + pub fn write_head(&mut self, mut head: MessageHead, body: Option) { debug_assert!(self.can_write_head()); if !T::should_read_first() { @@ -541,7 +541,7 @@ where I: AsyncRead + AsyncWrite, match self.state.writing { Writing::Init => { if let Some(msg) = T::on_error(&err) { - self.write_head(msg, false); + self.write_head(msg, None); self.state.error = Some(err); return Ok(()); } diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index 31539e3887..dc77d45e88 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -1,17 +1,18 @@ use std::io; use bytes::Bytes; -use futures::{Async, AsyncSink, Future, Poll, Stream}; +use futures::{Async, Future, Poll, Stream}; use http::{Request, Response, StatusCode}; use tokio_io::{AsyncRead, AsyncWrite}; use tokio_service::Service; -use proto::{Body, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; +use proto::body::Entity; +use proto::{Body, BodyLength, Conn, Http1Transaction, MessageHead, RequestHead, RequestLine, ResponseHead}; pub struct Dispatcher { conn: Conn, dispatch: D, - body_tx: Option<::proto::body::ChunkSender>, + body_tx: Option<::proto::body::Sender>, body_rx: Option, is_closing: bool, } @@ -46,7 +47,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Stream, + Bs: Entity, { pub fn new(dispatch: D, conn: Conn) -> Self { Dispatcher { @@ -130,13 +131,10 @@ where } match self.conn.read_body() { Ok(Async::Ready(Some(chunk))) => { - match body.start_send(Ok(chunk)) { - Ok(AsyncSink::Ready) => { + match body.send_data(chunk) { + Ok(()) => { self.body_tx = Some(body); }, - Ok(AsyncSink::NotReady(_chunk)) => { - unreachable!("mpsc poll_ready was ready, start_send was not"); - } Err(_canceled) => { if self.conn.can_read_body() { trace!("body receiver dropped before eof, closing"); @@ -154,7 +152,7 @@ where return Ok(Async::NotReady); } Err(e) => { - let _ = body.start_send(Err(::Error::Io(e))); + body.send_error(::Error::Io(e)); } } } else { @@ -181,7 +179,7 @@ where match self.conn.read_head() { Ok(Async::Ready(Some((head, has_body)))) => { let body = if has_body { - let (mut tx, rx) = ::proto::body::channel(); + let (mut tx, rx) = Body::channel(); let _ = tx.poll_ready(); // register this task if rx is dropped self.body_tx = Some(tx); rx @@ -213,7 +211,12 @@ where return Ok(Async::Ready(())); } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() { if let Some((head, body)) = try_ready!(self.dispatch.poll_msg()) { - self.conn.write_head(head, body.is_some()); + let body_type = body.as_ref().map(|body| { + body.content_length() + .map(BodyLength::Known) + .unwrap_or(BodyLength::Unknown) + }); + self.conn.write_head(head, body_type); self.body_rx = body; } else { self.close(); @@ -222,7 +225,7 @@ where } else if !self.conn.can_buffer_body() { try_ready!(self.poll_flush()); } else if let Some(mut body) = self.body_rx.take() { - let chunk = match body.poll()? { + let chunk = match body.poll_data()? { Async::Ready(Some(chunk)) => { self.body_rx = Some(body); chunk @@ -291,7 +294,7 @@ where I: AsyncRead + AsyncWrite, B: AsRef<[u8]>, T: Http1Transaction, - Bs: Stream, + Bs: Entity, { type Item = (); type Error = ::Error; @@ -316,8 +319,7 @@ impl Server where S: Service { impl Dispatch for Server where S: Service, Response=Response, Error=::Error>, - Bs: Stream, - Bs::Item: AsRef<[u8]>, + Bs: Entity, { type PollItem = MessageHead; type PollBody = Bs; @@ -338,7 +340,12 @@ where subject: parts.status, headers: parts.headers, }; - Ok(Async::Ready(Some((head, Some(body))))) + let body = if body.is_end_stream() { + None + } else { + Some(body) + }; + Ok(Async::Ready(Some((head, body)))) } else { unreachable!("poll_msg shouldn't be called if no inflight"); } @@ -382,8 +389,7 @@ impl Client { impl Dispatch for Client where - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type PollItem = RequestHead; type PollBody = B; @@ -405,8 +411,14 @@ where subject: RequestLine(parts.method, parts.uri), headers: parts.headers, }; + + let body = if body.is_end_stream() { + None + } else { + Some(body) + }; self.callback = Some(cb); - Ok(Async::Ready(Some((head, Some(body))))) + Ok(Async::Ready(Some((head, body)))) } } }, diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 10bfbdcafa..2c1320cbd3 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -1,12 +1,12 @@ use std::fmt::{self, Write}; use bytes::{BytesMut, Bytes}; -use http::header::{CONTENT_LENGTH, DATE, HeaderName, HeaderValue, TRANSFER_ENCODING}; +use http::header::{CONTENT_LENGTH, DATE, Entry, HeaderName, HeaderValue, TRANSFER_ENCODING}; use http::{HeaderMap, Method, StatusCode, Uri, Version}; use httparse; use headers; -use proto::{Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead}; +use proto::{BodyLength, Decode, MessageHead, Http1Transaction, ParseResult, RequestLine, RequestHead}; use proto::h1::{Encoder, Decoder, date}; const MAX_HEADERS: usize = 100; @@ -122,8 +122,13 @@ where } - fn encode(mut head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result { - trace!("Server::encode has_body={}, method={:?}", has_body, method); + fn encode( + mut head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result { + trace!("Server::encode body={:?}, method={:?}", body, method); // hyper currently doesn't support returning 1xx status codes as a Response // This is because Service only allows returning a single Response, and @@ -132,7 +137,7 @@ where let ret = if StatusCode::SWITCHING_PROTOCOLS == head.subject { T::on_encode_upgrade(&mut head) .map(|_| { - let mut enc = Server::set_length(&mut head, has_body, method.as_ref()); + let mut enc = Server::set_length(&mut head, body, method.as_ref()); enc.set_last(); enc }) @@ -143,7 +148,7 @@ where headers::content_length_zero(&mut head.headers); Err(::Error::Status) } else { - Ok(Server::set_length(&mut head, has_body, method.as_ref())) + Ok(Server::set_length(&mut head, body, method.as_ref())) }; @@ -160,6 +165,7 @@ where extend(dst, head.subject.as_str().as_bytes()); extend(dst, b" "); + // a reason MUST be written, as many parsers will expect it. extend(dst, head.subject.canonical_reason().unwrap_or("").as_bytes()); extend(dst, b"\r\n"); } @@ -207,7 +213,7 @@ where } impl Server<()> { - fn set_length(head: &mut MessageHead, has_body: bool, method: Option<&Method>) -> Encoder { + fn set_length(head: &mut MessageHead, body: Option, method: Option<&Method>) -> Encoder { // these are here thanks to borrowck // `if method == Some(&Method::Get)` says the RHS doesn't live long enough const HEAD: Option<&'static Method> = Some(&Method::HEAD); @@ -230,8 +236,8 @@ impl Server<()> { } }; - if has_body && can_have_body { - set_length(&mut head.headers, head.version == Version::HTTP_11) + if let (Some(body), true) = (body, can_have_body) { + set_length(&mut head.headers, body, head.version == Version::HTTP_11) } else { head.headers.remove(TRANSFER_ENCODING); if can_have_body { @@ -355,12 +361,17 @@ where } } - fn encode(mut head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result { - trace!("Client::encode has_body={}, method={:?}", has_body, method); + fn encode( + mut head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result { + trace!("Client::encode body={:?}, method={:?}", body, method); *method = Some(head.subject.0.clone()); - let body = Client::set_length(&mut head, has_body); + let body = Client::set_length(&mut head, body); let init_cap = 30 + head.headers.len() * AVERAGE_HEADER_SIZE; dst.reserve(init_cap); @@ -399,33 +410,143 @@ where } impl Client<()> { - fn set_length(head: &mut RequestHead, has_body: bool) -> Encoder { - if has_body { + fn set_length(head: &mut RequestHead, body: Option) -> Encoder { + if let Some(body) = body { let can_chunked = head.version == Version::HTTP_11 && (head.subject.0 != Method::HEAD) && (head.subject.0 != Method::GET) && (head.subject.0 != Method::CONNECT); - set_length(&mut head.headers, can_chunked) + set_length(&mut head.headers, body, can_chunked) } else { - head.headers.remove(CONTENT_LENGTH); head.headers.remove(TRANSFER_ENCODING); Encoder::length(0) } } } -fn set_length(headers: &mut HeaderMap, can_chunked: bool) -> Encoder { - let len = headers::content_length_parse(&headers); +fn set_length(headers: &mut HeaderMap, body: BodyLength, can_chunked: bool) -> Encoder { + // If the user already set specific headers, we should respect them, regardless + // of what the Entity knows about itself. They set them for a reason. + + // Because of the borrow checker, we can't check the for an existing + // Content-Length header while holding an `Entry` for the Transfer-Encoding + // header, so unfortunately, we must do the check here, first. + + let existing_con_len = headers::content_length_parse(headers); + let mut should_remove_con_len = false; + + if can_chunked { + // If the user set a transfer-encoding, respect that. Let's just + // make sure `chunked` is the final encoding. + let encoder = match headers.entry(TRANSFER_ENCODING) + .expect("TRANSFER_ENCODING is valid HeaderName") { + Entry::Occupied(te) => { + should_remove_con_len = true; + if headers::is_chunked(te.iter()) { + Some(Encoder::chunked()) + } else { + warn!("user provided transfer-encoding does not end in 'chunked'"); + + // There's a Transfer-Encoding, but it doesn't end in 'chunked'! + // An example that could trigger this: + // + // Transfer-Encoding: gzip + // + // This can be bad, depending on if this is a request or a + // response. + // + // - A request is illegal if there is a `Transfer-Encoding` + // but it doesn't end in `chunked`. + // - A response that has `Transfer-Encoding` but doesn't + // end in `chunked` isn't illegal, it just forces this + // to be close-delimited. + // + // We can try to repair this, by adding `chunked` ourselves. + + headers::add_chunked(te); + Some(Encoder::chunked()) + } + }, + Entry::Vacant(te) => { + if let Some(len) = existing_con_len { + Some(Encoder::length(len)) + } else if let BodyLength::Unknown = body { + should_remove_con_len = true; + te.insert(HeaderValue::from_static("chunked")); + Some(Encoder::chunked()) + } else { + None + } + }, + }; + + // This is because we need a second mutable borrow to remove + // content-length header. + if let Some(encoder) = encoder { + if should_remove_con_len && existing_con_len.is_some() { + headers.remove(CONTENT_LENGTH); + } + return encoder; + } + + // User didn't set transfer-encoding, AND we know body length, + // so we can just set the Content-Length automatically. + + let len = if let BodyLength::Known(len) = body { + len + } else { + unreachable!("BodyLength::Unknown would set chunked"); + }; - if let Some(len) = len { - Encoder::length(len) - } else if can_chunked { - //TODO: maybe not overwrite existing transfer-encoding - headers.insert(TRANSFER_ENCODING, HeaderValue::from_static("chunked")); - Encoder::chunked() + set_content_length(headers, len) } else { - headers.remove(TRANSFER_ENCODING); - Encoder::eof() + // Chunked isn't legal, so if it is set, we need to remove it. + // Also, if it *is* set, then we shouldn't replace with a length, + // since the user tried to imply there isn't a length. + let encoder = if headers.remove(TRANSFER_ENCODING).is_some() { + trace!("removing illegal transfer-encoding header"); + should_remove_con_len = true; + Encoder::eof() + } else if let Some(len) = existing_con_len { + Encoder::length(len) + } else if let BodyLength::Known(len) = body { + set_content_length(headers, len) + } else { + Encoder::eof() + }; + + if should_remove_con_len && existing_con_len.is_some() { + headers.remove(CONTENT_LENGTH); + } + + encoder + } +} + +fn set_content_length(headers: &mut HeaderMap, len: u64) -> Encoder { + // At this point, there should not be a valid Content-Length + // header. However, since we'll be indexing in anyways, we can + // warn the user if there was an existing illegal header. + + match headers.entry(CONTENT_LENGTH) + .expect("CONTENT_LENGTH is valid HeaderName") { + Entry::Occupied(mut cl) => { + // Uh oh, the user set `Content-Length` headers, but set bad ones. + // This would be an illegal message anyways, so let's try to repair + // with our known good length. + warn!("user provided content-length header was invalid"); + + // Internal sanity check, we should have already determined + // that the header was illegal before calling this function. + debug_assert!(headers::content_length_parse_all(cl.iter()).is_none()); + + cl.insert(headers::content_length_value(len)); + Encoder::length(len) + }, + Entry::Vacant(cl) => { + cl.insert(headers::content_length_value(len)); + Encoder::length(len) + } } } @@ -573,6 +694,7 @@ mod tests { } } + #[test] fn test_parse_request() { extern crate pretty_env_logger; diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 68b7be0cf3..b0e2a9d531 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -8,7 +8,7 @@ pub use self::body::Body; pub use self::chunk::Chunk; pub use self::h1::{dispatch, Conn}; -mod body; +pub mod body; mod chunk; mod h1; //mod h2; @@ -72,7 +72,12 @@ pub trait Http1Transaction { type Outgoing: Default; fn parse(bytes: &mut BytesMut) -> ParseResult; fn decoder(head: &MessageHead, method: &mut Option) -> ::Result; - fn encode(head: MessageHead, has_body: bool, method: &mut Option, dst: &mut Vec) -> ::Result; + fn encode( + head: MessageHead, + body: Option, + method: &mut Option, + dst: &mut Vec, + ) -> ::Result; fn on_error(err: &::Error) -> Option>; fn should_error_on_parse_eof() -> bool; @@ -81,6 +86,15 @@ pub trait Http1Transaction { pub type ParseResult = ::Result, usize)>>; +#[derive(Debug)] +pub enum BodyLength { + /// Content-Length + Known(u64), + /// Transfer-Encoding: chunked (if h1) + Unknown, +} + + #[derive(Debug)] pub enum Decode { /// Decode normally. diff --git a/src/server/conn.rs b/src/server/conn.rs index 389e28c708..4b34351cad 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -11,10 +11,11 @@ use std::fmt; use bytes::Bytes; -use futures::{Future, Poll, Stream}; +use futures::{Future, Poll}; use tokio_io::{AsyncRead, AsyncWrite}; -use proto::{self, Body}; +use proto; +use proto::body::{Body, Entity}; use super::{HyperService, Request, Response, Service}; /// A future binding a connection with a Service. @@ -24,14 +25,13 @@ use super::{HyperService, Request, Response, Service}; pub struct Connection where S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, + S::ResponseBody: Entity, { pub(super) conn: proto::dispatch::Dispatcher< proto::dispatch::Server, S::ResponseBody, I, - ::Item, + ::Data, proto::ServerTransaction, >, } @@ -61,8 +61,7 @@ pub struct Parts { impl Connection where S: Service, Response = Response, Error = ::Error> + 'static, I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Disables keep-alive for this connection. pub fn disable_keep_alive(&mut self) { @@ -99,8 +98,7 @@ where S: Service, Response = Response, Error = ::Erro impl Future for Connection where S: Service, Response = Response, Error = ::Error> + 'static, I: AsyncRead + AsyncWrite + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { type Item = (); type Error = ::Error; @@ -113,8 +111,7 @@ where S: Service, Response = Response, Error = ::Erro impl fmt::Debug for Connection where S: HyperService, - S::ResponseBody: Stream, - ::Item: AsRef<[u8]>, + S::ResponseBody: Entity, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Connection") diff --git a/src/server/mod.rs b/src/server/mod.rs index 0c2c93100d..18b0d0e7cb 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -23,7 +23,8 @@ use tokio::reactor::{Core, Handle, Timeout}; use tokio::net::TcpListener; pub use tokio_service::{NewService, Service}; -use proto::{self, Body}; +use proto::body::{Body, Entity}; +use proto; use self::addr_stream::AddrStream; use self::hyper_service::HyperService; @@ -48,10 +49,10 @@ pub struct Http { /// This server is intended as a convenience for creating a TCP listener on an /// address and then serving TCP connections accepted with the service provided. pub struct Server -where B: Stream, - B::Item: AsRef<[u8]>, +where + B: Entity, { - protocol: Http, + protocol: Http, new_service: S, reactor: Core, listener: TcpListener, @@ -90,7 +91,6 @@ pub struct AddrIncoming { timeout: Option, } - // ===== impl Http ===== impl + 'static> Http { @@ -154,7 +154,7 @@ impl + 'static> Http { /// actually run the server. pub fn bind(&self, addr: &SocketAddr, new_service: S) -> ::Result> where S: NewService, Response = Response, Error = ::Error> + 'static, - Bd: Stream, + Bd: Entity, { let core = try!(Core::new()); let handle = core.handle(); @@ -179,7 +179,7 @@ impl + 'static> Http { /// connection. pub fn serve_addr_handle(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result> where S: NewService, Response = Response, Error = ::Error>, - Bd: Stream, + Bd: Entity, { let listener = TcpListener::bind(addr, &handle)?; let mut incoming = AddrIncoming::new(listener, handle.clone(), self.sleep_on_errors)?; @@ -196,7 +196,7 @@ impl + 'static> Http { where I: Stream, I::Item: AsyncRead + AsyncWrite, S: NewService, Response = Response, Error = ::Error>, - Bd: Stream, + Bd: Entity, { Serve { incoming: incoming, @@ -246,10 +246,8 @@ impl + 'static> Http { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where S: Service, Response = Response, Error = ::Error>, - Bd: Stream, - Bd::Item: AsRef<[u8]>, + Bd: Entity, I: AsyncRead + AsyncWrite, - { let mut conn = proto::Conn::new(io); if !self.keep_alive { @@ -290,8 +288,7 @@ impl fmt::Debug for Http { impl Server where S: NewService, Response = Response, Error = ::Error> + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + B: Entity + 'static, { /// Returns the local address that this server is bound to. pub fn local_addr(&self) -> ::Result { @@ -407,8 +404,7 @@ impl Server } } -impl> fmt::Debug for Server -where B::Item: AsRef<[u8]> +impl> fmt::Debug for Server { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Server") @@ -445,8 +441,7 @@ where I: Stream, I::Item: AsyncRead + AsyncWrite, S: NewService, Response=Response, Error=::Error>, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type Item = Connection; type Error = ::Error; @@ -720,7 +715,7 @@ impl Future for WaitUntilZero { } mod hyper_service { - use super::{Body, Request, Response, Service, Stream}; + use super::{Body, Entity, Request, Response, Service}; /// A "trait alias" for any type that implements `Service` with hyper's /// Request, Response, and Error types, and a streaming body. /// @@ -751,8 +746,7 @@ mod hyper_service { Response=Response, Error=::Error, >, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, {} impl HyperService for S @@ -763,8 +757,7 @@ mod hyper_service { Error=::Error, >, S: Sealed, - B: Stream, - B::Item: AsRef<[u8]>, + B: Entity, { type ResponseBody = B; type Sealed = Opaque; diff --git a/tests/client.rs b/tests/client.rs index 51a29a6fd4..f3bfd5626d 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -34,7 +34,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, response: status: $client_status:ident, @@ -53,7 +52,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, response: status: $client_status, @@ -73,7 +71,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, response: status: $client_status:ident, @@ -99,7 +96,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, }.unwrap(); @@ -108,7 +104,7 @@ macro_rules! test { assert_eq!(res.headers()[$response_header_name], $response_header_val); )* - let body = core.run(res.into_parts().1.concat2()).unwrap(); + let body = core.run(res.into_body().into_stream().concat2()).unwrap(); let expected_res_body = Option::<&[u8]>::from($response_body) .unwrap_or_default(); @@ -126,7 +122,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, error: $err:expr, ) => ( @@ -149,7 +144,6 @@ macro_rules! test { url: $client_url, headers: { $($request_header_name => $request_header_val,)* }, body: $request_body, - proxy: $request_proxy, }.unwrap_err(); if !$err(&err) { panic!("unexpected error: {:?}", err) @@ -171,7 +165,6 @@ macro_rules! test { url: $client_url:expr, headers: { $($request_header_name:expr => $request_header_val:expr,)* }, body: $request_body:expr, - proxy: $request_proxy:expr, ) => ({ let server = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = server.local_addr().unwrap(); @@ -183,32 +176,21 @@ macro_rules! test { } let client = config.build(&core.handle()); - let mut is_empty = false; let body = if let Some(body) = $request_body { let body: &'static str = body; body.into() } else { - is_empty = true; Body::empty() }; - let mut req = Request::builder(); - req + let req = Request::builder() .method(Method::$client_method) + .uri(&*format!($client_url, addr=addr)) $( .header($request_header_name, $request_header_val) )* - .uri(&*format!($client_url, addr=addr)); - - //TODO: remove when client bodies are fixed - if is_empty { - req.header("content-length", "0"); - } - - let req = req.body(body) + .body(body) .unwrap(); - // req.set_proxy($request_proxy); - let res = client.request(req); let (tx, rx) = oneshot::channel(); @@ -257,7 +239,6 @@ test! { url: "http://{addr}/", headers: {}, body: None, - proxy: false, response: status: OK, headers: { @@ -279,7 +260,6 @@ test! { url: "http://{addr}/foo?key=val#dont_send_me", headers: {}, body: None, - proxy: false, response: status: OK, headers: { @@ -301,7 +281,6 @@ test! { url: "http://{addr}/", headers: {}, body: Some(""), - proxy: false, response: status: OK, headers: { @@ -331,7 +310,6 @@ test! { "Content-Length" => "7", }, body: Some("foo bar"), - proxy: false, response: status: OK, headers: {}, @@ -361,7 +339,6 @@ test! { "Transfer-Encoding" => "chunked", }, body: Some("foo bar baz"), - proxy: false, response: status: OK, headers: {}, @@ -387,14 +364,14 @@ test! { headers: { "Content-Length" => "0", }, - body: Some(""), - proxy: false, + body: None, response: status: OK, headers: {}, body: None, } +/*TODO: when new Connect trait allows stating connection is proxied test! { name: client_http_proxy, @@ -407,18 +384,18 @@ test! { reply: REPLY_OK, client: + proxy: true, request: method: GET, url: "http://{addr}/proxy", headers: {}, body: None, - proxy: true, response: status: OK, headers: {}, body: None, } - +*/ test! { name: client_head_ignores_body, @@ -442,7 +419,6 @@ test! { url: "http://{addr}/head", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -473,7 +449,6 @@ test! { url: "http://{addr}/pipe", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -500,7 +475,6 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Incomplete => true, _ => false, @@ -527,7 +501,6 @@ test! { url: "http://{addr}/err", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Version => true, _ => false, @@ -562,7 +535,6 @@ test! { "Content-Length" => "7", }, body: Some("foo bar"), - proxy: false, response: status: OK, headers: {}, @@ -592,7 +564,6 @@ test! { url: "http://{addr}/upgrade", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Upgrade => true, _ => false, @@ -618,7 +589,6 @@ test! { url: "http://{addr}/", headers: {}, body: None, - proxy: false, error: |err| match err { &hyper::Error::Method => true, _ => false, @@ -648,7 +618,6 @@ test! { url: "http://{addr}/no-host/{addr}", headers: {}, body: None, - proxy: false, response: status: OK, headers: {}, @@ -755,7 +724,7 @@ mod dispatch_impl { .unwrap(); client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }).and_then(|_| { Timeout::new(Duration::from_secs(1), &handle).unwrap() .from_err() @@ -808,7 +777,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -975,7 +944,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -1023,7 +992,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); core.run(res.join(rx).map(|r| r.0)).unwrap(); @@ -1078,7 +1047,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); core.run(res).unwrap(); @@ -1130,7 +1099,7 @@ mod dispatch_impl { .unwrap(); let res = client.request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_parts().1.concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1215,8 +1184,6 @@ mod dispatch_impl { let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1227,8 +1194,6 @@ mod dispatch_impl { let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1281,8 +1246,6 @@ mod dispatch_impl { let req = Request::builder() .method("HEAD") .uri(&*format!("http://{}/a", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1293,8 +1256,6 @@ mod dispatch_impl { let rx = rx2.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); let req = Request::builder() .uri(&*format!("http://{}/b", addr)) - //TODO: remove this header when auto lengths are fixed - .header("content-length", "0") .body(Body::empty()) .unwrap(); let res = client.request(req); @@ -1434,7 +1395,7 @@ mod conn { .unwrap(); let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1481,7 +1442,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1522,7 +1483,7 @@ mod conn { .unwrap(); let res1 = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); // pipelined request will hit NotReady, and thus should return an Error::Cancel @@ -1599,7 +1560,7 @@ mod conn { let res = client.send_request(req).and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::SWITCHING_PROTOCOLS); assert_eq!(res.headers()["Upgrade"], "foobar"); - res.into_body().concat2() + res.into_body().into_stream().concat2() }); let rx = rx1.map_err(|_| hyper::Error::Io(io::Error::new(io::ErrorKind::Other, "thread panicked"))); @@ -1680,7 +1641,7 @@ mod conn { let res = client.send_request(req) .and_then(move |res| { assert_eq!(res.status(), hyper::StatusCode::OK); - res.into_body().concat2() + res.into_body().into_stream().concat2() }) .map(|body| { assert_eq!(body.as_ref(), b""); diff --git a/tests/server.rs b/tests/server.rs index 2bfc80b6e2..d5ce55746e 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -99,8 +99,8 @@ fn get_implicitly_empty() { type Future = Box>; fn call(&self, req: Request) -> Self::Future { - Box::new(req.into_parts() - .1 + Box::new(req.into_body() + .into_stream() .concat2() .map(|buf| { assert!(buf.is_empty()); @@ -110,112 +110,188 @@ fn get_implicitly_empty() { } } -#[test] -fn get_fixed_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .header("content-length", foo_bar.len().to_string()) - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); - let n = body.find("\r\n\r\n").unwrap() + 4; +mod response_body_lengths { + use super::*; - assert_eq!(&body[n..], "foo bar baz"); -} + struct TestCase { + version: usize, + headers: &'static [(&'static str, &'static str)], + body: Bd, + expects_chunked: bool, + expects_con_len: bool, + } -#[test] -fn get_chunked_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .header("transfer-encoding", "chunked") - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); - let n = body.find("\r\n\r\n").unwrap() + 4; + enum Bd { + Known(&'static str), + Unknown(&'static str), + } - assert_eq!(&body[n..], "B\r\nfoo bar baz\r\n0\r\n\r\n"); -} + fn run_test(case: TestCase) { + assert!(case.version == 0 || case.version == 1, "TestCase.version must 0 or 1"); -#[test] -fn get_auto_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + let server = serve(); - assert!(has_header(&body, "Transfer-Encoding: chunked")); + let mut reply = server.reply(); + for header in case.headers { + reply = reply.header(header.0, header.1); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "B\r\nfoo bar baz\r\n0\r\n\r\n"); -} + let body_str = match case.body { + Bd::Known(b) => { + reply.body(b); + b + }, + Bd::Unknown(b) => { + let (mut tx, body) = hyper::Body::channel(); + tx.send_data(b.into()).expect("send_data"); + reply.body_stream(body); + b + }, + }; + + let mut req = connect(server.addr()); + write!(req, "\ + GET / HTTP/1.{}\r\n\ + Host: example.domain\r\n\ + Connection: close\r\n\ + \r\n\ + ", case.version).expect("request write"); + let mut body = String::new(); + req.read_to_string(&mut body).unwrap(); + + assert_eq!( + case.expects_chunked, + has_header(&body, "transfer-encoding:"), + "expects_chunked" + ); + assert_eq!( + case.expects_con_len, + has_header(&body, "content-length:"), + "expects_con_len" + ); + + let n = body.find("\r\n\r\n").unwrap() + 4; + + if case.expects_chunked { + let len = body.len(); + assert_eq!(&body[n + 1..n + 3], "\r\n", "expected body chunk size header"); + assert_eq!(&body[n + 3..len - 7], body_str, "expected body"); + assert_eq!(&body[len - 7..], "\r\n0\r\n\r\n", "expected body final chunk size header"); + } else { + assert_eq!(&body[n..], body_str, "expected body"); + } + } -#[test] -fn http_10_get_auto_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.0\r\n\ - Host: example.domain\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + #[test] + fn get_fixed_response_known() { + run_test(TestCase { + version: 1, + headers: &[("content-length", "11")], + body: Bd::Known("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } - assert!(!has_header(&body, "Transfer-Encoding:")); + #[test] + fn get_fixed_response_unknown() { + run_test(TestCase { + version: 1, + headers: &[("content-length", "11")], + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "foo bar baz"); -} + #[test] + fn get_chunked_response_known() { + run_test(TestCase { + version: 1, + headers: &[("transfer-encoding", "chunked")], + // even though we know the length, don't strip user's TE header + body: Bd::Known("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } -#[test] -fn http_10_get_chunked_response() { - let foo_bar = b"foo bar baz"; - let server = serve(); - server.reply() - // this header should actually get removed - .header("transfer-encoding", "chunked") - .body(foo_bar); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.0\r\n\ - Host: example.domain\r\n\ - \r\n\ - ").unwrap(); - let mut body = String::new(); - req.read_to_string(&mut body).unwrap(); + #[test] + fn get_chunked_response_unknown() { + run_test(TestCase { + version: 1, + headers: &[("transfer-encoding", "chunked")], + body: Bd::Unknown("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } - assert!(!has_header(&body, "Transfer-Encoding:")); + #[test] + fn get_chunked_response_trumps_length() { + run_test(TestCase { + version: 1, + headers: &[ + ("transfer-encoding", "chunked"), + // both headers means content-length is stripped + ("content-length", "11"), + ], + body: Bd::Known("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } - let n = body.find("\r\n\r\n").unwrap() + 4; - assert_eq!(&body[n..], "foo bar baz"); + #[test] + fn get_auto_response_with_entity_unknown_length() { + run_test(TestCase { + version: 1, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Unknown("foo bar baz"), + expects_chunked: true, + expects_con_len: false, + }); + } + + #[test] + fn get_auto_response_with_entity_known_length() { + run_test(TestCase { + version: 1, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Known("foo bar baz"), + expects_chunked: false, + expects_con_len: true, + }); + } + + + #[test] + fn http_10_get_auto_response_with_entity_unknown_length() { + run_test(TestCase { + version: 0, + // no headers means trying to guess from Entity + headers: &[], + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: false, + }); + } + + + #[test] + fn http_10_get_chunked_response() { + run_test(TestCase { + version: 0, + // http/1.0 should strip this header + headers: &[("transfer-encoding", "chunked")], + // even when we don't know the length + body: Bd::Unknown("foo bar baz"), + expects_chunked: false, + expects_con_len: false, + }); + } } #[test] @@ -314,67 +390,6 @@ fn post_with_incomplete_body() { req.read(&mut [0; 256]).expect("read"); } -#[test] -fn empty_response_chunked() { - let server = serve(); - - server.reply() - .body(""); - - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Content-Length: 0\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - - - let mut response = String::new(); - req.read_to_string(&mut response).unwrap(); - - assert!(response.contains("Transfer-Encoding: chunked\r\n")); - - let mut lines = response.lines(); - assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); - - let mut lines = lines.skip_while(|line| !line.is_empty()); - assert_eq!(lines.next(), Some("")); - // 0\r\n\r\n - assert_eq!(lines.next(), Some("0")); - assert_eq!(lines.next(), Some("")); - assert_eq!(lines.next(), None); -} - -#[test] -fn empty_response_chunked_without_body_should_set_content_length() { - extern crate pretty_env_logger; - let _ = pretty_env_logger::try_init(); - let server = serve(); - server.reply() - .header("transfer-encoding", "chunked"); - let mut req = connect(server.addr()); - req.write_all(b"\ - GET / HTTP/1.1\r\n\ - Host: example.domain\r\n\ - Connection: close\r\n\ - \r\n\ - ").unwrap(); - - let mut response = String::new(); - req.read_to_string(&mut response).unwrap(); - - assert!(!response.contains("Transfer-Encoding: chunked\r\n")); - assert!(response.contains("Content-Length: 0\r\n")); - - let mut lines = response.lines(); - assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); - - let mut lines = lines.skip_while(|line| !line.is_empty()); - assert_eq!(lines.next(), Some("")); - assert_eq!(lines.next(), None); -} #[test] fn head_response_can_send_content_length() { @@ -394,7 +409,7 @@ fn head_response_can_send_content_length() { let mut response = String::new(); req.read_to_string(&mut response).unwrap(); - assert!(response.contains("Content-Length: 1024\r\n")); + assert!(response.contains("content-length: 1024\r\n")); let mut lines = response.lines(); assert_eq!(lines.next(), Some("HTTP/1.1 200 OK")); @@ -423,7 +438,7 @@ fn response_does_not_set_chunked_if_body_not_allowed() { let mut response = String::new(); req.read_to_string(&mut response).unwrap(); - assert!(!response.contains("Transfer-Encoding")); + assert!(!response.contains("transfer-encoding")); let mut lines = response.lines(); assert_eq!(lines.next(), Some("HTTP/1.1 304 Not Modified")); @@ -691,13 +706,13 @@ fn pipeline_enabled() { { let mut lines = buf.split(|&b| b == b'\n'); assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); - assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + assert_eq!(s(lines.next().unwrap()), "content-length: 12\r"); lines.next().unwrap(); // Date assert_eq!(s(lines.next().unwrap()), "\r"); assert_eq!(s(lines.next().unwrap()), "Hello World"); assert_eq!(s(lines.next().unwrap()), "HTTP/1.1 200 OK\r"); - assert_eq!(s(lines.next().unwrap()), "Content-Length: 12\r"); + assert_eq!(s(lines.next().unwrap()), "content-length: 12\r"); lines.next().unwrap(); // Date assert_eq!(s(lines.next().unwrap()), "\r"); assert_eq!(s(lines.next().unwrap()), "Hello World"); @@ -720,7 +735,7 @@ fn http_10_request_receives_http_10_response() { \r\n\ ").unwrap(); - let expected = "HTTP/1.0 200 OK\r\nContent-Length: 0\r\n"; + let expected = "HTTP/1.0 200 OK\r\ncontent-length: 0\r\n"; let mut buf = [0; 256]; let n = req.read(&mut buf).unwrap(); assert!(n >= expected.len(), "read: {:?} >= {:?}", n, expected.len()); @@ -729,6 +744,7 @@ fn http_10_request_receives_http_10_response() { #[test] fn disable_keep_alive_mid_request() { + let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -773,6 +789,7 @@ fn disable_keep_alive_mid_request() { #[test] fn disable_keep_alive_post_request() { + let _ = pretty_env_logger::try_init(); let mut core = Core::new().unwrap(); let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &core.handle()).unwrap(); let addr = listener.local_addr().unwrap(); @@ -790,10 +807,11 @@ fn disable_keep_alive_post_request() { let mut buf = [0; 1024 * 8]; loop { let n = req.read(&mut buf).expect("reading 1"); - if n < buf.len() { - if &buf[n - HELLO.len()..n] == HELLO.as_bytes() { - break; - } + if &buf[n - HELLO.len()..n] == HELLO.as_bytes() { + break; + } + if n == 0 { + panic!("unexpected eof"); } } @@ -1113,16 +1131,14 @@ fn streaming_body() { .map_err(|_| unreachable!()) .and_then(|(item, _incoming)| { let (socket, _) = item.unwrap(); - Http::<& &'static [u8]>::new() + Http::::new() .keep_alive(false) .serve_connection(socket, service_fn(|_| { static S: &'static [&'static [u8]] = &[&[b'x'; 1_000] as &[u8]; 1_00] as _; - let b = ::futures::stream::iter_ok(S.iter()); + let b = ::futures::stream::iter_ok(S.into_iter()) + .map(|&s| s); + let b = hyper::Body::wrap_stream(b); Ok(Response::new(b)) - /* - Ok(Response::, ::hyper::Error>>::new() - .with_body(b)) - */ })) .map(|_| ()) }); @@ -1195,7 +1211,12 @@ impl<'a> ReplyBuilder<'a> { } fn body>(self, body: T) { - self.tx.send(Reply::Body(body.as_ref().into())).unwrap(); + self.tx.send(Reply::Body(body.as_ref().to_vec().into())).unwrap(); + } + + fn body_stream(self, body: Body) + { + self.tx.send(Reply::Body(body)).unwrap(); } } @@ -1219,11 +1240,11 @@ struct TestService { _timeout: Option, } -#[derive(Clone, Debug)] +#[derive(Debug)] enum Reply { Status(hyper::StatusCode), Header(HeaderName, HeaderValue), - Body(Vec), + Body(hyper::Body), End, } @@ -1257,7 +1278,7 @@ impl Service for TestService { let tx2 = self.tx.clone(); let replies = self.reply.clone(); - Box::new(req.into_parts().1.for_each(move |chunk| { + Box::new(req.into_body().into_stream().for_each(move |chunk| { tx1.lock().unwrap().send(Msg::Chunk(chunk.to_vec())).unwrap(); Ok(()) }).then(move |result| { @@ -1278,7 +1299,7 @@ impl Service for TestService { res.headers_mut().insert(name, value); }, Reply::Body(body) => { - *res.body_mut() = body.into(); + *res.body_mut() = body; }, Reply::End => break, }