Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make examples/simple.rs compatible with hyper v1 #1

Merged
merged 3 commits into from
May 19, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,19 @@ tokio = { version = "1.17.0", features = ["io-util", "rt"] }
tracing = "0.1.34"

[dev-dependencies]
hyper = { version = "1.2.0", features = ["client", "http1"] }
hyper = { version = "1.2.0", features = ["client", "http1", "server"] }
futures = "0.3.21"
async-trait = "0.1.53"
async-tungstenite = { version = "0.17", features = ["tokio-runtime"] }
tokio-test = "0.4.2"
test-context = "0.1.3"
tokiotest-httpserver = "0.2.1"
hyper-trust-dns = { version = "0.4.2", features = [
"rustls-http2",
"dnssec-ring",
"dns-over-https-rustls",
"rustls-webpki"
] }
Comment on lines -41 to -46
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Motivation: hyper-trust-dns was rebranded to hyper-hickory and no longer has a HttpsConnector, so using hyper-rustls instead.

rand = "0.8.5"
tungstenite = "0.17"
url = "2.2"
criterion = "0.3.5"
hyper-rustls = "0.27.1"
rustls = "0.23.6"

[features]

Expand Down
113 changes: 83 additions & 30 deletions examples/simple.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,120 @@
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server, StatusCode};
use std::convert::Infallible;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::time::Duration;

use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::{BodyExt, Empty, Full};
use hyper::body::{Bytes, Incoming};
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer};
use tokio::net::TcpListener;

use hyper_reverse_proxy::ReverseProxy;
use hyper_trust_dns::{RustlsHttpsConnector, TrustDnsResolver};
use std::net::IpAddr;
use std::{convert::Infallible, net::SocketAddr};
use hyper_rustls::{ConfigBuilderExt, HttpsConnector};
use hyper_util::client::legacy::connect::HttpConnector;

type Connector = HttpsConnector<HttpConnector>;
type ResponseBody = UnsyncBoxBody<Bytes, std::io::Error>;

lazy_static::lazy_static! {
static ref PROXY_CLIENT: ReverseProxy<RustlsHttpsConnector> = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as you're here fixing up the example, I think it's no longer recommended to use lazy_static. From the lazy_static repo:

It is now possible to easily replicate this crate's functionality in Rust's standard library with std::sync::OnceLock.

So if anything it'd be better to use that.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea, I opened #2.

ReverseProxy::new(
hyper::Client::builder().build::<_, hyper::Body>(TrustDnsResolver::default().into_rustls_webpki_https_connector()),
)
static ref PROXY_CLIENT: ReverseProxy<Connector> = {
let connector: Connector = Connector::builder()
.with_tls_config(
rustls::ClientConfig::builder()
.with_native_roots()
.expect("with_native_roots")
.with_no_client_auth(),
)
.https_or_http()
.enable_http1()
.build();
Comment on lines +31 to +32
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only enabling http1 because http2 requires that the upstream server also supports http2, since we're not doing any http2-to-http1 translation at the moment.

ReverseProxy::new(
hyper_util::client::legacy::Builder::new(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(3))
.pool_timer(TokioTimer::new())
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Motivation: The default idle timeout is 90 seconds, which is far too long for most services that you connect the reverse proxy to. My test service used 15 seconds, which means I started getting an error 15 seconds after I opened the reverse proxy.

Ideally the keep-alive header from the upstream server would be used automatically. But that can change per request, so I think it would require quite a lot of work to make this seamless. Open to ideas here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started getting an error 15 seconds after I opened the reverse proxy.

Was this an error from hyper-reverse-proxy? If so I would say the solution would be to silence that error. If upstream has decided that an idle connection should get closed that's not really an error, that's business as usual (imo).

90 seconds does initially strike me as being too long for a default, but it's what go's builtin reverse proxy uses (via the DefaultTransport.IdleConnTimeout), and they've probably thought about it more than I have.

In any case the client would ideally honor the keep alive timeout provided by the server, but that's more a concern of the hyper_util::client than of ours.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I guess I wasn't very clear on what was happening here.

The error is:

LegacyHyperError(Error { kind: SendRequest, source: Some(hyper::Error(IncompleteMessage)) })

Because the connection between the proxy and the upstream service has been closed (in my case after 15 seconds), but the connection between the client and the proxy is still open. So on the next request the proxy will attempt to use a connection that is already closed, which results in the error.

Here's a GitHub thread discussing the issue: hyperium/hyper#2136

The TL;DR seems to be that it isn't always safe to automatically retry the request.

Anyway, I thought that it might be useful to include in the example, but perhaps it requires a comment explaining the problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, maybe best to just include that link as a comment there for now 👍

.build::<_, Incoming>(connector),
)
};
}

