Skip to content

Commit

Permalink
feat(virtual-fs): Add TmpFs memory tracking and limiting
Browse files Browse the repository at this point in the history
Hacky implementation for tracking and limiting the memory consumption of
in-memory files of the TmpFs / mem_fs.

Ideally this would use a Vec with a custom allocator, but the allocator
APIs are currently restricted to nightly Rust.

To keep both code impact and performance impact low, a TrackedVec is
added that can hold a FsMemoryLimiter, which has callbacks for growing
and for shrinking memory.

Without the new "tracked" feature enabled, a stub impl is added, which
does not do any tracking , and hence has minimal performance impact.

This should be rewritten to a sane implementaiton soon as part of a
larger virtual-fs rewrite.

Part of #3865
  • Loading branch information
theduke committed May 24, 2023
1 parent 458330e commit 01c20f6
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 14 deletions.
2 changes: 2 additions & 0 deletions lib/virtual-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ webc-fs = ["webc", "anyhow"]
static-fs = ["webc", "anyhow"]
enable-serde = ["typetag"]
no-time = []
# Enables memory tracking/limiting functionality for the in-memory filesystem.
tracking = []
2 changes: 2 additions & 0 deletions lib/virtual-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub mod webc_fs;
#[cfg(feature = "webc-fs")]
mod webc_volume_fs;

pub mod limiter;

pub use arc_box_file::*;
pub use arc_file::*;
pub use arc_fs::*;
Expand Down
210 changes: 210 additions & 0 deletions lib/virtual-fs/src/limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use std::sync::Arc;

use crate::FsError;

pub use self::tracked_vec::TrackedVec;

/// Allows tracking and limiting the memory usage of a memfs [`FileSystem`].
pub trait FsMemoryLimiter: Send + Sync + std::fmt::Debug {
fn on_grow(&self, grown_bytes: usize) -> std::result::Result<(), FsError>;
fn on_shrink(&self, shrunk_bytes: usize);
}

pub type DynFsMemoryLimiter = Arc<dyn FsMemoryLimiter + Send + Sync>;

#[cfg(feature = "tracking")]
mod tracked_vec {
use crate::FsError;

use super::DynFsMemoryLimiter;

#[derive(Debug, Clone)]
pub struct TrackedVec {
data: Vec<u8>,
pub(super) limiter: Option<DynFsMemoryLimiter>,
}

impl TrackedVec {
pub fn new(limiter: Option<DynFsMemoryLimiter>) -> Self {
Self {
data: Vec::new(),
limiter,
}
}

pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> {
self.limiter.as_ref()
}

pub fn with_capacity(
capacity: usize,
limiter: Option<DynFsMemoryLimiter>,
) -> Result<Self, FsError> {
if let Some(limiter) = &limiter {
limiter.on_grow(capacity)?;
}
Ok(Self {
data: Vec::with_capacity(capacity),
limiter,
})
}

pub fn clear(&mut self) {
self.data.clear();
}

pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.append(&mut other.data);

if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}

Ok(())
}

pub fn split_off(&mut self, at: usize) -> Result<Self, FsError> {
let other = self.data.split_off(at);

if let Some(limiter) = &self.limiter {
// NOTE: split_off leaves the original vector capacity intact, so
// we can just add the new length.
let new_len = other.capacity();
limiter.on_grow(new_len)?;
}

Ok(Self {
data: other,
limiter: self.limiter.clone(),
})
}

pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.resize(new_len, value);
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}

pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.extend_from_slice(other);
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}

pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.reserve_exact(additional);
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}
}

impl Drop for TrackedVec {
fn drop(&mut self) {
if let Some(limiter) = &self.limiter {
limiter.on_shrink(self.data.len());
}
}
}

impl std::ops::Deref for TrackedVec {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
}
}

impl std::ops::DerefMut for TrackedVec {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
}

#[cfg(not(feature = "tracking"))]
mod tracked_vec {
use crate::FsError;

use super::DynFsMemoryLimiter;

#[derive(Debug)]
pub struct TrackedVec {
data: Vec<u8>,
}

impl TrackedVec {
pub fn new(_limiter: Option<DynFsMemoryLimiter>) -> Self {
Self { data: Vec::new() }
}

pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> {
None
}

pub fn with_capacity(
capacity: usize,
_limiter: Option<DynFsMemoryLimiter>,
) -> Result<Self, FsError> {
Ok(Self {
data: Vec::with_capacity(capacity),
})
}

pub fn clear(&mut self) {
self.data.clear();
}

pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> {
self.data.append(&mut other.data);
Ok(())
}

pub fn split_off(&mut self, at: usize) -> Result<Self, FsError> {
let other = self.data.split_off(at);
Ok(Self { data: other })
}

pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> {
self.data.resize(new_len, value);
Ok(())
}

pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> {
self.data.extend_from_slice(other);
Ok(())
}

pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> {
self.data.reserve_exact(additional);
Ok(())
}
}

impl std::ops::Deref for TrackedVec {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
&self.data
}
}

