Skip to content
This repository has been archived by the owner on Aug 3, 2023. It is now read-only.

Proxy websocket connections when using authenticated (realish) preview #2135

Merged
merged 2 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions src/commands/dev/edge/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::sync::{Arc, Mutex};
use anyhow::Result;
use chrono::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Server};
use hyper::upgrade::OnUpgrade;
use hyper::{Body, Client as HyperClient, Server};
use hyper_rustls::HttpsConnector;
use tokio::sync::oneshot::{Receiver, Sender};

Expand All @@ -34,6 +35,11 @@ pub async fn http(

async move {
Ok::<_, anyhow::Error>(service_fn(move |req| {
let is_websocket = req
.headers()
.get("upgrade")
.map_or(false, |h| h.as_bytes() == b"websocket");

let client = client.to_owned();
let preview_token = preview_token.lock().unwrap().to_owned();
let host = host.to_owned();
Expand All @@ -48,15 +54,17 @@ pub async fn http(
let now: DateTime<Local> = Local::now();
let path = get_path_as_str(&parts.uri);
async move {
let mut resp = preview_request(
Request::from_parts(parts, body),
client,
let mut req = preview_request(
parts,
body,
preview_token.to_owned(),
host.clone(),
upstream_protocol,
)
.await?;
);
let client_on_upgrade = req.extensions_mut().remove::<OnUpgrade>();

let mut resp = client.request(req).await?;
super::maybe_proxy_websocket(is_websocket, client_on_upgrade, &mut resp);
rewrite_redirect(&mut resp, &host, &local_host, false);

println!(
Expand Down
22 changes: 15 additions & 7 deletions src/commands/dev/edge/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use chrono::prelude::*;
use futures_util::{stream::StreamExt, FutureExt};

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Server};
use hyper::upgrade::OnUpgrade;
use hyper::{Body, Client as HyperClient, Server};
use hyper_rustls::HttpsConnector;
use tokio::net::TcpListener;
use tokio::sync::oneshot::{Receiver, Sender};
Expand Down Expand Up @@ -38,6 +39,10 @@ pub async fn https(

async move {
Ok::<_, anyhow::Error>(service_fn(move |req| {
let is_websocket = req
.headers()
.get("upgrade")
.map_or(false, |h| h.as_bytes() == b"websocket");
let client = client.to_owned();
let preview_token = preview_token.lock().unwrap().to_owned();
let host = host.to_owned();
Expand All @@ -52,14 +57,17 @@ pub async fn https(
let now: DateTime<Local> = Local::now();
let path = get_path_as_str(&parts.uri);
async move {
let mut resp = preview_request(
Request::from_parts(parts, body),
client,
let mut req = preview_request(
parts,
body,
preview_token.to_owned(),
host.clone(),
Protocol::Https,
)
.await?;
Protocol::Http,
);

let client_on_upgrade = req.extensions_mut().remove::<OnUpgrade>();
let mut resp = client.request(req).await?;
super::maybe_proxy_websocket(is_websocket, client_on_upgrade, &mut resp);

rewrite_redirect(&mut resp, &host, &local_host, true);

Expand Down
42 changes: 32 additions & 10 deletions src/commands/dev/edge/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,18 @@ pub use self::https::https;
use crate::commands::dev::utils::get_path_as_str;
use crate::commands::dev::Protocol;

use hyper::client::{HttpConnector, ResponseFuture};
use hyper::header::{HeaderName, HeaderValue};
use hyper::{Body, Client as HyperClient, Request};
use hyper_rustls::HttpsConnector;
use hyper::upgrade::OnUpgrade;
use hyper::{Body, Request};
use tokio::io::copy_bidirectional;

fn preview_request(
req: Request<Body>,
client: HyperClient<HttpsConnector<HttpConnector>>,
mut parts: ::http::request::Parts,
body: Body,
preview_token: String,
host: String,
protocol: Protocol,
) -> ResponseFuture {
let (mut parts, body) = req.into_parts();

) -> Request<Body> {
let path = get_path_as_str(&parts.uri);

parts.headers.insert(
Expand All @@ -40,7 +38,31 @@ fn preview_request(
.parse()
.expect("Could not construct preview url");

let req = Request::from_parts(parts, body);
Request::from_parts(parts, body)
}

client.request(req)
fn maybe_proxy_websocket(
is_websocket: bool,
client_on_upgrade: Option<OnUpgrade>,
resp: &mut ::http::Response<Body>,
) {
if is_websocket && resp.status() == 101 {
if let (Some(client_on_upgrade), Some(upstream_on_upgrade)) = (
client_on_upgrade,
resp.extensions_mut().remove::<OnUpgrade>(),
) {
tokio::spawn(async move {
match tokio::try_join!(client_on_upgrade, upstream_on_upgrade) {
Ok((mut client_upgraded, mut server_upgraded)) => {
let proxy_future =
copy_bidirectional(&mut client_upgraded, &mut server_upgraded);
if let Err(err) = proxy_future.await {
log::warn!("could not proxy WebSocket: {}", err);
}
}
Err(e) => log::warn!("could not proxy WebSocket: {}", e),
}
});
}
}
}
13 changes: 6 additions & 7 deletions src/commands/dev/gcs/server/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::{Arc, Mutex};
use anyhow::Result;
use chrono::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Response, Server};
use hyper::{Body, Client as HyperClient, Response, Server};
use hyper_rustls::HttpsConnector;

/// performs all logic that takes an incoming request
Expand Down Expand Up @@ -55,12 +55,9 @@ pub async fn http(server_config: ServerConfig, preview_id: Arc<Mutex<String>>) -

async move {
// send the request to the preview service
let resp = preview_request(
Request::from_parts(parts, body),
client,
preview_id.to_owned(),
)
.await?;
let resp = client
.request(preview_request(parts, body, preview_id.to_owned()))
.await?;
let (mut parts, body) = resp.into_parts();

// format the response for the user
Expand All @@ -73,6 +70,8 @@ pub async fn http(server_config: ServerConfig, preview_id: Arc<Mutex<String>>) -
false,
);

// TODO: proxy websocket

// print information about the response
// [2020-04-20 15:25:54] GET example.com/ HTTP/1.1 200 OK
println!(
Expand Down
11 changes: 4 additions & 7 deletions src/commands/dev/gcs/server/https.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use anyhow::Result;
use chrono::prelude::*;
use futures_util::{FutureExt, StreamExt};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client as HyperClient, Request, Response, Server};
use hyper::{Body, Client as HyperClient, Response, Server};
use hyper_rustls::HttpsConnector;
use tokio::net::TcpListener;

Expand Down Expand Up @@ -60,12 +60,9 @@ pub async fn https(server_config: ServerConfig, preview_id: Arc<Mutex<String>>)

async move {
// send the request to the preview service
let resp = preview_request(
Request::from_parts(parts, body),
client,
preview_id.to_owned(),
)
.await?;
let resp = client
.request(preview_request(parts, body, preview_id.to_owned()))
.await?;
let (mut parts, body) = resp.into_parts();

// format the response for the user
Expand Down
16 changes: 5 additions & 11 deletions src/commands/dev/gcs/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@ pub use self::https::https;
use crate::commands::dev::gcs::headers::structure_request;
use crate::commands::dev::utils::get_path_as_str;

use hyper::client::{HttpConnector, ResponseFuture};
use hyper::header::{HeaderName, HeaderValue};
use hyper::http::uri::InvalidUri;
use hyper::{Body, Client as HyperClient, Request, Uri};
use hyper_rustls::HttpsConnector;
use hyper::{Body, Request, Uri};

const PREVIEW_HOST: &str = "rawhttp.cloudflareworkers.com";

Expand All @@ -20,12 +18,10 @@ fn get_preview_url(path_string: &str) -> Result<Uri, InvalidUri> {
}

pub fn preview_request(
req: Request<Body>,
client: HyperClient<HttpsConnector<HttpConnector>>,
mut parts: ::http::request::Parts,
body: Body,
preview_id: String,
) -> ResponseFuture {
let (mut parts, body) = req.into_parts();

) -> Request<Body> {
let path = get_path_as_str(&parts.uri);
let preview_id = &preview_id;

Expand All @@ -43,7 +39,5 @@ pub fn preview_request(

parts.uri = get_preview_url(&path).expect("Could not get preview url");

let req = Request::from_parts(parts, body);

client.request(req)
Request::from_parts(parts, body)
}