Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod loader_utils;
pub mod non_circulating_supply;
pub mod prioritization_fee;
pub mod prioritization_fee_cache;
mod read_optimized_dashmap;
pub mod rent_collector;
pub mod runtime_config;
pub mod serde_snapshot;
Expand Down
303 changes: 303 additions & 0 deletions runtime/src/read_optimized_dashmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
#![allow(dead_code)]
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will go away in the next PR


#[cfg(feature = "shuttle-test")]
use shuttle::sync::Arc;
#[cfg(not(feature = "shuttle-test"))]
use std::sync::Arc;
use {
dashmap::{
mapref::{entry::Entry, multiple::RefMulti},
DashMap,
},
std::{hash::BuildHasher, ops::Deref},
};

/// Item yielded by [`ReadOptimizedDashMap::iter`]: a dashmap multi-shard
/// reference guard over a key and its [`ROValue`] wrapper.
type DashmapIteratorItem<'a, K, V, S> = RefMulti<'a, K, ROValue<V>, S>;

/// Wrapper around dashmap that stores `Arc`s of values to minimize shard contention.
///
/// Readers receive cheap `Arc` clones (see [`ROValue`]) instead of dashmap
/// guards, so shard locks are held only for the duration of the lookup itself
/// rather than for as long as the caller uses the value.
#[derive(Debug)]
pub struct ReadOptimizedDashMap<K, V, S>
where
K: Clone + Eq + std::hash::Hash,
S: Clone + BuildHasher,
{
inner: DashMap<K, ROValue<V>, S>,
}

