Skip to content

Commit

Permalink
Merge branch 'hyperium:master' into doc-incoming
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhangHanDong authored Nov 12, 2023
2 parents b09bb58 + 52b27fa commit 4b0991b
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 73 deletions.
14 changes: 7 additions & 7 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::super::dispatch;
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::proto;
use crate::rt::bounds::ExecutorClient;
use crate::rt::bounds::Http2ClientConnExec;
use crate::rt::Timer;

/// The sender side of an established connection.
Expand All @@ -41,7 +41,7 @@ pub struct Connection<T, B, E>
where
T: Read + Write + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
inner: (PhantomData<T>, proto::h2::ClientTask<B, E, T>),
Expand Down Expand Up @@ -73,7 +73,7 @@ where
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin + Clone,
E: Http2ClientConnExec<B, T> + Unpin + Clone,
{
Builder::new(exec).handshake(io).await
}
Expand Down Expand Up @@ -202,7 +202,7 @@ where
B: Body + Unpin + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
{
/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
Expand All @@ -222,7 +222,7 @@ impl<T, B, E> fmt::Debug for Connection<T, B, E>
where
T: Read + Write + fmt::Debug + 'static + Unpin,
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -237,7 +237,7 @@ where
B::Data: Send,
E: Unpin,
B::Error: Into<Box<dyn Error + Send + Sync>>,
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
{
type Output = crate::Result<()>;

Expand Down Expand Up @@ -407,7 +407,7 @@ where
B: Body + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>>,
Ex: ExecutorClient<B, T> + Unpin,
Ex: Http2ClientConnExec<B, T> + Unpin,
{
let opts = self.clone();

Expand Down
64 changes: 29 additions & 35 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
#[cfg(any(
all(any(feature = "client", feature = "server"), feature = "http2"),
all(feature = "server", feature = "http1"),
))]
use std::time::Duration;
use std::{fmt, sync::Arc};
use std::{pin::Pin, time::Instant};
Expand All @@ -13,46 +16,19 @@ pub(crate) enum Time {
Empty,
}

#[cfg(all(feature = "server", feature = "http1"))]
#[derive(Clone, Copy, Debug)]
pub(crate) enum Dur {
Default(Option<Duration>),
Configured(Option<Duration>),
}

impl fmt::Debug for Time {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Time").finish()
}
}

/*
pub(crate) fn timeout<F>(tim: Tim, future: F, duration: Duration) -> HyperTimeout<F> {
HyperTimeout { sleep: tim.sleep(duration), future: future }
}
pin_project_lite::pin_project! {
pub(crate) struct HyperTimeout<F> {
sleep: Box<dyn Sleep>,
#[pin]
future: F
}
}
pub(crate) struct Timeout;
impl<F> Future for HyperTimeout<F> where F: Future {
type Output = Result<F::Output, Timeout>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>{
let mut this = self.project();
if let Poll::Ready(v) = this.future.poll(ctx) {
return Poll::Ready(Ok(v));
}
if let Poll::Ready(_) = Pin::new(&mut this.sleep).poll(ctx) {
return Poll::Ready(Err(Timeout));
}
return Poll::Pending;
}
}
*/

impl Time {
#[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))]
pub(crate) fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
Expand Down Expand Up @@ -82,4 +58,22 @@ impl Time {
Time::Timer(ref t) => t.reset(sleep, new_deadline),
}
}

