Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 98 additions & 69 deletions core/src/cluster_info_vote_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use {
unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Select,
Sender as CrossbeamSender,
},
itertools::izip,
log::*,
solana_gossip::{
cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
Expand Down Expand Up @@ -297,10 +296,10 @@ pub struct ClusterInfoVoteListener {
impl ClusterInfoVoteListener {
#[allow(clippy::too_many_arguments)]
pub fn new(
exit: &Arc<AtomicBool>,
exit: Arc<AtomicBool>,
cluster_info: Arc<ClusterInfo>,
verified_packets_sender: CrossbeamSender<Vec<PacketBatch>>,
poh_recorder: &Arc<Mutex<PohRecorder>>,
poh_recorder: Arc<Mutex<PohRecorder>>,
vote_tracker: Arc<VoteTracker>,
bank_forks: Arc<RwLock<BankForks>>,
subscriptions: Arc<RpcSubscriptions>,
Expand All @@ -311,25 +310,26 @@ impl ClusterInfoVoteListener {
bank_notification_sender: Option<BankNotificationSender>,
cluster_confirmed_slot_sender: GossipDuplicateConfirmedSlotsSender,
) -> Self {
let exit_ = exit.clone();

let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) =
unbounded();
let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded();
let listen_thread = Builder::new()
.name("solana-cluster_info_vote_listener".to_string())
.spawn(move || {
let _ = Self::recv_loop(
exit_,
&cluster_info,
verified_vote_label_packets_sender,
verified_vote_transactions_sender,
);
})
.unwrap();

let listen_thread = {
let exit = exit.clone();
let bank_forks = bank_forks.clone();
Builder::new()
.name("solana-cluster_info_vote_listener".to_string())
.spawn(move || {
let _ = Self::recv_loop(
exit,
&cluster_info,
&bank_forks,
verified_vote_label_packets_sender,
verified_vote_transactions_sender,
);
})
.unwrap()
};
let exit_ = exit.clone();
let poh_recorder = poh_recorder.clone();
let bank_send_thread = Builder::new()
.name("solana-cluster_info_bank_send".to_string())
.spawn(move || {
Expand All @@ -342,12 +342,11 @@ impl ClusterInfoVoteListener {
})
.unwrap();

let exit_ = exit.clone();
let send_thread = Builder::new()
.name("solana-cluster_info_process_votes".to_string())
.spawn(move || {
let _ = Self::process_votes_loop(
exit_,
exit,
verified_vote_transactions_receiver,
vote_tracker,
bank_forks,
Expand All @@ -367,16 +366,14 @@ impl ClusterInfoVoteListener {
}
}

pub fn join(self) -> thread::Result<()> {
for thread_hdl in self.thread_hdls {
thread_hdl.join()?;
}
Ok(())
pub(crate) fn join(self) -> thread::Result<()> {
self.thread_hdls.into_iter().try_for_each(JoinHandle::join)
}

fn recv_loop(
exit: Arc<AtomicBool>,
cluster_info: &ClusterInfo,
bank_forks: &RwLock<BankForks>,
verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender,
verified_vote_transactions_sender: VerifiedVoteTransactionsSender,
) -> Result<()> {
Expand All @@ -385,7 +382,7 @@ impl ClusterInfoVoteListener {
let votes = cluster_info.get_votes(&mut cursor);
inc_new_counter_debug!("cluster_info_vote_listener-recv_count", votes.len());
if !votes.is_empty() {
let (vote_txs, packets) = Self::verify_votes(votes);
let (vote_txs, packets) = Self::verify_votes(votes, bank_forks);
verified_vote_transactions_sender.send(vote_txs)?;
verified_vote_label_packets_sender.send(packets)?;
}
Expand All @@ -395,43 +392,45 @@ impl ClusterInfoVoteListener {
}

#[allow(clippy::type_complexity)]
fn verify_votes(votes: Vec<Transaction>) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
fn verify_votes(
votes: Vec<Transaction>,
bank_forks: &RwLock<BankForks>,
) -> (Vec<Transaction>, Vec<VerifiedVoteMetadata>) {
let mut packet_batches = packet::to_packet_batches(&votes, 1);

// Votes should already be filtered by this point.
let reject_non_vote = false;
sigverify::ed25519_verify_cpu(&mut packet_batches, reject_non_vote);

let (vote_txs, vote_metadata) = izip!(votes.into_iter(), packet_batches)
.filter_map(|(vote_tx, packet_batch)| {
let (vote, vote_account_key) = vote_transaction::parse_vote_transaction(&vote_tx)
.and_then(|(vote_account_key, vote, _)| {
if vote.slots().is_empty() {
None
} else {
Some((vote, vote_account_key))
}
})?;

sigverify::ed25519_verify_cpu(&mut packet_batches, /*reject_non_vote=*/ false);
let root_bank = bank_forks.read().unwrap().root_bank();
let epoch_schedule = root_bank.epoch_schedule();
votes
.into_iter()
.zip(packet_batches)
.filter(|(_, packet_batch)| {
// to_packet_batches() above splits into 1 packet long batches
assert_eq!(packet_batch.packets.len(), 1);
if !packet_batch.packets[0].meta.discard {
if let Some(signature) = vote_tx.signatures.first().cloned() {
return Some((
vote_tx,
VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature,
},
));
}
!packet_batch.packets[0].meta.discard
})
.filter_map(|(tx, packet_batch)| {
let (vote_account_key, vote, _) = vote_transaction::parse_vote_transaction(&tx)?;
let slot = vote.last_voted_slot()?;
let epoch = epoch_schedule.get_epoch(slot);
let authorized_voter = root_bank
.epoch_stakes(epoch)?
.epoch_authorized_voters()
.get(&vote_account_key)?;
let mut keys = tx.message.account_keys.iter().enumerate();
if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) {
return None;
}
None
let verified_vote_metadata = VerifiedVoteMetadata {
vote_account_key,
vote,
packet_batch,
signature: *tx.signatures.first()?,
};
Some((tx, verified_vote_metadata))
})
.unzip();
(vote_txs, vote_metadata)
.unzip()
}

fn bank_send_loop(
Expand Down Expand Up @@ -558,7 +557,7 @@ impl ClusterInfoVoteListener {
return Ok(());
}

let root_bank = bank_forks.read().unwrap().root_bank().clone();
let root_bank = bank_forks.read().unwrap().root_bank();
if last_process_root.elapsed().as_millis() > DEFAULT_MS_PER_SLOT as u128 {
let unrooted_optimistic_slots = confirmation_verifier
.verify_for_unrooted_optimistic_slots(&root_bank, &blockstore);
Expand Down Expand Up @@ -965,6 +964,7 @@ mod tests {
solana_vote_program::vote_state::Vote,
std::{
collections::BTreeSet,
iter::repeat_with,
sync::{atomic::AtomicU64, Arc},
},
};
Expand Down Expand Up @@ -1817,8 +1817,11 @@ mod tests {
#[test]
fn test_verify_votes_empty() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let votes = vec![];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert!(vote_txs.is_empty());
assert!(packets.is_empty());
}
Expand All @@ -1831,25 +1834,40 @@ mod tests {
assert_eq!(num_packets, ref_value);
}

fn test_vote_tx(hash: Option<Hash>) -> Transaction {
let node_keypair = Keypair::new();
let vote_keypair = Keypair::new();
let auth_voter_keypair = Keypair::new();
fn test_vote_tx(
validator_vote_keypairs: Option<&ValidatorVoteKeypairs>,
hash: Option<Hash>,
) -> Transaction {
let other = ValidatorVoteKeypairs::new_rand();
let validator_vote_keypair = validator_vote_keypairs.unwrap_or(&other);
// TODO authorized_voter_keypair should be different from vote-keypair
// but that is what create_genesis_... currently generates.
vote_transaction::new_vote_transaction(
vec![0],
Hash::default(),
Hash::default(),
&node_keypair,
&vote_keypair,
&auth_voter_keypair,
&validator_vote_keypair.node_keypair,
&validator_vote_keypair.vote_keypair,
&validator_vote_keypair.vote_keypair, // authorized_voter_keypair
hash,
)
}

fn run_test_verify_votes_1_pass(hash: Option<Hash>) {
let vote_tx = test_vote_tx(hash);
let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
.take(10)
.collect();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000, // mint_lamports
&voting_keypairs,
vec![100; voting_keypairs.len()], // stakes
);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
let votes = vec![vote_tx];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert_eq!(vote_txs.len(), 1);
verify_packets_len(&packets, 1);
}
Expand All @@ -1861,11 +1879,22 @@ mod tests {
}

fn run_test_bad_vote(hash: Option<Hash>) {
let vote_tx = test_vote_tx(hash);
let voting_keypairs: Vec<_> = repeat_with(ValidatorVoteKeypairs::new_rand)
.take(10)
.collect();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000, // mint_lamports
&voting_keypairs,
vec![100; voting_keypairs.len()], // stakes
);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = RwLock::new(BankForks::new(bank));
let vote_tx = test_vote_tx(voting_keypairs.first(), hash);
let mut bad_vote = vote_tx.clone();
bad_vote.signatures[0] = Signature::default();
let votes = vec![vote_tx.clone(), bad_vote, vote_tx];
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes);
let (vote_txs, packets) = ClusterInfoVoteListener::verify_votes(votes, &bank_forks);
assert_eq!(vote_txs.len(), 2);
verify_packets_len(&packets, 2);
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ impl Tpu {
let (verified_gossip_vote_packets_sender, verified_gossip_vote_packets_receiver) =
unbounded();
let cluster_info_vote_listener = ClusterInfoVoteListener::new(
exit,
exit.clone(),
cluster_info.clone(),
verified_gossip_vote_packets_sender,
poh_recorder,
poh_recorder.clone(),
vote_tracker,
bank_forks.clone(),
subscriptions.clone(),
Expand Down