Skip to content

Commit

Permalink
support http on wasm (#106)
Browse files Browse the repository at this point in the history
* support http for wasm

* use environment variable at compile time

* update ci for testing wasm http
  • Loading branch information
crwen authored Nov 19, 2024
1 parent 78897bf commit 4e7e7d0
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ jobs:
- name: Run wasm-pack test on fusion
run: |
export PATH=$PATH:/tmp/chrome/chrome-linux64/:/tmp/chrome/chromedriver-linux64/
wasm-pack test --chrome --headless fusio --features opfs
wasm-pack test --chrome --headless fusio --features aws,opfs,wasm-http
- name: Run wasm-pack test on fusion-parquet
run: |
Expand Down
1 change: 1 addition & 0 deletions fusio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ opfs = [
tokio = ["async-stream", "dep:tokio"]
tokio-http = ["dep:reqwest", "http"]
tokio-uring = ["async-stream", "completion-based", "dep:tokio-uring", "no-send"]
wasm-http = ["dep:reqwest", "http"]

[[bench]]
harness = false
Expand Down
6 changes: 6 additions & 0 deletions fusio/src/impls/remotes/aws/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,15 +558,19 @@ pub(crate) struct TemporaryToken<T> {
mod tests {
use std::time::Duration;

#[allow(unused)]
use bytes::Bytes;
use chrono::{DateTime, Utc};
#[allow(unused)]
use http::{header::AUTHORIZATION, Method, Request};
#[allow(unused)]
use http_body_util::Empty;
use url::Url;

use crate::remotes::aws::credential::{AwsAuthorizer, AwsCredential};

// Test generated using https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_sign_with_signed_payload() {
// Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
Expand Down Expand Up @@ -610,6 +614,7 @@ mod tests {
)
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_sign_with_unsigned_payload() {
// Test credentials from https://docs.aws.amazon.com/AmazonS3/latest/userguide/RESTAuthentication.html
Expand Down Expand Up @@ -693,6 +698,7 @@ mod tests {
);
}

#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_sign_port() {
let credential = AwsCredential {
Expand Down
24 changes: 14 additions & 10 deletions fusio/src/impls/remotes/aws/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,26 @@ pub struct AmazonS3Builder {
impl AmazonS3Builder {
#[allow(unused_variables)]
pub fn new(bucket: String) -> Self {
let client: Box<dyn DynHttpClient>;
cfg_if::cfg_if! {
if #[cfg(all(feature = "tokio-http", not(feature = "completion-based")))] {
let client = Box::new(crate::remotes::http::tokio::TokioClient::new());
Self {
endpoint: None,
region: "us-east-1".into(),
bucket,
credential: None,
sign_payload: false,
checksum: false,
client,
}
client = Box::new(crate::remotes::http::tokio::TokioClient::new());
} else if #[cfg(all(feature = "wasm-http", not(feature = "completion-based")))]{
client = Box::new(crate::remotes::http::wasm::WasmClient::new());
} else {
unreachable!()
}
}

Self {
endpoint: None,
region: "us-east-1".into(),
bucket,
credential: None,
sign_payload: false,
checksum: false,
client,
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion fusio/src/impls/remotes/http/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub enum HttpError {
},
#[error(transparent)]
Http(#[from] http::Error),
#[cfg(feature = "tokio-http")]
#[cfg(any(feature = "tokio-http", feature = "wasm-http"))]
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
#[error(transparent)]
Expand Down
2 changes: 2 additions & 0 deletions fusio/src/impls/remotes/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod error;
#[cfg(all(feature = "tokio-http", not(feature = "completion-based")))]
pub mod tokio;
#[cfg(all(feature = "wasm-http", not(feature = "completion-based")))]
pub mod wasm;

use std::{future::Future, pin::Pin};

Expand Down
114 changes: 114 additions & 0 deletions fusio/src/impls/remotes/http/wasm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use std::str::FromStr;

use bytes::Bytes;
use http::{Request, Response};
use http_body::Body;
use http_body_util::BodyExt;

use super::{HttpClient, HttpError};
use crate::{error::BoxedError, MaybeSync};

#[derive(Default)]
pub struct WasmClient;

impl WasmClient {
pub fn new() -> Self {
Default::default()
}
}

impl HttpClient for WasmClient {
type RespBody = http_body_util::Full<Bytes>;

async fn send_request<B>(
&self,
request: Request<B>,
) -> Result<Response<Self::RespBody>, HttpError>
where
B: Body + Send + MaybeSync + 'static,
B::Data: Into<Bytes>,
B::Error: Into<BoxedError>,
{
let uri = request.uri().clone();
let (parts, body) = request.into_parts();

let url = reqwest::Url::from_str(&uri.to_string())?;
let body = http_body_util::combinators::UnsyncBoxBody::new(body);

match body.collect().await {
Ok(body) => {
let client = reqwest::Client::new();

let mut builder = client.request(parts.method, url);
builder = builder.body(reqwest::Body::from(body.to_bytes()));
let response = builder.send().await?;
let bytes = response.bytes().await?;

Ok(Response::new(http_body_util::Full::new(bytes)))
}
Err(err) => Err(HttpError::Other(err.into())),
}
}
}

#[cfg(feature = "wasm-http")]
#[cfg(test)]
mod tests {
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);

use wasm_bindgen_test::wasm_bindgen_test;

#[wasm_bindgen_test]
async fn test_wasm_client() {
use bytes::Bytes;
use http::{Request, StatusCode};
use http_body_util::Empty;

use super::{HttpClient, WasmClient};

let request = Request::get("https://jsonplaceholder.typicode.com/users")
.body(Empty::<Bytes>::new())
.unwrap();
let client = WasmClient::new();
let response = client.send_request(request).await.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}

#[cfg(all(feature = "wasm-http", feature = "aws"))]
#[wasm_bindgen_test]
async fn list_and_remove_wasm() {
use std::pin::pin;

use futures_util::StreamExt;

use crate::{
fs::Fs,
path::Path,
remotes::aws::{fs::AmazonS3Builder, AwsCredential},
};

if option_env!("AWS_ACCESS_KEY_ID").is_none() {
eprintln!("skipping AWS s3 test");
return;
}
let key_id = option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string();
let secret_key = option_env!("AWS_SECRET_ACCESS_KEY").unwrap().to_string();

let s3 = AmazonS3Builder::new("fusio-test".into())
.credential(AwsCredential {
key_id,
secret_key,
token: None,
})
.region("ap-southeast-1".into())
.sign_payload(true)
.build();

let path = Path::parse("test").unwrap();
let mut stream = pin!(s3.list(&path).await.unwrap());
while let Some(meta) = stream.next().await {
let meta = meta.unwrap();
s3.remove(&meta.path).await.unwrap();
}
}
}

0 comments on commit 4e7e7d0

Please sign in to comment.