Skip to content

Commit

Permalink
feat(server): allow !Send Servers
Browse files Browse the repository at this point in the history
Until this commit, servers have required that `Service` and their
`Future` to be `Send`, since the server needs to spawn some internal
tasks to an executor, and by default, that is `tokio::spawn`, which
could be spawning to a threadpool. This was true even if the user were
certain there was no threadpool involved, and was instead using a
different single-threaded runtime, like
`tokio::runtime::current_thread`.

This changes makes all the server pieces generic over an `E`, which is
essentially `Executor<PrivateTypes<Server::Future>>`. There's a new set
of internal traits, `H2Exec` and `NewSvcExec`, which allow for the type
signature to only show the generics that the user is providing. The
traits cannot be implemented explicitly, but there are blanket
implementations for `E: Executor<SpecificType>`. If the user provides
their own executor, it simply needs to have a generic `impl<F>
Executor<F> for MyExec`. That impl can have bounds deciding whether to
require `F: Send`. If the executor does require `Send`, and the
`Service` futures are `!Send`, there will be compiler errors.

To prevent a breaking change, all the types that gained the `E` generic
have a default type set, which is the original `tokio::spawn` executor.
  • Loading branch information
seanmonstar committed Oct 16, 2018
1 parent 00c96de commit ced949c
Show file tree
Hide file tree
Showing 11 changed files with 426 additions and 133 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ name = "send_file"
path = "examples/send_file.rs"
required-features = ["runtime"]

[[example]]
name = "single_threaded"
path = "examples/single_threaded.rs"
required-features = ["runtime"]

[[example]]
name = "state"
path = "examples/state.rs"
Expand Down
2 changes: 2 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ parses it with serde and outputs the result.

* [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously.

* [`single_threaded`](single_threaded.rs) - A server only running on 1 thread, so it can make use of `!Send` app state (like an `Rc` counter).

* [`state`](state.rs) - A webserver showing basic state sharing among requests. A counter is shared, incremented for every request, and every response is sent the last count.

* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling).
Expand Down
51 changes: 51 additions & 0 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#![deny(warnings)]
extern crate futures;
extern crate hyper;
extern crate pretty_env_logger;
extern crate tokio;

use std::cell::Cell;
use std::rc::Rc;

use hyper::{Body, Response, Server};
use hyper::service::service_fn_ok;
use hyper::rt::Future;
use tokio::runtime::current_thread;

fn main() {
pretty_env_logger::init();

let addr = ([127, 0, 0, 1], 3000).into();

// Using a !Send request counter is fine on 1 thread...
let counter = Rc::new(Cell::new(0));

let new_service = move || {
// For each connection, clone the counter to use in our service...
let cnt = counter.clone();

service_fn_ok(move |_| {
let prev = cnt.get();
cnt.set(prev + 1);
Response::new(Body::from(format!("Request count: {}", prev + 1)))
})
};

// Since the Server needs to spawn some background tasks, we needed
// to configure an Executor that can spawn !Send futures...
let exec = current_thread::TaskExecutor::current();

let server = Server::bind(&addr)
.executor(exec)
.serve(new_service)
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);

current_thread::Runtime::new()
.expect("rt new")
.spawn(server)
.run()
.expect("rt run");
}

1 change: 1 addition & 0 deletions src/common/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct Watch {
rx: Shared<oneshot::Receiver<()>>,
}

#[allow(missing_debug_implementations)]
pub struct Watching<F, FN> {
future: F,
state: State<FN>,
Expand Down
75 changes: 72 additions & 3 deletions src/common/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,28 @@ use std::sync::Arc;

use futures::future::{Executor, Future};

/// Either the user provides an executor for background tasks, or we use
/// `tokio::spawn`.
use body::Payload;
use proto::h2::server::H2Stream;
use server::conn::spawn_all::{NewSvcTask, Watcher};
use service::Service;

pub trait H2Exec<F, B: Payload>: Clone {
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()>;
}

pub trait NewSvcExec<I, N, S: Service, E, W: Watcher<I, S, E>>: Clone {
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()>;
}

// Either the user provides an executor for background tasks, or we use
// `tokio::spawn`.
#[derive(Clone)]
pub(crate) enum Exec {
pub enum Exec {
Default,
Executor(Arc<Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync>),
}

// ===== impl Exec =====

impl Exec {
pub(crate) fn execute<F>(&self, fut: F) -> ::Result<()>
Expand Down Expand Up @@ -52,3 +66,58 @@ impl fmt::Debug for Exec {
.finish()
}
}


impl<F, B> H2Exec<F, B> for Exec
where
H2Stream<F, B>: Future<Item=(), Error=()> + Send + 'static,
B: Payload,
{
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
self.execute(fut)
}
}

impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for Exec
where
NewSvcTask<I, N, S, E, W>: Future<Item=(), Error=()> + Send + 'static,
S: Service,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
self.execute(fut)
}
}

