Skip to content

Commit

Permalink
Feat: cleanup fds and tmp (#586)
Browse files Browse the repository at this point in the history
* feat: remove redundant body fd and /tmp

* feat: update codegen

* refactor: remove tmp from workspace exclude

* ci: next patch and wasi target
  • Loading branch information
oddgrd authored Jan 18, 2023
1 parent 325b90f commit 35c0660
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 104 deletions.
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ commands:
shuttle-service = { path = "$PWD/service" }
shuttle-aws-rds = { path = "$PWD/resources/aws-rds" }
shuttle-persist = { path = "$PWD/resources/persist" }
shuttle-next = { path = "$PWD/next" }
shuttle-shared-db = { path = "$PWD/resources/shared-db" }
shuttle-secrets = { path = "$PWD/resources/secrets" }
shuttle-static-folder = { path = "$PWD/resources/static-folder" }
Expand All @@ -97,7 +98,7 @@ commands:
- run:
name: Install Rust
command: |
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --target add wasm32-wasi
sudo apt update && sudo apt install -y libssl1.1
install-protoc:
steps:
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ exclude = [
"resources/persist",
"resources/secrets",
"resources/shared-db",
"tmp",
"resources/static-folder"
]

Expand Down
17 changes: 5 additions & 12 deletions codegen/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,14 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
pub extern "C" fn __SHUTTLE_Axum_call(
logs_fd: std::os::wasi::prelude::RawFd,
parts_fd: std::os::wasi::prelude::RawFd,
body_read_fd: std::os::wasi::prelude::RawFd,
body_write_fd: std::os::wasi::prelude::RawFd,
body_fd: std::os::wasi::prelude::RawFd,
) {
use shuttle_next::body::{Body, HttpBody};
use shuttle_next::tracing_prelude::*;
use shuttle_next::Logger;
use std::io::{Read, Write};
use std::os::wasi::io::FromRawFd;

println!("inner handler awoken; interacting with fd={},{},{},{}", logs_fd, parts_fd, body_read_fd, body_write_fd);

// file descriptor 2 for writing logs to
let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };

Expand All @@ -291,10 +288,10 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
// deserialize request parts from rust messagepack
let wrapper: shuttle_next::RequestWrapper = shuttle_next::from_read(reader).unwrap();

// file descriptor 4 for reading http body into wasm
let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };
// file descriptor 4 for reading and writing http body
let mut body_stream = unsafe { std::fs::File::from_raw_fd(body_fd) };

let mut reader = std::io::BufReader::new(&mut body_read_stream);
let mut reader = std::io::BufReader::new(&mut body_stream);
let mut body_buf = Vec::new();
reader.read_to_end(&mut body_buf).unwrap();

Expand All @@ -305,7 +302,6 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
.body(shuttle_next::body::boxed(body))
.unwrap();

println!("inner router received request: {:?}", &request);
let res = shuttle_next::block_on(__app(request));

let (parts, mut body) = res.into_parts();
Expand All @@ -316,12 +312,9 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
// write response parts
parts_fd.write_all(&response_parts).unwrap();

// file descriptor 5 for writing http body to host
let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

// write body if there is one
if let Some(body) = shuttle_next::block_on(body.data()) {
body_write_stream.write_all(body.unwrap().as_ref()).unwrap();
body_stream.write_all(body.unwrap().as_ref()).unwrap();
}
}
)
Expand Down
55 changes: 32 additions & 23 deletions runtime/src/axum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::convert::Infallible;
use std::io::{BufReader, Read, Write};
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::ops::DerefMut;
use std::os::unix::prelude::RawFd;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -35,8 +35,7 @@ extern crate rmp_serde as rmps;

const LOGS_FD: u32 = 20;
const PARTS_FD: u32 = 3;
const BODY_WRITE_FD: u32 = 4;
const BODY_READ_FD: u32 = 5;
const BODY_FD: u32 = 4;

