Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 51 additions & 19 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ impl Default for ClusterConfig {
}
}

struct QuicConnectionCacheConfig {
client_keypair: Keypair,
staked_nodes: Arc<RwLock<StakedNodes>>,
}

pub struct LocalCluster {
/// Keypair with funding to participate in the network
pub funding_keypair: Keypair,
Expand All @@ -152,6 +157,8 @@ pub struct LocalCluster {
pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
pub genesis_config: GenesisConfig,
pub connection_cache: Arc<ConnectionCache>,
quic_connection_cache_config: Option<QuicConnectionCacheConfig>,
tpu_connection_pool_size: usize,
}

impl LocalCluster {
Expand Down Expand Up @@ -191,7 +198,7 @@ impl LocalCluster {
pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());

let connection_cache = if config.tpu_use_quic {
let quic_connection_cache_config = config.tpu_use_quic.then(|| {
let client_keypair = Keypair::new();
let stake = DEFAULT_NODE_STAKE;

Expand All @@ -201,11 +208,6 @@ impl LocalCluster {
validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
}

assert!(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why kill this assert?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assert is useless as the branch condition already cover it.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see "if config.tpu_use_quic"

config.tpu_use_quic,
"no support for staked override forwarding without quic"
);

let total_stake = config.node_stakes.iter().sum::<u64>();
let stakes = HashMap::from([
(client_keypair.pubkey(), stake),
Expand All @@ -216,19 +218,16 @@ impl LocalCluster {
HashMap::<Pubkey, u64>::default(), // overrides
)));

Arc::new(ConnectionCache::new_with_client_options(
"connection_cache_local_cluster_quic_staked",
config.tpu_connection_pool_size,
None,
Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
Some((&staked_nodes, &client_keypair.pubkey())),
))
} else {
Arc::new(ConnectionCache::with_udp(
"connection_cache_local_cluster_udp",
config.tpu_connection_pool_size,
))
};
QuicConnectionCacheConfig {
client_keypair,
staked_nodes,
}
});

let connection_cache = create_connection_cache(
&quic_connection_cache_config,
config.tpu_connection_pool_size,
);

let mut validator_keys = {
if let Some(ref keys) = config.validator_keys {
Expand Down Expand Up @@ -379,6 +378,8 @@ impl LocalCluster {
validators,
genesis_config,
connection_cache,
quic_connection_cache_config,
tpu_connection_pool_size: config.tpu_connection_pool_size,
};

let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
Expand Down Expand Up @@ -973,6 +974,29 @@ impl LocalCluster {
}
}

fn create_connection_cache(
quic_connection_cache_config: &Option<QuicConnectionCacheConfig>,
tpu_connection_pool_size: usize,
) -> Arc<ConnectionCache> {
if let Some(config) = quic_connection_cache_config {
Arc::new(ConnectionCache::new_with_client_options(
"connection_cache_local_cluster_quic_staked",
tpu_connection_pool_size,
None,
Some((
&config.client_keypair,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
)),
Some((&config.staked_nodes, &config.client_keypair.pubkey())),
))
} else {
Arc::new(ConnectionCache::with_udp(
"connection_cache_local_cluster_udp",
tpu_connection_pool_size,
))
}
}

impl Cluster for LocalCluster {
fn get_node_pubkeys(&self) -> Vec<Pubkey> {
self.validators.keys().cloned().collect()
Expand Down Expand Up @@ -1059,6 +1083,14 @@ impl Cluster for LocalCluster {
socket_addr_space,
);
self.add_node(pubkey, cluster_validator_info);

// Recreate the connection cache as we are connecting to the nodes
// after restart. It can make connections faster without waiting for
// the existing connections to time out.
self.connection_cache = create_connection_cache(
&self.quic_connection_cache_config,
self.tpu_connection_pool_size,
);
}

fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {
Expand Down