impl std::ops::DerefMut for TrackedVec {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
}
32 changes: 20 additions & 12 deletions lib/virtual-fs/src/mem_fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::io::AsyncRead;
use tokio::io::{AsyncSeek, AsyncWrite};

use super::*;
use crate::limiter::TrackedVec;
use crate::{FsError, Result, VirtualFile};
use std::borrow::Cow;
use std::cmp;
Expand Down Expand Up @@ -177,7 +178,7 @@ impl VirtualFile for FileHandle {
match inode {
Some(Node::File(FileNode { file, metadata, .. })) => {
file.buffer
.resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0);
.resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0)?;
metadata.len = new_size;
}
Some(Node::CustomFile(node)) => {
Expand Down Expand Up @@ -1228,12 +1229,14 @@ impl fmt::Debug for FileHandle {
/// represents a read/write position in the buffer.
#[derive(Debug)]
pub(super) struct File {
buffer: Vec<u8>,
buffer: TrackedVec,
}

impl File {
pub(super) fn new() -> Self {
Self { buffer: Vec::new() }
pub(super) fn new(limiter: Option<crate::limiter::DynFsMemoryLimiter>) -> Self {
Self {
buffer: TrackedVec::new(limiter),
}
}

pub(super) fn truncate(&mut self) {
Expand Down Expand Up @@ -1304,27 +1307,32 @@ impl File {
match *cursor {
// The cursor is at the end of the buffer: happy path!
position if position == self.buffer.len() as u64 => {
self.buffer.extend_from_slice(buf);
self.buffer.extend_from_slice(buf)?;
}

// The cursor is at the beginning of the buffer (and the
// buffer is not empty, otherwise it would have been
// caught by the previous arm): almost a happy path!
0 => {
let mut new_buffer = Vec::with_capacity(self.buffer.len() + buf.len());
new_buffer.extend_from_slice(buf);
new_buffer.append(&mut self.buffer);
// FIXME(perf,theduke): make this faster, it's horrible!
let mut new_buffer = TrackedVec::with_capacity(
self.buffer.len() + buf.len(),
self.buffer.limiter().cloned(),
)?;
new_buffer.extend_from_slice(buf)?;
new_buffer.append(&mut self.buffer)?;

self.buffer = new_buffer;
}

// The cursor is somewhere in the buffer: not the happy path.
position => {
self.buffer.reserve_exact(buf.len());
self.buffer.reserve_exact(buf.len())?;

let mut remainder = self.buffer.split_off(position as usize);
self.buffer.extend_from_slice(buf);
self.buffer.append(&mut remainder);
// FIXME(perf,theduke): make this faster, it's horrible!
let mut remainder = self.buffer.split_off(position as usize)?;
self.buffer.extend_from_slice(buf)?;
self.buffer.append(&mut remainder)?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/virtual-fs/src/mem_fs/file_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl crate::FileOpener for FileSystem {
// Write lock.
let mut fs = self.inner.write().map_err(|_| FsError::Lock)?;

let file = File::new();
let file = File::new(fs.limiter.clone());

// Creating the file in the storage.
let inode_of_file = fs.storage.vacant_entry().key();
Expand Down
10 changes: 9 additions & 1 deletion lib/virtual-fs/src/mem_fs/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub struct FileSystem {
}

impl FileSystem {
pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) {
self.inner.write().unwrap().limiter = Some(limiter);
}

pub fn new_open_options_ext(&self) -> &FileSystem {
self
}
Expand Down Expand Up @@ -539,6 +543,7 @@ impl fmt::Debug for FileSystem {
/// indexed by their respective `Inode` in a slab.
pub(super) struct FileSystemInner {
pub(super) storage: Slab<Node>,
pub(super) limiter: Option<crate::limiter::DynFsMemoryLimiter>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -932,7 +937,10 @@ impl Default for FileSystemInner {
},
}));

Self { storage: slab }
Self {
storage: slab,
limiter: None,
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/virtual-fs/src/tmp_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl TmpFileSystem {
Self::default()
}

pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) {
self.fs.set_memory_limiter(limiter);
}

pub fn new_open_options_ext(&self) -> &mem_fs::FileSystem {
self.fs.new_open_options_ext()
}
Expand Down

0 comments on commit 01c20f6

Please sign in to comment.