diff --git a/Cargo.lock b/Cargo.lock index 3c9c6dc20a0a0a..cc5bc4d95697fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9395,6 +9395,8 @@ dependencies = [ "anyhow", "bincode", "bytes", + "cfg-if 1.0.3", + "dashmap", "hxdmp", "itertools 0.12.1", "log", @@ -9403,9 +9405,11 @@ dependencies = [ "rand 0.8.5", "serde", "serde_derive", + "shuttle", "socket2 0.6.0", "solana-logger", "solana-serde", + "solana-svm-type-overrides", "tokio", "url 2.5.7", ] diff --git a/net-utils/Cargo.toml b/net-utils/Cargo.toml index 148fca2db7d69f..52ae14e973d7fd 100644 --- a/net-utils/Cargo.toml +++ b/net-utils/Cargo.toml @@ -19,11 +19,14 @@ name = "solana_net_utils" agave-unstable-api = [] default = [] dev-context-only-utils = ["dep:pcap-file", "dep:hxdmp"] +shuttle-test = ["dep:shuttle", "solana-svm-type-overrides/shuttle-test"] [dependencies] anyhow = { workspace = true } bincode = { workspace = true } bytes = { workspace = true } +cfg-if = { workspace = true } +dashmap = { workspace = true, features = ["raw-api"] } hxdmp = { version = "0.2.1", optional = true } itertools = { workspace = true } log = { workspace = true } @@ -32,8 +35,10 @@ pcap-file = { version = "2.0.0", optional = true } rand = { workspace = true } serde = { workspace = true } serde_derive = { workspace = true } +shuttle = { workspace = true, optional = true } socket2 = { workspace = true } solana-serde = { workspace = true } +solana-svm-type-overrides = { workspace = true } tokio = { workspace = true, features = ["full"] } url = { workspace = true } @@ -42,3 +47,7 @@ solana-logger = { workspace = true } [lints] workspace = true + +[[bench]] +name = "token_bucket" +harness = false diff --git a/net-utils/benches/token_bucket.rs b/net-utils/benches/token_bucket.rs new file mode 100644 index 00000000000000..803a373e6eb15f --- /dev/null +++ b/net-utils/benches/token_bucket.rs @@ -0,0 +1,177 @@ +#![allow(clippy::arithmetic_side_effects)] +use { + solana_net_utils::token_bucket::*, + std::{ + net::{IpAddr, Ipv4Addr}, + sync::atomic::{AtomicUsize, Ordering}, + time::{Duration, Instant}, + }, +}; + +fn bench_token_bucket() { + println!("Running bench_token_bucket..."); + let run_duration = Duration::from_secs(5); + let fill_rate = 10000.0; + let request_size = 3; + let target_rate = fill_rate / request_size as f64; + let tb = TokenBucket::new(1, 600, fill_rate); + + let accepted = AtomicUsize::new(0); + let rejected = AtomicUsize::new(0); + + let start = Instant::now(); + let workers = 8; + + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| loop { + if start.elapsed() > run_duration { + break; + } + match tb.consume_tokens(request_size) { + Ok(_) => accepted.fetch_add(1, Ordering::Relaxed), + Err(_) => rejected.fetch_add(1, Ordering::Relaxed), + }; + }); + } + // periodically check for races + let jh = scope.spawn(|| loop { + std::thread::sleep(Duration::from_millis(100)); + let elapsed = start.elapsed(); + if elapsed > run_duration { + break; + } + let acc = accepted.load(Ordering::Relaxed); + let rate = acc as f64 / elapsed.as_secs_f64(); + assert!( + tb.current_tokens() < request_size * 2, + "bucket should have no spare tokens" + ); + assert!( + // allow 1% error + (rate - target_rate).abs() < target_rate / 100.0, + "Accepted rate should be about {target_rate}, actual {rate}" + ); + }); + jh.join().expect("Rate checks should pass"); + }); + + let acc = accepted.load(Ordering::Relaxed); + let rej = rejected.load(Ordering::Relaxed); + println!("Run complete over {:?} seconds", run_duration.as_secs()); + println!("Accepted {acc}, Rejected: {rej}"); + println!( + "processed {} requests, {} per second", + acc + rej, + (acc + rej) as f32 / run_duration.as_secs_f32() + ); +} + +fn bench_token_bucket_eviction() { + println!("Running bench_token_bucket_eviction..."); + let run_duration = Duration::from_secs(5); + let target_size = 256; + let tb = TokenBucket::new(1, 60, 100.0); + let mut limiter = KeyedRateLimiter::new(target_size, tb, 8); + // make shrinking more aggressive than default + // since only one worker is shrinking the + // datastructure at any given moment so we do not flake this test + // too hard + limiter.set_shrink_interval(32); + + let accepted = AtomicUsize::new(0); + let rejected = AtomicUsize::new(0); + + let start = Instant::now(); + let ip_pool = 1024; + let workers = 8; + + let max_size = AtomicUsize::new(0); + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| { + for i in 1.. { + if Instant::now() > start + run_duration { + break; + } + let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32)); + if limiter.consume_tokens(ip, 1).is_ok() { + accepted.fetch_add(1, Ordering::Relaxed); + } else { + rejected.fetch_add(1, Ordering::Relaxed); + } + let len_approx = limiter.len_approx(); + max_size.fetch_max(len_approx, Ordering::Relaxed); + } + }); + } + }); + + let acc = accepted.load(Ordering::Relaxed); + let rej = rejected.load(Ordering::Relaxed); + println!("Run complete over {:?} seconds", run_duration.as_secs()); + eprintln!("Max observed size was {}", max_size.load(Ordering::Relaxed)); + assert!( + max_size.load(Ordering::Relaxed) <= target_size * 2, + "Max target size should never be exceeded" + ); + println!( + "processed {} requests, {} per second", + acc + rej, + (acc + rej) as f32 / run_duration.as_secs_f32() + ); + println!("Rejected: {rej}"); +} + +fn bench_keyed_rate_limiter() { + println!("Running bench_keyed_rate_limiter..."); + let run_duration = Duration::from_secs(5); + let tb = TokenBucket::new(1, 60, 100.0); + let limiter = KeyedRateLimiter::new(2048, tb, 8); + + let accepted = AtomicUsize::new(0); + let rejected = AtomicUsize::new(0); + + let start = Instant::now(); + let ip_pool = 2048; + let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64; + let workers = 32; + + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| { + for i in 1.. { + if Instant::now() > start + run_duration { + break; + } + let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32)); + if limiter.consume_tokens(ip, 1).is_ok() { + accepted.fetch_add(1, Ordering::Relaxed); + } else { + rejected.fetch_add(1, Ordering::Relaxed); + } + } + }); + } + }); + + let acc = accepted.load(Ordering::Relaxed); + let rej = rejected.load(Ordering::Relaxed); + println!("Run complete over {:?} seconds", run_duration.as_secs()); + println!("Accepted: {acc} (target {expected_total_accepts})"); + println!("Rejected: {rej}"); + println!( + "processed {} requests, {} per second", + acc + rej, + (acc + rej) as f32 / run_duration.as_secs_f32() + ); + assert!(((acc as i64) - expected_total_accepts).abs() < expected_total_accepts / 10); +} + +fn main() { + bench_token_bucket(); + println!("=========="); + bench_token_bucket_eviction(); + println!("=========="); + bench_keyed_rate_limiter(); +} diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index 347557ba092912..b3c9205909ade8 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -14,6 +14,7 @@ mod ip_echo_client; mod ip_echo_server; pub mod multihomed_sockets; pub mod sockets; +pub mod token_bucket; #[cfg(feature = "dev-context-only-utils")] pub mod tooling_for_tests; diff --git a/net-utils/src/token_bucket.rs b/net-utils/src/token_bucket.rs new file mode 100644 index 00000000000000..cedf3c2379c4ec --- /dev/null +++ b/net-utils/src/token_bucket.rs @@ -0,0 +1,480 @@ +//! This module contains [`TokenBucket`], which provides ability to limit +//! rate of certain events, while allowing bursts through. +//! [`KeyedRateLimiter`] allows to rate-limit multiple keyed items, such +//! as connections. +use { + cfg_if::cfg_if, + dashmap::{mapref::entry::Entry, DashMap}, + solana_svm_type_overrides::sync::atomic::{AtomicU64, AtomicUsize, Ordering}, + std::{borrow::Borrow, cmp::Reverse, hash::Hash, time::Instant}, +}; + +/// Enforces a rate limit on the volume of requests per unit time. +/// +/// Instances update the amount of tokens upon access, and thus does not need to +/// be constantly polled to refill. Uses atomics internally so should be +/// relatively cheap to access from many threads +pub struct TokenBucket { + new_tokens_per_us: f64, + max_tokens: u64, + /// bucket creation + base_time: Instant, + tokens: AtomicU64, + /// time of last update in us since base_time + last_update: AtomicU64, + /// time unused in last token creation round + credit_time_us: AtomicU64, +} + +#[cfg(feature = "shuttle-test")] +static TIME_US: AtomicU64 = AtomicU64::new(0); //used to override Instant::now() + +// If changing this impl, make sure to run benches and ensure they do not panic. +// much of the testing is impossible outside of real multithreading in release mode. +impl TokenBucket { + /// Allocate a new TokenBucket + pub fn new(initial_tokens: u64, max_tokens: u64, new_tokens_per_second: f64) -> Self { + assert!( + new_tokens_per_second > 0.0, + "Token bucket can not have zero influx rate" + ); + assert!( + initial_tokens <= max_tokens, + "Can not have more initial tokens than max tokens" + ); + let base_time = Instant::now(); + TokenBucket { + // recompute into us to avoid FP division on every update + new_tokens_per_us: new_tokens_per_second / 1e6, + max_tokens, + tokens: AtomicU64::new(initial_tokens), + last_update: AtomicU64::new(0), + base_time, + credit_time_us: AtomicU64::new(0), + } + } + + /// Return current amount of tokens in the bucket. + /// This may be somewhat inconsistent across threads + /// due to Relaxed atomics. + #[inline] + pub fn current_tokens(&self) -> u64 { + let now = self.time_us(); + self.update_state(now); + self.tokens.load(Ordering::Relaxed) + } + + /// Attempts to consume tokens from bucket. + /// + /// On success, returns Ok(amount of tokens left in the bucket). + /// On failure, returns Err(amount of tokens missing to fill request). + #[inline] + pub fn consume_tokens(&self, request_size: u64) -> Result { + let now = self.time_us(); + self.update_state(now); + match self.tokens.fetch_update( + Ordering::AcqRel, // winner publishes new amount + Ordering::Acquire, // everyone observed correct number + |tokens| { + if tokens >= request_size { + Some(tokens.saturating_sub(request_size)) + } else { + None + } + }, + ) { + Ok(prev) => Ok(prev.saturating_sub(request_size)), + Err(prev) => Err(request_size.saturating_sub(prev)), + } + } + + /// Retrieves monotonic time since bucket creation. + fn time_us(&self) -> u64 { + cfg_if! { + if #[cfg(feature="shuttle-test")] { + TIME_US.load(Ordering::Relaxed) + } else { + let now = Instant::now(); + let elapsed = now.saturating_duration_since(self.base_time); + elapsed.as_micros() as u64 + } + } + } + + /// Updates internal state of the bucket by + /// depositing new tokens (if appropriate) + fn update_state(&self, now: u64) { + // fetch last update time + let last = self.last_update.load(Ordering::SeqCst); + + // If time has not advanced, nothing to do. + if now <= last { + return; + } + + // Try to claim the interval [last, now]. + // If we can not claim it, someone else will claim [last..some other time] when they + // touch the bucket. + // If we can claim interval [last, now], no other thread can credit tokens for it anymore. + // If [last, now] is too short to mint any tokens, spare time will be preserved in credit_time_us. + match self.last_update.compare_exchange( + last, + now, + Ordering::AcqRel, // winner publishes new timestamp + Ordering::Acquire, // loser observes updates + ) { + Ok(_) => { + // This thread won the race and is responsible for minting tokens + let elapsed = now.saturating_sub(last); + + // also add leftovers from previous conversion attempts. + // we do not care about who uses the spare_time_us, so relaxed is ok here. + let elapsed = + elapsed.saturating_add(self.credit_time_us.swap(0, Ordering::Relaxed)); + + let new_tokens_f64 = elapsed as f64 * self.new_tokens_per_us; + + // amount of full tokens to be minted + let new_tokens = new_tokens_f64.floor() as u64; + + let time_to_return = if new_tokens >= 1 { + // Credit tokens, saturating at max_tokens + let _ = self.tokens.fetch_update( + Ordering::AcqRel, // writer publishes new amount + Ordering::Acquire, //we fetch the correct amount + |tokens| Some(tokens.saturating_add(new_tokens).min(self.max_tokens)), + ); + // Fractional remainder of elapsed time (not enough to mint a whole token) + // that will be credited to other minters + (new_tokens_f64.fract() / self.new_tokens_per_us) as u64 + } else { + // No whole tokens minted → return whole interval + elapsed + }; + // Save unused elapsed time for other threads + self.credit_time_us + .fetch_add(time_to_return, Ordering::Relaxed); + } + Err(_) => { + // Another thread advanced last_update first → nothing we can do now. + } + } + } +} + +impl Clone for TokenBucket { + /// Clones the TokenBucket with approximate state + /// of the original. While this will never return an object in an + /// invalid state, using this in a contended environment is not recommended. + fn clone(&self) -> Self { + Self { + new_tokens_per_us: self.new_tokens_per_us, + max_tokens: self.max_tokens, + base_time: self.base_time, + tokens: AtomicU64::new(self.tokens.load(Ordering::Relaxed)), + last_update: AtomicU64::new(self.last_update.load(Ordering::Relaxed)), + credit_time_us: AtomicU64::new(self.credit_time_us.load(Ordering::Relaxed)), + } + } +} + +/// Provides rate limiting for multiple contexts at the same time +/// +/// This can use e.g. IP address as a Key. +/// Internally this is a [DashMap] of [TokenBucket] instances +/// that are created on demand using a prototype [TokenBucket] +/// to copy initial state from. +/// Uses LazyLru logic under the hood to keep the amount of items +/// under control. +pub struct KeyedRateLimiter +where + K: Hash + Eq, +{ + data: DashMap, + target_capacity: usize, + prototype_bucket: TokenBucket, + countdown_to_shrink: AtomicUsize, + approx_len: AtomicUsize, + shrink_interval: usize, +} + +impl KeyedRateLimiter +where + K: Hash + Eq, +{ + /// Creates a new KeyedRateLimiter with a specified taget capacity and shard amount for the + /// underlying DashMap. This uses a LazyLRU style eviction policy, so actual memory consumption + /// will be 2 * target_capacity. + /// + /// shard_amount should be greater than 0 and be a power of two. + /// If a shard_amount which is not a power of two is provided, the function will panic. + #[allow(clippy::arithmetic_side_effects)] + pub fn new(target_capacity: usize, prototype_bucket: TokenBucket, shard_amount: usize) -> Self { + let shrink_interval = target_capacity / 4; + Self { + data: DashMap::with_capacity_and_shard_amount(target_capacity * 2, shard_amount), + target_capacity, + prototype_bucket, + countdown_to_shrink: AtomicUsize::new(shrink_interval), + approx_len: AtomicUsize::new(0), + shrink_interval, + } + } + + /// Fetches amount of tokens available for key. + /// + /// Returns None if no bucket exists for the key provided + #[inline] + pub fn current_tokens(&self, key: impl Borrow) -> Option { + let bucket = self.data.get(key.borrow())?; + Some(bucket.current_tokens()) + } + + /// Consumes request_size tokens from a bucket at given key. + /// + /// On success, returns Ok(amount of tokens left in the bucket) + /// On failure, returns Err(amount of tokens missing to fill request) + /// If no bucket exists at key, a new bucket will be allocated, and normal policy will be applied to it + /// Outdated buckets may be evicted on an LRU basis. + pub fn consume_tokens(&self, key: K, request_size: u64) -> Result { + let (entry_added, res) = { + let bucket = self.data.entry(key); + match bucket { + Entry::Occupied(entry) => (false, entry.get().consume_tokens(request_size)), + Entry::Vacant(entry) => { + // if the key is not in the LRU, we need to allocate a new bucket + let bucket = self.prototype_bucket.clone(); + let res = bucket.consume_tokens(request_size); + entry.insert(bucket); + (true, res) + } + } + }; + + if entry_added { + if let Ok(count) = + self.countdown_to_shrink + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| { + if v == 0 { + // reset the countup to starting position + // thus preventing other threads from racing for locks + None + } else { + Some(v.saturating_sub(1)) + } + }) + { + if count == 1 { + // the last "previous" value we will see before counter reaches zero + self.maybe_shrink(); + self.countdown_to_shrink + .store(self.shrink_interval, Ordering::Relaxed); + } + } else { + self.approx_len.fetch_add(1, Ordering::Relaxed); + } + } + res + } + + /// Returns approximate amount of entries in the datastructure. + /// Should be within ~10% of the true amount. + #[inline] + pub fn len_approx(&self) -> usize { + self.approx_len.load(Ordering::Relaxed) + } + + // apply lazy-LRU eviction policy to each DashMap shard. + // Allowing side-effects here since overflows here are not + // actually possible + #[allow(clippy::arithmetic_side_effects)] + fn maybe_shrink(&self) { + let mut actual_len = 0; + let target_shard_size = self.target_capacity / self.data.shards().len(); + let mut entries = Vec::with_capacity(target_shard_size * 2); + for shardlock in self.data.shards() { + let mut shard = shardlock.write(); + + if shard.len() <= target_shard_size * 3 / 2 { + actual_len += shard.len(); + continue; + } + entries.clear(); + entries.extend( + shard.drain().map(|(key, value)| { + (key, value.get().last_update.load(Ordering::SeqCst), value) + }), + ); + + entries.select_nth_unstable_by_key(target_shard_size, |(_, last_update, _)| { + Reverse(*last_update) + }); + + shard.extend( + entries + .drain(..) + .take(target_shard_size) + .map(|(key, _last_update, value)| (key, value)), + ); + debug_assert!(shard.len() <= target_shard_size); + actual_len += shard.len(); + } + self.approx_len.store(actual_len, Ordering::Relaxed); + } + + /// Set the auto-shrink interval. Set to 0 to disable shrinking. + /// During writes we want to check for length, but not too often + /// to reduce probability of lock contention, so keeping this + /// large is good for perf (at cost of memory use) + pub fn set_shrink_interval(&mut self, interval: usize) { + self.shrink_interval = interval; + } + + /// Get the auto-shrink interval. + pub fn shrink_interval(&self) -> usize { + self.shrink_interval + } +} + +#[cfg(test)] +pub mod test { + use { + super::*, + solana_svm_type_overrides::thread, + std::{ + net::{IpAddr, Ipv4Addr}, + time::Duration, + }, + }; + + #[test] + fn test_token_bucket() { + let tb = TokenBucket::new(100, 100, 1000.0); + assert_eq!(tb.current_tokens(), 100); + tb.consume_tokens(50).expect("Bucket is initially full"); + tb.consume_tokens(50) + .expect("We should still have >50 tokens left"); + tb.consume_tokens(50) + .expect_err("There should not be enough tokens now"); + thread::sleep(Duration::from_millis(50)); + assert!( + tb.current_tokens() > 40, + "We should be refilling at ~1 token per millisecond" + ); + assert!( + tb.current_tokens() < 70, + "We should be refilling at ~1 token per millisecond" + ); + tb.consume_tokens(40) + .expect("Bucket should have enough for another request now"); + thread::sleep(Duration::from_millis(120)); + assert_eq!(tb.current_tokens(), 100, "Bucket should not overfill"); + } + #[test] + fn test_keyed_rate_limiter() { + let prototype_bucket = TokenBucket::new(100, 100, 1000.0); + let rl = KeyedRateLimiter::new(8, prototype_bucket, 2); + let ip1 = IpAddr::V4(Ipv4Addr::from_bits(1234)); + let ip2 = IpAddr::V4(Ipv4Addr::from_bits(4321)); + assert_eq!(rl.current_tokens(ip1), None, "Initially no buckets exist"); + rl.consume_tokens(ip1, 50) + .expect("Bucket is initially full"); + rl.consume_tokens(ip1, 50) + .expect("We should still have >50 tokens left"); + rl.consume_tokens(ip1, 50) + .expect_err("There should not be enough tokens now"); + rl.consume_tokens(ip2, 50) + .expect("Bucket is initially full"); + rl.consume_tokens(ip2, 50) + .expect("We should still have >50 tokens left"); + rl.consume_tokens(ip2, 50) + .expect_err("There should not be enough tokens now"); + std::thread::sleep(Duration::from_millis(50)); + assert!( + rl.current_tokens(ip1).unwrap() > 40, + "We should be refilling at ~1 token per millisecond" + ); + assert!( + rl.current_tokens(ip1).unwrap() < 70, + "We should be refilling at ~1 token per millisecond" + ); + rl.consume_tokens(ip1, 40) + .expect("Bucket should have enough for another request now"); + thread::sleep(Duration::from_millis(120)); + assert_eq!( + rl.current_tokens(ip1), + Some(100), + "Bucket should not overfill" + ); + assert_eq!( + rl.current_tokens(ip2), + Some(100), + "Bucket should not overfill" + ); + + rl.consume_tokens(ip2, 100).expect("Bucket should be full"); + // go several times over the capacity of the TB to make sure old record + // is erased no matter in which bucket it lands + for ip in 0..64 { + let ip = IpAddr::V4(Ipv4Addr::from_bits(ip)); + rl.consume_tokens(ip, 50).unwrap(); + } + assert_eq!( + rl.current_tokens(ip1), + None, + "Very old record should have been erased" + ); + rl.consume_tokens(ip2, 100) + .expect("New bucket should have been made for ip2"); + } + + #[cfg(feature = "shuttle-test")] + #[test] + fn shuttle_test_token_bucket_race() { + use shuttle::sync::atomic::AtomicBool; + shuttle::check_random( + || { + TIME_US.store(0, Ordering::SeqCst); + let test_duration_us = 2500; + let run: &AtomicBool = Box::leak(Box::new(AtomicBool::new(true))); + let tb: &TokenBucket = Box::leak(Box::new(TokenBucket::new(10, 20, 5000.0))); + + // time advancement thread + let time_advancer = thread::spawn(move || { + let mut current_time = 0; + while current_time < test_duration_us && run.load(Ordering::SeqCst) { + let increment = 100; // microseconds + current_time += increment; + TIME_US.store(current_time, Ordering::SeqCst); + shuttle::thread::yield_now(); + } + run.store(false, Ordering::SeqCst); + }); + + let threads: Vec<_> = (0..2) + .map(|_| { + thread::spawn(move || { + let mut total = 0; + while run.load(Ordering::SeqCst) { + if tb.consume_tokens(5).is_ok() { + total += 1; + } + shuttle::thread::yield_now(); + } + total + }) + }) + .collect(); + + time_advancer.join().unwrap(); + let received = threads.into_iter().map(|t| t.join().unwrap()).sum(); + + // Initial tokens: 10, refill rate: 5000 tokens/sec (5 tokens/ms) + // In 2ms: 10 + (5 * 2) = 20 tokens total + // Each consumption: 5 tokens → 4 total consumptions expected + assert_eq!(4, received); + }, + 100, + ); + } +} diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index ffd687912ad487..dc2ee06375e108 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -43,7 +43,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cipher", "cpufeatures", ] @@ -352,7 +352,7 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "getrandom 0.2.10", "once_cell", "version_check", @@ -860,7 +860,7 @@ checksum = "4319208da049c43661739c5fade2ba182f09d1dc2299b32298d3a31692b17e12" dependencies = [ "addr2line", "cc", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "libc", "miniz_oxide", "object 0.31.1", @@ -977,7 +977,7 @@ dependencies = [ "arrayref", "arrayvec", "cc", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "constant_time_eq", "digest 0.10.7", ] @@ -1219,9 +1219,9 @@ checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" [[package]] name = "cfg-if" -version = "1.0.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" [[package]] name = "cfg_aliases" @@ -1475,7 +1475,7 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "crossbeam-epoch", "crossbeam-utils", ] @@ -1486,7 +1486,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "crossbeam-utils", "lazy_static", "memoffset 0.6.4", @@ -1499,7 +1499,7 @@ version = "0.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", ] [[package]] @@ -1569,7 +1569,7 @@ version = "4.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fb8b7c4503de7d6ae7b42ab72a5a59857b4c937ec27a3d4539dba95b5ab2be" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "curve25519-dalek-derive", "digest 0.10.7", @@ -1633,7 +1633,7 @@ version = "5.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "hashbrown 0.14.3", "lock_api", "once_cell", @@ -1797,7 +1797,7 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "dirs-sys-next", ] @@ -2141,7 +2141,7 @@ version = "3.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef033ed5e9bad94e55838ca0ca906db0e043f517adda0c8b79c7a8c66c93c1b5" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "rustix 0.38.39", "windows-sys 0.48.0", ] @@ -2426,7 +2426,7 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", @@ -2439,7 +2439,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "js-sys", "libc", "wasi 0.13.3+wasi-0.2.2", @@ -2497,7 +2497,7 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "dashmap", "futures 0.3.31", "futures-timer", @@ -2643,7 +2643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03b876ecf37e86b359573c16c8366bc3eba52b689884a0fc42ba3f67203d2a8b" dependencies = [ "cc", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "libc", "pkg-config", "windows-sys 0.48.0", @@ -3173,7 +3173,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61124eeebbd69b8190558df225adf7e4caafce0d743919e5d6b19652314ec5ec" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", ] [[package]] @@ -3183,7 +3183,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" dependencies = [ "bitflags 2.9.4", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "libc", ] @@ -3240,7 +3240,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" dependencies = [ "cesu8", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "combine 4.6.7", "jni-sys", "log", @@ -3411,7 +3411,7 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6e3919bbaa2945715f0bb6d3934a173d1e9a59ac23767fbaaef277265a7411b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "ecdsa", "elliptic-curve", "once_cell", @@ -3471,7 +3471,7 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b67380fd3b2fbe7527a606e18729d21c6f3951633d0500574c4dc22d2d638b9f" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "winapi 0.3.9", ] @@ -3798,7 +3798,7 @@ version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c84490118f2ee2d74570d114f3d0493cbf02790df303d2707606c3e14e07c96" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "downcast", "fragile", "lazy_static", @@ -3813,7 +3813,7 @@ version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ce75669015c4f47b289fd4d4f56e894e4c96003ffdf3ac51313126f94c6cbb" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "proc-macro2", "quote", "syn 1.0.109", @@ -3881,7 +3881,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" dependencies = [ "bitflags 2.9.4", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cfg_aliases", "libc", "memoffset 0.9.0", @@ -4105,7 +4105,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8505734d46c8ab1e19a1dce3aef597ad87dcb4c37e7188231769bd6bd51cebf8" dependencies = [ "bitflags 2.9.4", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "foreign-types", "libc", "once_cell", @@ -4227,7 +4227,7 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "instant", "libc", "redox_syscall 0.2.10", @@ -4241,7 +4241,7 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "libc", "redox_syscall 0.3.5", "smallvec", @@ -4411,7 +4411,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "opaque-debug", "universal-hash", @@ -5056,7 +5056,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "getrandom 0.2.10", "libc", "untrusted", @@ -5486,7 +5486,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" dependencies = [ "block-buffer 0.9.0", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "digest 0.9.0", "opaque-debug", @@ -5498,7 +5498,7 @@ version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "digest 0.10.7", ] @@ -5510,7 +5510,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ "block-buffer 0.9.0", - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "digest 0.9.0", "opaque-debug", @@ -5522,7 +5522,7 @@ version = "0.10.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "cpufeatures", "digest 0.10.7", ] @@ -7359,6 +7359,8 @@ dependencies = [ "anyhow", "bincode", "bytes", + "cfg-if 1.0.3", + "dashmap", "itertools 0.12.1", "log", "nix", @@ -7367,6 +7369,7 @@ dependencies = [ "serde_derive", "socket2 0.6.0", "solana-serde", + "solana-svm-type-overrides", "tokio", "url 2.5.7", ] @@ -10687,7 +10690,7 @@ version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "once_cell", ] @@ -11423,7 +11426,7 @@ version = "0.2.101" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e14915cadd45b529bb8d1f343c4ed0ac1de926144b746e2710f9cd05df6603b" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "once_cell", "rustversion", "wasm-bindgen-macro", @@ -11450,7 +11453,7 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73157efb9af26fb564bb59a009afd1c7c334a44db171d280690d0c3faaec3468" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "js-sys", "wasm-bindgen", "web-sys", @@ -11921,7 +11924,7 @@ version = "0.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" dependencies = [ - "cfg-if 1.0.0", + "cfg-if 1.0.3", "windows-sys 0.48.0", ] diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index 8680feba923fda..205b76c8620ef9 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -3,6 +3,7 @@ use { std::{net::IpAddr, num::NonZeroU32}, }; +/// Limits the rate of connections per IP address. pub struct ConnectionRateLimiter { limiter: DefaultKeyedRateLimiter, } @@ -75,7 +76,17 @@ impl TotalConnectionRateLimiter { #[cfg(test)] pub mod test { - use {super::*, std::net::Ipv4Addr}; + use { + super::*, + std::{ + net::Ipv4Addr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, + }, + }; #[tokio::test] async fn test_total_connection_rate_limiter() { @@ -104,4 +115,42 @@ pub mod test { assert!(limiter.is_allowed(&ip2)); assert!(!limiter.is_allowed(&ip2)); } + + #[test] + fn test_bench_rate_limiter() { + let run_duration = Duration::from_secs(3); + let limiter = Arc::new(ConnectionRateLimiter::new(60 * 100)); + + let accepted = AtomicUsize::new(0); + let rejected = AtomicUsize::new(0); + + let start = Instant::now(); + let ip_pool = 2048; + let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64; + let workers = 8; + + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| { + for i in 1.. { + if Instant::now() > start + run_duration { + break; + } + let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32)); + if limiter.is_allowed(&ip) { + accepted.fetch_add(1, Ordering::Relaxed); + } else { + rejected.fetch_add(1, Ordering::Relaxed); + } + } + }); + } + }); + + let acc = accepted.load(Ordering::Relaxed); + let rej = rejected.load(Ordering::Relaxed); + println!("Run complete over {:?} seconds", run_duration.as_secs()); + println!("Accepted: {acc} (target {expected_total_accepts})"); + println!("Rejected: {rej}"); + } }