Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,14 @@ futures-channel = { version = "0.3", optional = true }
[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
env_logger = "0.10"
hyper = { version = "1.1.0", default-features = false, features = ["http1", "http2", "client", "server"] }
hyper-util = { version = "0.1.12", features = ["http1", "http2", "client", "client-legacy", "server-auto", "tokio"] }
hyper-util = { version = "0.1.12", features = ["http1", "http2", "client", "client-legacy", "server-auto", "server-graceful", "tokio"] }
serde = { version = "1.0", features = ["derive"] }
flate2 = "1.0.13"
brotli_crate = { package = "brotli", version = "7.0.0" }
zstd_crate = { package = "zstd", version = "0.13" }
doc-comment = "0.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread"] }
futures-util = { version = "0.3.28", default-features = false, features = ["std", "alloc"] }
rustls = { version = "0.23", default-features = false, features = ["ring"] }

# wasm

Expand Down
14 changes: 8 additions & 6 deletions src/async_impl/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1494,13 +1494,14 @@ impl ClientBuilder {
/// # Example
///
/// ```
/// # #[cfg(all(feature = "__rustls", not(feature = "__rustls-ring")))]
/// # let _ = rustls::crypto::ring::default_provider().install_default();
/// # fn doc() -> Result<(), reqwest::Error> {
/// use std::net::IpAddr;
/// let local_addr = IpAddr::from([12, 4, 1, 8]);
/// let client = reqwest::Client::builder()
/// .local_address(local_addr)
/// .build().unwrap();
/// .build()?;
/// # Ok(())
/// # }
/// ```
pub fn local_address<T>(mut self, addr: T) -> ClientBuilder
where
Expand Down Expand Up @@ -1531,12 +1532,13 @@ impl ClientBuilder {
/// # Example
///
/// ```
/// # #[cfg(all(feature = "__rustls", not(feature = "__rustls-ring")))]
/// # let _ = rustls::crypto::ring::default_provider().install_default();
/// # fn doc() -> Result<(), reqwest::Error> {
/// let interface = "lo";
/// let client = reqwest::Client::builder()
/// .interface(interface)
/// .build().unwrap();
/// .build()?;
/// # Ok(())
/// # }
/// ```
///
/// [man-7-socket]: https://man7.org/linux/man-pages/man7/socket.7.html
Expand Down
308 changes: 38 additions & 270 deletions src/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,27 @@ impl ConnectorService {
#[cfg(feature = "default-tls")]
Inner::DefaultTls(http, tls) => {
if dst.scheme() == Some(&Scheme::HTTPS) {
let host = dst.host().to_owned();
let port = dst.port().map(|p| p.as_u16()).unwrap_or(443);
let http = http.clone();
let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
let mut http = hyper_tls::HttpsConnector::from((http, tls_connector));
let conn = http.call(proxy_dst).await?;
log::trace!("tunneling HTTPS over proxy");
let tunneled = tunnel(
conn,
host.ok_or("no host in url")?.to_string(),
port,
self.user_agent.clone(),
auth,
)
.await?;
let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
let inner =
hyper_tls::HttpsConnector::from((http.clone(), tls_connector.clone()));
// TODO: we could cache constructing this
let mut tunnel =
hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
if let Some(auth) = auth {
tunnel = tunnel.with_auth(auth);
}
if let Some(ua) = self.user_agent {
let mut headers = http::HeaderMap::new();
headers.insert(http::header::USER_AGENT, ua);
tunnel = tunnel.with_headers(headers);
}
// We don't wrap this again in an HttpsConnector since that uses Maybe,
// and we know this is definitely HTTPS.
let tunneled = tunnel.call(dst.clone()).await?;
let tls_connector = tokio_native_tls::TlsConnector::from(tls.clone());
let io = tls_connector
.connect(host.ok_or("no host in url")?, TokioIo::new(tunneled))
.connect(dst.host().ok_or("no host in url")?, TokioIo::new(tunneled))
.await?;
return Ok(Conn {
inner: self.verbose.wrap(NativeTlsConn {
Expand All @@ -671,18 +674,27 @@ impl ConnectorService {
use std::convert::TryFrom;
use tokio_rustls::TlsConnector as RustlsConnector;

let host = dst.host().ok_or("no host in url")?.to_string();
let port = dst.port().map(|r| r.as_u16()).unwrap_or(443);
let http = http.clone();
let mut http = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
let tls = tls.clone();
let conn = http.call(proxy_dst).await?;
log::trace!("tunneling HTTPS over proxy");
let maybe_server_name = ServerName::try_from(host.as_str().to_owned())
.map_err(|_| "Invalid Server Name");
let tunneled = tunnel(conn, host, port, self.user_agent.clone(), auth).await?;
let server_name = maybe_server_name?;
let io = RustlsConnector::from(tls)
let http = http.clone();
let inner = hyper_rustls::HttpsConnector::from((http, tls_proxy.clone()));
// TODO: we could cache constructing this
let mut tunnel =
hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_dst, inner);
if let Some(auth) = auth {
tunnel = tunnel.with_auth(auth);
}
if let Some(ua) = self.user_agent {
let mut headers = http::HeaderMap::new();
headers.insert(http::header::USER_AGENT, ua);
tunnel = tunnel.with_headers(headers);
}
// We don't wrap this again in an HttpsConnector since that uses Maybe,
// and we know this is definitely HTTPS.
let tunneled = tunnel.call(dst.clone()).await?;
let host = dst.host().ok_or("no host in url")?.to_string();
let server_name = ServerName::try_from(host.as_str().to_owned())
.map_err(|_| "Invalid Server Name")?;
let io = RustlsConnector::from(tls.clone())
.connect(server_name, TokioIo::new(tunneled))
.await?;

Expand Down Expand Up @@ -950,83 +962,6 @@ pub(crate) mod sealed {

pub(crate) type Connecting = Pin<Box<dyn Future<Output = Result<Conn, BoxError>> + Send>>;

#[cfg(feature = "__tls")]
async fn tunnel<T>(
mut conn: T,
host: String,
port: u16,
user_agent: Option<HeaderValue>,
auth: Option<HeaderValue>,
) -> Result<T, BoxError>
where
T: Read + Write + Unpin,
{
use hyper_util::rt::TokioIo;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

let mut buf = format!(
"\
CONNECT {host}:{port} HTTP/1.1\r\n\
Host: {host}:{port}\r\n\
"
)
.into_bytes();

// user-agent
if let Some(user_agent) = user_agent {
buf.extend_from_slice(b"User-Agent: ");
buf.extend_from_slice(user_agent.as_bytes());
buf.extend_from_slice(b"\r\n");
}

// proxy-authorization
if let Some(value) = auth {
log::debug!("tunnel to {host}:{port} using basic auth");
buf.extend_from_slice(b"Proxy-Authorization: ");
buf.extend_from_slice(value.as_bytes());
buf.extend_from_slice(b"\r\n");
}

// headers end
buf.extend_from_slice(b"\r\n");

let mut tokio_conn = TokioIo::new(&mut conn);

tokio_conn.write_all(&buf).await?;

let mut buf = [0; 8192];
let mut pos = 0;

loop {
let n = tokio_conn.read(&mut buf[pos..]).await?;

if n == 0 {
return Err(tunnel_eof());
}
pos += n;

let recvd = &buf[..pos];
if recvd.starts_with(b"HTTP/1.1 200") || recvd.starts_with(b"HTTP/1.0 200") {
if recvd.ends_with(b"\r\n\r\n") {
return Ok(conn);
}
if pos == buf.len() {
return Err("proxy headers too long for tunnel".into());
}
// else read more
} else if recvd.starts_with(b"HTTP/1.1 407") {
return Err("proxy authentication required".into());
} else {
return Err("unsuccessful tunnel".into());
}
}
}

#[cfg(feature = "__tls")]
fn tunnel_eof() -> BoxError {
"unexpected eof while tunneling".into()
}

#[cfg(feature = "default-tls")]
mod native_tls_conn {
use super::TlsInfoFactory;
Expand Down Expand Up @@ -1518,170 +1453,3 @@ mod verbose {
}
}
}

#[cfg(feature = "__tls")]
#[cfg(test)]
mod tests {
use super::tunnel;
use crate::proxy;
use hyper_util::rt::TokioIo;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::thread;
use tokio::net::TcpStream;
use tokio::runtime;

static TUNNEL_UA: &str = "tunnel-test/x.y";
static TUNNEL_OK: &[u8] = b"\
HTTP/1.1 200 OK\r\n\
\r\n\
";

macro_rules! mock_tunnel {
() => {{
mock_tunnel!(TUNNEL_OK)
}};
($write:expr) => {{
mock_tunnel!($write, "")
}};
($write:expr, $auth:expr) => {{
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
let connect_expected = format!(
"\
CONNECT {0}:{1} HTTP/1.1\r\n\
Host: {0}:{1}\r\n\
User-Agent: {2}\r\n\
{3}\
\r\n\
",
addr.ip(),
addr.port(),
TUNNEL_UA,
$auth
)
.into_bytes();

thread::spawn(move || {
let (mut sock, _) = listener.accept().unwrap();
let mut buf = [0u8; 4096];
let n = sock.read(&mut buf).unwrap();
assert_eq!(&buf[..n], &connect_expected[..]);

sock.write_all($write).unwrap();
});
addr
}};
}

fn ua() -> Option<http::header::HeaderValue> {
Some(http::header::HeaderValue::from_static(TUNNEL_UA))
}

#[test]
fn test_tunnel() {
let addr = mock_tunnel!();

let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let f = async move {
let tcp = TokioIo::new(TcpStream::connect(&addr).await?);
let host = addr.ip().to_string();
let port = addr.port();
tunnel(tcp, host, port, ua(), None).await
};

rt.block_on(f).unwrap();
}

#[test]
fn test_tunnel_eof() {
let addr = mock_tunnel!(b"HTTP/1.1 200 OK");

let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let f = async move {
let tcp = TokioIo::new(TcpStream::connect(&addr).await?);
let host = addr.ip().to_string();
let port = addr.port();
tunnel(tcp, host, port, ua(), None).await
};

rt.block_on(f).unwrap_err();
}

#[test]
fn test_tunnel_non_http_response() {
let addr = mock_tunnel!(b"foo bar baz hallo");

let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let f = async move {
let tcp = TokioIo::new(TcpStream::connect(&addr).await?);
let host = addr.ip().to_string();
let port = addr.port();
tunnel(tcp, host, port, ua(), None).await
};

rt.block_on(f).unwrap_err();
}

#[test]
fn test_tunnel_proxy_unauthorized() {
let addr = mock_tunnel!(
b"\
HTTP/1.1 407 Proxy Authentication Required\r\n\
Proxy-Authenticate: Basic realm=\"nope\"\r\n\
\r\n\
"
);

let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let f = async move {
let tcp = TokioIo::new(TcpStream::connect(&addr).await?);
let host = addr.ip().to_string();
let port = addr.port();
tunnel(tcp, host, port, ua(), None).await
};

let error = rt.block_on(f).unwrap_err();
assert_eq!(error.to_string(), "proxy authentication required");
}

#[test]
fn test_tunnel_basic_auth() {
let addr = mock_tunnel!(
TUNNEL_OK,
"Proxy-Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==\r\n"
);

let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt");
let f = async move {
let tcp = TokioIo::new(TcpStream::connect(&addr).await?);
let host = addr.ip().to_string();
let port = addr.port();
tunnel(
tcp,
host,
port,
ua(),
Some(proxy::encode_basic_auth("Aladdin", "open sesame")),
)
.await
};

rt.block_on(f).unwrap();
}
}
Loading