impl<K, V, S> ReadOptimizedDashMap<K, V, S>
where
K: Clone + Eq + std::hash::Hash,
S: Clone + BuildHasher,
{
/// Creates a new map wrapping the given pre-configured `DashMap`
/// (hasher and shard count are chosen by the caller).
pub fn new(inner: DashMap<K, ROValue<V>, S>) -> Self {
Self { inner }
}

/// Alternative to entry(k).or_insert_with(default) that returns a cloned `Arc` instead of
/// returning a guard that holds the underlying shard's write lock.
pub fn get_or_insert_with(&self, k: &K, default: impl FnOnce() -> V) -> ROValue<V> {
    // Fast path: a shard read lock is enough when the key already exists.
    if let Some(existing) = self.inner.get(k) {
        return ROValue::clone(&*existing);
    }
    // Slow path: take the entry (shard write lock). Another thread may have
    // inserted between our read and here; or_insert_with keeps that value
    // and only calls `default` if the slot is still vacant.
    let entry = self.inner.entry(k.clone());
    ROValue::clone(entry.or_insert_with(|| ROValue::new(default())).value())
}

/// Returns an Arc clone of the value corresponding to the key.
///
/// The shard read lock is released before this returns; only the `Arc`
/// refcount keeps the value alive for the caller.
pub fn get(&self, k: &K) -> Option<ROValue<V>> {
    let guard = self.inner.get(k)?;
    Some(ROValue::clone(&*guard))
}

/// Iterates over all entries in the map.
///
/// NOTE(review): unlike `get`, each yielded item is a dashmap guard and
/// presumably holds its shard's read lock while alive — confirm against
/// dashmap's `RefMulti` docs; callers should avoid long-running work per item.
pub fn iter(&self) -> impl Iterator<Item = DashmapIteratorItem<K, V, S>> {
self.inner.iter()
}
Comment thread
apfitzge marked this conversation as resolved.

/// Removes the entry if it exists and is not being accessed by any other threads.
///
/// Returns Ok(Some(value)) if the entry was removed, Ok(None) if the entry did not exist, and
/// Err(()) if the entry exists but is being accessed by another thread.
pub fn remove_if_not_accessed(&self, k: &K) -> Result<Option<ROValue<V>>, ()> {
// Delegate with an always-true predicate: only the "not accessed" check applies.
self.remove_if_not_accessed_and(k, |_| true)
}

/// Removes the entry if it exists, it is not being accessed by any other
/// threads and the predicate returns true.
///
/// Returns Ok(Some(value)) if the entry was removed, Ok(None) if the entry
/// did not exist, and Err(()) if the entry exists but either is being
/// accessed by another thread or was rejected by the predicate.
pub fn remove_if_not_accessed_and(
    &self,
    k: &K,
    // FnOnce (was Fn): the predicate is invoked at most once, so any
    // Fn/FnMut closure callers already pass keeps working.
    pred: impl FnOnce(&ROValue<V>) -> bool,
) -> Result<Option<ROValue<V>>, ()> {
    // Taking the entry acquires the shard's write lock.
    let Entry::Occupied(e) = self.inner.entry(k.clone()) else {
        return Ok(None);
    };
    let v = e.get();
    // The entry guard holds a write lock, so no new Arc clones can be
    // handed out while we check strong_count — checking `shared()` is safe.
    if pred(v) && !v.shared() {
        Ok(Some(e.remove()))
    } else {
        Err(())
    }
}

/// Retains only the elements specified by the predicate or that are being
/// accessed by other threads.
///
/// An entry with outstanding `ROValue` clones is always kept; the predicate
/// is only consulted (and only then) for unshared entries.
pub fn retain_if_accessed_or(&self, mut f: impl FnMut(&K, &mut ROValue<V>) -> bool) {
    self.inner.retain(|k, v| {
        if v.shared() {
            true
        } else {
            f(k, v)
        }
    })
}

/// Retains only the elements specified by the predicate.
///
/// Unlike [`Self::retain_if_accessed_or`], entries are removed even while
/// other threads still hold `ROValue` clones of them.
///
/// # Safety
///
/// - If called concurrently with other methods that mutate values that are
/// not retained, the modifications may be lost.
pub unsafe fn retain(&self, f: impl FnMut(&K, &mut ROValue<V>) -> bool) {
self.inner.retain(f)
}
Comment on lines +102 to +104
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a dumb question. Could we not make this safe wrt concurrent modification if we did something similar to remove_if_not_accessed_and?

DashMap::retain grabs write locks on each shard, so if we just did something like:

    pub unsafe fn retain(&self, mut f: impl FnMut(&K, &mut ROValue<V>) -> bool) {
        self.inner.retain(move |k, v| !v.shared() && f(k, v))
    }

Obviously this could be done in the passed f, but in this way it is forced.
And actually I think if we do this...then nothing is unsafe anymore? It'd become impossible to mutate values that are concurrently dropped. Since if this goes through, the only way to drop is if no shared references are out. not sure this holds with Weak...if not then could potentially just make ROValue not give access directly to arc, that way being weak is impossible!


If accepting the doc I recommended for iter, we should do that here too.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is effectively what the caller code does. The reason retain itself doesn't do it is that in slot_deltas.retain() you actually want to remove even if accessed, because that happens effectively all the time when snapshots are getting generated, and it's safe because snapshot generation doesn't mutate anything.

But now I'm thinking I could split this into UnsafeReadOptimizedDashMap and ReadOptimizedDashMap and use the former for slot_deltas and the latter for StatusCache::cache.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up leaving it ReadOptimizedDashMap, making everything safe, and just leaving retain as unsafe

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If accepting the doc I recommended for iter, we should do that here too.

does not apply here either


/// Removes all entries. Compiled only for tests/dev tooling.
#[cfg(feature = "dev-context-only-utils")]
pub fn clear(&self) {
self.inner.clear();
}
}

/// A value held inside a ReadOptimizedDashMap.
///
/// This type is a wrapper around Arc that allows checking whether there are
/// other strong references to the inner value.
#[derive(Debug, Default, Eq, PartialEq)]
pub struct ROValue<V> {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I get the purpose of this wrapper. Am I missing something? Usually with these wrappers it'll hide or prevent outside mutation or something, but this gives pub access to &inner anyway?

Copy link
Copy Markdown
Author

@alessandrod alessandrod Oct 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is exactly why I think that splitting PRs is a bad idea 😋

This wrapper is only needed to split PRs, because in a later PR I'm going to make ROValue always use std::sync::Arc even when shuttle is in use, otherwise you get a deadlock in shuttle tests if you yield to the shuttle scheduler while holding a shard lock (this breaks the dashmap assumption that code that holds a shard lock can't re-enter itself).

