Skip to content

Commit

Permalink
wip: upgrade hyper/http, use new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
micolous committed Dec 11, 2024
1 parent acac497 commit 09de028
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 12 deletions.
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ compact_jwt = "0.4.2"
futures = "^0.3.25"
hex = "0.4.3"
http = "^0.2.9"
http-body = "=1.0.0-rc.2"
http-body-util = "=0.1.0-rc.2"
hyper = { version = "=1.0.0-rc.3", default-features = false, features = [
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = { version = "1.5.1", default-features = false, features = [
"http1",
] }
hyper-util = { version = "0.1.10", features = [
"tokio",
] }
nom = "7.1"
peg = "0.8.1"
openssl = "^0.10.56"
Expand All @@ -100,14 +103,14 @@ tokio = { version = "1.22.0", features = [
] }
tokio-native-tls = "^0.3.1"
tokio-stream = { version = "0.1", features = ["sync"] }
tokio-tungstenite = { version = "^0.18.0", features = ["native-tls"] }
tokio-tungstenite = { version = "^0.24.0", features = ["native-tls"] }
tracing = "^0.1.35"
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"std",
"fmt",
] }
tungstenite = { version = "^0.18.0", default-features = false, features = [
tungstenite = { version = "^0.24.0", default-features = false, features = [
"handshake",
] }
url = "2"
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hex.workspace = true
http-body.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["server"] }
hyper-util.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion cable-tunnel-server/backend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use hyper::{
upgrade::Upgraded,
Request, Response, StatusCode,
};
use hyper_util::rt::tokio::TokioIo;
use tokio::{
select,
sync::{
Expand Down Expand Up @@ -202,7 +203,7 @@ impl CableError {
#[instrument(level = "info", skip_all, err, fields(addr = addr.to_string()))]
async fn handle_websocket(
state: Arc<ServerState>,
mut ws_stream: WebSocketStream<Upgraded>,
mut ws_stream: WebSocketStream<TokioIo<Upgraded>>,
tx: Tx,
mut rx: Rx,
addr: SocketAddr,
Expand Down Expand Up @@ -383,6 +384,7 @@ async fn handle_request(

match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
let upgraded = TokioIo::new(upgraded);
let ws_stream =
WebSocketStream::from_raw_socket(upgraded, Role::Server, config).await;
handle_websocket(ss, ws_stream, tx, rx, addr).await.ok();
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ hex.workspace = true
http-body.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["server"] }
hyper-util.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
Expand Down
9 changes: 6 additions & 3 deletions cable-tunnel-server/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,10 @@ pub async fn run_server<F, R, ResBody, T>(
bind_address: SocketAddr,
tls_acceptor: Option<TlsAcceptor>,
server_state: T,
mut request_handler: F,
request_handler: F,
) -> Result<(), Box<dyn StdError>>
where
F: FnMut(Arc<T>, SocketAddr, Request<Incoming>) -> R + Copy + Send + Sync + 'static,
F: Fn(Arc<T>, SocketAddr, Request<Incoming>) -> R + Copy + Send + Sync + 'static,
R: Future<Output = Result<Response<ResBody>, Infallible>> + Send,
ResBody: Body + Send + 'static,
<ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
Expand All @@ -346,7 +346,9 @@ where
};
let server_state = server_state.clone();
let service =
service_fn(move |req| request_handler(server_state.clone(), remote_addr, req));
service_fn(move |req| {
request_handler(server_state.clone(), remote_addr, req)
});
let tls_acceptor = tls_acceptor.clone();

let span = info_span!("handle_connection", addr = remote_addr.to_string());
Expand All @@ -362,6 +364,7 @@ where
}
},
};
let stream = hyper_util::rt::tokio::TokioIo::new(stream);

let conn =
hyper::server::conn::http1::Builder::new().serve_connection(stream, service);
Expand Down
1 change: 1 addition & 0 deletions cable-tunnel-server/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ clap.workspace = true
hex.workspace = true
http-body-util.workspace = true
hyper = { workspace = true, features = ["client", "server"] }
hyper-util.workspace = true
tokio.workspace = true
tokio-native-tls.workspace = true
tokio-tungstenite.workspace = true
Expand Down
10 changes: 7 additions & 3 deletions cable-tunnel-server/frontend/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use hyper::{
};

use cable_tunnel_server_common::*;
use tokio::{io::AsyncWriteExt, net::TcpStream, sync::RwLock};
use hyper_util::rt::TokioIo;
use tokio::{io::AsyncWriteExt as _, net::TcpStream, sync::RwLock};
use tokio_native_tls::TlsConnector;
use tokio_tungstenite::MaybeTlsStream;

Expand Down Expand Up @@ -199,6 +200,7 @@ async fn handle_request(
}
}
};
let backend_socket = TokioIo::new(backend_socket);

let (mut sender, conn) = match hyper::client::conn::http1::handshake(backend_socket).await {
Ok(v) => v,
Expand Down Expand Up @@ -239,22 +241,24 @@ async fn handle_request(
// Set up the "upgrade" handler to connect the two sockets together
tokio::task::spawn(async move {
// Upgrade the connection to the backend
let mut backend_upgraded = match hyper::upgrade::on(&mut backend_res).await {
let backend_upgraded = match hyper::upgrade::on(&mut backend_res).await {
Ok(u) => u,
Err(e) => {
error!("failure upgrading connection to backend: {e}");
return;
}
};
let mut backend_upgraded = TokioIo::new(backend_upgraded);

// Upgrade the connection from the client
let mut client_upgraded = match hyper::upgrade::on(&mut req).await {
let client_upgraded = match hyper::upgrade::on(&mut req).await {
Ok(u) => u,
Err(e) => {
error!("failure upgrading connection to client: {e}");
return;
}
};
let mut client_upgraded = TokioIo::new(client_upgraded);

// Connect the two streams together directly.
match tokio::io::copy_bidirectional(&mut backend_upgraded, &mut client_upgraded).await {
Expand Down

0 comments on commit 09de028

Please sign in to comment.