// ==== impl Executor =====

impl<E, F, B> H2Exec<F, B> for E
where
E: Executor<H2Stream<F, B>> + Clone,
H2Stream<F, B>: Future<Item=(), Error=()>,
B: Payload,
{
fn execute_h2stream(&self, fut: H2Stream<F, B>) -> ::Result<()> {
self.execute(fut)
.map_err(|err| {
warn!("executor error: {:?}", err.kind());
::Error::new_execute()
})
}
}

impl<I, N, S, E, W> NewSvcExec<I, N, S, E, W> for E
where
E: Executor<NewSvcTask<I, N, S, E, W>> + Clone,
NewSvcTask<I, N, S, E, W>: Future<Item=(), Error=()>,
S: Service,
W: Watcher<I, S, E>,
{
fn execute_new_svc(&self, fut: NewSvcTask<I, N, S, E, W>) -> ::Result<()> {
self.execute(fut)
.map_err(|err| {
warn!("executor error: {:?}", err.kind());
::Error::new_execute()
})
}
}
2 changes: 1 addition & 1 deletion src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod buf;
pub(crate) mod drain;
mod exec;
pub(crate) mod exec;
pub(crate) mod io;
mod lazy;
mod never;
Expand Down
2 changes: 1 addition & 1 deletion src/proto/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use http::HeaderMap;
use body::Payload;

mod client;
mod server;
pub(crate) mod server;

pub(crate) use self::client::Client;
pub(crate) use self::server::Server;
Expand Down
27 changes: 15 additions & 12 deletions src/proto/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,20 @@ use tokio_io::{AsyncRead, AsyncWrite};

use ::headers::content_length_parse_all;
use ::body::Payload;
use ::common::Exec;
use ::common::exec::H2Exec;
use ::headers;
use ::service::Service;
use ::proto::Dispatched;
use super::{PipeToSendStream, SendBuf};

use ::{Body, Response};

pub(crate) struct Server<T, S, B>
pub(crate) struct Server<T, S, B, E>
where
S: Service,
B: Payload,
{
exec: Exec,
exec: E,
service: S,
state: State<T, B>,
}
Expand All @@ -40,15 +40,16 @@ where
}


impl<T, S, B> Server<T, S, B>
impl<T, S, B, E> Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite,
S: Service<ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
//S::Future: Send + 'static,
B: Payload,
E: H2Exec<S::Future, B>,
{
pub(crate) fn new(io: T, service: S, exec: Exec) -> Server<T, S, B> {
pub(crate) fn new(io: T, service: S, exec: E) -> Server<T, S, B, E> {
let handshake = Builder::new()
.handshake(io);
Server {
Expand Down Expand Up @@ -76,13 +77,14 @@ where
}
}

impl<T, S, B> Future for Server<T, S, B>
impl<T, S, B, E> Future for Server<T, S, B, E>
where
T: AsyncRead + AsyncWrite,
S: Service<ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
//S::Future: Send + 'static,
B: Payload,
E: H2Exec<S::Future, B>,
{
type Item = Dispatched;
type Error = ::Error;
Expand Down Expand Up @@ -116,14 +118,14 @@ where
T: AsyncRead + AsyncWrite,
B: Payload,
{
fn poll_server<S>(&mut self, service: &mut S, exec: &Exec) -> Poll<(), ::Error>
fn poll_server<S, E>(&mut self, service: &mut S, exec: &E) -> Poll<(), ::Error>
where
S: Service<
ReqBody=Body,
ResBody=B,
>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Future: Send + 'static,
E: H2Exec<S::Future, B>,
{
while let Some((req, respond)) = try_ready!(self.conn.poll().map_err(::Error::new_h2)) {
trace!("incoming request");
Expand All @@ -132,7 +134,7 @@ where
::Body::h2(stream, content_length)
});
let fut = H2Stream::new(service.call(req), respond);
exec.execute(fut)?;
exec.execute_h2stream(fut)?;
}

// no more incoming streams...
Expand All @@ -141,7 +143,8 @@ where
}
}

struct H2Stream<F, B>
#[allow(missing_debug_implementations)]
pub struct H2Stream<F, B>
where
B: Payload,
{
Expand Down
Loading

0 comments on commit ced949c

Please sign in to comment.