diff --git a/examples/echo.rs b/examples/echo.rs index c7d4fa98f9..6161774a8e 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -4,15 +4,16 @@ use std::net::SocketAddr; use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; -use hyper::body::Body as _; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{Method, Recv, Request, Response, StatusCode}; +use hyper::{body::Body, Method, Request, Response, StatusCode}; use tokio::net::TcpListener; /// This is our service handler. It receives a Request, routes on its /// path, and returns a Future of a Response. -async fn echo(req: Request) -> Result>, hyper::Error> { +async fn echo( + req: Request, +) -> Result>, hyper::Error> { match (req.method(), req.uri().path()) { // Serve some instructions at / (&Method::GET, "/") => Ok(Response::new(full( diff --git a/examples/hello.rs b/examples/hello.rs index 28a145dee4..167c9a0176 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -7,10 +7,10 @@ use bytes::Bytes; use http_body_util::Full; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{Recv, Request, Response}; +use hyper::{Request, Response}; use tokio::net::TcpListener; -async fn hello(_: Request) -> Result>, Infallible> { +async fn hello(_: Request) -> Result>, Infallible> { Ok(Response::new(Full::new(Bytes::from("Hello World!")))) } diff --git a/examples/http_proxy.rs b/examples/http_proxy.rs index 792e1e470d..7cd4e624f8 100644 --- a/examples/http_proxy.rs +++ b/examples/http_proxy.rs @@ -8,7 +8,7 @@ use hyper::client::conn::http1::Builder; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::upgrade::Upgraded; -use hyper::{Method, Recv, Request, Response}; +use hyper::{Method, Request, Response}; use tokio::net::{TcpListener, TcpStream}; @@ -43,7 +43,9 @@ async fn main() -> Result<(), Box> { } } -async fn proxy(req: Request) -> Result>, hyper::Error> { +async fn proxy( + req: Request, +) -> Result>, hyper::Error> { println!("req: {:?}", req); if Method::CONNECT == req.method() { diff --git a/examples/multi_server.rs b/examples/multi_server.rs index 00232133b6..5eb520dbdb 100644 --- a/examples/multi_server.rs +++ b/examples/multi_server.rs @@ -8,17 +8,17 @@ use futures_util::future::join; use http_body_util::Full; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{Recv, Request, Response}; +use hyper::{Request, Response}; use tokio::net::TcpListener; static INDEX1: &[u8] = b"The 1st service!"; static INDEX2: &[u8] = b"The 2nd service!"; -async fn index1(_: Request) -> Result>, hyper::Error> { +async fn index1(_: Request) -> Result>, hyper::Error> { Ok(Response::new(Full::new(Bytes::from(INDEX1)))) } -async fn index2(_: Request) -> Result>, hyper::Error> { +async fn index2(_: Request) -> Result>, hyper::Error> { Ok(Response::new(Full::new(Bytes::from(INDEX2)))) } diff --git a/examples/params.rs b/examples/params.rs index cce182583e..a902867f2e 100644 --- a/examples/params.rs +++ b/examples/params.rs @@ -5,7 +5,7 @@ use bytes::Bytes; use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full}; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{Method, Recv, Request, Response, StatusCode}; +use hyper::{Method, Request, Response, StatusCode}; use tokio::net::TcpListener; use std::collections::HashMap; @@ -19,7 +19,7 @@ static NOTNUMERIC: &[u8] = b"Number field is not numeric"; // Using service_fn, we can turn this function into a `Service`. async fn param_example( - req: Request, + req: Request, ) -> Result>, hyper::Error> { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/post") => Ok(Response::new(full(INDEX))), diff --git a/examples/send_file.rs b/examples/send_file.rs index 5d6700f2b3..a4514eb52b 100644 --- a/examples/send_file.rs +++ b/examples/send_file.rs @@ -8,7 +8,7 @@ use tokio::net::TcpListener; use bytes::Bytes; use http_body_util::Full; use hyper::service::service_fn; -use hyper::{Method, Recv, Request, Response, Result, StatusCode}; +use hyper::{Method, Request, Response, Result, StatusCode}; static INDEX: &str = "examples/send_file_index.html"; static NOTFOUND: &[u8] = b"Not Found"; @@ -36,7 +36,7 @@ async fn main() -> std::result::Result<(), Box> { } } -async fn response_examples(req: Request) -> Result>> { +async fn response_examples(req: Request) -> Result>> { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/index.html") => simple_file_send(INDEX).await, (&Method::GET, "/no_file.html") => { diff --git a/examples/service_struct_impl.rs b/examples/service_struct_impl.rs index 5a9e6ab61b..50fd3ab749 100644 --- a/examples/service_struct_impl.rs +++ b/examples/service_struct_impl.rs @@ -2,7 +2,7 @@ use bytes::Bytes; use http_body_util::Full; use hyper::server::conn::http1; use hyper::service::Service; -use hyper::{Recv, Request, Response}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::net::TcpListener; use std::future::Future; @@ -36,12 +36,12 @@ struct Svc { counter: Counter, } -impl Service> for Svc { +impl Service> for Svc { type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn call(&mut self, req: Request) -> Self::Future { + fn call(&mut self, req: Request) -> Self::Future { fn mk_response(s: String) -> Result>, hyper::Error> { Ok(Response::builder().body(Full::new(Bytes::from(s))).unwrap()) } diff --git a/examples/upgrades.rs b/examples/upgrades.rs index e5494e7bbb..92a80d7567 100644 --- a/examples/upgrades.rs +++ b/examples/upgrades.rs @@ -14,7 +14,7 @@ use hyper::header::{HeaderValue, UPGRADE}; use hyper::server::conn::http1; use hyper::service::service_fn; use hyper::upgrade::Upgraded; -use hyper::{Recv, Request, Response, StatusCode}; +use hyper::{Request, Response, StatusCode}; // A simple type alias so as to DRY. type Result = std::result::Result>; @@ -38,7 +38,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> { } /// Our server HTTP handler to initiate HTTP upgrades. -async fn server_upgrade(mut req: Request) -> Result>> { +async fn server_upgrade(mut req: Request) -> Result>> { let mut res = Response::new(Empty::new()); // Send a 400 to any request that doesn't have diff --git a/examples/web_api.rs b/examples/web_api.rs index 47c30cd852..79834a0acd 100644 --- a/examples/web_api.rs +++ b/examples/web_api.rs @@ -6,7 +6,7 @@ use bytes::{Buf, Bytes}; use http_body_util::{BodyExt, Full}; use hyper::server::conn::http1; use hyper::service::service_fn; -use hyper::{header, Method, Recv, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, header, Method, Request, Response, StatusCode}; use tokio::net::{TcpListener, TcpStream}; type GenericError = Box; @@ -46,7 +46,7 @@ async fn client_request_response() -> Result> { Ok(Response::new(res_body)) } -async fn api_post_response(req: Request) -> Result> { +async fn api_post_response(req: Request) -> Result> { // Aggregate the body... let whole_body = req.collect().await?.aggregate(); // Decode as JSON... @@ -77,7 +77,7 @@ async fn api_get_response() -> Result> { Ok(res) } -async fn response_examples(req: Request) -> Result> { +async fn response_examples(req: Request) -> Result> { match (req.method(), req.uri().path()) { (&Method::GET, "/") | (&Method::GET, "/index.html") => Ok(Response::new(full(INDEX))), (&Method::GET, "/test.html") => client_request_response().await, diff --git a/src/body/body.rs b/src/body/incoming.rs similarity index 91% rename from src/body/body.rs rename to src/body/incoming.rs index f93030f7c7..863d5d3cb8 100644 --- a/src/body/body.rs +++ b/src/body/incoming.rs @@ -18,7 +18,7 @@ type TrailersSender = oneshot::Sender; /// A stream of `Bytes`, used when receiving bodies from the network. #[must_use = "streams do nothing unless polled"] -pub struct Recv { +pub struct Incoming { kind: Kind, } @@ -65,17 +65,17 @@ pub(crate) struct Sender { const WANT_PENDING: usize = 1; const WANT_READY: usize = 2; -impl Recv { +impl Incoming { /// Create a `Body` stream with an associated sender half. /// /// Useful when wanting to stream chunks from another thread. #[inline] #[allow(unused)] - pub(crate) fn channel() -> (Sender, Recv) { + pub(crate) fn channel() -> (Sender, Incoming) { Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) } - pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Recv) { + pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Incoming) { let (data_tx, data_rx) = mpsc::channel(0); let (trailers_tx, trailers_rx) = oneshot::channel(); @@ -90,7 +90,7 @@ impl Recv { data_tx, trailers_tx: Some(trailers_tx), }; - let rx = Recv::new(Kind::Chan { + let rx = Incoming::new(Kind::Chan { content_length, want_tx, data_rx, @@ -100,18 +100,18 @@ impl Recv { (tx, rx) } - fn new(kind: Kind) -> Recv { - Recv { kind } + fn new(kind: Kind) -> Incoming { + Incoming { kind } } #[allow(dead_code)] - pub(crate) fn empty() -> Recv { - Recv::new(Kind::Empty) + pub(crate) fn empty() -> Incoming { + Incoming::new(Kind::Empty) } #[cfg(feature = "ffi")] - pub(crate) fn ffi() -> Recv { - Recv::new(Kind::Ffi(crate::ffi::UserBody::new())) + pub(crate) fn ffi() -> Incoming { + Incoming::new(Kind::Ffi(crate::ffi::UserBody::new())) } #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] @@ -125,7 +125,7 @@ impl Recv { if !content_length.is_exact() && recv.is_end_stream() { content_length = DecodedLength::ZERO; } - let body = Recv::new(Kind::H2 { + let body = Incoming::new(Kind::H2 { data_done: false, ping, content_length, @@ -151,7 +151,7 @@ impl Recv { } } -impl Body for Recv { +impl Body for Incoming { type Data = Bytes; type Error = crate::Error; @@ -259,7 +259,7 @@ impl Body for Recv { } } -impl fmt::Debug for Recv { +impl fmt::Debug for Incoming { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { #[derive(Debug)] struct Streaming; @@ -375,7 +375,7 @@ mod tests { use std::mem; use std::task::Poll; - use super::{Body, DecodedLength, Recv, Sender, SizeHint}; + use super::{Body, DecodedLength, Incoming, Sender, SizeHint}; use http_body_util::BodyExt; #[test] @@ -383,7 +383,7 @@ mod tests { // These are mostly to help catch *accidentally* increasing // the size by too much. - let body_size = mem::size_of::(); + let body_size = mem::size_of::(); let body_expected_size = mem::size_of::() * 5; assert!( body_size <= body_expected_size, @@ -392,7 +392,7 @@ mod tests { body_expected_size, ); - //assert_eq!(body_size, mem::size_of::>(), "Option"); + //assert_eq!(body_size, mem::size_of::>(), "Option"); assert_eq!( mem::size_of::(), @@ -409,18 +409,18 @@ mod tests { #[test] fn size_hint() { - fn eq(body: Recv, b: SizeHint, note: &str) { + fn eq(body: Incoming, b: SizeHint, note: &str) { let a = body.size_hint(); assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); } - eq(Recv::empty(), SizeHint::with_exact(0), "empty"); + eq(Incoming::empty(), SizeHint::with_exact(0), "empty"); - eq(Recv::channel().1, SizeHint::new(), "channel"); + eq(Incoming::channel().1, SizeHint::new(), "channel"); eq( - Recv::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, + Incoming::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, SizeHint::with_exact(4), "channel with length", ); @@ -429,7 +429,7 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_abort() { - let (tx, mut rx) = Recv::channel(); + let (tx, mut rx) = Incoming::channel(); tx.abort(); @@ -440,7 +440,7 @@ mod tests { #[cfg(all(not(miri), feature = "http1"))] #[tokio::test] async fn channel_abort_when_buffer_is_full() { - let (mut tx, mut rx) = Recv::channel(); + let (mut tx, mut rx) = Incoming::channel(); tx.try_send_data("chunk 1".into()).expect("send 1"); // buffer is full, but can still send abort @@ -462,7 +462,7 @@ mod tests { #[cfg(feature = "http1")] #[test] fn channel_buffers_one() { - let (mut tx, _rx) = Recv::channel(); + let (mut tx, _rx) = Incoming::channel(); tx.try_send_data("chunk 1".into()).expect("send 1"); @@ -474,14 +474,14 @@ mod tests { #[cfg(not(miri))] #[tokio::test] async fn channel_empty() { - let (_, mut rx) = Recv::channel(); + let (_, mut rx) = Incoming::channel(); assert!(rx.frame().await.is_none()); } #[test] fn channel_ready() { - let (mut tx, _rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); + let (mut tx, _rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); let mut tx_ready = tokio_test::task::spawn(tx.ready()); @@ -490,7 +490,8 @@ mod tests { #[test] fn channel_wanter() { - let (mut tx, mut rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + let (mut tx, mut rx) = + Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); let mut tx_ready = tokio_test::task::spawn(tx.ready()); let mut rx_data = tokio_test::task::spawn(rx.frame()); @@ -511,7 +512,7 @@ mod tests { #[test] fn channel_notices_closure() { - let (mut tx, rx) = Recv::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); + let (mut tx, rx) = Incoming::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); let mut tx_ready = tokio_test::task::spawn(tx.ready()); diff --git a/src/body/mod.rs b/src/body/mod.rs index 26c381b1c7..60c9914596 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -10,28 +10,28 @@ //! - **The [`Body`](Body) trait** describes all possible bodies. //! hyper allows any body type that implements `Body`, allowing //! applications to have fine-grained control over their streaming. -//! - **The [`Recv`](Recv) concrete type**, which is an implementation of +//! - **The [`Incoming`](Incoming) concrete type**, which is an implementation of //! `Body`, and returned by hyper as a "receive stream" (so, for server -//! requests and client responses). It is also a decent default implementation -//! if you don't have very custom needs of your send streams. +//! requests and client responses). pub use bytes::{Buf, Bytes}; pub use http_body::Body; pub use http_body::Frame; pub use http_body::SizeHint; -pub use self::body::Recv; +pub use self::incoming::Incoming; + #[cfg(feature = "http1")] -pub(crate) use self::body::Sender; +pub(crate) use self::incoming::Sender; pub(crate) use self::length::DecodedLength; -mod body; +mod incoming; mod length; fn _assert_send_sync() { fn _assert_send() {} fn _assert_sync() {} - _assert_send::(); - _assert_sync::(); + _assert_send::(); + _assert_sync::(); } diff --git a/src/client/conn/http1.rs b/src/client/conn/http1.rs index ad100f8819..88f6c565e8 100644 --- a/src/client/conn/http1.rs +++ b/src/client/conn/http1.rs @@ -9,8 +9,7 @@ use http::{Request, Response}; use httparse::ParserConfig; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::Recv; -use crate::body::Body; +use crate::body::{Body, Incoming as IncomingBody}; use super::super::dispatch; use crate::common::{ exec::{BoxSendFuture, Exec}, @@ -25,7 +24,7 @@ type Dispatcher = /// The sender side of an established connection. pub struct SendRequest { - dispatch: dispatch::Sender, Response>, + dispatch: dispatch::Sender, Response>, } /// Deconstructed parts of a `Connection`. @@ -189,7 +188,7 @@ where pub fn send_request( &mut self, req: Request, - ) -> impl Future>> { + ) -> impl Future>> { let sent = self.dispatch.send(req); async move { diff --git a/src/client/conn/http2.rs b/src/client/conn/http2.rs index ee7325d385..94dafb9f50 100644 --- a/src/client/conn/http2.rs +++ b/src/client/conn/http2.rs @@ -10,7 +10,7 @@ use http::{Request, Response}; use tokio::io::{AsyncRead, AsyncWrite}; use super::super::dispatch; -use crate::body::Body; +use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; use crate::common::{ exec::{BoxSendFuture, Exec}, @@ -18,11 +18,10 @@ use crate::common::{ }; use crate::proto; use crate::rt::{Executor, Timer}; -use crate::Recv; /// The sender side of an established connection. pub struct SendRequest { - dispatch: dispatch::UnboundedSender, Response>, + dispatch: dispatch::UnboundedSender, Response>, } /// A future that processes all HTTP state for the IO object. @@ -128,7 +127,7 @@ where pub fn send_request( &mut self, req: Request, - ) -> impl Future>> { + ) -> impl Future>> { let sent = self.dispatch.send(req); async move { diff --git a/src/client/dispatch.rs b/src/client/dispatch.rs index 8806c638b0..63bb0256d5 100644 --- a/src/client/dispatch.rs +++ b/src/client/dispatch.rs @@ -379,15 +379,15 @@ mod tests { #[cfg(feature = "nightly")] #[bench] fn giver_queue_throughput(b: &mut test::Bencher) { - use crate::{Recv, Request, Response}; + use crate::{body::Incoming, Request, Response}; let rt = tokio::runtime::Builder::new_current_thread() .build() .unwrap(); - let (mut tx, mut rx) = channel::, Response>(); + let (mut tx, mut rx) = channel::, Response>(); b.iter(move || { - let _ = tx.send(Request::new(Recv::empty())).unwrap(); + let _ = tx.send(Request::new(Incoming::empty())).unwrap(); rt.block_on(async { loop { let poll_once = PollOnce(&mut rx); diff --git a/src/ffi/body.rs b/src/ffi/body.rs index d28961f444..4cc3415f2b 100644 --- a/src/ffi/body.rs +++ b/src/ffi/body.rs @@ -8,10 +8,10 @@ use libc::{c_int, size_t}; use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType}; use super::{UserDataPointer, HYPER_ITER_CONTINUE}; -use crate::body::{Bytes, Frame, Recv}; +use crate::body::{Bytes, Frame, Incoming as IncomingBody}; /// A streaming HTTP body. -pub struct hyper_body(pub(super) Recv); +pub struct hyper_body(pub(super) IncomingBody); /// A buffer of bytes that is sent or received on a `hyper_body`. pub struct hyper_buf(pub(crate) Bytes); @@ -33,7 +33,7 @@ ffi_fn! { /// /// If not configured, this body acts as an empty payload. fn hyper_body_new() -> *mut hyper_body { - Box::into_raw(Box::new(hyper_body(Recv::ffi()))) + Box::into_raw(Box::new(hyper_body(IncomingBody::ffi()))) } ?= ptr::null_mut() } diff --git a/src/ffi/client.rs b/src/ffi/client.rs index 8cd1dca4e3..c4da61950e 100644 --- a/src/ffi/client.rs +++ b/src/ffi/client.rs @@ -32,9 +32,9 @@ pub struct hyper_clientconn { enum Tx { #[cfg(feature = "http1")] - Http1(conn::http1::SendRequest), + Http1(conn::http1::SendRequest), #[cfg(feature = "http2")] - Http2(conn::http2::SendRequest), + Http2(conn::http2::SendRequest), } // ===== impl hyper_clientconn ===== @@ -57,7 +57,7 @@ ffi_fn! { if options.http2 { return conn::http2::Builder::new() .executor(options.exec.clone()) - .handshake::<_, crate::Recv>(io) + .handshake::<_, crate::body::Incoming>(io) .await .map(|(tx, conn)| { options.exec.execute(Box::pin(async move { @@ -73,7 +73,7 @@ ffi_fn! { .http1_allow_obsolete_multiline_headers_in_responses(options.http1_allow_obsolete_multiline_headers_in_responses) .http1_preserve_header_case(options.http1_preserve_header_case) .http1_preserve_header_order(options.http1_preserve_header_order) - .handshake::<_, crate::Recv>(io) + .handshake::<_, crate::body::Incoming>(io) .await .map(|(tx, conn)| { options.exec.execute(Box::pin(async move { diff --git a/src/ffi/http_types.rs b/src/ffi/http_types.rs index a10c10bddc..473feefa84 100644 --- a/src/ffi/http_types.rs +++ b/src/ffi/http_types.rs @@ -6,15 +6,16 @@ use super::body::hyper_body; use super::error::hyper_code; use super::task::{hyper_task_return_type, AsTaskType}; use super::{UserDataPointer, HYPER_ITER_CONTINUE}; +use crate::body::Incoming as IncomingBody; use crate::ext::{HeaderCaseMap, OriginalHeaderOrder, ReasonPhrase}; use crate::header::{HeaderName, HeaderValue}; -use crate::{HeaderMap, Method, Recv, Request, Response, Uri}; +use crate::{HeaderMap, Method, Request, Response, Uri}; /// An HTTP request. -pub struct hyper_request(pub(super) Request); +pub struct hyper_request(pub(super) Request); /// An HTTP response. -pub struct hyper_response(pub(super) Response); +pub struct hyper_response(pub(super) Response); /// An HTTP header map. /// @@ -37,7 +38,7 @@ type hyper_request_on_informational_callback = extern "C" fn(*mut c_void, *mut h ffi_fn! { /// Construct a new HTTP request. fn hyper_request_new() -> *mut hyper_request { - Box::into_raw(Box::new(hyper_request(Request::new(Recv::empty())))) + Box::into_raw(Box::new(hyper_request(Request::new(IncomingBody::empty())))) } ?= std::ptr::null_mut() } @@ -312,13 +313,13 @@ ffi_fn! { /// /// It is safe to free the response even after taking ownership of its body. fn hyper_response_body(resp: *mut hyper_response) -> *mut hyper_body { - let body = std::mem::replace(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut(), crate::Recv::empty()); + let body = std::mem::replace(non_null!(&mut *resp ?= std::ptr::null_mut()).0.body_mut(), IncomingBody::empty()); Box::into_raw(Box::new(hyper_body(body))) } ?= std::ptr::null_mut() } impl hyper_response { - pub(super) fn wrap(mut resp: Response) -> hyper_response { + pub(super) fn wrap(mut resp: Response) -> hyper_response { let headers = std::mem::take(resp.headers_mut()); let orig_casing = resp .extensions_mut() @@ -509,7 +510,7 @@ unsafe fn raw_name_value( // ===== impl OnInformational ===== impl OnInformational { - pub(crate) fn call(&mut self, resp: Response) { + pub(crate) fn call(&mut self, resp: Response) { let mut resp = hyper_response::wrap(resp); (self.func)(self.data.0, &mut resp); } diff --git a/src/lib.rs b/src/lib.rs index 2bd4d759d3..f5f442d71e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,7 +63,6 @@ pub use crate::http::{header, Method, Request, Response, StatusCode, Uri, Versio #[doc(no_inline)] pub use crate::http::HeaderMap; -pub use crate::body::Recv; pub use crate::error::{Error, Result}; #[macro_use] diff --git a/src/proto/h1/dispatch.rs b/src/proto/h1/dispatch.rs index a1c9341953..81f7014de9 100644 --- a/src/proto/h1/dispatch.rs +++ b/src/proto/h1/dispatch.rs @@ -6,7 +6,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace}; use super::{Http1Transaction, Wants}; -use crate::body::{Body, DecodedLength, Recv}; +use crate::body::{Body, DecodedLength, Incoming as IncomingBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead}; use crate::upgrade::OnUpgrade; @@ -28,7 +28,7 @@ pub(crate) trait Dispatch { self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll>>; - fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Recv)>) -> crate::Result<()>; + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()>; fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll>; fn should_poll(&self) -> bool; } @@ -45,14 +45,14 @@ cfg_server! { cfg_client! { pin_project_lite::pin_project! { pub(crate) struct Client { - callback: Option, http::Response>>, + callback: Option, http::Response>>, #[pin] rx: ClientRx, rx_closed: bool, } } - type ClientRx = crate::client::dispatch::Receiver, http::Response>; + type ClientRx = crate::client::dispatch::Receiver, http::Response>; } impl Dispatcher @@ -247,9 +247,9 @@ where match ready!(self.conn.poll_read_head(cx)) { Some(Ok((mut head, body_len, wants))) => { let body = match body_len { - DecodedLength::ZERO => Recv::empty(), + DecodedLength::ZERO => IncomingBody::empty(), other => { - let (tx, rx) = Recv::new_channel(other, wants.contains(Wants::EXPECT)); + let (tx, rx) = IncomingBody::new_channel(other, wants.contains(Wants::EXPECT)); self.body_tx = Some(tx); rx } @@ -470,9 +470,9 @@ cfg_server! { // Service is never pinned impl, B> Unpin for Server {} - impl Dispatch for Server + impl Dispatch for Server where - S: HttpService, + S: HttpService, S::Error: Into>, Bs: Body, { @@ -505,7 +505,7 @@ cfg_server! { ret } - fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Recv)>) -> crate::Result<()> { + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> { let (msg, body) = msg?; let mut req = Request::new(body); *req.method_mut() = msg.subject.0; @@ -591,7 +591,7 @@ cfg_client! { } } - fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Recv)>) -> crate::Result<()> { + fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> { match msg { Ok((msg, body)) => { if let Some(cb) = self.callback.take() { @@ -673,7 +673,7 @@ mod tests { handle.read(b"HTTP/1.1 200 OK\r\n\r\n"); let mut res_rx = tx - .try_send(crate::Request::new(crate::Recv::empty())) + .try_send(crate::Request::new(IncomingBody::empty())) .unwrap(); tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx)); @@ -706,7 +706,7 @@ mod tests { let _dispatcher = tokio::spawn(async move { dispatcher.await }); let body = { - let (mut tx, body) = crate::Recv::new_channel(DecodedLength::new(4), false); + let (mut tx, body) = IncomingBody::new_channel(DecodedLength::new(4), false); tx.try_send_data("reee".into()).unwrap(); body }; @@ -737,7 +737,7 @@ mod tests { assert!(dispatcher.poll().is_pending()); let body = { - let (mut tx, body) = crate::Recv::channel(); + let (mut tx, body) = IncomingBody::channel(); tx.try_send_data("".into()).unwrap(); body }; diff --git a/src/proto/h1/role.rs b/src/proto/h1/role.rs index 37072384a2..d28c44889c 100644 --- a/src/proto/h1/role.rs +++ b/src/proto/h1/role.rs @@ -1085,7 +1085,7 @@ impl Http1Transaction for Client { #[cfg(feature = "ffi")] if head.subject.is_informational() { if let Some(callback) = ctx.on_informational { - callback.call(head.into_response(crate::Recv::empty())); + callback.call(head.into_response(crate::body::Incoming::empty())); } } diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 92ab69cd56..a2b78fd478 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -11,7 +11,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace, warn}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; -use crate::body::Body; +use crate::body::{Body, Incoming as IncomingBody}; use crate::common::time::Time; use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; use crate::ext::Protocol; @@ -19,9 +19,9 @@ use crate::headers; use crate::proto::h2::UpgradedSendStream; use crate::proto::Dispatched; use crate::upgrade::Upgraded; -use crate::{Recv, Request, Response}; +use crate::{Request, Response}; -type ClientRx = crate::client::dispatch::Receiver, Response>; +type ClientRx = crate::client::dispatch::Receiver, Response>; ///// An mpsc channel is used to help notify the `Connection` task when *all* ///// other handles to it have been dropped, so that it can shutdown. @@ -327,7 +327,7 @@ where )); } let (parts, recv_stream) = res.into_parts(); - let mut res = Response::from_parts(parts, Recv::empty()); + let mut res = Response::from_parts(parts, IncomingBody::empty()); let (pending, on_upgrade) = crate::upgrade::pending(); let io = H2Upgraded { @@ -345,7 +345,7 @@ where } else { let res = res.map(|stream| { let ping = ping.for_stream(&stream); - crate::Recv::h2(stream, content_length.into(), ping) + IncomingBody::h2(stream, content_length.into(), ping) }); Ok(res) } diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 70619d526d..2ec7601a6e 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -12,7 +12,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{debug, trace, warn}; use super::{ping, PipeToSendStream, SendBuf}; -use crate::body::Body; +use crate::body::{Body, Incoming as IncomingBody}; use crate::common::exec::ConnStreamExec; use crate::common::time::Time; use crate::common::{date, task, Future, Pin, Poll}; @@ -24,7 +24,7 @@ use crate::proto::Dispatched; use crate::service::HttpService; use crate::upgrade::{OnUpgrade, Pending, Upgraded}; -use crate::{Recv, Response}; +use crate::{Response}; // Our defaults are chosen for the "majority" case, which usually are not // resource constrained, and so the spec default of 64kb can be too limiting @@ -73,7 +73,7 @@ impl Default for Config { pin_project! { pub(crate) struct Server where - S: HttpService, + S: HttpService, B: Body, { exec: E, @@ -107,7 +107,7 @@ where impl Server where T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, + S: HttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -183,7 +183,7 @@ where impl Future for Server where T: AsyncRead + AsyncWrite + Unpin, - S: HttpService, + S: HttpService, S::Error: Into>, B: Body + 'static, E: ConnStreamExec, @@ -238,7 +238,7 @@ where exec: &mut E, ) -> Poll> where - S: HttpService, + S: HttpService, S::Error: Into>, E: ConnStreamExec, { @@ -265,7 +265,7 @@ where ( Request::from_parts( parts, - crate::Recv::h2(stream, content_length.into(), ping), + IncomingBody::h2(stream, content_length.into(), ping), ), None, ) @@ -279,7 +279,7 @@ where debug_assert!(parts.extensions.get::().is_none()); parts.extensions.insert(upgrade); ( - Request::from_parts(parts, crate::Recv::empty()), + Request::from_parts(parts, IncomingBody::empty()), Some(ConnectParts { pending, ping, diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 421ef9cd72..200269cb51 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -8,14 +8,14 @@ use std::time::Duration; use bytes::Bytes; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::body::{Body, Recv}; +use crate::body::{Body, Incoming as IncomingBody}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::{common::time::Time, rt::Timer}; use crate::proto; use crate::service::HttpService; type Http1Dispatcher = - proto::h1::Dispatcher, B, T, proto::ServerTransaction>; + proto::h1::Dispatcher, B, T, proto::ServerTransaction>; pin_project_lite::pin_project! { @@ -25,7 +25,7 @@ pin_project_lite::pin_project! { #[must_use = "futures do nothing unless polled"] pub struct Connection where - S: HttpService, + S: HttpService, { conn: Http1Dispatcher, } @@ -72,7 +72,7 @@ pub struct Parts { impl fmt::Debug for Connection where - S: HttpService, + S: HttpService, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() @@ -81,7 +81,7 @@ where impl Connection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -171,7 +171,7 @@ where impl Future for Connection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, @@ -337,14 +337,14 @@ impl Builder { /// # Example /// /// ``` - /// # use hyper::{Recv, Request, Response}; + /// # use hyper::{body::Incoming, Request, Response}; /// # use hyper::service::Service; /// # use hyper::server::conn::http1::Builder; /// # use tokio::io::{AsyncRead, AsyncWrite}; /// # async fn run(some_io: I, some_service: S) /// # where /// # I: AsyncRead + AsyncWrite + Unpin + Send + 'static, - /// # S: Service, Response=hyper::Response> + Send + 'static, + /// # S: Service, Response=hyper::Response> + Send + 'static, /// # S::Error: Into>, /// # S::Future: Send, /// # { @@ -359,7 +359,7 @@ impl Builder { /// ``` pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: HttpService, + S: HttpService, S::Error: Into>, S::ResBody: 'static, ::Error: Into>, @@ -413,14 +413,14 @@ mod upgrades { #[allow(missing_debug_implementations)] pub struct UpgradeableConnection where - S: HttpService, + S: HttpService, { pub(super) inner: Option>, } impl UpgradeableConnection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -437,7 +437,7 @@ mod upgrades { impl Future for UpgradeableConnection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + Send + 'static, B: Body + 'static, diff --git a/src/server/conn/http2.rs b/src/server/conn/http2.rs index e39aaf80e9..501ab2e613 100644 --- a/src/server/conn/http2.rs +++ b/src/server/conn/http2.rs @@ -8,7 +8,7 @@ use std::time::Duration; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::body::{Body, Recv}; +use crate::body::{Body, Incoming as IncomingBody}; use crate::common::exec::{ConnStreamExec}; use crate::common::{task, Future, Pin, Poll, Unpin}; use crate::{common::time::Time, rt::Timer}; @@ -22,7 +22,7 @@ pin_project! { #[must_use = "futures do nothing unless polled"] pub struct Connection where - S: HttpService, + S: HttpService, { conn: proto::h2::Server, } @@ -40,7 +40,7 @@ pub struct Builder { impl fmt::Debug for Connection where - S: HttpService, + S: HttpService, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Connection").finish() @@ -49,7 +49,7 @@ where impl Connection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin, B: Body + 'static, @@ -73,7 +73,7 @@ where impl Future for Connection where - S: HttpService, + S: HttpService, S::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, B: Body + 'static, @@ -286,7 +286,7 @@ impl Builder { /// driven on the connection. pub fn serve_connection(&self, io: I, service: S) -> Connection where - S: HttpService, + S: HttpService, S::Error: Into>, Bd: Body + 'static, Bd::Error: Into>, diff --git a/src/server/conn/mod.rs b/src/server/conn/mod.rs index 41f9d0363f..be196cfbcc 100644 --- a/src/server/conn/mod.rs +++ b/src/server/conn/mod.rs @@ -17,7 +17,7 @@ //! # mod rt { //! use http::{Request, Response, StatusCode}; //! use http_body_util::Full; -//! use hyper::{server::conn::http1, service::service_fn, body::Bytes}; +//! use hyper::{server::conn::http1, service::service_fn, body, body::Bytes}; //! use std::{net::SocketAddr, convert::Infallible}; //! use tokio::net::TcpListener; //! @@ -39,7 +39,7 @@ //! } //! } //! -//! async fn hello(_req: Request) -> Result>, Infallible> { +//! async fn hello(_req: Request) -> Result>, Infallible> { //! Ok(Response::new(Full::new(Bytes::from("Hello World!")))) //! } //! # } diff --git a/src/service/util.rs b/src/service/util.rs index a41945951c..1d8587fe82 100644 --- a/src/service/util.rs +++ b/src/service/util.rs @@ -13,11 +13,11 @@ use crate::{Request, Response}; /// /// ``` /// use bytes::Bytes; -/// use hyper::{Recv, Request, Response, Version}; +/// use hyper::{body, Request, Response, Version}; /// use http_body_util::Full; /// use hyper::service::service_fn; /// -/// let service = service_fn(|req: Request| async move { +/// let service = service_fn(|req: Request| async move { /// if req.version() == Version::HTTP_11 { /// Ok(Response::new(Full::::from("Hello World"))) /// } else { diff --git a/tests/client.rs b/tests/client.rs index 508995046b..8968433885 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -1345,7 +1345,7 @@ mod conn { use hyper::body::{Body, Frame}; use hyper::client::conn; use hyper::upgrade::OnUpgrade; - use hyper::{self, Method, Recv, Request, Response, StatusCode}; + use hyper::{Method, Request, Response, StatusCode}; use super::{concat, s, support, tcp_connect, FutureHyperExt}; @@ -1899,7 +1899,7 @@ mod conn { res = listener.accept() => { let (stream, _) = res.unwrap(); - let service = service_fn(|_:Request| future::ok::<_, hyper::Error>(Response::new(Empty::::new()))); + let service = service_fn(|_:Request| future::ok::<_, hyper::Error>(Response::new(Empty::::new()))); let mut shdn_rx = shdn_rx.clone(); tokio::task::spawn(async move { @@ -1994,7 +1994,7 @@ mod conn { .http2_keep_alive_timeout(Duration::from_secs(1)) // enable while idle since we aren't sending requests .http2_keep_alive_while_idle(true) - .handshake::<_, Recv>(io) + .handshake::<_, hyper::body::Incoming>(io) .await .expect("http handshake"); @@ -2026,7 +2026,7 @@ mod conn { .timer(TokioTimer) .http2_keep_alive_interval(Duration::from_secs(1)) .http2_keep_alive_timeout(Duration::from_secs(1)) - .handshake::<_, Recv>(io) + .handshake::<_, hyper::body::Incoming>(io) .await .expect("http handshake"); diff --git a/tests/server.rs b/tests/server.rs index 6b798ef3e4..609b0541a9 100644 --- a/tests/server.rs +++ b/tests/server.rs @@ -27,10 +27,10 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream}; -use hyper::body::Body; +use hyper::body::{Body, Incoming as IncomingBody}; use hyper::server::conn::{http1, http2}; use hyper::service::{service_fn, Service}; -use hyper::{Method, Recv, Request, Response, StatusCode, Uri, Version}; +use hyper::{Method, Request, Response, StatusCode, Uri, Version}; mod support; @@ -1287,7 +1287,7 @@ async fn disconnect_after_reading_request_before_responding() { socket, service_fn(|_| { TokioTimer.sleep(Duration::from_secs(2)).map( - |_| -> Result, hyper::Error> { + |_| -> Result, hyper::Error> { panic!("response future should have been dropped"); }, ) @@ -1616,7 +1616,7 @@ async fn upgrades_new() { }); let (upgrades_tx, upgrades_rx) = mpsc::channel(); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = hyper::upgrade::on(req); let _ = upgrades_tx.send(on_upgrade); future::ok::<_, hyper::Error>( @@ -1658,7 +1658,7 @@ async fn upgrades_ignored() { let addr = listener.local_addr().unwrap(); tokio::spawn(async move { - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { assert_eq!(req.headers()["upgrade"], "yolo"); future::ok::<_, hyper::Error>(Response::new(Empty::::new())) }); @@ -1725,7 +1725,7 @@ async fn http_connect_new() { }); let (upgrades_tx, upgrades_rx) = mpsc::channel(); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = hyper::upgrade::on(req); let _ = upgrades_tx.send(on_upgrade); future::ok::<_, hyper::Error>( @@ -1796,7 +1796,7 @@ async fn h2_connect() { assert!(recv_stream.data().await.unwrap().unwrap().is_empty()); }); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = hyper::upgrade::on(req); tokio::spawn(async move { @@ -1884,7 +1884,7 @@ async fn h2_connect_multiplex() { futures.for_each(future::ready).await; }); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let authority = req.uri().authority().unwrap().to_string(); let on_upgrade = hyper::upgrade::on(req); @@ -1979,7 +1979,7 @@ async fn h2_connect_large_body() { assert!(recv_stream.data().await.unwrap().unwrap().is_empty()); }); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = hyper::upgrade::on(req); tokio::spawn(async move { @@ -2052,7 +2052,7 @@ async fn h2_connect_empty_frames() { assert!(recv_stream.data().await.unwrap().unwrap().is_empty()); }); - let svc = service_fn(move |req: Request| { + let svc = service_fn(move |req: Request| { let on_upgrade = hyper::upgrade::on(req); tokio::spawn(async move { @@ -2695,12 +2695,12 @@ enum Msg { End, } -impl Service> for TestService { +impl Service> for TestService { type Response = Response; type Error = BoxError; type Future = BoxFuture; - fn call(&mut self, mut req: Request) -> Self::Future { + fn call(&mut self, mut req: Request) -> Self::Future { let tx = self.tx.clone(); let replies = self.reply.clone(); @@ -2761,19 +2761,19 @@ const HELLO: &str = "hello"; struct HelloWorld; -impl Service> for HelloWorld { +impl Service> for HelloWorld { type Response = Response>; type Error = hyper::Error; type Future = future::Ready>; - fn call(&mut self, _req: Request) -> Self::Future { + fn call(&mut self, _req: Request) -> Self::Future { let response = Response::new(Full::new(HELLO.into())); future::ok(response) } } fn unreachable_service() -> impl Service< - http::Request, + http::Request, Response = http::Response, Error = BoxError, Future = BoxFuture, @@ -3036,7 +3036,7 @@ impl TestClient { self } - async fn get(&self, uri: Uri) -> Result, hyper::Error> { + async fn get(&self, uri: Uri) -> Result, hyper::Error> { self.request( Request::builder() .uri(uri) @@ -3047,7 +3047,10 @@ impl TestClient { .await } - async fn request(&self, req: Request>) -> Result, hyper::Error> { + async fn request( + &self, + req: Request>, + ) -> Result, hyper::Error> { let host = req.uri().host().expect("uri has no host"); let port = req.uri().port_u16().expect("uri has no port"); diff --git a/tests/support/mod.rs b/tests/support/mod.rs index dced9a2e3b..a8eb37dc35 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -12,7 +12,7 @@ use hyper::server; use tokio::net::{TcpListener, TcpStream}; use hyper::service::service_fn; -use hyper::{Recv, Request, Response, Version}; +use hyper::{body::Incoming as IncomingBody, Request, Response, Version}; pub use futures_util::{ future, FutureExt as _, StreamExt as _, TryFutureExt as _, TryStreamExt as _, @@ -360,7 +360,7 @@ async fn async_test(cfg: __TestConfig) { // Move a clone into the service_fn let serve_handles = serve_handles.clone(); - let service = service_fn(move |req: Request| { + let service = service_fn(move |req: Request| { let (sreq, sres) = serve_handles.lock().unwrap().remove(0); assert_eq!(req.uri().path(), sreq.uri, "client path"); @@ -562,7 +562,9 @@ async fn naive_proxy(cfg: ProxyConfig) -> (SocketAddr, impl Future) let mut builder = Response::builder().status(parts.status); *builder.headers_mut().unwrap() = parts.headers; - Result::, hyper::Error>::Ok(builder.body(body).unwrap()) + Result::, hyper::Error>::Ok( + builder.body(body).unwrap(), + ) } });