diff --git a/src/client/conn.rs b/src/client/conn.rs index 60a04788f3..3d0f15a74e 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -466,6 +466,7 @@ impl Builder { T: AsyncRead + AsyncWrite + Send + 'static, B: Payload + 'static, { + trace!("client handshake HTTP/{}", if self.http2 { 2 } else { 1 }); Handshake { builder: self.clone(), io: Some(io), diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 4754f64c6c..7d0a70b421 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -36,7 +36,6 @@ pub trait Connect: Send + Sync { /// A set of properties to describe where and how to try to connect. #[derive(Clone, Debug)] pub struct Destination { - //pub(super) alpn: Alpn, pub(super) uri: Uri, } @@ -46,21 +45,18 @@ pub struct Destination { /// was used, or if connected to an HTTP proxy. #[derive(Debug)] pub struct Connected { - //alpn: Alpn, + pub(super) alpn: Alpn, pub(super) is_proxied: bool, pub(super) extra: Option, } pub(super) struct Extra(Box); -/*TODO: when HTTP1 Upgrades to H2 are added, this will be needed -#[derive(Debug)] +#[derive(Clone, Copy, Debug, PartialEq)] pub(super) enum Alpn { - Http1, - //H2, - //Http1OrH2 + H2, + None, } -*/ impl Destination { /// Get the protocol scheme. @@ -246,7 +242,7 @@ impl Connected { /// Create new `Connected` type with empty metadata. pub fn new() -> Connected { Connected { - //alpn: Alpn::Http1, + alpn: Alpn::None, is_proxied: false, extra: None, } @@ -274,19 +270,18 @@ impl Connected { self } - /* /// Set that the connected transport negotiated HTTP/2 as it's /// next protocol. - pub fn h2(mut self) -> Connected { + pub fn negotiated_h2(mut self) -> Connected { self.alpn = Alpn::H2; self } - */ // Don't public expose that `Connected` is `Clone`, unsure if we want to // keep that contract... pub(super) fn clone(&self) -> Connected { Connected { + alpn: self.alpn.clone(), is_proxied: self.is_proxied, extra: self.extra.clone(), } diff --git a/src/client/mod.rs b/src/client/mod.rs index 2660089362..b25486f1bf 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -86,12 +86,12 @@ use futures::{Async, Future, Poll}; use futures::future::{self, Either, Executor}; use futures::sync::oneshot; use http::{Method, Request, Response, Uri, Version}; -use http::header::{Entry, HeaderValue, HOST}; +use http::header::{HeaderValue, HOST}; use http::uri::Scheme; use body::{Body, Payload}; use common::{Exec, lazy as hyper_lazy, Lazy}; -use self::connect::{Connect, Connected, Destination}; +use self::connect::{Alpn, Connect, Connected, Destination}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; #[cfg(feature = "runtime")] pub use self::connect::HttpConnector; @@ -192,23 +192,19 @@ where C: Connect + Sync + 'static, /// Send a constructed Request using this Client. pub fn request(&self, mut req: Request) -> ResponseFuture { - let is_http_11 = self.ver == Ver::Http1 && match req.version() { - Version::HTTP_11 => true, - Version::HTTP_10 => false, - other => { + let is_http_connect = req.method() == &Method::CONNECT; + match req.version() { + Version::HTTP_11 => (), + Version::HTTP_10 => if is_http_connect { + debug!("CONNECT is not allowed for HTTP/1.0"); + return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method()))); + }, + other => if self.ver != Ver::Http2 { error!("Request has unsupported version \"{:?}\"", other); return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version()))); } }; - let is_http_connect = req.method() == &Method::CONNECT; - - if !is_http_11 && is_http_connect { - debug!("client does not support CONNECT requests for {:?}", req.version()); - return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method()))); - } - - let uri = req.uri().clone(); let domain = match (uri.scheme_part(), uri.authority_part()) { (Some(scheme), Some(auth)) => { @@ -233,21 +229,7 @@ where C: Connect + Sync + 'static, } }; - if self.set_host && self.ver == Ver::Http1 { - if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") { - let hostname = uri.host().expect("authority implies host"); - let host = if let Some(port) = uri.port() { - let s = format!("{}:{}", hostname, port); - HeaderValue::from_str(&s) - } else { - HeaderValue::from_str(hostname) - }.expect("uri host is valid header value"); - entry.insert(host); - } - } - - - let pool_key = (Arc::new(domain.to_string()), self.ver); + let pool_key = Arc::new(domain.to_string()); ResponseFuture::new(Box::new(self.retryably_send_request(req, pool_key))) } @@ -283,11 +265,28 @@ where C: Connect + Sync + 'static, fn send_request(&self, mut req: Request, pool_key: PoolKey) -> impl Future, Error=ClientError> { let conn = self.connection_for(req.uri().clone(), pool_key); - let ver = self.ver; + let set_host = self.set_host; let executor = self.executor.clone(); conn.and_then(move |mut pooled| { - if ver == Ver::Http1 { - // CONNECT always sends origin-form, so check it first... + if pooled.is_http1() { + if set_host { + let uri = req.uri().clone(); + req + .headers_mut() + .entry(HOST) + .expect("HOST is always valid header name") + .or_insert_with(|| { + let hostname = uri.host().expect("authority implies host"); + if let Some(port) = uri.port() { + let s = format!("{}:{}", hostname, port); + HeaderValue::from_str(&s) + } else { + HeaderValue::from_str(hostname) + }.expect("uri host is valid header value") + }); + } + + // CONNECT always sends authority-form, so check it first... if req.method() == &Method::CONNECT { authority_form(req.uri_mut()); } else if pooled.conn_info.is_proxied { @@ -295,11 +294,9 @@ where C: Connect + Sync + 'static, } else { origin_form(req.uri_mut()); }; - } else { - debug_assert!( - req.method() != &Method::CONNECT, - "Client should have returned Error for HTTP2 CONNECT" - ); + } else if req.method() == &Method::CONNECT { + debug!("client does not support CONNECT requests over HTTP2"); + return Either::A(future::err(ClientError::Normal(::Error::new_user_unsupported_request_method()))); } let fut = pooled.send_request_retryable(req) @@ -322,10 +319,10 @@ where C: Connect + Sync + 'static, // To counteract this, we must check if our senders 'want' channel // has been closed after having tried to send. If so, error out... if pooled.is_closed() { - return Either::A(fut); + return Either::B(Either::A(fut)); } - Either::B(fut + Either::B(Either::B(fut .and_then(move |mut res| { // If pooled is HTTP/2, we can toss this reference immediately. // @@ -337,7 +334,7 @@ where C: Connect + Sync + 'static, // for a new request to start. // // It won't be ready if there is a body to stream. - if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() { + if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { drop(pooled); } else if !res.body().is_end_stream() { let (delayed_tx, delayed_rx) = oneshot::channel(); @@ -370,7 +367,7 @@ where C: Connect + Sync + 'static, } } Ok(res) - })) + }))) }) } @@ -463,8 +460,9 @@ where C: Connect + Sync + 'static, let pool = self.pool.clone(); let h1_writev = self.h1_writev; let h1_title_case_headers = self.h1_title_case_headers; + let ver = self.ver; + let is_ver_h2 = self.ver == Ver::Http2; let connector = self.connector.clone(); - let ver = pool_key.1; let dst = Destination { uri, }; @@ -474,7 +472,7 @@ where C: Connect + Sync + 'static, // If the pool_key is for HTTP/2, and there is already a // connection being estabalished, then this can't take a // second lock. The "connect_to" future is Canceled. - let connecting = match pool.connecting(&pool_key) { + let connecting = match pool.connecting(&pool_key, ver) { Some(lock) => lock, None => { let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress")); @@ -484,11 +482,31 @@ where C: Connect + Sync + 'static, Either::A(connector.connect(dst) .map_err(::Error::new_connect) .and_then(move |(io, connected)| { - conn::Builder::new() + // If ALPN is h2 and we aren't http2_only already, + // then we need to convert our pool checkout into + // a single HTTP2 one. + let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { + match connecting.alpn_h2(&pool) { + Some(lock) => { + trace!("ALPN negotiated h2, updating pool"); + lock + }, + None => { + // Another connection has already upgraded, + // the pool checkout should finish up for us. + let canceled = ::Error::new_canceled(Some("ALPN upgraded to HTTP/2")); + return Either::B(future::err(canceled)); + } + } + } else { + connecting + }; + let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; + Either::A(conn::Builder::new() .exec(executor.clone()) .h1_writev(h1_writev) .h1_title_case_headers(h1_title_case_headers) - .http2_only(pool_key.1 == Ver::Http2) + .http2_only(is_h2) .handshake(io) .and_then(move |(tx, conn)| { let bg = executor.execute(conn.map_err(|e| { @@ -509,12 +527,13 @@ where C: Connect + Sync + 'static, .map(move |tx| { pool.pooled(connecting, PoolClient { conn_info: connected, - tx: match ver { - Ver::Http1 => PoolTx::Http1(tx), - Ver::Http2 => PoolTx::Http2(tx.into_http2()), + tx: if is_h2 { + PoolTx::Http2(tx.into_http2()) + } else { + PoolTx::Http1(tx) }, }) - }) + })) })) }) } @@ -591,6 +610,17 @@ impl PoolClient { } } + fn is_http1(&self) -> bool { + !self.is_http2() + } + + fn is_http2(&self) -> bool { + match self.tx { + PoolTx::Http1(_) => false, + PoolTx::Http2(_) => true, + } + } + fn is_ready(&self) -> bool { match self.tx { PoolTx::Http1(ref tx) => tx.is_ready(), @@ -650,6 +680,10 @@ where } } } + + fn can_share(&self) -> bool { + self.is_http2() + } } // FIXME: allow() required due to `impl Trait` leaking types to this lint diff --git a/src/client/pool.rs b/src/client/pool.rs index 146ef77be8..9fb615bc3d 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -30,6 +30,7 @@ pub(super) trait Poolable: Send + Sized + 'static { /// /// Allows for HTTP/2 to return a shared reservation. fn reserve(self) -> Reservation; + fn can_share(&self) -> bool; } /// When checking out a pooled connection, it might be that the connection @@ -50,7 +51,7 @@ pub(super) enum Reservation { } /// Simple type alias in case the key type needs to be adjusted. -pub(super) type Key = (Arc, Ver); +pub(super) type Key = Arc; struct PoolInner { // A flag that a connection is being estabilished, and the connection @@ -151,8 +152,8 @@ impl Pool { /// Ensure that there is only ever 1 connecting task for HTTP/2 /// connections. This does nothing for HTTP/1. - pub(super) fn connecting(&self, key: &Key) -> Option> { - if key.1 == Ver::Http2 { + pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option> { + if ver == Ver::Http2 { if let Some(ref enabled) = self.inner { let mut inner = enabled.lock().unwrap(); return if inner.connecting.insert(key.clone()) { @@ -162,7 +163,7 @@ impl Pool { }; Some(connecting) } else { - trace!("HTTP/2 connecting already in progress for {:?}", key.0); + trace!("HTTP/2 connecting already in progress for {:?}", key); None }; } @@ -190,7 +191,7 @@ impl Pool { #[cfg(feature = "runtime")] #[cfg(test)] pub(super) fn h1_key(&self, s: &str) -> Key { - (Arc::new(s.to_string()), Ver::Http1) + Arc::new(s.to_string()) } #[cfg(feature = "runtime")] @@ -243,11 +244,6 @@ impl Pool { let (value, pool_ref) = if let Some(ref enabled) = self.inner { match value.reserve() { Reservation::Shared(to_insert, to_return) => { - debug_assert_eq!( - connecting.key.1, - Ver::Http2, - "shared reservation without Http2" - ); let mut inner = enabled.lock().unwrap(); inner.put(connecting.key.clone(), to_insert, enabled); // Do this here instead of Drop for Connecting because we @@ -294,7 +290,7 @@ impl Pool { // unique or shared. So, the hack is to just assume Ver::Http2 means // shared... :( let mut pool_ref = WeakOpt::none(); - if key.1 == Ver::Http1 { + if !value.can_share() { if let Some(ref enabled) = self.inner { pool_ref = WeakOpt::downgrade(enabled); } @@ -377,7 +373,7 @@ impl<'a, T: Poolable + 'a> IdlePopper<'a, T> { impl PoolInner { fn put(&mut self, key: Key, value: T, __pool_ref: &Arc>>) { - if key.1 == Ver::Http2 && self.idle.contains_key(&key) { + if value.can_share() && self.idle.contains_key(&key) { trace!("put; existing idle HTTP/2 connection for {:?}", key); return; } @@ -601,7 +597,7 @@ impl Drop for Pooled { if let Ok(mut inner) = pool.lock() { inner.put(self.key.clone(), value, &pool); } - } else if self.key.1 == Ver::Http1 { + } else if !value.can_share() { trace!("pool dropped, dropping pooled ({:?})", self.key); } // Ver::Http2 is already in the Pool (or dead), so we wouldn't @@ -705,16 +701,22 @@ pub(super) struct Connecting { pool: WeakOpt>>, } +impl Connecting { + pub(super) fn alpn_h2(self, pool: &Pool) -> Option { + debug_assert!( + self.pool.0.is_none(), + "Connecting::alpn_h2 but already Http2" + ); + + pool.connecting(&self.key, Ver::Http2) + } +} + impl Drop for Connecting { fn drop(&mut self) { if let Some(pool) = self.pool.upgrade() { // No need to panic on drop, that could abort! if let Ok(mut inner) = pool.lock() { - debug_assert_eq!( - self.key.1, - Ver::Http2, - "Connecting constructed without Http2" - ); inner.connected(&self.key); } } @@ -804,7 +806,7 @@ mod tests { use futures::{Async, Future}; use futures::future; use common::Exec; - use super::{Connecting, Key, Poolable, Pool, Reservation, Ver, WeakOpt}; + use super::{Connecting, Key, Poolable, Pool, Reservation, WeakOpt}; /// Test unique reservations. #[derive(Debug, PartialEq, Eq)] @@ -818,6 +820,10 @@ mod tests { fn reserve(self) -> Reservation { Reservation::Unique(self) } + + fn can_share(&self) -> bool { + false + } } fn c(key: Key) -> Connecting { @@ -845,7 +851,7 @@ mod tests { #[test] fn test_pool_checkout_smoke() { let pool = pool_no_timer(); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); @@ -860,7 +866,7 @@ mod tests { fn test_pool_checkout_returns_none_if_expired() { future::lazy(|| { let pool = pool_no_timer(); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); let pooled = pool.pooled(c(key.clone()), Uniq(41)); drop(pooled); ::std::thread::sleep(pool.locked().timeout.unwrap()); @@ -873,7 +879,7 @@ mod tests { fn test_pool_checkout_removes_expired() { future::lazy(|| { let pool = pool_no_timer(); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); @@ -894,7 +900,7 @@ mod tests { fn test_pool_max_idle_per_host() { future::lazy(|| { let pool = pool_max_idle_no_timer(2); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); pool.pooled(c(key.clone()), Uniq(41)); pool.pooled(c(key.clone()), Uniq(5)); @@ -920,7 +926,7 @@ mod tests { &Exec::Default, ); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); // Since pool.pooled() will be calling spawn on drop, need to be sure // those drops are called while `rt` is the current executor. To do so, @@ -945,7 +951,7 @@ mod tests { #[test] fn test_pool_checkout_task_unparked() { let pool = pool_no_timer(); - let key = (Arc::new("foo".to_string()), Ver::Http1); + let key = Arc::new("foo".to_string()); let pooled = pool.pooled(c(key.clone()), Uniq(41)); let checkout = pool.checkout(key).join(future::lazy(move || { @@ -964,7 +970,7 @@ mod tests { fn test_pool_checkout_drop_cleans_up_waiters() { future::lazy(|| { let pool = pool_no_timer::>(); - let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); + let key = Arc::new("localhost:12345".to_string()); let mut checkout1 = pool.checkout(key.clone()); let mut checkout2 = pool.checkout(key.clone()); @@ -1000,12 +1006,16 @@ mod tests { fn reserve(self) -> Reservation { Reservation::Unique(self) } + + fn can_share(&self) -> bool { + false + } } #[test] fn pooled_drop_if_closed_doesnt_reinsert() { let pool = pool_no_timer(); - let key = (Arc::new("localhost:12345".to_string()), Ver::Http1); + let key = Arc::new("localhost:12345".to_string()); pool.pooled(c(key.clone()), CanClose { val: 57, closed: true, diff --git a/src/client/tests.rs b/src/client/tests.rs index 7df6a7efe5..17a4194367 100644 --- a/src/client/tests.rs +++ b/src/client/tests.rs @@ -207,4 +207,3 @@ fn checkout_win_allows_connect_future_to_be_pooled() { } } - diff --git a/src/mock.rs b/src/mock.rs index 010b6cf9e3..b27bf6552f 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -358,11 +358,12 @@ impl Read for Duplex { impl Write for Duplex { fn write(&mut self, buf: &[u8]) -> io::Result { let mut inner = self.inner.lock().unwrap(); + let ret = inner.write.write(buf); if let Some(task) = inner.handle_read_task.take() { trace!("waking DuplexHandle read"); task.notify(); } - inner.write.write(buf) + ret } fn flush(&mut self) -> io::Result<()> { @@ -404,8 +405,7 @@ impl DuplexHandle { inner.handle_read_task = Some(task::current()); return Ok(Async::NotReady); } - inner.write.inner.vec.truncate(0); - Ok(Async::Ready(inner.write.inner.len())) + inner.write.read(buf).map(Async::Ready) } pub fn write(&self, bytes: &[u8]) -> Poll { @@ -456,6 +456,13 @@ impl MockConnector { } pub fn mock_fut(&mut self, key: &str, fut: F) -> DuplexHandle + where + F: Future + Send + 'static, + { + self.mock_opts(key, Connected::new(), fut) + } + + pub fn mock_opts(&mut self, key: &str, connected: Connected, fut: F) -> DuplexHandle where F: Future + Send + 'static, { @@ -465,7 +472,7 @@ impl MockConnector { let fut = Box::new(fut.then(move |_| { trace!("MockConnector mocked fut ready"); - Ok((duplex, Connected::new())) + Ok((duplex, connected)) })); self.mocks.lock().unwrap().entry(key) .or_insert(Vec::new()) diff --git a/tests/client.rs b/tests/client.rs index dae5fb9864..0df729e2d8 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -18,7 +18,7 @@ use hyper::{Body, Client, Method, Request, StatusCode}; use futures::{Future, Stream}; use futures::sync::oneshot; use tokio::runtime::current_thread::Runtime; -use tokio::net::tcp::{ConnectFuture, TcpStream}; +use tokio::net::tcp::{ConnectFuture, TcpListener as TkTcpListener, TcpStream}; fn s(buf: &[u8]) -> &str { ::std::str::from_utf8(buf).expect("from_utf8") @@ -1349,12 +1349,66 @@ mod dispatch_impl { assert_eq!(vec, b"bar=foo"); } + #[test] + fn alpn_h2() { + use hyper::Response; + use hyper::server::conn::Http; + use hyper::service::service_fn_ok; + + let _ = pretty_env_logger::try_init(); + let mut rt = Runtime::new().unwrap(); + let listener = TkTcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap(); + let addr = listener.local_addr().unwrap(); + let mut connector = DebugConnector::new(); + connector.alpn_h2 = true; + let connects = connector.connects.clone(); + + let client = Client::builder() + .build::<_, ::hyper::Body>(connector); + + let srv = listener.incoming() + .into_future() + .map_err(|_| unreachable!()) + .and_then(|(item, _incoming)| { + let socket = item.unwrap(); + Http::new() + .http2_only(true) + .serve_connection(socket, service_fn_ok(|req| { + assert_eq!(req.headers().get("host"), None); + Response::new(Body::empty()) + })) + }) + .map_err(|e| panic!("server error: {}", e)); + rt.spawn(srv); + + + assert_eq!(connects.load(Ordering::SeqCst), 0); + + let url = format!("http://{}/a", addr).parse::<::hyper::Uri>().unwrap(); + let res1 = client.get(url.clone()); + let res2 = client.get(url.clone()); + let res3 = client.get(url.clone()); + rt.block_on(res1.join(res2).join(res3)).unwrap(); + + // Since the client doesn't know it can ALPN at first, it will have + // started 3 connections. But, the server above will only handle 1, + // so the unwrapped responses futures show it still worked. + assert_eq!(connects.load(Ordering::SeqCst), 3); + + let res4 = client.get(url.clone()); + rt.block_on(res4).unwrap(); + + assert_eq!(connects.load(Ordering::SeqCst), 3, "after ALPN, no more connects"); + drop(client); + } + struct DebugConnector { http: HttpConnector, closes: mpsc::Sender<()>, connects: Arc, is_proxy: bool, + alpn_h2: bool, } impl DebugConnector { @@ -1370,6 +1424,7 @@ mod dispatch_impl { closes: closes, connects: Arc::new(AtomicUsize::new(0)), is_proxy: false, + alpn_h2: false, } } @@ -1388,7 +1443,11 @@ mod dispatch_impl { self.connects.fetch_add(1, Ordering::SeqCst); let closes = self.closes.clone(); let is_proxy = self.is_proxy; - Box::new(self.http.connect(dst).map(move |(s, c)| { + let is_alpn_h2 = self.alpn_h2; + Box::new(self.http.connect(dst).map(move |(s, mut c)| { + if is_alpn_h2 { + c = c.negotiated_h2(); + } (DebugStream(s, closes), c.proxy(is_proxy)) })) }