Skip to content

Commit

Permalink
fix(http1): force always-ready connections to yield after a few spins
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Jun 14, 2019
1 parent 5019885 commit 8316f96
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ pub(crate) mod exec;
pub(crate) mod io;
mod lazy;
mod never;
pub(crate) mod task;

pub(crate) use self::buf::StaticBuf;
pub(crate) use self::exec::Exec;
pub(crate) use self::lazy::{lazy, Started as Lazy};
pub use self::never::Never;
pub(crate) use self::task::YieldNow;
40 changes: 40 additions & 0 deletions src/common/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use futures::{Async, Poll, task::Task};

use super::Never;

/// A type to help "yield" a future, such that it is re-scheduled immediately.
///
/// Useful for spin counts, so a future doesn't hog too much time.
#[derive(Debug)]
pub(crate) struct YieldNow {
cached_task: Option<Task>,
}

impl YieldNow {
pub(crate) fn new() -> YieldNow {
YieldNow {
cached_task: None,
}
}

/// Returns `Ok(Async::NotReady)` always, while also notifying the
/// current task so that it is rescheduled immediately.
///
/// Since it never returns `Async::Ready` or `Err`, those types are
/// set to `Never`.
pub(crate) fn poll_yield(&mut self) -> Poll<Never, Never> {
// Check for a cached `Task` first...
if let Some(ref t) = self.cached_task {
if t.will_notify_current() {
t.notify();
return Ok(Async::NotReady);
}
}

// No cached task, or not current, so get a new one...
let t = ::futures::task::current();
t.notify();
self.cached_task = Some(t);
Ok(Async::NotReady)
}
}
53 changes: 39 additions & 14 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio_io::{AsyncRead, AsyncWrite};

use body::{Body, Payload};
use body::internal::FullDataArg;
use common::Never;
use common::{Never, YieldNow};
use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
use super::Http1Transaction;
use service::Service;
Expand All @@ -18,6 +18,10 @@ pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
body_tx: Option<::body::Sender>,
body_rx: Option<Bs>,
is_closing: bool,
/// If the poll loop reaches its max spin count, it will yield by notifying
/// the task immediately. This will cache that `Task`, since it usually is
/// the same one.
yield_now: YieldNow,
}

pub(crate) trait Dispatch {
Expand Down Expand Up @@ -58,6 +62,7 @@ where
body_tx: None,
body_rx: None,
is_closing: false,
yield_now: YieldNow::new(),
}
}

Expand Down Expand Up @@ -98,7 +103,29 @@ where
fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
T::update_date();

loop {
try_ready!(self.poll_loop());

if self.is_done() {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
}
self.conn.take_error()?;
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
}
}

fn poll_loop(&mut self) -> Poll<(), ::Error> {
// Limit the looping on this connection, in case it is ready far too
// often, so that other futures don't starve.
//
// 16 was chosen arbitrarily, as that is number of pipelined requests
// benchmarks often use. Perhaps it should be a config option instead.
for _ in 0..16 {
self.poll_read()?;
self.poll_write()?;
self.poll_flush()?;
Expand All @@ -112,21 +139,19 @@ where
// Using this instead of task::current() and notify() inside
// the Conn is noticeably faster in pipelined benchmarks.
if !self.conn.wants_read_again() {
break;
//break;
return Ok(Async::Ready(()));
}
}

if self.is_done() {
if let Some(pending) = self.conn.pending_upgrade() {
self.conn.take_error()?;
return Ok(Async::Ready(Dispatched::Upgrade(pending)));
} else if should_shutdown {
try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
}
self.conn.take_error()?;
Ok(Async::Ready(Dispatched::Shutdown))
} else {
Ok(Async::NotReady)
trace!("poll_loop yielding (self = {:p})", self);

match self.yield_now.poll_yield() {
Ok(Async::NotReady) => Ok(Async::NotReady),
// maybe with `!` this can be cleaner...
// but for now, just doing this to eliminate branches
Ok(Async::Ready(never)) |
Err(never) => match never {}
}
}

Expand Down

0 comments on commit 8316f96

Please sign in to comment.