fn debug_request(req: &Request<Body>) -> Result<Response<Body>, Infallible> {
let body_str = format!("{:?}", req);
Ok(Response::new(Body::from(body_str)))
}

async fn handle(client_ip: IpAddr, req: Request<Body>) -> Result<Response<Body>, Infallible> {
if req.uri().path().starts_with("/target/first") {
async fn handle(
client_ip: IpAddr,
req: Request<Incoming>,
) -> Result<Response<ResponseBody>, Infallible> {
let host = req.headers().get("host").and_then(|v| v.to_str().ok());
if host.is_some_and(|host| host.starts_with("service1.localhost")) {
match PROXY_CLIENT
Comment on lines +47 to +48
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The motivation for using the host header over the path is because the path is forwarded to the upstream server, and most servers will return a useless 404 for the path /target/first. Meanwhile, most servers will ignore what is in the host header, and most browsers let you use subdomains on localhost now, so http://service1.localhost should work for most cases out of the box. Using starts_with since the port number will be at the end!

.call(client_ip, "http://127.0.0.1:13901", req)
.await
{
Ok(response) => Ok(response),
Err(_error) => Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.body(UnsyncBoxBody::new(
Empty::<Bytes>::new().map_err(io::Error::other),
))
.unwrap()),
}
} else if req.uri().path().starts_with("/target/second") {
} else if host.is_some_and(|host| host.starts_with("service2.localhost")) {
match PROXY_CLIENT
.call(client_ip, "http://127.0.0.1:13902", req)
.await
{
Ok(response) => Ok(response),
Err(_error) => Ok(Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(Body::empty())
.body(UnsyncBoxBody::new(
Empty::<Bytes>::new().map_err(io::Error::other),
))
.unwrap()),
}
} else {
debug_request(&req)
let body_str = format!("{:?}", req);
Ok(Response::new(UnsyncBoxBody::new(
Full::new(Bytes::from(body_str)).map_err(io::Error::other),
)))
}
}

#[tokio::main]
async fn main() {
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let bind_addr = "127.0.0.1:8000";
let addr: SocketAddr = bind_addr.parse().expect("Could not parse ip:port.");

let make_svc = make_service_fn(|conn: &AddrStream| {
let remote_addr = conn.remote_addr().ip();
async move { Ok::<_, Infallible>(service_fn(move |req| handle(remote_addr, req))) }
});
// We create a TcpListener and bind it to the address
let listener = TcpListener::bind(addr).await?;

println!(
"Access service1 on http://service1.localhost:{}",
addr.port()
);
println!(
"Access service2 on http://service2.localhost:{}",
addr.port()
);

let server = Server::bind(&addr).serve(make_svc);
// We start a loop to continuously accept incoming connections
loop {
let (stream, remote_addr) = listener.accept().await?;
let client_ip = remote_addr.ip();

println!("Running server on {:?}", addr);
// Use an adapter to access something implementing `tokio::io` traits as if they implement
// `hyper::rt` IO traits.
let io = TokioIo::new(stream);

if let Err(e) = server.await {
eprintln!("server error: {}", e);
// Spawn a tokio task to serve multiple connections concurrently
tokio::task::spawn(async move {
// Finally, we bind the incoming connection to our `hello` service
if let Err(err) = http1::Builder::new()
// `service_fn` converts our function in a `Service`
.serve_connection(io, service_fn(move |req| handle(client_ip, req)))
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}
Loading