diff --git a/lib/virtual-fs/Cargo.toml b/lib/virtual-fs/Cargo.toml index 09253053472..803118e1e10 100644 --- a/lib/virtual-fs/Cargo.toml +++ b/lib/virtual-fs/Cargo.toml @@ -42,3 +42,5 @@ webc-fs = ["webc", "anyhow"] static-fs = ["webc", "anyhow"] enable-serde = ["typetag"] no-time = [] +# Enables memory tracking/limiting functionality for the in-memory filesystem. +tracking = [] diff --git a/lib/virtual-fs/src/lib.rs b/lib/virtual-fs/src/lib.rs index 6433ae638e7..f39a05b15de 100644 --- a/lib/virtual-fs/src/lib.rs +++ b/lib/virtual-fs/src/lib.rs @@ -43,6 +43,8 @@ pub mod webc_fs; #[cfg(feature = "webc-fs")] mod webc_volume_fs; +pub mod limiter; + pub use arc_box_file::*; pub use arc_file::*; pub use arc_fs::*; diff --git a/lib/virtual-fs/src/limiter.rs b/lib/virtual-fs/src/limiter.rs new file mode 100644 index 00000000000..bcd44ee676f --- /dev/null +++ b/lib/virtual-fs/src/limiter.rs @@ -0,0 +1,210 @@ +use std::sync::Arc; + +use crate::FsError; + +pub use self::tracked_vec::TrackedVec; + +/// Allows tracking and limiting the memory usage of a memfs [`FileSystem`]. +pub trait FsMemoryLimiter: Send + Sync + std::fmt::Debug { + fn on_grow(&self, grown_bytes: usize) -> std::result::Result<(), FsError>; + fn on_shrink(&self, shrunk_bytes: usize); +} + +pub type DynFsMemoryLimiter = Arc; + +#[cfg(feature = "tracking")] +mod tracked_vec { + use crate::FsError; + + use super::DynFsMemoryLimiter; + + #[derive(Debug, Clone)] + pub struct TrackedVec { + data: Vec, + pub(super) limiter: Option, + } + + impl TrackedVec { + pub fn new(limiter: Option) -> Self { + Self { + data: Vec::new(), + limiter, + } + } + + pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> { + self.limiter.as_ref() + } + + pub fn with_capacity( + capacity: usize, + limiter: Option, + ) -> Result { + if let Some(limiter) = &limiter { + limiter.on_grow(capacity)?; + } + Ok(Self { + data: Vec::with_capacity(capacity), + limiter, + }) + } + + pub fn clear(&mut self) { + self.data.clear(); + } + + pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> { + let old_capacity = self.data.capacity(); + self.data.append(&mut other.data); + + if let Some(limiter) = &self.limiter { + let new = self.data.capacity() - old_capacity; + limiter.on_grow(new)?; + } + + Ok(()) + } + + pub fn split_off(&mut self, at: usize) -> Result { + let other = self.data.split_off(at); + + if let Some(limiter) = &self.limiter { + // NOTE: split_off leaves the original vector capacity intact, so + // we can just add the new length. + let new_len = other.capacity(); + limiter.on_grow(new_len)?; + } + + Ok(Self { + data: other, + limiter: self.limiter.clone(), + }) + } + + pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> { + let old_capacity = self.data.capacity(); + self.data.resize(new_len, value); + if let Some(limiter) = &self.limiter { + let new = self.data.capacity() - old_capacity; + limiter.on_grow(new)?; + } + Ok(()) + } + + pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> { + let old_capacity = self.data.capacity(); + self.data.extend_from_slice(other); + if let Some(limiter) = &self.limiter { + let new = self.data.capacity() - old_capacity; + limiter.on_grow(new)?; + } + Ok(()) + } + + pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> { + let old_capacity = self.data.capacity(); + self.data.reserve_exact(additional); + if let Some(limiter) = &self.limiter { + let new = self.data.capacity() - old_capacity; + limiter.on_grow(new)?; + } + Ok(()) + } + } + + impl Drop for TrackedVec { + fn drop(&mut self) { + if let Some(limiter) = &self.limiter { + limiter.on_shrink(self.data.capacity()); + } + } + } + + impl std::ops::Deref for TrackedVec { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.data + } + } + + impl std::ops::DerefMut for TrackedVec { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } + } +} + +#[cfg(not(feature = "tracking"))] +mod tracked_vec { + use crate::FsError; + + use super::DynFsMemoryLimiter; + + #[derive(Debug)] + pub struct TrackedVec { + data: Vec, + } + + impl TrackedVec { + pub fn new(_limiter: Option) -> Self { + Self { data: Vec::new() } + } + + pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> { + None + } + + pub fn with_capacity( + capacity: usize, + _limiter: Option, + ) -> Result { + Ok(Self { + data: Vec::with_capacity(capacity), + }) + } + + pub fn clear(&mut self) { + self.data.clear(); + } + + pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> { + self.data.append(&mut other.data); + Ok(()) + } + + pub fn split_off(&mut self, at: usize) -> Result { + let other = self.data.split_off(at); + Ok(Self { data: other }) + } + + pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> { + self.data.resize(new_len, value); + Ok(()) + } + + pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> { + self.data.extend_from_slice(other); + Ok(()) + } + + pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> { + self.data.reserve_exact(additional); + Ok(()) + } + } + + impl std::ops::Deref for TrackedVec { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.data + } + } + + impl std::ops::DerefMut for TrackedVec { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } + } +} diff --git a/lib/virtual-fs/src/mem_fs/file.rs b/lib/virtual-fs/src/mem_fs/file.rs index 278e93e907d..e8854310355 100644 --- a/lib/virtual-fs/src/mem_fs/file.rs +++ b/lib/virtual-fs/src/mem_fs/file.rs @@ -6,6 +6,7 @@ use tokio::io::AsyncRead; use tokio::io::{AsyncSeek, AsyncWrite}; use super::*; +use crate::limiter::TrackedVec; use crate::{FsError, Result, VirtualFile}; use std::borrow::Cow; use std::cmp; @@ -177,7 +178,7 @@ impl VirtualFile for FileHandle { match inode { Some(Node::File(FileNode { file, metadata, .. })) => { file.buffer - .resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0); + .resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0)?; metadata.len = new_size; } Some(Node::CustomFile(node)) => { @@ -1228,12 +1229,14 @@ impl fmt::Debug for FileHandle { /// represents a read/write position in the buffer. #[derive(Debug)] pub(super) struct File { - buffer: Vec, + buffer: TrackedVec, } impl File { - pub(super) fn new() -> Self { - Self { buffer: Vec::new() } + pub(super) fn new(limiter: Option) -> Self { + Self { + buffer: TrackedVec::new(limiter), + } } pub(super) fn truncate(&mut self) { @@ -1304,27 +1307,32 @@ impl File { match *cursor { // The cursor is at the end of the buffer: happy path! position if position == self.buffer.len() as u64 => { - self.buffer.extend_from_slice(buf); + self.buffer.extend_from_slice(buf)?; } // The cursor is at the beginning of the buffer (and the // buffer is not empty, otherwise it would have been // caught by the previous arm): almost a happy path! 0 => { - let mut new_buffer = Vec::with_capacity(self.buffer.len() + buf.len()); - new_buffer.extend_from_slice(buf); - new_buffer.append(&mut self.buffer); + // FIXME(perf,theduke): make this faster, it's horrible! + let mut new_buffer = TrackedVec::with_capacity( + self.buffer.len() + buf.len(), + self.buffer.limiter().cloned(), + )?; + new_buffer.extend_from_slice(buf)?; + new_buffer.append(&mut self.buffer)?; self.buffer = new_buffer; } // The cursor is somewhere in the buffer: not the happy path. position => { - self.buffer.reserve_exact(buf.len()); + self.buffer.reserve_exact(buf.len())?; - let mut remainder = self.buffer.split_off(position as usize); - self.buffer.extend_from_slice(buf); - self.buffer.append(&mut remainder); + // FIXME(perf,theduke): make this faster, it's horrible! + let mut remainder = self.buffer.split_off(position as usize)?; + self.buffer.extend_from_slice(buf)?; + self.buffer.append(&mut remainder)?; } } diff --git a/lib/virtual-fs/src/mem_fs/file_opener.rs b/lib/virtual-fs/src/mem_fs/file_opener.rs index b123d3f8558..a904fd458ff 100644 --- a/lib/virtual-fs/src/mem_fs/file_opener.rs +++ b/lib/virtual-fs/src/mem_fs/file_opener.rs @@ -448,7 +448,7 @@ impl crate::FileOpener for FileSystem { // Write lock. let mut fs = self.inner.write().map_err(|_| FsError::Lock)?; - let file = File::new(); + let file = File::new(fs.limiter.clone()); // Creating the file in the storage. let inode_of_file = fs.storage.vacant_entry().key(); diff --git a/lib/virtual-fs/src/mem_fs/filesystem.rs b/lib/virtual-fs/src/mem_fs/filesystem.rs index 5249547d1d1..356580b7b40 100644 --- a/lib/virtual-fs/src/mem_fs/filesystem.rs +++ b/lib/virtual-fs/src/mem_fs/filesystem.rs @@ -20,6 +20,10 @@ pub struct FileSystem { } impl FileSystem { + pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) { + self.inner.write().unwrap().limiter = Some(limiter); + } + pub fn new_open_options_ext(&self) -> &FileSystem { self } @@ -539,6 +543,7 @@ impl fmt::Debug for FileSystem { /// indexed by their respective `Inode` in a slab. pub(super) struct FileSystemInner { pub(super) storage: Slab, + pub(super) limiter: Option, } #[derive(Debug)] @@ -932,7 +937,10 @@ impl Default for FileSystemInner { }, })); - Self { storage: slab } + Self { + storage: slab, + limiter: None, + } } } diff --git a/lib/virtual-fs/src/tmp_fs.rs b/lib/virtual-fs/src/tmp_fs.rs index 9722aa215de..ed1f4be0149 100644 --- a/lib/virtual-fs/src/tmp_fs.rs +++ b/lib/virtual-fs/src/tmp_fs.rs @@ -30,6 +30,10 @@ impl TmpFileSystem { Self::default() } + pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) { + self.fs.set_memory_limiter(limiter); + } + pub fn new_open_options_ext(&self) -> &mem_fs::FileSystem { self.fs.new_open_options_ext() } diff --git a/lib/wasi/src/os/console/mod.rs b/lib/wasi/src/os/console/mod.rs index 1a0d5e80db1..a9c1222bf1d 100644 --- a/lib/wasi/src/os/console/mod.rs +++ b/lib/wasi/src/os/console/mod.rs @@ -51,6 +51,7 @@ pub struct Console { stdout: ArcBoxFile, stderr: ArcBoxFile, capabilities: Capabilities, + memfs_memory_limiter: Option, } impl Console { @@ -81,6 +82,7 @@ impl Console { stdout: ArcBoxFile::new(Box::new(Pipe::channel().0)), stderr: ArcBoxFile::new(Box::new(Pipe::channel().0)), capabilities: Default::default(), + memfs_memory_limiter: None, } } @@ -143,6 +145,14 @@ impl Console { self } + pub fn with_mem_fs_memory_limiter( + mut self, + limiter: virtual_fs::limiter::DynFsMemoryLimiter, + ) -> Self { + self.memfs_memory_limiter = Some(limiter); + self + } + pub fn run(&mut self) -> Result<(TaskJoinHandle, WasiProcess), VirtualBusError> { // Extract the program name from the arguments let empty_args: Vec<&[u8]> = Vec::new(); @@ -193,6 +203,18 @@ impl Console { // TODO: no unwrap! let env = WasiEnv::from_init(env_init).unwrap(); + if let Some(limiter) = &self.memfs_memory_limiter { + match &env.state.fs.root_fs { + crate::fs::WasiFsRoot::Sandbox(tmpfs) => { + tmpfs.set_memory_limiter(limiter.clone()); + } + crate::fs::WasiFsRoot::Backing(_) => { + tracing::error!("tried to set a tmpfs memory limiter on a backing fs"); + return Err(VirtualBusError::InvokeFailed); + } + } + } + // TODO: this should not happen here... // Display the welcome message let tasks = env.tasks().clone();