Then in an even later PR, I'm going to introduce ShuttleMap, which uses shuttle for the shard RwLocks and those can reenter the shuttle scheduler by design.

So yeah, this is basically churn 😋

EDIT:

oh, and the reason for exposing inner() - and another reason for not splitting PRs because you don't see where code is used - is that otherwise I need to leak this whole monstrosity all the way to the accounts-db/snapshot generator.

// The map itself holds one strong reference; `shared()` reports whether
// anyone else does too.
inner: Arc<V>,
}

// Manual impl rather than #[derive(Clone)] so cloning never requires
// V: Clone — only the Arc refcount is bumped, the value is never copied.
impl<V> Clone for ROValue<V> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}

impl<V> ROValue<V> {
/// Wraps `v` in a fresh, unshared `Arc`. Private: values enter the map
/// only via `get_or_insert_with`.
fn new(v: V) -> Self {
Self { inner: Arc::new(v) }
}

/// Returns true if there are other strong references to the inner value.
///
/// Only strong references are counted; any `Weak` handles created from the
/// inner Arc are not considered sharing.
pub fn shared(&self) -> bool {
Arc::strong_count(&self.inner) > 1
}

/// Returns a reference to the inner Arc<V>.
pub fn inner(&self) -> &Arc<V> {
&self.inner
}
}

