From ccff6a13374eed5a04bd960ce1c1fa174c05b04b Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 13 Feb 2020 01:30:38 -0800 Subject: [PATCH 01/16] Connect vote listener and replay stage --- core/src/cluster_info_vote_listener.rs | 76 ++++++++++++++++++++++++-- core/src/replay_stage.rs | 12 +--- core/src/tpu.rs | 4 +- core/src/tvu.rs | 2 + core/src/validator.rs | 2 + 5 files changed, 80 insertions(+), 16 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index ce9882718b7d99..863df006a039da 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -1,15 +1,21 @@ use crate::cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}; use crate::packet::Packets; use crate::poh_recorder::PohRecorder; -use crate::result::Result; +use crate::result::{Error, Result}; use crate::{packet, sigverify}; -use crossbeam_channel::Sender as CrossbeamSender; +use crossbeam_channel::{ + unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, +}; use solana_metrics::inc_new_counter_debug; +use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; +pub type VoteSender = CrossbeamSender<(Pubkey, Vec)>; +pub type VoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; + pub struct ClusterInfoVoteListener { thread_hdls: Vec>, } @@ -21,10 +27,12 @@ impl ClusterInfoVoteListener { sigverify_disabled: bool, sender: CrossbeamSender>, poh_recorder: &Arc>, + vote_sender: VoteSender, ) -> Self { let exit = exit.clone(); let poh_recorder = poh_recorder.clone(); - let thread = Builder::new() + let (vote_tx_sender, vote_tx_receiver) = unbounded(); + let listen_thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { let _ = Self::recv_loop( @@ -32,19 +40,30 @@ impl ClusterInfoVoteListener { &cluster_info, sigverify_disabled, &sender, + vote_tx_sender, poh_recorder, ); }) .unwrap(); + + let send_thread = Builder::new() + .name("solana-cluster_info_vote_sender".to_string()) + .spawn(move || { + let _ = Self::send_loop(exit, vote_tx_receiver, vote_sender); + }) + .unwrap(); + Self { - thread_hdls: vec![thread], + thread_hdls: vec![listen_thread, send_thread], } } + fn recv_loop( exit: Arc, cluster_info: &Arc>, sigverify_disabled: bool, - sender: &CrossbeamSender>, + packets_sender: &CrossbeamSender>, + vote_tx_sender: CrossbeamSender, poh_recorder: Arc>, ) -> Result<()> { loop { @@ -64,14 +83,59 @@ impl ClusterInfoVoteListener { } else { sigverify::ed25519_verify_cpu(&msgs) }; + assert_eq!( + r.iter().map(|packets_results| packets_results.len()).sum(), + votes.len(0) + ); + for (vote_tx, result) in votes.zip(r.iter().flatten()) { + if result != 0 { + vote_tx_sender.send(vote_tx); + } + } sigverify::mark_disabled(&mut msgs, &r); - sender.send(msgs)?; + packets_sender.send(msgs)?; } } sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } } + fn send_loop( + exit: Arc, + vote_tx_receiver: CrossbeamReceiver, + vote_sender: VoteSender, + ) -> Result<()> { + loop { + if exit.load(Ordering::Relaxed) { + return Ok(()); + } + if let Err(e) = Self::run_deserialize_and_send_vote(&vote_tx_receiver, &vote_sender) { + match e { + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { + return Ok(()); + } + Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout) => (), + _ => { + error!("thread {:?} error {:?}", thread::current().name(), e); + } + } + } + } + } + + fn run_deserialize_and_send_vote( + vote_tx_receiver: &CrossbeamReceiver, + vote_sender: &VoteSender, + ) -> Result<()> { + let timer = Duration::from_millis(200); + let tx = vote_tx_receiver.recv_timeout(timer)?; + while let Ok(shred) = vote_tx_receiver.try_recv() { + vote_sender.send((0, Pubkey::default())); + } + + Ok(()) + } + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 4d2e0f1cd52bd3..a533d5693179fe 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2,11 +2,11 @@ use crate::{ cluster_info::ClusterInfo, + cluster_info_vote_listener::VoteReceiver, commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, result::Result, - rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; use solana_ledger::{ @@ -86,7 +86,6 @@ pub struct ReplayStageConfig { pub snapshot_package_sender: Option, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, - pub rewards_recorder_sender: Option, } pub struct ReplayStage { @@ -176,6 +175,7 @@ impl ReplayStage { cluster_info: Arc>, ledger_signal_receiver: Receiver, poh_recorder: Arc>, + vote_receiver: VoteReceiver, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, @@ -189,7 +189,6 @@ impl ReplayStage { snapshot_package_sender, block_commitment_cache, transaction_status_sender, - rewards_recorder_sender, } = config; let (root_bank_sender, root_bank_receiver) = channel(); @@ -236,7 +235,6 @@ impl ReplayStage { &bank_forks, &leader_schedule_cache, &subscriptions, - rewards_recorder_sender.clone(), ); Self::report_memory(&allocated, "generate_new_bank_forks", start); @@ -440,7 +438,6 @@ impl ReplayStage { &poh_recorder, &leader_schedule_cache, &subscriptions, - rewards_recorder_sender.clone(), ); if let Some(bank) = poh_recorder.lock().unwrap().bank() { @@ -522,7 +519,6 @@ impl ReplayStage { poh_recorder: &Arc>, leader_schedule_cache: &Arc, subscriptions: &Arc, - rewards_recorder_sender: Option, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1186,7 +1182,6 @@ impl ReplayStage { forks_lock: &RwLock, leader_schedule_cache: &Arc, subscriptions: &Arc, - rewards_recorder_sender: Option, ) { // Find the next slot that chains to the old slot let forks = forks_lock.read().unwrap(); @@ -1335,6 +1330,7 @@ pub(crate) mod tests { struct ForkSelectionResponse { slot: u64, is_locked_out: bool, + is_available: bool, } fn simulate_fork_selection( @@ -1630,7 +1626,6 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, - None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); @@ -1643,7 +1638,6 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, - None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); assert!(bank_forks.read().unwrap().get(2).is_some()); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 62d1c9961c3fb3..05efae36655a7c 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -5,7 +5,7 @@ use crate::{ banking_stage::BankingStage, broadcast_stage::{BroadcastStage, BroadcastStageType}, cluster_info::ClusterInfo, - cluster_info_vote_listener::ClusterInfoVoteListener, + cluster_info_vote_listener::{ClusterInfoVoteListener, VoteSender}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, sigverify::TransactionSigVerifier, @@ -46,6 +46,7 @@ impl Tpu { broadcast_type: &BroadcastStageType, exit: &Arc, shred_version: u16, + vote_sender: VoteSender, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -72,6 +73,7 @@ impl Tpu { sigverify_disabled, verified_vote_sender, &poh_recorder, + vote_sender, ); let banking_stage = BankingStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 913eb0be7a541b..e986813f2dc891 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -4,6 +4,7 @@ use crate::{ blockstream_service::BlockstreamService, cluster_info::ClusterInfo, + cluster_info_vote_listener::VoteReceiver, commitment::BlockCommitmentCache, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, @@ -171,6 +172,7 @@ impl Tvu { cluster_info.clone(), ledger_signal_receiver, poh_recorder.clone(), + vote_receiver, ); let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket { diff --git a/core/src/validator.rs b/core/src/validator.rs index 8b1522d45ab2fc..005afe0277b798 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -378,6 +378,7 @@ impl Validator { "New shred signal for the TVU should be the same as the clear bank signal." ); + let (vote_sender, vote_receiver) = unbounded(); let tvu = Tvu::new( vote_account, voting_keypair, @@ -445,6 +446,7 @@ impl Validator { &config.broadcast_stage_type, &exit, node.info.shred_version, + vote_sender, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); From edd1fd0dbaa4f0038dca0212361d8eed199d0ffa Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 13 Feb 2020 12:25:49 -0800 Subject: [PATCH 02/16] Switch from using channel --- core/src/cluster_info_vote_listener.rs | 210 +++++++++++++++++++++---- core/src/replay_stage.rs | 14 +- core/src/tpu.rs | 12 +- core/src/tvu.rs | 4 +- core/src/validator.rs | 14 +- ledger/src/bank_forks.rs | 4 + 6 files changed, 219 insertions(+), 39 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 863df006a039da..81ab45d80ef157 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -6,15 +6,74 @@ use crate::{packet, sigverify}; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }; +use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; -use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction}; +use solana_runtime::bank::Bank; +use solana_sdk::{ + clock::Slot, program_utils::limited_deserialize, pubkey::Pubkey, transaction::Transaction, +}; +use solana_vote_program::vote_instruction::VoteInstruction; +use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; -pub type VoteSender = CrossbeamSender<(Pubkey, Vec)>; -pub type VoteReceiver = CrossbeamReceiver<(Pubkey, Vec)>; +pub struct VoteTracker { + // Don't track votes from people who are not staked, acts as a spam filter + epoch_validators: HashSet>, + // Map from a slot to a set of validators who have voted for that slot + votes: HashMap>>, + current_epoch: u64, +} + +impl VoteTracker { + pub fn new(root_bank: &Bank) -> Self { + let current_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); + let epoch_validators = Self::get_staked_pubkeys(root_bank, current_epoch); + Self { + epoch_validators, + votes: HashMap::new(), + current_epoch, + } + } + + pub fn votes(&self) -> &HashMap>> { + &self.votes + } + + pub fn get_voter_pubkey(&self, pubkey: &Pubkey) -> Option<&Arc> { + self.epoch_validators.get(pubkey) + } + + pub fn get_staked_pubkeys(bank: &Bank, current_epoch: u64) -> HashSet> { + let mut i = 0; + let epoch_validators = HashSet::new(); + loop { + // Get all known vote accounts with nonzero stake in any + // upcoming epochs + if let Some(vote_accounts) = bank.epoch_vote_accounts(current_epoch + i) { + let staked_pubkeys = vote_accounts + .into_iter() + .filter_map(|(pk, (stake, _))| { + if *stake > 0 { + Some(Arc::new(*pk)) + } else { + None + } + }) + .collect(); + + epoch_validators.union(&staked_pubkeys); + } else { + break; + } + i += 1; + } + + epoch_validators + } +} pub struct ClusterInfoVoteListener { thread_hdls: Vec>, @@ -27,29 +86,32 @@ impl ClusterInfoVoteListener { sigverify_disabled: bool, sender: CrossbeamSender>, poh_recorder: &Arc>, - vote_sender: VoteSender, + vote_tracker: Arc>, + bank_forks: Arc>, ) -> Self { - let exit = exit.clone(); + let exit_ = exit.clone(); let poh_recorder = poh_recorder.clone(); - let (vote_tx_sender, vote_tx_receiver) = unbounded(); + let (vote_txs_sender, vote_txs_receiver) = unbounded(); let listen_thread = Builder::new() .name("solana-cluster_info_vote_listener".to_string()) .spawn(move || { let _ = Self::recv_loop( - exit, + exit_, &cluster_info, sigverify_disabled, &sender, - vote_tx_sender, + vote_txs_sender, poh_recorder, ); }) .unwrap(); + let exit_ = exit.clone(); let send_thread = Builder::new() - .name("solana-cluster_info_vote_sender".to_string()) + .name("solana-cluster_info_process_votes".to_string()) .spawn(move || { - let _ = Self::send_loop(exit, vote_tx_receiver, vote_sender); + let _ = + Self::process_votes_loop(exit_, vote_txs_receiver, vote_tracker, &bank_forks); }) .unwrap(); @@ -63,7 +125,7 @@ impl ClusterInfoVoteListener { cluster_info: &Arc>, sigverify_disabled: bool, packets_sender: &CrossbeamSender>, - vote_tx_sender: CrossbeamSender, + vote_txs_sender: CrossbeamSender>, poh_recorder: Arc>, ) -> Result<()> { loop { @@ -84,14 +146,23 @@ impl ClusterInfoVoteListener { sigverify::ed25519_verify_cpu(&msgs) }; assert_eq!( - r.iter().map(|packets_results| packets_results.len()).sum(), - votes.len(0) + r.iter() + .map(|packets_results| packets_results.len()) + .sum::(), + votes.len() ); - for (vote_tx, result) in votes.zip(r.iter().flatten()) { - if result != 0 { - vote_tx_sender.send(vote_tx); - } - } + let valid_votes: Vec<_> = votes + .into_iter() + .zip(r.iter().flatten()) + .filter_map(|(vote, verify_result)| { + if *verify_result != 0 { + Some(vote) + } else { + None + } + }) + .collect(); + vote_txs_sender.send(valid_votes)?; sigverify::mark_disabled(&mut msgs, &r); packets_sender.send(msgs)?; } @@ -100,16 +171,26 @@ impl ClusterInfoVoteListener { } } - fn send_loop( + fn process_votes_loop( exit: Arc, - vote_tx_receiver: CrossbeamReceiver, - vote_sender: VoteSender, + vote_txs_receiver: CrossbeamReceiver>, + vote_tracker: Arc>, + bank_forks: &RwLock, ) -> Result<()> { + let mut old_root = bank_forks.read().unwrap().root(); loop { if exit.load(Ordering::Relaxed) { return Ok(()); } - if let Err(e) = Self::run_deserialize_and_send_vote(&vote_tx_receiver, &vote_sender) { + + let root_bank = bank_forks.read().unwrap().root_bank().clone(); + + if root_bank.slot() != old_root { + Self::process_new_root(&root_bank, &vote_tracker); + old_root = root_bank.slot(); + } + + if let Err(e) = Self::process_votes(&vote_txs_receiver, &vote_tracker) { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { return Ok(()); @@ -123,19 +204,92 @@ impl ClusterInfoVoteListener { } } - fn run_deserialize_and_send_vote( - vote_tx_receiver: &CrossbeamReceiver, - vote_sender: &VoteSender, + fn process_votes( + vote_txs_receiver: &CrossbeamReceiver>, + vote_tracker: &Arc>, ) -> Result<()> { let timer = Duration::from_millis(200); - let tx = vote_tx_receiver.recv_timeout(timer)?; - while let Ok(shred) = vote_tx_receiver.try_recv() { - vote_sender.send((0, Pubkey::default())); + let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; + let mut diff: HashMap>> = HashMap::new(); + { + let vote_tracker = vote_tracker.read().unwrap(); + let slot_pubkeys = &vote_tracker.votes; + while let Ok(new_txs) = vote_txs_receiver.try_recv() { + vote_txs.extend(new_txs); + } + + for tx in vote_txs { + if let (Some(instruction), Some(vote_pubkey)) = ( + tx.message.instructions.first(), + tx.message.instructions.first(), + ) { + if let Ok(instruction) = limited_deserialize(&instruction.data) { + let (vote, vote_pubkey) = { + match instruction { + VoteInstruction::Vote(vote) => (vote, Pubkey::default()), + _ => { + continue; + } + } + }; + + // Only accept votes from vote pubkeys with non-zero stake + if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { + for slot in vote.slots { + if slot_pubkeys + .get(&slot) + .map(|slot_vote_pubkeys| { + slot_vote_pubkeys.contains(vote_pubkey) + }) + .unwrap_or(false) + { + diff.entry(slot).or_default().insert(vote_pubkey.clone()); + } + } + } + } + } + } + } + + let mut vote_tracker = vote_tracker.write().unwrap(); + let all_votes = &mut vote_tracker.votes; + for (slot, slot_diff) in diff { + let slot_pubkeys = all_votes.entry(slot).or_default(); + for pk in slot_diff { + slot_pubkeys.insert(pk); + } } Ok(()) } + pub fn process_new_root(root_bank: &Bank, vote_tracker: &RwLock) { + let new_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); + if new_epoch != vote_tracker.read().unwrap().current_epoch { + let mut current_validators = vote_tracker.read().unwrap().epoch_validators.clone(); + let new_epoch_validators = VoteTracker::get_staked_pubkeys(root_bank, new_epoch); + + // Remove the old pubkeys + current_validators.retain(|v| new_epoch_validators.contains(v)); + + // Insert any new pubkeys, don't re-insert ones we already have, + // otherwise memory usage increases from the duplicates being held + // in VoteTracker.votes + let new_public_keys: Vec<_> = new_epoch_validators + .difference(¤t_validators) + .cloned() + .collect(); + for key in new_public_keys { + current_validators.insert(key); + } + + let mut vote_tracker = vote_tracker.write().unwrap(); + std::mem::swap(&mut current_validators, &mut vote_tracker.epoch_validators); + vote_tracker.current_epoch = new_epoch; + } + } + pub fn join(self) -> thread::Result<()> { for thread_hdl in self.thread_hdls { thread_hdl.join()?; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index a533d5693179fe..bd54ebec390a60 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -2,11 +2,12 @@ use crate::{ cluster_info::ClusterInfo, - cluster_info_vote_listener::VoteReceiver, + cluster_info_vote_listener::VoteTracker, commitment::{AggregateCommitmentService, BlockCommitmentCache, CommitmentAggregationData}, consensus::{StakeLockout, Tower}, poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, result::Result, + rewards_recorder_service::RewardsRecorderSender, rpc_subscriptions::RpcSubscriptions, }; use solana_ledger::{ @@ -86,6 +87,7 @@ pub struct ReplayStageConfig { pub snapshot_package_sender: Option, pub block_commitment_cache: Arc>, pub transaction_status_sender: Option, + pub rewards_recorder_sender: Option, } pub struct ReplayStage { @@ -175,7 +177,7 @@ impl ReplayStage { cluster_info: Arc>, ledger_signal_receiver: Receiver, poh_recorder: Arc>, - vote_receiver: VoteReceiver, + vote_tracker: Arc>, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, @@ -189,6 +191,7 @@ impl ReplayStage { snapshot_package_sender, block_commitment_cache, transaction_status_sender, + rewards_recorder_sender, } = config; let (root_bank_sender, root_bank_receiver) = channel(); @@ -235,6 +238,7 @@ impl ReplayStage { &bank_forks, &leader_schedule_cache, &subscriptions, + rewards_recorder_sender.clone(), ); Self::report_memory(&allocated, "generate_new_bank_forks", start); @@ -438,6 +442,7 @@ impl ReplayStage { &poh_recorder, &leader_schedule_cache, &subscriptions, + rewards_recorder_sender.clone(), ); if let Some(bank) = poh_recorder.lock().unwrap().bank() { @@ -519,6 +524,7 @@ impl ReplayStage { poh_recorder: &Arc>, leader_schedule_cache: &Arc, subscriptions: &Arc, + rewards_recorder_sender: Option, ) { // all the individual calls to poh_recorder.lock() are designed to // increase granularity, decrease contention @@ -1182,6 +1188,7 @@ impl ReplayStage { forks_lock: &RwLock, leader_schedule_cache: &Arc, subscriptions: &Arc, + rewards_recorder_sender: Option, ) { // Find the next slot that chains to the old slot let forks = forks_lock.read().unwrap(); @@ -1330,7 +1337,6 @@ pub(crate) mod tests { struct ForkSelectionResponse { slot: u64, is_locked_out: bool, - is_available: bool, } fn simulate_fork_selection( @@ -1626,6 +1632,7 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, + None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); @@ -1638,6 +1645,7 @@ pub(crate) mod tests { &bank_forks, &leader_schedule_cache, &subscriptions, + None, ); assert!(bank_forks.read().unwrap().get(1).is_some()); assert!(bank_forks.read().unwrap().get(2).is_some()); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 05efae36655a7c..7d0c88dcb07421 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -5,14 +5,16 @@ use crate::{ banking_stage::BankingStage, broadcast_stage::{BroadcastStage, BroadcastStageType}, cluster_info::ClusterInfo, - cluster_info_vote_listener::{ClusterInfoVoteListener, VoteSender}, + cluster_info_vote_listener::{ClusterInfoVoteListener, VoteTracker}, fetch_stage::FetchStage, poh_recorder::{PohRecorder, WorkingBankEntry}, sigverify::TransactionSigVerifier, sigverify_stage::{DisabledSigVerifier, SigVerifyStage}, }; use crossbeam_channel::unbounded; -use solana_ledger::{blockstore::Blockstore, blockstore_processor::TransactionStatusSender}; +use solana_ledger::{ + bank_forks::BankForks, blockstore::Blockstore, blockstore_processor::TransactionStatusSender, +}; use std::{ net::UdpSocket, sync::{ @@ -46,7 +48,8 @@ impl Tpu { broadcast_type: &BroadcastStageType, exit: &Arc, shred_version: u16, - vote_sender: VoteSender, + vote_tracker: Arc>, + bank_forks: Arc>, ) -> Self { let (packet_sender, packet_receiver) = channel(); let fetch_stage = FetchStage::new_with_sender( @@ -73,7 +76,8 @@ impl Tpu { sigverify_disabled, verified_vote_sender, &poh_recorder, - vote_sender, + vote_tracker, + bank_forks, ); let banking_stage = BankingStage::new( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index e986813f2dc891..bc802974cc9979 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -4,7 +4,7 @@ use crate::{ blockstream_service::BlockstreamService, cluster_info::ClusterInfo, - cluster_info_vote_listener::VoteReceiver, + cluster_info_vote_listener::VoteTracker, commitment::BlockCommitmentCache, ledger_cleanup_service::LedgerCleanupService, poh_recorder::PohRecorder, @@ -172,7 +172,7 @@ impl Tvu { cluster_info.clone(), ledger_signal_receiver, poh_recorder.clone(), - vote_receiver, + vote_tracker, ); let blockstream_service = if let Some(blockstream_unix_socket) = blockstream_unix_socket { diff --git a/core/src/validator.rs b/core/src/validator.rs index 005afe0277b798..addbb59933f682 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -3,6 +3,7 @@ use crate::{ broadcast_stage::BroadcastStageType, cluster_info::{ClusterInfo, Node}, + cluster_info_vote_listener::VoteTracker, commitment::BlockCommitmentCache, contact_info::ContactInfo, gossip_service::{discover_cluster, GossipService}, @@ -378,7 +379,15 @@ impl Validator { "New shred signal for the TVU should be the same as the clear bank signal." ); - let (vote_sender, vote_receiver) = unbounded(); + let vote_tracker = Arc::new(RwLock::new({ + let bank_forks = bank_forks.read().unwrap(); + VoteTracker::new( + bank_forks + .get(bank_forks.root()) + .expect("Root bank must exist"), + ) + })); + let tvu = Tvu::new( vote_account, voting_keypair, @@ -446,7 +455,8 @@ impl Validator { &config.broadcast_stage_type, &exit, node.info.shred_version, - vote_sender, + vote_tracker, + bank_forks, ); datapoint_info!("validator-new", ("id", id.to_string(), String)); diff --git a/ledger/src/bank_forks.rs b/ledger/src/bank_forks.rs index 56123263e0dbc6..dea63ab75b3251 100644 --- a/ledger/src/bank_forks.rs +++ b/ledger/src/bank_forks.rs @@ -122,6 +122,10 @@ impl BankForks { self.banks.get(&bank_slot) } + pub fn root_bank(&self) -> &Arc { + self.banks.get(&self.root()).expect("Root bank must exist") + } + pub fn new_from_banks(initial_forks: &[Arc], rooted_path: Vec) -> Self { let mut banks = HashMap::new(); let working_bank = initial_forks[0].clone(); From 53190b4947161536cf768d9a54011723926c8575 Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 17 Feb 2020 14:14:54 -0800 Subject: [PATCH 03/16] Track AuthorizedVoters --- core/src/cluster_info_vote_listener.rs | 167 +++++++++++++++++-------- core/src/replay_stage.rs | 2 +- core/src/tvu.rs | 1 + sdk/src/message.rs | 4 + 4 files changed, 122 insertions(+), 52 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 81ab45d80ef157..d57a8f46d22fb5 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -6,13 +6,16 @@ use crate::{packet, sigverify}; use crossbeam_channel::{ unbounded, Receiver as CrossbeamReceiver, RecvTimeoutError, Sender as CrossbeamSender, }; +use log::*; use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; use solana_runtime::bank::Bank; use solana_sdk::{ - clock::Slot, program_utils::limited_deserialize, pubkey::Pubkey, transaction::Transaction, + clock::Slot, epoch_schedule::EpochSchedule, program_utils::limited_deserialize, pubkey::Pubkey, + transaction::Transaction, }; use solana_vote_program::vote_instruction::VoteInstruction; +use solana_vote_program::vote_state::{AuthorizedVoters, VoteState}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; @@ -21,20 +24,22 @@ use std::time::Duration; pub struct VoteTracker { // Don't track votes from people who are not staked, acts as a spam filter - epoch_validators: HashSet>, + epoch_authorized_voters: HashMap, AuthorizedVoters>, // Map from a slot to a set of validators who have voted for that slot votes: HashMap>>, current_epoch: u64, + epoch_schedule: EpochSchedule, } impl VoteTracker { pub fn new(root_bank: &Bank) -> Self { let current_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); - let epoch_validators = Self::get_staked_pubkeys(root_bank, current_epoch); + let epoch_authorized_voters = Self::get_staked_authorized_voters(root_bank, current_epoch); Self { - epoch_validators, + epoch_authorized_voters, votes: HashMap::new(), current_epoch, + epoch_schedule: root_bank.epoch_schedule().clone(), } } @@ -43,35 +48,52 @@ impl VoteTracker { } pub fn get_voter_pubkey(&self, pubkey: &Pubkey) -> Option<&Arc> { - self.epoch_validators.get(pubkey) + self.epoch_authorized_voters + .get_key_value(pubkey) + .map(|(key, _)| key) } - pub fn get_staked_pubkeys(bank: &Bank, current_epoch: u64) -> HashSet> { - let mut i = 0; - let epoch_validators = HashSet::new(); - loop { - // Get all known vote accounts with nonzero stake in any - // upcoming epochs - if let Some(vote_accounts) = bank.epoch_vote_accounts(current_epoch + i) { - let staked_pubkeys = vote_accounts - .into_iter() - .filter_map(|(pk, (stake, _))| { - if *stake > 0 { - Some(Arc::new(*pk)) - } else { - None - } - }) - .collect(); - - epoch_validators.union(&staked_pubkeys); - } else { - break; - } - i += 1; - } + pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: u64) -> Option { + let epoch = self.epoch_schedule.get_epoch(slot); + self.epoch_authorized_voters + .get(pubkey) + .map(|authorized_voters| authorized_voters.get_authorized_voter(epoch)) + .unwrap_or(None) + } - epoch_validators + pub fn get_staked_authorized_voters( + bank: &Bank, + current_epoch: u64, + ) -> HashMap, AuthorizedVoters> { + // Get all known vote accounts with nonzero stake and read out their + // authorized voters + bank.epoch_vote_accounts(current_epoch) + .expect("Epoch vote accounts must exist") + .into_iter() + .filter_map(|(key, (stake, account))| { + let vote_state = VoteState::from(&account); + if vote_state.is_none() { + datapoint_warn!( + "cluster_info_vote_listener", + ( + "warn", + format!("Unable to get vote_state from account {}", key), + String + ), + ); + return None; + } + let vote_state = vote_state.unwrap(); + if *stake > 0 { + let mut authorized_voters = vote_state.authorized_voter().clone(); + authorized_voters.get_and_cache_authorized_voter_for_epoch(current_epoch); + authorized_voters.get_and_cache_authorized_voter_for_epoch(current_epoch + 1); + Some((Arc::new(*key), authorized_voters)) + } else { + None + } + }) + .collect() } } @@ -177,17 +199,23 @@ impl ClusterInfoVoteListener { vote_tracker: Arc>, bank_forks: &RwLock, ) -> Result<()> { - let mut old_root = bank_forks.read().unwrap().root(); + let mut old_leader_schedule_epoch = { + let root_bank = bank_forks.read().unwrap().root_bank().clone(); + root_bank.get_leader_schedule_epoch(root_bank.slot()) + }; + loop { if exit.load(Ordering::Relaxed) { return Ok(()); } let root_bank = bank_forks.read().unwrap().root_bank().clone(); + let new_leader_schedule_epoch = + { root_bank.get_leader_schedule_epoch(root_bank.slot()) }; - if root_bank.slot() != old_root { + if old_leader_schedule_epoch != new_leader_schedule_epoch { Self::process_new_root(&root_bank, &vote_tracker); - old_root = root_bank.slot(); + old_leader_schedule_epoch = new_leader_schedule_epoch; } if let Err(e) = Self::process_votes(&vote_txs_receiver, &vote_tracker) { @@ -204,6 +232,17 @@ impl ClusterInfoVoteListener { } } + fn vote_contains_authorized_voter(vote_tx: &Transaction, authorized_voter: Pubkey) -> bool { + let message = &vote_tx.message; + for (i, key) in message.account_keys[1..].iter().enumerate() { + if message.is_signer(i + 1) && *key == authorized_voter { + return true; + } + } + + false + } + fn process_votes( vote_txs_receiver: &CrossbeamReceiver>, vote_tracker: &Arc>, @@ -221,21 +260,48 @@ impl ClusterInfoVoteListener { for tx in vote_txs { if let (Some(instruction), Some(vote_pubkey)) = ( tx.message.instructions.first(), - tx.message.instructions.first(), + tx.message.account_keys.first(), ) { - if let Ok(instruction) = limited_deserialize(&instruction.data) { - let (vote, vote_pubkey) = { - match instruction { - VoteInstruction::Vote(vote) => (vote, Pubkey::default()), + if let Ok(vote_instruction) = limited_deserialize(&instruction.data) { + let vote = { + match vote_instruction { + VoteInstruction::Vote(vote) => vote, _ => { continue; } } }; - // Only accept votes from vote pubkeys with non-zero stake + if vote.slots.is_empty() { + continue; + } + + let last_vote_slot = vote.slots.last().unwrap(); + + // Determine the authorized voter based on the last vote slot. This will + // drop votes from authorized voters trying to make votes for slots + // earlier than the epoch for which they are authorized + let actual_authorized_voter = + vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); + + if actual_authorized_voter.is_none() { + continue; + } + + // Voting without the correct authorized pubkey, dump the vote + if !Self::vote_contains_authorized_voter( + &tx, + actual_authorized_voter.unwrap(), + ) { + continue; + } + + // Only accept votes from authorized vote pubkeys with non-zero stake + // that we determined at leader_schedule_epoch boundaries if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { for slot in vote.slots { + // Don't insert if we already have marked down this pubkey + // voting for this slot if slot_pubkeys .get(&slot) .map(|slot_vote_pubkeys| { @@ -267,25 +333,24 @@ impl ClusterInfoVoteListener { pub fn process_new_root(root_bank: &Bank, vote_tracker: &RwLock) { let new_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); if new_epoch != vote_tracker.read().unwrap().current_epoch { - let mut current_validators = vote_tracker.read().unwrap().epoch_validators.clone(); - let new_epoch_validators = VoteTracker::get_staked_pubkeys(root_bank, new_epoch); + let mut current_validators = + vote_tracker.read().unwrap().epoch_authorized_voters.clone(); + let new_epoch_authorized_voters = + VoteTracker::get_staked_authorized_voters(root_bank, new_epoch); // Remove the old pubkeys - current_validators.retain(|v| new_epoch_validators.contains(v)); + current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); // Insert any new pubkeys, don't re-insert ones we already have, // otherwise memory usage increases from the duplicates being held - // in VoteTracker.votes - let new_public_keys: Vec<_> = new_epoch_validators - .difference(¤t_validators) - .cloned() - .collect(); - for key in new_public_keys { - current_validators.insert(key); - } + // in Arc references to those duplicates in VoteTracker.votes + current_validators.extend(new_epoch_authorized_voters); let mut vote_tracker = vote_tracker.write().unwrap(); - std::mem::swap(&mut current_validators, &mut vote_tracker.epoch_validators); + std::mem::swap( + &mut current_validators, + &mut vote_tracker.epoch_authorized_voters, + ); vote_tracker.current_epoch = new_epoch; } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index bd54ebec390a60..d0e6872f5bfe16 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -177,7 +177,7 @@ impl ReplayStage { cluster_info: Arc>, ledger_signal_receiver: Receiver, poh_recorder: Arc>, - vote_tracker: Arc>, + _vote_tracker: Arc>, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, diff --git a/core/src/tvu.rs b/core/src/tvu.rs index bc802974cc9979..9d8621da5411b0 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -304,6 +304,7 @@ pub mod tests { None, None, None, + Arc::new(RwLock::new(VoteTracker::new(&bank))), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/sdk/src/message.rs b/sdk/src/message.rs index 386fcd46a41607..2abe276d4d3e5a 100644 --- a/sdk/src/message.rs +++ b/sdk/src/message.rs @@ -260,6 +260,10 @@ impl Message { - self.header.num_readonly_unsigned_accounts as usize) } + pub fn is_signer(&self, i: usize) -> bool { + i < self.header.num_required_signatures as usize + } + pub fn get_account_keys_by_lock_type(&self) -> (Vec<&Pubkey>, Vec<&Pubkey>) { let mut writable_keys = vec![]; let mut readonly_keys = vec![]; From 83a0a2993190caf8a61a3929ca4c54709f709c37 Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 17 Feb 2020 20:47:17 -0800 Subject: [PATCH 04/16] Add tests --- core/src/cluster_info_vote_listener.rs | 209 +++++++++++++++++++------ 1 file changed, 160 insertions(+), 49 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d57a8f46d22fb5..827df061e9b501 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -11,7 +11,10 @@ use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; use solana_runtime::bank::Bank; use solana_sdk::{ - clock::Slot, epoch_schedule::EpochSchedule, program_utils::limited_deserialize, pubkey::Pubkey, + clock::{Epoch, Slot}, + epoch_schedule::EpochSchedule, + program_utils::limited_deserialize, + pubkey::Pubkey, transaction::Transaction, }; use solana_vote_program::vote_instruction::VoteInstruction; @@ -27,18 +30,19 @@ pub struct VoteTracker { epoch_authorized_voters: HashMap, AuthorizedVoters>, // Map from a slot to a set of validators who have voted for that slot votes: HashMap>>, - current_epoch: u64, epoch_schedule: EpochSchedule, } impl VoteTracker { pub fn new(root_bank: &Bank) -> Self { - let current_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); - let epoch_authorized_voters = Self::get_staked_authorized_voters(root_bank, current_epoch); + let leader_schedule_epoch = root_bank + .epoch_schedule() + .get_leader_schedule_epoch(root_bank.slot()); + let epoch_authorized_voters = + Self::get_staked_authorized_voters(root_bank, leader_schedule_epoch); Self { epoch_authorized_voters, votes: HashMap::new(), - current_epoch, epoch_schedule: root_bank.epoch_schedule().clone(), } } @@ -53,7 +57,7 @@ impl VoteTracker { .map(|(key, _)| key) } - pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: u64) -> Option { + pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: Slot) -> Option { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters .get(pubkey) @@ -63,11 +67,11 @@ impl VoteTracker { pub fn get_staked_authorized_voters( bank: &Bank, - current_epoch: u64, + epoch: Epoch, ) -> HashMap, AuthorizedVoters> { // Get all known vote accounts with nonzero stake and read out their // authorized voters - bank.epoch_vote_accounts(current_epoch) + bank.epoch_vote_accounts(epoch) .expect("Epoch vote accounts must exist") .into_iter() .filter_map(|(key, (stake, account))| { @@ -85,9 +89,9 @@ impl VoteTracker { } let vote_state = vote_state.unwrap(); if *stake > 0 { - let mut authorized_voters = vote_state.authorized_voter().clone(); - authorized_voters.get_and_cache_authorized_voter_for_epoch(current_epoch); - authorized_voters.get_and_cache_authorized_voter_for_epoch(current_epoch + 1); + let mut authorized_voters = vote_state.authorized_voters().clone(); + authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); + authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); Some((Arc::new(*key), authorized_voters)) } else { None @@ -95,6 +99,17 @@ impl VoteTracker { }) .collect() } + + pub fn vote_contains_authorized_voter(vote_tx: &Transaction, authorized_voter: Pubkey) -> bool { + let message = &vote_tx.message; + for (i, key) in message.account_keys.iter().enumerate() { + if message.is_signer(i) && *key == authorized_voter { + return true; + } + } + + false + } } pub struct ClusterInfoVoteListener { @@ -214,7 +229,11 @@ impl ClusterInfoVoteListener { { root_bank.get_leader_schedule_epoch(root_bank.slot()) }; if old_leader_schedule_epoch != new_leader_schedule_epoch { - Self::process_new_root(&root_bank, &vote_tracker); + Self::process_new_leader_schedule_epoch( + &root_bank, + &vote_tracker, + new_leader_schedule_epoch, + ); old_leader_schedule_epoch = new_leader_schedule_epoch; } @@ -232,17 +251,6 @@ impl ClusterInfoVoteListener { } } - fn vote_contains_authorized_voter(vote_tx: &Transaction, authorized_voter: Pubkey) -> bool { - let message = &vote_tx.message; - for (i, key) in message.account_keys[1..].iter().enumerate() { - if message.is_signer(i + 1) && *key == authorized_voter { - return true; - } - } - - false - } - fn process_votes( vote_txs_receiver: &CrossbeamReceiver>, vote_tracker: &Arc>, @@ -289,7 +297,7 @@ impl ClusterInfoVoteListener { } // Voting without the correct authorized pubkey, dump the vote - if !Self::vote_contains_authorized_voter( + if !VoteTracker::vote_contains_authorized_voter( &tx, actual_authorized_voter.unwrap(), ) { @@ -330,29 +338,28 @@ impl ClusterInfoVoteListener { Ok(()) } - pub fn process_new_root(root_bank: &Bank, vote_tracker: &RwLock) { - let new_epoch = root_bank.epoch_schedule().get_epoch(root_bank.slot()); - if new_epoch != vote_tracker.read().unwrap().current_epoch { - let mut current_validators = - vote_tracker.read().unwrap().epoch_authorized_voters.clone(); - let new_epoch_authorized_voters = - VoteTracker::get_staked_authorized_voters(root_bank, new_epoch); - - // Remove the old pubkeys - current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); - - // Insert any new pubkeys, don't re-insert ones we already have, - // otherwise memory usage increases from the duplicates being held - // in Arc references to those duplicates in VoteTracker.votes - current_validators.extend(new_epoch_authorized_voters); - - let mut vote_tracker = vote_tracker.write().unwrap(); - std::mem::swap( - &mut current_validators, - &mut vote_tracker.epoch_authorized_voters, - ); - vote_tracker.current_epoch = new_epoch; - } + pub fn process_new_leader_schedule_epoch( + root_bank: &Bank, + vote_tracker: &RwLock, + new_leader_schedule_epoch: Epoch, + ) { + let mut current_validators = vote_tracker.read().unwrap().epoch_authorized_voters.clone(); + let new_epoch_authorized_voters = + VoteTracker::get_staked_authorized_voters(root_bank, new_leader_schedule_epoch); + + // Remove the old pubkeys + current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); + + // Insert any new pubkeys, don't re-insert ones we already have, + // otherwise memory usage increases from the duplicates being held + // in Arc references to those duplicates in VoteTracker.votes + current_validators.extend(new_epoch_authorized_voters); + + let mut vote_tracker = vote_tracker.write().unwrap(); + std::mem::swap( + &mut current_validators, + &mut vote_tracker.epoch_authorized_voters, + ); } pub fn join(self) -> thread::Result<()> { @@ -365,12 +372,16 @@ impl ClusterInfoVoteListener { #[cfg(test)] mod tests { + use super::*; use crate::packet; + use solana_runtime::{ + bank::Bank, + genesis_utils::{self, GenesisConfigInfo, ValidatorVoteKeypairs}, + }; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::transaction::Transaction; - use solana_vote_program::vote_instruction; - use solana_vote_program::vote_state::Vote; + use solana_vote_program::{vote_instruction, vote_state::Vote}; #[test] fn test_max_vote_tx_fits() { @@ -393,4 +404,104 @@ mod tests { assert_eq!(msgs.len(), 1); } + + #[test] + fn vote_contains_authorized_voter() { + let node_keypair = Keypair::new(); + let authorized_voter = Keypair::new(); + + let vote_ix = vote_instruction::vote( + &node_keypair.pubkey(), + &authorized_voter.pubkey(), + Vote::default(), + ); + + let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); + + vote_tx.partial_sign(&[&node_keypair], Hash::default()); + vote_tx.partial_sign(&[&authorized_voter], Hash::default()); + + // Check that the two signing keys pass the check + assert!(VoteTracker::vote_contains_authorized_voter( + &vote_tx, + node_keypair.pubkey() + )); + + assert!(VoteTracker::vote_contains_authorized_voter( + &vote_tx, + authorized_voter.pubkey() + )); + + // Set the authorized voter == node keypair + let vote_ix = vote_instruction::vote( + &node_keypair.pubkey(), + &node_keypair.pubkey(), + Vote::default(), + ); + let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); + vote_tx.partial_sign(&[&node_keypair], Hash::default()); + vote_tx.partial_sign(&[&node_keypair], Hash::default()); + + // Check that the node_keypair itself still passes the authorized voter check + assert!(VoteTracker::vote_contains_authorized_voter( + &vote_tx, + node_keypair.pubkey() + )); + + // The other keyapir should not pass + assert!(!VoteTracker::vote_contains_authorized_voter( + &vote_tx, + authorized_voter.pubkey() + )); + } + + #[test] + fn test_process_votes() { + // Create some voters at genesiss + let validator_voting_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + ); + let bank = Bank::new(&genesis_config); + + // Send some votes to process + let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let (votes_sender, votes_receiver) = unbounded(); + + let vote_slot = 1; + validator_voting_keypairs.iter().for_each(|keypairs| { + let node_keypair = &keypairs.node_keypair; + let vote_keypair = &keypairs.vote_keypair; + let vote_ix = vote_instruction::vote( + &node_keypair.pubkey(), + &vote_keypair.pubkey(), + Vote::new(vec![vote_slot], Hash::default()), + ); + + let mut vote_tx = + Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); + + vote_tx.partial_sign(&[node_keypair], Hash::default()); + vote_tx.partial_sign(&[vote_keypair], Hash::default()); + votes_sender.send(vec![vote_tx]).unwrap(); + }); + + // Check that all the votes were registered for each validator correctly + ClusterInfoVoteListener::process_votes(&votes_receiver, &vote_tracker).unwrap(); + let r_vote_tracker = vote_tracker.read().unwrap(); + let votes = r_vote_tracker.votes(); + let votes_for_slot = votes.get(&vote_slot).unwrap(); + for voting_keypairs in validator_voting_keypairs { + assert!(votes_for_slot.contains(&voting_keypairs.node_keypair.pubkey())); + } + } + + #[test] + fn test_process_new_leader_schedule_epoch() { + // Make sure size doesn't grow + } } From 1c2f3d2df213c67759eb5307ef0b6b5384060f3d Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 18 Feb 2020 13:54:00 -0800 Subject: [PATCH 05/16] Temp --- core/src/cluster_info_vote_listener.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 827df061e9b501..dac4287c3fe031 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -61,7 +61,10 @@ impl VoteTracker { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters .get(pubkey) - .map(|authorized_voters| authorized_voters.get_authorized_voter(epoch)) + .map(|authorized_voters| { + println!("authorized voters: {:?}", authorized_voters); + authorized_voters.get_authorized_voter(epoch) + }) .unwrap_or(None) } @@ -88,7 +91,9 @@ impl VoteTracker { return None; } let vote_state = vote_state.unwrap(); + println!("Found vote state"); if *stake > 0 { + println!("Found vote state with stake > 0, key: {:?}", key); let mut authorized_voters = vote_state.authorized_voters().clone(); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); @@ -292,6 +297,10 @@ impl ClusterInfoVoteListener { let actual_authorized_voter = vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); + println!( + "vk: {:?}, actual_av: {:?}", + vote_pubkey, actual_authorized_voter + ); if actual_authorized_voter.is_none() { continue; } @@ -476,6 +485,11 @@ mod tests { validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; + println!( + "nk: {:?}, vk: {:?}", + node_keypair.pubkey().to_string(), + vote_keypair.pubkey().to_string() + ); let vote_ix = vote_instruction::vote( &node_keypair.pubkey(), &vote_keypair.pubkey(), From 85b8a03b3212c2fdacaea1df78b64be7dced7bce Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 18 Feb 2020 16:17:13 -0800 Subject: [PATCH 06/16] Add tracking of node_id to vote accounts --- core/src/cluster_info_vote_listener.rs | 218 +++++++++++++++++-------- 1 file changed, 150 insertions(+), 68 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index dac4287c3fe031..6d186e6544358f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -25,11 +25,16 @@ use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; +pub type EpochAuthorizedVoters = HashMap, AuthorizedVoters>; +pub type NodeIdToVoteAccounts = HashMap>; + pub struct VoteTracker { // Don't track votes from people who are not staked, acts as a spam filter - epoch_authorized_voters: HashMap, AuthorizedVoters>, + epoch_authorized_voters: EpochAuthorizedVoters, // Map from a slot to a set of validators who have voted for that slot votes: HashMap>>, + // Map from node id to the set of associated vote accounts + node_id_to_vote_accounts: NodeIdToVoteAccounts, epoch_schedule: EpochSchedule, } @@ -38,11 +43,12 @@ impl VoteTracker { let leader_schedule_epoch = root_bank .epoch_schedule() .get_leader_schedule_epoch(root_bank.slot()); - let epoch_authorized_voters = - Self::get_staked_authorized_voters(root_bank, leader_schedule_epoch); + let (epoch_authorized_voters, node_id_to_vote_accounts) = + Self::parse_epoch_state(root_bank, leader_schedule_epoch); Self { epoch_authorized_voters, votes: HashMap::new(), + node_id_to_vote_accounts, epoch_schedule: root_bank.epoch_schedule().clone(), } } @@ -68,13 +74,15 @@ impl VoteTracker { .unwrap_or(None) } - pub fn get_staked_authorized_voters( + pub fn parse_epoch_state( bank: &Bank, epoch: Epoch, - ) -> HashMap, AuthorizedVoters> { + ) -> (EpochAuthorizedVoters, NodeIdToVoteAccounts) { + let mut node_id_to_vote_accounts: NodeIdToVoteAccounts = HashMap::new(); // Get all known vote accounts with nonzero stake and read out their // authorized voters - bank.epoch_vote_accounts(epoch) + let epoch_authorized_voters = bank + .epoch_vote_accounts(epoch) .expect("Epoch vote accounts must exist") .into_iter() .filter_map(|(key, (stake, account))| { @@ -97,12 +105,19 @@ impl VoteTracker { let mut authorized_voters = vote_state.authorized_voters().clone(); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); - Some((Arc::new(*key), authorized_voters)) + let key = Arc::new(*key); + node_id_to_vote_accounts + .entry(vote_state.node_pubkey) + .or_default() + .push(*key.clone()); + Some((key, authorized_voters)) } else { None } }) - .collect() + .collect(); + + (epoch_authorized_voters, node_id_to_vote_accounts) } pub fn vote_contains_authorized_voter(vote_tx: &Transaction, authorized_voter: Pubkey) -> bool { @@ -115,6 +130,20 @@ impl VoteTracker { false } + + // Given a set of validator node ids `N` and vote accounts `V`, removes the vote accounts + // from `V` that belong to `N` + pub fn diff_vote_accounts(&self, node_ids: &[Pubkey], vote_accounts: &mut HashSet) { + for node_id in node_ids { + self.node_id_to_vote_accounts + .get(node_id) + .map(|node_vote_accounts| { + for node_vote_account in node_vote_accounts { + vote_accounts.remove(node_vote_account); + } + }); + } + } } pub struct ClusterInfoVoteListener { @@ -271,63 +300,69 @@ impl ClusterInfoVoteListener { } for tx in vote_txs { - if let (Some(instruction), Some(vote_pubkey)) = ( - tx.message.instructions.first(), - tx.message.account_keys.first(), - ) { - if let Ok(vote_instruction) = limited_deserialize(&instruction.data) { - let vote = { - match vote_instruction { - VoteInstruction::Vote(vote) => vote, - _ => { - continue; - } + if let (Some(vote_pubkey), Some(vote_instruction)) = tx + .message + .instructions + .first() + .and_then(|first_instruction| { + first_instruction.accounts.first().and_then(|offset| { + Some(( + tx.message.account_keys.get(*offset as usize), + limited_deserialize(&first_instruction.data).ok(), + )) + }) + }) + .unwrap_or((None, None)) + { + let vote = { + match vote_instruction { + VoteInstruction::Vote(vote) => vote, + _ => { + continue; } - }; - - if vote.slots.is_empty() { - continue; } + }; - let last_vote_slot = vote.slots.last().unwrap(); + if vote.slots.is_empty() { + continue; + } - // Determine the authorized voter based on the last vote slot. This will - // drop votes from authorized voters trying to make votes for slots - // earlier than the epoch for which they are authorized - let actual_authorized_voter = - vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); + let last_vote_slot = vote.slots.last().unwrap(); - println!( - "vk: {:?}, actual_av: {:?}", - vote_pubkey, actual_authorized_voter - ); - if actual_authorized_voter.is_none() { - continue; - } + // Determine the authorized voter based on the last vote slot. This will + // drop votes from authorized voters trying to make votes for slots + // earlier than the epoch for which they are authorized + let actual_authorized_voter = + vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); - // Voting without the correct authorized pubkey, dump the vote - if !VoteTracker::vote_contains_authorized_voter( - &tx, - actual_authorized_voter.unwrap(), - ) { - continue; - } + println!( + "vk: {:?}, actual_av: {:?}", + vote_pubkey, actual_authorized_voter + ); + if actual_authorized_voter.is_none() { + continue; + } + + // Voting without the correct authorized pubkey, dump the vote + if !VoteTracker::vote_contains_authorized_voter( + &tx, + actual_authorized_voter.unwrap(), + ) { + continue; + } - // Only accept votes from authorized vote pubkeys with non-zero stake - // that we determined at leader_schedule_epoch boundaries - if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { - for slot in vote.slots { - // Don't insert if we already have marked down this pubkey - // voting for this slot - if slot_pubkeys - .get(&slot) - .map(|slot_vote_pubkeys| { - slot_vote_pubkeys.contains(vote_pubkey) - }) - .unwrap_or(false) - { - diff.entry(slot).or_default().insert(vote_pubkey.clone()); - } + // Only accept votes from authorized vote pubkeys with non-zero stake + // that we determined at leader_schedule_epoch boundaries + if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { + for slot in vote.slots { + // Don't insert if we already have marked down this pubkey + // voting for this slot + if slot_pubkeys + .get(&slot) + .map(|slot_vote_pubkeys| slot_vote_pubkeys.contains(vote_pubkey)) + .unwrap_or(false) + { + diff.entry(slot).or_default().insert(vote_pubkey.clone()); } } } @@ -353,8 +388,8 @@ impl ClusterInfoVoteListener { new_leader_schedule_epoch: Epoch, ) { let mut current_validators = vote_tracker.read().unwrap().epoch_authorized_voters.clone(); - let new_epoch_authorized_voters = - VoteTracker::get_staked_authorized_voters(root_bank, new_leader_schedule_epoch); + let (new_epoch_authorized_voters, mut new_node_id_to_vote_accounts) = + VoteTracker::parse_epoch_state(root_bank, new_leader_schedule_epoch); // Remove the old pubkeys current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); @@ -369,6 +404,11 @@ impl ClusterInfoVoteListener { &mut current_validators, &mut vote_tracker.epoch_authorized_voters, ); + + std::mem::swap( + &mut new_node_id_to_vote_accounts, + &mut vote_tracker.node_id_to_vote_accounts, + ) } pub fn join(self) -> thread::Result<()> { @@ -466,7 +506,7 @@ mod tests { #[test] fn test_process_votes() { - // Create some voters at genesiss + // Create some voters at genesis let validator_voting_keypairs: Vec<_> = (0..10) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) .collect(); @@ -485,13 +525,9 @@ mod tests { validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; - println!( - "nk: {:?}, vk: {:?}", - node_keypair.pubkey().to_string(), - vote_keypair.pubkey().to_string() - ); + let vote_ix = vote_instruction::vote( - &node_keypair.pubkey(), + &vote_keypair.pubkey(), &vote_keypair.pubkey(), Vote::new(vec![vote_slot], Hash::default()), ); @@ -499,7 +535,7 @@ mod tests { let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); - vote_tx.partial_sign(&[node_keypair], Hash::default()); + vote_tx.partial_sign(&[vote_keypair], Hash::default()); vote_tx.partial_sign(&[vote_keypair], Hash::default()); votes_sender.send(vec![vote_tx]).unwrap(); }); @@ -514,6 +550,52 @@ mod tests { } } + #[test] + fn test_diff_vote_accounts() { + // Create some voters at genesis + let validator_voting_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + ); + let bank = Bank::new(&genesis_config); + + // Send some votes to process + let vote_tracker = VoteTracker::new(&bank); + + // Given all the node id's, should diff out all the vote accounts + let node_ids: Vec<_> = validator_voting_keypairs + .iter() + .map(|v| v.node_keypair.pubkey()) + .collect(); + let mut vote_accounts = validator_voting_keypairs + .iter() + .map(|v| v.vote_keypair.pubkey()) + .collect(); + vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts); + assert!(vote_accounts.is_empty()); + + // Given the later half of the node id's, should diff out + // the later half of the vote accounts + let node_ids: Vec<_> = validator_voting_keypairs[5..] + .iter() + .map(|v| v.node_keypair.pubkey()) + .collect(); + let mut vote_accounts = validator_voting_keypairs + .iter() + .map(|v| v.vote_keypair.pubkey()) + .collect::>(); + vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts); + let expected = validator_voting_keypairs[0..5] + .iter() + .map(|v| v.vote_keypair.pubkey()) + .collect::>(); + assert_eq!(vote_accounts, expected); + } + #[test] fn test_process_new_leader_schedule_epoch() { // Make sure size doesn't grow From f664b082b8075403fd180eeb83ab59d8663bf50c Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 18 Feb 2020 19:32:28 -0800 Subject: [PATCH 07/16] more testing --- core/src/cluster_info_vote_listener.rs | 265 ++++++++++++++++++------- 1 file changed, 197 insertions(+), 68 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 6d186e6544358f..50ad7554f5675e 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -271,7 +271,7 @@ impl ClusterInfoVoteListener { old_leader_schedule_epoch = new_leader_schedule_epoch; } - if let Err(e) = Self::process_votes(&vote_txs_receiver, &vote_tracker) { + if let Err(e) = Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker) { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { return Ok(()); @@ -285,20 +285,25 @@ impl ClusterInfoVoteListener { } } - fn process_votes( + fn get_and_process_votes( vote_txs_receiver: &CrossbeamReceiver>, vote_tracker: &Arc>, ) -> Result<()> { let timer = Duration::from_millis(200); let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; + while let Ok(new_txs) = vote_txs_receiver.try_recv() { + vote_txs.extend(new_txs); + } + + Self::process_votes(vote_tracker, vote_txs); + Ok(()) + } + + fn process_votes(vote_tracker: &Arc>, vote_txs: Vec) { let mut diff: HashMap>> = HashMap::new(); { let vote_tracker = vote_tracker.read().unwrap(); let slot_pubkeys = &vote_tracker.votes; - while let Ok(new_txs) = vote_txs_receiver.try_recv() { - vote_txs.extend(new_txs); - } - for tx in vote_txs { if let (Some(vote_pubkey), Some(vote_instruction)) = tx .message @@ -335,10 +340,6 @@ impl ClusterInfoVoteListener { let actual_authorized_voter = vote_tracker.get_authorized_voter(&vote_pubkey, *last_vote_slot); - println!( - "vk: {:?}, actual_av: {:?}", - vote_pubkey, actual_authorized_voter - ); if actual_authorized_voter.is_none() { continue; } @@ -357,13 +358,13 @@ impl ClusterInfoVoteListener { for slot in vote.slots { // Don't insert if we already have marked down this pubkey // voting for this slot - if slot_pubkeys - .get(&slot) - .map(|slot_vote_pubkeys| slot_vote_pubkeys.contains(vote_pubkey)) - .unwrap_or(false) - { - diff.entry(slot).or_default().insert(vote_pubkey.clone()); + if let Some(slot_vote_pubkeys) = slot_pubkeys.get(&slot) { + if slot_vote_pubkeys.contains(vote_pubkey) { + continue; + } } + + diff.entry(slot).or_default().insert(vote_pubkey.clone()); } } } @@ -378,19 +379,37 @@ impl ClusterInfoVoteListener { slot_pubkeys.insert(pk); } } + } + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } Ok(()) } - pub fn process_new_leader_schedule_epoch( + fn process_new_leader_schedule_epoch( root_bank: &Bank, vote_tracker: &RwLock, new_leader_schedule_epoch: Epoch, ) { - let mut current_validators = vote_tracker.read().unwrap().epoch_authorized_voters.clone(); - let (new_epoch_authorized_voters, mut new_node_id_to_vote_accounts) = + let (new_epoch_authorized_voters, new_node_id_to_vote_accounts) = VoteTracker::parse_epoch_state(root_bank, new_leader_schedule_epoch); + Self::process_new_leader_schedule_epoch_state( + vote_tracker, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + ); + } + + fn process_new_leader_schedule_epoch_state( + vote_tracker: &RwLock, + new_epoch_authorized_voters: EpochAuthorizedVoters, + mut new_node_id_to_vote_accounts: NodeIdToVoteAccounts, + ) { + let mut current_validators = vote_tracker.read().unwrap().epoch_authorized_voters.clone(); + // Remove the old pubkeys current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); @@ -400,6 +419,7 @@ impl ClusterInfoVoteListener { current_validators.extend(new_epoch_authorized_voters); let mut vote_tracker = vote_tracker.write().unwrap(); + std::mem::swap( &mut current_validators, &mut vote_tracker.epoch_authorized_voters, @@ -410,13 +430,6 @@ impl ClusterInfoVoteListener { &mut vote_tracker.node_id_to_vote_accounts, ) } - - pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) - } } #[cfg(test)] @@ -438,13 +451,8 @@ mod tests { let node_keypair = Keypair::new(); let vote_keypair = Keypair::new(); let slots: Vec<_> = (0..31).into_iter().collect(); - let votes = Vote::new(slots, Hash::default()); - let vote_ix = vote_instruction::vote(&vote_keypair.pubkey(), &vote_keypair.pubkey(), votes); - - let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); - vote_tx.partial_sign(&[&node_keypair], Hash::default()); - vote_tx.partial_sign(&[&vote_keypair], Hash::default()); + let vote_tx = new_vote_tx(slots, Hash::default(), &node_keypair, &vote_keypair); use bincode::serialized_size; info!("max vote size {}", serialized_size(&vote_tx).unwrap()); @@ -459,16 +467,7 @@ mod tests { let node_keypair = Keypair::new(); let authorized_voter = Keypair::new(); - let vote_ix = vote_instruction::vote( - &node_keypair.pubkey(), - &authorized_voter.pubkey(), - Vote::default(), - ); - - let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); - - vote_tx.partial_sign(&[&node_keypair], Hash::default()); - vote_tx.partial_sign(&[&authorized_voter], Hash::default()); + let vote_tx = new_vote_tx(vec![0], Hash::default(), &node_keypair, &authorized_voter); // Check that the two signing keys pass the check assert!(VoteTracker::vote_contains_authorized_voter( @@ -482,14 +481,7 @@ mod tests { )); // Set the authorized voter == node keypair - let vote_ix = vote_instruction::vote( - &node_keypair.pubkey(), - &node_keypair.pubkey(), - Vote::default(), - ); - let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); - vote_tx.partial_sign(&[&node_keypair], Hash::default()); - vote_tx.partial_sign(&[&node_keypair], Hash::default()); + let vote_tx = new_vote_tx(vec![0], Hash::default(), &node_keypair, &node_keypair); // Check that the node_keypair itself still passes the authorized voter check assert!(VoteTracker::vote_contains_authorized_voter( @@ -497,7 +489,7 @@ mod tests { node_keypair.pubkey() )); - // The other keyapir should not pass + // The other keypair should not pass assert!(!VoteTracker::vote_contains_authorized_voter( &vote_tx, authorized_voter.pubkey() @@ -521,32 +513,72 @@ mod tests { let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); let (votes_sender, votes_receiver) = unbounded(); - let vote_slot = 1; + let vote_slots = vec![1, 2]; validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; + let vote_tx = new_vote_tx( + vote_slots.clone(), + Hash::default(), + node_keypair, + vote_keypair, + ); + votes_sender.send(vec![vote_tx]).unwrap(); + }); - let vote_ix = vote_instruction::vote( - &vote_keypair.pubkey(), - &vote_keypair.pubkey(), - Vote::new(vec![vote_slot], Hash::default()), + // Check that all the votes were registered for each validator correctly + ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker).unwrap(); + for vote_slot in vote_slots { + let r_vote_tracker = vote_tracker.read().unwrap(); + let votes = r_vote_tracker.votes(); + let votes_for_slot = votes.get(&vote_slot).unwrap(); + for voting_keypairs in &validator_voting_keypairs { + assert!(votes_for_slot.contains(&voting_keypairs.vote_keypair.pubkey())); + } + } + } + + #[test] + fn test_process_votes2() { + // Create some voters at genesis + let num_voters = 10; + let validator_voting_keypairs: Vec<_> = (0..num_voters) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, ); + let bank = Bank::new(&genesis_config); - let mut vote_tx = - Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); + // Send some votes to process + let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let (votes_sender, votes_receiver) = unbounded(); - vote_tx.partial_sign(&[vote_keypair], Hash::default()); - vote_tx.partial_sign(&[vote_keypair], Hash::default()); - votes_sender.send(vec![vote_tx]).unwrap(); - }); + for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { + let validator_votes: Vec<_> = keyset + .iter() + .map(|keypairs| { + let node_keypair = &keypairs.node_keypair; + let vote_keypair = &keypairs.vote_keypair; + let vote_tx = + new_vote_tx(vec![i as u64], Hash::default(), node_keypair, vote_keypair); + vote_tx + }) + .collect(); + votes_sender.send(validator_votes).unwrap(); + } // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::process_votes(&votes_receiver, &vote_tracker).unwrap(); + ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker).unwrap(); let r_vote_tracker = vote_tracker.read().unwrap(); let votes = r_vote_tracker.votes(); - let votes_for_slot = votes.get(&vote_slot).unwrap(); - for voting_keypairs in validator_voting_keypairs { - assert!(votes_for_slot.contains(&voting_keypairs.node_keypair.pubkey())); + for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { + let votes_for_slot = votes.get(&(i as u64)).unwrap(); + for voting_keypairs in keyset { + assert!(votes_for_slot.contains(&voting_keypairs.vote_keypair.pubkey())); + } } } @@ -597,7 +629,104 @@ mod tests { } #[test] - fn test_process_new_leader_schedule_epoch() { - // Make sure size doesn't grow + fn test_vote_tracker_references() { + // Create some voters at genesis + let validator_voting_keypairs: Vec<_> = (0..2) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + ); + let bank = Bank::new(&genesis_config); + + // Send a vote to process, should add a reference to the pubkey for that voter + // in the tracker + let validator0_keypairs = &validator_voting_keypairs[0]; + let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let vote_tx = vec![new_vote_tx( + vec![0], + Hash::default(), + &validator0_keypairs.node_keypair, + &validator0_keypairs.vote_keypair, + )]; + + { + let r_vote_tracker = vote_tracker.read().unwrap(); + let pubkey_ref = r_vote_tracker + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) + .unwrap(); + + // Refcount is 1 because no votes have referenced this pubkey yet + assert_eq!(Arc::strong_count(&pubkey_ref), 1); + + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx); + let pubkey_ref = r_vote_tracker + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) + .unwrap(); + + // This pubkey voted for a slot, so the refcount is now 2 + assert_eq!(Arc::strong_count(&pubkey_ref), 2); + } + + // Move into the next epoch, a new set of voters is introduced, with some + // old voters also still present + let new_pubkey = Arc::new(Pubkey::new_rand()); + let old_refreshed_pubkey = Arc::new(validator0_keypairs.vote_keypair.pubkey()); + let old_outdated_pubkey = validator_voting_keypairs[1].vote_keypair.pubkey(); + let mut new_epoch_authorized_voters = HashMap::new(); + let new_node_id_to_vote_accounts = HashMap::new(); + + // Create the set of relevant voters for the next epoch + new_epoch_authorized_voters + .insert(old_refreshed_pubkey.clone(), AuthorizedVoters::default()); + new_epoch_authorized_voters.insert(new_pubkey.clone(), AuthorizedVoters::default()); + + ClusterInfoVoteListener::process_new_leader_schedule_epoch_state( + &vote_tracker, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + ); + let r_vote_tracker = vote_tracker.read().unwrap(); + + assert!(r_vote_tracker + .epoch_authorized_voters + .get(&new_pubkey) + .is_some()); + assert!(r_vote_tracker + .epoch_authorized_voters + .get(&old_outdated_pubkey) + .is_none()); + + // Make sure new copies of the same pubkeys aren't constantly being + // introduced when the same voter is in both the old and new set + let pubkey_ref = r_vote_tracker + .get_voter_pubkey(&old_refreshed_pubkey) + .unwrap(); + + // Ref count remains unchanged from earlier + assert_eq!(Arc::strong_count(&pubkey_ref), 2); + } + + fn new_vote_tx( + slots: Vec, + blockhash: Hash, + node_keypair: &Keypair, + authorized_voter_keypair: &Keypair, + ) -> Transaction { + let votes = Vote::new(slots, blockhash); + let vote_ix = vote_instruction::vote( + &node_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + votes, + ); + + let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); + + vote_tx.partial_sign(&[node_keypair], blockhash); + vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); + vote_tx } } From e6bae4634fa5c33f415b2b82ce9111233913c350 Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 18 Feb 2020 19:44:53 -0800 Subject: [PATCH 08/16] Purge when new roots are set, don't track votes <= root --- core/src/cluster_info_vote_listener.rs | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 50ad7554f5675e..b2b3f7cc589701 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -36,6 +36,7 @@ pub struct VoteTracker { // Map from node id to the set of associated vote accounts node_id_to_vote_accounts: NodeIdToVoteAccounts, epoch_schedule: EpochSchedule, + root: u64, } impl VoteTracker { @@ -50,6 +51,7 @@ impl VoteTracker { votes: HashMap::new(), node_id_to_vote_accounts, epoch_schedule: root_bank.epoch_schedule().clone(), + root: root_bank.slot(), } } @@ -191,6 +193,13 @@ impl ClusterInfoVoteListener { } } + pub fn join(self) -> thread::Result<()> { + for thread_hdl in self.thread_hdls { + thread_hdl.join()?; + } + Ok(()) + } + fn recv_loop( exit: Arc, cluster_info: &Arc>, @@ -259,6 +268,10 @@ impl ClusterInfoVoteListener { } let root_bank = bank_forks.read().unwrap().root_bank().clone(); + if root_bank.slot() != vote_tracker.read().unwrap().root { + Self::process_new_root(&vote_tracker, root_bank.slot()); + } + let new_leader_schedule_epoch = { root_bank.get_leader_schedule_epoch(root_bank.slot()) }; @@ -304,6 +317,7 @@ impl ClusterInfoVoteListener { { let vote_tracker = vote_tracker.read().unwrap(); let slot_pubkeys = &vote_tracker.votes; + let root = vote_tracker.root; for tx in vote_txs { if let (Some(vote_pubkey), Some(vote_instruction)) = tx .message @@ -356,6 +370,9 @@ impl ClusterInfoVoteListener { // that we determined at leader_schedule_epoch boundaries if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { for slot in vote.slots { + if slot <= root { + continue; + } // Don't insert if we already have marked down this pubkey // voting for this slot if let Some(slot_vote_pubkeys) = slot_pubkeys.get(&slot) { @@ -381,11 +398,10 @@ impl ClusterInfoVoteListener { } } - pub fn join(self) -> thread::Result<()> { - for thread_hdl in self.thread_hdls { - thread_hdl.join()?; - } - Ok(()) + fn process_new_root(vote_tracker: &RwLock, new_root: Slot) { + let mut w_vote_tracker = vote_tracker.write().unwrap(); + w_vote_tracker.root = new_root; + w_vote_tracker.votes.retain(|slot, _| *slot >= new_root) } fn process_new_leader_schedule_epoch( From c2750d1c9370479e17ee215aeabe2533c1b32683 Mon Sep 17 00:00:00 2001 From: Carl Date: Wed, 19 Feb 2020 00:11:24 -0800 Subject: [PATCH 09/16] Fixes --- core/src/cluster_info_vote_listener.rs | 152 ++++++++++++++++--------- 1 file changed, 99 insertions(+), 53 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index b2b3f7cc589701..2a812e2ee23c39 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -17,8 +17,9 @@ use solana_sdk::{ pubkey::Pubkey, transaction::Transaction, }; -use solana_vote_program::vote_instruction::VoteInstruction; -use solana_vote_program::vote_state::{AuthorizedVoters, VoteState}; +use solana_vote_program::{ + authorized_voters::AuthorizedVoters, vote_instruction::VoteInstruction, vote_state::VoteState, +}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; @@ -50,7 +51,7 @@ impl VoteTracker { epoch_authorized_voters, votes: HashMap::new(), node_id_to_vote_accounts, - epoch_schedule: root_bank.epoch_schedule().clone(), + epoch_schedule: *root_bank.epoch_schedule(), root: root_bank.slot(), } } @@ -69,10 +70,7 @@ impl VoteTracker { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters .get(pubkey) - .map(|authorized_voters| { - println!("authorized voters: {:?}", authorized_voters); - authorized_voters.get_authorized_voter(epoch) - }) + .map(|authorized_voters| authorized_voters.get_authorized_voter(epoch)) .unwrap_or(None) } @@ -86,7 +84,7 @@ impl VoteTracker { let epoch_authorized_voters = bank .epoch_vote_accounts(epoch) .expect("Epoch vote accounts must exist") - .into_iter() + .iter() .filter_map(|(key, (stake, account))| { let vote_state = VoteState::from(&account); if vote_state.is_none() { @@ -101,9 +99,7 @@ impl VoteTracker { return None; } let vote_state = vote_state.unwrap(); - println!("Found vote state"); if *stake > 0 { - println!("Found vote state with stake > 0, key: {:?}", key); let mut authorized_voters = vote_state.authorized_voters().clone(); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); @@ -137,13 +133,11 @@ impl VoteTracker { // from `V` that belong to `N` pub fn diff_vote_accounts(&self, node_ids: &[Pubkey], vote_accounts: &mut HashSet) { for node_id in node_ids { - self.node_id_to_vote_accounts - .get(node_id) - .map(|node_vote_accounts| { - for node_vote_account in node_vote_accounts { - vote_accounts.remove(node_vote_account); - } - }); + if let Some(node_vote_accounts) = self.node_id_to_vote_accounts.get(node_id) { + for node_vote_account in node_vote_accounts { + vote_accounts.remove(node_vote_account); + } + }; } } } @@ -324,11 +318,11 @@ impl ClusterInfoVoteListener { .instructions .first() .and_then(|first_instruction| { - first_instruction.accounts.first().and_then(|offset| { - Some(( + first_instruction.accounts.first().map(|offset| { + ( tx.message.account_keys.get(*offset as usize), limited_deserialize(&first_instruction.data).ok(), - )) + ) }) }) .unwrap_or((None, None)) @@ -468,7 +462,13 @@ mod tests { let vote_keypair = Keypair::new(); let slots: Vec<_> = (0..31).into_iter().collect(); - let vote_tx = new_vote_tx(slots, Hash::default(), &node_keypair, &vote_keypair); + let vote_tx = new_vote_tx( + slots, + Hash::default(), + &node_keypair, + &vote_keypair, + &vote_keypair, + ); use bincode::serialized_size; info!("max vote size {}", serialized_size(&vote_tx).unwrap()); @@ -481,9 +481,16 @@ mod tests { #[test] fn vote_contains_authorized_voter() { let node_keypair = Keypair::new(); + let vote_keypair = Keypair::new(); let authorized_voter = Keypair::new(); - let vote_tx = new_vote_tx(vec![0], Hash::default(), &node_keypair, &authorized_voter); + let vote_tx = new_vote_tx( + vec![0], + Hash::default(), + &node_keypair, + &vote_keypair, + &authorized_voter, + ); // Check that the two signing keys pass the check assert!(VoteTracker::vote_contains_authorized_voter( @@ -496,16 +503,33 @@ mod tests { authorized_voter.pubkey() )); - // Set the authorized voter == node keypair - let vote_tx = new_vote_tx(vec![0], Hash::default(), &node_keypair, &node_keypair); + // Non signing key shouldn't pass the check + assert!(!VoteTracker::vote_contains_authorized_voter( + &vote_tx, + vote_keypair.pubkey() + )); + + // Set the authorized voter == vote keypair + let vote_tx = new_vote_tx( + vec![0], + Hash::default(), + &node_keypair, + &vote_keypair, + &vote_keypair, + ); - // Check that the node_keypair itself still passes the authorized voter check + // Check that the node_keypair and vote keypair pass the authorized voter check assert!(VoteTracker::vote_contains_authorized_voter( &vote_tx, node_keypair.pubkey() )); - // The other keypair should not pass + assert!(VoteTracker::vote_contains_authorized_voter( + &vote_tx, + vote_keypair.pubkey() + )); + + // The other keypair should not pss the cchecck assert!(!VoteTracker::vote_contains_authorized_voter( &vote_tx, authorized_voter.pubkey() @@ -538,6 +562,7 @@ mod tests { Hash::default(), node_keypair, vote_keypair, + vote_keypair, ); votes_sender.send(vec![vote_tx]).unwrap(); }); @@ -578,8 +603,13 @@ mod tests { .map(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; - let vote_tx = - new_vote_tx(vec![i as u64], Hash::default(), node_keypair, vote_keypair); + let vote_tx = new_vote_tx( + vec![i as u64 + 1], + Hash::default(), + node_keypair, + vote_keypair, + vote_keypair, + ); vote_tx }) .collect(); @@ -591,7 +621,7 @@ mod tests { let r_vote_tracker = vote_tracker.read().unwrap(); let votes = r_vote_tracker.votes(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { - let votes_for_slot = votes.get(&(i as u64)).unwrap(); + let votes_for_slot = votes.get(&(i as u64 + 1)).unwrap(); for voting_keypairs in keyset { assert!(votes_for_slot.contains(&voting_keypairs.vote_keypair.pubkey())); } @@ -663,28 +693,37 @@ mod tests { let validator0_keypairs = &validator_voting_keypairs[0]; let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); let vote_tx = vec![new_vote_tx( - vec![0], + // Must vote > root to be processed + vec![bank.slot() + 1], Hash::default(), &validator0_keypairs.node_keypair, &validator0_keypairs.vote_keypair, + &validator0_keypairs.vote_keypair, )]; { - let r_vote_tracker = vote_tracker.read().unwrap(); - let pubkey_ref = r_vote_tracker - .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) - .unwrap(); + let ref_count = Arc::strong_count( + vote_tracker + .read() + .unwrap() + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) + .unwrap(), + ); // Refcount is 1 because no votes have referenced this pubkey yet - assert_eq!(Arc::strong_count(&pubkey_ref), 1); + assert_eq!(ref_count, 1); ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx); - let pubkey_ref = r_vote_tracker - .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) - .unwrap(); + let ref_count = Arc::strong_count( + vote_tracker + .read() + .unwrap() + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) + .unwrap(), + ); // This pubkey voted for a slot, so the refcount is now 2 - assert_eq!(Arc::strong_count(&pubkey_ref), 2); + assert_eq!(ref_count, 2); } // Move into the next epoch, a new set of voters is introduced, with some @@ -705,36 +744,43 @@ mod tests { new_epoch_authorized_voters, new_node_id_to_vote_accounts, ); - let r_vote_tracker = vote_tracker.read().unwrap(); - assert!(r_vote_tracker - .epoch_authorized_voters - .get(&new_pubkey) - .is_some()); - assert!(r_vote_tracker - .epoch_authorized_voters - .get(&old_outdated_pubkey) - .is_none()); + { + let r_vote_tracker = vote_tracker.read().unwrap(); + assert!(r_vote_tracker + .epoch_authorized_voters + .get(&new_pubkey) + .is_some()); + assert!(r_vote_tracker + .epoch_authorized_voters + .get(&old_outdated_pubkey) + .is_none()); + } // Make sure new copies of the same pubkeys aren't constantly being // introduced when the same voter is in both the old and new set - let pubkey_ref = r_vote_tracker - .get_voter_pubkey(&old_refreshed_pubkey) - .unwrap(); + let ref_count = Arc::strong_count( + vote_tracker + .read() + .unwrap() + .get_voter_pubkey(&old_refreshed_pubkey) + .unwrap(), + ); // Ref count remains unchanged from earlier - assert_eq!(Arc::strong_count(&pubkey_ref), 2); + assert_eq!(ref_count, 2); } fn new_vote_tx( slots: Vec, blockhash: Hash, node_keypair: &Keypair, + vote_keypair: &Keypair, authorized_voter_keypair: &Keypair, ) -> Transaction { let votes = Vote::new(slots, blockhash); let vote_ix = vote_instruction::vote( - &node_keypair.pubkey(), + &vote_keypair.pubkey(), &authorized_voter_keypair.pubkey(), votes, ); From e22f47a8ebcb5343838df092d9a6770a2b4217c0 Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 2 Mar 2020 15:58:16 -0800 Subject: [PATCH 10/16] Partition by epoch --- core/src/cluster_info_vote_listener.rs | 376 +++++++++++++++++-------- 1 file changed, 252 insertions(+), 124 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 2a812e2ee23c39..23af95db96c526 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -11,79 +11,113 @@ use solana_ledger::bank_forks::BankForks; use solana_metrics::inc_new_counter_debug; use solana_runtime::bank::Bank; use solana_sdk::{ + account::Account, clock::{Epoch, Slot}, epoch_schedule::EpochSchedule, program_utils::limited_deserialize, pubkey::Pubkey, transaction::Transaction, }; -use solana_vote_program::{ - authorized_voters::AuthorizedVoters, vote_instruction::VoteInstruction, vote_state::VoteState, -}; +use solana_vote_program::{vote_instruction::VoteInstruction, vote_state::VoteState}; use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; -pub type EpochAuthorizedVoters = HashMap, AuthorizedVoters>; -pub type NodeIdToVoteAccounts = HashMap>; +// Map from a vote account to the authorized voter for an epoch +pub type EpochAuthorizedVoters = HashMap, Pubkey>; +pub type NodeIdToVoteAccounts = HashMap>>; pub struct VoteTracker { // Don't track votes from people who are not staked, acts as a spam filter - epoch_authorized_voters: EpochAuthorizedVoters, + epoch_authorized_voters: HashMap, // Map from a slot to a set of validators who have voted for that slot votes: HashMap>>, // Map from node id to the set of associated vote accounts - node_id_to_vote_accounts: NodeIdToVoteAccounts, + node_id_to_vote_accounts: HashMap, epoch_schedule: EpochSchedule, + all_pubkeys: HashSet>, root: u64, } impl VoteTracker { pub fn new(root_bank: &Bank) -> Self { + let current_epoch = root_bank.epoch(); let leader_schedule_epoch = root_bank .epoch_schedule() .get_leader_schedule_epoch(root_bank.slot()); - let (epoch_authorized_voters, node_id_to_vote_accounts) = - Self::parse_epoch_state(root_bank, leader_schedule_epoch); - Self { - epoch_authorized_voters, + + let mut vote_tracker = Self { + epoch_authorized_voters: HashMap::new(), votes: HashMap::new(), - node_id_to_vote_accounts, + node_id_to_vote_accounts: HashMap::new(), epoch_schedule: *root_bank.epoch_schedule(), + all_pubkeys: HashSet::new(), root: root_bank.slot(), + }; + + // Parse voter information about all the known epochs + for epoch in current_epoch..=leader_schedule_epoch { + let (new_epoch_authorized_voters, new_node_id_to_vote_accounts, new_pubkeys) = + VoteTracker::parse_epoch_state( + epoch, + root_bank + .epoch_vote_accounts(epoch) + .expect("Epoch vote accounts must exist"), + &vote_tracker.all_pubkeys, + ); + vote_tracker.process_new_leader_schedule_epoch_state( + epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); } + + vote_tracker } pub fn votes(&self) -> &HashMap>> { &self.votes } - pub fn get_voter_pubkey(&self, pubkey: &Pubkey) -> Option<&Arc> { + // Returns Some if the given pubkey is a staked voter for the epoch at the given + // slot. Note this decisoin uses bank.EpochStakes not live stakes. + pub fn get_voter_pubkey(&self, pubkey: &Pubkey, slot: Slot) -> Option<&Arc> { + let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters - .get_key_value(pubkey) - .map(|(key, _)| key) + .get(&epoch) + .map(|epoch_authorized_voters| { + epoch_authorized_voters + .get_key_value(pubkey) + .map(|(key, _)| key) + }) + .unwrap_or(None) } - pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: Slot) -> Option { + pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: Slot) -> Option<&Pubkey> { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters - .get(pubkey) - .map(|authorized_voters| authorized_voters.get_authorized_voter(epoch)) + .get(&epoch) + .map(|epoch_authorized_voters| epoch_authorized_voters.get(pubkey)) .unwrap_or(None) } pub fn parse_epoch_state( - bank: &Bank, epoch: Epoch, - ) -> (EpochAuthorizedVoters, NodeIdToVoteAccounts) { + epoch_vote_acounts: &HashMap, + all_pubkeys: &HashSet>, + ) -> ( + EpochAuthorizedVoters, + NodeIdToVoteAccounts, + Vec>, + ) { + let mut new_pubkeys = vec![]; let mut node_id_to_vote_accounts: NodeIdToVoteAccounts = HashMap::new(); // Get all known vote accounts with nonzero stake and read out their // authorized voters - let epoch_authorized_voters = bank - .epoch_vote_accounts(epoch) - .expect("Epoch vote accounts must exist") + let epoch_authorized_voters = epoch_vote_acounts .iter() .filter_map(|(key, (stake, account))| { let vote_state = VoteState::from(&account); @@ -103,25 +137,41 @@ impl VoteTracker { let mut authorized_voters = vote_state.authorized_voters().clone(); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); - let key = Arc::new(*key); + let unduplicated_key = all_pubkeys.get(key).cloned().unwrap_or_else(|| { + let new_key = Arc::new(*key); + new_pubkeys.push(new_key.clone()); + new_key + }); node_id_to_vote_accounts .entry(vote_state.node_pubkey) .or_default() - .push(*key.clone()); - Some((key, authorized_voters)) + .push(unduplicated_key.clone()); + Some(( + unduplicated_key, + authorized_voters + .get_authorized_voter(epoch) + .expect("Authorized voter for current epoch must be known"), + )) } else { None } }) .collect(); - (epoch_authorized_voters, node_id_to_vote_accounts) + ( + epoch_authorized_voters, + node_id_to_vote_accounts, + new_pubkeys, + ) } - pub fn vote_contains_authorized_voter(vote_tx: &Transaction, authorized_voter: Pubkey) -> bool { + pub fn vote_contains_authorized_voter( + vote_tx: &Transaction, + authorized_voter: &Pubkey, + ) -> bool { let message = &vote_tx.message; for (i, key) in message.account_keys.iter().enumerate() { - if message.is_signer(i) && *key == authorized_voter { + if message.is_signer(i) && key == authorized_voter { return true; } } @@ -131,15 +181,42 @@ impl VoteTracker { // Given a set of validator node ids `N` and vote accounts `V`, removes the vote accounts // from `V` that belong to `N` - pub fn diff_vote_accounts(&self, node_ids: &[Pubkey], vote_accounts: &mut HashSet) { - for node_id in node_ids { - if let Some(node_vote_accounts) = self.node_id_to_vote_accounts.get(node_id) { - for node_vote_account in node_vote_accounts { - vote_accounts.remove(node_vote_account); - } - }; + pub fn diff_vote_accounts( + &self, + node_ids: &[Pubkey], + vote_accounts: &mut HashSet, + slot: Slot, + ) { + let epoch = self.epoch_schedule.get_epoch(slot); + if let Some(node_id_to_vote_accounts) = self.node_id_to_vote_accounts.get(&epoch) { + for node_id in node_ids { + if let Some(node_vote_accounts) = node_id_to_vote_accounts.get(node_id) { + for node_vote_account in node_vote_accounts { + vote_accounts.remove(node_vote_account); + } + }; + } } } + + fn process_new_leader_schedule_epoch_state( + &mut self, + new_leader_schedule_epoch: Epoch, + new_epoch_authorized_voters: EpochAuthorizedVoters, + new_node_id_to_vote_accounts: NodeIdToVoteAccounts, + new_pubkeys: Vec>, + ) { + self.epoch_authorized_voters + .insert(new_leader_schedule_epoch, new_epoch_authorized_voters); + self.node_id_to_vote_accounts + .insert(new_leader_schedule_epoch, new_node_id_to_vote_accounts); + for key in new_pubkeys { + self.all_pubkeys.insert(key); + } + + self.all_pubkeys + .retain(|pubkey| Arc::strong_count(pubkey) > 1); + } } pub struct ClusterInfoVoteListener { @@ -270,6 +347,12 @@ impl ClusterInfoVoteListener { { root_bank.get_leader_schedule_epoch(root_bank.slot()) }; if old_leader_schedule_epoch != new_leader_schedule_epoch { + assert!(vote_tracker + .read() + .unwrap() + .epoch_authorized_voters + .get(&new_leader_schedule_epoch) + .is_none()); Self::process_new_leader_schedule_epoch( &root_bank, &vote_tracker, @@ -306,7 +389,7 @@ impl ClusterInfoVoteListener { Ok(()) } - fn process_votes(vote_tracker: &Arc>, vote_txs: Vec) { + fn process_votes(vote_tracker: &RwLock, vote_txs: Vec) { let mut diff: HashMap>> = HashMap::new(); { let vote_tracker = vote_tracker.read().unwrap(); @@ -360,13 +443,15 @@ impl ClusterInfoVoteListener { continue; } - // Only accept votes from authorized vote pubkeys with non-zero stake - // that we determined at leader_schedule_epoch boundaries - if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey) { - for slot in vote.slots { - if slot <= root { - continue; - } + for slot in vote.slots { + if slot <= root { + continue; + } + + // Only accept votes from authorized vote pubkeys with non-zero stake + // that we determined at leader_schedule_epoch boundaries + if let Some(vote_pubkey) = vote_tracker.get_voter_pubkey(&vote_pubkey, slot) + { // Don't insert if we already have marked down this pubkey // voting for this slot if let Some(slot_vote_pubkeys) = slot_pubkeys.get(&slot) { @@ -394,8 +479,15 @@ impl ClusterInfoVoteListener { fn process_new_root(vote_tracker: &RwLock, new_root: Slot) { let mut w_vote_tracker = vote_tracker.write().unwrap(); + let root_epoch = w_vote_tracker.epoch_schedule.get_epoch(new_root); w_vote_tracker.root = new_root; - w_vote_tracker.votes.retain(|slot, _| *slot >= new_root) + w_vote_tracker.votes.retain(|slot, _| *slot >= new_root); + w_vote_tracker + .node_id_to_vote_accounts + .retain(|epoch, _| epoch >= &root_epoch); + w_vote_tracker + .epoch_authorized_voters + .retain(|epoch, _| epoch >= &root_epoch); } fn process_new_leader_schedule_epoch( @@ -403,42 +495,24 @@ impl ClusterInfoVoteListener { vote_tracker: &RwLock, new_leader_schedule_epoch: Epoch, ) { - let (new_epoch_authorized_voters, new_node_id_to_vote_accounts) = - VoteTracker::parse_epoch_state(root_bank, new_leader_schedule_epoch); - - Self::process_new_leader_schedule_epoch_state( - vote_tracker, - new_epoch_authorized_voters, - new_node_id_to_vote_accounts, - ); - } - - fn process_new_leader_schedule_epoch_state( - vote_tracker: &RwLock, - new_epoch_authorized_voters: EpochAuthorizedVoters, - mut new_node_id_to_vote_accounts: NodeIdToVoteAccounts, - ) { - let mut current_validators = vote_tracker.read().unwrap().epoch_authorized_voters.clone(); - - // Remove the old pubkeys - current_validators.retain(|pubkey, _| new_epoch_authorized_voters.contains_key(pubkey)); - - // Insert any new pubkeys, don't re-insert ones we already have, - // otherwise memory usage increases from the duplicates being held - // in Arc references to those duplicates in VoteTracker.votes - current_validators.extend(new_epoch_authorized_voters); - - let mut vote_tracker = vote_tracker.write().unwrap(); - - std::mem::swap( - &mut current_validators, - &mut vote_tracker.epoch_authorized_voters, - ); + let (new_epoch_authorized_voters, new_node_id_to_vote_accounts, new_pubkeys) = + VoteTracker::parse_epoch_state( + new_leader_schedule_epoch, + root_bank + .epoch_vote_accounts(new_leader_schedule_epoch) + .expect("Epoch vote accounts must exist"), + &vote_tracker.read().unwrap().all_pubkeys, + ); - std::mem::swap( - &mut new_node_id_to_vote_accounts, - &mut vote_tracker.node_id_to_vote_accounts, - ) + vote_tracker + .write() + .unwrap() + .process_new_leader_schedule_epoch_state( + new_leader_schedule_epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); } } @@ -453,7 +527,10 @@ mod tests { use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, Signer}; use solana_sdk::transaction::Transaction; - use solana_vote_program::{vote_instruction, vote_state::Vote}; + use solana_vote_program::{ + vote_instruction, + vote_state::{create_account, Vote}, + }; #[test] fn test_max_vote_tx_fits() { @@ -495,18 +572,18 @@ mod tests { // Check that the two signing keys pass the check assert!(VoteTracker::vote_contains_authorized_voter( &vote_tx, - node_keypair.pubkey() + &node_keypair.pubkey() )); assert!(VoteTracker::vote_contains_authorized_voter( &vote_tx, - authorized_voter.pubkey() + &authorized_voter.pubkey() )); // Non signing key shouldn't pass the check assert!(!VoteTracker::vote_contains_authorized_voter( &vote_tx, - vote_keypair.pubkey() + &vote_keypair.pubkey() )); // Set the authorized voter == vote keypair @@ -521,18 +598,18 @@ mod tests { // Check that the node_keypair and vote keypair pass the authorized voter check assert!(VoteTracker::vote_contains_authorized_voter( &vote_tx, - node_keypair.pubkey() + &node_keypair.pubkey() )); assert!(VoteTracker::vote_contains_authorized_voter( &vote_tx, - vote_keypair.pubkey() + &vote_keypair.pubkey() )); // The other keypair should not pss the cchecck assert!(!VoteTracker::vote_contains_authorized_voter( &vote_tx, - authorized_voter.pubkey() + &authorized_voter.pubkey() )); } @@ -653,7 +730,7 @@ mod tests { .iter() .map(|v| v.vote_keypair.pubkey()) .collect(); - vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts); + vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts, bank.slot()); assert!(vote_accounts.is_empty()); // Given the later half of the node id's, should diff out @@ -666,7 +743,7 @@ mod tests { .iter() .map(|v| v.vote_keypair.pubkey()) .collect::>(); - vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts); + vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts, bank.slot()); let expected = validator_voting_keypairs[0..5] .iter() .map(|v| v.vote_keypair.pubkey()) @@ -701,74 +778,125 @@ mod tests { &validator0_keypairs.vote_keypair, )]; - { - let ref_count = Arc::strong_count( - vote_tracker - .read() - .unwrap() - .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) - .unwrap(), - ); - - // Refcount is 1 because no votes have referenced this pubkey yet - assert_eq!(ref_count, 1); + let mut current_ref_count = Arc::strong_count( + vote_tracker + .read() + .unwrap() + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey(), bank.slot()) + .unwrap(), + ); + { ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx); let ref_count = Arc::strong_count( vote_tracker .read() .unwrap() - .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey()) + .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey(), bank.slot()) .unwrap(), ); - // This pubkey voted for a slot, so the refcount is now 2 - assert_eq!(ref_count, 2); + // This pubkey voted for a slot, so the refcount is now 4 + current_ref_count += 1; + assert_eq!(ref_count, current_ref_count); } // Move into the next epoch, a new set of voters is introduced, with some // old voters also still present - let new_pubkey = Arc::new(Pubkey::new_rand()); - let old_refreshed_pubkey = Arc::new(validator0_keypairs.vote_keypair.pubkey()); + let new_pubkey = Pubkey::new_rand(); + + // Pubkey of a vote account that will stick around for the next epoch + let old_refreshed_pubkey = validator0_keypairs.vote_keypair.pubkey(); + let old_refreshed_account = bank.get_account(&old_refreshed_pubkey).unwrap(); + + // Pubkey of a vote account that will be removed in the next epoch let old_outdated_pubkey = validator_voting_keypairs[1].vote_keypair.pubkey(); - let mut new_epoch_authorized_voters = HashMap::new(); - let new_node_id_to_vote_accounts = HashMap::new(); + let new_epoch = bank.get_leader_schedule_epoch(bank.slot()) + 1; + let first_slot_in_new_epoch = bank.epoch_schedule().get_first_slot_in_epoch(new_epoch); // Create the set of relevant voters for the next epoch - new_epoch_authorized_voters - .insert(old_refreshed_pubkey.clone(), AuthorizedVoters::default()); - new_epoch_authorized_voters.insert(new_pubkey.clone(), AuthorizedVoters::default()); - - ClusterInfoVoteListener::process_new_leader_schedule_epoch_state( - &vote_tracker, - new_epoch_authorized_voters, - new_node_id_to_vote_accounts, - ); + let new_epoch_vote_accounts: HashMap<_, _> = vec![ + ((old_refreshed_pubkey.clone(), (1, old_refreshed_account))), + ( + new_pubkey.clone(), + (1, create_account(&new_pubkey, &new_pubkey, 0, 100)), + ), + ] + .into_iter() + .collect(); + + let (new_epoch_authorized_voters, new_node_id_to_vote_accounts, new_pubkeys) = + VoteTracker::parse_epoch_state( + new_epoch, + &new_epoch_vote_accounts, + &vote_tracker.read().unwrap().all_pubkeys, + ); + + // Should add 2 new references to `old_refreshed_pubkey`, one in `new_epoch_authorized_voters`, + // one in `new_node_id_to_vote_accounts` + vote_tracker + .write() + .unwrap() + .process_new_leader_schedule_epoch_state( + new_epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); { let r_vote_tracker = vote_tracker.read().unwrap(); assert!(r_vote_tracker - .epoch_authorized_voters - .get(&new_pubkey) + .get_voter_pubkey(&new_pubkey, first_slot_in_new_epoch) .is_some()); assert!(r_vote_tracker - .epoch_authorized_voters - .get(&old_outdated_pubkey) + .get_voter_pubkey(&old_outdated_pubkey, first_slot_in_new_epoch) .is_none()); } // Make sure new copies of the same pubkeys aren't constantly being - // introduced when the same voter is in both the old and new set + // introduced when the same voter is in both the old and new epoch + // Instead, only the ref count should go up. + let ref_count = Arc::strong_count( + vote_tracker + .read() + .unwrap() + .get_voter_pubkey(&old_refreshed_pubkey, first_slot_in_new_epoch) + .unwrap(), + ); + + // Ref count goes up by 2 (see above comments) + current_ref_count += 2; + assert_eq!(ref_count, current_ref_count); + + // Make 2 new votes in two different epochs, ref count should go up by 2 + let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] + .iter() + .map(|slot| { + new_vote_tx( + // Must vote > root to be processed + vec![*slot], + Hash::default(), + &validator0_keypairs.node_keypair, + &validator0_keypairs.vote_keypair, + &validator0_keypairs.vote_keypair, + ) + }) + .collect(); + + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs); + let ref_count = Arc::strong_count( vote_tracker .read() .unwrap() - .get_voter_pubkey(&old_refreshed_pubkey) + .get_voter_pubkey(&old_refreshed_pubkey, first_slot_in_new_epoch) .unwrap(), ); - // Ref count remains unchanged from earlier - assert_eq!(ref_count, 2); + // Ref count goes up by 2 (see above comments) + current_ref_count += 2; + assert_eq!(ref_count, current_ref_count); } fn new_vote_tx( From 4f58cc1529f874ccbfdf1959bb5eb7130caf1508 Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 3 Mar 2020 15:15:26 -0800 Subject: [PATCH 11/16] Add test --- core/src/cluster_info_vote_listener.rs | 99 ++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 23af95db96c526..2a598f1568068f 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -705,6 +705,105 @@ mod tests { } } + #[test] + fn test_get_voters_by_epoch() { + // Create some voters at genesis + let validator_voting_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let GenesisConfigInfo { genesis_config, .. } = + genesis_utils::create_genesis_config_with_vote_accounts( + 10_000, + &validator_voting_keypairs, + ); + let bank = Bank::new(&genesis_config); + + let mut vote_tracker = VoteTracker::new(&bank); + let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot()); + let last_known_slot = bank + .epoch_schedule() + .get_last_slot_in_epoch(last_known_epoch); + + // Check we can get the voters and authorized voters + for keypairs in &validator_voting_keypairs { + assert!(vote_tracker + .get_voter_pubkey(&keypairs.vote_keypair.pubkey(), last_known_slot) + .is_some()); + assert!(vote_tracker + .get_voter_pubkey(&keypairs.vote_keypair.pubkey(), last_known_slot + 1) + .is_none()); + assert!(vote_tracker + .get_authorized_voter(&keypairs.vote_keypair.pubkey(), last_known_slot) + .is_some()); + assert!(vote_tracker + .get_authorized_voter(&keypairs.vote_keypair.pubkey(), last_known_slot + 1) + .is_none()); + } + + // Create the set of relevant voters for the next epoch + let new_epoch = last_known_epoch + 1; + let first_slot_in_new_epoch = bank.epoch_schedule().get_first_slot_in_epoch(new_epoch); + let new_keypairs: Vec<_> = (0..10) + .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) + .collect(); + let new_epoch_vote_accounts: HashMap<_, _> = new_keypairs + .iter() + .chain(validator_voting_keypairs[0..5].iter()) + .map(|keypair| { + ( + keypair.vote_keypair.pubkey(), + ( + 1, + bank.get_account(&keypair.vote_keypair.pubkey()) + .unwrap_or(create_account( + &keypair.vote_keypair.pubkey(), + &keypair.vote_keypair.pubkey(), + 0, + 100, + )), + ), + ) + }) + .collect(); + + let (new_epoch_authorized_voters, new_node_id_to_vote_accounts, new_pubkeys) = + VoteTracker::parse_epoch_state( + new_epoch, + &new_epoch_vote_accounts, + &vote_tracker.all_pubkeys, + ); + + vote_tracker.process_new_leader_schedule_epoch_state( + new_epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); + + // These keypairs made it into the new epoch + for keypairs in new_keypairs + .iter() + .chain(validator_voting_keypairs[0..5].iter()) + { + assert!(vote_tracker + .get_voter_pubkey(&keypairs.vote_keypair.pubkey(), first_slot_in_new_epoch) + .is_some()); + assert!(vote_tracker + .get_authorized_voter(&keypairs.vote_keypair.pubkey(), first_slot_in_new_epoch) + .is_some()); + } + + // These keypairs were not refreshed in new epoch + for keypairs in validator_voting_keypairs[5..10].iter() { + assert!(vote_tracker + .get_voter_pubkey(&keypairs.vote_keypair.pubkey(), first_slot_in_new_epoch) + .is_none()); + assert!(vote_tracker + .get_authorized_voter(&keypairs.vote_keypair.pubkey(), first_slot_in_new_epoch) + .is_none()); + } + } + #[test] fn test_diff_vote_accounts() { // Create some voters at genesis From a7968d4e0505ac27df6e2a5404288e0ba71bf53b Mon Sep 17 00:00:00 2001 From: Carl Date: Wed, 4 Mar 2020 12:19:36 -0800 Subject: [PATCH 12/16] Use new utility function --- core/src/cluster_info_vote_listener.rs | 48 ++++++++------------------ 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 2a598f1568068f..086dacb6e6f2c9 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -526,11 +526,7 @@ mod tests { }; use solana_sdk::hash::Hash; use solana_sdk::signature::{Keypair, Signer}; - use solana_sdk::transaction::Transaction; - use solana_vote_program::{ - vote_instruction, - vote_state::{create_account, Vote}, - }; + use solana_vote_program::{vote_state::create_account, vote_transaction}; #[test] fn test_max_vote_tx_fits() { @@ -539,9 +535,10 @@ mod tests { let vote_keypair = Keypair::new(); let slots: Vec<_> = (0..31).into_iter().collect(); - let vote_tx = new_vote_tx( + let vote_tx = vote_transaction::new_vote_transaction( slots, Hash::default(), + Hash::default(), &node_keypair, &vote_keypair, &vote_keypair, @@ -561,9 +558,10 @@ mod tests { let vote_keypair = Keypair::new(); let authorized_voter = Keypair::new(); - let vote_tx = new_vote_tx( + let vote_tx = vote_transaction::new_vote_transaction( vec![0], Hash::default(), + Hash::default(), &node_keypair, &vote_keypair, &authorized_voter, @@ -587,9 +585,10 @@ mod tests { )); // Set the authorized voter == vote keypair - let vote_tx = new_vote_tx( + let vote_tx = vote_transaction::new_vote_transaction( vec![0], Hash::default(), + Hash::default(), &node_keypair, &vote_keypair, &vote_keypair, @@ -634,9 +633,10 @@ mod tests { validator_voting_keypairs.iter().for_each(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; - let vote_tx = new_vote_tx( + let vote_tx = vote_transaction::new_vote_transaction( vote_slots.clone(), Hash::default(), + Hash::default(), node_keypair, vote_keypair, vote_keypair, @@ -680,9 +680,10 @@ mod tests { .map(|keypairs| { let node_keypair = &keypairs.node_keypair; let vote_keypair = &keypairs.vote_keypair; - let vote_tx = new_vote_tx( + let vote_tx = vote_transaction::new_vote_transaction( vec![i as u64 + 1], Hash::default(), + Hash::default(), node_keypair, vote_keypair, vote_keypair, @@ -868,10 +869,11 @@ mod tests { // in the tracker let validator0_keypairs = &validator_voting_keypairs[0]; let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); - let vote_tx = vec![new_vote_tx( + let vote_tx = vec![vote_transaction::new_vote_transaction( // Must vote > root to be processed vec![bank.slot() + 1], Hash::default(), + Hash::default(), &validator0_keypairs.node_keypair, &validator0_keypairs.vote_keypair, &validator0_keypairs.vote_keypair, @@ -972,10 +974,11 @@ mod tests { let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] .iter() .map(|slot| { - new_vote_tx( + vote_transaction::new_vote_transaction( // Must vote > root to be processed vec![*slot], Hash::default(), + Hash::default(), &validator0_keypairs.node_keypair, &validator0_keypairs.vote_keypair, &validator0_keypairs.vote_keypair, @@ -997,25 +1000,4 @@ mod tests { current_ref_count += 2; assert_eq!(ref_count, current_ref_count); } - - fn new_vote_tx( - slots: Vec, - blockhash: Hash, - node_keypair: &Keypair, - vote_keypair: &Keypair, - authorized_voter_keypair: &Keypair, - ) -> Transaction { - let votes = Vote::new(slots, blockhash); - let vote_ix = vote_instruction::vote( - &vote_keypair.pubkey(), - &authorized_voter_keypair.pubkey(), - votes, - ); - - let mut vote_tx = Transaction::new_with_payer(vec![vote_ix], Some(&node_keypair.pubkey())); - - vote_tx.partial_sign(&[node_keypair], blockhash); - vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); - vote_tx - } } From b3ad412cdd34552767119c159db11f6448f36f6e Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 5 Mar 2020 18:13:25 -0800 Subject: [PATCH 13/16] Fix rebase --- runtime/src/message_processor.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/src/message_processor.rs b/runtime/src/message_processor.rs index 5eef38f09d6ecb..acc8dba8f744ee 100644 --- a/runtime/src/message_processor.rs +++ b/runtime/src/message_processor.rs @@ -178,7 +178,7 @@ impl MessageProcessor { .accounts .iter() .map(|&index| { - let is_signer = index < message.header.num_required_signatures; + let is_signer = message.is_signer(index as usize); let index = index as usize; let key = &message.account_keys[index]; let account = &accounts[index]; From 60fa54cb749d42e5b0177f1942a2103d2cfac54b Mon Sep 17 00:00:00 2001 From: Carl Date: Thu, 5 Mar 2020 19:03:24 -0800 Subject: [PATCH 14/16] Use more fine grained locking --- core/src/cluster_info_vote_listener.rs | 272 ++++++++++++++----------- core/src/replay_stage.rs | 2 +- core/src/tpu.rs | 2 +- core/src/tvu.rs | 6 +- core/src/validator.rs | 9 +- 5 files changed, 164 insertions(+), 127 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 086dacb6e6f2c9..1a43da97f7107d 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -26,19 +26,29 @@ use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; // Map from a vote account to the authorized voter for an epoch -pub type EpochAuthorizedVoters = HashMap, Pubkey>; +pub type EpochAuthorizedVoters = HashMap, Arc>; pub type NodeIdToVoteAccounts = HashMap>>; +pub struct SlotVoteTracker { + voted: HashSet>, + updates: Option>>, +} + +impl SlotVoteTracker { + fn get_updates(&mut self) -> Option>> { + self.updates.take() + } +} + pub struct VoteTracker { - // Don't track votes from people who are not staked, acts as a spam filter - epoch_authorized_voters: HashMap, // Map from a slot to a set of validators who have voted for that slot - votes: HashMap>>, + slot_vote_trackers: RwLock>>>, + // Don't track votes from people who are not staked, acts as a spam filter + epoch_authorized_voters: RwLock>, // Map from node id to the set of associated vote accounts - node_id_to_vote_accounts: HashMap, + node_id_to_vote_accounts: RwLock>, + all_pubkeys: RwLock>>, epoch_schedule: EpochSchedule, - all_pubkeys: HashSet>, - root: u64, } impl VoteTracker { @@ -48,13 +58,12 @@ impl VoteTracker { .epoch_schedule() .get_leader_schedule_epoch(root_bank.slot()); - let mut vote_tracker = Self { - epoch_authorized_voters: HashMap::new(), - votes: HashMap::new(), - node_id_to_vote_accounts: HashMap::new(), - epoch_schedule: *root_bank.epoch_schedule(), - all_pubkeys: HashSet::new(), - root: root_bank.slot(), + let vote_tracker = Self { + epoch_authorized_voters: RwLock::new(HashMap::new()), + slot_vote_trackers: RwLock::new(HashMap::new()), + node_id_to_vote_accounts: RwLock::new(HashMap::new()), + all_pubkeys: RwLock::new(HashSet::new()), + epoch_schedule: root_bank.epoch_schedule().clone(), }; // Parse voter information about all the known epochs @@ -65,7 +74,7 @@ impl VoteTracker { root_bank .epoch_vote_accounts(epoch) .expect("Epoch vote accounts must exist"), - &vote_tracker.all_pubkeys, + &vote_tracker.all_pubkeys.read().unwrap(), ); vote_tracker.process_new_leader_schedule_epoch_state( epoch, @@ -78,15 +87,17 @@ impl VoteTracker { vote_tracker } - pub fn votes(&self) -> &HashMap>> { - &self.votes + pub fn get_slot_vote_tracker(&self, slot: Slot) -> Option>> { + self.slot_vote_trackers.read().unwrap().get(&slot).cloned() } // Returns Some if the given pubkey is a staked voter for the epoch at the given // slot. Note this decisoin uses bank.EpochStakes not live stakes. - pub fn get_voter_pubkey(&self, pubkey: &Pubkey, slot: Slot) -> Option<&Arc> { + pub fn get_voter_pubkey(&self, pubkey: &Pubkey, slot: Slot) -> Option> { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters + .read() + .unwrap() .get(&epoch) .map(|epoch_authorized_voters| { epoch_authorized_voters @@ -94,14 +105,18 @@ impl VoteTracker { .map(|(key, _)| key) }) .unwrap_or(None) + .cloned() } - pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: Slot) -> Option<&Pubkey> { + pub fn get_authorized_voter(&self, pubkey: &Pubkey, slot: Slot) -> Option> { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters + .read() + .unwrap() .get(&epoch) .map(|epoch_authorized_voters| epoch_authorized_voters.get(pubkey)) .unwrap_or(None) + .cloned() } pub fn parse_epoch_state( @@ -134,9 +149,22 @@ impl VoteTracker { } let vote_state = vote_state.unwrap(); if *stake > 0 { + // Read out the authorized voters let mut authorized_voters = vote_state.authorized_voters().clone(); authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch); - authorized_voters.get_and_cache_authorized_voter_for_epoch(epoch + 1); + let authorized_voter = authorized_voters + .get_authorized_voter(epoch) + .expect("Authorized voter for current epoch must be known"); + + // Get Arcs for all the needed keys + let unduplicated_authorized_voter_key = all_pubkeys + .get(&authorized_voter) + .cloned() + .unwrap_or_else(|| { + let new_key = Arc::new(authorized_voter); + new_pubkeys.push(new_key.clone()); + new_key + }); let unduplicated_key = all_pubkeys.get(key).cloned().unwrap_or_else(|| { let new_key = Arc::new(*key); new_pubkeys.push(new_key.clone()); @@ -146,12 +174,7 @@ impl VoteTracker { .entry(vote_state.node_pubkey) .or_default() .push(unduplicated_key.clone()); - Some(( - unduplicated_key, - authorized_voters - .get_authorized_voter(epoch) - .expect("Authorized voter for current epoch must be known"), - )) + Some((unduplicated_key, unduplicated_authorized_voter_key)) } else { None } @@ -188,7 +211,9 @@ impl VoteTracker { slot: Slot, ) { let epoch = self.epoch_schedule.get_epoch(slot); - if let Some(node_id_to_vote_accounts) = self.node_id_to_vote_accounts.get(&epoch) { + if let Some(node_id_to_vote_accounts) = + self.node_id_to_vote_accounts.write().unwrap().get(&epoch) + { for node_id in node_ids { if let Some(node_vote_accounts) = node_id_to_vote_accounts.get(node_id) { for node_vote_account in node_vote_accounts { @@ -200,21 +225,27 @@ impl VoteTracker { } fn process_new_leader_schedule_epoch_state( - &mut self, + &self, new_leader_schedule_epoch: Epoch, new_epoch_authorized_voters: EpochAuthorizedVoters, new_node_id_to_vote_accounts: NodeIdToVoteAccounts, new_pubkeys: Vec>, ) { self.epoch_authorized_voters + .write() + .unwrap() .insert(new_leader_schedule_epoch, new_epoch_authorized_voters); self.node_id_to_vote_accounts + .write() + .unwrap() .insert(new_leader_schedule_epoch, new_node_id_to_vote_accounts); for key in new_pubkeys { - self.all_pubkeys.insert(key); + self.all_pubkeys.write().unwrap().insert(key); } self.all_pubkeys + .write() + .unwrap() .retain(|pubkey| Arc::strong_count(pubkey) > 1); } } @@ -230,7 +261,7 @@ impl ClusterInfoVoteListener { sigverify_disabled: bool, sender: CrossbeamSender>, poh_recorder: &Arc>, - vote_tracker: Arc>, + vote_tracker: Arc, bank_forks: Arc>, ) -> Self { let exit_ = exit.clone(); @@ -325,12 +356,15 @@ impl ClusterInfoVoteListener { fn process_votes_loop( exit: Arc, vote_txs_receiver: CrossbeamReceiver>, - vote_tracker: Arc>, + vote_tracker: Arc, bank_forks: &RwLock, ) -> Result<()> { - let mut old_leader_schedule_epoch = { + let (mut old_leader_schedule_epoch, mut last_root) = { let root_bank = bank_forks.read().unwrap().root_bank().clone(); - root_bank.get_leader_schedule_epoch(root_bank.slot()) + ( + root_bank.get_leader_schedule_epoch(root_bank.slot()), + root_bank.slot(), + ) }; loop { @@ -339,8 +373,9 @@ impl ClusterInfoVoteListener { } let root_bank = bank_forks.read().unwrap().root_bank().clone(); - if root_bank.slot() != vote_tracker.read().unwrap().root { + if root_bank.slot() != last_root { Self::process_new_root(&vote_tracker, root_bank.slot()); + last_root = root_bank.slot(); } let new_leader_schedule_epoch = @@ -348,9 +383,9 @@ impl ClusterInfoVoteListener { if old_leader_schedule_epoch != new_leader_schedule_epoch { assert!(vote_tracker + .epoch_authorized_voters .read() .unwrap() - .epoch_authorized_voters .get(&new_leader_schedule_epoch) .is_none()); Self::process_new_leader_schedule_epoch( @@ -361,7 +396,9 @@ impl ClusterInfoVoteListener { old_leader_schedule_epoch = new_leader_schedule_epoch; } - if let Err(e) = Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker) { + if let Err(e) = + Self::get_and_process_votes(&vote_txs_receiver, &vote_tracker, last_root) + { match e { Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Disconnected) => { return Ok(()); @@ -377,7 +414,8 @@ impl ClusterInfoVoteListener { fn get_and_process_votes( vote_txs_receiver: &CrossbeamReceiver>, - vote_tracker: &Arc>, + vote_tracker: &Arc, + last_root: Slot, ) -> Result<()> { let timer = Duration::from_millis(200); let mut vote_txs = vote_txs_receiver.recv_timeout(timer)?; @@ -385,16 +423,14 @@ impl ClusterInfoVoteListener { vote_txs.extend(new_txs); } - Self::process_votes(vote_tracker, vote_txs); + Self::process_votes(vote_tracker, vote_txs, last_root); Ok(()) } - fn process_votes(vote_tracker: &RwLock, vote_txs: Vec) { + fn process_votes(vote_tracker: &VoteTracker, vote_txs: Vec, root: Slot) { let mut diff: HashMap>> = HashMap::new(); { - let vote_tracker = vote_tracker.read().unwrap(); - let slot_pubkeys = &vote_tracker.votes; - let root = vote_tracker.root; + let all_slot_trackers = &vote_tracker.slot_vote_trackers; for tx in vote_txs { if let (Some(vote_pubkey), Some(vote_instruction)) = tx .message @@ -438,7 +474,7 @@ impl ClusterInfoVoteListener { // Voting without the correct authorized pubkey, dump the vote if !VoteTracker::vote_contains_authorized_voter( &tx, - actual_authorized_voter.unwrap(), + &actual_authorized_voter.unwrap(), ) { continue; } @@ -454,8 +490,9 @@ impl ClusterInfoVoteListener { { // Don't insert if we already have marked down this pubkey // voting for this slot - if let Some(slot_vote_pubkeys) = slot_pubkeys.get(&slot) { - if slot_vote_pubkeys.contains(vote_pubkey) { + if let Some(slot_tracker) = all_slot_trackers.read().unwrap().get(&slot) + { + if slot_tracker.read().unwrap().voted.contains(&vote_pubkey) { continue; } } @@ -467,32 +504,50 @@ impl ClusterInfoVoteListener { } } - let mut vote_tracker = vote_tracker.write().unwrap(); - let all_votes = &mut vote_tracker.votes; for (slot, slot_diff) in diff { - let slot_pubkeys = all_votes.entry(slot).or_default(); - for pk in slot_diff { - slot_pubkeys.insert(pk); + if let Some(slot_tracker) = vote_tracker.slot_vote_trackers.read().unwrap().get(&slot) { + let mut w_slot_tracker = slot_tracker.write().unwrap(); + for pk in slot_diff { + w_slot_tracker.voted.insert(pk.clone()); + w_slot_tracker.updates.push(pk); + } + } else { + let voted: HashSet<_> = slot_diff.into_iter().collect(); + let new_slot_tracker = SlotVoteTracker { + voted, + updates: vec![], + }; + vote_tracker + .slot_vote_trackers + .write() + .unwrap() + .insert(slot, Arc::new(RwLock::new(new_slot_tracker))); } } } - fn process_new_root(vote_tracker: &RwLock, new_root: Slot) { - let mut w_vote_tracker = vote_tracker.write().unwrap(); - let root_epoch = w_vote_tracker.epoch_schedule.get_epoch(new_root); - w_vote_tracker.root = new_root; - w_vote_tracker.votes.retain(|slot, _| *slot >= new_root); - w_vote_tracker + fn process_new_root(vote_tracker: &VoteTracker, new_root: Slot) { + let root_epoch = vote_tracker.epoch_schedule.get_epoch(new_root); + vote_tracker + .slot_vote_trackers + .write() + .unwrap() + .retain(|slot, _| *slot >= new_root); + vote_tracker .node_id_to_vote_accounts + .write() + .unwrap() .retain(|epoch, _| epoch >= &root_epoch); - w_vote_tracker + vote_tracker .epoch_authorized_voters + .write() + .unwrap() .retain(|epoch, _| epoch >= &root_epoch); } fn process_new_leader_schedule_epoch( root_bank: &Bank, - vote_tracker: &RwLock, + vote_tracker: &VoteTracker, new_leader_schedule_epoch: Epoch, ) { let (new_epoch_authorized_voters, new_node_id_to_vote_accounts, new_pubkeys) = @@ -501,18 +556,15 @@ impl ClusterInfoVoteListener { root_bank .epoch_vote_accounts(new_leader_schedule_epoch) .expect("Epoch vote accounts must exist"), - &vote_tracker.read().unwrap().all_pubkeys, + &vote_tracker.all_pubkeys.read().unwrap(), ); - vote_tracker - .write() - .unwrap() - .process_new_leader_schedule_epoch_state( - new_leader_schedule_epoch, - new_epoch_authorized_voters, - new_node_id_to_vote_accounts, - new_pubkeys, - ); + vote_tracker.process_new_leader_schedule_epoch_state( + new_leader_schedule_epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); } } @@ -626,7 +678,7 @@ mod tests { let bank = Bank::new(&genesis_config); // Send some votes to process - let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let vote_tracker = Arc::new(VoteTracker::new(&bank)); let (votes_sender, votes_receiver) = unbounded(); let vote_slots = vec![1, 2]; @@ -645,13 +697,14 @@ mod tests { }); // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker).unwrap(); + ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); for vote_slot in vote_slots { - let r_vote_tracker = vote_tracker.read().unwrap(); - let votes = r_vote_tracker.votes(); - let votes_for_slot = votes.get(&vote_slot).unwrap(); + let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); + let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); for voting_keypairs in &validator_voting_keypairs { - assert!(votes_for_slot.contains(&voting_keypairs.vote_keypair.pubkey())); + assert!(r_slot_vote_tracker + .voted + .contains(&voting_keypairs.vote_keypair.pubkey())); } } } @@ -671,7 +724,7 @@ mod tests { let bank = Bank::new(&genesis_config); // Send some votes to process - let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let vote_tracker = Arc::new(VoteTracker::new(&bank)); let (votes_sender, votes_receiver) = unbounded(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { @@ -695,13 +748,14 @@ mod tests { } // Check that all the votes were registered for each validator correctly - ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker).unwrap(); - let r_vote_tracker = vote_tracker.read().unwrap(); - let votes = r_vote_tracker.votes(); + ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { - let votes_for_slot = votes.get(&(i as u64 + 1)).unwrap(); + let vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap(); + let r_votes_for_slot = &vote_tracker.read().unwrap(); for voting_keypairs in keyset { - assert!(votes_for_slot.contains(&voting_keypairs.vote_keypair.pubkey())); + assert!(r_votes_for_slot + .voted + .contains(&voting_keypairs.vote_keypair.pubkey())); } } } @@ -719,7 +773,7 @@ mod tests { ); let bank = Bank::new(&genesis_config); - let mut vote_tracker = VoteTracker::new(&bank); + let vote_tracker = VoteTracker::new(&bank); let last_known_epoch = bank.get_leader_schedule_epoch(bank.slot()); let last_known_slot = bank .epoch_schedule() @@ -771,7 +825,7 @@ mod tests { VoteTracker::parse_epoch_state( new_epoch, &new_epoch_vote_accounts, - &vote_tracker.all_pubkeys, + &vote_tracker.all_pubkeys.read().unwrap(), ); vote_tracker.process_new_leader_schedule_epoch_state( @@ -868,7 +922,7 @@ mod tests { // Send a vote to process, should add a reference to the pubkey for that voter // in the tracker let validator0_keypairs = &validator_voting_keypairs[0]; - let vote_tracker = Arc::new(RwLock::new(VoteTracker::new(&bank))); + let vote_tracker = VoteTracker::new(&bank); let vote_tx = vec![vote_transaction::new_vote_transaction( // Must vote > root to be processed vec![bank.slot() + 1], @@ -880,19 +934,15 @@ mod tests { )]; let mut current_ref_count = Arc::strong_count( - vote_tracker - .read() - .unwrap() + &vote_tracker .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey(), bank.slot()) .unwrap(), ); { - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_tx, 0); let ref_count = Arc::strong_count( - vote_tracker - .read() - .unwrap() + &vote_tracker .get_voter_pubkey(&validator0_keypairs.vote_keypair.pubkey(), bank.slot()) .unwrap(), ); @@ -930,38 +980,30 @@ mod tests { VoteTracker::parse_epoch_state( new_epoch, &new_epoch_vote_accounts, - &vote_tracker.read().unwrap().all_pubkeys, + &vote_tracker.all_pubkeys.read().unwrap(), ); // Should add 2 new references to `old_refreshed_pubkey`, one in `new_epoch_authorized_voters`, // one in `new_node_id_to_vote_accounts` - vote_tracker - .write() - .unwrap() - .process_new_leader_schedule_epoch_state( - new_epoch, - new_epoch_authorized_voters, - new_node_id_to_vote_accounts, - new_pubkeys, - ); + vote_tracker.process_new_leader_schedule_epoch_state( + new_epoch, + new_epoch_authorized_voters, + new_node_id_to_vote_accounts, + new_pubkeys, + ); - { - let r_vote_tracker = vote_tracker.read().unwrap(); - assert!(r_vote_tracker - .get_voter_pubkey(&new_pubkey, first_slot_in_new_epoch) - .is_some()); - assert!(r_vote_tracker - .get_voter_pubkey(&old_outdated_pubkey, first_slot_in_new_epoch) - .is_none()); - } + assert!(vote_tracker + .get_voter_pubkey(&new_pubkey, first_slot_in_new_epoch) + .is_some()); + assert!(vote_tracker + .get_voter_pubkey(&old_outdated_pubkey, first_slot_in_new_epoch) + .is_none()); // Make sure new copies of the same pubkeys aren't constantly being // introduced when the same voter is in both the old and new epoch // Instead, only the ref count should go up. let ref_count = Arc::strong_count( - vote_tracker - .read() - .unwrap() + &vote_tracker .get_voter_pubkey(&old_refreshed_pubkey, first_slot_in_new_epoch) .unwrap(), ); @@ -986,12 +1028,10 @@ mod tests { }) .collect(); - ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs); + ClusterInfoVoteListener::process_votes(&vote_tracker, vote_txs, 0); let ref_count = Arc::strong_count( - vote_tracker - .read() - .unwrap() + &vote_tracker .get_voter_pubkey(&old_refreshed_pubkey, first_slot_in_new_epoch) .unwrap(), ); diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index d0e6872f5bfe16..e65c50c395f220 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -177,7 +177,7 @@ impl ReplayStage { cluster_info: Arc>, ledger_signal_receiver: Receiver, poh_recorder: Arc>, - _vote_tracker: Arc>, + _vote_tracker: Arc, ) -> (Self, Receiver>>) { let ReplayStageConfig { my_pubkey, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 7d0c88dcb07421..8a0f572d6a3e8f 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -48,7 +48,7 @@ impl Tpu { broadcast_type: &BroadcastStageType, exit: &Arc, shred_version: u16, - vote_tracker: Arc>, + vote_tracker: Arc, bank_forks: Arc>, ) -> Self { let (packet_sender, packet_receiver) = channel(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 9d8621da5411b0..d070894cc06e68 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -88,7 +88,11 @@ impl Tvu { shred_version: u16, transaction_status_sender: Option, rewards_recorder_sender: Option, +<<<<<<< HEAD snapshot_package_sender: Option, +======= + vote_tracker: Arc, +>>>>>>> Use more fine grained locking ) -> Self { let keypair: Arc = cluster_info .read() @@ -304,7 +308,7 @@ pub mod tests { None, None, None, - Arc::new(RwLock::new(VoteTracker::new(&bank))), + Arc::new(VoteTracker::new(&bank)), ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index addbb59933f682..0d30fcc5ac684e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -379,14 +379,7 @@ impl Validator { "New shred signal for the TVU should be the same as the clear bank signal." ); - let vote_tracker = Arc::new(RwLock::new({ - let bank_forks = bank_forks.read().unwrap(); - VoteTracker::new( - bank_forks - .get(bank_forks.root()) - .expect("Root bank must exist"), - ) - })); + let vote_tracker = Arc::new({ VoteTracker::new(bank_forks.read().unwrap().root_bank()) }); let tvu = Tvu::new( vote_account, From abc205d9eac5d83ceca759c21691ff1476d76d4a Mon Sep 17 00:00:00 2001 From: Carl Date: Fri, 6 Mar 2020 15:00:36 -0800 Subject: [PATCH 15/16] Fix tests --- core/src/cluster_info_vote_listener.rs | 101 +++++++++++++++++-------- core/src/tvu.rs | 3 - core/src/validator.rs | 1 + 3 files changed, 71 insertions(+), 34 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index 1a43da97f7107d..d520c2cdff666b 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -35,7 +35,8 @@ pub struct SlotVoteTracker { } impl SlotVoteTracker { - fn get_updates(&mut self) -> Option>> { + #[allow(dead_code)] + pub fn get_updates(&mut self) -> Option>> { self.updates.take() } } @@ -63,7 +64,7 @@ impl VoteTracker { slot_vote_trackers: RwLock::new(HashMap::new()), node_id_to_vote_accounts: RwLock::new(HashMap::new()), all_pubkeys: RwLock::new(HashSet::new()), - epoch_schedule: root_bank.epoch_schedule().clone(), + epoch_schedule: *root_bank.epoch_schedule(), }; // Parse voter information about all the known epochs @@ -92,7 +93,7 @@ impl VoteTracker { } // Returns Some if the given pubkey is a staked voter for the epoch at the given - // slot. Note this decisoin uses bank.EpochStakes not live stakes. + // slot. Note this decision uses bank.EpochStakes not live stakes. pub fn get_voter_pubkey(&self, pubkey: &Pubkey, slot: Slot) -> Option> { let epoch = self.epoch_schedule.get_epoch(slot); self.epoch_authorized_voters @@ -126,9 +127,9 @@ impl VoteTracker { ) -> ( EpochAuthorizedVoters, NodeIdToVoteAccounts, - Vec>, + HashSet>, ) { - let mut new_pubkeys = vec![]; + let mut new_pubkeys = HashSet::new(); let mut node_id_to_vote_accounts: NodeIdToVoteAccounts = HashMap::new(); // Get all known vote accounts with nonzero stake and read out their // authorized voters @@ -161,19 +162,29 @@ impl VoteTracker { .get(&authorized_voter) .cloned() .unwrap_or_else(|| { - let new_key = Arc::new(authorized_voter); - new_pubkeys.push(new_key.clone()); - new_key + new_pubkeys + .get(&authorized_voter) + .cloned() + .unwrap_or_else(|| { + let new_key = Arc::new(authorized_voter); + new_pubkeys.insert(new_key.clone()); + new_key + }) }); + let unduplicated_key = all_pubkeys.get(key).cloned().unwrap_or_else(|| { - let new_key = Arc::new(*key); - new_pubkeys.push(new_key.clone()); - new_key + new_pubkeys.get(key).cloned().unwrap_or_else(|| { + let new_key = Arc::new(*key); + new_pubkeys.insert(new_key.clone()); + new_key + }) }); + node_id_to_vote_accounts .entry(vote_state.node_pubkey) .or_default() .push(unduplicated_key.clone()); + Some((unduplicated_key, unduplicated_authorized_voter_key)) } else { None @@ -229,7 +240,7 @@ impl VoteTracker { new_leader_schedule_epoch: Epoch, new_epoch_authorized_voters: EpochAuthorizedVoters, new_node_id_to_vote_accounts: NodeIdToVoteAccounts, - new_pubkeys: Vec>, + new_pubkeys: HashSet>, ) { self.epoch_authorized_voters .write() @@ -505,17 +516,25 @@ impl ClusterInfoVoteListener { } for (slot, slot_diff) in diff { - if let Some(slot_tracker) = vote_tracker.slot_vote_trackers.read().unwrap().get(&slot) { + let slot_tracker = vote_tracker + .slot_vote_trackers + .read() + .unwrap() + .get(&slot) + .cloned(); + if let Some(slot_tracker) = slot_tracker { let mut w_slot_tracker = slot_tracker.write().unwrap(); + let mut updates = w_slot_tracker.updates.take().unwrap_or_else(|| vec![]); for pk in slot_diff { w_slot_tracker.voted.insert(pk.clone()); - w_slot_tracker.updates.push(pk); + updates.push(pk); } + w_slot_tracker.updates = Some(updates); } else { let voted: HashSet<_> = slot_diff.into_iter().collect(); let new_slot_tracker = SlotVoteTracker { - voted, - updates: vec![], + voted: voted.clone(), + updates: Some(voted.into_iter().collect()), }; vote_tracker .slot_vote_trackers @@ -702,9 +721,13 @@ mod tests { let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(vote_slot).unwrap(); let r_slot_vote_tracker = slot_vote_tracker.read().unwrap(); for voting_keypairs in &validator_voting_keypairs { + let pubkey = voting_keypairs.vote_keypair.pubkey(); + assert!(r_slot_vote_tracker.voted.contains(&pubkey)); assert!(r_slot_vote_tracker - .voted - .contains(&voting_keypairs.vote_keypair.pubkey())); + .updates + .as_ref() + .unwrap() + .contains(&Arc::new(pubkey))); } } } @@ -750,12 +773,16 @@ mod tests { // Check that all the votes were registered for each validator correctly ClusterInfoVoteListener::get_and_process_votes(&votes_receiver, &vote_tracker, 0).unwrap(); for (i, keyset) in validator_voting_keypairs.chunks(2).enumerate() { - let vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap(); - let r_votes_for_slot = &vote_tracker.read().unwrap(); + let slot_vote_tracker = vote_tracker.get_slot_vote_tracker(i as u64 + 1).unwrap(); + let r_slot_vote_tracker = &slot_vote_tracker.read().unwrap(); for voting_keypairs in keyset { - assert!(r_votes_for_slot - .voted - .contains(&voting_keypairs.vote_keypair.pubkey())); + let pubkey = voting_keypairs.vote_keypair.pubkey(); + assert!(r_slot_vote_tracker.voted.contains(&pubkey)); + assert!(r_slot_vote_tracker + .updates + .as_ref() + .unwrap() + .contains(&Arc::new(pubkey))); } } } @@ -907,6 +934,11 @@ mod tests { #[test] fn test_vote_tracker_references() { + // The number of references that get stored for a pubkey every time + // a vote is made. One stored in the SlotVoteTracker.voted, one in + // SlotVoteTracker.updates + let ref_count_per_vote = 2; + // Create some voters at genesis let validator_voting_keypairs: Vec<_> = (0..2) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) @@ -947,8 +979,8 @@ mod tests { .unwrap(), ); - // This pubkey voted for a slot, so the refcount is now 4 - current_ref_count += 1; + // This pubkey voted for a slot, so ref count goes up + current_ref_count += ref_count_per_vote; assert_eq!(ref_count, current_ref_count); } @@ -983,8 +1015,14 @@ mod tests { &vote_tracker.all_pubkeys.read().unwrap(), ); - // Should add 2 new references to `old_refreshed_pubkey`, one in `new_epoch_authorized_voters`, - // one in `new_node_id_to_vote_accounts` + assert_eq!( + new_pubkeys, + vec![Arc::new(new_pubkey)].into_iter().collect() + ); + + // Should add 3 new references to `old_refreshed_pubkey`, two in `new_epoch_authorized_voters`, + // (one for the voter, one for the authorized voter b/c both are the same key) and + // one in `new_node_id_to_vote_accounts`s vote_tracker.process_new_leader_schedule_epoch_state( new_epoch, new_epoch_authorized_voters, @@ -1008,11 +1046,12 @@ mod tests { .unwrap(), ); - // Ref count goes up by 2 (see above comments) - current_ref_count += 2; + // Ref count goes up by 3 (see above comments) + current_ref_count += 3; assert_eq!(ref_count, current_ref_count); - // Make 2 new votes in two different epochs, ref count should go up by 2 + // Make 2 new votes in two different epochs, ref count should go up + // by 2 * ref_count_per_vote let vote_txs: Vec<_> = [bank.slot() + 2, first_slot_in_new_epoch] .iter() .map(|slot| { @@ -1037,7 +1076,7 @@ mod tests { ); // Ref count goes up by 2 (see above comments) - current_ref_count += 2; + current_ref_count += 2 * ref_count_per_vote; assert_eq!(ref_count, current_ref_count); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index d070894cc06e68..6814926ee1f8f4 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -88,11 +88,8 @@ impl Tvu { shred_version: u16, transaction_status_sender: Option, rewards_recorder_sender: Option, -<<<<<<< HEAD snapshot_package_sender: Option, -======= vote_tracker: Arc, ->>>>>>> Use more fine grained locking ) -> Self { let keypair: Arc = cluster_info .read() diff --git a/core/src/validator.rs b/core/src/validator.rs index 0d30fcc5ac684e..3884810566e9d8 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -429,6 +429,7 @@ impl Validator { transaction_status_sender.clone(), rewards_recorder_sender, snapshot_package_sender, + vote_tracker.clone(), ); if config.dev_sigverify_disabled { From b07af8bef20bf50fdb176d7a4b041c38dfca4117 Mon Sep 17 00:00:00 2001 From: Carl Date: Mon, 9 Mar 2020 20:27:22 -0700 Subject: [PATCH 16/16] Update node_id_to_vote_accounts --- core/src/cluster_info_vote_listener.rs | 57 ++++++++++---------------- 1 file changed, 21 insertions(+), 36 deletions(-) diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index d520c2cdff666b..d4ac2b180f8519 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -215,23 +215,23 @@ impl VoteTracker { // Given a set of validator node ids `N` and vote accounts `V`, removes the vote accounts // from `V` that belong to `N` - pub fn diff_vote_accounts( - &self, - node_ids: &[Pubkey], - vote_accounts: &mut HashSet, - slot: Slot, - ) { + pub fn node_id_to_vote_accounts(&self, node_ids: &[Pubkey], slot: Slot) -> Vec> { let epoch = self.epoch_schedule.get_epoch(slot); if let Some(node_id_to_vote_accounts) = - self.node_id_to_vote_accounts.write().unwrap().get(&epoch) + self.node_id_to_vote_accounts.read().unwrap().get(&epoch) { - for node_id in node_ids { - if let Some(node_vote_accounts) = node_id_to_vote_accounts.get(node_id) { - for node_vote_account in node_vote_accounts { - vote_accounts.remove(node_vote_account); - } - }; - } + node_ids + .iter() + .flat_map(|node_id| { + node_id_to_vote_accounts + .get(node_id) + .cloned() + .unwrap_or_else(|| vec![]) + .into_iter() + }) + .collect() + } else { + vec![] } } @@ -887,7 +887,7 @@ mod tests { } #[test] - fn test_diff_vote_accounts() { + fn test_node_id_to_vote_accounts() { // Create some voters at genesis let validator_voting_keypairs: Vec<_> = (0..10) .map(|_| ValidatorVoteKeypairs::new(Keypair::new(), Keypair::new(), Keypair::new())) @@ -907,29 +907,14 @@ mod tests { .iter() .map(|v| v.node_keypair.pubkey()) .collect(); - let mut vote_accounts = validator_voting_keypairs + let vote_accounts: Vec<_> = validator_voting_keypairs .iter() - .map(|v| v.vote_keypair.pubkey()) - .collect(); - vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts, bank.slot()); - assert!(vote_accounts.is_empty()); - - // Given the later half of the node id's, should diff out - // the later half of the vote accounts - let node_ids: Vec<_> = validator_voting_keypairs[5..] - .iter() - .map(|v| v.node_keypair.pubkey()) + .map(|v| Arc::new(v.vote_keypair.pubkey())) .collect(); - let mut vote_accounts = validator_voting_keypairs - .iter() - .map(|v| v.vote_keypair.pubkey()) - .collect::>(); - vote_tracker.diff_vote_accounts(&node_ids, &mut vote_accounts, bank.slot()); - let expected = validator_voting_keypairs[0..5] - .iter() - .map(|v| v.vote_keypair.pubkey()) - .collect::>(); - assert_eq!(vote_accounts, expected); + assert_eq!( + vote_tracker.node_id_to_vote_accounts(&node_ids, bank.slot()), + vote_accounts + ); } #[test]