Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TmpFs Memory Usage Tracking and Limiting #3902

Merged
merged 3 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/virtual-fs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,5 @@ webc-fs = ["webc", "anyhow"]
static-fs = ["webc", "anyhow"]
enable-serde = ["typetag"]
no-time = []
# Enables memory tracking/limiting functionality for the in-memory filesystem.
tracking = []
2 changes: 2 additions & 0 deletions lib/virtual-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ pub mod webc_fs;
#[cfg(feature = "webc-fs")]
mod webc_volume_fs;

pub mod limiter;

pub use arc_box_file::*;
pub use arc_file::*;
pub use arc_fs::*;
Expand Down
210 changes: 210 additions & 0 deletions lib/virtual-fs/src/limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use std::sync::Arc;

use crate::FsError;

pub use self::tracked_vec::TrackedVec;

/// Allows tracking and limiting the memory usage of a memfs [`FileSystem`].
pub trait FsMemoryLimiter: Send + Sync + std::fmt::Debug {
fn on_grow(&self, grown_bytes: usize) -> std::result::Result<(), FsError>;
fn on_shrink(&self, shrunk_bytes: usize);
}

pub type DynFsMemoryLimiter = Arc<dyn FsMemoryLimiter + Send + Sync>;

#[cfg(feature = "tracking")]
mod tracked_vec {
use crate::FsError;

use super::DynFsMemoryLimiter;

#[derive(Debug, Clone)]
pub struct TrackedVec {
data: Vec<u8>,
pub(super) limiter: Option<DynFsMemoryLimiter>,
}

impl TrackedVec {
pub fn new(limiter: Option<DynFsMemoryLimiter>) -> Self {
Self {
data: Vec::new(),
limiter,
}
}

pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> {
self.limiter.as_ref()
}

pub fn with_capacity(
capacity: usize,
limiter: Option<DynFsMemoryLimiter>,
) -> Result<Self, FsError> {
if let Some(limiter) = &limiter {
limiter.on_grow(capacity)?;
}
Ok(Self {
data: Vec::with_capacity(capacity),
limiter,
})
}

pub fn clear(&mut self) {
self.data.clear();
}

pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.append(&mut other.data);

if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}

Ok(())
}

pub fn split_off(&mut self, at: usize) -> Result<Self, FsError> {
let other = self.data.split_off(at);

if let Some(limiter) = &self.limiter {
// NOTE: split_off leaves the original vector capacity intact, so
// we can just add the new length.
let new_len = other.capacity();
limiter.on_grow(new_len)?;
}

Ok(Self {
data: other,
limiter: self.limiter.clone(),
})
}

pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.resize(new_len, value);
ptitSeb marked this conversation as resolved.
Show resolved Hide resolved
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}

pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.extend_from_slice(other);
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}

pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> {
let old_capacity = self.data.capacity();
self.data.reserve_exact(additional);
if let Some(limiter) = &self.limiter {
let new = self.data.capacity() - old_capacity;
limiter.on_grow(new)?;
}
Ok(())
}
}

impl Drop for TrackedVec {
fn drop(&mut self) {
if let Some(limiter) = &self.limiter {
limiter.on_shrink(self.data.capacity());
}
}
}

impl std::ops::Deref for TrackedVec {
type Target = [u8];

fn deref(&self) -> &Self::Target {
&self.data
}
}

impl std::ops::DerefMut for TrackedVec {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
}