// Deref to the wrapped value so callers can use `*value` and call V's
// methods directly instead of going through inner().
impl<V> Deref for ROValue<V> {
type Target = V;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

#[cfg(test)]
mod tests {
use {super::*, std::hash::RandomState};

#[test]
fn test_get() {
let map = ReadOptimizedDashMap::new(DashMap::with_hasher_and_shard_amount(
RandomState::default(),
4,
));
let v1 = map.get_or_insert_with(&1, || 10);
assert_eq!(*v1, 10);
// the map itself holds a strong ref, so any outstanding clone is "shared"
assert!(v1.shared());
let v2 = map.get(&1).unwrap();
assert_eq!(*v2, 10);
assert!(v2.shared());
let v3 = map.get(&2);
assert!(v3.is_none());
}

#[test]
fn test_remove_if_not_accessed() {
let map = ReadOptimizedDashMap::new(DashMap::with_hasher_and_shard_amount(
RandomState::default(),
4,
));
let v1 = map.get_or_insert_with(&1, || 10);
assert_eq!(*v1, 10);
// cannot remove while v1 is held
assert!(map.remove_if_not_accessed(&1).is_err());
drop(v1);
// pred returns false
let removed = map.remove_if_not_accessed_and(&1, |_| false);
assert!(removed.is_err());
// can remove now that v1 is dropped
let removed = map.remove_if_not_accessed(&1).unwrap();
assert!(removed.is_some());
assert_eq!(*removed.unwrap(), 10);
// cannot remove non-existent key
let removed = map.remove_if_not_accessed(&1).unwrap();
assert!(removed.is_none());
}

#[test]
fn test_retain_if_accessed_or() {
let map = ReadOptimizedDashMap::new(DashMap::with_hasher_and_shard_amount(
RandomState::default(),
4,
));

// keys 1 and 3 stay accessed (v1/v3 held); key 2's clone is dropped
let v1 = map.get_or_insert_with(&1, || 10);
let v2 = map.get_or_insert_with(&2, || 20);
drop(v2);
let v3 = map.get_or_insert_with(&3, || 30);
assert_eq!(map.inner.len(), 3);
// key 2 fails the predicate and is unshared -> removed; key 1 fails the
// predicate but is shared -> kept; key 3 passes -> kept
map.retain_if_accessed_or(|_k, v| **v >= 30);
assert_eq!(map.inner.len(), 2);
drop(v1);
drop(v3);
// with no refs outstanding only the predicate matters: key 1 removed
map.retain_if_accessed_or(|_k, v| **v >= 30);
assert_eq!(map.inner.len(), 1);
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be good to have a shuttle test for the drop protection of values when shared — particularly if we rely on that for safety!

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, this is tested in the next PR in StatusCache itself and I was lazy to add the same test here, but I'll stop being lazy I guess 😋

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added tests for things that can run concurrently.

I didn't add a shuttle test for remove_if_not_accessed because the regular tests already test what we need to test: given an outstanding ref, a key is not removed. A shuttle test doesn't make sense for that since the point of shuttle would be scheduling so that at least some of the time there are no outstanding refs (if drop happens before remove).


#[cfg(all(test, feature = "shuttle-test"))]
mod shuttle_tests {
use {
super::*,
shuttle::{sync::atomic::AtomicU64, thread},
std::{hash::RandomState, sync::atomic::Ordering},
};

// Iteration budgets for shuttle's random scheduler; None lets DFS run to
// exhaustion of the schedule space.
const INSERT_RETAIN_RANDOM_ITERATIONS: usize = 200000;
const INSERT_RETAIN_DFS_ITERATIONS: Option<usize> = None;

const CONCURRENT_INSERTS_RANDOM_ITERATIONS: usize = 200000;
const CONCURRENT_INSERTS_DFS_ITERATIONS: Option<usize> = None;

// Two threads race get_or_insert_with on the same key: exactly one default
// (30) must be inserted, and both fetch_add(10)s must land on it (=> 50).
fn do_test_shuttle_concurrent_inserts() {
let map = Arc::new(ReadOptimizedDashMap::new(
DashMap::with_hasher_and_shard_amount(RandomState::default(), 4),
));
let handles = (0..2)
.map(|_| {
let map = Arc::clone(&map);
thread::spawn(move || {
map.get_or_insert_with(&0, || AtomicU64::new(30))
.fetch_add(10, Ordering::Relaxed);
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.join().unwrap();
}
let v = map.get(&0).unwrap();
assert_eq!(v.load(Ordering::Relaxed), 50);
}

#[test]
fn test_shuttle_concurrent_inserts_dfs() {
shuttle::check_dfs(
do_test_shuttle_concurrent_inserts,
CONCURRENT_INSERTS_DFS_ITERATIONS,
);
}

#[test]
fn test_shuttle_concurrent_inserts_random() {
shuttle::check_random(
do_test_shuttle_concurrent_inserts,
CONCURRENT_INSERTS_RANDOM_ITERATIONS,
);
}

// Races retain against a concurrent insert: under every interleaving,
// key 1 (unshared, fails the predicate) must be removed and key 2 must
// survive whether it is inserted before or after the retain pass.
fn do_test_shuttle_insert_retain() {
let map = Arc::new(ReadOptimizedDashMap::new(
DashMap::with_hasher_and_shard_amount(RandomState::default(), 4),
));
map.get_or_insert_with(&1, || 10);
let retain_th = thread::spawn({
let map = Arc::clone(&map);
move || {
map.retain_if_accessed_or(|_k, v| **v >= 20);
}
});
let insert_th = thread::spawn({
let map = Arc::clone(&map);
move || {
map.get_or_insert_with(&2, || 20);
}
});
retain_th.join().unwrap();
insert_th.join().unwrap();
assert_eq!(map.get(&1), None);
let v = map.get(&2).unwrap();
assert_eq!(*v, 20);
}

#[test]
fn test_shuttle_insert_retain_dfs() {
shuttle::check_dfs(do_test_shuttle_insert_retain, INSERT_RETAIN_DFS_ITERATIONS);
}

#[test]
fn test_shuttle_insert_retain_random() {
shuttle::check_random(
do_test_shuttle_insert_retain,
INSERT_RETAIN_RANDOM_ITERATIONS,
);
}
}
Loading