From f1cfbc1b44ab88bb986000d74201865347f0d0c1 Mon Sep 17 00:00:00 2001 From: James Kominick Date: Tue, 7 May 2024 21:58:17 -0400 Subject: [PATCH] 0.51.3 expiring sized cache improvements --- CHANGELOG.md | 9 + Cargo.toml | 6 +- Makefile | 3 +- examples/expiring_sized_cache.rs | 54 +++++ src/stores/expiring_sized.rs | 369 +++++++++++++++---------------- 5 files changed, 250 insertions(+), 191 deletions(-) create mode 100644 examples/expiring_sized_cache.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2243ea0..48f9eed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ ## Changed ## Removed +## [0.51.3] +## Added +- `ExpiringSizedCache`: Allow specifying explicit TTL when inserting +## Changed +- Refactor `ExpiringSizedCache` internals to not require tombstones +- `ExpiringSizedCache` keys must impl `Ord` +- `ExpiringSizedCache` `remove` and `insert` updated to return only unexpired values +## Removed + ## [0.51.2] ## Added - Add `get_borrowed` methods to `ExpiringSizedCache` to support cache retrieval using `&str` / `&[T]` diff --git a/Cargo.toml b/Cargo.toml index d05596f..a93158a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cached" -version = "0.51.2" +version = "0.51.3" authors = ["James Kominick "] description = "Generic cache implementations and simplified function memoization" repository = "https://github.com/jaemk/cached" @@ -135,3 +135,7 @@ required-features = ["async", "proc_macro"] [[example]] name = "async_std" required-features = ["async", "proc_macro"] + +[[example]] +name = "expiring_sized_cache" +required-features = ["async_tokio_rt_multi_thread"] diff --git a/Makefile b/Makefile index abd4050..c11e74b 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,8 @@ CACHED_BASIC_EXAMPLES = async_std \ basic_proc_macro \ kitchen_sink \ kitchen_sink_proc_macro \ - tokio + tokio \ + expiring_sized_cache # Same as `CACHED_BASIC_EXAMPLES`, but these examples require the `docker/redis` # goal CACHED_REDIS_EXAMPLES = redis \ diff --git a/examples/expiring_sized_cache.rs b/examples/expiring_sized_cache.rs new file mode 100644 index 0000000..92085f1 --- /dev/null +++ b/examples/expiring_sized_cache.rs @@ -0,0 +1,54 @@ +use cached::stores::ExpiringSizedCache; +use instant::Instant; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::RwLock; + +#[tokio::main] +async fn main() { + let mut cache = ExpiringSizedCache::new(20_000); + cache.size_limit(100); + + let cache = Arc::new(RwLock::new(cache)); + + let write_cache = cache.clone(); + let write_handle = tokio::spawn(async move { + for _ in 0..10 { + { + let mut cache = write_cache.write().await; + cache + .insert("A".to_string(), "A".to_string()) + .expect("write failure"); + println!("[expiring_sized] wrote to cache"); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } + }); + + let mut read_handles = vec![]; + for i in 0..5 { + let reader = i + 1; + let read_cache = cache.clone(); + let read_handle = tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(100)).await; + let start = Instant::now(); + let mut count = 0; + while Instant::now().duration_since(start) < Duration::from_millis(5_000) { + let cache = read_cache.read().await; + assert_eq!(cache.get_borrowed("A"), Some(&"A".to_string())); + count += 1; + if count % 1_000_000 == 0 { + println!("[expiring_sized] read 1M times in reader {}", reader); + } + } + }); + read_handles.push(read_handle); + } + + write_handle.await.expect("error in write loop"); + for (i, h) in read_handles.into_iter().enumerate() { + h.await + .map_err(|e| format!("error in read handle {}: {:?}", i + 1, e)) + .unwrap(); + } +} diff --git a/src/stores/expiring_sized.rs b/src/stores/expiring_sized.rs index 12ee2bc..856b186 100644 --- a/src/stores/expiring_sized.rs +++ b/src/stores/expiring_sized.rs @@ -1,12 +1,22 @@ use std::borrow::Borrow; -use std::collections::{HashMap, VecDeque}; +use std::cmp::Ordering; +use std::collections::{BTreeSet, HashMap}; use std::hash::{Hash, Hasher}; +use std::ops::Bound::{Excluded, Included}; use std::sync::Arc; use std::time::{Duration, Instant}; +/// Wrap keys so they don't need to implement Clone #[derive(Eq)] +// todo: can we switch to an Rc? struct CacheArc(Arc); +impl CacheArc { + fn new(key: T) -> Self { + CacheArc(Arc::new(key)) + } +} + impl Clone for CacheArc { fn clone(&self) -> Self { CacheArc(self.0.clone()) @@ -19,6 +29,17 @@ impl PartialEq for CacheArc { } } +impl PartialOrd for CacheArc { + fn partial_cmp(&self, other: &Self) -> Option { + self.0.partial_cmp(&other.0) + } +} +impl Ord for CacheArc { + fn cmp(&self, other: &Self) -> Ordering { + self.0.cmp(&other.0) + } +} + impl Hash for CacheArc { fn hash(&self, state: &mut H) { self.0.hash(state); @@ -50,24 +71,58 @@ pub enum Error { TimeBounds, } -#[derive(Hash, Eq, PartialEq)] +/// A timestamped key to allow identifying key ranges +#[derive(Hash, Eq, PartialEq, Ord, PartialOrd)] struct Stamped { - tombstone: bool, + // note: the field order matters here since the derived ord traits + // generate lexicographic ordering based on the top-to-bottom + // declaration order expiry: Instant, - key: CacheArc, + + // wrapped in an option so it's easy to generate + // a range bound containing None + key: Option>, } -struct Entry { - stamp_index: usize, +impl Clone for Stamped { + fn clone(&self) -> Self { + Self { + expiry: self.expiry, + key: self.key.clone(), + } + } +} + +impl Stamped { + fn bound(expiry: Instant) -> Stamped { + Stamped { expiry, key: None } + } +} + +/// A timestamped value to allow re-building a timestamped key +struct Entry { expiry: Instant, + key: CacheArc, value: V, } +impl Entry { + fn as_stamped(&self) -> Stamped { + Stamped { + expiry: self.expiry, + key: Some(self.key.clone()), + } + } + + fn is_expired(&self) -> bool { + self.expiry < Instant::now() + } +} + macro_rules! impl_get { ($_self:expr, $key:expr) => {{ - let cutoff = Instant::now(); $_self.map.get($key).and_then(|entry| { - if entry.expiry < cutoff { + if entry.is_expired() { None } else { Some(&entry.value) @@ -84,42 +139,40 @@ macro_rules! impl_get { /// /// To accomplish this, there are a few trade-offs: /// - Maximum cache size logic cannot support "LRU", instead dropping the next value to expire +/// - Cache keys must implement `Ord` /// - The cache's size, reported by `.len` is only guaranteed to be accurate immediately /// after a call to either `.evict` or `.retain_latest` /// - Eviction must be explicitly requested, either on its own or while inserting -/// - Writing to existing keys, removing, evict, or dropping (with `.retain_latest`) will -/// generate tombstones that must eventually be cleared. Clearing tombstones requires -/// a full traversal (`O(n)`) to rewrite internal indices. This happens automatically -/// when the number of tombstones reaches a certain threshold. pub struct ExpiringSizedCache { - // k/v where entry contains index into `key` - map: HashMap, Entry>, + // a minimum instant to compare ranges against since + // all keys must logically expire after the creation + // of the cache + min_instant: Instant, - // deque ordered in ascending expiration `Instant`s + // k/v where entry contains corresponds to an ordered value in `keys` + map: HashMap, Entry>, + + // ordered in ascending expiration `Instant`s // to support retaining/evicting without full traversal - keys: VecDeque>, + keys: BTreeSet>, pub ttl_millis: u64, pub size_limit: Option, - pub(self) tombstone_count: usize, - pub(self) max_tombstone_limit: usize, } -impl ExpiringSizedCache { +impl ExpiringSizedCache { pub fn new(ttl_millis: u64) -> Self { Self { + min_instant: Instant::now(), map: HashMap::new(), - keys: VecDeque::new(), + keys: BTreeSet::new(), ttl_millis, size_limit: None, - tombstone_count: 0, - max_tombstone_limit: 50, } } pub fn with_capacity(ttl_millis: u64, size: usize) -> Self { let mut new = Self::new(ttl_millis); new.map.reserve(size); - new.keys.reserve(size + new.max_tombstone_limit); new } @@ -131,19 +184,9 @@ impl ExpiringSizedCache { prev } - /// Set the max tombstone limit. When reached, tombstones will be cleared and - /// a full traversal will occur (`O(n)`) to rewrite internal indices - /// Returns the previous value that was set. - pub fn max_tombstone_limit(&mut self, limit: usize) -> usize { - let prev = self.max_tombstone_limit; - self.max_tombstone_limit = limit; - prev - } - /// Increase backing stores with enough capacity to store `more` pub fn reserve(&mut self, more: usize) { self.map.reserve(more); - self.keys.reserve(more); } /// Set ttl millis, return previous value @@ -157,163 +200,104 @@ impl ExpiringSizedCache { /// Returns number of dropped items. pub fn evict(&mut self) -> usize { let cutoff = Instant::now(); - let remove = match self - .keys - .binary_search_by_key(&cutoff, |stamped| stamped.expiry) - { - Ok(mut i) => { - // move past any duplicates - while self.keys[i].expiry == cutoff { - i += 1; - } - i - } - Err(i) => { - // index to insert at, drop those prior - i - } - }; - let mut count = 0; - for stamped in self.keys.iter() { - if count >= remove { - break; - } - if !stamped.tombstone { - self.map.remove(&stamped.key); - count += 1; - } - } - self.entomb_head(remove); - self.check_clear_tombstones(); - count - } + let min = Stamped::bound(self.min_instant); + let max = Stamped::bound(cutoff); + let min = Included(&min); + let max = Excluded(&max); - fn entomb_head(&mut self, remove: usize) { - let mut stamp_index = 0; + let remove = self.keys.range((min, max)).count(); let mut count = 0; - loop { - if count >= remove || stamp_index >= self.keys.len() { - break; - } - let stamped = self.keys.get_mut(stamp_index); - match stamped { + while count < remove { + match self.keys.pop_first() { None => break, Some(stamped) => { - if !stamped.tombstone { - count += 1; - stamped.tombstone = true; - self.tombstone_count += 1; - } - stamp_index += 1; + self.map.remove( + &stamped + .key + .expect("evicting: only artificial bounds are none"), + ); + count += 1; } } } + count } /// Retain only the latest `count` values, dropping the next values to expire. /// If `evict`, then also evict values that have expired. /// Returns number of dropped items. pub fn retain_latest(&mut self, count: usize, evict: bool) -> usize { - let count_index = self.len().saturating_sub(count); + let retain_drop_count = self.len().saturating_sub(count); let remove = if evict { let cutoff = Instant::now(); - match self - .keys - .binary_search_by_key(&cutoff, |stamped| stamped.expiry) - { - Ok(mut i) => { - while self.keys[i].expiry == cutoff { - i += 1; - } - count_index.max(i) - } - Err(i) => count_index.max(i), - } + let min = Stamped::bound(self.min_instant); + let max = Stamped::bound(cutoff); + let min = Included(&min); + let max = Excluded(&max); + let to_evict_count = self.keys.range((min, max)).count(); + retain_drop_count.max(to_evict_count) } else { - count_index + retain_drop_count }; let mut count = 0; - for stamped in self.keys.iter() { - if count >= remove { - break; - } - if !stamped.tombstone { - self.map.remove(&stamped.key); - count += 1; - } - } - self.entomb_head(remove); - self.check_clear_tombstones(); - count - } - - fn should_clear_tombstones(&self) -> bool { - // todo: consider some percentage of `self.size_limit`? - self.tombstone_count > self.max_tombstone_limit - } - - fn check_clear_tombstones(&mut self) -> usize { - if !self.should_clear_tombstones() { - return 0; - } - - let mut cleared = 0; - let mut stamp_index = 0; - loop { - if stamp_index >= self.keys.len() { - break; - } - if self.keys[stamp_index].tombstone { - self.keys - .remove(stamp_index) - .expect("already checked stamped key exists"); - cleared += 1; - self.tombstone_count -= 1; - } else { - if let Some(entry) = self.map.get_mut(&self.keys[stamp_index].key) { - entry.stamp_index = stamp_index; + while count < remove { + match self.keys.pop_first() { + None => break, + Some(stamped) => { + self.map.remove( + &stamped + .key + .expect("retaining: only artificial bounds are none"), + ); + count += 1; } - stamp_index += 1; } } - cleared + count } - /// Remove an entry, returning the value if it was present. - /// Note, the value is not checked for expiry. If returning - /// only non-expired values is desired, run `.evict` prior. + /// Remove an entry, returning an unexpired value if it was present. pub fn remove(&mut self, key: &K) -> Option { match self.map.remove(key) { None => None, Some(removed) => { - if let Some(stamped) = self.keys.get_mut(removed.stamp_index) { - stamped.tombstone = true; - self.tombstone_count += 1; + self.keys.remove(&removed.as_stamped()); + if removed.is_expired() { + None + } else { + Some(removed.value) } - self.check_clear_tombstones(); - Some(removed.value) } } } - /// Insert k/v pair without running eviction logic. If a `size_limit` was specified, the - /// next entry to expire will be evicted to make space. Returns any existing value. - /// Note, the existing value is not checked for expiry. If returning - /// only non-expired values is desired, run `.evict` prior or use `.insert_evict(..., true)` + /// Insert k/v pair without running eviction logic. See `.insert_ttl_evict` pub fn insert(&mut self, key: K, value: V) -> Result, Error> { - self.insert_evict(key, value, false) + self.insert_ttl_evict(key, value, None, false) } - /// Optionally run eviction logic before inserting a k/v pair. If a `size_limit` was specified, - /// next entry to expire will be evicted to make space. Returns any existing value. - /// Note, the existing value is not checked for expiry. If returning - /// only non-expired values is desired, run `.evict` prior or pass `evict = true` + /// Insert k/v pair with explicit ttl. See `.insert_ttl_evict` + pub fn insert_ttl(&mut self, key: K, value: V, ttl_millis: u64) -> Result, Error> { + self.insert_ttl_evict(key, value, Some(ttl_millis), false) + } + + /// Insert k/v pair and run eviction logic. See `.insert_ttl_evict` pub fn insert_evict(&mut self, key: K, value: V, evict: bool) -> Result, Error> { - // todo: allow specifying ttl on individual entries, will require - // inserting stamped-keys in-place instead of pushing to end + self.insert_ttl_evict(key, value, None, evict) + } + /// Optionally run eviction logic before inserting a k/v pair with an optional explicit TTL. + /// If a `size_limit` was specified, the next entry to expire will be evicted to make space. + /// Returns any existing unexpired value. + pub fn insert_ttl_evict( + &mut self, + key: K, + value: V, + ttl_millis: Option, + evict: bool, + ) -> Result, Error> { // optionally evict and retain to size if let Some(size_limit) = self.size_limit { if self.len() > size_limit - 1 { @@ -323,33 +307,31 @@ impl ExpiringSizedCache { self.evict(); } - let key = CacheArc(Arc::new(key)); + let key = CacheArc::new(key); let expiry = Instant::now() - .checked_add(Duration::from_millis(self.ttl_millis)) + .checked_add(Duration::from_millis(ttl_millis.unwrap_or(self.ttl_millis))) .ok_or(Error::TimeBounds)?; - self.keys.push_back(Stamped { - tombstone: false, + let new_stamped = Stamped { expiry, - key: key.clone(), - }); - let stamp_index = self.keys.len() - 1; - let old = self.map.insert( - key, - Entry { - stamp_index, - expiry, - value, - }, - ); + key: Some(key.clone()), + }; + self.keys.insert(new_stamped.clone()); + let old = self.map.insert(key.clone(), Entry { expiry, key, value }); if let Some(old) = &old { - if let Some(old_stamped) = self.keys.get_mut(old.stamp_index) { - old_stamped.tombstone = true; - self.tombstone_count += 1; - self.check_clear_tombstones(); + let old_stamped = old.as_stamped(); + // new-stamped didn't already replace an existing entry, delete it now + if old_stamped != new_stamped { + self.keys.remove(&old_stamped); } } - Ok(old.map(|entry| entry.value)) + Ok(old.and_then(|entry| { + if entry.is_expired() { + None + } else { + Some(entry.value) + } + })) } /// Clear all cache entries. Does not release underlying containers @@ -430,7 +412,6 @@ mod test { assert_eq!(cache.len(), 1); std::thread::sleep(Duration::from_millis(200)); assert_eq!(1, cache.evict()); - assert_eq!(1, cache.tombstone_count); assert!(cache.get(&"a".into()).is_none()); assert_eq!(cache.len(), 0); @@ -444,7 +425,6 @@ mod test { // in size until eviction assert_eq!(cache.len(), 1); assert_eq!(1, cache.retain_latest(1, true)); - assert_eq!(2, cache.tombstone_count); assert!(cache.get(&"a".into()).is_none()); assert_eq!(cache.len(), 0); @@ -455,7 +435,6 @@ mod test { cache.insert("e".to_string(), "e".to_string()).unwrap(); assert_eq!(3, cache.retain_latest(2, false)); assert_eq!(2, cache.len()); - assert_eq!(5, cache.tombstone_count); assert_eq!(cache.get(&"a".into()), None); assert_eq!(cache.get(&"b".into()), None); assert_eq!(cache.get(&"c".into()), None); @@ -467,7 +446,6 @@ mod test { cache.insert("b".to_string(), "b".to_string()).unwrap(); cache.insert("b".to_string(), "b".to_string()).unwrap(); assert_eq!(4, cache.len()); - assert_eq!(7, cache.tombstone_count); assert_eq!(2, cache.retain_latest(2, false)); assert_eq!(cache.get(&"d".into()), None); @@ -475,10 +453,35 @@ mod test { assert_eq!(cache.get(&"a".into()), Some("a".to_string()).as_ref()); assert_eq!(cache.get(&"b".into()), Some("b".to_string()).as_ref()); assert_eq!(2, cache.len()); - assert_eq!(9, cache.tombstone_count); + std::thread::sleep(Duration::from_millis(200)); + assert_eq!(cache.remove(&"a".into()), None); + // trying to get something expired will expire values + assert_eq!(1, cache.len()); + + cache.insert("a".to_string(), "a".to_string()).unwrap(); assert_eq!(cache.remove(&"a".into()), Some("a".to_string())); - assert_eq!(10, cache.tombstone_count); + // we haven't done anything to evict "b" so there's still one entry + assert_eq!(1, cache.len()); + + assert_eq!(1, cache.evict()); + assert_eq!(0, cache.len()); + + // default ttl is 100ms + cache + .insert_ttl("a".to_string(), "a".to_string(), 300) + .unwrap(); + std::thread::sleep(Duration::from_millis(200)); + assert_eq!(cache.get(&"a".into()), Some("a".to_string()).as_ref()); + assert_eq!(1, cache.len()); + + std::thread::sleep(Duration::from_millis(200)); + cache + .insert_ttl_evict("b".to_string(), "b".to_string(), Some(300), true) + .unwrap(); + // a should now be evicted + assert_eq!(1, cache.len()); + assert_eq!(cache.get_borrowed("a"), None); } #[test] @@ -501,16 +504,4 @@ mod test { assert_eq!(cache.get(&"c".into()), Some("C".to_string()).as_ref()); assert_eq!(cache.get(&"a".into()), None); } - - #[test] - fn tombstones() { - let mut cache = ExpiringSizedCache::with_capacity(100, 100); - cache.size_limit(2); - for _ in 0..=cache.max_tombstone_limit { - cache.insert("a".to_string(), "A".to_string()).unwrap(); - } - assert_eq!(cache.tombstone_count, cache.max_tombstone_limit); - cache.insert("a".to_string(), "A".to_string()).unwrap(); - assert_eq!(cache.tombstone_count, 0); - } }