Skip to content

Commit ce4a52d

Browse files
authored
TPU Vote using quic -- client side implementation (anza-xyz#3473)
* Vote using QUIC Vote using QUIC send vote packet using the right sender removed dup declared functions rebase with master QuicServerParams removed remove_tpu_vote first part of sending votes using quic use quic for vote on client side with connection cache add debug messages turn on quic for vote by default for testing * remove unused import * turn DEFAULT_VOTE_USE_QUIC to false * Minor fixes * addressed some feedback from Behzad and Brennan * fixed one more merge conflict
1 parent 5b8489f commit ce4a52d

File tree

15 files changed

+281
-105
lines changed

15 files changed

+281
-105
lines changed

banking-bench/src/main.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -450,15 +450,16 @@ fn main() {
450450
};
451451
let cluster_info = Arc::new(cluster_info);
452452
let tpu_disable_quic = matches.is_present("tpu_disable_quic");
453-
let connection_cache = match tpu_disable_quic {
454-
false => ConnectionCache::new_quic(
455-
"connection_cache_banking_bench_quic",
456-
DEFAULT_TPU_CONNECTION_POOL_SIZE,
457-
),
458-
true => ConnectionCache::with_udp(
453+
let connection_cache = if tpu_disable_quic {
454+
ConnectionCache::with_udp(
459455
"connection_cache_banking_bench_udp",
460456
DEFAULT_TPU_CONNECTION_POOL_SIZE,
461-
),
457+
)
458+
} else {
459+
ConnectionCache::new_quic(
460+
"connection_cache_banking_bench_quic",
461+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
462+
)
462463
};
463464
let banking_stage = BankingStage::new_num_threads(
464465
block_production_method,

core/src/next_leader.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
1616
cluster_info: &ClusterInfo,
1717
poh_recorder: &RwLock<PohRecorder>,
1818
fanout_slots: u64,
19+
protocol: Protocol,
1920
) -> Vec<SocketAddr> {
2021
let upcoming_leaders = {
2122
let poh_recorder = poh_recorder.read().unwrap();
@@ -29,7 +30,7 @@ pub(crate) fn upcoming_leader_tpu_vote_sockets(
2930
.dedup()
3031
.filter_map(|leader_pubkey| {
3132
cluster_info
32-
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(Protocol::UDP))?
33+
.lookup_contact_info(&leader_pubkey, |node| node.tpu_vote(protocol))?
3334
.ok()
3435
})
3536
// dedup again since leaders could potentially share the same tpu vote socket

core/src/replay_stage.rs

+57
Original file line numberDiff line numberDiff line change
@@ -4233,6 +4233,7 @@ pub(crate) mod tests {
42334233
},
42344234
crossbeam_channel::unbounded,
42354235
itertools::Itertools,
4236+
solana_client::connection_cache::ConnectionCache,
42364237
solana_entry::entry::{self, Entry},
42374238
solana_gossip::{cluster_info::Node, crds::Cursor},
42384239
solana_ledger::{
@@ -4263,6 +4264,7 @@ pub(crate) mod tests {
42634264
transaction::TransactionError,
42644265
},
42654266
solana_streamer::socket::SocketAddrSpace,
4267+
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
42664268
solana_transaction_status::VersionedTransactionWithStatusMeta,
42674269
solana_vote_program::{
42684270
vote_state::{self, TowerSync, VoteStateVersions},
@@ -7547,11 +7549,25 @@ pub(crate) mod tests {
75477549
let vote_info = voting_receiver
75487550
.recv_timeout(Duration::from_secs(1))
75497551
.unwrap();
7552+
7553+
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
7554+
ConnectionCache::new_quic(
7555+
"connection_cache_vote_quic",
7556+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7557+
)
7558+
} else {
7559+
ConnectionCache::with_udp(
7560+
"connection_cache_vote_udp",
7561+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7562+
)
7563+
};
7564+
75507565
crate::voting_service::VotingService::handle_vote(
75517566
&cluster_info,
75527567
&poh_recorder,
75537568
&tower_storage,
75547569
vote_info,
7570+
Arc::new(connection_cache),
75557571
);
75567572

75577573
let mut cursor = Cursor::default();
@@ -7622,12 +7638,27 @@ pub(crate) mod tests {
76227638
let vote_info = voting_receiver
76237639
.recv_timeout(Duration::from_secs(1))
76247640
.unwrap();
7641+
7642+
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
7643+
ConnectionCache::new_quic(
7644+
"connection_cache_vote_quic",
7645+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7646+
)
7647+
} else {
7648+
ConnectionCache::with_udp(
7649+
"connection_cache_vote_udp",
7650+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7651+
)
7652+
};
7653+
76257654
crate::voting_service::VotingService::handle_vote(
76267655
&cluster_info,
76277656
&poh_recorder,
76287657
&tower_storage,
76297658
vote_info,
7659+
Arc::new(connection_cache),
76307660
);
7661+
76317662
let votes = cluster_info.get_votes(&mut cursor);
76327663
assert_eq!(votes.len(), 1);
76337664
let vote_tx = &votes[0];
@@ -7705,11 +7736,24 @@ pub(crate) mod tests {
77057736
let vote_info = voting_receiver
77067737
.recv_timeout(Duration::from_secs(1))
77077738
.unwrap();
7739+
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
7740+
ConnectionCache::new_quic(
7741+
"connection_cache_vote_quic",
7742+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7743+
)
7744+
} else {
7745+
ConnectionCache::with_udp(
7746+
"connection_cache_vote_udp",
7747+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7748+
)
7749+
};
7750+
77087751
crate::voting_service::VotingService::handle_vote(
77097752
&cluster_info,
77107753
&poh_recorder,
77117754
&tower_storage,
77127755
vote_info,
7756+
Arc::new(connection_cache),
77137757
);
77147758

77157759
assert!(last_vote_refresh_time.last_refresh_time > clone_refresh_time);
@@ -7820,11 +7864,24 @@ pub(crate) mod tests {
78207864
let vote_info = voting_receiver
78217865
.recv_timeout(Duration::from_secs(1))
78227866
.unwrap();
7867+
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
7868+
ConnectionCache::new_quic(
7869+
"connection_cache_vote_quic",
7870+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7871+
)
7872+
} else {
7873+
ConnectionCache::with_udp(
7874+
"connection_cache_vote_udp",
7875+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
7876+
)
7877+
};
7878+
78237879
crate::voting_service::VotingService::handle_vote(
78247880
cluster_info,
78257881
poh_recorder,
78267882
tower_storage,
78277883
vote_info,
7884+
Arc::new(connection_cache),
78287885
);
78297886

78307887
let votes = cluster_info.get_votes(cursor);

core/src/tvu.rs

+16
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ impl Tvu {
163163
cluster_slots: Arc<ClusterSlots>,
164164
wen_restart_repair_slots: Option<Arc<RwLock<Vec<Slot>>>>,
165165
slot_status_notifier: Option<SlotStatusNotifier>,
166+
vote_connection_cache: Arc<ConnectionCache>,
166167
) -> Result<Self, String> {
167168
let in_wen_restart = wen_restart_repair_slots.is_some();
168169

@@ -331,6 +332,7 @@ impl Tvu {
331332
cluster_info.clone(),
332333
poh_recorder.clone(),
333334
tower_storage,
335+
vote_connection_cache,
334336
);
335337

336338
let warm_quic_cache_service = connection_cache.and_then(|connection_cache| {
@@ -436,6 +438,7 @@ pub mod tests {
436438
solana_runtime::bank::Bank,
437439
solana_sdk::signature::{Keypair, Signer},
438440
solana_streamer::socket::SocketAddrSpace,
441+
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_VOTE_USE_QUIC},
439442
std::sync::atomic::{AtomicU64, Ordering},
440443
};
441444

@@ -494,6 +497,18 @@ pub mod tests {
494497
} else {
495498
None
496499
};
500+
let connection_cache = if DEFAULT_VOTE_USE_QUIC {
501+
ConnectionCache::new_quic(
502+
"connection_cache_vote_quic",
503+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
504+
)
505+
} else {
506+
ConnectionCache::with_udp(
507+
"connection_cache_vote_udp",
508+
DEFAULT_TPU_CONNECTION_POOL_SIZE,
509+
)
510+
};
511+
497512
let tvu = Tvu::new(
498513
&vote_keypair.pubkey(),
499514
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@@ -555,6 +570,7 @@ pub mod tests {
555570
cluster_slots,
556571
wen_restart_repair_slots,
557572
None,
573+
Arc::new(connection_cache),
558574
)
559575
.expect("assume success");
560576
if enable_wen_restart {

core/src/validator.rs

+83-33
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,20 @@ struct TransactionHistoryServices {
476476
cache_block_meta_service: Option<CacheBlockMetaService>,
477477
}
478478

479+
/// A struct that simplifies passing the validator's TPU configuration
480+
pub struct ValidatorTpuConfig {
481+
/// Controls whether to use QUIC for sending regular TPU transactions
482+
pub use_quic: bool,
483+
/// Controls whether to use QUIC for sending TPU votes
484+
pub vote_use_quic: bool,
485+
/// Controls the connection cache pool size
486+
pub tpu_connection_pool_size: usize,
487+
/// Controls whether to enable UDP for TPU transactions.
488+
pub tpu_enable_udp: bool,
489+
/// Controls the maximum number of new connections per IpAddr per minute
490+
pub tpu_max_connections_per_ipaddr_per_minute: u64,
491+
}
492+
479493
pub struct Validator {
480494
validator_exit: Arc<RwLock<Exit>>,
481495
json_rpc_service: Option<JsonRpcService>,
@@ -528,12 +542,17 @@ impl Validator {
528542
rpc_to_plugin_manager_receiver: Option<Receiver<GeyserPluginManagerRequest>>,
529543
start_progress: Arc<RwLock<ValidatorStartProgress>>,
530544
socket_addr_space: SocketAddrSpace,
531-
use_quic: bool,
532-
tpu_connection_pool_size: usize,
533-
tpu_enable_udp: bool,
534-
tpu_max_connections_per_ipaddr_per_minute: u64,
545+
tpu_config: ValidatorTpuConfig,
535546
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
536547
) -> Result<Self> {
548+
let ValidatorTpuConfig {
549+
use_quic,
550+
vote_use_quic,
551+
tpu_connection_pool_size,
552+
tpu_enable_udp,
553+
tpu_max_connections_per_ipaddr_per_minute,
554+
} = tpu_config;
555+
537556
let start_time = Instant::now();
538557

539558
// Initialize the global rayon pool first to ensure the value in config
@@ -990,29 +1009,52 @@ impl Validator {
9901009

9911010
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
9921011

993-
let connection_cache = match use_quic {
994-
true => {
995-
let connection_cache = ConnectionCache::new_with_client_options(
996-
"connection_cache_tpu_quic",
997-
tpu_connection_pool_size,
998-
None,
999-
Some((
1000-
&identity_keypair,
1001-
node.info
1002-
.tpu(Protocol::UDP)
1003-
.map_err(|err| {
1004-
ValidatorError::Other(format!("Invalid TPU address: {err:?}"))
1005-
})?
1006-
.ip(),
1007-
)),
1008-
Some((&staked_nodes, &identity_keypair.pubkey())),
1009-
);
1010-
Arc::new(connection_cache)
1011-
}
1012-
false => Arc::new(ConnectionCache::with_udp(
1012+
let connection_cache = if use_quic {
1013+
let connection_cache = ConnectionCache::new_with_client_options(
1014+
"connection_cache_tpu_quic",
1015+
tpu_connection_pool_size,
1016+
None,
1017+
Some((
1018+
&identity_keypair,
1019+
node.info
1020+
.tpu(Protocol::UDP)
1021+
.map_err(|err| {
1022+
ValidatorError::Other(format!("Invalid TPU address: {err:?}"))
1023+
})?
1024+
.ip(),
1025+
)),
1026+
Some((&staked_nodes, &identity_keypair.pubkey())),
1027+
);
1028+
Arc::new(connection_cache)
1029+
} else {
1030+
Arc::new(ConnectionCache::with_udp(
10131031
"connection_cache_tpu_udp",
10141032
tpu_connection_pool_size,
1015-
)),
1033+
))
1034+
};
1035+
1036+
let vote_connection_cache = if vote_use_quic {
1037+
let vote_connection_cache = ConnectionCache::new_with_client_options(
1038+
"connection_cache_vote_quic",
1039+
tpu_connection_pool_size,
1040+
None, // client_endpoint
1041+
Some((
1042+
&identity_keypair,
1043+
node.info
1044+
.tpu_vote(Protocol::QUIC)
1045+
.map_err(|err| {
1046+
ValidatorError::Other(format!("Invalid TPU Vote address: {err:?}"))
1047+
})?
1048+
.ip(),
1049+
)),
1050+
Some((&staked_nodes, &identity_keypair.pubkey())),
1051+
);
1052+
Arc::new(vote_connection_cache)
1053+
} else {
1054+
Arc::new(ConnectionCache::with_udp(
1055+
"connection_cache_vote_udp",
1056+
tpu_connection_pool_size,
1057+
))
10161058
};
10171059

10181060
let rpc_override_health_check =
@@ -1428,6 +1470,7 @@ impl Validator {
14281470
cluster_slots.clone(),
14291471
wen_restart_repair_slots.clone(),
14301472
slot_status_notifier,
1473+
vote_connection_cache,
14311474
)
14321475
.map_err(ValidatorError::Other)?;
14331476

@@ -2715,6 +2758,7 @@ mod tests {
27152758
solana_sdk::{genesis_config::create_genesis_config, poh_config::PohConfig},
27162759
solana_tpu_client::tpu_client::{
27172760
DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC,
2761+
DEFAULT_VOTE_USE_QUIC,
27182762
},
27192763
std::{fs::remove_dir_all, thread, time::Duration},
27202764
};
@@ -2753,10 +2797,13 @@ mod tests {
27532797
None, // rpc_to_plugin_manager_receiver
27542798
start_progress.clone(),
27552799
SocketAddrSpace::Unspecified,
2756-
DEFAULT_TPU_USE_QUIC,
2757-
DEFAULT_TPU_CONNECTION_POOL_SIZE,
2758-
DEFAULT_TPU_ENABLE_UDP,
2759-
32, // max connections per IpAddr per minute for test
2800+
ValidatorTpuConfig {
2801+
use_quic: DEFAULT_TPU_USE_QUIC,
2802+
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
2803+
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
2804+
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
2805+
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
2806+
},
27602807
Arc::new(RwLock::new(None)),
27612808
)
27622809
.expect("assume successful validator start");
@@ -2972,10 +3019,13 @@ mod tests {
29723019
None, // rpc_to_plugin_manager_receiver
29733020
Arc::new(RwLock::new(ValidatorStartProgress::default())),
29743021
SocketAddrSpace::Unspecified,
2975-
DEFAULT_TPU_USE_QUIC,
2976-
DEFAULT_TPU_CONNECTION_POOL_SIZE,
2977-
DEFAULT_TPU_ENABLE_UDP,
2978-
32, // max connections per IpAddr per minute for test
3022+
ValidatorTpuConfig {
3023+
use_quic: DEFAULT_TPU_USE_QUIC,
3024+
vote_use_quic: DEFAULT_VOTE_USE_QUIC,
3025+
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
3026+
tpu_enable_udp: DEFAULT_TPU_ENABLE_UDP,
3027+
tpu_max_connections_per_ipaddr_per_minute: 32, // max connections per IpAddr per minute for test
3028+
},
29793029
Arc::new(RwLock::new(None)),
29803030
)
29813031
.expect("assume successful validator start")

0 commit comments

Comments
 (0)