From 4a22f005a4ca4416d5cfbbd8ecd5ea79cbdd72a4 Mon Sep 17 00:00:00 2001 From: Dax Huiberts Date: Fri, 8 Dec 2023 13:49:33 +0100 Subject: [PATCH 1/2] feat: Add TokioTimer to rt::tokio module --- src/rt/mod.rs | 2 +- src/rt/tokio.rs | 63 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/rt/mod.rs b/src/rt/mod.rs index cedc8cca..3ed86285 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -4,4 +4,4 @@ pub mod tokio; #[cfg(feature = "tokio")] -pub use self::tokio::{TokioExecutor, TokioIo}; +pub use self::tokio::{TokioExecutor, TokioIo, TokioTimer}; diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index ac2c0099..edce57a1 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -4,9 +4,10 @@ use std::{ future::Future, pin::Pin, task::{Context, Poll}, + time::{Duration, Instant}, }; -use hyper::rt::Executor; +use hyper::rt::{Executor, Sleep, Timer}; use pin_project_lite::pin_project; /// Future executor that utilises `tokio` threads. @@ -24,6 +25,21 @@ pin_project! { } } +/// A Timer that uses the tokio runtime. +#[non_exhaustive] +#[derive(Default, Clone, Debug)] +pub struct TokioTimer; + +// Use TokioSleep to get tokio::time::Sleep to implement Unpin. +// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html +pin_project! { + #[derive(Debug)] + struct TokioSleep { + #[pin] + inner: tokio::time::Sleep, + } +} + // ===== impl TokioExecutor ===== impl Executor for TokioExecutor @@ -190,6 +206,51 @@ where } } +// ==== impl TokioTimer ===== + +impl Timer for TokioTimer { + fn sleep(&self, duration: Duration) -> Pin> { + Box::pin(TokioSleep { + inner: tokio::time::sleep(duration), + }) + } + + fn sleep_until(&self, deadline: Instant) -> Pin> { + Box::pin(TokioSleep { + inner: tokio::time::sleep_until(deadline.into()), + }) + } + + fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { + if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { + sleep.reset(new_deadline) + } + } +} + +impl TokioTimer { + /// Create a new TokioTimer + pub fn new() -> Self { + Self {} + } +} + +impl Future for TokioSleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } +} + +impl Sleep for TokioSleep {} + +impl TokioSleep { + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.project().inner.as_mut().reset(deadline.into()); + } +} + #[cfg(test)] mod tests { use crate::rt::TokioExecutor; From bee620bdcf9e3b39bf199d5e4d9ddedcc95a3cc5 Mon Sep 17 00:00:00 2001 From: Dax Huiberts Date: Fri, 8 Dec 2023 22:43:51 +0100 Subject: [PATCH 2/2] Make `TokioSleep.reset` implementation not public Co-authored-by: Sean McArthur --- src/rt/tokio.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index edce57a1..0a6b6789 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -246,7 +246,7 @@ impl Future for TokioSleep { impl Sleep for TokioSleep {} impl TokioSleep { - pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + fn reset(self: Pin<&mut Self>, deadline: Instant) { self.project().inner.as_mut().reset(deadline.into()); } }