diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 049e34cb5168c0..249899dffd439d 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -144,6 +144,11 @@ impl Default for ClusterConfig { } } +struct QuicConnectionCacheConfig { + client_keypair: Keypair, + staked_nodes: Arc>, +} + pub struct LocalCluster { /// Keypair with funding to participate in the network pub funding_keypair: Keypair, @@ -152,6 +157,8 @@ pub struct LocalCluster { pub validators: HashMap, pub genesis_config: GenesisConfig, pub connection_cache: Arc, + quic_connection_cache_config: Option, + tpu_connection_pool_size: usize, } impl LocalCluster { @@ -191,7 +198,7 @@ impl LocalCluster { pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self { assert_eq!(config.validator_configs.len(), config.node_stakes.len()); - let connection_cache = if config.tpu_use_quic { + let quic_connection_cache_config = config.tpu_use_quic.then(|| { let client_keypair = Keypair::new(); let stake = DEFAULT_NODE_STAKE; @@ -201,11 +208,6 @@ impl LocalCluster { validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides)); } - assert!( - config.tpu_use_quic, - "no support for staked override forwarding without quic" - ); - let total_stake = config.node_stakes.iter().sum::(); let stakes = HashMap::from([ (client_keypair.pubkey(), stake), @@ -216,19 +218,16 @@ impl LocalCluster { HashMap::::default(), // overrides ))); - Arc::new(ConnectionCache::new_with_client_options( - "connection_cache_local_cluster_quic_staked", - config.tpu_connection_pool_size, - None, - Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), - Some((&staked_nodes, &client_keypair.pubkey())), - )) - } else { - Arc::new(ConnectionCache::with_udp( - "connection_cache_local_cluster_udp", - config.tpu_connection_pool_size, - )) - }; + QuicConnectionCacheConfig { + client_keypair, + staked_nodes, + } + }); + + let connection_cache = create_connection_cache( + &quic_connection_cache_config, + config.tpu_connection_pool_size, + ); let mut validator_keys = { if let Some(ref keys) = config.validator_keys { @@ -379,6 +378,8 @@ impl LocalCluster { validators, genesis_config, connection_cache, + quic_connection_cache_config, + tpu_connection_pool_size: config.tpu_connection_pool_size, }; let node_pubkey_to_vote_key: HashMap> = keys_in_genesis @@ -973,6 +974,29 @@ impl LocalCluster { } } +fn create_connection_cache( + quic_connection_cache_config: &Option, + tpu_connection_pool_size: usize, +) -> Arc { + if let Some(config) = quic_connection_cache_config { + Arc::new(ConnectionCache::new_with_client_options( + "connection_cache_local_cluster_quic_staked", + tpu_connection_pool_size, + None, + Some(( + &config.client_keypair, + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + )), + Some((&config.staked_nodes, &config.client_keypair.pubkey())), + )) + } else { + Arc::new(ConnectionCache::with_udp( + "connection_cache_local_cluster_udp", + tpu_connection_pool_size, + )) + } +} + impl Cluster for LocalCluster { fn get_node_pubkeys(&self) -> Vec { self.validators.keys().cloned().collect() @@ -1059,6 +1083,14 @@ impl Cluster for LocalCluster { socket_addr_space, ); self.add_node(pubkey, cluster_validator_info); + + // Recreate the connection cache as we are connecting to the nodes + // after restart. It can make connections faster without waiting for + // the existing connections to time out. + self.connection_cache = create_connection_cache( + &self.quic_connection_cache_config, + self.tpu_connection_pool_size, + ); } fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {