Skip to content

Commit

Permalink
feat(server): change NewService to MakeService with connection co…
Browse files Browse the repository at this point in the history
…ntext

This adjusts the way `Service`s are created for a `hyper::Server`. The
`MakeService` trait allows receiving an argument when creating a
`Service`. The implementation for `hyper::Server` expects to pass a
reference to the accepted transport (so, `&Incoming::Item`). The user
can inspect the transport before making a `Service`.

In practice, this allows for things like getting the remote socket
address, or the TLS certification, or similar.

To prevent a breaking change, there is a blanket implementation of
`MakeService` for any `NewService`. Besides implementing `MakeService`
directly, there is also added `hyper::service::make_service_fn`.

Closes #1650
  • Loading branch information
seanmonstar committed Nov 1, 2018
1 parent 576addd commit 40f701b
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 58 deletions.
24 changes: 9 additions & 15 deletions examples/hello.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,23 @@
extern crate hyper;
extern crate pretty_env_logger;

use hyper::{Body, Response, Server};
use hyper::{Body, Request, Response, Server};
use hyper::service::service_fn_ok;
use hyper::rt::{self, Future};

static PHRASE: &'static [u8] = b"Hello World!";

fn main() {
pretty_env_logger::init();
let addr = ([127, 0, 0, 1], 3000).into();

// new_service is run for each connection, creating a 'service'
// to handle requests for that specific connection.
let new_service = || {
// This is the `Service` that will handle the connection.
// `service_fn_ok` is a helper to convert a function that
// returns a Response into a `Service`.
service_fn_ok(|_| {
Response::new(Body::from(PHRASE))
})
};

let server = Server::bind(&addr)
.serve(new_service)
.serve(|| {
// This is the `Service` that will handle the connection.
// `service_fn_ok` is a helper to convert a function that
// returns a Response into a `Service`.
service_fn_ok(move |_: Request<Body>| {
Response::new(Body::from("Hello World!"))
})
})
.map_err(|e| eprintln!("server error: {}", e));

println!("Listening on http://{}", addr);
Expand Down
95 changes: 73 additions & 22 deletions src/server/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ use common::exec::{Exec, H2Exec, NewSvcExec};
use common::io::Rewind;
use error::{Kind, Parse};
use proto;
use service::{NewService, Service};
use service::Service;
use upgrade::Upgraded;

pub(super) use self::make_service::MakeServiceRef;
pub(super) use self::spawn_all::NoopWatcher;
use self::spawn_all::NewSvcTask;
pub(super) use self::spawn_all::Watcher;
pub(super) use self::upgrades::UpgradeableConnection;

#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming;
#[cfg(feature = "runtime")] pub use super::tcp::{AddrIncoming, AddrStream};

/// A lower-level configuration of the HTTP protocol.
///
Expand Down Expand Up @@ -69,13 +70,13 @@ enum ConnectionMode {
#[derive(Debug)]
pub struct Serve<I, S, E = Exec> {
incoming: I,
new_service: S,
make_service: S,
protocol: Http<E>,
}

/// A future building a new `Service` to a `Connection`.
///
/// Wraps the future returned from `NewService` into one that returns
/// Wraps the future returned from `MakeService` into one that returns
/// a `Connection`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
Expand Down Expand Up @@ -349,12 +350,16 @@ impl<E> Http<E> {
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided, creating a new service per
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
where
S: NewService<ReqBody=Body, ResBody=Bd>,
S: MakeServiceRef<
AddrStream,
ReqBody=Body,
ResBody=Bd,
>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
Expand All @@ -363,19 +368,23 @@ impl<E> Http<E> {
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
Ok(self.serve_incoming(incoming, make_service))
}

/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
///
/// This method will bind the `addr` provided with a new TCP listener ready
/// to accept connections. Each connection will be processed with the
/// `new_service` object provided, creating a new service per
/// `make_service` object provided, creating a new service per
/// connection.
#[cfg(feature = "runtime")]
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
where
S: NewService<ReqBody=Body, ResBody=Bd>,
S: MakeServiceRef<
AddrStream,
ReqBody=Body,
ResBody=Bd,
>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
Expand All @@ -384,23 +393,27 @@ impl<E> Http<E> {
if self.keep_alive {
incoming.set_keepalive(Some(Duration::from_secs(90)));
}
Ok(self.serve_incoming(incoming, new_service))
Ok(self.serve_incoming(incoming, make_service))
}

/// Bind the provided stream of incoming IO objects with a `NewService`.
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S, E>
/// Bind the provided stream of incoming IO objects with a `MakeService`.
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite,
S: NewService<ReqBody=Body, ResBody=Bd>,
S: MakeServiceRef<
I::Item,
ReqBody=Body,
ResBody=Bd,
>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
Bd: Payload,
E: H2Exec<<S::Service as Service>::Future, Bd>,
{
Serve {
incoming: incoming,
new_service: new_service,
incoming,
make_service,
protocol: self.clone(),
}
}
Expand Down Expand Up @@ -604,8 +617,9 @@ where
I: Stream,
I::Item: AsyncRead + AsyncWrite,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
S: NewService<ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
//S::Error2: Into<Box<::std::error::Error + Send + Sync>>,
//SME: Into<Box<::std::error::Error + Send + Sync>>,
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
{
Expand All @@ -614,7 +628,7 @@ where

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
let new_fut = self.new_service.new_service();
let new_fut = self.make_service.make_service_ref(&io);
Ok(Async::Ready(Some(Connecting {
future: new_fut,
io: Some(io),
Expand Down Expand Up @@ -666,8 +680,11 @@ where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S: MakeServiceRef<
I::Item,
ReqBody=Body,
ResBody=B,
>,
B: Payload,
E: H2Exec<<S::Service as Service>::Future, B>,
{
Expand Down Expand Up @@ -873,3 +890,37 @@ mod upgrades {
}
}

pub(crate) mod make_service {
use std::error::Error as StdError;

pub trait MakeServiceRef<Ctx> {
type Error: Into<Box<StdError + Send + Sync>>;
type ReqBody: ::body::Payload;
type ResBody: ::body::Payload;
type Service: ::service::Service<ReqBody=Self::ReqBody, ResBody=Self::ResBody, Error=Self::Error>;
type Future: ::futures::Future<Item=Self::Service>;

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future;
}

impl<T, Ctx, E, ME, S, F, IB, OB> MakeServiceRef<Ctx> for T
where
T: for<'a> ::service::MakeService<&'a Ctx, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>,
E: Into<Box<StdError + Send + Sync>>,
ME: Into<Box<StdError + Send + Sync>>,
S: ::service::Service<ReqBody=IB, ResBody=OB, Error=E>,
F: ::futures::Future<Item=S, Error=ME>,
IB: ::body::Payload,
OB: ::body::Payload,
{
type Error = E;
type Service = S;
type ReqBody = IB;
type ResBody = OB;
type Future = F;

fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future {
self.make_service(ctx)
}
}
}
18 changes: 9 additions & 9 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
//! # Server
//!
//! The [`Server`](Server) is main way to start listening for HTTP requests.
//! It wraps a listener with a [`NewService`](::service), and then should
//! It wraps a listener with a [`MakeService`](::service), and then should
//! be executed to start serving requests.
//!
//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
Expand All @@ -30,16 +30,16 @@
//! // Construct our SocketAddr to listen on...
//! let addr = ([127, 0, 0, 1], 3000).into();
//!
//! // And a NewService to handle each connection...
//! let new_service = || {
//! // And a MakeService to handle each connection...
//! let make_service = || {
//! service_fn_ok(|_req| {
//! Response::new(Body::from("Hello World"))
//! })
//! };
//!
//! // Then bind and serve...
//! let server = Server::bind(&addr)
//! .serve(new_service);
//! .serve(make_service);
//!
//! // Finally, spawn `server` onto an Executor...
//! hyper::rt::run(server.map_err(|e| {
Expand All @@ -65,10 +65,10 @@ use tokio_io::{AsyncRead, AsyncWrite};

use body::{Body, Payload};
use common::exec::{Exec, H2Exec, NewSvcExec};
use service::{NewService, Service};
use service::Service;
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
// error that `hyper::server::Http` is private...
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
use self::conn::{Http as Http_, MakeServiceRef, NoopWatcher, SpawnAll};
use self::shutdown::{Graceful, GracefulWatcher};
#[cfg(feature = "runtime")] use self::tcp::AddrIncoming;

Expand Down Expand Up @@ -144,7 +144,7 @@ where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B>,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: 'static,
B: Payload,
Expand Down Expand Up @@ -203,7 +203,7 @@ where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B>,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: 'static,
B: Payload,
Expand Down Expand Up @@ -332,7 +332,7 @@ impl<I, E> Builder<I, E> {
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B>,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
S::Service: 'static,
B: Payload,
Expand Down
6 changes: 3 additions & 3 deletions src/server/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
use body::{Body, Payload};
use common::drain::{self, Draining, Signal, Watch, Watching};
use common::exec::{H2Exec, NewSvcExec};
use service::{Service, NewService};
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
use service::Service;
use super::conn::{MakeServiceRef, SpawnAll, UpgradeableConnection, Watcher};

#[allow(missing_debug_implementations)]
pub struct Graceful<I, S, F, E> {
Expand Down Expand Up @@ -40,7 +40,7 @@ where
I: Stream,
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
I::Item: AsyncRead + AsyncWrite + Send + 'static,
S: NewService<ReqBody=Body, ResBody=B>,
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
S::Service: 'static,
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
B: Payload,
Expand Down
3 changes: 2 additions & 1 deletion src/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio_reactor::Handle;
use tokio_tcp::TcpListener;
use tokio_timer::Delay;

use self::addr_stream::AddrStream;
pub use self::addr_stream::AddrStream;

/// A stream of connections from binding to an address.
#[must_use = "streams do nothing unless polled"]
Expand Down Expand Up @@ -194,6 +194,7 @@ mod addr_stream {
use tokio_io::{AsyncRead, AsyncWrite};


/// A transport returned yieled by `AddrIncoming`.
#[derive(Debug)]
pub struct AddrStream {
inner: TcpStream,
Expand Down
Loading

0 comments on commit 40f701b

Please sign in to comment.