diff --git a/Cargo.toml b/Cargo.toml index 222eeff4..17f07b91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,11 +76,14 @@ compact_jwt = "0.4.2" futures = "^0.3.25" hex = "0.4.3" http = "^0.2.9" -http-body = "=1.0.0-rc.2" -http-body-util = "=0.1.0-rc.2" -hyper = { version = "=1.0.0-rc.3", default-features = false, features = [ +http-body = "1.0.1" +http-body-util = "0.1.2" +hyper = { version = "1.5.1", default-features = false, features = [ "http1", ] } +hyper-util = { version = "0.1.10", features = [ + "tokio", +] } nom = "7.1" peg = "0.8.1" openssl = "^0.10.56" @@ -100,14 +103,14 @@ tokio = { version = "1.22.0", features = [ ] } tokio-native-tls = "^0.3.1" tokio-stream = { version = "0.1", features = ["sync"] } -tokio-tungstenite = { version = "^0.18.0", features = ["native-tls"] } +tokio-tungstenite = { version = "^0.24.0", features = ["native-tls"] } tracing = "^0.1.35" tracing-subscriber = { version = "0.3", features = [ "env-filter", "std", "fmt", ] } -tungstenite = { version = "^0.18.0", default-features = false, features = [ +tungstenite = { version = "^0.24.0", default-features = false, features = [ "handshake", ] } url = "2" diff --git a/cable-tunnel-server/backend/Cargo.toml b/cable-tunnel-server/backend/Cargo.toml index 73fd38ae..4d5d10ce 100644 --- a/cable-tunnel-server/backend/Cargo.toml +++ b/cable-tunnel-server/backend/Cargo.toml @@ -20,6 +20,7 @@ hex.workspace = true http-body.workspace = true http-body-util.workspace = true hyper = { workspace = true, features = ["server"] } +hyper-util.workspace = true thiserror.workspace = true tokio.workspace = true tokio-native-tls.workspace = true diff --git a/cable-tunnel-server/backend/src/main.rs b/cable-tunnel-server/backend/src/main.rs index 9f3cc922..5ff1d18a 100644 --- a/cable-tunnel-server/backend/src/main.rs +++ b/cable-tunnel-server/backend/src/main.rs @@ -14,6 +14,7 @@ use hyper::{ upgrade::Upgraded, Request, Response, StatusCode, }; +use hyper_util::rt::tokio::TokioIo; use tokio::{ select, sync::{ @@ -202,7 +203,7 @@ impl CableError { #[instrument(level = "info", skip_all, err, fields(addr = addr.to_string()))] async fn handle_websocket( state: Arc, - mut ws_stream: WebSocketStream, + mut ws_stream: WebSocketStream>, tx: Tx, mut rx: Rx, addr: SocketAddr, @@ -383,6 +384,7 @@ async fn handle_request( match hyper::upgrade::on(&mut req).await { Ok(upgraded) => { + let upgraded = TokioIo::new(upgraded); let ws_stream = WebSocketStream::from_raw_socket(upgraded, Role::Server, config).await; handle_websocket(ss, ws_stream, tx, rx, addr).await.ok(); diff --git a/cable-tunnel-server/common/Cargo.toml b/cable-tunnel-server/common/Cargo.toml index 5728b144..8459c442 100644 --- a/cable-tunnel-server/common/Cargo.toml +++ b/cable-tunnel-server/common/Cargo.toml @@ -16,6 +16,7 @@ hex.workspace = true http-body.workspace = true http-body-util.workspace = true hyper = { workspace = true, features = ["server"] } +hyper-util.workspace = true thiserror.workspace = true tokio.workspace = true tokio-native-tls.workspace = true diff --git a/cable-tunnel-server/common/src/lib.rs b/cable-tunnel-server/common/src/lib.rs index 090b5fa8..9f7f9c8b 100644 --- a/cable-tunnel-server/common/src/lib.rs +++ b/cable-tunnel-server/common/src/lib.rs @@ -322,10 +322,10 @@ pub async fn run_server( bind_address: SocketAddr, tls_acceptor: Option, server_state: T, - mut request_handler: F, + request_handler: F, ) -> Result<(), Box> where - F: FnMut(Arc, SocketAddr, Request) -> R + Copy + Send + Sync + 'static, + F: Fn(Arc, SocketAddr, Request) -> R + Copy + Send + Sync + 'static, R: Future, Infallible>> + Send, ResBody: Body + Send + 'static, ::Error: Into>, @@ -346,7 +346,9 @@ where }; let server_state = server_state.clone(); let service = - service_fn(move |req| request_handler(server_state.clone(), remote_addr, req)); + service_fn(move |req| { + request_handler(server_state.clone(), remote_addr, req) + }); let tls_acceptor = tls_acceptor.clone(); let span = info_span!("handle_connection", addr = remote_addr.to_string()); @@ -362,6 +364,7 @@ where } }, }; + let stream = hyper_util::rt::tokio::TokioIo::new(stream); let conn = hyper::server::conn::http1::Builder::new().serve_connection(stream, service); diff --git a/cable-tunnel-server/frontend/Cargo.toml b/cable-tunnel-server/frontend/Cargo.toml index 620060fa..eed4e711 100644 --- a/cable-tunnel-server/frontend/Cargo.toml +++ b/cable-tunnel-server/frontend/Cargo.toml @@ -18,6 +18,7 @@ clap.workspace = true hex.workspace = true http-body-util.workspace = true hyper = { workspace = true, features = ["client", "server"] } +hyper-util.workspace = true tokio.workspace = true tokio-native-tls.workspace = true tokio-tungstenite.workspace = true diff --git a/cable-tunnel-server/frontend/src/main.rs b/cable-tunnel-server/frontend/src/main.rs index 9b59e755..25b27a65 100644 --- a/cable-tunnel-server/frontend/src/main.rs +++ b/cable-tunnel-server/frontend/src/main.rs @@ -14,7 +14,8 @@ use hyper::{ }; use cable_tunnel_server_common::*; -use tokio::{io::AsyncWriteExt, net::TcpStream, sync::RwLock}; +use hyper_util::rt::TokioIo; +use tokio::{io::AsyncWriteExt as _, net::TcpStream, sync::RwLock}; use tokio_native_tls::TlsConnector; use tokio_tungstenite::MaybeTlsStream; @@ -199,6 +200,7 @@ async fn handle_request( } } }; + let backend_socket = TokioIo::new(backend_socket); let (mut sender, conn) = match hyper::client::conn::http1::handshake(backend_socket).await { Ok(v) => v, @@ -239,22 +241,24 @@ async fn handle_request( // Set up the "upgrade" handler to connect the two sockets together tokio::task::spawn(async move { // Upgrade the connection to the backend - let mut backend_upgraded = match hyper::upgrade::on(&mut backend_res).await { + let backend_upgraded = match hyper::upgrade::on(&mut backend_res).await { Ok(u) => u, Err(e) => { error!("failure upgrading connection to backend: {e}"); return; } }; + let mut backend_upgraded = TokioIo::new(backend_upgraded); // Upgrade the connection from the client - let mut client_upgraded = match hyper::upgrade::on(&mut req).await { + let client_upgraded = match hyper::upgrade::on(&mut req).await { Ok(u) => u, Err(e) => { error!("failure upgrading connection to client: {e}"); return; } }; + let mut client_upgraded = TokioIo::new(client_upgraded); // Connect the two streams together directly. match tokio::io::copy_bidirectional(&mut backend_upgraded, &mut client_upgraded).await {