diff --git a/Cargo.lock b/Cargo.lock index 3fcd820d43b3e8..037546f38cb5eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,6 +3133,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -3311,6 +3317,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if 1.0.4", + "dashmap", + "futures 0.3.31", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.3", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "group" version = "0.13.0" @@ -4726,6 +4752,12 @@ dependencies = [ "memoffset 0.9.1", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.0.0" @@ -4737,6 +4769,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -5583,6 +5621,21 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -5831,6 +5884,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.9.4", +] + [[package]] name = "rayon" version = "1.11.0" @@ -10880,6 +10942,7 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", + "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -11759,6 +11822,7 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", + "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -12071,6 +12135,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/Cargo.toml b/Cargo.toml index 5bcf9a75cf55ce..d0b9754e10ea9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -273,6 +273,7 @@ gag = "1.0.0" gethostname = "0.2.3" getrandom = "0.3.4" goauth = "0.13.1" +governor = "0.6.3" hex = "0.4.3" hidapi = { version = "2.6.3", default-features = false } histogram = "0.6.9" diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 5aee96515a0ff3..8c9dc37b7ba8bb 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -2380,6 +2380,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2496,6 +2502,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "governor" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +dependencies = [ + "cfg-if 1.0.4", + "dashmap", + "futures 0.3.31", + "futures-timer", + "no-std-compat", + "nonzero_ext", + "parking_lot 0.12.2", + "portable-atomic", + "quanta", + "rand 0.8.5", + "smallvec", + "spinning_top", +] + [[package]] name = "group" version = "0.13.0" @@ -3859,6 +3885,12 @@ dependencies = [ "memoffset 0.9.0", ] +[[package]] +name = "no-std-compat" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" + [[package]] name = "nom" version = "7.1.3" @@ -3869,6 +3901,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nonzero_ext" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" + [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -4576,6 +4614,21 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "quanta" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" +dependencies = [ + "crossbeam-utils", + "libc", + "once_cell", + "raw-cpuid", + "wasi 0.11.0+wasi-snapshot-preview1", + "web-sys", + "winapi 0.3.9", +] + [[package]] name = "quinn" version = "0.11.9" @@ -4766,6 +4819,15 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "raw-cpuid" +version = "11.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb9ee317cfe3fbd54b36a511efc1edd42e216903c9cd575e686dd68a2ba90d8d" +dependencies = [ + "bitflags 2.9.4", +] + [[package]] name = "rayon" version = "1.11.0" @@ -9299,6 +9361,7 @@ dependencies = [ "dashmap", "futures 0.3.31", "futures-util", + "governor", "histogram", "indexmap 2.11.4", "itertools 0.12.1", @@ -10126,6 +10189,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning_top" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index ff0f4947e3b20b..d183762196a8f5 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -27,6 +27,7 @@ crossbeam-channel = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index f56dab152621fe..205b76c8620ef9 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -1,54 +1,74 @@ use { - solana_net_utils::token_bucket::{KeyedRateLimiter, TokenBucket}, - std::net::IpAddr, + governor::{DefaultDirectRateLimiter, DefaultKeyedRateLimiter, Quota, RateLimiter}, + std::{net::IpAddr, num::NonZeroU32}, }; /// Limits the rate of connections per IP address. pub struct ConnectionRateLimiter { - limiter: KeyedRateLimiter, + limiter: DefaultKeyedRateLimiter, } -/// The threshold of the size of the connection rate limiter map. When -/// the map size is above this, we will trigger a cleanup of older -/// entries used by past requests. -const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; - impl ConnectionRateLimiter { /// Create a new rate limiter per IpAddr. The rate is specified as the count per minute to allow for - /// less frequent connections. Higher limit also allows higher bursts. - /// num_shards controls how many shards are used in the underlying dashmap, - /// should be set >= number of contending threads. - pub fn new(limit_per_minute: u64, num_shards: usize) -> Self { + /// less frequent connections. + pub fn new(limit_per_minute: u64) -> Self { + let quota = + Quota::per_minute(NonZeroU32::new(u32::try_from(limit_per_minute).unwrap()).unwrap()); Self { - limiter: KeyedRateLimiter::new( - CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD, - TokenBucket::new( - limit_per_minute, - limit_per_minute, - limit_per_minute as f64 / 60.0, - ), - num_shards, - ), + limiter: DefaultKeyedRateLimiter::keyed(quota), } } /// Check if the connection from the said `ip` is allowed. - /// Here we assume that only IPs with actual confirmed connections are stored in it, - /// since we should only modify server state once source IP is verified pub fn is_allowed(&self, ip: &IpAddr) -> bool { - // Check if we have records in the rate limiter for the given IP address - match self.limiter.current_tokens(ip) { - Some(r) => r > 0, // we have a record, and rate is not exceeded - None => true, // if we have not seen IP, allow connection request + // Acquire a permit from the rate limiter for the given IP address + if self.limiter.check_key(ip).is_ok() { + debug!("Request from IP {ip:?} allowed"); + true // Request allowed + } else { + debug!("Request from IP {ip:?} blocked"); + false // Request blocked } } - pub fn register_connection(&self, ip: &IpAddr) -> bool { - if self.limiter.consume_tokens(*ip, 1).is_ok() { - debug!("Request from IP {ip:?} allowed"); + /// retain only keys whose rate-limiting start date is within the rate-limiting interval. + /// Otherwise drop them as inactive + pub fn retain_recent(&self) { + self.limiter.retain_recent() + } + + /// Returns the number of "live" keys in the rate limiter. + pub fn len(&self) -> usize { + self.limiter.len() + } + + /// Returns `true` if the rate limiter has no keys in it. + pub fn is_empty(&self) -> bool { + self.limiter.is_empty() + } +} + +/// Connection rate limiter for enforcing connection rates from +/// all clients. +pub struct TotalConnectionRateLimiter { + limiter: DefaultDirectRateLimiter, +} + +impl TotalConnectionRateLimiter { + /// Create a new rate limiter. The rate is specified as the count per second. + pub fn new(limit_per_second: u64) -> Self { + let quota = + Quota::per_second(NonZeroU32::new(u32::try_from(limit_per_second).unwrap()).unwrap()); + Self { + limiter: RateLimiter::direct(quota), + } + } + + /// Check if a connection is allowed. + pub fn is_allowed(&self) -> bool { + if self.limiter.check().is_ok() { true // Request allowed } else { - debug!("Request from IP {ip:?} blocked"); false // Request blocked } } @@ -56,31 +76,81 @@ impl ConnectionRateLimiter { #[cfg(test)] pub mod test { - use {super::*, std::net::Ipv4Addr}; + use { + super::*, + std::{ + net::Ipv4Addr, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, Instant}, + }, + }; + + #[tokio::test] + async fn test_total_connection_rate_limiter() { + let limiter = TotalConnectionRateLimiter::new(2); + assert!(limiter.is_allowed()); + assert!(limiter.is_allowed()); + assert!(!limiter.is_allowed()); + } #[tokio::test] async fn test_connection_rate_limiter() { - let limiter = ConnectionRateLimiter::new(3, 4); + let limiter = ConnectionRateLimiter::new(4); let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); assert!(limiter.is_allowed(&ip1)); - assert!(limiter.register_connection(&ip1)); - assert!(limiter.register_connection(&ip1)); assert!(limiter.is_allowed(&ip1)); - assert!(limiter.register_connection(&ip1)); + assert!(limiter.is_allowed(&ip1)); + assert!(limiter.is_allowed(&ip1)); assert!(!limiter.is_allowed(&ip1)); - assert!(!limiter.register_connection(&ip1)); + assert!(limiter.len() == 1); let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)); - for _ in 0..100 { - assert!( - limiter.is_allowed(&ip2), - "just checking should not mutate state" - ); - } - assert!(limiter.register_connection(&ip2)); - assert!(limiter.register_connection(&ip2)); assert!(limiter.is_allowed(&ip2)); - assert!(limiter.register_connection(&ip2)); + assert!(limiter.len() == 2); + assert!(limiter.is_allowed(&ip2)); + assert!(limiter.is_allowed(&ip2)); + assert!(limiter.is_allowed(&ip2)); assert!(!limiter.is_allowed(&ip2)); } + + #[test] + fn test_bench_rate_limiter() { + let run_duration = Duration::from_secs(3); + let limiter = Arc::new(ConnectionRateLimiter::new(60 * 100)); + + let accepted = AtomicUsize::new(0); + let rejected = AtomicUsize::new(0); + + let start = Instant::now(); + let ip_pool = 2048; + let expected_total_accepts = (run_duration.as_secs() * 100 * ip_pool) as i64; + let workers = 8; + + std::thread::scope(|scope| { + for _ in 0..workers { + scope.spawn(|| { + for i in 1.. { + if Instant::now() > start + run_duration { + break; + } + let ip = IpAddr::V4(Ipv4Addr::from_bits(i % ip_pool as u32)); + if limiter.is_allowed(&ip) { + accepted.fetch_add(1, Ordering::Relaxed); + } else { + rejected.fetch_add(1, Ordering::Relaxed); + } + } + }); + } + }); + + let acc = accepted.load(Ordering::Relaxed); + let rej = rejected.load(Ordering::Relaxed); + println!("Run complete over {:?} seconds", run_duration.as_secs()); + println!("Accepted: {acc} (target {expected_total_accepts})"); + println!("Rejected: {rej}"); + } } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index c993501f4daeb4..7338c4283f4e81 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,7 +1,7 @@ use { crate::{ nonblocking::{ - connection_rate_limiter::ConnectionRateLimiter, + connection_rate_limiter::{ConnectionRateLimiter, TotalConnectionRateLimiter}, stream_throttle::{ ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL, STREAM_THROTTLING_INTERVAL_MS, @@ -21,7 +21,6 @@ use { smallvec::SmallVec, solana_keypair::Keypair, solana_measure::measure::Measure, - solana_net_utils::token_bucket::TokenBucket, solana_packet::{Meta, PACKET_DATA_SIZE}, solana_perf::packet::{BytesPacket, BytesPacketBatch, PacketBatch, PACKETS_PER_BATCH}, solana_pubkey::Pubkey, @@ -88,9 +87,12 @@ const CONNECTION_CLOSE_REASON_INVALID_STREAM: &[u8] = b"invalid_stream"; /// Total new connection counts per second. Heuristically taken from /// the default staked and unstaked connection limits. Might be adjusted /// later. -const TOTAL_CONNECTIONS_PER_SECOND: f64 = 2500.0; -/// Max burst of connections above sustained rate to pass through -const MAX_CONNECTION_BURST: u64 = 1000; +const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500; + +/// The threshold of the size of the connection rate limiter map. When +/// the map size is above this, we will trigger a cleanup of older +/// entries used by past requests. +const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000; /// Timeout for connection handshake. Timer starts once we get Initial from the /// peer, and is canceled when we get a Handshake packet from them. @@ -322,11 +324,8 @@ async fn run_server( ) -> TaskTracker { let rate_limiter = Arc::new(ConnectionRateLimiter::new( quic_server_params.max_connections_per_ipaddr_per_min, - quic_server_params.num_threads.get() * 2, )); - let overall_connection_rate_limiter = Arc::new(TokenBucket::new( - MAX_CONNECTION_BURST, - MAX_CONNECTION_BURST, + let overall_connection_rate_limiter = Arc::new(TotalConnectionRateLimiter::new( TOTAL_CONNECTIONS_PER_SECOND, )); @@ -392,30 +391,14 @@ async fn run_server( .total_incoming_connection_attempts .fetch_add(1, Ordering::Relaxed); - // check overall connection request rate limiter - if overall_connection_rate_limiter.current_tokens() == 0 { - stats - .connection_rate_limited_across_all - .fetch_add(1, Ordering::Relaxed); - debug!( - "Ignoring incoming connection from {} due to overall rate limit.", - incoming.remote_address() - ); - incoming.ignore(); - continue; - } - // then perform per IpAddr rate limiting - if !rate_limiter.is_allowed(&incoming.remote_address().ip()) { - stats - .connection_rate_limited_per_ipaddr - .fetch_add(1, Ordering::Relaxed); - debug!( - "Ignoring incoming connection from {} due to per-IP rate limiting.", - incoming.remote_address() - ); - incoming.ignore(); - continue; + // first do per IpAddr rate limiting + if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD { + rate_limiter.retain_recent(); } + stats + .connection_rate_limiter_length + .store(rate_limiter.len(), Ordering::Relaxed); + let Ok(client_connection_tracker) = ClientConnectionTracker::new( stats.clone(), quic_server_params.max_concurrent_connections(), @@ -744,7 +727,7 @@ fn compute_recieve_window( async fn setup_connection( connecting: Connecting, rate_limiter: Arc, - overall_connection_rate_limiter: Arc, + overall_connection_rate_limiter: Arc, client_connection_tracker: ClientConnectionTracker, unstaked_connection_table: Arc>, staked_connection_table: Arc>, @@ -765,10 +748,7 @@ async fn setup_connection( match connecting_result { Ok(new_connection) => { debug!("Got a connection {from:?}"); - // now that we have observed the handshake we can be certain - // that the initiator owns an IP address, we can update rate - // limiters on the server - if !rate_limiter.register_connection(&from.ip()) { + if !rate_limiter.is_allowed(&from.ip()) { debug!("Reject connection from {from:?} -- rate limiting exceeded"); stats .connection_rate_limited_per_ipaddr @@ -779,7 +759,9 @@ async fn setup_connection( ); return; } - if overall_connection_rate_limiter.consume_tokens(1).is_err() { + stats.total_new_connections.fetch_add(1, Ordering::Relaxed); + + if !overall_connection_rate_limiter.is_allowed() { debug!( "Reject connection from {:?} -- total rate limiting exceeded", from.ip() @@ -793,7 +775,6 @@ async fn setup_connection( ); return; } - stats.total_new_connections.fetch_add(1, Ordering::Relaxed); let params = get_connection_stake(&new_connection, &staked_nodes).map_or( NewConnectionHandlerParams::new_unstaked( diff --git a/tpu-client-next/tests/connection_workers_scheduler_test.rs b/tpu-client-next/tests/connection_workers_scheduler_test.rs index dfdcee5e327e65..b05d22a45cab7b 100644 --- a/tpu-client-next/tests/connection_workers_scheduler_test.rs +++ b/tpu-client-next/tests/connection_workers_scheduler_test.rs @@ -588,10 +588,13 @@ async fn test_rate_limiting() { scheduler_cancel.cancel(); let stats = join_scheduler(scheduler_handle).await; + // we get 2 transactions registered as sent (but not acked) because of how QUIC works + // before ratelimiter kicks in. assert!( stats == SendTransactionStatsNonAtomic { - connection_error_timed_out: 1, + successfully_sent: 2, + write_error_connection_lost: 2, ..Default::default() } ); diff --git a/vortexor/Cargo.toml b/vortexor/Cargo.toml index 25b9b75754fd98..83cf237331eeca 100644 --- a/vortexor/Cargo.toml +++ b/vortexor/Cargo.toml @@ -31,6 +31,7 @@ crossbeam-channel = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } +governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true }