diff --git a/Cargo.lock b/Cargo.lock index 88bae88fec9c27..2752faa29beede 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3097,12 +3097,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.31" @@ -3289,26 +3283,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "governor" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" -dependencies = [ - "cfg-if 1.0.3", - "dashmap", - "futures 0.3.31", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.3", - "portable-atomic", - "quanta", - "rand 0.8.5", - "smallvec", - "spinning_top", -] - [[package]] name = "group" version = "0.13.0" @@ -4718,12 +4692,6 @@ dependencies = [ "memoffset 0.9.1", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - [[package]] name = "nom" version = "7.0.0" @@ -4735,12 +4703,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -5586,21 +5548,6 @@ dependencies = [ "syn 2.0.106", ] -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -5849,15 +5796,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "11.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" -dependencies = [ - "bitflags 2.9.4", -] - [[package]] name = "rayon" version = "1.11.0" @@ -10875,7 +10813,6 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", - "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -11720,7 +11657,6 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", - "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -12045,15 +11981,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "spinning_top" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" -dependencies = [ - "lock_api", -] - [[package]] name = "spki" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 2c5b8f3f80776e..4c8f3a17fb8d05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -268,7 +268,6 @@ gag = "1.0.0" gethostname = "0.2.3" getrandom = "0.3.3" goauth = "0.13.1" -governor = "0.6.3" hex = "0.4.3" hidapi = { version = "2.6.3", default-features = false } histogram = "0.6.9" diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 73203dbfe475da..b5d71528589a27 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -2351,12 +2351,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.31" @@ -2480,26 +2474,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "governor" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" -dependencies = [ - "cfg-if 1.0.3", - "dashmap", - "futures 0.3.31", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.2", - "portable-atomic", - "quanta", - "rand 0.8.5", - "smallvec", - "spinning_top", -] - [[package]] name = "group" version = "0.13.0" @@ -3876,12 +3850,6 @@ dependencies = [ "memoffset 0.9.0", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - [[package]] name = "nom" version = "7.1.3" @@ -3892,12 +3860,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -4613,21 +4575,6 @@ dependencies = [ "syn 2.0.87", ] -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quinn" version = "0.11.9" @@ -4818,15 +4765,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "11.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" -dependencies = [ - "bitflags 2.9.4", -] - [[package]] name = "rayon" version = "1.11.0" @@ -9344,7 +9282,6 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", - "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -10189,15 +10126,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "spinning_top" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" -dependencies = [ - "lock_api", -] - [[package]] name = "spki" version = "0.7.3" diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 417534a1b500e8..7f0a4f09a303cf 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -26,7 +26,6 @@ crossbeam-channel = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } -governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index 205b76c8620ef9..f56dab152621fe 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -1,74 +1,54 @@ use { - governor::{DefaultDirectRateLimiter, DefaultKeyedRateLimiter, Quota, RateLimiter}, - std::{net::IpAddr, num::NonZeroU32}, + solana_net_utils::token_bucket::{KeyedRateLimiter, TokenBucket}, + std::net::IpAddr, }; /// Limits the rate of connections per IP address. pub struct ConnectionRateLimiter { - limiter: DefaultKeyedRateLimiter, + limiter: KeyedRateLimiter, } +/// The threshold of the size of the connection rate limiter map. When +/// the map size is above this, we will trigger a cleanup of older +/// entries used by past requests. +const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; + impl ConnectionRateLimiter { /// Create a new rate limiter per IpAddr. The rate is specified as the count per minute to allow for - /// less frequent connections. - pub fn new(limit_per_minute: u64) -> Self { - let quota = - Quota::per_minute(NonZeroU32::new(u32::try_from(limit_per_minute).unwrap()).unwrap()); + /// less frequent connections. Higher limit also allows higher bursts. + /// num_shards controls how many shards are used in the underlying dashmap, + /// should be set >= number of contending threads. + pub fn new(limit_per_minute: u64, num_shards: usize) -> Self { Self { - limiter: DefaultKeyedRateLimiter::keyed(quota), + limiter: KeyedRateLimiter::new( + CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD, + TokenBucket::new( + limit_per_minute, + limit_per_minute, + limit_per_minute as f64 / 60.0, + ), + num_shards, + ), } } /// Check if the connection from the said `ip` is allowed. + /// Here we assume that only IPs with actual confirmed connections are stored in it, + /// since we should only modify server state once source IP is verified pub fn is_allowed(&self, ip: &IpAddr) -> bool { - // Acquire a permit from the rate limiter for the given IP address - if self.limiter.check_key(ip).is_ok() { - debug!("Request from IP {ip:?} allowed"); - true // Request allowed - } else { - debug!("Request from IP {ip:?} blocked"); - false // Request blocked - } - } - - /// retain only keys whose rate-limiting start date is within the rate-limiting interval. - /// Otherwise drop them as inactive - pub fn retain_recent(&self) { - self.limiter.retain_recent() - } - - /// Returns the number of "live" keys in the rate limiter. - pub fn len(&self) -> usize { - self.limiter.len() - } - - /// Returns `true` if the rate limiter has no keys in it. - pub fn is_empty(&self) -> bool { - self.limiter.is_empty() - } -} - -/// Connection rate limiter for enforcing connection rates from -/// all clients. -pub struct TotalConnectionRateLimiter { - limiter: DefaultDirectRateLimiter, -} - -impl TotalConnectionRateLimiter { - /// Create a new rate limiter. The rate is specified as the count per second. - pub fn new(limit_per_second: u64) -> Self { - let quota = - Quota::per_second(NonZeroU32::new(u32::try_from(limit_per_second).unwrap()).unwrap()); - Self { - limiter: RateLimiter::direct(quota), + // Check if we have records in the rate limiter for the given IP address + match self.limiter.current_tokens(ip) { + Some(r) => r > 0, // we have a record, and rate is not exceeded + None => true, // if we have not seen IP, allow connection request } } - /// Check if a connection is allowed. - pub fn is_allowed(&self) -> bool { - if self.limiter.check().is_ok() { + pub fn register_connection(&self, ip: &IpAddr) -> bool { + if self.limiter.consume_tokens(*ip, 1).is_ok() { + debug!("Request from IP {ip:?} allowed"); true // Request allowed } else { + debug!("Request from IP {ip:?} blocked"); false // Request blocked } } @@ -76,81 +56,31 @@ impl TotalConnectionRateLimiter { #[cfg(test)] pub mod test { - use { - super::*, - std::{ - net::Ipv4Addr, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::{Duration, Instant}, - }, - }; - - #[tokio::test] - async fn test_total_connection_rate_limiter() { - let limiter = TotalConnectionRateLimiter::new(2); - assert!(limiter.is_allowed()); - assert!(limiter.is_allowed()); - assert!(!limiter.is_allowed()); - } + use {super::*, std::net::Ipv4Addr}; #[tokio::test] async fn test_connection_rate_limiter() { - let limiter = ConnectionRateLimiter::new(4); + let limiter = ConnectionRateLimiter::new(3, 4); let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); assert!(limiter.is_allowed(&ip1)); + assert!(limiter.register_connection(&ip1)); + assert!(limiter.register_connection(&ip1)); assert!(limiter.is_allowed(&ip1)); - assert!(limiter.is_allowed(&ip1)); - assert!(limiter.is_allowed(&ip1)); + assert!(limiter.register_connection(&ip1)); assert!(!limiter.is_allowed(&ip1)); + assert!(!limiter.register_connection(&ip1)); - assert!(limiter.len() == 1); let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)); + for _ in 0..100 { + assert!( + limiter.is_allowed(&ip2), + "just checking should not mutate state" + ); + } + assert!(limiter.register_connection(&ip2)); + assert!(limiter.register_connection(&ip2)); assert!(limiter.is_allowed(&ip2)); - assert!(limiter.len() == 2); - assert!(limiter.is_allowed(&ip2)); - assert!(limiter.is_allowed(&ip2)); - assert!(limiter.is_allowed(&ip2)); + assert!(limiter.register_connection(&ip2)); assert!(!limiter.is_allowed(&ip2)); } - - #[test] - fn test_bench_rate_limiter() { - let run_duration = Duration::from_secs(3); - let limiter = Arc::new(ConnectionRateLimiter::new(60 * 100)); - - let accepted = AtomicUsize::new(0); - let rejected = AtomicUsize::new(0); - - let start = Instant::now(); - let ip_pool = 2048; - let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64; - let workers = 8; - - std::thread::scope(|scope| { - for _ in 0..workers { - scope.spawn(|| { - for i in 1.. { - if Instant::now() > start + run_duration { - break; - } - let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32)); - if limiter.is_allowed(&ip) { - accepted.fetch_add(1, Ordering::Relaxed); - } else { - rejected.fetch_add(1, Ordering::Relaxed); - } - } - }); - } - }); - - let acc = accepted.load(Ordering::Relaxed); - let rej = rejected.load(Ordering::Relaxed); - println!("Run complete over {:?} seconds", run_duration.as_secs()); - println!("Accepted: {acc} (target {expected_total_accepts})"); - println!("Rejected: {rej}"); - } } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index dab92349c92526..058c4d4ee9811a 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,7 +1,7 @@ use { crate::{ nonblocking::{ - connection_rate_limiter::{ConnectionRateLimiter, TotalConnectionRateLimiter}, + connection_rate_limiter::ConnectionRateLimiter, stream_throttle::{ ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL, STREAM_THROTTLING_INTERVAL_MS, @@ -21,6 +21,7 @@ use { smallvec::SmallVec, solana_keypair::Keypair, solana_measure::measure::Measure, + solana_net_utils::token_bucket::TokenBucket, solana_packet::{Meta, PACKET_DATA_SIZE}, solana_perf::packet::{BytesPacket, BytesPacketBatch, PacketBatch, PACKETS_PER_BATCH}, solana_pubkey::Pubkey, @@ -88,12 +89,9 @@ const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. -const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500; - -/// The threshold of the size of the connection rate limiter map. When -/// the map size is above this, we will trigger a cleanup of older -/// entries used by past requests. -const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; +const TOTAL_CONNECTIONS_PER_SECOND: f64 = 2500.0; +/// Max burst of connections above sustained rate to pass through +const MAX_CONNECTION_BURST: u64 = 1000; /// Timeout for connection handshake. Timer starts once we get Initial from the /// peer, and is canceled when we get a Handshake packet from them. @@ -263,8 +261,11 @@ async fn run_server( ) { let rate_limiter = Arc::new(ConnectionRateLimiter::new( quic_server_params.max_connections_per_ipaddr_per_min, + quic_server_params.num_threads.get() * 2, )); - let overall_connection_rate_limiter = Arc::new(TotalConnectionRateLimiter::new( + let overall_connection_rate_limiter = Arc::new(TokenBucket::new( + MAX_CONNECTION_BURST, + MAX_CONNECTION_BURST, TOTAL_CONNECTIONS_PER_SECOND, )); @@ -341,14 +342,30 @@ async fn run_server( .total_incoming_connection_attempts .fetch_add(1, Ordering::Relaxed); - // first do per IpAddr rate limiting - if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD { - rate_limiter.retain_recent(); + // check overall connection request rate limiter + if overall_connection_rate_limiter.current_tokens() == 0 { + stats + .connection_rate_limited_across_all + .fetch_add(1, Ordering::Relaxed); + debug!( + "Ignoring incoming connection from {} due to overall rate limit.", + incoming.remote_address() + ); + incoming.ignore(); + continue; + } + // then perform per IpAddr rate limiting + if !rate_limiter.is_allowed(&incoming.remote_address().ip()) { + stats + .connection_rate_limited_per_ipaddr + .fetch_add(1, Ordering::Relaxed); + debug!( + "Ignoring incoming connection from {} due to per-IP rate limiting.", + incoming.remote_address() + ); + incoming.ignore(); + continue; } - stats - .connection_rate_limiter_length - .store(rate_limiter.len(), Ordering::Relaxed); - let Ok(client_connection_tracker) = ClientConnectionTracker::new( stats.clone(), quic_server_params.max_concurrent_connections(), @@ -670,7 +687,7 @@ fn compute_recieve_window( async fn setup_connection( connecting: Connecting, rate_limiter: Arc, - overall_connection_rate_limiter: Arc, + overall_connection_rate_limiter: Arc, client_connection_tracker: ClientConnectionTracker, unstaked_connection_table: Arc>, staked_connection_table: Arc>, @@ -690,7 +707,10 @@ async fn setup_connection( match connecting_result { Ok(new_connection) => { debug!("Got a connection {from:?}"); - if !rate_limiter.is_allowed(&from.ip()) { + // now that we have observed the handshake we can be certain + // that the initiator owns an IP address, we can update rate + // limiters on the server + if !rate_limiter.register_connection(&from.ip()) { debug!("Reject connection from {from:?} -- rate limiting exceeded"); stats .connection_rate_limited_per_ipaddr @@ -701,9 +721,7 @@ async fn setup_connection( ); return; } - stats.total_new_connections.fetch_add(1, Ordering::Relaxed); - - if !overall_connection_rate_limiter.is_allowed() { + if overall_connection_rate_limiter.consume_tokens(1).is_err() { debug!( "Reject connection from {:?} -- total rate limiting exceeded", from.ip() @@ -717,6 +735,7 @@ async fn setup_connection( ); return; } + stats.total_new_connections.fetch_add(1, Ordering::Relaxed); let params = get_connection_stake(&new_connection, &staked_nodes).map_or( NewConnectionHandlerParams::new_unstaked( diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index 8a876fb17baf4a..094d76ff1b1e5f 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -588,13 +588,10 @@ async fn test_rate_limiting() { scheduler_cancel.cancel(); let stats = join_scheduler(scheduler_handle).await; - // we get 2 transactions registered as sent (but not acked) because of how QUIC works - // before ratelimiter kicks in. assert!( stats == SendTransactionStatsNonAtomic { - successfully_sent: 2, - write_error_connection_lost: 2, + connection_error_timed_out: 1, ..Default::default() } ); diff --git a/vortexor/Cargo.toml b/vortexor/Cargo.toml index 6ea58785513d47..8d137f7c17d175 100644 --- a/vortexor/Cargo.toml +++ b/vortexor/Cargo.toml @@ -29,7 +29,6 @@ crossbeam-channel = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } -governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true }