From caf1894726362a15db0fc12390d2bf886c2df6f0 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 9 Nov 2022 21:37:30 +0100 Subject: [PATCH 01/19] feat(shuttle-next): POC of axum wasm router --- runtime/Makefile | 4 + runtime/README.md | 21 ++++- runtime/src/axum/mod.rs | 191 +++++++++++++++++++++++++++++++++++++++ runtime/src/lib.rs | 1 + tmp/axum-wasm/Cargo.toml | 13 +++ tmp/axum-wasm/src/lib.rs | 72 +++++++++++++++ 6 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 runtime/src/axum/mod.rs create mode 100644 tmp/axum-wasm/Cargo.toml create mode 100644 tmp/axum-wasm/src/lib.rs diff --git a/runtime/Makefile b/runtime/Makefile index 18dac0caa..458f6ad3b 100644 --- a/runtime/Makefile +++ b/runtime/Makefile @@ -6,6 +6,10 @@ wasm: cd ../tmp/wasm; cargo build --target wasm32-wasi cp ../tmp/wasm/target/wasm32-wasi/debug/shuttle_serenity.wasm bot.wasm +axum: + cd ../tmp/axum-wasm; cargo build --target wasm32-wasi + cp ../tmp/axum-wasm/target/wasm32-wasi/debug/shuttle_axum.wasm axum.wasm + test: wasm cargo test -- --nocapture diff --git a/runtime/README.md b/runtime/README.md index 193ee80f7..a1243e115 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -1,5 +1,6 @@ -## How to run +# How to run +## shuttle-next ```bash $ make wasm $ DISCORD_TOKEN=xxx cargo run @@ -12,6 +13,24 @@ grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs ``` + +## axum-wasm + +Compile the wasm axum router: + +```bash +make wasm +``` + +Run the test: + +```bash +cargo test axum -- --nocapture +``` + +Load and run: +TODO + ## shuttle-legacy Load and run an .so library that implements `shuttle_service::Service`. diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs new file mode 100644 index 000000000..c7e340a9e --- /dev/null +++ b/runtime/src/axum/mod.rs @@ -0,0 +1,191 @@ +use std::fs::File; +use std::io::{Read, Write}; +use std::os::unix::prelude::RawFd; +use std::path::Path; +use std::sync::Arc; + +use async_trait::async_trait; +use cap_std::os::unix::net::UnixStream; +use shuttle_proto::runtime::runtime_server::Runtime; +use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; +use tonic::{Request, Response, Status}; +use tracing::trace; +use wasi_common::file::FileCaps; +use wasmtime::{Engine, Linker, Module, Store}; +use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; +use wasmtime_wasi::{WasiCtx, WasiCtxBuilder}; + +pub struct AxumWasm { + router: std::sync::Mutex>, +} + +impl AxumWasm { + pub fn new() -> Self { + Self { + router: std::sync::Mutex::new(None), + } + } +} + +#[async_trait] +impl Runtime for AxumWasm { + async fn load(&self, request: Request) -> Result, Status> { + let wasm_path = request.into_inner().path; + trace!(wasm_path, "loading"); + + let router = Router::new(wasm_path); + + *self.router.lock().unwrap() = Some(router); + + let message = LoadResponse { success: true }; + + Ok(Response::new(message)) + } + + async fn start( + &self, + _request: Request, + ) -> Result, Status> { + // TODO: start a process that streams requests from a socket into wasm router and returns response? + + let message = StartResponse { + success: true, + port: 7002, + }; + + Ok(Response::new(message)) + } +} + +struct RouterBuilder { + engine: Engine, + store: Store, + linker: Linker, + src: Option, +} + +impl RouterBuilder { + pub fn new() -> Self { + let engine = Engine::default(); + + let mut linker: Linker = Linker::new(&engine); + wasmtime_wasi::add_to_linker(&mut linker, |s| s).unwrap(); + + let wasi = WasiCtxBuilder::new() + .inherit_stdio() + .inherit_args() + .unwrap() + .build(); + + let store = Store::new(&engine, wasi); + + Self { + engine, + store, + linker, + src: None, + } + } + + pub fn src>(mut self, src: P) -> Self { + self.src = Some(File::open(src).unwrap()); + self + } + + pub fn build(mut self) -> Router { + let mut buf = Vec::new(); + self.src.unwrap().read_to_end(&mut buf).unwrap(); + let module = Module::new(&self.engine, buf).unwrap(); + + for export in module.exports() { + println!("export: {}", export.name()); + } + + self.linker + .module(&mut self.store, "axum", &module) + .unwrap(); + let inner = RouterInner { + store: self.store, + linker: self.linker, + }; + Router { + inner: Arc::new(tokio::sync::Mutex::new(inner)), + } + } +} + +struct RouterInner { + store: Store, + linker: Linker, +} + +impl RouterInner { + /// Send a GET request to given endpoint on the axum-wasm router + pub async fn get(&mut self, endpoint: &str) -> Option { + let (mut host, client) = UnixStream::pair().unwrap(); + let client = WasiUnixStream::from_cap_std(client); + + self.store + .data_mut() + .insert_file(3, Box::new(client), FileCaps::all()); + + host.write_all(endpoint.as_bytes()).unwrap(); + host.write(&[0]).unwrap(); + + println!("calling inner Router endpoint: /{endpoint}"); + + self.linker + .get(&mut self.store, "axum", "__SHUTTLE_Axum_call") + .unwrap() + .into_func() + .unwrap() + .typed::(&self.store) + .unwrap() + .call(&mut self.store, 3) + .unwrap(); + + let mut res = String::new(); + host.read_to_string(&mut res).unwrap(); + + if res.is_empty() { + println!("invalid endpoint"); + None + } else { + println!("received response: {res}"); + Some(res) + } + } +} + +#[derive(Clone)] +struct Router { + inner: Arc>, +} + +impl Router { + pub fn builder() -> RouterBuilder { + RouterBuilder::new() + } + + pub fn new>(src: P) -> Self { + Self::builder().src(src).build() + } +} + +#[cfg(test)] +pub mod tests { + use super::*; + + #[tokio::test] + async fn axum() { + let axum = Router::new("axum.wasm"); + let mut inner = axum.inner.lock().await; + + assert_eq!(inner.get("hello").await, Some("Hello, World!".to_string())); + assert_eq!( + inner.get("goodbye").await, + Some("Goodbye, World!".to_string()) + ); + assert_eq!(inner.get("not/a/real/endpoint").await, None); + } +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 7ca426d3a..8834f2198 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -1,4 +1,5 @@ mod args; +mod axum; mod legacy; mod next; pub mod provisioner_factory; diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml new file mode 100644 index 000000000..638d0f793 --- /dev/null +++ b/tmp/axum-wasm/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "shuttle-axum" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = [ "cdylib" ] + +[dependencies] +axum = { version = "0.6.0-rc.4", default-features = false } +futures-executor = "0.3.21" +http = "0.2.7" +tower-service = "0.3.1" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs new file mode 100644 index 000000000..36b10b4c7 --- /dev/null +++ b/tmp/axum-wasm/src/lib.rs @@ -0,0 +1,72 @@ +use axum::body::Bytes; +use axum::{body::HttpBody, response::Response, routing::get, Router}; +use futures_executor::block_on; +use http::Request; +use std::fs::File; +use std::io::{Read, Write}; +use std::os::wasi::prelude::*; +use tower_service::Service; + +pub fn handle_request(endpoint: String) -> Option { + let request: Request = Request::builder() + .uri(format!("https://serverless.example/{}", endpoint.clone())) + .body("Some body".to_string()) + .unwrap(); + + let response = block_on(app(request)); + + let response_body = block_on(response.into_body().data()); + + if let Some(body) = response_body { + Some(body.unwrap()) + } else { + None + } +} + +async fn app(request: Request) -> Response { + let mut router = Router::new() + .route("/hello", get(hello)) + .route("/goodbye", get(goodbye)) + .into_service(); + + let response = router.call(request).await.unwrap(); + + response +} + +async fn hello() -> &'static str { + "Hello, World!" +} + +async fn goodbye() -> &'static str { + "Goodbye, World!" +} + +#[no_mangle] +#[allow(non_snake_case)] +pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { + println!("inner handler awoken; interacting with fd={fd}"); + + let mut f = unsafe { File::from_raw_fd(fd) }; + + let mut buf = Vec::new(); + let mut c_buf = [0; 1]; + loop { + f.read(&mut c_buf).unwrap(); + if c_buf[0] == 0 { + break; + } else { + buf.push(c_buf[0]); + } + } + + let endpoint = String::from_utf8(buf).unwrap(); + + println!("inner router called; GET /{endpoint}"); + let res = handle_request(endpoint); + + if let Some(bytes) = res { + f.write_all(&bytes).unwrap(); + } +} From 69645acc3bdc72d7e19644a49aaa54f635d87652 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 10 Nov 2022 11:03:43 +0100 Subject: [PATCH 02/19] feat: use std mutex --- runtime/src/axum/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index c7e340a9e..cd4411ae6 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -2,7 +2,7 @@ use std::fs::File; use std::io::{Read, Write}; use std::os::unix::prelude::RawFd; use std::path::Path; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; @@ -109,7 +109,7 @@ impl RouterBuilder { linker: self.linker, }; Router { - inner: Arc::new(tokio::sync::Mutex::new(inner)), + inner: Arc::new(Mutex::new(inner)), } } } @@ -159,7 +159,7 @@ impl RouterInner { #[derive(Clone)] struct Router { - inner: Arc>, + inner: Arc>, } impl Router { @@ -179,7 +179,7 @@ pub mod tests { #[tokio::test] async fn axum() { let axum = Router::new("axum.wasm"); - let mut inner = axum.inner.lock().await; + let mut inner = axum.inner.lock().unwrap(); assert_eq!(inner.get("hello").await, Some("Hello, World!".to_string())); assert_eq!( From 4b9e47cc374cef826f3c88e05b1adf5b07d79fa8 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Sat, 12 Nov 2022 23:48:20 +0100 Subject: [PATCH 03/19] feat: serialize/deserialize http requests to rmp this serializes/deserializes http requests across the ffi to the axum-wasm router, using the rust messagepack dataformat. sending the body is still a wip --- Cargo.lock | 152 +++++++++++++++++++++++++++++---------- runtime/Cargo.toml | 8 +++ runtime/README.md | 2 +- runtime/src/axum/mod.rs | 98 ++++++++++++++++++------- tmp/axum-wasm/Cargo.toml | 4 ++ tmp/axum-wasm/src/lib.rs | 50 +++++++------ tmp/utils/Cargo.toml | 12 ++++ tmp/utils/src/lib.rs | 147 +++++++++++++++++++++++++++++++++++++ 8 files changed, 381 insertions(+), 92 deletions(-) create mode 100644 tmp/utils/Cargo.toml create mode 100644 tmp/utils/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index d4f089eca..0bfb15132 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -520,7 +520,7 @@ dependencies = [ "bytes 1.1.0", "hex 0.4.3", "http 0.2.8", - "hyper", + "hyper 0.14.20", "ring", "time 0.3.11", "tokio", @@ -553,7 +553,7 @@ dependencies = [ "aws-types", "bytes 1.1.0", "http 0.2.8", - "http-body", + "http-body 0.4.5", "lazy_static", "percent-encoding", "pin-project-lite 0.2.9", @@ -683,8 +683,8 @@ dependencies = [ "bytes 1.1.0", "fastrand", "http 0.2.8", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.20", "hyper-rustls 0.22.1", "lazy_static", "pin-project-lite 0.2.9", @@ -704,8 +704,8 @@ dependencies = [ "bytes-utils", "futures-core", "http 0.2.8", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.20", "once_cell", "percent-encoding", "pin-project-lite 0.2.9", @@ -723,7 +723,7 @@ dependencies = [ "aws-smithy-http", "bytes 1.1.0", "http 0.2.8", - "http-body", + "http-body 0.4.5", "pin-project-lite 0.2.9", "tower", "tracing", @@ -799,8 +799,8 @@ dependencies = [ "futures-util", "headers", "http 0.2.8", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.20", "itoa 1.0.2", "matchit", "memchr", @@ -830,7 +830,7 @@ dependencies = [ "bytes 1.1.0", "futures-util", "http 0.2.8", - "http-body", + "http-body 0.4.5", "mime", ] @@ -948,7 +948,7 @@ dependencies = [ "futures-util", "hex 0.4.3", "http 0.2.8", - "hyper", + "hyper 0.14.20", "hyperlocal", "log", "pin-project-lite 0.2.9", @@ -3001,6 +3001,16 @@ dependencies = [ "pin-project-lite 0.2.9", ] +[[package]] +name = "http-body" +version = "1.0.0-rc1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f038884e63a5a85612eeea789f5e0f54c3ada7306234502543b23d98744781f0" +dependencies = [ + "bytes 1.1.0", + "http 0.2.8", +] + [[package]] name = "http-client" version = "6.5.3" @@ -3019,6 +3029,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" +[[package]] +name = "http-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" +dependencies = [ + "http 0.2.8", + "serde", +] + [[package]] name = "http-types" version = "2.12.0" @@ -3071,7 +3091,7 @@ dependencies = [ "futures-util", "h2", "http 0.2.8", - "http-body", + "http-body 0.4.5", "httparse", "httpdate", "itoa 1.0.2", @@ -3083,12 +3103,33 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.0.0-rc.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d85cfb5d312f278a651d0567873b9c32b28d8ced5603a9242c73c54c93fec814" +dependencies = [ + "bytes 1.1.0", + "futures-channel", + "futures-core", + "futures-util", + "http 0.2.8", + "http-body 1.0.0-rc1", + "httparse", + "httpdate", + "itoa 1.0.2", + "pin-project-lite 0.2.9", + "tokio", + "tracing", + "want", +] + [[package]] name = "hyper-reverse-proxy" version = "0.5.2-dev" source = "git+https://github.com/chesedo/hyper-reverse-proxy?branch=bug/host_header#5f82b7dffe940abf896fe47dadf6c8e87ddc670b" dependencies = [ - "hyper", + "hyper 0.14.20", "lazy_static", "tokio", "tracing", @@ -3099,7 +3140,7 @@ name = "hyper-reverse-proxy" version = "0.5.2-dev" source = "git+https://github.com/chesedo/hyper-reverse-proxy?branch=master#a4deffef77685b37fda7224ae678d3d9f00d391e" dependencies = [ - "hyper", + "hyper 0.14.20", "lazy_static", "tokio", "tracing", @@ -3113,7 +3154,7 @@ checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" dependencies = [ "ct-logs", "futures-util", - "hyper", + "hyper 0.14.20", "log", "rustls 0.19.1", "rustls-native-certs", @@ -3129,7 +3170,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ "http 0.2.8", - "hyper", + "hyper 0.14.20", "rustls 0.20.6", "tokio", "tokio-rustls 0.23.4", @@ -3141,7 +3182,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.20", "pin-project-lite 0.2.9", "tokio", "tokio-io-timeout", @@ -3154,7 +3195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes 1.1.0", - "hyper", + "hyper 0.14.20", "native-tls", "tokio", "tokio-native-tls", @@ -3168,7 +3209,7 @@ checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c" dependencies = [ "futures-util", "hex 0.4.3", - "hyper", + "hyper 0.14.20", "pin-project", "tokio", ] @@ -4325,7 +4366,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.8", - "hyper", + "hyper 0.14.20", "mime", "parking_lot 0.12.1", "percent-encoding", @@ -4925,8 +4966,8 @@ dependencies = [ "futures-util", "h2", "http 0.2.8", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.20", "hyper-rustls 0.23.0", "hyper-tls", "ipnet", @@ -4983,7 +5024,7 @@ dependencies = [ "chrono", "futures", "http 0.2.8", - "hyper", + "hyper 0.14.20", "reqwest", "reqwest-middleware", "retry-policies", @@ -5048,6 +5089,17 @@ dependencies = [ "paste", ] +[[package]] +name = "rmp-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b13be192e0220b8afb7222aa5813cb62cc269ebb5cac346ca6487681d2913e" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rocket" version = "0.5.0-rc.2" @@ -5112,7 +5164,7 @@ dependencies = [ "either", "futures", "http 0.2.8", - "hyper", + "hyper 0.14.20", "indexmap", "log", "memchr", @@ -5326,7 +5378,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.8", - "hyper", + "hyper 0.14.20", "mime", "mime_guess", "multer", @@ -5463,9 +5515,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.143" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e8e5d5b70924f74ff5c6d64d9a5acd91422117c60f48c4e07855238a254553" +checksum = "d193d69bae983fc11a79df82342761dfbf28a99fc8d203dca4c3c1b590948965" dependencies = [ "serde_derive", ] @@ -5491,9 +5543,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.143" +version = "1.0.147" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3d8e8de557aee63c26b85b947f5e59b690d0454c753f3adeb5cd7835ab88391" +checksum = "4f1d362ca8fc9c3e3a7484440752472d68a6caa98f1ab81d99b5dfe517cec852" dependencies = [ "proc-macro2 1.0.43", "quote 1.0.21", @@ -5701,6 +5753,16 @@ dependencies = [ "dirs", ] +[[package]] +name = "shuttle-axum-utils" +version = "0.1.0" +dependencies = [ + "http 0.2.8", + "http-serde", + "rmp-serde", + "serde", +] + [[package]] name = "shuttle-codegen" version = "0.7.0" @@ -5748,7 +5810,7 @@ dependencies = [ "fqdn", "futures", "hex 0.4.3", - "hyper", + "hyper 0.14.20", "hyper-reverse-proxy 0.5.2-dev (git+https://github.com/chesedo/hyper-reverse-proxy?branch=master)", "once_cell", "opentelemetry", @@ -5791,7 +5853,7 @@ dependencies = [ "fqdn", "futures", "http 0.2.8", - "hyper", + "hyper 0.14.20", "hyper-reverse-proxy 0.5.2-dev (git+https://github.com/chesedo/hyper-reverse-proxy?branch=bug/host_header)", "once_cell", "opentelemetry", @@ -5858,7 +5920,9 @@ dependencies = [ "async-trait", "cap-std", "clap 4.0.18", + "hyper 1.0.0-rc.1", "serenity", + "shuttle-axum-utils", "shuttle-common", "shuttle-proto", "shuttle-service", @@ -5897,7 +5961,7 @@ dependencies = [ "chrono", "crossbeam-channel", "futures", - "hyper", + "hyper 0.14.20", "libloading", "pipe", "poem", @@ -6912,7 +6976,7 @@ source = "git+https://github.com/shuttle-hq/tokiotest-httpserver?branch=feat/bod dependencies = [ "async-trait", "futures", - "hyper", + "hyper 0.14.20", "lazy_static", "queues", "serde_json", @@ -6959,8 +7023,8 @@ dependencies = [ "futures-util", "h2", "http 0.2.8", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.20", "hyper-timeout", "percent-encoding", "pin-project", @@ -7020,7 +7084,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.8", - "http-body", + "http-body 0.4.5", "http-range-header", "pin-project-lite 0.2.9", "tower-layer", @@ -7039,7 +7103,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.8", - "http-body", + "http-body 0.4.5", "http-range-header", "pin-project-lite 0.2.9", "tower", @@ -7564,7 +7628,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.8", - "hyper", + "hyper 0.14.20", "log", "mime", "mime_guess", @@ -8270,3 +8334,15 @@ dependencies = [ "cc", "libc", ] + +[[patch.unused]] +name = "shuttle-aws-rds" +version = "0.7.0" + +[[patch.unused]] +name = "shuttle-persist" +version = "0.7.0" + +[[patch.unused]] +name = "shuttle-shared-db" +version = "0.7.0" diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 04e03b72d..708ca692f 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -12,6 +12,7 @@ cap-std = "0.26.0" clap ={ version = "4.0.18", features = ["derive"] } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" +hyper = "1.0.0-rc.1" tokio = { version = "=1.20.1", features = ["full"] } tokio-stream = "0.1.11" tonic = "0.8.2" @@ -35,3 +36,10 @@ version = "0.7.0" default-features = false features = ["loader"] path = "../service" + +[dependencies.shuttle-axum-utils] +path = "../tmp/utils" +optional = true + +[features] +axum = ["shuttle-axum-utils"] diff --git a/runtime/README.md b/runtime/README.md index a1243e115..fbf6fb10d 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -25,7 +25,7 @@ make wasm Run the test: ```bash -cargo test axum -- --nocapture +cargo test --all-features axum -- --nocapture ``` Load and run: diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index cd4411ae6..9f8a559a6 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -6,9 +6,11 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; +use hyper::Response; +use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; -use tonic::{Request, Response, Status}; +use tonic::Status; use tracing::trace; use wasi_common::file::FileCaps; use wasmtime::{Engine, Linker, Module, Store}; @@ -29,7 +31,10 @@ impl AxumWasm { #[async_trait] impl Runtime for AxumWasm { - async fn load(&self, request: Request) -> Result, Status> { + async fn load( + &self, + request: tonic::Request, + ) -> Result, Status> { let wasm_path = request.into_inner().path; trace!(wasm_path, "loading"); @@ -39,21 +44,21 @@ impl Runtime for AxumWasm { let message = LoadResponse { success: true }; - Ok(Response::new(message)) + Ok(tonic::Response::new(message)) } async fn start( &self, - _request: Request, - ) -> Result, Status> { - // TODO: start a process that streams requests from a socket into wasm router and returns response? + _request: tonic::Request, + ) -> Result, Status> { + // TODO: start a hyper server and serve the axum-wasm router as a service let message = StartResponse { success: true, port: 7002, }; - Ok(Response::new(message)) + Ok(tonic::Response::new(message)) } } @@ -120,8 +125,9 @@ struct RouterInner { } impl RouterInner { - /// Send a GET request to given endpoint on the axum-wasm router - pub async fn get(&mut self, endpoint: &str) -> Option { + /// Send a HTTP request to given endpoint on the axum-wasm router and return the response + /// todo: also send and receive the body + pub async fn send_request(&mut self, req: hyper::Request) -> Response { let (mut host, client) = UnixStream::pair().unwrap(); let client = WasiUnixStream::from_cap_std(client); @@ -129,11 +135,13 @@ impl RouterInner { .data_mut() .insert_file(3, Box::new(client), FileCaps::all()); - host.write_all(endpoint.as_bytes()).unwrap(); - host.write(&[0]).unwrap(); + // serialise request to rmp + let request_rmp = RequestWrapper::from(req).into_rmp(); - println!("calling inner Router endpoint: /{endpoint}"); + host.write_all(&request_rmp).unwrap(); + host.write(&[0]).unwrap(); + println!("calling inner Router"); self.linker .get(&mut self.store, "axum", "__SHUTTLE_Axum_call") .unwrap() @@ -144,16 +152,20 @@ impl RouterInner { .call(&mut self.store, 3) .unwrap(); - let mut res = String::new(); - host.read_to_string(&mut res).unwrap(); + let mut res_buf = Vec::new(); + host.read_to_end(&mut res_buf).unwrap(); - if res.is_empty() { - println!("invalid endpoint"); - None - } else { - println!("received response: {res}"); - Some(res) - } + // deserialize response from rmp + let res = ResponseWrapper::from_rmp(res_buf); + + // todo: clean up conversion of wrapper to request + let mut response = Response::builder().status(res.status).version(res.version); + response + .headers_mut() + .unwrap() + .extend(res.headers.into_iter()); + + response.body("Some body".to_string()).unwrap() } } @@ -174,6 +186,8 @@ impl Router { #[cfg(test)] pub mod tests { + use hyper::{http::HeaderValue, Method, Request, StatusCode, Version}; + use super::*; #[tokio::test] @@ -181,11 +195,41 @@ pub mod tests { let axum = Router::new("axum.wasm"); let mut inner = axum.inner.lock().unwrap(); - assert_eq!(inner.get("hello").await, Some("Hello, World!".to_string())); - assert_eq!( - inner.get("goodbye").await, - Some("Goodbye, World!".to_string()) - ); - assert_eq!(inner.get("not/a/real/endpoint").await, None); + // GET /hello + let request: Request = Request::builder() + .method(Method::GET) + .version(Version::HTTP_11) + .header("test", HeaderValue::from_static("hello")) + .uri(format!("https://axum-wasm.example/hello")) + .body("Some body".to_string()) + .unwrap(); + + let res = inner.send_request(request).await; + assert_eq!(res.status(), StatusCode::OK); + + // GET /goodbye + let request: Request = Request::builder() + .method(Method::GET) + .version(Version::HTTP_11) + .header("test", HeaderValue::from_static("goodbye")) + .uri(format!("https://axum-wasm.example/goodbye")) + .body("Some body".to_string()) + .unwrap(); + + let res = inner.send_request(request).await; + assert_eq!(res.status(), StatusCode::OK); + + // GET /invalid + let request: Request = Request::builder() + .method(Method::GET) + .version(Version::HTTP_11) + .header("test", HeaderValue::from_static("invalid")) + .uri(format!("https://axum-wasm.example/invalid")) + .body("Some body".to_string()) + .unwrap(); + + let res = inner.send_request(request).await; + + assert_eq!(res.status(), StatusCode::NOT_FOUND); } } diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml index 638d0f793..7e48765e8 100644 --- a/tmp/axum-wasm/Cargo.toml +++ b/tmp/axum-wasm/Cargo.toml @@ -11,3 +11,7 @@ axum = { version = "0.6.0-rc.4", default-features = false } futures-executor = "0.3.21" http = "0.2.7" tower-service = "0.3.1" + +[dependencies.shuttle-axum-utils] +path = "../utils" +version = "0.1.0" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 36b10b4c7..f8b0a2eeb 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -1,27 +1,14 @@ -use axum::body::Bytes; -use axum::{body::HttpBody, response::Response, routing::get, Router}; +use axum::{response::Response, routing::get, Router}; use futures_executor::block_on; use http::Request; +use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; use std::fs::File; use std::io::{Read, Write}; use std::os::wasi::prelude::*; use tower_service::Service; -pub fn handle_request(endpoint: String) -> Option { - let request: Request = Request::builder() - .uri(format!("https://serverless.example/{}", endpoint.clone())) - .body("Some body".to_string()) - .unwrap(); - - let response = block_on(app(request)); - - let response_body = block_on(response.into_body().data()); - - if let Some(body) = response_body { - Some(body.unwrap()) - } else { - None - } +pub fn handle_request(req: Request) -> Response { + block_on(app(req)) } async fn app(request: Request) -> Response { @@ -50,23 +37,34 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { let mut f = unsafe { File::from_raw_fd(fd) }; - let mut buf = Vec::new(); - let mut c_buf = [0; 1]; + let mut req_buf = Vec::new(); + let mut c_buf: [u8; 1] = [0; 1]; loop { f.read(&mut c_buf).unwrap(); if c_buf[0] == 0 { break; } else { - buf.push(c_buf[0]); + req_buf.push(c_buf[0]); } } - let endpoint = String::from_utf8(buf).unwrap(); + let req = RequestWrapper::from_rmp(req_buf); - println!("inner router called; GET /{endpoint}"); - let res = handle_request(endpoint); + // todo: clean up conversion of wrapper to request + let mut request: Request = Request::builder() + .method(req.method) + .version(req.version) + .uri(req.uri) + .body("Some body".to_string()) + .unwrap(); - if let Some(bytes) = res { - f.write_all(&bytes).unwrap(); - } + request.headers_mut().extend(req.headers.into_iter()); + + println!("inner router received request: {:?}", &request); + let res = handle_request(request); + + println!("inner router sending response: {:?}", &res); + let response = ResponseWrapper::from(res); + + f.write_all(&response.into_rmp()).unwrap(); } diff --git a/tmp/utils/Cargo.toml b/tmp/utils/Cargo.toml new file mode 100644 index 000000000..691214143 --- /dev/null +++ b/tmp/utils/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "shuttle-axum-utils" +version = "0.1.0" +edition = "2021" + +[lib] + +[dependencies] +http = "0.2.7" +serde = { version = "1.0.137", features = [ "derive" ] } +http-serde = { version ="1.1.2" } +rmp-serde = { version = "1.1.1" } diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs new file mode 100644 index 000000000..477f1d86d --- /dev/null +++ b/tmp/utils/src/lib.rs @@ -0,0 +1,147 @@ +use http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; +use rmps::{Deserializer, Serializer}; +use serde::{Deserialize, Serialize}; + +extern crate rmp_serde as rmps; + +// todo: add extensions +#[derive(Serialize, Deserialize, Debug)] +pub struct RequestWrapper { + #[serde(with = "http_serde::method")] + pub method: Method, + + #[serde(with = "http_serde::uri")] + pub uri: Uri, + + #[serde(with = "http_serde::version")] + pub version: Version, + + #[serde(with = "http_serde::header_map")] + pub headers: HeaderMap, +} + +impl From> for RequestWrapper { + fn from(req: Request) -> Self { + let (parts, _) = req.into_parts(); + + Self { + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + } + } +} + +impl RequestWrapper { + /// Serialize a RequestWrapper to the Rust MessagePack data format + pub fn into_rmp(self) -> Vec { + let mut buf = Vec::new(); + self.serialize(&mut Serializer::new(&mut buf)).unwrap(); + + buf + } + + /// Deserialize a RequestWrapper from the Rust MessagePack data format + pub fn from_rmp(buf: Vec) -> Self { + let mut de = Deserializer::new(buf.as_slice()); + + Deserialize::deserialize(&mut de).unwrap() + } +} + +// todo: add extensions +#[derive(Serialize, Deserialize, Debug)] +pub struct ResponseWrapper { + #[serde(with = "http_serde::status_code")] + pub status: StatusCode, + + #[serde(with = "http_serde::version")] + pub version: Version, + + #[serde(with = "http_serde::header_map")] + pub headers: HeaderMap, +} + +impl From> for ResponseWrapper { + fn from(res: Response) -> Self { + let (parts, _) = res.into_parts(); + + Self { + status: parts.status, + version: parts.version, + headers: parts.headers, + } + } +} + +impl ResponseWrapper { + /// Serialize a ResponseWrapper into the Rust MessagePack data format + pub fn into_rmp(self) -> Vec { + let mut buf = Vec::new(); + self.serialize(&mut Serializer::new(&mut buf)).unwrap(); + + buf + } + + /// Deserialize a ResponseWrapper from the Rust MessagePack data format + pub fn from_rmp(buf: Vec) -> Self { + let mut de = Deserializer::new(buf.as_slice()); + + Deserialize::deserialize(&mut de).unwrap() + } +} + +#[cfg(test)] +mod test { + use http::HeaderValue; + + use super::*; + + #[test] + fn request_roundtrip() { + let request: Request = Request::builder() + .method(Method::PUT) + .version(Version::HTTP_11) + .header("test", HeaderValue::from_static("request")) + .uri(format!("https://axum-wasm.example/hello")) + .body("Some body".to_string()) + .unwrap(); + + let rmp = RequestWrapper::from(request).into_rmp(); + + let back = RequestWrapper::from_rmp(rmp); + + assert_eq!( + back.headers.get("test").unwrap(), + HeaderValue::from_static("request") + ); + assert_eq!(back.method, Method::PUT); + assert_eq!(back.version, Version::HTTP_11); + assert_eq!( + back.uri.to_string(), + "https://axum-wasm.example/hello".to_string() + ); + } + + #[test] + fn response_roundtrip() { + let response: Response = Response::builder() + .version(Version::HTTP_11) + .header("test", HeaderValue::from_static("response")) + .status(StatusCode::NOT_MODIFIED) + .body("Some body".to_string()) + .unwrap(); + + let rmp = ResponseWrapper::from(response).into_rmp(); + + let back = ResponseWrapper::from_rmp(rmp); + + assert_eq!( + back.headers.get("test").unwrap(), + HeaderValue::from_static("response") + ); + assert_eq!(back.status, StatusCode::NOT_MODIFIED); + assert_eq!(back.version, Version::HTTP_11); + } +} From f9d73c21ea4b8269fef5659874f6b7a05ec1a0aa Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Sun, 13 Nov 2022 21:26:28 +0100 Subject: [PATCH 04/19] feat: serialize the full HTTP req/res to rmp --- runtime/Cargo.toml | 3 +- runtime/src/axum/mod.rs | 35 ++++++------- tmp/axum-wasm/src/lib.rs | 30 ++++++----- tmp/utils/Cargo.toml | 13 +++-- tmp/utils/src/lib.rs | 107 ++++++++++++++++++++++++++++----------- 5 files changed, 122 insertions(+), 66 deletions(-) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 708ca692f..629e90c43 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -12,7 +12,7 @@ cap-std = "0.26.0" clap ={ version = "4.0.18", features = ["derive"] } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" -hyper = "1.0.0-rc.1" +hyper = "0.14.23" tokio = { version = "=1.20.1", features = ["full"] } tokio-stream = "0.1.11" tonic = "0.8.2" @@ -22,6 +22,7 @@ uuid = { version = "1.1.2", features = ["v4"] } wasi-common = "2.0.0" wasmtime = "2.0.0" wasmtime-wasi = "2.0.0" +http-body = "0.4.5" [dependencies.shuttle-common] version = "0.7.0" diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 9f8a559a6..cd9b4a0dd 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -6,8 +6,10 @@ use std::sync::{Arc, Mutex}; use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; +use http_body::Full; +use hyper::body::Bytes; use hyper::Response; -use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; +use shuttle_axum_utils::{wrap_request, RequestWrapper, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; use tonic::Status; @@ -125,9 +127,8 @@ struct RouterInner { } impl RouterInner { - /// Send a HTTP request to given endpoint on the axum-wasm router and return the response - /// todo: also send and receive the body - pub async fn send_request(&mut self, req: hyper::Request) -> Response { + /// Send a HTTP request with body to given endpoint on the axum-wasm router and return the response + pub async fn send_request(&mut self, req: hyper::Request>) -> Response> { let (mut host, client) = UnixStream::pair().unwrap(); let client = WasiUnixStream::from_cap_std(client); @@ -136,7 +137,7 @@ impl RouterInner { .insert_file(3, Box::new(client), FileCaps::all()); // serialise request to rmp - let request_rmp = RequestWrapper::from(req).into_rmp(); + let request_rmp = wrap_request(req).await.into_rmp(); host.write_all(&request_rmp).unwrap(); host.write(&[0]).unwrap(); @@ -158,14 +159,8 @@ impl RouterInner { // deserialize response from rmp let res = ResponseWrapper::from_rmp(res_buf); - // todo: clean up conversion of wrapper to request - let mut response = Response::builder().status(res.status).version(res.version); - response - .headers_mut() - .unwrap() - .extend(res.headers.into_iter()); - - response.body("Some body".to_string()).unwrap() + // consume the wrapper and return response + res.into_response() } } @@ -196,36 +191,38 @@ pub mod tests { let mut inner = axum.inner.lock().unwrap(); // GET /hello - let request: Request = Request::builder() + let request: Request> = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("hello")) .uri(format!("https://axum-wasm.example/hello")) - .body("Some body".to_string()) + .body(Full::new(Bytes::from_static(b"some body"))) .unwrap(); let res = inner.send_request(request).await; assert_eq!(res.status(), StatusCode::OK); + assert_eq!(std::str::from_utf8(&res.body()).unwrap(), "Hello, World!"); // GET /goodbye - let request: Request = Request::builder() + let request: Request> = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("goodbye")) .uri(format!("https://axum-wasm.example/goodbye")) - .body("Some body".to_string()) + .body(Full::new(Bytes::from_static(b"some body"))) .unwrap(); let res = inner.send_request(request).await; assert_eq!(res.status(), StatusCode::OK); + assert_eq!(std::str::from_utf8(&res.body()).unwrap(), "Goodbye, World!"); // GET /invalid - let request: Request = Request::builder() + let request: Request> = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("invalid")) .uri(format!("https://axum-wasm.example/invalid")) - .body("Some body".to_string()) + .body(Full::new(Bytes::from_static(b"some body"))) .unwrap(); let res = inner.send_request(request).await; diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index f8b0a2eeb..230032a92 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -1,17 +1,24 @@ +use axum::body::HttpBody; use axum::{response::Response, routing::get, Router}; use futures_executor::block_on; use http::Request; -use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; +use shuttle_axum_utils::{wrap_response, RequestWrapper}; use std::fs::File; use std::io::{Read, Write}; use std::os::wasi::prelude::*; use tower_service::Service; -pub fn handle_request(req: Request) -> Response { +pub fn handle_request(req: Request) -> Response +where + B: HttpBody + Send + 'static, +{ block_on(app(req)) } -async fn app(request: Request) -> Response { +async fn app(request: Request) -> Response +where + B: HttpBody + Send + 'static, +{ let mut router = Router::new() .route("/hello", get(hello)) .route("/goodbye", get(goodbye)) @@ -48,23 +55,18 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { } } + // deserialize request from rust messagepack let req = RequestWrapper::from_rmp(req_buf); - // todo: clean up conversion of wrapper to request - let mut request: Request = Request::builder() - .method(req.method) - .version(req.version) - .uri(req.uri) - .body("Some body".to_string()) - .unwrap(); - - request.headers_mut().extend(req.headers.into_iter()); + // consume wrapper and return Request + let request = req.into_request(); println!("inner router received request: {:?}", &request); let res = handle_request(request); println!("inner router sending response: {:?}", &res); - let response = ResponseWrapper::from(res); + // wrap inner response and serialize it as rust messagepack + let response = block_on(wrap_response(res)).into_rmp(); - f.write_all(&response.into_rmp()).unwrap(); + f.write_all(&response).unwrap(); } diff --git a/tmp/utils/Cargo.toml b/tmp/utils/Cargo.toml index 691214143..52d422674 100644 --- a/tmp/utils/Cargo.toml +++ b/tmp/utils/Cargo.toml @@ -2,11 +2,18 @@ name = "shuttle-axum-utils" version = "0.1.0" edition = "2021" - +description = "Utilities for serializing requests to and from rust messagepack" [lib] [dependencies] http = "0.2.7" -serde = { version = "1.0.137", features = [ "derive" ] } -http-serde = { version ="1.1.2" } +http-body = "0.4.5" +http-serde = { version = "1.1.2" } +# hyper dep because I was struggling to turn http body to bytes with hyper::body::to_bytes +hyper = "0.14.23" rmp-serde = { version = "1.1.1" } +serde = { version = "1.0.137", features = [ "derive" ] } + +[dev-dependencies] +# unit tests have to call an async function to wrap req/res +futures-executor = "0.3.21" diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index 477f1d86d..82ccf43d2 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -1,10 +1,12 @@ use http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; +use http_body::{Body, Full}; +use hyper::body::Bytes; use rmps::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; extern crate rmp_serde as rmps; -// todo: add extensions +// todo: add http extensions field #[derive(Serialize, Deserialize, Debug)] pub struct RequestWrapper { #[serde(with = "http_serde::method")] @@ -18,18 +20,27 @@ pub struct RequestWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, -} -impl From> for RequestWrapper { - fn from(req: Request) -> Self { - let (parts, _) = req.into_parts(); + // I used Vec since it can derive serialize/deserialize + pub body: Vec, +} - Self { - method: parts.method, - uri: parts.uri, - version: parts.version, - headers: parts.headers, - } +/// Wrap HTTP Request in a struct that can be serialized to and from Rust MessagePack +pub async fn wrap_request(req: Request) -> RequestWrapper +where + B: Body, + B::Error: std::fmt::Debug, +{ + let (parts, body) = req.into_parts(); + + let body = hyper::body::to_bytes(body).await.unwrap(); + + RequestWrapper { + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + body: body.into(), } } @@ -48,9 +59,23 @@ impl RequestWrapper { Deserialize::deserialize(&mut de).unwrap() } + + /// Consume wrapper and return Request + pub fn into_request(self) -> Request> { + let mut request: Request> = Request::builder() + .method(self.method) + .version(self.version) + .uri(self.uri) + .body(Full::new(self.body.into())) + .unwrap(); + + request.headers_mut().extend(self.headers.into_iter()); + + request + } } -// todo: add extensions +// todo: add http extensions field #[derive(Serialize, Deserialize, Debug)] pub struct ResponseWrapper { #[serde(with = "http_serde::status_code")] @@ -61,17 +86,26 @@ pub struct ResponseWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, -} -impl From> for ResponseWrapper { - fn from(res: Response) -> Self { - let (parts, _) = res.into_parts(); + // I used Vec since it can derive serialize/deserialize + pub body: Vec, +} - Self { - status: parts.status, - version: parts.version, - headers: parts.headers, - } +/// Wrap HTTP Response in a struct that can be serialized to and from Rust MessagePack +pub async fn wrap_response(res: Response) -> ResponseWrapper +where + B: Body, + B::Error: std::fmt::Debug, +{ + let (parts, body) = res.into_parts(); + + let body = hyper::body::to_bytes(body).await.unwrap(); + + ResponseWrapper { + status: parts.status, + version: parts.version, + headers: parts.headers, + body: body.into(), } } @@ -90,25 +124,38 @@ impl ResponseWrapper { Deserialize::deserialize(&mut de).unwrap() } + + /// Consume wrapper and return Response + pub fn into_response(self) -> Response> { + let mut response = Response::builder() + .status(self.status) + .version(self.version); + response + .headers_mut() + .unwrap() + .extend(self.headers.into_iter()); + + response.body(self.body).unwrap() + } } #[cfg(test)] mod test { - use http::HeaderValue; - use super::*; + use futures_executor::block_on; + use http::HeaderValue; #[test] fn request_roundtrip() { - let request: Request = Request::builder() + let request: Request> = Request::builder() .method(Method::PUT) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("request")) .uri(format!("https://axum-wasm.example/hello")) - .body("Some body".to_string()) + .body(Full::new(Bytes::from_static(b"request body"))) .unwrap(); - let rmp = RequestWrapper::from(request).into_rmp(); + let rmp = block_on(wrap_request(request)).into_rmp(); let back = RequestWrapper::from_rmp(rmp); @@ -122,18 +169,19 @@ mod test { back.uri.to_string(), "https://axum-wasm.example/hello".to_string() ); + assert_eq!(std::str::from_utf8(&back.body).unwrap(), "request body"); } #[test] fn response_roundtrip() { - let response: Response = Response::builder() + let response: Response> = Response::builder() .version(Version::HTTP_11) .header("test", HeaderValue::from_static("response")) .status(StatusCode::NOT_MODIFIED) - .body("Some body".to_string()) + .body(Full::new(Bytes::from_static(b"response body"))) .unwrap(); - let rmp = ResponseWrapper::from(response).into_rmp(); + let rmp = block_on(wrap_response(response)).into_rmp(); let back = ResponseWrapper::from_rmp(rmp); @@ -143,5 +191,6 @@ mod test { ); assert_eq!(back.status, StatusCode::NOT_MODIFIED); assert_eq!(back.version, Version::HTTP_11); + assert_eq!(std::str::from_utf8(&back.body).unwrap(), "response body"); } } From adbae0aabc0d6e0c3d15a3d0447eb09eaae20983 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Sun, 13 Nov 2022 21:39:40 +0100 Subject: [PATCH 05/19] fix: comment typo --- tmp/utils/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tmp/utils/Cargo.toml b/tmp/utils/Cargo.toml index 52d422674..ffa1e7f3b 100644 --- a/tmp/utils/Cargo.toml +++ b/tmp/utils/Cargo.toml @@ -9,7 +9,7 @@ description = "Utilities for serializing requests to and from rust messagepack" http = "0.2.7" http-body = "0.4.5" http-serde = { version = "1.1.2" } -# hyper dep because I was struggling to turn http body to bytes with hyper::body::to_bytes +# hyper dep because I was struggling to turn http body to bytes without hyper::body::to_bytes hyper = "0.14.23" rmp-serde = { version = "1.1.1" } serde = { version = "1.0.137", features = [ "derive" ] } From 3c2adb0653ad69dc4ea4751c3976f26de978e19a Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 15 Nov 2022 18:14:48 +0100 Subject: [PATCH 06/19] feat: start hyper with tonic and serve wasm router --- runtime/Cargo.toml | 4 +- runtime/README.md | 21 ++++++-- runtime/src/axum/mod.rs | 114 ++++++++++++++++++++++++++++++---------- runtime/src/lib.rs | 1 + runtime/src/main.rs | 23 ++++---- tmp/utils/src/lib.rs | 10 ++-- 6 files changed, 126 insertions(+), 47 deletions(-) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 629e90c43..f7d062ed7 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -12,7 +12,7 @@ cap-std = "0.26.0" clap ={ version = "4.0.18", features = ["derive"] } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" -hyper = "0.14.23" +hyper = { version = "0.14.23", features = ["full"] } tokio = { version = "=1.20.1", features = ["full"] } tokio-stream = "0.1.11" tonic = "0.8.2" @@ -22,7 +22,6 @@ uuid = { version = "1.1.2", features = ["v4"] } wasi-common = "2.0.0" wasmtime = "2.0.0" wasmtime-wasi = "2.0.0" -http-body = "0.4.5" [dependencies.shuttle-common] version = "0.7.0" @@ -44,3 +43,4 @@ optional = true [features] axum = ["shuttle-axum-utils"] + diff --git a/runtime/README.md b/runtime/README.md index fbf6fb10d..4cf650944 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -9,9 +9,9 @@ $ DISCORD_TOKEN=xxx cargo run In another terminal: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:8000 runtime.Runtime/Load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start -grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/axum.wasm"}' localhost:6001 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:6001 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:6001 runtime.Runtime/SubscribeLogs ``` ## axum-wasm @@ -19,7 +19,7 @@ grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 run Compile the wasm axum router: ```bash -make wasm +make axum ``` Run the test: @@ -29,7 +29,18 @@ cargo test --all-features axum -- --nocapture ``` Load and run: -TODO + +```bash +make axum +``` + +In another terminal: + +``` bash +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/axum.wasm"}' localhost:8000 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start +# grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs +``` ## shuttle-legacy diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index cd9b4a0dd..df6fe3d21 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -1,19 +1,23 @@ +use std::convert::Infallible; use std::fs::File; use std::io::{Read, Write}; +use std::net::{Ipv4Addr, SocketAddr}; use std::os::unix::prelude::RawFd; use std::path::Path; use std::sync::{Arc, Mutex}; use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; -use http_body::Full; -use hyper::body::Bytes; -use hyper::Response; -use shuttle_axum_utils::{wrap_request, RequestWrapper, ResponseWrapper}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Request, Response}; +use shuttle_axum_utils::{wrap_request, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; -use shuttle_proto::runtime::{LoadRequest, LoadResponse, StartRequest, StartResponse}; +use shuttle_proto::runtime::{ + self, LoadRequest, LoadResponse, StartRequest, StartResponse, SubscribeLogsRequest, +}; +use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -use tracing::trace; +use tracing::{info, trace}; use wasi_common::file::FileCaps; use wasmtime::{Engine, Linker, Module, Store}; use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; @@ -21,12 +25,14 @@ use wasmtime_wasi::{WasiCtx, WasiCtxBuilder}; pub struct AxumWasm { router: std::sync::Mutex>, + port: Mutex>, } impl AxumWasm { pub fn new() -> Self { Self { router: std::sync::Mutex::new(None), + port: std::sync::Mutex::new(None), } } } @@ -38,7 +44,7 @@ impl Runtime for AxumWasm { request: tonic::Request, ) -> Result, Status> { let wasm_path = request.into_inner().path; - trace!(wasm_path, "loading"); + info!(wasm_path, "loading"); let router = Router::new(wasm_path); @@ -53,15 +59,47 @@ impl Runtime for AxumWasm { &self, _request: tonic::Request, ) -> Result, Status> { - // TODO: start a hyper server and serve the axum-wasm router as a service + let port = 7002; + let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); + + let router = self.router.lock().unwrap().take().unwrap(); + let make_service = make_service_fn(move |_conn| { + let router = router.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req: Request| { + let router = router.clone(); + async move { + Ok::<_, Infallible>( + router.inner.lock().await.send_request(req).await.unwrap(), + ) + } + })) + } + }); + + info!("starting hyper server on: {}", &address); + let server = hyper::Server::bind(&address).serve(make_service); + + _ = tokio::spawn(server); + + *self.port.lock().unwrap() = Some(port); let message = StartResponse { success: true, - port: 7002, + port: port as u32, }; Ok(tonic::Response::new(message)) } + + type SubscribeLogsStream = ReceiverStream>; + + async fn subscribe_logs( + &self, + _request: tonic::Request, + ) -> Result, Status> { + todo!() + } } struct RouterBuilder { @@ -116,7 +154,7 @@ impl RouterBuilder { linker: self.linker, }; Router { - inner: Arc::new(Mutex::new(inner)), + inner: Arc::new(tokio::sync::Mutex::new(inner)), } } } @@ -128,7 +166,10 @@ struct RouterInner { impl RouterInner { /// Send a HTTP request with body to given endpoint on the axum-wasm router and return the response - pub async fn send_request(&mut self, req: hyper::Request>) -> Response> { + pub async fn send_request( + &mut self, + req: hyper::Request, + ) -> Result, Infallible> { let (mut host, client) = UnixStream::pair().unwrap(); let client = WasiUnixStream::from_cap_std(client); @@ -160,13 +201,13 @@ impl RouterInner { let res = ResponseWrapper::from_rmp(res_buf); // consume the wrapper and return response - res.into_response() + Ok(res.into_response()) } } #[derive(Clone)] struct Router { - inner: Arc>, + inner: Arc>, } impl Router { @@ -181,51 +222,70 @@ impl Router { #[cfg(test)] pub mod tests { - use hyper::{http::HeaderValue, Method, Request, StatusCode, Version}; - use super::*; + use hyper::{http::HeaderValue, Method, Request, StatusCode, Version}; #[tokio::test] async fn axum() { let axum = Router::new("axum.wasm"); - let mut inner = axum.inner.lock().unwrap(); + let mut inner = axum.inner.lock().await; // GET /hello - let request: Request> = Request::builder() + let request: Request = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("hello")) .uri(format!("https://axum-wasm.example/hello")) - .body(Full::new(Bytes::from_static(b"some body"))) + .body(Body::empty()) .unwrap(); - let res = inner.send_request(request).await; + let res = inner.send_request(request).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); - assert_eq!(std::str::from_utf8(&res.body()).unwrap(), "Hello, World!"); + assert_eq!( + &hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .iter() + .cloned() + .collect::>() + .as_ref(), + b"Hello, World!" + ); // GET /goodbye - let request: Request> = Request::builder() + let request: Request = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("goodbye")) .uri(format!("https://axum-wasm.example/goodbye")) - .body(Full::new(Bytes::from_static(b"some body"))) + .body(Body::empty()) .unwrap(); - let res = inner.send_request(request).await; + let res = inner.send_request(request).await.unwrap(); + assert_eq!(res.status(), StatusCode::OK); - assert_eq!(std::str::from_utf8(&res.body()).unwrap(), "Goodbye, World!"); + assert_eq!( + &hyper::body::to_bytes(res.into_body()) + .await + .unwrap() + .iter() + .cloned() + .collect::>() + .as_ref(), + b"Goodbye, World!" + ); // GET /invalid - let request: Request> = Request::builder() + let request: Request = Request::builder() .method(Method::GET) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("invalid")) .uri(format!("https://axum-wasm.example/invalid")) - .body(Full::new(Bytes::from_static(b"some body"))) + .body(Body::empty()) .unwrap(); - let res = inner.send_request(request).await; + let res = inner.send_request(request).await.unwrap(); assert_eq!(res.status(), StatusCode::NOT_FOUND); } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 8834f2198..1119bf890 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -5,5 +5,6 @@ mod next; pub mod provisioner_factory; pub use args::Args; +pub use axum::AxumWasm; pub use legacy::Legacy; pub use next::Next; diff --git a/runtime/src/main.rs b/runtime/src/main.rs index d2eae6557..4a4c5bba5 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -5,7 +5,7 @@ use std::{ use clap::Parser; use shuttle_proto::runtime::runtime_server::RuntimeServer; -use shuttle_runtime::{Args, Legacy, Next}; +use shuttle_runtime::{Args, AxumWasm, Legacy, Next}; use tonic::transport::Server; use tracing::trace; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -28,19 +28,24 @@ async fn main() { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001); - let provisioner_address = args.provisioner_address; + // let provisioner_address = args.provisioner_address; let mut server_builder = Server::builder().http2_keepalive_interval(Some(Duration::from_secs(60))); - let router = if args.legacy { - let legacy = Legacy::new(provisioner_address); - let svc = RuntimeServer::new(legacy); - server_builder.add_service(svc) - } else { - let next = Next::new(); - let svc = RuntimeServer::new(next); + let router = { + let axum = AxumWasm::new(); + let svc = RuntimeServer::new(axum); server_builder.add_service(svc) }; + // if args.legacy { + // let legacy = Legacy::new(provisioner_address); + // let svc = RuntimeServer::new(legacy); + // server_builder.add_service(svc) + // } else { + // let next = Next::new(); + // let svc = RuntimeServer::new(next); + // server_builder.add_service(svc) + // }; router.serve(addr).await.unwrap(); } diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index 82ccf43d2..b18b8ab92 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -34,13 +34,14 @@ where let (parts, body) = req.into_parts(); let body = hyper::body::to_bytes(body).await.unwrap(); + let body = body.iter().cloned().collect::>(); RequestWrapper { method: parts.method, uri: parts.uri, version: parts.version, headers: parts.headers, - body: body.into(), + body, } } @@ -100,12 +101,13 @@ where let (parts, body) = res.into_parts(); let body = hyper::body::to_bytes(body).await.unwrap(); + let body = body.iter().cloned().collect::>(); ResponseWrapper { status: parts.status, version: parts.version, headers: parts.headers, - body: body.into(), + body, } } @@ -126,7 +128,7 @@ impl ResponseWrapper { } /// Consume wrapper and return Response - pub fn into_response(self) -> Response> { + pub fn into_response(self) -> Response { let mut response = Response::builder() .status(self.status) .version(self.version); @@ -135,7 +137,7 @@ impl ResponseWrapper { .unwrap() .extend(self.headers.into_iter()); - response.body(self.body).unwrap() + response.body(hyper::Body::from(self.body)).unwrap() } } From 3780bfd246ed79aaee92b3b45d8ea4356ee6948f Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 15 Nov 2022 21:23:00 +0100 Subject: [PATCH 07/19] feat: clone inner router arc --- runtime/src/axum/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index df6fe3d21..49e08ffd6 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -17,7 +17,7 @@ use shuttle_proto::runtime::{ }; use tokio_stream::wrappers::ReceiverStream; use tonic::Status; -use tracing::{info, trace}; +use tracing::info; use wasi_common::file::FileCaps; use wasmtime::{Engine, Linker, Module, Store}; use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; @@ -62,16 +62,15 @@ impl Runtime for AxumWasm { let port = 7002; let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port); - let router = self.router.lock().unwrap().take().unwrap(); + let router = self.router.lock().unwrap().take().unwrap().inner; + let make_service = make_service_fn(move |_conn| { let router = router.clone(); async move { Ok::<_, Infallible>(service_fn(move |req: Request| { let router = router.clone(); async move { - Ok::<_, Infallible>( - router.inner.lock().await.send_request(req).await.unwrap(), - ) + Ok::<_, Infallible>(router.lock().await.send_request(req).await.unwrap()) } })) } From dc26f3ac3a5066941ec9efb6afbce6eb6a3950e6 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 15 Nov 2022 22:22:07 +0100 Subject: [PATCH 08/19] feat: send request body without serialization --- runtime/src/axum/mod.rs | 17 +++++++++++++---- tmp/axum-wasm/src/lib.rs | 22 ++++++++++++++++++---- tmp/utils/src/lib.rs | 36 +++++++++++++++++------------------- 3 files changed, 48 insertions(+), 27 deletions(-) diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 49e08ffd6..1981303b0 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -176,12 +176,20 @@ impl RouterInner { .data_mut() .insert_file(3, Box::new(client), FileCaps::all()); - // serialise request to rmp - let request_rmp = wrap_request(req).await.into_rmp(); + let (parts, body) = req.into_parts(); + // serialise request parts to rmp + let request_rmp = wrap_request(parts).await.into_rmp(); + + // write request parts host.write_all(&request_rmp).unwrap(); host.write(&[0]).unwrap(); + // write body + host.write_all(hyper::body::to_bytes(body).await.unwrap().as_ref()) + .unwrap(); + host.write(&[0]).unwrap(); + println!("calling inner Router"); self.linker .get(&mut self.store, "axum", "__SHUTTLE_Axum_call") @@ -196,6 +204,7 @@ impl RouterInner { let mut res_buf = Vec::new(); host.read_to_end(&mut res_buf).unwrap(); + // TODO: handle response body the same way as request body (don't serialize it as rmp) // deserialize response from rmp let res = ResponseWrapper::from_rmp(res_buf); @@ -235,7 +244,7 @@ pub mod tests { .version(Version::HTTP_11) .header("test", HeaderValue::from_static("hello")) .uri(format!("https://axum-wasm.example/hello")) - .body(Body::empty()) + .body(Body::from("Hello world body")) .unwrap(); let res = inner.send_request(request).await.unwrap(); @@ -258,7 +267,7 @@ pub mod tests { .version(Version::HTTP_11) .header("test", HeaderValue::from_static("goodbye")) .uri(format!("https://axum-wasm.example/goodbye")) - .body(Body::empty()) + .body(Body::from("Goodbye world body")) .unwrap(); let res = inner.send_request(request).await.unwrap(); diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 230032a92..b3d5026e2 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -44,6 +44,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { let mut f = unsafe { File::from_raw_fd(fd) }; + // read request parts from host let mut req_buf = Vec::new(); let mut c_buf: [u8; 1] = [0; 1]; loop { @@ -55,16 +56,29 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { } } - // deserialize request from rust messagepack - let req = RequestWrapper::from_rmp(req_buf); + // deserialize request parts from rust messagepack + let wrapper = RequestWrapper::from_rmp(req_buf); - // consume wrapper and return Request - let request = req.into_request(); + // read request body from host + let mut body_buf = Vec::new(); + let mut c_buf: [u8; 1] = [0; 1]; + loop { + f.read(&mut c_buf).unwrap(); + if c_buf[0] == 0 { + break; + } else { + body_buf.push(c_buf[0]); + } + } + + // set body in the wrapper (Body::Empty if buf is empty), consume wrapper and return Request + let request = wrapper.set_body(body_buf).into_request(); println!("inner router received request: {:?}", &request); let res = handle_request(request); println!("inner router sending response: {:?}", &res); + // TODO: handle response body the same as request body (don't serialize it as rmp) // wrap inner response and serialize it as rust messagepack let response = block_on(wrap_response(res)).into_rmp(); diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index b18b8ab92..ae28cc614 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -1,5 +1,6 @@ -use http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; -use http_body::{Body, Full}; +use http::{request::Parts, HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; +use http_body::Body as HttpBody; +use hyper::body::Body; use hyper::body::Bytes; use rmps::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; @@ -21,27 +22,18 @@ pub struct RequestWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, - // I used Vec since it can derive serialize/deserialize + #[serde(skip)] pub body: Vec, } -/// Wrap HTTP Request in a struct that can be serialized to and from Rust MessagePack -pub async fn wrap_request(req: Request) -> RequestWrapper -where - B: Body, - B::Error: std::fmt::Debug, -{ - let (parts, body) = req.into_parts(); - - let body = hyper::body::to_bytes(body).await.unwrap(); - let body = body.iter().cloned().collect::>(); - +/// Wrap HTTP Request parts in a struct that can be serialized to and from Rust MessagePack +pub async fn wrap_request(parts: Parts) -> RequestWrapper { RequestWrapper { method: parts.method, uri: parts.uri, version: parts.version, headers: parts.headers, - body, + body: Vec::new(), } } @@ -61,13 +53,19 @@ impl RequestWrapper { Deserialize::deserialize(&mut de).unwrap() } + /// Set the http body + pub fn set_body(mut self, buf: Vec) -> Self { + self.body = buf; + self + } + /// Consume wrapper and return Request - pub fn into_request(self) -> Request> { - let mut request: Request> = Request::builder() + pub fn into_request(self) -> Request { + let mut request = Request::builder() .method(self.method) .version(self.version) .uri(self.uri) - .body(Full::new(self.body.into())) + .body(Body::from(self.body)) .unwrap(); request.headers_mut().extend(self.headers.into_iter()); @@ -95,7 +93,7 @@ pub struct ResponseWrapper { /// Wrap HTTP Response in a struct that can be serialized to and from Rust MessagePack pub async fn wrap_response(res: Response) -> ResponseWrapper where - B: Body, + B: HttpBody, B::Error: std::fmt::Debug, { let (parts, body) = res.into_parts(); From d558aa38682baf2fea3a3ff48e5e105406ccc9ce Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Tue, 15 Nov 2022 22:24:28 +0100 Subject: [PATCH 09/19] docs: todo comment --- tmp/axum-wasm/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index b3d5026e2..71a40c11e 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -59,6 +59,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { // deserialize request parts from rust messagepack let wrapper = RequestWrapper::from_rmp(req_buf); + // TODO: deduplicate this? Is it the correct strategy to send two separate files? // read request body from host let mut body_buf = Vec::new(); let mut c_buf: [u8; 1] = [0; 1]; From fe98ea0714d47ffb6756e57da073c9dbcc87050c Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 12:08:50 +0100 Subject: [PATCH 10/19] feat: write response body --- runtime/README.md | 2 +- runtime/src/axum/mod.rs | 41 +++++++++++++++---- tmp/axum-wasm/src/lib.rs | 21 +++++++--- tmp/utils/Cargo.toml | 3 -- tmp/utils/src/lib.rs | 88 +++++++++++++++++++--------------------- 5 files changed, 90 insertions(+), 65 deletions(-) diff --git a/runtime/README.md b/runtime/README.md index 4cf650944..0d9da1bad 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -31,7 +31,7 @@ cargo test --all-features axum -- --nocapture Load and run: ```bash -make axum +cargo run --all-features ``` In another terminal: diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 1981303b0..547a00f72 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response}; -use shuttle_axum_utils::{wrap_request, ResponseWrapper}; +use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; use shuttle_proto::runtime::{ self, LoadRequest, LoadResponse, StartRequest, StartResponse, SubscribeLogsRequest, @@ -179,7 +179,7 @@ impl RouterInner { let (parts, body) = req.into_parts(); // serialise request parts to rmp - let request_rmp = wrap_request(parts).await.into_rmp(); + let request_rmp = RequestWrapper::from(parts).into_rmp(); // write request parts host.write_all(&request_rmp).unwrap(); @@ -201,15 +201,39 @@ impl RouterInner { .call(&mut self.store, 3) .unwrap(); + // read response parts from host let mut res_buf = Vec::new(); - host.read_to_end(&mut res_buf).unwrap(); + let mut c_buf: [u8; 1] = [0; 1]; + loop { + host.read(&mut c_buf).unwrap(); + if c_buf[0] == 0 { + break; + } else { + res_buf.push(c_buf[0]); + } + } + + println!("received response parts buf: \n{:?}", &res_buf); + // deserialize response parts from rust messagepack + let wrapper = ResponseWrapper::from_rmp(res_buf); + + // read response body from wasm router + let mut body_buf = Vec::new(); + let mut c_buf: [u8; 1] = [0; 1]; + loop { + host.read(&mut c_buf).unwrap(); + if c_buf[0] == 0 { + break; + } else { + body_buf.push(c_buf[0]); + } + } - // TODO: handle response body the same way as request body (don't serialize it as rmp) - // deserialize response from rmp - let res = ResponseWrapper::from_rmp(res_buf); + // set body in the wrapper (Body::Empty if buf is empty), consume wrapper and return Response + let response = wrapper.set_body(body_buf).into_response(); // consume the wrapper and return response - Ok(res.into_response()) + Ok(response) } } @@ -242,9 +266,8 @@ pub mod tests { let request: Request = Request::builder() .method(Method::GET) .version(Version::HTTP_11) - .header("test", HeaderValue::from_static("hello")) .uri(format!("https://axum-wasm.example/hello")) - .body(Body::from("Hello world body")) + .body(Body::empty()) .unwrap(); let res = inner.send_request(request).await.unwrap(); diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 71a40c11e..19f70385c 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -2,7 +2,7 @@ use axum::body::HttpBody; use axum::{response::Response, routing::get, Router}; use futures_executor::block_on; use http::Request; -use shuttle_axum_utils::{wrap_response, RequestWrapper}; +use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; use std::fs::File; use std::io::{Read, Write}; use std::os::wasi::prelude::*; @@ -78,10 +78,19 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { println!("inner router received request: {:?}", &request); let res = handle_request(request); - println!("inner router sending response: {:?}", &res); - // TODO: handle response body the same as request body (don't serialize it as rmp) - // wrap inner response and serialize it as rust messagepack - let response = block_on(wrap_response(res)).into_rmp(); + let (parts, mut body) = res.into_parts(); - f.write_all(&response).unwrap(); + println!("sending parts: {:?}", parts.headers.clone()); + // wrap and serialize response parts as rmp + let response_parts = ResponseWrapper::from(parts); + + println!("sending response parts: {:?}", &response_parts); + // write response parts + f.write_all(&response_parts).unwrap(); + f.write(&[0]).unwrap(); + + // write body + f.write_all(block_on(body.data()).unwrap().unwrap().as_ref()) + .unwrap(); + f.write(&[0]).unwrap(); } diff --git a/tmp/utils/Cargo.toml b/tmp/utils/Cargo.toml index ffa1e7f3b..970ab63e6 100644 --- a/tmp/utils/Cargo.toml +++ b/tmp/utils/Cargo.toml @@ -6,10 +6,7 @@ description = "Utilities for serializing requests to and from rust messagepack" [lib] [dependencies] -http = "0.2.7" -http-body = "0.4.5" http-serde = { version = "1.1.2" } -# hyper dep because I was struggling to turn http body to bytes without hyper::body::to_bytes hyper = "0.14.23" rmp-serde = { version = "1.1.1" } serde = { version = "1.0.137", features = [ "derive" ] } diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index ae28cc614..eacc6625b 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -1,7 +1,5 @@ -use http::{request::Parts, HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; -use http_body::Body as HttpBody; use hyper::body::Body; -use hyper::body::Bytes; +use hyper::http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; use rmps::{Deserializer, Serializer}; use serde::{Deserialize, Serialize}; @@ -26,14 +24,15 @@ pub struct RequestWrapper { pub body: Vec, } -/// Wrap HTTP Request parts in a struct that can be serialized to and from Rust MessagePack -pub async fn wrap_request(parts: Parts) -> RequestWrapper { - RequestWrapper { - method: parts.method, - uri: parts.uri, - version: parts.version, - headers: parts.headers, - body: Vec::new(), +impl From for RequestWrapper { + fn from(parts: hyper::http::request::Parts) -> Self { + RequestWrapper { + method: parts.method, + uri: parts.uri, + version: parts.version, + headers: parts.headers, + body: Vec::new(), + } } } @@ -53,7 +52,7 @@ impl RequestWrapper { Deserialize::deserialize(&mut de).unwrap() } - /// Set the http body + /// Set the request body pub fn set_body(mut self, buf: Vec) -> Self { self.body = buf; self @@ -86,26 +85,18 @@ pub struct ResponseWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, - // I used Vec since it can derive serialize/deserialize + #[serde(skip)] pub body: Vec, } -/// Wrap HTTP Response in a struct that can be serialized to and from Rust MessagePack -pub async fn wrap_response(res: Response) -> ResponseWrapper -where - B: HttpBody, - B::Error: std::fmt::Debug, -{ - let (parts, body) = res.into_parts(); - - let body = hyper::body::to_bytes(body).await.unwrap(); - let body = body.iter().cloned().collect::>(); - - ResponseWrapper { - status: parts.status, - version: parts.version, - headers: parts.headers, - body, +impl From for ResponseWrapper { + fn from(parts: hyper::http::response::Parts) -> Self { + ResponseWrapper { + status: parts.status, + version: parts.version, + headers: parts.headers, + body: Vec::new(), + } } } @@ -125,37 +116,43 @@ impl ResponseWrapper { Deserialize::deserialize(&mut de).unwrap() } + /// Set the response body + pub fn set_body(mut self, buf: Vec) -> Self { + self.body = buf; + self + } + /// Consume wrapper and return Response - pub fn into_response(self) -> Response { + pub fn into_response(self) -> Response { let mut response = Response::builder() .status(self.status) - .version(self.version); - response - .headers_mut() - .unwrap() - .extend(self.headers.into_iter()); + .version(self.version) + .body(Body::from(self.body)) + .unwrap(); + + response.headers_mut().extend(self.headers.into_iter()); - response.body(hyper::Body::from(self.body)).unwrap() + response } } #[cfg(test)] mod test { use super::*; - use futures_executor::block_on; - use http::HeaderValue; + use hyper::http::HeaderValue; #[test] fn request_roundtrip() { - let request: Request> = Request::builder() + let request: Request = Request::builder() .method(Method::PUT) .version(Version::HTTP_11) .header("test", HeaderValue::from_static("request")) .uri(format!("https://axum-wasm.example/hello")) - .body(Full::new(Bytes::from_static(b"request body"))) + .body(Body::empty()) .unwrap(); - let rmp = block_on(wrap_request(request)).into_rmp(); + let (parts, _) = request.into_parts(); + let rmp = RequestWrapper::from(parts).into_rmp(); let back = RequestWrapper::from_rmp(rmp); @@ -169,19 +166,19 @@ mod test { back.uri.to_string(), "https://axum-wasm.example/hello".to_string() ); - assert_eq!(std::str::from_utf8(&back.body).unwrap(), "request body"); } #[test] fn response_roundtrip() { - let response: Response> = Response::builder() + let response: Response = Response::builder() .version(Version::HTTP_11) .header("test", HeaderValue::from_static("response")) .status(StatusCode::NOT_MODIFIED) - .body(Full::new(Bytes::from_static(b"response body"))) + .body(Body::empty()) .unwrap(); - let rmp = block_on(wrap_response(response)).into_rmp(); + let (parts, _) = response.into_parts(); + let rmp = ResponseWrapper::from(parts).into_rmp(); let back = ResponseWrapper::from_rmp(rmp); @@ -191,6 +188,5 @@ mod test { ); assert_eq!(back.status, StatusCode::NOT_MODIFIED); assert_eq!(back.version, Version::HTTP_11); - assert_eq!(std::str::from_utf8(&back.body).unwrap(), "response body"); } } From d78a5bdc174bb82a1ad1ac21952330011711d6e9 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 12:18:49 +0100 Subject: [PATCH 11/19] fix: serialize response parts --- tmp/axum-wasm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 19f70385c..508c582a4 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -82,7 +82,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { println!("sending parts: {:?}", parts.headers.clone()); // wrap and serialize response parts as rmp - let response_parts = ResponseWrapper::from(parts); + let response_parts = ResponseWrapper::from(parts).into_rmp(); println!("sending response parts: {:?}", &response_parts); // write response parts From e47628f01cff69fd4c6bc2691cacf85249e36dbe Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 13:44:57 +0100 Subject: [PATCH 12/19] feat: deserialize parts directly from reader this also adds a new fd to separate streaming of body and parts --- Cargo.lock | 206 +++++++++++++++++---------------------- runtime/Cargo.toml | 1 + runtime/src/axum/mod.rs | 45 +++++---- tmp/axum-wasm/Cargo.toml | 1 + tmp/axum-wasm/src/lib.rs | 45 ++++----- 5 files changed, 131 insertions(+), 167 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0bfb15132..e55afbd4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,10 +517,10 @@ dependencies = [ "aws-smithy-json", "aws-smithy-types", "aws-types", - "bytes 1.1.0", + "bytes 1.2.1", "hex 0.4.3", "http 0.2.8", - "hyper 0.14.20", + "hyper", "ring", "time 0.3.11", "tokio", @@ -551,9 +551,9 @@ dependencies = [ "aws-smithy-http", "aws-smithy-types", "aws-types", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", - "http-body 0.4.5", + "http-body", "lazy_static", "percent-encoding", "pin-project-lite 0.2.9", @@ -577,7 +577,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "tokio-stream", "tower", @@ -599,7 +599,7 @@ dependencies = [ "aws-smithy-json", "aws-smithy-types", "aws-types", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "tokio-stream", "tower", @@ -622,7 +622,7 @@ dependencies = [ "aws-smithy-types", "aws-smithy-xml", "aws-types", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "tower", ] @@ -680,11 +680,11 @@ dependencies = [ "aws-smithy-http", "aws-smithy-http-tower", "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.1", "fastrand", "http 0.2.8", - "http-body 0.4.5", - "hyper 0.14.20", + "http-body", + "hyper", "hyper-rustls 0.22.1", "lazy_static", "pin-project-lite 0.2.9", @@ -700,12 +700,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cc1af50eac644ab6f58e5bae29328ba3092851fc2ce648ad139134699b2b66f" dependencies = [ "aws-smithy-types", - "bytes 1.1.0", + "bytes 1.2.1", "bytes-utils", "futures-core", "http 0.2.8", - "http-body 0.4.5", - "hyper 0.14.20", + "http-body", + "hyper", "once_cell", "percent-encoding", "pin-project-lite 0.2.9", @@ -721,9 +721,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1bf4c4664dff2febf91f8796505c5bc8f38a0bff0d1397d1d3fdda17bd5c5d1" dependencies = [ "aws-smithy-http", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", - "http-body 0.4.5", + "http-body", "pin-project-lite 0.2.9", "tower", "tracing", @@ -795,12 +795,12 @@ dependencies = [ "axum-core", "base64 0.13.0", "bitflags", - "bytes 1.1.0", + "bytes 1.2.1", "futures-util", "headers", "http 0.2.8", - "http-body 0.4.5", - "hyper 0.14.20", + "http-body", + "hyper", "itoa 1.0.2", "matchit", "memchr", @@ -827,10 +827,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4f44a0e6200e9d11a1cdc989e4b358f6e3d354fbf48478f345a17f4e43f8635" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.1", "futures-util", "http 0.2.8", - "http-body 0.4.5", + "http-body", "mime", ] @@ -943,12 +943,12 @@ checksum = "d82e7850583ead5f8bbef247e2a3c37a19bd576e8420cd262a6711921827e1e5" dependencies = [ "base64 0.13.0", "bollard-stubs", - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-util", "hex 0.4.3", "http 0.2.8", - "hyper 0.14.20", + "hyper", "hyperlocal", "log", "pin-project-lite 0.2.9", @@ -1064,9 +1064,9 @@ checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" [[package]] name = "bytes" -version = "1.1.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" [[package]] name = "bytes-utils" @@ -1074,7 +1074,7 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "either", ] @@ -1513,7 +1513,7 @@ version = "4.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a604e93b79d1808327a6fca85a6f2d69de66461e7620f5a4cbf5fb4d1d7c948" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "memchr", ] @@ -2784,7 +2784,7 @@ version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "fnv", "futures-core", "futures-sink", @@ -2841,7 +2841,7 @@ checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" dependencies = [ "base64 0.13.0", "bitflags", - "bytes 1.1.0", + "bytes 1.2.1", "headers-core", "http 0.2.8", "httpdate", @@ -2985,7 +2985,7 @@ version = "0.2.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "fnv", "itoa 1.0.2", ] @@ -2996,21 +2996,11 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "pin-project-lite 0.2.9", ] -[[package]] -name = "http-body" -version = "1.0.0-rc1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f038884e63a5a85612eeea789f5e0f54c3ada7306234502543b23d98744781f0" -dependencies = [ - "bytes 1.1.0", - "http 0.2.8", -] - [[package]] name = "http-client" version = "6.5.3" @@ -3063,9 +3053,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.7.1" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "496ce29bb5a52785b44e0f7ca2847ae0bb839c9bd28f69acac9b99d461c0c04c" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" @@ -3081,17 +3071,17 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.20" +version = "0.14.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +checksum = "034711faac9d2166cb1baf1a2fb0b60b1f277f8492fd72176c17f3515e1abd3c" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "futures-channel", "futures-core", "futures-util", "h2", "http 0.2.8", - "http-body 0.4.5", + "http-body", "httparse", "httpdate", "itoa 1.0.2", @@ -3103,33 +3093,12 @@ dependencies = [ "want", ] -[[package]] -name = "hyper" -version = "1.0.0-rc.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d85cfb5d312f278a651d0567873b9c32b28d8ced5603a9242c73c54c93fec814" -dependencies = [ - "bytes 1.1.0", - "futures-channel", - "futures-core", - "futures-util", - "http 0.2.8", - "http-body 1.0.0-rc1", - "httparse", - "httpdate", - "itoa 1.0.2", - "pin-project-lite 0.2.9", - "tokio", - "tracing", - "want", -] - [[package]] name = "hyper-reverse-proxy" version = "0.5.2-dev" source = "git+https://github.com/chesedo/hyper-reverse-proxy?branch=bug/host_header#5f82b7dffe940abf896fe47dadf6c8e87ddc670b" dependencies = [ - "hyper 0.14.20", + "hyper", "lazy_static", "tokio", "tracing", @@ -3140,7 +3109,7 @@ name = "hyper-reverse-proxy" version = "0.5.2-dev" source = "git+https://github.com/chesedo/hyper-reverse-proxy?branch=master#a4deffef77685b37fda7224ae678d3d9f00d391e" dependencies = [ - "hyper 0.14.20", + "hyper", "lazy_static", "tokio", "tracing", @@ -3154,7 +3123,7 @@ checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" dependencies = [ "ct-logs", "futures-util", - "hyper 0.14.20", + "hyper", "log", "rustls 0.19.1", "rustls-native-certs", @@ -3170,7 +3139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" dependencies = [ "http 0.2.8", - "hyper 0.14.20", + "hyper", "rustls 0.20.6", "tokio", "tokio-rustls 0.23.4", @@ -3182,7 +3151,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper 0.14.20", + "hyper", "pin-project-lite 0.2.9", "tokio", "tokio-io-timeout", @@ -3194,8 +3163,8 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes 1.1.0", - "hyper 0.14.20", + "bytes 1.2.1", + "hyper", "native-tls", "tokio", "tokio-native-tls", @@ -3209,7 +3178,7 @@ checksum = "0fafdf7b2b2de7c9784f76e02c0935e65a8117ec3b768644379983ab333ac98c" dependencies = [ "futures-util", "hex 0.4.3", - "hyper 0.14.20", + "hyper", "pin-project", "tokio", ] @@ -3801,7 +3770,7 @@ version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "encoding_rs", "futures-util", "http 0.2.8", @@ -4132,7 +4101,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "449048140ee61e28f57abe6e9975eedc1f3a29855c7407bd6c12b18578863379" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "opentelemetry", "reqwest", @@ -4362,11 +4331,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "16d0fec4acc8779b696e3ff25527884fb17cda6cf59a249c57aa1af1e2f65b36" dependencies = [ "async-trait", - "bytes 1.1.0", + "bytes 1.2.1", "futures-util", "headers", "http 0.2.8", - "hyper 0.14.20", + "hyper", "mime", "parking_lot 0.12.1", "percent-encoding", @@ -4548,7 +4517,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "399c3c31cdec40583bb68f0b18403400d01ec4289c383aa047560439952c4dd7" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "prost-derive", ] @@ -4558,7 +4527,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f835c582e6bd972ba8347313300219fed5bfa52caf175298d860b61ff6069bb" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "heck", "itertools", "lazy_static", @@ -4591,7 +4560,7 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dfaa718ad76a44b3415e6c4d53b17c8f99160dcb3a99b10470fce8ad43f6e3e" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "prost", ] @@ -4960,14 +4929,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b75aa69a3f06bbcc66ede33af2af253c6f7a86b1ca0033f60c580a27074fbf92" dependencies = [ "base64 0.13.0", - "bytes 1.1.0", + "bytes 1.2.1", "encoding_rs", "futures-core", "futures-util", "h2", "http 0.2.8", - "http-body 0.4.5", - "hyper 0.14.20", + "http-body", + "hyper", "hyper-rustls 0.23.0", "hyper-tls", "ipnet", @@ -5024,7 +4993,7 @@ dependencies = [ "chrono", "futures", "http 0.2.8", - "hyper 0.14.20", + "hyper", "reqwest", "reqwest-middleware", "retry-policies", @@ -5111,7 +5080,7 @@ dependencies = [ "atomic", "atty", "binascii", - "bytes 1.1.0", + "bytes 1.2.1", "either", "figment", "futures", @@ -5164,7 +5133,7 @@ dependencies = [ "either", "futures", "http 0.2.8", - "hyper 0.14.20", + "hyper", "indexmap", "log", "memchr", @@ -5369,7 +5338,7 @@ dependencies = [ "async-compression", "async-trait", "base64 0.13.0", - "bytes 1.1.0", + "bytes 1.2.1", "cookie 0.16.0", "encoding_rs", "enumflags2", @@ -5378,7 +5347,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.8", - "hyper 0.14.20", + "hyper", "mime", "mime_guess", "multer", @@ -5637,7 +5606,7 @@ dependencies = [ "async-tungstenite", "base64 0.13.0", "bitflags", - "bytes 1.1.0", + "bytes 1.2.1", "cfg-if 1.0.0", "flate2", "futures", @@ -5757,8 +5726,8 @@ dependencies = [ name = "shuttle-axum-utils" version = "0.1.0" dependencies = [ - "http 0.2.8", "http-serde", + "hyper", "rmp-serde", "serde", ] @@ -5799,7 +5768,7 @@ dependencies = [ "anyhow", "async-trait", "axum", - "bytes 1.1.0", + "bytes 1.2.1", "cargo", "cargo_metadata", "chrono", @@ -5810,7 +5779,7 @@ dependencies = [ "fqdn", "futures", "hex 0.4.3", - "hyper 0.14.20", + "hyper", "hyper-reverse-proxy 0.5.2-dev (git+https://github.com/chesedo/hyper-reverse-proxy?branch=master)", "once_cell", "opentelemetry", @@ -5853,7 +5822,7 @@ dependencies = [ "fqdn", "futures", "http 0.2.8", - "hyper 0.14.20", + "hyper", "hyper-reverse-proxy 0.5.2-dev (git+https://github.com/chesedo/hyper-reverse-proxy?branch=bug/host_header)", "once_cell", "opentelemetry", @@ -5920,7 +5889,8 @@ dependencies = [ "async-trait", "cap-std", "clap 4.0.18", - "hyper 1.0.0-rc.1", + "hyper", + "rmp-serde", "serenity", "shuttle-axum-utils", "shuttle-common", @@ -5961,7 +5931,7 @@ dependencies = [ "chrono", "crossbeam-channel", "futures", - "hyper 0.14.20", + "hyper", "libloading", "pipe", "poem", @@ -6076,9 +6046,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.4.4" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0" +checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" dependencies = [ "libc", "winapi", @@ -6151,7 +6121,7 @@ dependencies = [ "atoi 0.4.0", "bitflags", "byteorder", - "bytes 1.1.0", + "bytes 1.2.1", "crc 2.1.0", "crossbeam-queue", "either", @@ -6199,7 +6169,7 @@ dependencies = [ "base64 0.13.0", "bitflags", "byteorder", - "bytes 1.1.0", + "bytes 1.2.1", "chrono", "crc 3.0.0", "crossbeam-queue", @@ -6671,7 +6641,7 @@ checksum = "4bc22b1c2267be6d1769c6d787936201341f03c915456ed8a8db8d40d665215f" dependencies = [ "async-trait", "bytes 0.5.6", - "bytes 1.1.0", + "bytes 1.2.1", "fnv", "futures", "http 0.1.21", @@ -6821,7 +6791,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" dependencies = [ "autocfg 1.1.0", - "bytes 1.1.0", + "bytes 1.2.1", "libc", "memchr", "mio", @@ -6906,7 +6876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3" dependencies = [ "async-stream", - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "tokio", "tokio-stream", @@ -6945,7 +6915,7 @@ version = "0.6.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-io", "futures-sink", @@ -6961,7 +6931,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-sink", "pin-project-lite 0.2.9", @@ -6976,7 +6946,7 @@ source = "git+https://github.com/shuttle-hq/tokiotest-httpserver?branch=feat/bod dependencies = [ "async-trait", "futures", - "hyper 0.14.20", + "hyper", "lazy_static", "queues", "serde_json", @@ -7018,13 +6988,13 @@ dependencies = [ "async-trait", "axum", "base64 0.13.0", - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-util", "h2", "http 0.2.8", - "http-body 0.4.5", - "hyper 0.14.20", + "http-body", + "hyper", "hyper-timeout", "percent-encoding", "pin-project", @@ -7080,11 +7050,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aba3f3efabf7fb41fae8534fc20a817013dd1c12cb45441efb6c82e6556b4cd8" dependencies = [ "bitflags", - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-util", "http 0.2.8", - "http-body 0.4.5", + "http-body", "http-range-header", "pin-project-lite 0.2.9", "tower-layer", @@ -7099,11 +7069,11 @@ checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "base64 0.13.0", "bitflags", - "bytes 1.1.0", + "bytes 1.2.1", "futures-core", "futures-util", "http 0.2.8", - "http-body 0.4.5", + "http-body", "http-range-header", "pin-project-lite 0.2.9", "tower", @@ -7285,7 +7255,7 @@ checksum = "a0b2d8558abd2e276b0a8df5c05a2ec762609344191e5fd23e292c910e9165b5" dependencies = [ "base64 0.13.0", "byteorder", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "httparse", "log", @@ -7304,7 +7274,7 @@ checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ "base64 0.13.0", "byteorder", - "bytes 1.1.0", + "bytes 1.2.1", "http 0.2.8", "httparse", "log", @@ -7623,12 +7593,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cef4e1e9114a4b7f1ac799f16ce71c14de5778500c5450ec6b7b920c55b587e" dependencies = [ - "bytes 1.1.0", + "bytes 1.2.1", "futures-channel", "futures-util", "headers", "http 0.2.8", - "hyper 0.14.20", + "hyper", "log", "mime", "mime_guess", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index f7d062ed7..1c62e8f09 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -22,6 +22,7 @@ uuid = { version = "1.1.2", features = ["v4"] } wasi-common = "2.0.0" wasmtime = "2.0.0" wasmtime-wasi = "2.0.0" +rmp-serde = { version = "1.1.1" } [dependencies.shuttle-common] version = "0.7.0" diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 547a00f72..3f7f5592c 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -1,6 +1,6 @@ use std::convert::Infallible; use std::fs::File; -use std::io::{Read, Write}; +use std::io::{BufReader, Read, Write}; use std::net::{Ipv4Addr, SocketAddr}; use std::os::unix::prelude::RawFd; use std::path::Path; @@ -23,6 +23,8 @@ use wasmtime::{Engine, Linker, Module, Store}; use wasmtime_wasi::sync::net::UnixStream as WasiUnixStream; use wasmtime_wasi::{WasiCtx, WasiCtxBuilder}; +extern crate rmp_serde as rmps; + pub struct AxumWasm { router: std::sync::Mutex>, port: Mutex>, @@ -169,12 +171,19 @@ impl RouterInner { &mut self, req: hyper::Request, ) -> Result, Infallible> { - let (mut host, client) = UnixStream::pair().unwrap(); - let client = WasiUnixStream::from_cap_std(client); + let (mut parts_stream, parts_client) = UnixStream::pair().unwrap(); + let (mut body_stream, body_client) = UnixStream::pair().unwrap(); + + let parts_client = WasiUnixStream::from_cap_std(parts_client); + let body_client = WasiUnixStream::from_cap_std(body_client); + + self.store + .data_mut() + .insert_file(3, Box::new(parts_client), FileCaps::all()); self.store .data_mut() - .insert_file(3, Box::new(client), FileCaps::all()); + .insert_file(4, Box::new(body_client), FileCaps::all()); let (parts, body) = req.into_parts(); @@ -182,13 +191,13 @@ impl RouterInner { let request_rmp = RequestWrapper::from(parts).into_rmp(); // write request parts - host.write_all(&request_rmp).unwrap(); - host.write(&[0]).unwrap(); + parts_stream.write_all(&request_rmp).unwrap(); // write body - host.write_all(hyper::body::to_bytes(body).await.unwrap().as_ref()) + body_stream + .write_all(hyper::body::to_bytes(body).await.unwrap().as_ref()) .unwrap(); - host.write(&[0]).unwrap(); + body_stream.write(&[0]).unwrap(); println!("calling inner Router"); self.linker @@ -196,32 +205,22 @@ impl RouterInner { .unwrap() .into_func() .unwrap() - .typed::(&self.store) + .typed::<(RawFd, RawFd), (), _>(&self.store) .unwrap() - .call(&mut self.store, 3) + .call(&mut self.store, (3, 4)) .unwrap(); // read response parts from host - let mut res_buf = Vec::new(); - let mut c_buf: [u8; 1] = [0; 1]; - loop { - host.read(&mut c_buf).unwrap(); - if c_buf[0] == 0 { - break; - } else { - res_buf.push(c_buf[0]); - } - } + let reader = BufReader::new(&mut parts_stream); - println!("received response parts buf: \n{:?}", &res_buf); // deserialize response parts from rust messagepack - let wrapper = ResponseWrapper::from_rmp(res_buf); + let wrapper: ResponseWrapper = rmps::from_read(reader).unwrap(); // read response body from wasm router let mut body_buf = Vec::new(); let mut c_buf: [u8; 1] = [0; 1]; loop { - host.read(&mut c_buf).unwrap(); + body_stream.read(&mut c_buf).unwrap(); if c_buf[0] == 0 { break; } else { diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml index 7e48765e8..da0bf3b1a 100644 --- a/tmp/axum-wasm/Cargo.toml +++ b/tmp/axum-wasm/Cargo.toml @@ -11,6 +11,7 @@ axum = { version = "0.6.0-rc.4", default-features = false } futures-executor = "0.3.21" http = "0.2.7" tower-service = "0.3.1" +rmp-serde = { version = "1.1.1" } [dependencies.shuttle-axum-utils] path = "../utils" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 508c582a4..c24419e9a 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -4,10 +4,13 @@ use futures_executor::block_on; use http::Request; use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; use std::fs::File; +use std::io::BufReader; use std::io::{Read, Write}; use std::os::wasi::prelude::*; use tower_service::Service; +extern crate rmp_serde as rmps; + pub fn handle_request(req: Request) -> Response where B: HttpBody + Send + 'static, @@ -39,32 +42,25 @@ async fn goodbye() -> &'static str { #[no_mangle] #[allow(non_snake_case)] -pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { - println!("inner handler awoken; interacting with fd={fd}"); +pub extern "C" fn __SHUTTLE_Axum_call(fd_3: RawFd, fd_4: RawFd) { + println!("inner handler awoken; interacting with fd={fd_3},{fd_4}"); - let mut f = unsafe { File::from_raw_fd(fd) }; + // file descriptor 3 for reading and writing http parts + let mut parts_fd = unsafe { File::from_raw_fd(fd_3) }; - // read request parts from host - let mut req_buf = Vec::new(); - let mut c_buf: [u8; 1] = [0; 1]; - loop { - f.read(&mut c_buf).unwrap(); - if c_buf[0] == 0 { - break; - } else { - req_buf.push(c_buf[0]); - } - } + let reader = BufReader::new(&mut parts_fd); // deserialize request parts from rust messagepack - let wrapper = RequestWrapper::from_rmp(req_buf); + let wrapper: RequestWrapper = rmps::from_read(reader).unwrap(); - // TODO: deduplicate this? Is it the correct strategy to send two separate files? - // read request body from host + // file descriptor 4 for reading and writing http parts + let mut body_fd = unsafe { File::from_raw_fd(fd_4) }; + + // read body from host let mut body_buf = Vec::new(); let mut c_buf: [u8; 1] = [0; 1]; loop { - f.read(&mut c_buf).unwrap(); + body_fd.read(&mut c_buf).unwrap(); if c_buf[0] == 0 { break; } else { @@ -72,7 +68,6 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { } } - // set body in the wrapper (Body::Empty if buf is empty), consume wrapper and return Request let request = wrapper.set_body(body_buf).into_request(); println!("inner router received request: {:?}", &request); @@ -80,17 +75,15 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd: RawFd) { let (parts, mut body) = res.into_parts(); - println!("sending parts: {:?}", parts.headers.clone()); // wrap and serialize response parts as rmp let response_parts = ResponseWrapper::from(parts).into_rmp(); - println!("sending response parts: {:?}", &response_parts); // write response parts - f.write_all(&response_parts).unwrap(); - f.write(&[0]).unwrap(); + parts_fd.write_all(&response_parts).unwrap(); // write body - f.write_all(block_on(body.data()).unwrap().unwrap().as_ref()) - .unwrap(); - f.write(&[0]).unwrap(); + if let Some(body) = block_on(body.data()) { + body_fd.write_all(body.unwrap().as_ref()).unwrap(); + } + body_fd.write(&[0]).unwrap(); } From 510e6003b6b4cca9169cb0377a15fb34590bea9e Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 14:05:09 +0100 Subject: [PATCH 13/19] feat: add axum-wasm to runtime cli --- runtime/Cargo.toml | 2 +- runtime/README.md | 29 +++++++++++++++++++---------- runtime/src/args.rs | 6 +++++- runtime/src/main.rs | 21 ++++++++++----------- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1c62e8f09..f978940c7 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -43,5 +43,5 @@ path = "../tmp/utils" optional = true [features] -axum = ["shuttle-axum-utils"] +shuttle-axum = ["shuttle-axum-utils"] diff --git a/runtime/README.md b/runtime/README.md index 0d9da1bad..f38778c44 100644 --- a/runtime/README.md +++ b/runtime/README.md @@ -9,7 +9,7 @@ $ DISCORD_TOKEN=xxx cargo run In another terminal: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/axum.wasm"}' localhost:6001 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/bot.wasm"}' localhost:6001 runtime.Runtime/Load grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:6001 runtime.Runtime/Start grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:6001 runtime.Runtime/SubscribeLogs ``` @@ -25,23 +25,32 @@ make axum Run the test: ```bash -cargo test --all-features axum -- --nocapture +cargo test axum --features shuttle-axum -- --nocapture ``` Load and run: ```bash -cargo run --all-features +cargo run --features shuttle-axum -- --axum --provisioner-address http://localhost:8000 ``` In another terminal: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/axum.wasm"}' localhost:8000 runtime.Runtime/Load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start -# grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs +# a full, absolute path from home was needed for me in the load request +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "runtime/axum.wasm"}' localhost:6001 runtime.Runtime/Load + +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:6001 runtime.Runtime/Start + +# grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:6001 runtime.Runtime/SubscribeLogs ``` +Curl the service: +```bash +curl localhost:7002/hello + +curl localhost:7002/goodbye +``` ## shuttle-legacy Load and run an .so library that implements `shuttle_service::Service`. @@ -63,16 +72,16 @@ Or directly (this is the path hardcoded in `deployer::start`): # first, make sure the shuttle-runtime binary is built cargo build # then -/home//target/debug/shuttle-runtime --legacy --provisioner-address http://localhost:8000 +/home//target/debug/shuttle-runtime --legacy --provisioner-address http://localhost:6001 ``` Pass the path to `deployer::start` Then in another shell, load a `.so` file and start it up: ``` bash -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:8000 runtime.Runtime/Load -grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:8000 runtime.Runtime/Start -grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:8000 runtime.Runtime/SubscribeLogs +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic", "path": "examples/rocket/hello-world/target/debug/libhello_world.so"}' localhost:6001 runtime.Runtime/Load +grpcurl -plaintext -import-path ../proto -proto runtime.proto -d '{"service_name": "Tonic"}' localhost:6001 runtime.Runtime/Start +grpcurl -plaintext -import-path ../proto -proto runtime.proto localhost:6001 runtime.Runtime/SubscribeLogs ``` ## Running the tests diff --git a/runtime/src/args.rs b/runtime/src/args.rs index 084304364..2e123f4e8 100644 --- a/runtime/src/args.rs +++ b/runtime/src/args.rs @@ -8,6 +8,10 @@ pub struct Args { pub provisioner_address: Endpoint, /// Is this runtime for a legacy service - #[clap(long)] + #[clap(long, conflicts_with("axum"))] pub legacy: bool, + + /// Is this runtime for an axum-wasm service + #[clap(long, conflicts_with("legacy"))] + pub axum: bool, } diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 4a4c5bba5..f7a5dc597 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -28,24 +28,23 @@ async fn main() { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 6001); - // let provisioner_address = args.provisioner_address; + let provisioner_address = args.provisioner_address; let mut server_builder = Server::builder().http2_keepalive_interval(Some(Duration::from_secs(60))); - let router = { + let router = if args.legacy { + let legacy = Legacy::new(provisioner_address); + let svc = RuntimeServer::new(legacy); + server_builder.add_service(svc) + } else if args.axum { let axum = AxumWasm::new(); let svc = RuntimeServer::new(axum); server_builder.add_service(svc) + } else { + let next = Next::new(); + let svc = RuntimeServer::new(next); + server_builder.add_service(svc) }; - // if args.legacy { - // let legacy = Legacy::new(provisioner_address); - // let svc = RuntimeServer::new(legacy); - // server_builder.add_service(svc) - // } else { - // let next = Next::new(); - // let svc = RuntimeServer::new(next); - // server_builder.add_service(svc) - // }; router.serve(addr).await.unwrap(); } From 984d0a294fc2f95ea72950ce82a9e1bd9304ec7a Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 14:19:48 +0100 Subject: [PATCH 14/19] refactor: remove unused method --- tmp/utils/src/lib.rs | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index eacc6625b..727ce56f5 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -1,6 +1,6 @@ use hyper::body::Body; use hyper::http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; -use rmps::{Deserializer, Serializer}; +use rmps::Serializer; use serde::{Deserialize, Serialize}; extern crate rmp_serde as rmps; @@ -45,13 +45,6 @@ impl RequestWrapper { buf } - /// Deserialize a RequestWrapper from the Rust MessagePack data format - pub fn from_rmp(buf: Vec) -> Self { - let mut de = Deserializer::new(buf.as_slice()); - - Deserialize::deserialize(&mut de).unwrap() - } - /// Set the request body pub fn set_body(mut self, buf: Vec) -> Self { self.body = buf; @@ -109,13 +102,6 @@ impl ResponseWrapper { buf } - /// Deserialize a ResponseWrapper from the Rust MessagePack data format - pub fn from_rmp(buf: Vec) -> Self { - let mut de = Deserializer::new(buf.as_slice()); - - Deserialize::deserialize(&mut de).unwrap() - } - /// Set the response body pub fn set_body(mut self, buf: Vec) -> Self { self.body = buf; @@ -154,7 +140,7 @@ mod test { let (parts, _) = request.into_parts(); let rmp = RequestWrapper::from(parts).into_rmp(); - let back = RequestWrapper::from_rmp(rmp); + let back: RequestWrapper = rmps::from_slice(&rmp).unwrap(); assert_eq!( back.headers.get("test").unwrap(), @@ -180,7 +166,7 @@ mod test { let (parts, _) = response.into_parts(); let rmp = ResponseWrapper::from(parts).into_rmp(); - let back = ResponseWrapper::from_rmp(rmp); + let back: ResponseWrapper = rmps::from_slice(&rmp).unwrap(); assert_eq!( back.headers.get("test").unwrap(), From 9f559c309be332e9a6439d48c6686c857c4478a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oddbj=C3=B8rn=20Gr=C3=B8dem?= <29732646+oddgrd@users.noreply.github.com> Date: Wed, 16 Nov 2022 16:13:16 +0100 Subject: [PATCH 15/19] refactor: typo Co-authored-by: Pieter --- tmp/axum-wasm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index c24419e9a..0d16aff21 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -53,7 +53,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd_3: RawFd, fd_4: RawFd) { // deserialize request parts from rust messagepack let wrapper: RequestWrapper = rmps::from_read(reader).unwrap(); - // file descriptor 4 for reading and writing http parts + // file descriptor 4 for reading and writing http body let mut body_fd = unsafe { File::from_raw_fd(fd_4) }; // read body from host From 6450ed5586322c704b9cf8a4234cc1f3f7e6d298 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 17 Nov 2022 10:10:11 +0100 Subject: [PATCH 16/19] refactor: comments, clean up wrappers --- runtime/src/axum/mod.rs | 8 ++++--- tmp/axum-wasm/Cargo.toml | 2 ++ tmp/axum-wasm/src/lib.rs | 10 ++++++--- tmp/utils/src/lib.rs | 48 +++++++++++++--------------------------- 4 files changed, 29 insertions(+), 39 deletions(-) diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 3f7f5592c..242fe293f 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -197,6 +197,7 @@ impl RouterInner { body_stream .write_all(hyper::body::to_bytes(body).await.unwrap().as_ref()) .unwrap(); + // signal to the receiver that end of file has been reached body_stream.write(&[0]).unwrap(); println!("calling inner Router"); @@ -228,10 +229,11 @@ impl RouterInner { } } - // set body in the wrapper (Body::Empty if buf is empty), consume wrapper and return Response - let response = wrapper.set_body(body_buf).into_response(); + let response: Response = wrapper + .into_response_builder() + .body(body_buf.into()) + .unwrap(); - // consume the wrapper and return response Ok(response) } } diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml index da0bf3b1a..b85ae3d34 100644 --- a/tmp/axum-wasm/Cargo.toml +++ b/tmp/axum-wasm/Cargo.toml @@ -7,6 +7,8 @@ edition = "2021" crate-type = [ "cdylib" ] [dependencies] +# most axum features can be enabled, but "tokio" and "ws" depend on socket2 +# via "hyper/tcp" which is not compatible with wasi axum = { version = "0.6.0-rc.4", default-features = false } futures-executor = "0.3.21" http = "0.2.7" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 0d16aff21..487cbae29 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -1,4 +1,4 @@ -use axum::body::HttpBody; +use axum::body::{Body, HttpBody}; use axum::{response::Response, routing::get, Router}; use futures_executor::block_on; use http::Request; @@ -68,7 +68,10 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd_3: RawFd, fd_4: RawFd) { } } - let request = wrapper.set_body(body_buf).into_request(); + let request: Request = wrapper + .into_request_builder() + .body(body_buf.into()) + .unwrap(); println!("inner router received request: {:?}", &request); let res = handle_request(request); @@ -81,9 +84,10 @@ pub extern "C" fn __SHUTTLE_Axum_call(fd_3: RawFd, fd_4: RawFd) { // write response parts parts_fd.write_all(&response_parts).unwrap(); - // write body + // write body if there is one if let Some(body) = block_on(body.data()) { body_fd.write_all(body.unwrap().as_ref()).unwrap(); } + // signal to the reader that end of file has been reached body_fd.write(&[0]).unwrap(); } diff --git a/tmp/utils/src/lib.rs b/tmp/utils/src/lib.rs index 727ce56f5..9a3512c9b 100644 --- a/tmp/utils/src/lib.rs +++ b/tmp/utils/src/lib.rs @@ -1,4 +1,3 @@ -use hyper::body::Body; use hyper::http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version}; use rmps::Serializer; use serde::{Deserialize, Serialize}; @@ -19,9 +18,6 @@ pub struct RequestWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, - - #[serde(skip)] - pub body: Vec, } impl From for RequestWrapper { @@ -31,7 +27,6 @@ impl From for RequestWrapper { uri: parts.uri, version: parts.version, headers: parts.headers, - body: Vec::new(), } } } @@ -45,22 +40,17 @@ impl RequestWrapper { buf } - /// Set the request body - pub fn set_body(mut self, buf: Vec) -> Self { - self.body = buf; - self - } - - /// Consume wrapper and return Request - pub fn into_request(self) -> Request { + /// Consume the wrapper and return a request builder with `Parts` set + pub fn into_request_builder(self) -> hyper::http::request::Builder { let mut request = Request::builder() .method(self.method) .version(self.version) - .uri(self.uri) - .body(Body::from(self.body)) - .unwrap(); + .uri(self.uri); - request.headers_mut().extend(self.headers.into_iter()); + request + .headers_mut() + .unwrap() + .extend(self.headers.into_iter()); request } @@ -77,9 +67,6 @@ pub struct ResponseWrapper { #[serde(with = "http_serde::header_map")] pub headers: HeaderMap, - - #[serde(skip)] - pub body: Vec, } impl From for ResponseWrapper { @@ -88,7 +75,6 @@ impl From for ResponseWrapper { status: parts.status, version: parts.version, headers: parts.headers, - body: Vec::new(), } } } @@ -102,21 +88,16 @@ impl ResponseWrapper { buf } - /// Set the response body - pub fn set_body(mut self, buf: Vec) -> Self { - self.body = buf; - self - } - - /// Consume wrapper and return Response - pub fn into_response(self) -> Response { + /// Consume the wrapper and return a response builder with `Parts` set + pub fn into_response_builder(self) -> hyper::http::response::Builder { let mut response = Response::builder() .status(self.status) - .version(self.version) - .body(Body::from(self.body)) - .unwrap(); + .version(self.version); - response.headers_mut().extend(self.headers.into_iter()); + response + .headers_mut() + .unwrap() + .extend(self.headers.into_iter()); response } @@ -125,6 +106,7 @@ impl ResponseWrapper { #[cfg(test)] mod test { use super::*; + use hyper::body::Body; use hyper::http::HeaderValue; #[test] From 3b4ab3b96137f9c03a50a7693f2616589ca932fa Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Thu, 17 Nov 2022 10:34:09 +0100 Subject: [PATCH 17/19] refactor: move axum-wasm utils to shuttle-common --- Cargo.lock | 14 +++----------- common/Cargo.toml | 4 ++++ common/src/lib.rs | 2 ++ tmp/utils/src/lib.rs => common/src/wasm.rs | 0 runtime/Cargo.toml | 6 +----- runtime/src/axum/mod.rs | 2 +- tmp/axum-wasm/Cargo.toml | 8 +++++--- tmp/axum-wasm/src/lib.rs | 2 +- tmp/utils/Cargo.toml | 16 ---------------- 9 files changed, 17 insertions(+), 37 deletions(-) rename tmp/utils/src/lib.rs => common/src/wasm.rs (100%) delete mode 100644 tmp/utils/Cargo.toml diff --git a/Cargo.lock b/Cargo.lock index e55afbd4f..014ded2a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5722,16 +5722,6 @@ dependencies = [ "dirs", ] -[[package]] -name = "shuttle-axum-utils" -version = "0.1.0" -dependencies = [ - "http-serde", - "hyper", - "rmp-serde", - "serde", -] - [[package]] name = "shuttle-codegen" version = "0.7.0" @@ -5752,7 +5742,10 @@ dependencies = [ "comfy-table", "crossterm", "http 0.2.8", + "http-serde", + "hyper", "once_cell", + "rmp-serde", "rustrict", "serde", "serde_json", @@ -5892,7 +5885,6 @@ dependencies = [ "hyper", "rmp-serde", "serenity", - "shuttle-axum-utils", "shuttle-common", "shuttle-proto", "shuttle-service", diff --git a/common/Cargo.toml b/common/Cargo.toml index 8b10d111a..c54827c57 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -11,7 +11,10 @@ chrono = { version = "0.4.22", features = ["serde"] } comfy-table = { version = "6.1.0", optional = true } crossterm = { version = "0.25.0", optional = true } http = { version = "0.2.8", optional = true } +http-serde = { version = "1.1.2", optional = true } +hyper = { version = "0.14.23", optional = true } once_cell = "1.13.1" +rmp-serde = { version = "1.1.1", optional = true } rustrict = "0.5.0" serde = { version = "1.0.143", features = ["derive"] } serde_json = { version = "1.0.85", optional = true } @@ -24,3 +27,4 @@ default = ["models"] models = ["display", "serde_json", "http"] display = ["comfy-table", "crossterm"] +axum-wasm = ["http-serde", "hyper", "rmp-serde"] diff --git a/common/src/lib.rs b/common/src/lib.rs index bd7e50afc..6d44adf67 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -4,6 +4,8 @@ pub mod log; #[cfg(feature = "models")] pub mod models; pub mod project; +#[cfg(feature = "axum-wasm")] +pub mod wasm; use serde::{Deserialize, Serialize}; use uuid::Uuid; diff --git a/tmp/utils/src/lib.rs b/common/src/wasm.rs similarity index 100% rename from tmp/utils/src/lib.rs rename to common/src/wasm.rs diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index f978940c7..5a8b10b0e 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -38,10 +38,6 @@ default-features = false features = ["loader"] path = "../service" -[dependencies.shuttle-axum-utils] -path = "../tmp/utils" -optional = true - [features] -shuttle-axum = ["shuttle-axum-utils"] +shuttle-axum = ["shuttle-common/axum-wasm"] diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index 242fe293f..f0b5fd9eb 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use cap_std::os::unix::net::UnixStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response}; -use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; +use shuttle_common::wasm::{RequestWrapper, ResponseWrapper}; use shuttle_proto::runtime::runtime_server::Runtime; use shuttle_proto::runtime::{ self, LoadRequest, LoadResponse, StartRequest, StartResponse, SubscribeLogsRequest, diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml index b85ae3d34..52c5418cf 100644 --- a/tmp/axum-wasm/Cargo.toml +++ b/tmp/axum-wasm/Cargo.toml @@ -15,6 +15,8 @@ http = "0.2.7" tower-service = "0.3.1" rmp-serde = { version = "1.1.1" } -[dependencies.shuttle-axum-utils] -path = "../utils" -version = "0.1.0" +[dependencies.shuttle-common] +path = "../../common" +default-features = false +features = ["axum-wasm"] +version = "0.7.0" diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs index 487cbae29..e0a8b6542 100644 --- a/tmp/axum-wasm/src/lib.rs +++ b/tmp/axum-wasm/src/lib.rs @@ -2,7 +2,7 @@ use axum::body::{Body, HttpBody}; use axum::{response::Response, routing::get, Router}; use futures_executor::block_on; use http::Request; -use shuttle_axum_utils::{RequestWrapper, ResponseWrapper}; +use shuttle_common::wasm::{RequestWrapper, ResponseWrapper}; use std::fs::File; use std::io::BufReader; use std::io::{Read, Write}; diff --git a/tmp/utils/Cargo.toml b/tmp/utils/Cargo.toml deleted file mode 100644 index 970ab63e6..000000000 --- a/tmp/utils/Cargo.toml +++ /dev/null @@ -1,16 +0,0 @@ -[package] -name = "shuttle-axum-utils" -version = "0.1.0" -edition = "2021" -description = "Utilities for serializing requests to and from rust messagepack" -[lib] - -[dependencies] -http-serde = { version = "1.1.2" } -hyper = "0.14.23" -rmp-serde = { version = "1.1.1" } -serde = { version = "1.0.137", features = [ "derive" ] } - -[dev-dependencies] -# unit tests have to call an async function to wrap req/res -futures-executor = "0.3.21" From cf5c9253682ec62cdec1ccb357d11748e2aeab46 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Mon, 21 Nov 2022 14:45:16 +0100 Subject: [PATCH 18/19] refactor: fmt --- runtime/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 5a8b10b0e..5ea9e6003 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -10,9 +10,10 @@ anyhow = "1.0.62" async-trait = "0.1.58" cap-std = "0.26.0" clap ={ version = "4.0.18", features = ["derive"] } +hyper = { version = "0.14.23", features = ["full"] } +rmp-serde = { version = "1.1.1" } serenity = { version = "0.11.5", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } thiserror = "1.0.37" -hyper = { version = "0.14.23", features = ["full"] } tokio = { version = "=1.20.1", features = ["full"] } tokio-stream = "0.1.11" tonic = "0.8.2" @@ -22,7 +23,6 @@ uuid = { version = "1.1.2", features = ["v4"] } wasi-common = "2.0.0" wasmtime = "2.0.0" wasmtime-wasi = "2.0.0" -rmp-serde = { version = "1.1.1" } [dependencies.shuttle-common] version = "0.7.0" From aae15ddcfea8b253402f459d3f8f30ddd9f65c60 Mon Sep 17 00:00:00 2001 From: oddgrd <29732646+oddgrd@users.noreply.github.com> Date: Mon, 21 Nov 2022 14:52:49 +0100 Subject: [PATCH 19/19] refactor: clippy --- runtime/src/axum/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs index f0b5fd9eb..40aaa06db 100644 --- a/runtime/src/axum/mod.rs +++ b/runtime/src/axum/mod.rs @@ -198,7 +198,7 @@ impl RouterInner { .write_all(hyper::body::to_bytes(body).await.unwrap().as_ref()) .unwrap(); // signal to the receiver that end of file has been reached - body_stream.write(&[0]).unwrap(); + body_stream.write_all(&[0]).unwrap(); println!("calling inner Router"); self.linker @@ -221,7 +221,7 @@ impl RouterInner { let mut body_buf = Vec::new(); let mut c_buf: [u8; 1] = [0; 1]; loop { - body_stream.read(&mut c_buf).unwrap(); + body_stream.read_exact(&mut c_buf).unwrap(); if c_buf[0] == 0 { break; } else {