Skip to content

Commit 3087002

Browse files
committed
feat(server): change NewService to MakeService with connection context
This adjusts the way `Service`s are created for a `hyper::Server`. The `MakeService` trait allows receiving an argument when creating a `Service`. The implementation for `hyper::Server` expects to pass a reference to the accepted transport (so, `&Incoming::Item`). The user can inspect the transport before making a `Service`. In practice, this allows for things like getting the remote socket address, or the TLS certification, or similar. To prevent a breaking change, there is a blanket implementation of `MakeService` for any `NewService`. Besides implementing `MakeService` directly, there is also added `hyper::service::make_service_fn`. Closes #1650
1 parent 1158bd2 commit 3087002

File tree

8 files changed

+219
-58
lines changed

8 files changed

+219
-58
lines changed

examples/hello.rs

+9-15
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,23 @@
22
extern crate hyper;
33
extern crate pretty_env_logger;
44

5-
use hyper::{Body, Response, Server};
5+
use hyper::{Body, Request, Response, Server};
66
use hyper::service::service_fn_ok;
77
use hyper::rt::{self, Future};
88

9-
static PHRASE: &'static [u8] = b"Hello World!";
10-
119
fn main() {
1210
pretty_env_logger::init();
1311
let addr = ([127, 0, 0, 1], 3000).into();
1412

15-
// new_service is run for each connection, creating a 'service'
16-
// to handle requests for that specific connection.
17-
let new_service = || {
18-
// This is the `Service` that will handle the connection.
19-
// `service_fn_ok` is a helper to convert a function that
20-
// returns a Response into a `Service`.
21-
service_fn_ok(|_| {
22-
Response::new(Body::from(PHRASE))
23-
})
24-
};
25-
2613
let server = Server::bind(&addr)
27-
.serve(new_service)
14+
.serve(|| {
15+
// This is the `Service` that will handle the connection.
16+
// `service_fn_ok` is a helper to convert a function that
17+
// returns a Response into a `Service`.
18+
service_fn_ok(move |_: Request<Body>| {
19+
Response::new(Body::from("Hello World!"))
20+
})
21+
})
2822
.map_err(|e| eprintln!("server error: {}", e));
2923

3024
println!("Listening on http://{}", addr);

src/server/conn.rs

+73-22
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,16 @@ use common::exec::{Exec, H2Exec, NewSvcExec};
2525
use common::io::Rewind;
2626
use error::{Kind, Parse};
2727
use proto;
28-
use service::{NewService, Service};
28+
use service::Service;
2929
use upgrade::Upgraded;
3030

31+
pub(super) use self::make_service::MakeServiceRef;
3132
pub(super) use self::spawn_all::NoopWatcher;
3233
use self::spawn_all::NewSvcTask;
3334
pub(super) use self::spawn_all::Watcher;
3435
pub(super) use self::upgrades::UpgradeableConnection;
3536

36-
#[cfg(feature = "runtime")] pub use super::tcp::AddrIncoming;
37+
#[cfg(feature = "runtime")] pub use super::tcp::{AddrIncoming, AddrStream};
3738

3839
/// A lower-level configuration of the HTTP protocol.
3940
///
@@ -69,13 +70,13 @@ enum ConnectionMode {
6970
#[derive(Debug)]
7071
pub struct Serve<I, S, E = Exec> {
7172
incoming: I,
72-
new_service: S,
73+
make_service: S,
7374
protocol: Http<E>,
7475
}
7576

7677
/// A future building a new `Service` to a `Connection`.
7778
///
78-
/// Wraps the future returned from `NewService` into one that returns
79+
/// Wraps the future returned from `MakeService` into one that returns
7980
/// a `Connection`.
8081
#[must_use = "futures do nothing unless polled"]
8182
#[derive(Debug)]
@@ -349,12 +350,16 @@ impl<E> Http<E> {
349350
///
350351
/// This method will bind the `addr` provided with a new TCP listener ready
351352
/// to accept connections. Each connection will be processed with the
352-
/// `new_service` object provided, creating a new service per
353+
/// `make_service` object provided, creating a new service per
353354
/// connection.
354355
#[cfg(feature = "runtime")]
355-
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
356+
pub fn serve_addr<S, Bd>(&self, addr: &SocketAddr, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
356357
where
357-
S: NewService<ReqBody=Body, ResBody=Bd>,
358+
S: MakeServiceRef<
359+
AddrStream,
360+
ReqBody=Body,
361+
ResBody=Bd,
362+
>,
358363
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
359364
Bd: Payload,
360365
E: H2Exec<<S::Service as Service>::Future, Bd>,
@@ -363,19 +368,23 @@ impl<E> Http<E> {
363368
if self.keep_alive {
364369
incoming.set_keepalive(Some(Duration::from_secs(90)));
365370
}
366-
Ok(self.serve_incoming(incoming, new_service))
371+
Ok(self.serve_incoming(incoming, make_service))
367372
}
368373

369374
/// Bind the provided `addr` with the `Handle` and return a [`Serve`](Serve)
370375
///
371376
/// This method will bind the `addr` provided with a new TCP listener ready
372377
/// to accept connections. Each connection will be processed with the
373-
/// `new_service` object provided, creating a new service per
378+
/// `make_service` object provided, creating a new service per
374379
/// connection.
375380
#[cfg(feature = "runtime")]
376-
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, new_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
381+
pub fn serve_addr_handle<S, Bd>(&self, addr: &SocketAddr, handle: &Handle, make_service: S) -> ::Result<Serve<AddrIncoming, S, E>>
377382
where
378-
S: NewService<ReqBody=Body, ResBody=Bd>,
383+
S: MakeServiceRef<
384+
AddrStream,
385+
ReqBody=Body,
386+
ResBody=Bd,
387+
>,
379388
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
380389
Bd: Payload,
381390
E: H2Exec<<S::Service as Service>::Future, Bd>,
@@ -384,23 +393,27 @@ impl<E> Http<E> {
384393
if self.keep_alive {
385394
incoming.set_keepalive(Some(Duration::from_secs(90)));
386395
}
387-
Ok(self.serve_incoming(incoming, new_service))
396+
Ok(self.serve_incoming(incoming, make_service))
388397
}
389398

390-
/// Bind the provided stream of incoming IO objects with a `NewService`.
391-
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, new_service: S) -> Serve<I, S, E>
399+
/// Bind the provided stream of incoming IO objects with a `MakeService`.
400+
pub fn serve_incoming<I, S, Bd>(&self, incoming: I, make_service: S) -> Serve<I, S, E>
392401
where
393402
I: Stream,
394403
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
395404
I::Item: AsyncRead + AsyncWrite,
396-
S: NewService<ReqBody=Body, ResBody=Bd>,
405+
S: MakeServiceRef<
406+
I::Item,
407+
ReqBody=Body,
408+
ResBody=Bd,
409+
>,
397410
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
398411
Bd: Payload,
399412
E: H2Exec<<S::Service as Service>::Future, Bd>,
400413
{
401414
Serve {
402-
incoming: incoming,
403-
new_service: new_service,
415+
incoming,
416+
make_service,
404417
protocol: self.clone(),
405418
}
406419
}
@@ -604,8 +617,9 @@ where
604617
I: Stream,
605618
I::Item: AsyncRead + AsyncWrite,
606619
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
607-
S: NewService<ReqBody=Body, ResBody=B>,
608-
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
620+
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
621+
//S::Error2: Into<Box<::std::error::Error + Send + Sync>>,
622+
//SME: Into<Box<::std::error::Error + Send + Sync>>,
609623
B: Payload,
610624
E: H2Exec<<S::Service as Service>::Future, B>,
611625
{
@@ -614,7 +628,7 @@ where
614628

615629
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
616630
if let Some(io) = try_ready!(self.incoming.poll().map_err(::Error::new_accept)) {
617-
let new_fut = self.new_service.new_service();
631+
let new_fut = self.make_service.make_service_ref(&io);
618632
Ok(Async::Ready(Some(Connecting {
619633
future: new_fut,
620634
io: Some(io),
@@ -666,8 +680,11 @@ where
666680
I: Stream,
667681
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
668682
I::Item: AsyncRead + AsyncWrite + Send + 'static,
669-
S: NewService<ReqBody=Body, ResBody=B>,
670-
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
683+
S: MakeServiceRef<
684+
I::Item,
685+
ReqBody=Body,
686+
ResBody=B,
687+
>,
671688
B: Payload,
672689
E: H2Exec<<S::Service as Service>::Future, B>,
673690
{
@@ -873,3 +890,37 @@ mod upgrades {
873890
}
874891
}
875892

893+
pub(crate) mod make_service {
894+
use std::error::Error as StdError;
895+
896+
pub trait MakeServiceRef<Ctx> {
897+
type Error: Into<Box<StdError + Send + Sync>>;
898+
type ReqBody: ::body::Payload;
899+
type ResBody: ::body::Payload;
900+
type Service: ::service::Service<ReqBody=Self::ReqBody, ResBody=Self::ResBody, Error=Self::Error>;
901+
type Future: ::futures::Future<Item=Self::Service>;
902+
903+
fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future;
904+
}
905+
906+
impl<T, Ctx, E, ME, S, F, IB, OB> MakeServiceRef<Ctx> for T
907+
where
908+
T: for<'a> ::service::MakeService<&'a Ctx, Error=E, MakeError=ME, Service=S, Future=F, ReqBody=IB, ResBody=OB>,
909+
E: Into<Box<StdError + Send + Sync>>,
910+
ME: Into<Box<StdError + Send + Sync>>,
911+
S: ::service::Service<ReqBody=IB, ResBody=OB, Error=E>,
912+
F: ::futures::Future<Item=S, Error=ME>,
913+
IB: ::body::Payload,
914+
OB: ::body::Payload,
915+
{
916+
type Error = E;
917+
type Service = S;
918+
type ReqBody = IB;
919+
type ResBody = OB;
920+
type Future = F;
921+
922+
fn make_service_ref(&mut self, ctx: &Ctx) -> Self::Future {
923+
self.make_service(ctx)
924+
}
925+
}
926+
}

src/server/mod.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
//! # Server
1212
//!
1313
//! The [`Server`](Server) is main way to start listening for HTTP requests.
14-
//! It wraps a listener with a [`NewService`](::service), and then should
14+
//! It wraps a listener with a [`MakeService`](::service), and then should
1515
//! be executed to start serving requests.
1616
//!
1717
//! [`Server`](Server) accepts connections in both HTTP1 and HTTP2 by default.
@@ -30,16 +30,16 @@
3030
//! // Construct our SocketAddr to listen on...
3131
//! let addr = ([127, 0, 0, 1], 3000).into();
3232
//!
33-
//! // And a NewService to handle each connection...
34-
//! let new_service = || {
33+
//! // And a MakeService to handle each connection...
34+
//! let make_service = || {
3535
//! service_fn_ok(|_req| {
3636
//! Response::new(Body::from("Hello World"))
3737
//! })
3838
//! };
3939
//!
4040
//! // Then bind and serve...
4141
//! let server = Server::bind(&addr)
42-
//! .serve(new_service);
42+
//! .serve(make_service);
4343
//!
4444
//! // Finally, spawn `server` onto an Executor...
4545
//! hyper::rt::run(server.map_err(|e| {
@@ -65,10 +65,10 @@ use tokio_io::{AsyncRead, AsyncWrite};
6565

6666
use body::{Body, Payload};
6767
use common::exec::{Exec, H2Exec, NewSvcExec};
68-
use service::{NewService, Service};
68+
use service::Service;
6969
// Renamed `Http` as `Http_` for now so that people upgrading don't see an
7070
// error that `hyper::server::Http` is private...
71-
use self::conn::{Http as Http_, NoopWatcher, SpawnAll};
71+
use self::conn::{Http as Http_, MakeServiceRef, NoopWatcher, SpawnAll};
7272
use self::shutdown::{Graceful, GracefulWatcher};
7373
#[cfg(feature = "runtime")] use self::tcp::AddrIncoming;
7474

@@ -144,7 +144,7 @@ where
144144
I: Stream,
145145
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
146146
I::Item: AsyncRead + AsyncWrite + Send + 'static,
147-
S: NewService<ReqBody=Body, ResBody=B>,
147+
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
148148
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
149149
S::Service: 'static,
150150
B: Payload,
@@ -203,7 +203,7 @@ where
203203
I: Stream,
204204
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
205205
I::Item: AsyncRead + AsyncWrite + Send + 'static,
206-
S: NewService<ReqBody=Body, ResBody=B>,
206+
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
207207
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
208208
S::Service: 'static,
209209
B: Payload,
@@ -332,7 +332,7 @@ impl<I, E> Builder<I, E> {
332332
I: Stream,
333333
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
334334
I::Item: AsyncRead + AsyncWrite + Send + 'static,
335-
S: NewService<ReqBody=Body, ResBody=B>,
335+
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
336336
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
337337
S::Service: 'static,
338338
B: Payload,

src/server/shutdown.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use tokio_io::{AsyncRead, AsyncWrite};
44
use body::{Body, Payload};
55
use common::drain::{self, Draining, Signal, Watch, Watching};
66
use common::exec::{H2Exec, NewSvcExec};
7-
use service::{Service, NewService};
8-
use super::conn::{SpawnAll, UpgradeableConnection, Watcher};
7+
use service::Service;
8+
use super::conn::{MakeServiceRef, SpawnAll, UpgradeableConnection, Watcher};
99

1010
#[allow(missing_debug_implementations)]
1111
pub struct Graceful<I, S, F, E> {
@@ -40,7 +40,7 @@ where
4040
I: Stream,
4141
I::Error: Into<Box<::std::error::Error + Send + Sync>>,
4242
I::Item: AsyncRead + AsyncWrite + Send + 'static,
43-
S: NewService<ReqBody=Body, ResBody=B>,
43+
S: MakeServiceRef<I::Item, ReqBody=Body, ResBody=B>,
4444
S::Service: 'static,
4545
S::Error: Into<Box<::std::error::Error + Send + Sync>>,
4646
B: Payload,

src/server/tcp.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use tokio_reactor::Handle;
88
use tokio_tcp::TcpListener;
99
use tokio_timer::Delay;
1010

11-
use self::addr_stream::AddrStream;
11+
pub use self::addr_stream::AddrStream;
1212

1313
/// A stream of connections from binding to an address.
1414
#[must_use = "streams do nothing unless polled"]
@@ -194,6 +194,7 @@ mod addr_stream {
194194
use tokio_io::{AsyncRead, AsyncWrite};
195195

196196

197+
/// A transport returned yieled by `AddrIncoming`.
197198
#[derive(Debug)]
198199
pub struct AddrStream {
199200
inner: TcpStream,

0 commit comments

Comments
 (0)