Skip to content

Commit

Permalink
feat(rt): add Timer trait (#2974)
Browse files Browse the repository at this point in the history
This adds a `hyper::rt::Timer` trait, and it is used in connection
builders to configure a custom timer source for timeouts.

Co-authored-by: Robert Cunningham <[email protected]>
  • Loading branch information
seanmonstar and Robert-Cunningham authored Sep 2, 2022
1 parent 2988baa commit fae97ce
Show file tree
Hide file tree
Showing 22 changed files with 470 additions and 114 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ server = []
runtime = [
"tokio/net",
"tokio/rt",
"tokio/time",
]

# C-API support (currently unstable (no semver))
Expand Down
3 changes: 3 additions & 0 deletions benches/support/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

mod tokiort;
pub use tokiort::TokioTimer;
66 changes: 66 additions & 0 deletions benches/support/tokiort.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#![allow(dead_code)]
//! Various runtimes for hyper
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use futures_util::Future;
use hyper::rt::{Sleep, Timer};

/// An Executor that uses the tokio runtime.
pub struct TokioExecutor;

/// A Timer that uses the tokio runtime.
#[derive(Clone, Debug)]
pub struct TokioTimer;

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
let s = tokio::time::sleep(duration);
let hs = TokioSleep { inner: Box::pin(s) };
return Box::new(hs);
}

fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
return Box::new(TokioSleep {
inner: Box::pin(tokio::time::sleep_until(deadline.into())),
});
}
}

struct TokioTimeout<T> {
inner: Pin<Box<tokio::time::Timeout<T>>>,
}

impl<T> Future for TokioTimeout<T>
where
T: Future,
{
type Output = Result<T::Output, tokio::time::error::Elapsed>;

fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(context)
}
}

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
pub(crate) struct TokioSleep {
pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
}

impl Future for TokioSleep {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.inner.as_mut().poll(cx)
}
}

// Use HasSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html

impl Sleep for TokioSleep {}
14 changes: 8 additions & 6 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Recv;
use crate::body::Body;
use super::super::dispatch;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::upgrade::Upgraded;
use crate::proto;
use crate::rt::Executor;
use super::super::dispatch;
use crate::rt::{Executor};
use crate::upgrade::Upgraded;

type Dispatcher<T, B> =
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
Expand Down Expand Up @@ -120,7 +120,10 @@ where
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is.
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<Recv>>> {
let sent = self.dispatch.send(req);

async move {
Expand All @@ -130,7 +133,7 @@ where
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
}
},
Err(_req) => {
tracing::debug!("connection was not ready");

Expand Down Expand Up @@ -476,4 +479,3 @@ impl Builder {
}
}
}

30 changes: 22 additions & 8 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@ use tokio::io::{AsyncRead, AsyncWrite};

use crate::Recv;
use crate::body::Body;
use super::super::dispatch;
use crate::common::time::Time;
use crate::common::{
exec::{BoxSendFuture, Exec},
task, Future, Pin, Poll,
};
use crate::proto;
use crate::rt::Executor;
use super::super::dispatch;
use crate::rt::{Executor, Timer};

/// The sender side of an established connection.
pub struct SendRequest<B> {
Expand All @@ -44,6 +45,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Time,
h2_builder: proto::h2::client::Config,
}

Expand Down Expand Up @@ -114,7 +116,10 @@ where
/// before calling this method.
/// - Since absolute-form `Uri`s are not required, if received, they will
/// be serialized as-is.
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
pub fn send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<Recv>>> {
let sent = self.dispatch.send(req);

async move {
Expand All @@ -124,7 +129,7 @@ where
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_canceled) => panic!("dispatch dropped without returning error"),
}
},
Err(_req) => {
tracing::debug!("connection was not ready");

Expand Down Expand Up @@ -207,6 +212,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: Time::Empty,
h2_builder: Default::default(),
}
}
Expand All @@ -220,6 +226,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Time::Timer(Arc::new(timer));
self
}

/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
/// stream-level flow control.
///
Expand Down Expand Up @@ -398,14 +413,13 @@ impl Builder {
tracing::trace!("client handshake HTTP/1");

let (tx, rx) = dispatch::channel();
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
.await?;
let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
.await?;
Ok((
SendRequest { dispatch: tx.unbound() },
//SendRequest { dispatch: tx },
Connection { inner: (PhantomData, h2) },
))
}
}
}

