Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: cleanup fds and tmp #586

Merged
merged 5 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ exclude = [
"resources/persist",
"resources/secrets",
"resources/shared-db",
"tmp",
"resources/static-folder"
]

Expand Down
17 changes: 5 additions & 12 deletions codegen/src/next/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,17 +265,14 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
pub extern "C" fn __SHUTTLE_Axum_call(
logs_fd: std::os::wasi::prelude::RawFd,
parts_fd: std::os::wasi::prelude::RawFd,
body_read_fd: std::os::wasi::prelude::RawFd,
body_write_fd: std::os::wasi::prelude::RawFd,
body_fd: std::os::wasi::prelude::RawFd,
) {
use shuttle_next::body::{Body, HttpBody};
use shuttle_next::tracing_prelude::*;
use shuttle_next::Logger;
use std::io::{Read, Write};
use std::os::wasi::io::FromRawFd;

println!("inner handler awoken; interacting with fd={},{},{},{}", logs_fd, parts_fd, body_read_fd, body_write_fd);
oddgrd marked this conversation as resolved.
Show resolved Hide resolved

// file descriptor 2 for writing logs to
let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };

Expand All @@ -291,10 +288,10 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
// deserialize request parts from rust messagepack
let wrapper: shuttle_next::RequestWrapper = shuttle_next::from_read(reader).unwrap();

// file descriptor 4 for reading http body into wasm
let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };
// file descriptor 4 for reading and writing http body
let mut body_stream = unsafe { std::fs::File::from_raw_fd(body_fd) };

let mut reader = std::io::BufReader::new(&mut body_read_stream);
let mut reader = std::io::BufReader::new(&mut body_stream);
let mut body_buf = Vec::new();
reader.read_to_end(&mut body_buf).unwrap();

Expand All @@ -305,7 +302,6 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
.body(shuttle_next::body::boxed(body))
.unwrap();

println!("inner router received request: {:?}", &request);
let res = shuttle_next::block_on(__app(request));

let (parts, mut body) = res.into_parts();
Expand All @@ -316,12 +312,9 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
// write response parts
parts_fd.write_all(&response_parts).unwrap();

// file descriptor 5 for writing http body to host
let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

// write body if there is one
if let Some(body) = shuttle_next::block_on(body.data()) {
body_write_stream.write_all(body.unwrap().as_ref()).unwrap();
body_stream.write_all(body.unwrap().as_ref()).unwrap();
}
}
)
Expand Down
55 changes: 32 additions & 23 deletions runtime/src/axum/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::convert::Infallible;
use std::io::{BufReader, Read, Write};
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::ops::DerefMut;
use std::os::unix::prelude::RawFd;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -35,8 +35,7 @@ extern crate rmp_serde as rmps;

const LOGS_FD: u32 = 20;
const PARTS_FD: u32 = 3;
const BODY_WRITE_FD: u32 = 4;
const BODY_READ_FD: u32 = 5;
const BODY_FD: u32 = 4;

