From aa297f45322d66980bb2b51c413b15dfd51533ea Mon Sep 17 00:00:00 2001 From: Marko Lalic Date: Mon, 25 May 2015 22:18:26 +0200 Subject: [PATCH] refactor(client): use HttpMessage in Request and Response BREAKING CHANGE: `hyper::client::request::Response` is no longer generic over `NetworkStream` types. It no longer requires a generic type parameter at all. --- src/client/request.rs | 134 +++++++++++++++-------------------------- src/client/response.rs | 84 +++++++------------------- 2 files changed, 70 insertions(+), 148 deletions(-) diff --git a/src/client/request.rs b/src/client/request.rs index 9e851027d0..98db4a6678 100644 --- a/src/client/request.rs +++ b/src/client/request.rs @@ -1,18 +1,19 @@ //! Client Requests use std::marker::PhantomData; -use std::io::{self, Write, BufWriter}; +use std::io::{self, Write}; use url::Url; use method::{self, Method}; use header::Headers; -use header::{self, Host}; +use header::Host; use net::{NetworkStream, NetworkConnector, HttpConnector, Fresh, Streaming}; -use http::{HttpWriter, LINE_ENDING}; -use http::HttpWriter::{ThroughWriter, ChunkedWriter, SizedWriter, EmptyWriter}; use version; use client::{Response, get_host_and_port}; +use message::{HttpMessage, RequestHead}; +use http11::Http11Message; + /// A client request to a remote server. /// The W type tracks the state of the request, Fresh vs Streaming. @@ -23,7 +24,7 @@ pub struct Request { /// The HTTP version of this request. pub version: version::HttpVersion, - body: HttpWriter>>, + message: Box, headers: Headers, method: method::Method, @@ -41,22 +42,12 @@ impl Request { } impl Request { - /// Create a new client request. - pub fn new(method: method::Method, url: Url) -> ::Result> { - let mut conn = HttpConnector(None); - Request::with_connector(method, url, &mut conn) - } - - /// Create a new client request with a specific underlying NetworkStream. - pub fn with_connector(method: method::Method, url: Url, connector: &C) - -> ::Result> where - C: NetworkConnector, - S: Into> { + /// Create a new `Request` that will use the given `HttpMessage` for its communication + /// with the server. This implies that the given `HttpMessage` instance has already been + /// properly initialized by the caller (e.g. a TCP connection's already established). + pub fn with_message(method: method::Method, url: Url, message: Box) + -> ::Result> { let (host, port) = try!(get_host_and_port(&url)); - - let stream = try!(connector.connect(&*host, port, &*url.scheme)).into(); - let stream = ThroughWriter(BufWriter::new(stream)); - let mut headers = Headers::new(); headers.set(Host { hostname: host, @@ -68,77 +59,43 @@ impl Request { headers: headers, url: url, version: version::HttpVersion::Http11, - body: stream, + message: message, _marker: PhantomData, }) } - /// Consume a Fresh Request, writing the headers and method, - /// returning a Streaming Request. - pub fn start(mut self) -> ::Result> { - let mut uri = self.url.serialize_path().unwrap(); - if let Some(ref q) = self.url.query { - uri.push('?'); - uri.push_str(&q[..]); - } - - debug!("request line: {:?} {:?} {:?}", self.method, uri, self.version); - try!(write!(&mut self.body, "{} {} {}{}", - self.method, uri, self.version, LINE_ENDING)); - - - let stream = match self.method { - Method::Get | Method::Head => { - debug!("headers={:?}", self.headers); - try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING)); - EmptyWriter(self.body.into_inner()) - }, - _ => { - let mut chunked = true; - let mut len = 0; - - match self.headers.get::() { - Some(cl) => { - chunked = false; - len = **cl; - }, - None => () - }; - - // can't do in match above, thanks borrowck - if chunked { - let encodings = match self.headers.get_mut::() { - Some(&mut header::TransferEncoding(ref mut encodings)) => { - //TODO: check if chunked is already in encodings. use HashSet? - encodings.push(header::Encoding::Chunked); - false - }, - None => true - }; - - if encodings { - self.headers.set::( - header::TransferEncoding(vec![header::Encoding::Chunked])) - } - } + /// Create a new client request. + pub fn new(method: method::Method, url: Url) -> ::Result> { + let mut conn = HttpConnector(None); + Request::with_connector(method, url, &mut conn) + } - debug!("headers={:?}", self.headers); - try!(write!(&mut self.body, "{}{}", self.headers, LINE_ENDING)); + /// Create a new client request with a specific underlying NetworkStream. + pub fn with_connector(method: method::Method, url: Url, connector: &C) + -> ::Result> where + C: NetworkConnector, + S: Into> { + let (host, port) = try!(get_host_and_port(&url)); + let stream = try!(connector.connect(&*host, port, &*url.scheme)).into(); - if chunked { - ChunkedWriter(self.body.into_inner()) - } else { - SizedWriter(self.body.into_inner(), len) - } - } - }; + Request::with_message(method, url, Box::new(Http11Message::with_stream(stream))) + } - Ok(Request { - method: self.method, + /// Consume a Fresh Request, writing the headers and method, + /// returning a Streaming Request. + pub fn start(mut self) -> ::Result> { + let head = try!(self.message.set_outgoing(RequestHead { headers: self.headers, + method: self.method, url: self.url, + })); + + Ok(Request { + method: head.method, + headers: head.headers, + url: head.url, version: self.version, - body: stream, + message: self.message, _marker: PhantomData, }) } @@ -153,20 +110,19 @@ impl Request { /// /// Consumes the Request. pub fn send(self) -> ::Result { - let raw = try!(self.body.end()).into_inner().unwrap(); // end() already flushes - Response::new(raw) + Response::with_message(self.message) } } impl Write for Request { #[inline] fn write(&mut self, msg: &[u8]) -> io::Result { - self.body.write(msg) + self.message.write(msg) } #[inline] fn flush(&mut self) -> io::Result<()> { - self.body.flush() + self.message.flush() } } @@ -180,11 +136,15 @@ mod tests { use header::{ContentLength,TransferEncoding,Encoding}; use url::form_urlencoded; use super::Request; + use http11::Http11Message; fn run_request(req: Request) -> Vec { let req = req.start().unwrap(); - let stream = *req.body.end().unwrap() - .into_inner().unwrap().downcast::().ok().unwrap(); + let message = req.message; + let mut message = message.downcast::().ok().unwrap(); + message.flush_outgoing().unwrap(); + let stream = *message + .into_inner().downcast::().ok().unwrap(); stream.write } diff --git a/src/client/response.rs b/src/client/response.rs index f21f762328..458c7005eb 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,21 +1,17 @@ //! Client Responses use std::io::{self, Read}; -use std::marker::PhantomData; -use std::net::Shutdown; -use buffer::BufReader; use header; -use header::{ContentLength, TransferEncoding}; -use header::Encoding::Chunked; -use net::{NetworkStream, HttpStream}; -use http::{self, HttpReader, RawStatus}; -use http::HttpReader::{SizedReader, ChunkedReader, EofReader}; +use net::NetworkStream; +use http::{self, RawStatus}; use status; use version; +use message::{ResponseHead, HttpMessage}; +use http11::Http11Message; /// A response for a client request to a remote server. #[derive(Debug)] -pub struct Response { +pub struct Response { /// The status from the server. pub status: status::StatusCode, /// The headers from the server. @@ -23,9 +19,7 @@ pub struct Response { /// The HTTP version of this response from the server. pub version: version::HttpVersion, status_raw: RawStatus, - body: HttpReader>>, - - _marker: PhantomData, + message: Box, } impl Response { @@ -33,50 +27,23 @@ impl Response { /// Creates a new response from a server. pub fn new(stream: Box) -> ::Result { trace!("Response::new"); - let mut stream = BufReader::new(stream); - - let head = try!(http::parse_response(&mut stream)); - let raw_status = head.subject; - let headers = head.headers; + Response::with_message(Box::new(Http11Message::with_stream(stream))) + } + /// Creates a new response received from the server on the given `HttpMessage`. + pub fn with_message(mut message: Box) -> ::Result { + trace!("Response::with_message"); + let ResponseHead { headers, raw_status, version } = try!(message.get_incoming()); let status = status::StatusCode::from_u16(raw_status.0); - debug!("version={:?}, status={:?}", head.version, status); + debug!("version={:?}, status={:?}", version, status); debug!("headers={:?}", headers); - - let body = if headers.has::() { - match headers.get::() { - Some(&TransferEncoding(ref codings)) => { - if codings.len() > 1 { - trace!("TODO: #2 handle other codings: {:?}", codings); - }; - - if codings.contains(&Chunked) { - ChunkedReader(stream, None) - } else { - trace!("not chuncked. read till eof"); - EofReader(stream) - } - } - None => unreachable!() - } - } else if headers.has::() { - match headers.get::() { - Some(&ContentLength(len)) => SizedReader(stream, len), - None => unreachable!() - } - } else { - trace!("neither Transfer-Encoding nor Content-Length"); - EofReader(stream) - }; - Ok(Response { status: status, - version: head.version, + version: version, headers: headers, - body: body, + message: message, status_raw: raw_status, - _marker: PhantomData, }) } @@ -84,21 +51,18 @@ impl Response { pub fn status_raw(&self) -> &RawStatus { &self.status_raw } - - /// Consumes the Request to return the NetworkStream underneath. - pub fn into_inner(self) -> Box { - self.body.into_inner().into_inner() - } } impl Read for Response { #[inline] fn read(&mut self, buf: &mut [u8]) -> io::Result { - let count = try!(self.body.read(buf)); + let count = try!(self.message.read(buf)); if count == 0 { if !http::should_keep_alive(self.version, &self.headers) { - try!(self.body.get_mut().get_mut().close(Shutdown::Both)); + try!(self.message.close_connection() + .map_err(|_| io::Error::new(io::ErrorKind::Other, + "Error closing connection"))); } } @@ -110,17 +74,15 @@ impl Read for Response { mod tests { use std::borrow::Cow::Borrowed; use std::io::{self, Read}; - use std::marker::PhantomData; - use buffer::BufReader; use header::Headers; use header::TransferEncoding; use header::Encoding; - use http::HttpReader::EofReader; use http::RawStatus; use mock::MockStream; use status; use version; + use http11::Http11Message; use super::Response; @@ -137,12 +99,12 @@ mod tests { status: status::StatusCode::Ok, headers: Headers::new(), version: version::HttpVersion::Http11, - body: EofReader(BufReader::new(Box::new(MockStream::new()))), + message: Box::new(Http11Message::with_stream(Box::new(MockStream::new()))), status_raw: RawStatus(200, Borrowed("OK")), - _marker: PhantomData, }; - let b = res.into_inner().downcast::().ok().unwrap(); + let message = res.message.downcast::().ok().unwrap(); + let b = message.into_inner().downcast::().ok().unwrap(); assert_eq!(b, Box::new(MockStream::new())); }