diff --git a/Cargo.lock b/Cargo.lock
index bcafc3d30..01d04921a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6192,6 +6192,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "axum",
+ "cap-std",
  "chrono",
  "comfy-table",
  "crossterm",
@@ -6206,6 +6207,7 @@ dependencies = [
  "serde_json",
  "strum",
  "tracing",
+ "tracing-subscriber",
  "uuid 1.2.2",
 ]
diff --git a/codegen/src/next/mod.rs b/codegen/src/next/mod.rs
index 4de2e98ef..f91c19928 100644
--- a/codegen/src/next/mod.rs
+++ b/codegen/src/next/mod.rs
@@ -262,18 +262,28 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
         #[no_mangle]
         #[allow(non_snake_case)]
         pub extern "C" fn __SHUTTLE_Axum_call(
-            fd_3: std::os::wasi::prelude::RawFd,
-            fd_4: std::os::wasi::prelude::RawFd,
-            fd_5: std::os::wasi::prelude::RawFd,
+            logs_fd: std::os::wasi::prelude::RawFd,
+            parts_fd: std::os::wasi::prelude::RawFd,
+            body_read_fd: std::os::wasi::prelude::RawFd,
+            body_write_fd: std::os::wasi::prelude::RawFd,
         ) {
             use axum::body::HttpBody;
+            use shuttle_common::wasm::Logger;
             use std::io::{Read, Write};
             use std::os::wasi::io::FromRawFd;
+            use tracing_subscriber::prelude::*;

-            println!("inner handler awoken; interacting with fd={},{},{}", fd_3, fd_4, fd_5);
+            println!("inner handler awoken; interacting with fd={},{},{},{}", logs_fd, parts_fd, body_read_fd, body_write_fd);
+
+            // file descriptor for writing logs to (the host decides the actual fd number)
+            let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };
+
+            tracing_subscriber::registry()
+                .with(Logger::new(logs_fd))
+                .init(); // this sets the subscriber as the global default and also adds a compatibility layer for capturing `log::Record`s

             // file descriptor 3 for reading and writing http parts
-            let mut parts_fd = unsafe { std::fs::File::from_raw_fd(fd_3) };
+            let mut parts_fd = unsafe { std::fs::File::from_raw_fd(parts_fd) };

             let reader = std::io::BufReader::new(&mut parts_fd);

@@ -281,7 +291,7 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
             let wrapper: shuttle_common::wasm::RequestWrapper = rmp_serde::from_read(reader).unwrap();

             // file descriptor 4 for reading http body into wasm
-            let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(fd_4) };
+            let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };

             let mut reader = std::io::BufReader::new(&mut body_read_stream);
             let mut body_buf = Vec::new();
@@ -306,7 +316,7 @@ pub(crate) fn wasi_bindings(app: App) -> proc_macro2::TokenStream {
             parts_fd.write_all(&response_parts).unwrap();

             // file descriptor 5 for writing http body to host
-            let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(fd_5) };
+            let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

             // write body if there is one
             if let Some(body) = futures_executor::block_on(body.data()) {
diff --git a/common/Cargo.toml b/common/Cargo.toml
index 4aeff55fb..f90991bd9 100644
--- a/common/Cargo.toml
+++ b/common/Cargo.toml
@@ -23,13 +23,16 @@ serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true, optional = true }
 strum = { version = "0.24.1", features = ["derive"] }
 tracing = { workspace = true }
+tracing-subscriber = { workspace = true, optional = true }
 uuid = { workspace = true, features = ["v4", "serde"] }

 [dev-dependencies]
+cap-std = "1.0.2"
 hyper = "0.14.3"

 [features]
 backend = ["async-trait", "axum"]
 display = ["comfy-table", "crossterm"]
-axum-wasm = ["http-serde", "http", "rmp-serde"]
+tracing = ["serde_json"]
+axum-wasm = ["http-serde", "http", "rmp-serde", "tracing", "tracing-subscriber"]
 models = ["anyhow", "async-trait", "display", "http", "reqwest", "serde_json"]
diff --git a/common/src/lib.rs b/common/src/lib.rs
index a7a3a66fc..f57e647b2 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -7,6 +7,8 @@
 pub mod log;
 pub mod models;
 pub mod project;
 pub mod storage_manager;
+#[cfg(feature = "tracing")]
+pub mod tracing;
 #[cfg(feature = "axum-wasm")]
 pub mod wasm;
diff --git a/common/src/tracing.rs b/common/src/tracing.rs
new file mode 100644
index 000000000..2532262fe
--- /dev/null
+++ b/common/src/tracing.rs
@@ -0,0 +1,53 @@
+use serde_json::json;
+use tracing::field::Visit;
+
+// Boilerplate for extracting the fields from the event
+#[derive(Default)]
+pub struct JsonVisitor {
+    pub fields: serde_json::Map<String, serde_json::Value>,
+    pub target: Option<String>,
+    pub file: Option<String>,
+    pub line: Option<u32>,
+}
+
+impl JsonVisitor {
+    /// Ignores log metadata as it is included in the other LogItem fields (target, file, line...)
+    fn filter_insert(&mut self, field: &tracing::field::Field, value: serde_json::Value) {
+        match field.name() {
+            "log.line" => self.line = value.as_u64().map(|u| u as u32),
+            "log.target" => self.target = value.as_str().map(ToOwned::to_owned),
+            "log.file" => self.file = value.as_str().map(ToOwned::to_owned),
+            "log.module_path" => {}
+            name => {
+                self.fields.insert(name.to_string(), json!(value));
+            }
+        }
+    }
+}
+impl Visit for JsonVisitor {
+    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
+        self.filter_insert(field, json!(value));
+    }
+    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
+        self.filter_insert(field, json!(value));
+    }
+    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
+        self.filter_insert(field, json!(value));
+    }
+    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
+        self.filter_insert(field, json!(value));
+    }
+    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
+        self.filter_insert(field, json!(value));
+    }
+    fn record_error(
+        &mut self,
+        field: &tracing::field::Field,
+        value: &(dyn std::error::Error + 'static),
+    ) {
+        self.filter_insert(field, json!(value.to_string()));
+    }
+    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
+        self.filter_insert(field, json!(format!("{value:?}")));
+    }
+}
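
Reviewer note (not part of the diff): a minimal sketch of how `JsonVisitor` is typically driven from a `tracing` layer, to make the `filter_insert` behaviour concrete. The `PrintJsonLayer` type is hypothetical; only `JsonVisitor` comes from the file above.

```rust
use shuttle_common::tracing::JsonVisitor;
use tracing::Subscriber;
use tracing_subscriber::Layer;

// Hypothetical layer that prints each event as a single JSON object.
struct PrintJsonLayer;

impl<S: Subscriber> Layer<S> for PrintJsonLayer {
    fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
        let mut visitor = JsonVisitor::default();
        event.record(&mut visitor);

        // `log.target` / `log.file` / `log.line` (from the log-compatibility layer) were diverted
        // into the dedicated fields by `filter_insert`; fall back to the event metadata when absent.
        let target = visitor
            .target
            .unwrap_or_else(|| event.metadata().target().to_string());
        println!("{target}: {}", serde_json::Value::Object(visitor.fields));
    }
}
```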
diff --git a/common/src/wasm.rs b/common/src/wasm.rs
index 46d111d04..82da1d2ee 100644
--- a/common/src/wasm.rs
+++ b/common/src/wasm.rs
@@ -1,6 +1,17 @@
-use http::{self, HeaderMap, Method, Request, Response, StatusCode, Uri, Version};
+use std::{
+    io::Write,
+    slice::IterMut,
+    sync::{Arc, Mutex},
+};
+
+use chrono::{DateTime, NaiveDateTime, Utc};
+use http::{HeaderMap, Method, Request, Response, StatusCode, Uri, Version};
 use rmps::Serializer;
 use serde::{Deserialize, Serialize};
+use tracing::Subscriber;
+use tracing_subscriber::Layer;
+
+use crate::tracing::JsonVisitor;

 extern crate rmp_serde as rmps;

@@ -103,11 +114,269 @@ impl ResponseWrapper {
     }
 }

+#[derive(Clone, Debug, Eq, PartialEq)]
+pub struct Log {
+    pub level: Level,
+    pub timestamp: DateTime<Utc>,
+    pub file: String,
+    pub line: u32,
+    pub target: String,
+    pub fields: Vec<u8>,
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Level {
+    Trace,
+    Debug,
+    Info,
+    Warn,
+    Error,
+}
+
+impl From<&tracing::Level> for Level {
+    fn from(level: &tracing::Level) -> Self {
+        match *level {
+            tracing::Level::TRACE => Self::Trace,
+            tracing::Level::DEBUG => Self::Debug,
+            tracing::Level::INFO => Self::Info,
+            tracing::Level::WARN => Self::Warn,
+            tracing::Level::ERROR => Self::Error,
+        }
+    }
+}
+
+impl Log {
+    pub fn into_bytes(self) -> Vec<u8> {
+        let mut buf = Vec::new();
+
+        buf.add(self);
+
+        buf
+    }
+}
+
+/// Like [slice::fill_with] but allows unwrapping of [Option]s
+trait TryFillWith: Sized {
+    fn try_fill_with<I: Iterator<Item = u8>>(self, iter: &mut I) -> Option<()>;
+}
+
+impl<'a> TryFillWith for IterMut<'a, u8> {
+    fn try_fill_with<I: Iterator<Item = u8>>(self, iter: &mut I) -> Option<()> {
+        for el in self {
+            *el = iter.next()?;
+        }
+
+        Some(())
+    }
+}
+
+/// Convert a structure to and from bytes (array of u8)
+pub trait Bytesable: Sized {
+    /// Add self to bytes vec
+    fn append_bytes(self, buf: &mut Vec<u8>);
+
+    /// Get self from bytes vec
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self>;
+}
+
+macro_rules! impl_bytesable {
+    ($($int:ident),*) => {
+        $(impl Bytesable for $int {
+            fn append_bytes(self, buf: &mut Vec<u8>) {
+                buf.extend_from_slice(&self.to_le_bytes());
+            }
+
+            fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+                let mut buf = [0; $int::BITS as usize / 8];
+                buf.iter_mut().try_fill_with(iter)?;
+
+                Some($int::from_le_bytes(buf))
+            }
+        })*
+    };
+}
+
+// Never implement for usize because it could map to u64 in the runtime and a u32 inside wasm
+// This will cause the deserialization step to fail
+impl_bytesable!(u32, u64, i64);
+
+impl Bytesable for String {
+    fn append_bytes(self, buf: &mut Vec<u8>) {
+        buf.add(self.len() as u64);
+        buf.extend_from_slice(self.as_bytes());
+    }
+
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+        let length: u64 = iter.get()?;
+
+        let mut vec = vec![0; length as usize];
+        vec.iter_mut().try_fill_with(iter)?;
+
+        String::from_utf8(vec).ok()
+    }
+}
+
+impl Bytesable for Level {
+    fn append_bytes(self, buf: &mut Vec<u8>) {
+        buf.add(self as u32);
+    }
+
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+        let i: u32 = iter.get()?;
+
+        let res = match i {
+            0 => Self::Trace,
+            1 => Self::Debug,
+            2 => Self::Info,
+            3 => Self::Warn,
+            4 => Self::Error,
+            _ => Self::Trace,
+        };
+
+        Some(res)
+    }
+}
+
+impl Bytesable for DateTime<Utc> {
+    fn append_bytes(self, buf: &mut Vec<u8>) {
+        buf.add(self.naive_utc().timestamp_millis());
+    }
+
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+        let millis: i64 = iter.get()?;
+
+        let datetime = NaiveDateTime::from_timestamp_millis(millis)?;
+
+        Some(Self::from_utc(datetime, Utc))
+    }
+}
+
+impl Bytesable for Vec<u8> {
+    fn append_bytes(self, buf: &mut Vec<u8>) {
+        buf.add(self.len() as u64);
+        buf.extend_from_slice(&self);
+    }
+
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+        let length: u64 = iter.get()?;
+
+        let mut vec = vec![0; length as usize];
+        vec.iter_mut().try_fill_with(iter)?;
+
+        Some(vec)
+    }
+}
+
+impl Bytesable for Log {
+    fn append_bytes(self, buf: &mut Vec<u8>) {
+        buf.add(self.level);
+        buf.add(self.timestamp);
+        buf.add(self.file);
+        buf.add(self.line);
+        buf.add(self.target);
+        buf.add(self.fields);
+    }
+
+    // These should be in the same order as they appear in [Self::append_bytes]
+    fn from_bytes<I: Iterator<Item = u8>>(iter: &mut I) -> Option<Self> {
+        Some(Self {
+            level: iter.get()?,
+            timestamp: iter.get()?,
+            file: iter.get()?,
+            line: iter.get()?,
+            target: iter.get()?,
+            fields: iter.get()?,
+        })
+    }
+}
+
+/// Trait to make it easier to add a Bytesable type to a data source
+trait BytesableAppendExt {
+    fn add<B: Bytesable>(&mut self, i: B);
+}
+
+impl BytesableAppendExt for Vec<u8> {
+    fn add<B: Bytesable>(&mut self, i: B) {
+        i.append_bytes(self);
+    }
+}
+
+/// Trait to make it easier to get a Bytesable type from a data source
+trait BytesableFromExt {
+    fn get<B: Bytesable>(&mut self) -> Option<B>;
+}
+
+impl<I: Iterator<Item = u8>> BytesableFromExt for I {
+    fn get<B: Bytesable>(&mut self) -> Option<B> {
+        B::from_bytes(self)
+    }
+}
+
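
Reviewer note (not part of the diff): a worked round trip through the framing these `Bytesable` impls define, assuming the `axum-wasm` feature of `shuttle-common`. Integers travel little endian, strings and byte vectors are length prefixed with a `u64`, and timestamps are sent as `i64` UTC milliseconds, which is why the example truncates to millisecond precision. The values are made up.

```rust
use chrono::{SubsecRound, Utc};
use serde_json::json;
use shuttle_common::wasm::{Bytesable, Level, Log};

fn main() {
    let log = Log {
        level: Level::Info,                     // encoded as u32 LE (Info = 2)
        timestamp: Utc::now().trunc_subsecs(3), // encoded as i64 LE milliseconds
        file: "main.rs".to_string(),            // u64 LE length, then the raw bytes
        line: 42,                               // u32 LE
        target: "example".to_string(),          // u64 LE length, then the raw bytes
        fields: serde_json::to_vec(&json!({"message": "hi"})).unwrap(), // u64 LE length, then JSON bytes
    };

    let bytes = log.clone().into_bytes();
    let mut iter = bytes.into_iter();

    // Decoding consumes exactly one frame; a second call on the drained iterator returns None.
    let back = Log::from_bytes(&mut iter).expect("a complete frame should decode");
    assert_eq!(back, log); // equality holds because sub-millisecond precision was truncated above
    assert_eq!(iter.next(), None);
}
```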
+pub struct Logger<W> {
+    writer: Arc<Mutex<W>>,
+}
+
+impl<W> Logger<W> {
+    pub fn new(writer: W) -> Self {
+        Self {
+            writer: Arc::new(Mutex::new(writer)),
+        }
+    }
+}
+
+impl<S, W> Layer<S> for Logger<W>
+where
+    S: Subscriber,
+    W: Write + 'static,
+{
+    fn on_event(
+        &self,
+        event: &tracing::Event<'_>,
+        _ctx: tracing_subscriber::layer::Context<'_, S>,
+    ) {
+        let datetime = Utc::now();
+
+        let item = {
+            let metadata = event.metadata();
+            let mut visitor = JsonVisitor::default();
+
+            event.record(&mut visitor);
+
+            Log {
+                level: metadata.level().into(),
+                timestamp: datetime,
+                file: visitor
+                    .file
+                    .or_else(|| metadata.file().map(str::to_string))
+                    .unwrap_or_default(),
+                line: visitor.line.or_else(|| metadata.line()).unwrap_or_default(),
+                target: visitor
+                    .target
+                    .unwrap_or_else(|| metadata.target().to_string()),
+                fields: serde_json::to_vec(&visitor.fields).unwrap(),
+            }
+        };
+
+        let _ = self
+            .writer
+            .lock()
+            .expect("to get lock on writer")
+            .write(&item.into_bytes())
+            .expect("sending log should succeed");
+    }
+}
+
 #[cfg(test)]
 mod test {
+    use cap_std::os::unix::net::UnixStream;
+    use serde_json::json;
+    use std::io::{Read, Write};
+
     use super::*;
+    use chrono::SubsecRound;
     use http::HeaderValue;
     use hyper::body::Body;
+    use tracing_subscriber::prelude::*;

     #[test]
     fn request_roundtrip() {
@@ -157,4 +426,96 @@ mod test {
         assert_eq!(back.status, StatusCode::NOT_MODIFIED);
         assert_eq!(back.version, Version::HTTP_11);
     }
+
+    #[test]
+    fn log_roundtrip() {
+        let log = Log {
+            level: Level::Debug,
+            timestamp: Utc::now().trunc_subsecs(3),
+            file: "main.rs".to_string(),
+            line: 5,
+            target: "crate::main".to_string(),
+            fields: serde_json::to_vec(&json!({"message": "Hello"})).unwrap(),
+        };
+
+        let mut buf = Vec::new();
+        buf.add(log.clone());
+        let mut iter = buf.into_iter();
+
+        let actual = iter.get::<Log>();
+
+        assert_eq!(log, actual.unwrap());
+        assert_eq!(iter.next(), None);
+    }
+
+    #[test]
+    fn logs_over_socket() {
+        let (mut tx, rx) = UnixStream::pair().unwrap();
+        let log1 = Log {
+            level: Level::Debug,
+            timestamp: Utc::now().trunc_subsecs(3),
+            file: "lib.rs".to_string(),
+            line: 9,
+            target: "crate::lib".to_string(),
+            fields: serde_json::to_vec(&json!({"message": "starting"})).unwrap(),
+        };
+        let log2 = Log {
+            level: Level::Debug,
+            timestamp: Utc::now().trunc_subsecs(3),
+            file: Default::default(),
+            line: Default::default(),
+            target: Default::default(),
+            fields: Default::default(),
+        };
+
+        let _ = tx.write(&log1.clone().into_bytes()).unwrap();
+        let _ = tx.write(&log2.clone().into_bytes()).unwrap();
+
+        let mut rx = rx.bytes().filter_map(Result::ok);
+
+        let actual = rx.get::<Log>().unwrap();
+        assert_eq!(log1, actual);
+
+        let actual = rx.get::<Log>().unwrap();
+        assert_eq!(log2, actual);
+
+        // Make sure the closed channel (end) is handled correctly
+        drop(tx);
+        assert_eq!(rx.get::<Log>(), None);
+    }
+
+    #[test]
+    fn logging() {
+        let (tx, rx) = UnixStream::pair().unwrap();
+        let mut rx = rx.bytes().filter_map(Result::ok);
+
+        let logger = Logger::new(tx);
+        let to_tuple = |log: Option<Log>| {
+            let log = log.unwrap();
+            let fields: serde_json::Map<String, serde_json::Value> =
+                serde_json::from_slice(&log.fields).unwrap();
+
+            let message = fields["message"].as_str().unwrap().to_owned();
+
+            (message, log.level)
+        };
+
+        tracing_subscriber::registry().with(logger).init();
+
+        tracing::debug!("this is");
+        tracing::info!("hi");
+        tracing::warn!("from");
+        tracing::error!("logger");
+
+        assert_eq!(
+            to_tuple(rx.get::<Log>()),
+            ("this is".to_string(), Level::Debug)
+        );
+        assert_eq!(to_tuple(rx.get::<Log>()), ("hi".to_string(), Level::Info));
+        assert_eq!(to_tuple(rx.get::<Log>()), ("from".to_string(), Level::Warn));
+        assert_eq!(
+            to_tuple(rx.get::<Log>()),
+            ("logger".to_string(), Level::Error)
+        );
+    }
 }
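
Reviewer note (not part of the diff): an end to end sketch of the writer and reader sides of `Logger`, showing that structured event fields arrive as the JSON payload of a frame. It uses `std::os::unix::net::UnixStream` instead of the `cap-std` stream used in the tests above, so it is Unix only; the field names are made up.

```rust
use std::io::Read;

use shuttle_common::wasm::{Bytesable, Log, Logger};
use tracing_subscriber::prelude::*;

fn main() {
    // In-process socket pair standing in for the logs fd between guest and host.
    let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();

    // Writer side: the Logger layer frames every event and writes it to `tx`.
    tracing_subscriber::registry().with(Logger::new(tx)).init();
    tracing::info!(endpoint = "/hello", status = 200, "handled request");

    // Reader side: decode one frame and inspect its JSON fields.
    let mut bytes = rx.bytes().filter_map(Result::ok);
    let log = Log::from_bytes(&mut bytes).unwrap();
    let fields: serde_json::Value = serde_json::from_slice(&log.fields).unwrap();
    assert_eq!(fields["endpoint"], "/hello");
    assert_eq!(fields["status"], 200);
    assert_eq!(fields["message"], "handled request");
}
```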
diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs
index ca7df03e7..afe795a72 100644
--- a/deployer/src/deployment/deploy_layer.rs
+++ b/deployer/src/deployment/deploy_layer.rs
@@ -21,7 +21,7 @@
 use chrono::{DateTime, Utc};
 use serde_json::json;
-use shuttle_common::STATE_MESSAGE;
+use shuttle_common::{tracing::JsonVisitor, STATE_MESSAGE};
 use shuttle_proto::runtime;
 use std::{net::SocketAddr, str::FromStr, time::SystemTime};
 use tracing::{error, field::Visit, span, warn, Metadata, Subscriber};
@@ -218,36 +218,17 @@ where
             event.record(&mut visitor);

             let metadata = event.metadata();

-            // Extract details from log::Log interface which is different from tracing
-            let target = if let Some(target) = visitor.0.remove("log.target") {
-                target.as_str().unwrap_or_default().to_string()
-            } else {
-                metadata.target().to_string()
-            };
-
-            let line = if let Some(line) = visitor.0.remove("log.line") {
-                line.as_u64().and_then(|u| u32::try_from(u).ok())
-            } else {
-                metadata.line()
-            };
-
-            let file = if let Some(file) = visitor.0.remove("log.file") {
-                file.as_str().map(str::to_string)
-            } else {
-                metadata.file().map(str::to_string)
-            };
-
-            visitor.0.remove("log.module_path");
-
             self.recorder.record(Log {
                 id: details.id,
                 state: details.state,
                 level: metadata.level().into(),
                 timestamp: Utc::now(),
-                file,
-                line,
-                target,
-                fields: serde_json::Value::Object(visitor.0),
+                file: visitor.file.or_else(|| metadata.file().map(str::to_string)),
+                line: visitor.line.or_else(|| metadata.line()),
+                target: visitor
+                    .target
+                    .unwrap_or_else(|| metadata.target().to_string()),
+                fields: serde_json::Value::Object(visitor.fields),
                 r#type: LogType::Event,
                 address: None,
             });
@@ -355,39 +336,6 @@ impl Visit for NewStateVisitor {
     }
 }

-#[derive(Default)]
-struct JsonVisitor(serde_json::Map<String, serde_json::Value>);
-
-impl Visit for JsonVisitor {
-    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
-        self.0.insert(field.name().to_string(), json!(value));
-    }
-    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
-        self.0.insert(field.name().to_string(), json!(value));
-    }
-    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
-        self.0.insert(field.name().to_string(), json!(value));
-    }
-    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
-        self.0.insert(field.name().to_string(), json!(value));
-    }
-    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
-        self.0.insert(field.name().to_string(), json!(value));
-    }
-    fn record_error(
-        &mut self,
-        field: &tracing::field::Field,
-        value: &(dyn std::error::Error + 'static),
-    ) {
-        self.0
-            .insert(field.name().to_string(), json!(value.to_string()));
-    }
-    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
-        self.0
-            .insert(field.name().to_string(), json!(format!("{value:?}")));
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::{
diff --git a/proto/src/lib.rs b/proto/src/lib.rs
index 73726c2e8..ae78a8487 100644
--- a/proto/src/lib.rs
+++ b/proto/src/lib.rs
@@ -204,6 +204,41 @@ pub mod runtime {
         }
     }

+    impl From<shuttle_common::wasm::Log> for LogItem {
+        fn from(log: shuttle_common::wasm::Log) -> Self {
+            let file = if log.file.is_empty() {
+                None
+            } else {
+                Some(log.file)
+            };
+
+            let line = if log.line == 0 { None } else { Some(log.line) };
+
+            Self {
+                id: Default::default(),
+                timestamp: Some(Timestamp::from(SystemTime::from(log.timestamp))),
+                state: LogState::Running as i32,
+                level: LogLevel::from(log.level) as i32,
+                file,
+                line,
+                target: log.target,
+                fields: log.fields,
+            }
+        }
+    }
+
+    impl From<shuttle_common::wasm::Level> for LogLevel {
+        fn from(level: shuttle_common::wasm::Level) -> Self {
+            match level {
+                shuttle_common::wasm::Level::Trace => Self::Trace,
+                shuttle_common::wasm::Level::Debug => Self::Debug,
+                shuttle_common::wasm::Level::Info => Self::Info,
+                shuttle_common::wasm::Level::Warn => Self::Warn,
+                shuttle_common::wasm::Level::Error => Self::Error,
+            }
+        }
+    }
+
     pub async fn start(
         binary_bytes: &[u8],
         wasm: bool,
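
Reviewer note (not part of the diff): a small illustration of the sentinel convention in this conversion. `wasm::Log` always carries a `file` and `line`, so an empty string and `0` mean "unknown" and are mapped back to `None`; the deployment id is left at its default because the host stamps it later. The assertions follow my reading of the `LogItem` field types (`id: Vec<u8>`, `file: Option<String>`, `line: Option<u32>`).

```rust
use chrono::Utc;
use shuttle_common::wasm::{Level, Log};
use shuttle_proto::runtime::LogItem;

fn main() {
    // A guest event that had no file/line information (the Logger filled in Default values).
    let log = Log {
        level: Level::Warn,
        timestamp: Utc::now(),
        file: String::new(),
        line: 0,
        target: "my_app".to_string(),
        fields: Vec::new(),
    };

    let item = LogItem::from(log);
    assert_eq!(item.file, None); // empty string means unknown file
    assert_eq!(item.line, None); // 0 means unknown line
    assert_eq!(item.id, Vec::<u8>::new()); // stamped with the deployment id by the host later
}
```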
diff --git a/runtime/src/axum/mod.rs b/runtime/src/axum/mod.rs
index 7a94889e9..967764270 100644
--- a/runtime/src/axum/mod.rs
+++ b/runtime/src/axum/mod.rs
@@ -5,6 +5,7 @@
 use std::ops::DerefMut;
 use std::os::unix::prelude::RawFd;
 use std::path::{Path, PathBuf};
 use std::str::FromStr;
+use std::sync::Mutex;

 use anyhow::Context;
 use async_trait::async_trait;
@@ -13,13 +14,14 @@
 use futures::TryStreamExt;
 use hyper::body::HttpBody;
 use hyper::service::{make_service_fn, service_fn};
 use hyper::{Body, Request, Response};
-use shuttle_common::wasm::{RequestWrapper, ResponseWrapper};
+use shuttle_common::wasm::{Bytesable, Log, RequestWrapper, ResponseWrapper};
 use shuttle_proto::runtime::runtime_server::Runtime;
 use shuttle_proto::runtime::{
     self, LoadRequest, LoadResponse, StartRequest, StartResponse, StopRequest, StopResponse,
     SubscribeLogsRequest,
 };
 use shuttle_service::ServiceName;
+use tokio::sync::mpsc::{Receiver, Sender};
 use tokio::sync::{mpsc, oneshot};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::Status;
@@ -31,16 +33,33 @@
 use wasmtime_wasi::{WasiCtx, WasiCtxBuilder};

 extern crate rmp_serde as rmps;

+const LOGS_FD: u32 = 20;
+const PARTS_FD: u32 = 3;
+const BODY_WRITE_FD: u32 = 4;
+const BODY_READ_FD: u32 = 5;
+
 pub struct AxumWasm {
-    router: std::sync::Mutex<Option<Router>>,
-    kill_tx: std::sync::Mutex<Option<oneshot::Sender<String>>>,
+    router: Mutex<Option<Router>>,
+    logs_rx: Mutex<Option<Receiver<Result<runtime::LogItem, Status>>>>,
+    logs_tx: Mutex<Sender<Result<runtime::LogItem, Status>>>,
+    kill_tx: Mutex<Option<oneshot::Sender<String>>>,
 }

 impl AxumWasm {
     pub fn new() -> Self {
+        // Allow about 2^15 = 32k logs of backpressure.
+        // The wasm currently handles about 16k requests per second (req/sec). Testing has shown
+        // that a buffer of half the req/sec performs poorly while one equal to the req/sec is
+        // acceptable, so we go with double the req/sec for some headroom.
+        // As we make performance gains elsewhere this might eventually become the new bottleneck to increase :D
+        let (tx, rx) = mpsc::channel(1 << 15);
+
         Self {
-            router: std::sync::Mutex::new(None),
-            kill_tx: std::sync::Mutex::new(None),
+            router: Mutex::new(None),
+            logs_rx: Mutex::new(Some(rx)),
+            logs_tx: Mutex::new(tx),
+            kill_tx: Mutex::new(None),
         }
     }
 }
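
Reviewer note (not part of the diff): a tiny sketch of the bounded channel behaviour the capacity above is tuned for. Once the buffer is full the sender parks in `send().await` (or gets an error from `try_send`) until the receiver drains an item, so a slow log consumer slows the forwarder down instead of growing memory without bound.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Deliberately tiny capacity to make the effect visible.
    let (tx, mut rx) = mpsc::channel::<u32>(2);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();

    // The buffer is now full: a third `send().await` would park here until the receiver drains.
    assert!(tx.try_send(3).is_err());

    assert_eq!(rx.recv().await, Some(1));
    assert!(tx.try_send(3).is_ok()); // space again once one item was consumed
}
```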
@@ -77,9 +96,16 @@ impl Runtime for AxumWasm {
         &self,
         request: tonic::Request<StartRequest>,
     ) -> Result<tonic::Response<StartResponse>, Status> {
-        let StartRequest { port, .. } = request.into_inner();
+        let StartRequest {
+            deployment_id,
+            port,
+            ..
+        } = request.into_inner();
+
         let address = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), port as u16);

+        let logs_tx = self.logs_tx.lock().unwrap().clone();
+
         let (kill_tx, kill_rx) = tokio::sync::oneshot::channel();

         *self.kill_tx.lock().unwrap() = Some(kill_tx);
@@ -92,7 +118,13 @@ impl Runtime for AxumWasm {
             .context("tried to start a service that was not loaded")
             .map_err(|err| Status::internal(err.to_string()))?;

-        tokio::spawn(run_until_stopped(router, address, kill_rx));
+        tokio::spawn(run_until_stopped(
+            router,
+            deployment_id,
+            address,
+            logs_tx,
+            kill_rx,
+        ));

         let message = StartResponse { success: true };

@@ -105,9 +137,13 @@ impl Runtime for AxumWasm {
         &self,
         _request: tonic::Request<SubscribeLogsRequest>,
     ) -> Result<tonic::Response<Self::SubscribeLogsStream>, Status> {
-        let (_tx, rx) = mpsc::channel(1);
+        let logs_rx = self.logs_rx.lock().unwrap().deref_mut().take();

-        Ok(tonic::Response::new(ReceiverStream::new(rx)))
+        if let Some(logs_rx) = logs_rx {
+            Ok(tonic::Response::new(ReceiverStream::new(logs_rx)))
+        } else {
+            Err(Status::internal("logs have already been subscribed to"))
+        }
     }

     async fn stop(
@@ -191,7 +227,9 @@ impl Router {
     /// Send a HTTP request with body to given endpoint on the axum-wasm router and return the response
     async fn handle_request(
         &mut self,
+        deployment_id: Vec<u8>,
         req: hyper::Request<Body>,
+        logs_tx: Sender<Result<runtime::LogItem, Status>>,
     ) -> anyhow::Result<Response<Body>> {
         let wasi = WasiCtxBuilder::new()
             .inherit_stdio()
@@ -202,6 +240,8 @@ impl Router {
         let mut store = Store::new(&self.engine, wasi);
         self.linker.module(&mut store, "axum", &self.module)?;

+        let (logs_stream, logs_client) =
+            UnixStream::pair().context("failed to open logs unixstream")?;
         let (mut parts_stream, parts_client) =
             UnixStream::pair().context("failed to open parts unixstream")?;
         let (mut body_write_stream, body_write_client) =
             UnixStream::pair().context("failed to open body write unixstream")?;
         let (body_read_stream, body_read_client) =
             UnixStream::pair().context("failed to open body read unixstream")?;

+        let logs_client = WasiUnixStream::from_cap_std(logs_client);
         let parts_client = WasiUnixStream::from_cap_std(parts_client);
         let body_write_client = WasiUnixStream::from_cap_std(body_write_client);
         let body_read_client = WasiUnixStream::from_cap_std(body_read_client);

         store
             .data_mut()
-            .insert_file(3, Box::new(parts_client), FileCaps::all());
+            .insert_file(LOGS_FD, Box::new(logs_client), FileCaps::all());
+
+        store
+            .data_mut()
+            .insert_file(PARTS_FD, Box::new(parts_client), FileCaps::all());

         store
             .data_mut()
-            .insert_file(4, Box::new(body_write_client), FileCaps::all());
+            .insert_file(BODY_WRITE_FD, Box::new(body_write_client), FileCaps::all());

         store
             .data_mut()
-            .insert_file(5, Box::new(body_read_client), FileCaps::all());
+            .insert_file(BODY_READ_FD, Box::new(body_read_client), FileCaps::all());
+
+        tokio::task::spawn(async move {
+            let mut iter = logs_stream.bytes().filter_map(Result::ok);
+
+            while let Some(log) = Log::from_bytes(&mut iter) {
+                let mut log: runtime::LogItem = log.into();
+                log.id = deployment_id.clone();
+
+                logs_tx.send(Ok(log)).await.unwrap();
+            }
+        });

         let (parts, body) = req.into_parts();
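
Reviewer note (not part of the diff): the naming flips across the host/guest boundary here. The host's `BODY_WRITE_FD` (4) is the guest's `body_read_fd` and the host's `BODY_READ_FD` (5) is the guest's `body_write_fd`, while `LOGS_FD` (20) and `PARTS_FD` (3) keep their roles on both sides. Below is a synchronous stand-in for the spawned forwarder task: it pulls whole `Log` frames off any byte stream and stamps them with the deployment id. `drain_logs` is a name I made up.

```rust
use std::io::Read;

use shuttle_common::wasm::{Bytesable, Log};
use shuttle_proto::runtime::LogItem;

// Decode every complete frame from `reader` (the logs unix stream in the runtime, any Read here)
// and tag each resulting LogItem with the deployment it belongs to.
fn drain_logs<R: Read>(reader: R, deployment_id: &[u8]) -> Vec<LogItem> {
    let mut iter = reader.bytes().filter_map(Result::ok);
    let mut items = Vec::new();

    while let Some(log) = Log::from_bytes(&mut iter) {
        let mut item: LogItem = log.into();
        item.id = deployment_id.to_vec();
        items.push(item);
    }

    items
}
```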
@@ -261,14 +317,22 @@ impl Router {
         // Call our function in wasm, telling it to route the request we've written to it
         // and write back a response
-        trace!("calling inner Router");
+        trace!("calling Router");
         self.linker
             .get(&mut store, "axum", "__SHUTTLE_Axum_call")
             .expect("wasm module should be loaded and the router function should be available")
             .into_func()
             .expect("router function should be a function")
-            .typed::<(RawFd, RawFd, RawFd), ()>(&store)?
-            .call(&mut store, (3, 4, 5))?;
+            .typed::<(RawFd, RawFd, RawFd, RawFd), ()>(&store)?
+            .call(
+                &mut store,
+                (
+                    LOGS_FD as i32,
+                    PARTS_FD as i32,
+                    BODY_WRITE_FD as i32,
+                    BODY_READ_FD as i32,
+                ),
+            )?;

         // Read response parts from wasm
         let reader = BufReader::new(&mut parts_stream);
@@ -295,25 +359,33 @@
 /// and a kill receiver for stopping the server.
 async fn run_until_stopped(
     router: Router,
+    deployment_id: Vec<u8>,
     address: SocketAddr,
+    logs_tx: Sender<Result<runtime::LogItem, Status>>,
     kill_rx: tokio::sync::oneshot::Receiver<String>,
 ) {
     let make_service = make_service_fn(move |_conn| {
+        let deployment_id = deployment_id.clone();
         let router = router.clone();
+        let logs_tx = logs_tx.clone();
         async move {
             Ok::<_, Infallible>(service_fn(move |req: Request<Body>| {
+                let deployment_id = deployment_id.clone();
                 let mut router = router.clone();
+                let logs_tx = logs_tx.clone();
                 async move {
-                    Ok::<_, Infallible>(match router.handle_request(req).await {
-                        Ok(res) => res,
-                        Err(err) => {
-                            error!("error sending request: {}", err);
-                            Response::builder()
-                                .status(hyper::http::StatusCode::INTERNAL_SERVER_ERROR)
-                                .body(Body::empty())
-                                .expect("building request with empty body should not fail")
-                        }
-                    })
+                    Ok::<_, Infallible>(
+                        match router.handle_request(deployment_id, req, logs_tx).await {
+                            Ok(res) => res,
+                            Err(err) => {
+                                error!("error sending request: {}", err);
+                                Response::builder()
+                                    .status(hyper::http::StatusCode::INTERNAL_SERVER_ERROR)
+                                    .body(Body::empty())
+                                    .expect("building request with empty body should not fail")
+                            }
+                        },
+                    )
                 }
             }))
         }
     });
@@ -339,6 +411,7 @@ async fn run_until_stopped(
 pub mod tests {
     use super::*;
     use hyper::{http::HeaderValue, Method, Request, StatusCode, Version};
+    use uuid::Uuid;

     #[tokio::test]
     async fn axum() {
@@ -347,6 +420,14 @@ pub mod tests {
             .src("axum.wasm")
             .build()
             .unwrap();
+        let id = Uuid::default().as_bytes().to_vec();
+        let (tx, mut rx) = mpsc::channel(1);
+
+        tokio::spawn(async move {
+            while let Some(log) = rx.recv().await {
+                println!("{log:?}");
+            }
+        });

         // GET /hello
         let request: Request<Body> = Request::builder()
@@ -356,7 +437,11 @@ pub mod tests {
             .body(Body::empty())
             .unwrap();

-        let res = router.clone().handle_request(request).await.unwrap();
+        let res = router
+            .clone()
+            .handle_request(id.clone(), request, tx.clone())
+            .await
+            .unwrap();

         assert_eq!(res.status(), StatusCode::OK);
         assert_eq!(
@@ -379,7 +464,11 @@ pub mod tests {
             .body(Body::from("Goodbye world body"))
             .unwrap();

-        let res = router.clone().handle_request(request).await.unwrap();
+        let res = router
+            .clone()
+            .handle_request(id.clone(), request, tx.clone())
+            .await
+            .unwrap();

         assert_eq!(res.status(), StatusCode::OK);
         assert_eq!(
@@ -402,7 +491,11 @@ pub mod tests {
             .body(Body::empty())
             .unwrap();

-        let res = router.clone().handle_request(request).await.unwrap();
+        let res = router
+            .clone()
+            .handle_request(id.clone(), request, tx.clone())
+            .await
+            .unwrap();

         assert_eq!(res.status(), StatusCode::NOT_FOUND);

@@ -415,7 +508,11 @@ pub mod tests {
             .body("this should be uppercased".into())
             .unwrap();

-        let res = router.clone().handle_request(request).await.unwrap();
+        let res = router
+            .clone()
+            .handle_request(id, request, tx)
+            .await
+            .unwrap();

         assert_eq!(res.status(), StatusCode::OK);
         assert_eq!(
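
Reviewer note (not part of the diff): the nested clone dance in `run_until_stopped` can look odd at first sight, so here is the same pattern reduced to one hypothetical `state` value, assuming hyper 0.14 with its server features and a tokio runtime (both already dependencies of this crate). Each `async move` block takes ownership of its own copy, hence one clone per connection and another per request.

```rust
use std::convert::Infallible;
use std::net::SocketAddr;

use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};

// `state` stands in for the captured deployment_id / router / logs_tx trio.
#[tokio::main]
async fn main() {
    let state = String::from("shared state");

    let make_service = make_service_fn(move |_conn| {
        // One clone per accepted connection...
        let state = state.clone();
        async move {
            Ok::<_, Infallible>(service_fn(move |_req: Request<Body>| {
                // ...and one more per request, because the response future takes ownership of it.
                let state = state.clone();
                async move { Ok::<_, Infallible>(Response::new(Body::from(state))) }
            }))
        }
    });

    let addr = SocketAddr::from(([127, 0, 0, 1], 0));
    // In `run_until_stopped` this server is raced against the kill receiver; here we only build it.
    let _server = Server::bind(&addr).serve(make_service);
}
```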
diff --git a/service/Cargo.toml b/service/Cargo.toml
index 523a16281..6c0baa322 100644
--- a/service/Cargo.toml
+++ b/service/Cargo.toml
@@ -54,6 +54,7 @@ optional = true

 [dependencies.shuttle-common]
 workspace = true
+features = ["tracing"]

 [dev-dependencies]
 portpicker = "0.1.1"
diff --git a/service/src/logger.rs b/service/src/logger.rs
index 8572aea99..79b2f6a49 100644
--- a/service/src/logger.rs
+++ b/service/src/logger.rs
@@ -1,8 +1,7 @@
 use chrono::Utc;
-use serde_json::json;
-use shuttle_common::{deployment::State, DeploymentId, LogItem};
+use shuttle_common::{deployment::State, tracing::JsonVisitor, DeploymentId, LogItem};
 use tokio::sync::mpsc::UnboundedSender;
-use tracing::{field::Visit, Subscriber};
+use tracing::Subscriber;
 use tracing_subscriber::Layer;

 pub struct Logger {
@@ -51,57 +50,6 @@ where
     }
 }

-// Boilerplate for extracting the fields from the event
-#[derive(Default)]
-struct JsonVisitor {
-    fields: serde_json::Map<String, serde_json::Value>,
-    target: Option<String>,
-    file: Option<String>,
-    line: Option<u32>,
-}
-
-impl JsonVisitor {
-    /// Ignores log metadata as it is included in the other LogItem fields (target, file, line...)
-    fn filter_insert(&mut self, field: &tracing::field::Field, value: serde_json::Value) {
-        match field.name() {
-            "log.line" => self.line = value.as_u64().map(|u| u as u32),
-            "log.target" => self.target = value.as_str().map(ToOwned::to_owned),
-            "log.file" => self.file = value.as_str().map(ToOwned::to_owned),
-            "log.module_path" => {}
-            name => {
-                self.fields.insert(name.to_string(), json!(value));
-            }
-        }
-    }
-}
-impl Visit for JsonVisitor {
-    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
-        self.filter_insert(field, json!(value));
-    }
-    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
-        self.filter_insert(field, json!(value));
-    }
-    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
-        self.filter_insert(field, json!(value));
-    }
-    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
-        self.filter_insert(field, json!(value));
-    }
-    fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
-        self.filter_insert(field, json!(value));
-    }
-    fn record_error(
-        &mut self,
-        field: &tracing::field::Field,
-        value: &(dyn std::error::Error + 'static),
-    ) {
-        self.filter_insert(field, json!(value.to_string()));
-    }
-    fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
-        self.filter_insert(field, json!(format!("{value:?}")));
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/tmp/axum-wasm-expanded/Cargo.toml b/tmp/axum-wasm-expanded/Cargo.toml
index 23a77e86a..7a36a408a 100644
--- a/tmp/axum-wasm-expanded/Cargo.toml
+++ b/tmp/axum-wasm-expanded/Cargo.toml
@@ -12,10 +12,11 @@ crate-type = [ "cdylib" ]
 axum = { version = "0.6.0", default-features = false }
 futures = "0.3.25"
 futures-executor = "0.3.21"
-futures = "0.3.25"
 http = "0.2.7"
 tower-service = "0.3.1"
 rmp-serde = { version = "1.1.1" }
+tracing = "0.1.37"
+tracing-subscriber = "0.3.16"

 [dependencies.shuttle-common]
 path = "../../common"
diff --git a/tmp/axum-wasm-expanded/src/lib.rs b/tmp/axum-wasm-expanded/src/lib.rs
index e0801ac9a..a15b022a1 100644
--- a/tmp/axum-wasm-expanded/src/lib.rs
+++ b/tmp/axum-wasm-expanded/src/lib.rs
@@ -4,6 +4,7 @@ use axum::{
     response::{IntoResponse, Response},
 };
 use futures::TryStreamExt;
+use tracing::debug;

 pub fn handle_request(req: http::Request) -> axum::response::Response {
     futures_executor::block_on(app(req))
@@ -23,15 +24,18 @@ async fn app(request: http::Request) -> axum::response::Response {
 }

 async fn hello() -> &'static str {
+    debug!("in hello()");
     "Hello, World!"
 }

 async fn goodbye() -> &'static str {
+    debug!("in goodbye()");
     "Goodbye, World!"
 }

 // Map the bytes of the body stream to uppercase and return the stream directly.
 async fn uppercase(body: BodyStream) -> impl IntoResponse {
+    debug!("in uppercase()");
     let chunk_stream = body.map_ok(|chunk| {
         chunk
             .iter()
@@ -44,18 +48,28 @@
 #[no_mangle]
 #[allow(non_snake_case)]
 pub extern "C" fn __SHUTTLE_Axum_call(
-    fd_3: std::os::wasi::prelude::RawFd,
-    fd_4: std::os::wasi::prelude::RawFd,
-    fd_5: std::os::wasi::prelude::RawFd,
+    logs_fd: std::os::wasi::prelude::RawFd,
+    parts_fd: std::os::wasi::prelude::RawFd,
+    body_read_fd: std::os::wasi::prelude::RawFd,
+    body_write_fd: std::os::wasi::prelude::RawFd,
 ) {
     use axum::body::HttpBody;
+    use shuttle_common::wasm::Logger;
     use std::io::{Read, Write};
     use std::os::wasi::io::FromRawFd;
+    use tracing_subscriber::prelude::*;

-    println!("inner handler awoken; interacting with fd={fd_3},{fd_4},{fd_5}");
+    println!("inner handler awoken; interacting with fd={logs_fd},{parts_fd},{body_read_fd},{body_write_fd}");
+
+    // file descriptor for writing logs to (the host decides the actual fd number)
+    let logs_fd = unsafe { std::fs::File::from_raw_fd(logs_fd) };
+
+    tracing_subscriber::registry()
+        .with(Logger::new(logs_fd))
+        .init(); // this sets the subscriber as the global default and also adds a compatibility layer for capturing `log::Record`s

     // file descriptor 3 for reading and writing http parts
-    let mut parts_fd = unsafe { std::fs::File::from_raw_fd(fd_3) };
+    let mut parts_fd = unsafe { std::fs::File::from_raw_fd(parts_fd) };

     let reader = std::io::BufReader::new(&mut parts_fd);

@@ -63,7 +77,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(
     let wrapper: shuttle_common::wasm::RequestWrapper = rmp_serde::from_read(reader).unwrap();

     // file descriptor 4 for reading http body into wasm
-    let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(fd_4) };
+    let mut body_read_stream = unsafe { std::fs::File::from_raw_fd(body_read_fd) };

     let mut reader = std::io::BufReader::new(&mut body_read_stream);
     let mut body_buf = Vec::new();
@@ -88,7 +102,7 @@ pub extern "C" fn __SHUTTLE_Axum_call(
     parts_fd.write_all(&response_parts).unwrap();

     // file descriptor 5 for writing http body to host
-    let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(fd_5) };
+    let mut body_write_stream = unsafe { std::fs::File::from_raw_fd(body_write_fd) };

     // write body if there is one
     if let Some(body) = futures_executor::block_on(body.data()) {
diff --git a/tmp/axum-wasm/Cargo.toml b/tmp/axum-wasm/Cargo.toml
index aba75386b..2f5171163 100644
--- a/tmp/axum-wasm/Cargo.toml
+++ b/tmp/axum-wasm/Cargo.toml
@@ -15,6 +15,8 @@ futures = "0.3.25"
 http = "0.2.7"
 tower-service = "0.3.1"
 rmp-serde = { version = "1.1.1" }
+tracing = "0.1.37"
+tracing-subscriber = "0.3.16"

 [dependencies.shuttle-codegen]
 path = "../../codegen"
diff --git a/tmp/axum-wasm/src/lib.rs b/tmp/axum-wasm/src/lib.rs
index 5be67e21c..b4620386a 100644
--- a/tmp/axum-wasm/src/lib.rs
+++ b/tmp/axum-wasm/src/lib.rs
@@ -1,11 +1,15 @@
+use tracing::debug;
+
 shuttle_codegen::app! {
     #[shuttle_codegen::endpoint(method = get, route = "/hello")]
     async fn hello() -> &'static str {
+        debug!("called hello()");
         "Hello, World!"
     }

     #[shuttle_codegen::endpoint(method = get, route = "/goodbye")]
     async fn goodbye() -> &'static str {
+        debug!("called goodbye()");
         "Goodbye, World!"
     }
 }