pub struct AxumWasm {
router: Mutex<Option<Router>>,
Expand Down Expand Up @@ -244,15 +243,12 @@ impl Router {
UnixStream::pair().context("failed to open logs unixstream")?;
let (mut parts_stream, parts_client) =
UnixStream::pair().context("failed to open parts unixstream")?;
let (mut body_write_stream, body_write_client) =
let (mut body_stream, body_client) =
UnixStream::pair().context("failed to open body write unixstream")?;
let (body_read_stream, body_read_client) =
UnixStream::pair().context("failed to open body read unixstream")?;

let logs_client = WasiUnixStream::from_cap_std(logs_client);
let parts_client = WasiUnixStream::from_cap_std(parts_client);
let body_write_client = WasiUnixStream::from_cap_std(body_write_client);
let body_read_client = WasiUnixStream::from_cap_std(body_read_client);
let body_client = WasiUnixStream::from_cap_std(body_client);

store
.data_mut()
Expand All @@ -263,10 +259,7 @@ impl Router {
.insert_file(PARTS_FD, Box::new(parts_client), FileCaps::all());
store
.data_mut()
.insert_file(BODY_WRITE_FD, Box::new(body_write_client), FileCaps::all());
store
.data_mut()
.insert_file(BODY_READ_FD, Box::new(body_read_client), FileCaps::all());
.insert_file(BODY_FD, Box::new(body_client), FileCaps::all());

tokio::task::spawn_blocking(move || {
let mut iter = logs_stream.bytes().filter_map(Result::ok);
Expand Down Expand Up @@ -308,12 +301,14 @@ impl Router {
.context("failed to concatenate request body buffers")?;

// Write body to wasm
body_write_stream
body_stream
.write_all(body_bytes.as_ref())
.context("failed to write body to wasm")?;

// Drop stream to signal EOF
drop(body_write_stream);
// Shut down the write part of the stream to signal EOF
body_stream
.shutdown(Shutdown::Write)
.expect("failed to shut down body write half");

// Call our function in wasm, telling it to route the request we've written to it
// and write back a response
Expand All @@ -323,15 +318,10 @@ impl Router {
.expect("wasm module should be loaded and the router function should be available")
.into_func()
.expect("router function should be a function")
.typed::<(RawFd, RawFd, RawFd, RawFd), ()>(&store)?
.typed::<(RawFd, RawFd, RawFd), ()>(&store)?
.call(
&mut store,
(
LOGS_FD as i32,
PARTS_FD as i32,
BODY_WRITE_FD as i32,
BODY_READ_FD as i32,
),
(LOGS_FD as i32, PARTS_FD as i32, BODY_FD as i32),
)?;

// Read response parts from wasm
Expand All @@ -342,7 +332,7 @@ impl Router {
rmps::from_read(reader).context("failed to deserialize response parts")?;

// Read response body from wasm, convert it to a Stream and pass it to hyper
let reader = BufReader::new(body_read_stream);
let reader = BufReader::new(body_stream);
let stream = futures::stream::iter(reader.bytes()).try_chunks(2);
let body = hyper::Body::wrap_stream(stream);

Expand Down Expand Up @@ -409,17 +399,36 @@ async fn run_until_stopped(

#[cfg(test)]
pub mod tests {
use std::process::Command;

use super::*;
use hyper::{http::HeaderValue, Method, Request, StatusCode, Version};
use uuid::Uuid;

// Compile axum wasm module
fn compile_module() {
Command::new("cargo")
.arg("build")
.arg("--target")
.arg("wasm32-wasi")
.current_dir("tests/resources/axum-wasm-expanded")
.spawn()
.unwrap()
.wait()
.unwrap();
}

#[tokio::test]
async fn axum() {
compile_module();

let router = RouterBuilder::new()
.unwrap()
.src("axum.wasm")
.src("tests/resources/axum-wasm-expanded/target/wasm32-wasi/debug/shuttle_axum_expanded.wasm")
.build()
.unwrap();

let id = Uuid::default().as_bytes().to_vec();
let (tx, mut rx) = mpsc::channel(1);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
[workspace]

[package]
name = "shuttle-axum-expanded"
version = "0.1.0"
Expand All @@ -8,5 +10,5 @@ crate-type = [ "cdylib" ]

[dependencies]
futures = "0.3.25"
shuttle-next = "0.8.0"
shuttle-next = { path = "../../../../next" }
tracing = "0.1.37"
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,14 @@ async fn uppercase(body: BodyStream) -> impl IntoResponse {
pub extern "C" fn __SHUTTLE_Axum_call(
logs_fd: std::os::wasi::prelude::RawFd,
parts_fd: std::os::wasi::prelude::RawFd,
body_read_fd: std::os::wasi::prelude::RawFd,
body_write_fd: std::os::wasi::prelude::RawFd,
body_fd: std::os::wasi::prelude::RawFd,
) {
use shuttle_next::body::{Body, HttpBody};
use shuttle_next::tracing_prelude::*;
use shuttle_next::Logger;
use std::io::{Read, Write};
use std::os::wasi::io::FromRawFd;

println!("inner handler awoken; interacting with fd={logs_fd},{parts_fd},{body_read_fd},{body_write_fd}");

// file descriptor 2 for writing logs to
let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };

Expand All @@ -76,10 +73,10 @@ pub extern "C" fn __SHUTTLE_Axum_call(
// deserialize request parts from rust messagepack
let wrapper: shuttle_next::RequestWrapper = shuttle_next::from_read(reader).unwrap();

// file descriptor 4 for reading http body into wasm
let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };
// file descriptor 4 for reading and writing http body
let mut body_stream = unsafe { std::fs::File::from_raw_fd(body_fd) };

let mut reader = std::io::BufReader::new(&mut body_read_stream);
let mut reader = std::io::BufReader::new(&mut body_stream);
let mut body_buf = Vec::new();
reader.read_to_end(&mut body_buf).unwrap();

Expand All @@ -90,7 +87,6 @@ pub extern "C" fn __SHUTTLE_Axum_call(
.body(shuttle_next::body::boxed(body))
.unwrap();

println!("inner router received request: {:?}", &request);
let res = handle_request(request);

let (parts, mut body) = res.into_parts();
Expand All @@ -101,11 +97,8 @@ pub extern "C" fn __SHUTTLE_Axum_call(
// write response parts
parts_fd.write_all(&response_parts).unwrap();

// file descriptor 5 for writing http body to host
let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

// write body if there is one
if let Some(body) = shuttle_next::block_on(body.data()) {
body_write_stream.write_all(body.unwrap().as_ref()).unwrap();
body_stream.write_all(body.unwrap().as_ref()).unwrap();
}
}
15 changes: 0 additions & 15 deletions tmp/axum-wasm/Cargo.toml

This file was deleted.

39 changes: 0 additions & 39 deletions tmp/axum-wasm/src/lib.rs

This file was deleted.