pub struct AxumWasm {
router: Mutex<Option<Router>>,
Expand Down Expand Up @@ -244,15 +243,12 @@ impl Router {
UnixStream::pair().context("failed to open logs unixstream")?;
let (mut parts_stream, parts_client) =
UnixStream::pair().context("failed to open parts unixstream")?;
let (mut body_write_stream, body_write_client) =
let (mut body_stream, body_client) =
UnixStream::pair().context("failed to open body write unixstream")?;
let (body_read_stream, body_read_client) =
UnixStream::pair().context("failed to open body read unixstream")?;

let logs_client = WasiUnixStream::from_cap_std(logs_client);
let parts_client = WasiUnixStream::from_cap_std(parts_client);
let body_write_client = WasiUnixStream::from_cap_std(body_write_client);
let body_read_client = WasiUnixStream::from_cap_std(body_read_client);
let body_client = WasiUnixStream::from_cap_std(body_client);

store
.data_mut()
Expand All @@ -263,10 +259,7 @@ impl Router {
.insert_file(PARTS_FD, Box::new(parts_client), FileCaps::all());
store
.data_mut()
.insert_file(BODY_WRITE_FD, Box::new(body_write_client), FileCaps::all());
store
.data_mut()
.insert_file(BODY_READ_FD, Box::new(body_read_client), FileCaps::all());
.insert_file(BODY_FD, Box::new(body_client), FileCaps::all());

tokio::task::spawn_blocking(move || {
let mut iter = logs_stream.bytes().filter_map(Result::ok);
Expand Down Expand Up @@ -308,12 +301,14 @@ impl Router {
.context("failed to concatenate request body buffers")?;

// Write body to wasm
body_write_stream
body_stream
.write_all(body_bytes.as_ref())
.context("failed to write body to wasm")?;

// Drop stream to signal EOF
drop(body_write_stream);
// Shut down the write part of the stream to signal EOF
body_stream
.shutdown(Shutdown::Write)
.expect("failed to shut down body write half");

// Call our function in wasm, telling it to route the request we've written to it
// and write back a response
Expand All @@ -323,15 +318,10 @@ impl Router {
.expect("wasm module should be loaded and the router function should be available")
.into_func()
.expect("router function should be a function")
.typed::<(RawFd, RawFd, RawFd, RawFd), ()>(&store)?
.typed::<(RawFd, RawFd, RawFd), ()>(&store)?
.call(
&mut store,
(
LOGS_FD as i32,
PARTS_FD as i32,
BODY_WRITE_FD as i32,
BODY_READ_FD as i32,
),
(LOGS_FD as i32, PARTS_FD as i32, BODY_FD as i32),
)?;

// Read response parts from wasm
Expand All @@ -342,7 +332,7 @@ impl Router {
rmps::from_read(reader).context("failed to deserialize response parts")?;

// Read response body from wasm, convert it to a Stream and pass it to hyper
let reader = BufReader::new(body_read_stream);
let reader = BufReader::new(body_stream);
let stream = futures::stream::iter(reader.bytes()).try_chunks(2);
let body = hyper::Body::wrap_stream(stream);

Expand Down Expand Up @@ -409,17 +399,36 @@ async fn run_until_stopped(

#[cfg(test)]
pub mod tests {
use std::process::Command;

use super::*;
use hyper::{http::HeaderValue, Method, Request, StatusCode, Version};
use uuid::Uuid;

// Compile axum wasm module
fn compile_module() {
Command::new("cargo")
.arg("build")
.arg("--target")
.arg("wasm32-wasi")
.current_dir("tests/resources/axum-wasm-expanded")
.spawn()
.unwrap()
.wait()
.unwrap();
}

#[tokio::test]
async fn axum() {
compile_module();

let router = RouterBuilder::new()
.unwrap()
.src("axum.wasm")
.src("tests/resources/axum-wasm-expanded/target/wasm32-wasi/debug/shuttle_axum_expanded.wasm")
.build()
.unwrap();

let id = Uuid::default().as_bytes().to_vec();
let (tx, mut rx) = mpsc::channel(1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[workspace]

[package]
name = "shuttle-axum-expanded"
version = "0.1.0"
Expand All @@ -8,5 +10,5 @@ crate-type = [ "cdylib" ]

[dependencies]
futures = "0.3.25"
shuttle-next = "0.8.0"
shuttle-next = { path = "../../../../next" }
tracing = "0.1.37"
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,14 @@ async fn uppercase(body: BodyStream) -> impl IntoResponse {
pub extern "C" fn __SHUTTLE_Axum_call(
logs_fd: std::os::wasi::prelude::RawFd,
parts_fd: std::os::wasi::prelude::RawFd,
body_read_fd: std::os::wasi::prelude::RawFd,
body_write_fd: std::os::wasi::prelude::RawFd,
body_fd: std::os::wasi::prelude::RawFd,
) {
use shuttle_next::body::{Body, HttpBody};
use shuttle_next::tracing_prelude::*;
use shuttle_next::Logger;
use std::io::{Read, Write};
use std::os::wasi::io::FromRawFd;

println!("inner handler awoken; interacting with fd={logs_fd},{parts_fd},{body_read_fd},{body_write_fd}");

// file descriptor 2 for writing logs to
let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };

Expand All @@ -76,10 +73,10 @@ pub extern "C" fn __SHUTTLE_Axum_call(
// deserialize request parts from rust messagepack
let wrapper: shuttle_next::RequestWrapper = shuttle_next::from_read(reader).unwrap();

// file descriptor 4 for reading http body into wasm
let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };
// file descriptor 4 for reading and writing http body
let mut body_stream = unsafe { std::fs::File::from_raw_fd(body_fd) };

let mut reader = std::io::BufReader::new(&mut body_read_stream);
let mut reader = std::io::BufReader::new(&mut body_stream);
let mut body_buf = Vec::new();
reader.read_to_end(&mut body_buf).unwrap();

Expand All @@ -90,7 +87,6 @@ pub extern "C" fn __SHUTTLE_Axum_call(
.body(shuttle_next::body::boxed(body))
.unwrap();

println!("inner router received request: {:?}", &request);
let res = handle_request(request);

let (parts, mut body) = res.into_parts();
Expand All @@ -101,11 +97,8 @@ pub extern "C" fn __SHUTTLE_Axum_call(
// write response parts
parts_fd.write_all(&response_parts).unwrap();

// file descriptor 5 for writing http body to host
let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

// write body if there is one
if let Some(body) = shuttle_next::block_on(body.data()) {
body_write_stream.write_all(body.unwrap().as_ref()).unwrap();
body_stream.write_all(body.unwrap().as_ref()).unwrap();
}
}
15 changes: 0 additions & 15 deletions tmp/axum-wasm/Cargo.toml

This file was deleted.

39 changes: 0 additions & 39 deletions tmp/axum-wasm/src/lib.rs

This file was deleted.

0 comments on commit 35c0660

Please sign in to comment.