Move cleaned pages into read cache instead of discarding them.
Just after dirty pages have been written out to disk, they are currently discarded
and need to be read in again when these pages are accessed in the future.

This change moves them into the read cache instead, thereby keeping the clean
data available and cache hot in memory.
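A minimal, self-contained sketch of the idea, using a hypothetical HashMap-based stand-in for the real PagedCachedFile (the names below are illustrative, not the redb API):

use std::collections::HashMap;
use std::io::Write;

// Miniature stand-in for the real cache (hypothetical, not the redb types):
// dirty pages live in a write buffer; clean pages live in a read cache.
struct PageCache {
    write_buffer: HashMap<u64, Vec<u8>>,
    read_cache: HashMap<u64, Vec<u8>>,
}

impl PageCache {
    // Flush dirty pages to the file; instead of discarding the now-clean
    // buffers, move them into the read cache so future reads hit memory.
    fn flush(&mut self, file: &mut impl Write) -> std::io::Result<()> {
        for (offset, buffer) in self.write_buffer.drain() {
            file.write_all(&buffer)?; // the real code writes at `offset`
            self.read_cache.insert(offset, buffer); // keep the clean page hot
        }
        Ok(())
    }
}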
adamreichold committed May 12, 2024
1 parent 94e90d5 commit 09435b1
Showing 1 changed file with 38 additions and 9 deletions: src/tree_store/page_store/cached_file.rs
@@ -225,7 +225,7 @@ pub(super) struct PagedCachedFile {
     #[cfg(feature = "cache_metrics")]
     reads_hits: AtomicU64,
     fsync_failed: AtomicBool,
-    read_cache: Vec<RwLock<PrioritizedCache>>,
+    read_cache: Box<[RwLock<PrioritizedCache>]>,
     // TODO: maybe move this cache to WriteTransaction?
     write_buffer: Arc<Mutex<PrioritizedWriteCache>>,
 }
@@ -237,10 +237,9 @@ impl PagedCachedFile {
         max_read_cache_bytes: usize,
         max_write_buffer_bytes: usize,
     ) -> Result<Self, DatabaseError> {
-        let mut read_cache = Vec::with_capacity(Self::lock_stripes());
-        for _ in 0..Self::lock_stripes() {
-            read_cache.push(RwLock::new(PrioritizedCache::new()));
-        }
+        let read_cache = (0..Self::lock_stripes())
+            .map(|_| RwLock::new(PrioritizedCache::new()))
+            .collect();
 
         Ok(Self {
             file,
@@ -263,7 +262,7 @@ impl PagedCachedFile {
         self.file.len().map_err(StorageError::from)
     }
 
-    const fn lock_stripes() -> usize {
+    const fn lock_stripes() -> u64 {
         131
     }

@@ -291,6 +290,36 @@ impl PagedCachedFile {
         for (offset, buffer) in write_buffer.low_pri_cache.iter() {
             self.file.write(*offset, buffer.as_ref().unwrap())?;
         }
+        for (offset, buffer) in write_buffer.cache.iter_mut() {
+            let buffer = buffer.take().unwrap();
+            let cache_size = self
+                .read_cache_bytes
+                .fetch_add(buffer.len(), Ordering::AcqRel);
+
+            if cache_size + buffer.len() <= self.max_read_cache_bytes {
+                let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
+                let mut lock = self.read_cache[cache_slot].write().unwrap();
+                lock.insert(*offset, buffer.clone(), CachePriority::High);
+            } else {
+                self.read_cache_bytes
+                    .fetch_sub(buffer.len(), Ordering::AcqRel);
+            }
+        }
+        for (offset, buffer) in write_buffer.low_pri_cache.iter_mut() {
+            let buffer = buffer.take().unwrap();
+            let cache_size = self
+                .read_cache_bytes
+                .fetch_add(buffer.len(), Ordering::AcqRel);
+
+            if cache_size + buffer.len() <= self.max_read_cache_bytes {
+                let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
+                let mut lock = self.read_cache[cache_slot].write().unwrap();
+                lock.insert(*offset, buffer.clone(), CachePriority::Low);
+            } else {
+                self.read_cache_bytes
+                    .fetch_sub(buffer.len(), Ordering::AcqRel);
+            }
+        }
         self.write_buffer_bytes.store(0, Ordering::Release);
         write_buffer.clear();

@@ -353,7 +382,7 @@ impl PagedCachedFile {
             }
         }
 
-        let cache_slot: usize = (offset % Self::lock_stripes() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         {
             let read_lock = self.read_cache[cache_slot].read().unwrap();
             if let Some(cached) = read_lock.get(&offset) {
@@ -398,7 +427,7 @@ impl PagedCachedFile {
     //
     // NOTE: Invalidating a cached region in subsections is permitted, as long as all subsections are invalidated
     pub(super) fn invalidate_cache(&self, offset: u64, len: usize) {
-        let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         let mut lock = self.read_cache[cache_slot].write().unwrap();
         if let Some(removed) = lock.remove(&offset) {
             assert_eq!(len, removed.len());
@@ -431,7 +460,7 @@ impl PagedCachedFile {
         let mut lock = self.write_buffer.lock().unwrap();
 
         // TODO: allow hint that page is known to be dirty and will not be in the read cache
-        let cache_slot: usize = (offset % self.read_cache.len() as u64).try_into().unwrap();
+        let cache_slot: usize = (offset % Self::lock_stripes()).try_into().unwrap();
         let existing = {
             let mut lock = self.read_cache[cache_slot].write().unwrap();
             if let Some(removed) = lock.remove(&offset) {

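The remaining hunks follow from lock_stripes() now returning u64: the offset-to-stripe computation no longer needs an `as u64` cast at each call site, and only the final index narrows to usize. A small sketch of the striping scheme, under simplified types:

use std::sync::RwLock;

// Stripe selection under simplified types: 131 independently locked
// stripes, chosen by page offset, so unrelated pages rarely contend.
const LOCK_STRIPES: u64 = 131;

fn cache_slot(offset: u64) -> usize {
    // The modulo happens in u64; only the resulting slot becomes usize.
    (offset % LOCK_STRIPES).try_into().unwrap()
}

fn main() {
    let stripes: Vec<RwLock<Vec<u64>>> =
        (0..LOCK_STRIPES).map(|_| RwLock::new(Vec::new())).collect();
    stripes[cache_slot(4096)].write().unwrap().push(4096);
}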