23 changes: 20 additions & 3 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ use crate::rt::Executor;
#[cfg(feature = "http1")]
use crate::upgrade::Upgraded;
use crate::{Recv, Request, Response};
use crate::{common::time::Time, rt::Timer};

#[cfg(feature = "http1")]
pub mod http1;
Expand Down Expand Up @@ -161,6 +162,7 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
pub(super) timer: Time,
h09_responses: bool,
h1_parser_config: ParserConfig,
h1_writev: Option<bool>,
Expand Down Expand Up @@ -418,6 +420,7 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
timer: Time::Empty,
h09_responses: false,
h1_writev: None,
h1_read_buf_exact_size: None,
Expand Down Expand Up @@ -447,6 +450,15 @@ impl Builder {
self
}

/// Provide a timer to execute background HTTP2 tasks.
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
where
M: Timer + Send + Sync + 'static,
{
self.timer = Time::Timer(Arc::new(timer));
self
}

/// Set whether HTTP/0.9 responses should be tolerated.
///
/// Default is false.
Expand Down Expand Up @@ -857,9 +869,14 @@ impl Builder {
}
#[cfg(feature = "http2")]
Proto::Http2 => {
let h2 =
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
.await?;
let h2 = proto::h2::client::handshake(
io,
rx,
&opts.h2_builder,
opts.exec.clone(),
opts.timer.clone(),
)
.await?;
ProtoClient::H2 { h2 }
}
};
Expand Down
9 changes: 9 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ pub(crate) mod exec;
pub(crate) mod io;
mod never;
pub(crate) mod task;
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
pub(crate) mod time;
pub(crate) mod watch;

#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))]
Expand All @@ -26,3 +28,10 @@ cfg_proto! {
pub(crate) use std::marker::Unpin;
}
pub(crate) use std::{future::Future, pin::Pin};

pub(crate) fn into_pin<T: ?Sized>(boxed: Box<T>) -> Pin<Box<T>> {
// It's not possible to move or replace the insides of a `Pin<Box<T>>`
// when `T: !Unpin`, so it's safe to pin it directly without any
// additional requirements.
unsafe { Pin::new_unchecked(boxed) }
}
87 changes: 87 additions & 0 deletions src/common/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{fmt, sync::Arc};
#[cfg(all(feature = "server", feature = "runtime"))]
use std::{
pin::Pin,
time::{Duration, Instant},
};

#[cfg(all(feature = "server", feature = "runtime"))]
use crate::rt::Sleep;
use crate::rt::Timer;

/// A user-provided timer to time background tasks.
#[derive(Clone)]
pub(crate) enum Time {
Timer(Arc<dyn Timer + Send + Sync>),
Empty,
}

impl fmt::Debug for Time {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Time").finish()
}
}

/*
pub(crate) fn timeout<F>(tim: Tim, future: F, duration: Duration) -> HyperTimeout<F> {
HyperTimeout { sleep: tim.sleep(duration), future: future }
}
pin_project_lite::pin_project! {
pub(crate) struct HyperTimeout<F> {
sleep: Box<dyn Sleep>,
#[pin]
future: F
}
}
pub(crate) struct Timeout;
impl<F> Future for HyperTimeout<F> where F: Future {
type Output = Result<F::Output, Timeout>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>{
let mut this = self.project();
if let Poll::Ready(v) = this.future.poll(ctx) {
return Poll::Ready(Ok(v));
}
if let Poll::Ready(_) = Pin::new(&mut this.sleep).poll(ctx) {
return Poll::Ready(Err(Timeout));
}
return Poll::Pending;
}
}
*/

#[cfg(all(feature = "server", feature = "runtime"))]
impl Time {
pub(crate) fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.sleep(duration),
}
}

pub(crate) fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.sleep_until(deadline),
}
}

pub(crate) fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
match *self {
Time::Empty => {
panic!("You must supply a timer.")
}
Time::Timer(ref t) => t.reset(sleep, new_deadline),
}
}
}
Loading

0 comments on commit fae97ce

Please sign in to comment.