diff --git a/Cargo.toml b/Cargo.toml index 1da3bef0d1..a317b2d55e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,6 @@ percent-encoding = "1.0" relay = "0.1" time = "0.1" tokio-core = "0.1.11" -tokio-proto = { version = "0.1", optional = true } tokio-service = "0.1" tokio-io = "0.1" unicase = "2.0" @@ -47,8 +46,7 @@ spmc = "0.2" url = "1.0" [features] -default = ["server-proto"] +default = [] nightly = [] raw_status = [] compat = [ "http" ] -server-proto = ["tokio-proto"] diff --git a/src/client/mod.rs b/src/client/mod.rs index eb8c10eadc..697d8b509d 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -533,12 +533,6 @@ impl Config { self.set_host = val; self } - - #[doc(hidden)] - #[deprecated(since="0.11.11", note="no_proto is always enabled")] - pub fn no_proto(self) -> Config { - self - } } impl Config diff --git a/src/lib.rs b/src/lib.rs index 8dca895267..f39bc83ece 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,8 +33,6 @@ extern crate relay; extern crate time; extern crate tokio_core as tokio; #[macro_use] extern crate tokio_io; -#[cfg(feature = "tokio-proto")] -extern crate tokio_proto; extern crate tokio_service; extern crate unicase; @@ -55,19 +53,6 @@ pub use version::HttpVersion; #[cfg(feature = "raw_status")] pub use proto::RawStatus; -macro_rules! feat_server_proto { - ($($i:item)*) => ($( - #[cfg(feature = "server-proto")] - #[deprecated( - since="0.11.11", - note="All usage of the tokio-proto crate is going away." - )] - #[doc(hidden)] - #[allow(deprecated)] - $i - )*) -} - mod common; #[cfg(test)] mod mock; diff --git a/src/mock.rs b/src/mock.rs index ed340fa015..d871e73d23 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -118,12 +118,11 @@ impl AsyncIo { self.park_tasks = enabled; } - #[cfg(feature = "tokio-proto")] - //TODO: fix proto::conn::tests to not use tokio-proto API, - //and then this cfg flag go away + /* pub fn flushed(&self) -> bool { self.flushed } + */ pub fn blocked(&self) -> bool { self.blocked @@ -148,12 +147,11 @@ impl AsyncIo { AsyncIo::new(MockCursor::wrap(buf.into()), bytes) } - #[cfg(feature = "tokio-proto")] - //TODO: fix proto::conn::tests to not use tokio-proto API, - //and then this cfg flag go away - pub fn new_eof() -> AsyncIo { - AsyncIo::new(MockCursor::wrap(Vec::new().into()), 1) + /* + pub fn new_eof() -> AsyncIo { + AsyncIo::new(Buf::wrap(Vec::new().into()), 1) } + */ fn close(&mut self) { self.block_in(1); diff --git a/src/proto/body.rs b/src/proto/body.rs index 8f23378285..28acd51f67 100644 --- a/src/proto/body.rs +++ b/src/proto/body.rs @@ -3,14 +3,10 @@ use std::fmt; use bytes::Bytes; use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend, Stream}; use futures::sync::{mpsc, oneshot}; -#[cfg(feature = "tokio-proto")] -use tokio_proto; use std::borrow::Cow; use super::Chunk; -#[cfg(feature = "tokio-proto")] -pub type TokioBody = tokio_proto::streaming::Body; pub type BodySender = mpsc::Sender>; /// A `Stream` for `Chunk`s used in requests and responses. @@ -21,8 +17,6 @@ pub struct Body { #[derive(Debug)] enum Kind { - #[cfg(feature = "tokio-proto")] - Tokio(TokioBody), Chan { close_tx: oneshot::Sender, rx: mpsc::Receiver>, @@ -77,8 +71,6 @@ impl Body { fn poll_inner(&mut self) -> Poll, ::Error> { match self.kind { - #[cfg(feature = "tokio-proto")] - Kind::Tokio(ref mut rx) => rx.poll(), Kind::Chan { ref mut rx, .. } => match rx.poll().expect("mpsc cannot error") { Async::Ready(Some(Ok(chunk))) => Ok(Async::Ready(Some(chunk))), Async::Ready(Some(Err(err))) => Err(err), @@ -160,42 +152,6 @@ impl ChunkSender { } } -feat_server_proto! { - impl From for tokio_proto::streaming::Body { - fn from(b: Body) -> tokio_proto::streaming::Body { - match b.kind { - Kind::Tokio(b) => b, - Kind::Chan { close_tx, rx } => { - // disable knowing if the Rx gets dropped, since we cannot - // pass this tx along. - let _ = close_tx.send(false); - rx.into() - }, - Kind::Once(Some(chunk)) => TokioBody::from(chunk), - Kind::Once(None) | - Kind::Empty => TokioBody::empty(), - } - } - } - - impl From> for Body { - fn from(tokio_body: tokio_proto::streaming::Body) -> Body { - Body::new(Kind::Tokio(tokio_body)) - } - } -} - -impl From>> for Body { - #[inline] - fn from(src: mpsc::Receiver>) -> Body { - let (tx, _) = oneshot::channel(); - Body::new(Kind::Chan { - close_tx: tx, - rx: src, - }) - } -} - impl From for Body { #[inline] fn from (chunk: Chunk) -> Body { diff --git a/src/proto/chunk.rs b/src/proto/chunk.rs index 24caf796d2..3dd3fb762b 100644 --- a/src/proto/chunk.rs +++ b/src/proto/chunk.rs @@ -1,5 +1,4 @@ use std::fmt; -//use std::mem; use bytes::Bytes; diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index e9aa404c63..02559cf263 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -4,12 +4,8 @@ use std::marker::PhantomData; use bytes::Bytes; use futures::{Async, AsyncSink, Poll, StartSend}; -#[cfg(feature = "tokio-proto")] -use futures::{Sink, Stream}; use futures::task::Task; use tokio_io::{AsyncRead, AsyncWrite}; -#[cfg(feature = "tokio-proto")] -use tokio_proto::streaming::pipeline::{Frame, Transport}; use proto::{Chunk, Decode, Http1Transaction, MessageHead}; use super::io::{Cursor, Buffered}; @@ -81,70 +77,6 @@ where I: AsyncRead + AsyncWrite, self.io.into_inner() } - #[cfg(feature = "tokio-proto")] - fn poll_incoming(&mut self) -> Poll, Chunk, ::Error>>, io::Error> { - trace!("Conn::poll_incoming()"); - - #[derive(Debug)] - struct ParseEof; - - impl fmt::Display for ParseEof { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str(::std::error::Error::description(self)) - } - } - - impl ::std::error::Error for ParseEof { - fn description(&self) -> &str { - "end of file reached before parsing could complete" - } - } - - loop { - if self.is_read_closed() { - trace!("Conn::poll when closed"); - return Ok(Async::Ready(None)); - } else if self.can_read_head() { - return match self.read_head() { - Ok(Async::Ready(Some((head, body)))) => { - Ok(Async::Ready(Some(Frame::Message { - message: head, - body: body, - }))) - }, - Ok(Async::Ready(None)) => Ok(Async::Ready(None)), - Ok(Async::NotReady) => Ok(Async::NotReady), - Err(::Error::Io(err)) => Err(err), - Err(::Error::Incomplete) => { - Err(io::Error::new(io::ErrorKind::UnexpectedEof, ParseEof)) - }, - Err(err) => Ok(Async::Ready(Some(Frame::Error { - error: err, - }))), - }; - } else if self.can_read_body() { - return self.read_body() - .map(|async| async.map(|chunk| Some(Frame::Body { - chunk: chunk - }))) - .or_else(|err| { - self.state.close_read(); - Ok(Async::Ready(Some(Frame::Error { error: err.into() }))) - }); - } else { - trace!("poll when on keep-alive"); - if !T::should_read_first() { - self.require_empty_read()?; - if self.is_read_closed() { - return Ok(Async::Ready(None)); - } - } - self.maybe_park_read(); - return Ok(Async::NotReady); - } - } - } - pub fn is_read_closed(&self) -> bool { self.state.is_read_closed() } @@ -667,101 +599,6 @@ where I: AsyncRead + AsyncWrite, } } -// ==== tokio_proto impl ==== - -#[cfg(feature = "tokio-proto")] -impl Stream for Conn -where I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, - T: Http1Transaction, - T::Outgoing: fmt::Debug { - type Item = Frame, Chunk, ::Error>; - type Error = io::Error; - - #[inline] - fn poll(&mut self) -> Poll, Self::Error> { - self.poll_incoming().map_err(|err| { - debug!("poll error: {}", err); - err - }) - } -} - -#[cfg(feature = "tokio-proto")] -impl Sink for Conn -where I: AsyncRead + AsyncWrite, - B: AsRef<[u8]>, - T: Http1Transaction, - T::Outgoing: fmt::Debug { - type SinkItem = Frame, B, ::Error>; - type SinkError = io::Error; - - #[inline] - fn start_send(&mut self, frame: Self::SinkItem) -> StartSend { - trace!("Conn::start_send( frame={:?} )", DebugFrame(&frame)); - - let frame: Self::SinkItem = match frame { - Frame::Message { message: head, body } => { - if self.can_write_head() { - self.write_head(head, body); - return Ok(AsyncSink::Ready); - } else { - Frame::Message { message: head, body: body } - } - }, - Frame::Body { chunk } => { - if self.can_write_body() { - return self.write_body(chunk) - .map(|async| { - match async { - AsyncSink::Ready => AsyncSink::Ready, - AsyncSink::NotReady(chunk) => AsyncSink::NotReady(Frame::Body { - chunk: chunk, - }) - } - }); - // This allows when chunk is `None`, or `Some([])`. - } else if chunk.as_ref().map(|c| c.as_ref().len()).unwrap_or(0) == 0 { - return Ok(AsyncSink::Ready); - } else { - Frame::Body { chunk: chunk } - } - }, - Frame::Error { error } => { - debug!("received error, closing: {:?}", error); - self.state.close(); - return Ok(AsyncSink::Ready); - }, - }; - - warn!("writing illegal frame; state={:?}, frame={:?}", self.state.writing, DebugFrame(&frame)); - Err(io::Error::new(io::ErrorKind::InvalidInput, "illegal frame")) - - } - - #[inline] - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - trace!("Conn::poll_complete()"); - self.flush().map_err(|err| { - debug!("error writing: {}", err); - err - }) - } - - #[inline] - fn close(&mut self) -> Poll<(), Self::SinkError> { - try_ready!(self.flush()); - self.shutdown() - } -} - -#[cfg(feature = "tokio-proto")] -impl Transport for Conn -where I: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, - T: Http1Transaction + 'static, - T::Outgoing: fmt::Debug {} - impl, T> fmt::Debug for Conn { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_struct("Conn") @@ -958,46 +795,12 @@ enum Version { Http11, } -// The DebugFrame and DebugChunk are simple Debug implementations that allow -// us to dump the frame into logs, without logging the entirety of the bytes. -#[cfg(feature = "tokio-proto")] -struct DebugFrame<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a>(&'a Frame, B, ::Error>); - -#[cfg(feature = "tokio-proto")] -impl<'a, T: fmt::Debug + 'a, B: AsRef<[u8]> + 'a> fmt::Debug for DebugFrame<'a, T, B> { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self.0 { - Frame::Message { ref body, .. } => { - f.debug_struct("Message") - .field("body", body) - .finish() - }, - Frame::Body { chunk: Some(ref chunk) } => { - f.debug_struct("Body") - .field("bytes", &chunk.as_ref().len()) - .finish() - }, - Frame::Body { chunk: None } => { - f.debug_struct("Body") - .field("bytes", &None::<()>) - .finish() - }, - Frame::Error { ref error } => { - f.debug_struct("Error") - .field("error", error) - .finish() - } - } - } -} - #[cfg(test)] -#[cfg(feature = "tokio-proto")] -//TODO: rewrite these using dispatch instead of tokio-proto API +//TODO: rewrite these using dispatch mod tests { + /* use futures::{Async, Future, Stream, Sink}; use futures::future; - use tokio_proto::streaming::pipeline::Frame; use proto::{self, ClientTransaction, MessageHead, ServerTransaction}; use super::super::Encoder; @@ -1326,4 +1129,5 @@ mod tests { assert!(conn.start_send(Frame::Body { chunk: Some(Vec::new().into()) }).unwrap().is_ready()); conn.start_send(Frame::Body { chunk: Some(vec![b'a'].into()) }).unwrap_err(); } + */ } diff --git a/src/proto/mod.rs b/src/proto/mod.rs index c0d6f773f6..7c91bd047c 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -13,8 +13,6 @@ use version::HttpVersion; use version::HttpVersion::{Http10, Http11}; pub use self::body::Body; -#[cfg(feature = "tokio-proto")] -pub use self::body::TokioBody; pub use self::chunk::Chunk; pub use self::h1::{dispatch, Conn}; diff --git a/src/server/mod.rs b/src/server/mod.rs index b7d6f8a54f..5e27c97a15 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -37,16 +37,6 @@ use self::hyper_service::HyperService; pub use proto::response::Response; pub use proto::request::Request; -feat_server_proto! { - mod server_proto; - pub use self::server_proto::{ - __ProtoRequest, - __ProtoResponse, - __ProtoTransport, - __ProtoBindTransport, - }; -} - pub use self::conn::Connection; pub use self::service::{const_service, service_fn}; @@ -349,12 +339,6 @@ impl Server self } - #[doc(hidden)] - #[deprecated(since="0.11.11", note="no_proto is always enabled")] - pub fn no_proto(&mut self) -> &mut Self { - self - } - /// Execute this server infinitely. /// /// This method does not currently return, but it will return an error if diff --git a/src/server/server_proto.rs b/src/server/server_proto.rs deleted file mode 100644 index 8e46d3442a..0000000000 --- a/src/server/server_proto.rs +++ /dev/null @@ -1,259 +0,0 @@ -//! The tokio-proto `ServerProto` machinery. -//! -//! Not to be confused with `hyper::proto`. -//! -//! Will be deprecated soon. - -use std::io; -use std::net::SocketAddr; - -#[cfg(feature = "compat")] -use http; -use futures::future::{self, Map}; -use futures::{Future, Stream, Poll, Sink, StartSend, AsyncSink}; -use tokio::reactor::Handle; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_proto::BindServer; -use tokio_proto::streaming::Message; -use tokio_proto::streaming::pipeline::{Transport, Frame, ServerProto}; -use tokio_service::Service; - -use {Request, Response}; -use proto::{self, request, response}; -#[cfg(feature = "compat")] -use proto::Body; -#[cfg(feature = "compat")] -use super::compat; -use super::Http; - -impl + 'static> Http { - /// Use this `Http` instance to create a new server task which handles the - /// connection `io` provided. - /// - /// # Deprecated - /// - /// This method is deprecated. If seeking a replacement, consider - /// `Http::serve_connection`. - pub fn bind_connection(&self, - handle: &Handle, - io: I, - remote_addr: SocketAddr, - service: S) - where S: Service, Error = ::Error> + 'static, - Bd: Stream + 'static, - I: AsyncRead + AsyncWrite + 'static, - { - self.bind_server(handle, io, HttpService { - inner: service, - remote_addr: remote_addr, - }) - } - - - /// Bind a `Service` using types from the `http` crate. - /// - /// See `Http::bind_connection`. - #[cfg(feature = "compat")] - pub fn bind_connection_compat(&self, - handle: &Handle, - io: I, - remote_addr: SocketAddr, - service: S) - where S: Service, Response = http::Response, Error = ::Error> + 'static, - Bd: Stream + 'static, - I: AsyncRead + AsyncWrite + 'static, - { - self.bind_server(handle, io, HttpService { - inner: compat::service(service), - remote_addr: remote_addr, - }) - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoRequest(proto::RequestHead); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoResponse(proto::MessageHead<::StatusCode>); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoTransport(proto::Conn); -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct __ProtoBindTransport { - inner: future::FutureResult, io::Error>, -} - -impl ServerProto for Http - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type Request = __ProtoRequest; - type RequestBody = proto::Chunk; - type Response = __ProtoResponse; - type ResponseBody = B; - type Error = ::Error; - type Transport = __ProtoTransport; - type BindTransport = __ProtoBindTransport; - - #[inline] - fn bind_transport(&self, io: T) -> Self::BindTransport { - let mut conn = proto::Conn::new(io); - if !self.keep_alive { - conn.disable_keep_alive(); - } - conn.set_flush_pipeline(self.pipeline); - if let Some(max) = self.max_buf_size { - conn.set_max_buf_size(max); - } - __ProtoBindTransport { - inner: future::ok(conn), - } - } -} - -impl Sink for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type SinkItem = Frame<__ProtoResponse, B, ::Error>; - type SinkError = io::Error; - - #[inline] - fn start_send(&mut self, item: Self::SinkItem) - -> StartSend { - let item = match item { - Frame::Message { message, body } => { - Frame::Message { message: message.0, body: body } - } - Frame::Body { chunk } => Frame::Body { chunk: chunk }, - Frame::Error { error } => Frame::Error { error: error }, - }; - match try!(self.0.start_send(item)) { - AsyncSink::Ready => Ok(AsyncSink::Ready), - AsyncSink::NotReady(Frame::Message { message, body }) => { - Ok(AsyncSink::NotReady(Frame::Message { - message: __ProtoResponse(message), - body: body, - })) - } - AsyncSink::NotReady(Frame::Body { chunk }) => { - Ok(AsyncSink::NotReady(Frame::Body { chunk: chunk })) - } - AsyncSink::NotReady(Frame::Error { error }) => { - Ok(AsyncSink::NotReady(Frame::Error { error: error })) - } - } - } - - #[inline] - fn poll_complete(&mut self) -> Poll<(), io::Error> { - self.0.poll_complete() - } - - #[inline] - fn close(&mut self) -> Poll<(), io::Error> { - self.0.close() - } -} - -impl Stream for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - type Item = Frame<__ProtoRequest, proto::Chunk, ::Error>; - type Error = io::Error; - - #[inline] - fn poll(&mut self) -> Poll, io::Error> { - let item = match try_ready!(self.0.poll()) { - Some(item) => item, - None => return Ok(None.into()), - }; - let item = match item { - Frame::Message { message, body } => { - Frame::Message { message: __ProtoRequest(message), body: body } - } - Frame::Body { chunk } => Frame::Body { chunk: chunk }, - Frame::Error { error } => Frame::Error { error: error }, - }; - Ok(Some(item).into()) - } -} - -impl Transport for __ProtoTransport - where T: AsyncRead + AsyncWrite + 'static, - B: AsRef<[u8]> + 'static, -{ - #[inline] - fn tick(&mut self) { - self.0.tick() - } - - #[inline] - fn cancel(&mut self) -> io::Result<()> { - self.0.cancel() - } -} - -impl Future for __ProtoBindTransport - where T: AsyncRead + AsyncWrite + 'static, -{ - type Item = __ProtoTransport; - type Error = io::Error; - - #[inline] - fn poll(&mut self) -> Poll<__ProtoTransport, io::Error> { - self.inner.poll().map(|a| a.map(__ProtoTransport)) - } -} - -impl From> for Request { - #[inline] - fn from(message: Message<__ProtoRequest, proto::TokioBody>) -> Request { - let (head, body) = match message { - Message::WithoutBody(head) => (head.0, None), - Message::WithBody(head, body) => (head.0, Some(body.into())), - }; - request::from_wire(None, head, body) - } -} - -impl Into> for Response { - #[inline] - fn into(self) -> Message<__ProtoResponse, B> { - let (head, body) = response::split(self); - if let Some(body) = body { - Message::WithBody(__ProtoResponse(head), body.into()) - } else { - Message::WithoutBody(__ProtoResponse(head)) - } - } -} - -struct HttpService { - inner: T, - remote_addr: SocketAddr, -} - -impl Service for HttpService - where T: Service, Error=::Error>, - B: Stream, - B::Item: AsRef<[u8]>, -{ - type Request = Message<__ProtoRequest, proto::TokioBody>; - type Response = Message<__ProtoResponse, B>; - type Error = ::Error; - type Future = Map) -> Message<__ProtoResponse, B>>; - - #[inline] - fn call(&self, message: Self::Request) -> Self::Future { - let (head, body) = match message { - Message::WithoutBody(head) => (head.0, None), - Message::WithBody(head, body) => (head.0, Some(body.into())), - }; - let req = request::from_wire(Some(self.remote_addr), head, body); - self.inner.call(req).map(Into::into) - } -}