#[cfg(not(feature = "tracking"))]
mod tracked_vec {
use crate::FsError;

use super::DynFsMemoryLimiter;

#[derive(Debug)]
pub struct TrackedVec {
data: Vec<u8>,
}

impl TrackedVec {
pub fn new(_limiter: Option<DynFsMemoryLimiter>) -> Self {
Self { data: Vec::new() }
}

pub fn limiter(&self) -> Option<&DynFsMemoryLimiter> {
None
}

pub fn with_capacity(
capacity: usize,
_limiter: Option<DynFsMemoryLimiter>,
) -> Result<Self, FsError> {
Ok(Self {
data: Vec::with_capacity(capacity),
})
}

pub fn clear(&mut self) {
self.data.clear();
}

pub fn append(&mut self, other: &mut Self) -> Result<(), FsError> {
self.data.append(&mut other.data);
Ok(())
}

pub fn split_off(&mut self, at: usize) -> Result<Self, FsError> {
let other = self.data.split_off(at);
Ok(Self { data: other })
}

pub fn resize(&mut self, new_len: usize, value: u8) -> Result<(), FsError> {
self.data.resize(new_len, value);
Ok(())
}

pub fn extend_from_slice(&mut self, other: &[u8]) -> Result<(), FsError> {
self.data.extend_from_slice(other);
Ok(())
}

pub fn reserve_exact(&mut self, additional: usize) -> Result<(), FsError> {
self.data.reserve_exact(additional);
Ok(())
}
}

impl std::ops::Deref for TrackedVec {
type Target = Vec<u8>;

fn deref(&self) -> &Self::Target {
&self.data
}
}

impl std::ops::DerefMut for TrackedVec {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
}
32 changes: 20 additions & 12 deletions lib/virtual-fs/src/mem_fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::io::AsyncRead;
use tokio::io::{AsyncSeek, AsyncWrite};

use super::*;
use crate::limiter::TrackedVec;
use crate::{FsError, Result, VirtualFile};
use std::borrow::Cow;
use std::cmp;
Expand Down Expand Up @@ -177,7 +178,7 @@ impl VirtualFile for FileHandle {
match inode {
Some(Node::File(FileNode { file, metadata, .. })) => {
file.buffer
.resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0);
.resize(new_size.try_into().map_err(|_| FsError::UnknownError)?, 0)?;
metadata.len = new_size;
}
Some(Node::CustomFile(node)) => {
Expand Down Expand Up @@ -1228,12 +1229,14 @@ impl fmt::Debug for FileHandle {
/// represents a read/write position in the buffer.
#[derive(Debug)]
pub(super) struct File {
buffer: Vec<u8>,
buffer: TrackedVec,
}

impl File {
pub(super) fn new() -> Self {
Self { buffer: Vec::new() }
pub(super) fn new(limiter: Option<crate::limiter::DynFsMemoryLimiter>) -> Self {
Self {
buffer: TrackedVec::new(limiter),
}
}

pub(super) fn truncate(&mut self) {
Expand Down Expand Up @@ -1304,27 +1307,32 @@ impl File {
match *cursor {
// The cursor is at the end of the buffer: happy path!
position if position == self.buffer.len() as u64 => {
self.buffer.extend_from_slice(buf);
self.buffer.extend_from_slice(buf)?;
}

// The cursor is at the beginning of the buffer (and the
// buffer is not empty, otherwise it would have been
// caught by the previous arm): almost a happy path!
0 => {
let mut new_buffer = Vec::with_capacity(self.buffer.len() + buf.len());
new_buffer.extend_from_slice(buf);
new_buffer.append(&mut self.buffer);
// FIXME(perf,theduke): make this faster, it's horrible!
let mut new_buffer = TrackedVec::with_capacity(
self.buffer.len() + buf.len(),
self.buffer.limiter().cloned(),
)?;
new_buffer.extend_from_slice(buf)?;
new_buffer.append(&mut self.buffer)?;

self.buffer = new_buffer;
}

// The cursor is somewhere in the buffer: not the happy path.
position => {
self.buffer.reserve_exact(buf.len());
self.buffer.reserve_exact(buf.len())?;

let mut remainder = self.buffer.split_off(position as usize);
self.buffer.extend_from_slice(buf);
self.buffer.append(&mut remainder);
// FIXME(perf,theduke): make this faster, it's horrible!
let mut remainder = self.buffer.split_off(position as usize)?;
self.buffer.extend_from_slice(buf)?;
self.buffer.append(&mut remainder)?;
}
}

Expand Down
2 changes: 1 addition & 1 deletion lib/virtual-fs/src/mem_fs/file_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ impl crate::FileOpener for FileSystem {
// Write lock.
let mut fs = self.inner.write().map_err(|_| FsError::Lock)?;

let file = File::new();
let file = File::new(fs.limiter.clone());

// Creating the file in the storage.
let inode_of_file = fs.storage.vacant_entry().key();
Expand Down
10 changes: 9 additions & 1 deletion lib/virtual-fs/src/mem_fs/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ pub struct FileSystem {
}

impl FileSystem {
pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) {
self.inner.write().unwrap().limiter = Some(limiter);
}

pub fn new_open_options_ext(&self) -> &FileSystem {
self
}
Expand Down Expand Up @@ -539,6 +543,7 @@ impl fmt::Debug for FileSystem {
/// indexed by their respective `Inode` in a slab.
pub(super) struct FileSystemInner {
pub(super) storage: Slab<Node>,
pub(super) limiter: Option<crate::limiter::DynFsMemoryLimiter>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -932,7 +937,10 @@ impl Default for FileSystemInner {
},
}));

Self { storage: slab }
Self {
storage: slab,
limiter: None,
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/virtual-fs/src/tmp_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ impl TmpFileSystem {
Self::default()
}

pub fn set_memory_limiter(&self, limiter: crate::limiter::DynFsMemoryLimiter) {
self.fs.set_memory_limiter(limiter);
}

pub fn new_open_options_ext(&self) -> &mem_fs::FileSystem {
self.fs.new_open_options_ext()
}
Expand Down
Loading