#[cfg(all(feature = "server", feature = "http1"))]
pub(crate) fn check(&self, dur: Dur, name: &'static str) -> Option<Duration> {
match dur {
Dur::Default(Some(dur)) => match self {
Time::Empty => {
warn!("timeout `{}` has default, but no timer set", name,);
None
}
Time::Timer(..) => Some(dur),
},
Dur::Configured(Some(dur)) => match self {
Time::Empty => panic!("timeout `{}` set, but no timer set", name,),
Time::Timer(..) => Some(dur),
},
Dur::Default(None) | Dur::Configured(None) => None,
}
}
}
10 changes: 5 additions & 5 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
use crate::rt::bounds::ExecutorClient;
use crate::rt::bounds::Http2ClientConnExec;
use crate::upgrade::Upgraded;
use crate::{Request, Response};
use h2::client::ResponseFuture;
Expand Down Expand Up @@ -118,7 +118,7 @@ where
T: Read + Write + Unpin + 'static,
B: Body + 'static,
B::Data: Send + 'static,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
let (h2_tx, mut conn) = new_builder(config)
Expand Down Expand Up @@ -405,7 +405,7 @@ where
impl<B, E, T> ClientTask<B, E, T>
where
B: Body + 'static,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
T: Read + Write + Unpin,
{
Expand Down Expand Up @@ -457,7 +457,7 @@ impl<B, E, T> ClientTask<B, E, T>
where
B: Body + 'static + Unpin,
B::Data: Send,
E: ExecutorClient<B, T> + Unpin,
E: Http2ClientConnExec<B, T> + Unpin,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
T: Read + Write + Unpin,
{
Expand Down Expand Up @@ -593,7 +593,7 @@ where
B: Body + 'static + Unpin,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
E: ExecutorClient<B, T> + 'static + Send + Sync + Unpin,
E: Http2ClientConnExec<B, T> + 'static + Send + Sync + Unpin,
T: Read + Write + Unpin,
{
type Output = crate::Result<Dispatched>;
Expand Down
8 changes: 4 additions & 4 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::headers;
use crate::proto::h2::ping::Recorder;
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
use crate::proto::Dispatched;
use crate::rt::bounds::Http2ConnExec;
use crate::rt::bounds::Http2ServerConnExec;
use crate::rt::{Read, Write};
use crate::service::HttpService;

Expand Down Expand Up @@ -112,7 +112,7 @@ where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
E: Http2ConnExec<S::Future, B>,
E: Http2ServerConnExec<S::Future, B>,
{
pub(crate) fn new(
io: T,
Expand Down Expand Up @@ -188,7 +188,7 @@ where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
B: Body + 'static,
E: Http2ConnExec<S::Future, B>,
E: Http2ServerConnExec<S::Future, B>,
{
type Output = crate::Result<Dispatched>;

Expand Down Expand Up @@ -242,7 +242,7 @@ where
where
S: HttpService<IncomingBody, ResBody = B>,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
E: Http2ConnExec<S::Future, B>,
E: Http2ServerConnExec<S::Future, B>,
{
if self.closing.is_none() {
loop {
Expand Down
14 changes: 7 additions & 7 deletions src/rt/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
//! implemented by implementing another trait.
#[cfg(all(feature = "server", feature = "http2"))]
pub use self::h2::Http2ConnExec;
pub use self::h2::Http2ServerConnExec;

#[cfg(all(feature = "client", feature = "http2"))]
pub use self::h2_client::ExecutorClient;
pub use self::h2_client::Http2ClientConnExec;

#[cfg(all(feature = "client", feature = "http2"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "server", feature = "http2"))))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "client", feature = "http2"))))]
mod h2_client {
use std::{error::Error, future::Future};

Expand All @@ -25,7 +25,7 @@ mod h2_client {
/// This trait is sealed and cannot be implemented for types outside this crate.
///
/// [`Executor`]: crate::rt::Executor
pub trait ExecutorClient<B, T>: sealed_client::Sealed<(B, T)>
pub trait Http2ClientConnExec<B, T>: sealed_client::Sealed<(B, T)>
where
B: http_body::Body,
B::Error: Into<Box<dyn Error + Send + Sync>>,
Expand All @@ -35,7 +35,7 @@ mod h2_client {
fn execute_h2_future(&mut self, future: H2ClientFuture<B, T>);
}

impl<E, B, T> ExecutorClient<B, T> for E
impl<E, B, T> Http2ClientConnExec<B, T> for E
where
E: Executor<H2ClientFuture<B, T>>,
B: http_body::Body + 'static,
Expand Down Expand Up @@ -78,13 +78,13 @@ mod h2 {
/// This trait is sealed and cannot be implemented for types outside this crate.
///
/// [`Executor`]: crate::rt::Executor
pub trait Http2ConnExec<F, B: Body>: sealed::Sealed<(F, B)> + Clone {
pub trait Http2ServerConnExec<F, B: Body>: sealed::Sealed<(F, B)> + Clone {
#[doc(hidden)]
fn execute_h2stream(&mut self, fut: H2Stream<F, B>);
}

#[doc(hidden)]
impl<E, F, B> Http2ConnExec<F, B> for E
impl<E, F, B> Http2ServerConnExec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Output = ()>,
Expand Down
31 changes: 22 additions & 9 deletions src/server/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use bytes::Bytes;
use crate::body::{Body, Incoming as IncomingBody};
use crate::proto;
use crate::service::HttpService;
use crate::{common::time::Time, rt::Timer};
use crate::{
common::time::{Dur, Time},
rt::Timer,
};

type Http1Dispatcher<T, B, S> = proto::h1::Dispatcher<
proto::h1::dispatch::Server<S, IncomingBody>,
Expand Down Expand Up @@ -70,7 +73,7 @@ pub struct Builder {
h1_keep_alive: bool,
h1_title_case_headers: bool,
h1_preserve_header_case: bool,
h1_header_read_timeout: Option<Duration>,
h1_header_read_timeout: Dur,
h1_writev: Option<bool>,
max_buf_size: Option<usize>,
pipeline_flush: bool,
Expand Down Expand Up @@ -237,7 +240,7 @@ impl Builder {
h1_keep_alive: true,
h1_title_case_headers: false,
h1_preserve_header_case: false,
h1_header_read_timeout: None,
h1_header_read_timeout: Dur::Default(Some(Duration::from_secs(30))),
h1_writev: None,
max_buf_size: None,
pipeline_flush: false,
Expand Down Expand Up @@ -292,9 +295,11 @@ impl Builder {
/// Set a timeout for reading client request headers. If a client does not
/// transmit the entire header within this time, the connection is closed.
///
/// Default is None.
pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self {
self.h1_header_read_timeout = Some(read_timeout);
/// Pass `None` to disable.
///
/// Default is 30 seconds.
pub fn header_read_timeout(&mut self, read_timeout: impl Into<Option<Duration>>) -> &mut Self {
self.h1_header_read_timeout = Dur::Configured(read_timeout.into());
self
}

Expand Down Expand Up @@ -355,6 +360,11 @@ impl Builder {
/// This returns a Future that must be polled in order for HTTP to be
/// driven on the connection.
///
/// # Panics
///
/// If a timeout option has been configured, but a `timer` has not been
/// provided, calling `serve_connection` will panic.
///
/// # Example
///
/// ```
Expand Down Expand Up @@ -400,9 +410,12 @@ impl Builder {
if self.h1_preserve_header_case {
conn.set_preserve_header_case();
}
if let Some(header_read_timeout) = self.h1_header_read_timeout {
conn.set_http1_header_read_timeout(header_read_timeout);
}
if let Some(dur) = self
.timer
.check(self.h1_header_read_timeout, "header_read_timeout")
{
conn.set_http1_header_read_timeout(dur);
};
if let Some(writev) = self.h1_writev {
if writev {
conn.set_write_strategy_queue();
Expand Down
Loading

0 comments on commit 4b0991b

Please sign in to comment.