Skip to content

Commit

Permalink
Backport confirmations fix. (#9252)
Browse files Browse the repository at this point in the history
automerge
  • Loading branch information
CriesofCarrots authored Apr 2, 2020
1 parent d4bb7ce commit 251054d
Show file tree
Hide file tree
Showing 15 changed files with 518 additions and 374 deletions.
193 changes: 100 additions & 93 deletions core/src/commitment.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use crate::consensus::VOTE_THRESHOLD_SIZE;
use solana_measure::measure::Measure;
use solana_metrics::inc_new_counter_info;
use solana_runtime::bank::Bank;
use solana_sdk::clock::Slot;
use solana_vote_program::{vote_state::VoteState, vote_state::MAX_LOCKOUT_HISTORY};
Expand Down Expand Up @@ -31,17 +34,40 @@ impl BlockCommitment {
}
}

#[derive(Debug, Default)]
#[derive(Default)]
pub struct BlockCommitmentCache {
block_commitment: HashMap<Slot, BlockCommitment>,
total_stake: u64,
bank: Arc<Bank>,
root: Slot,
}

impl std::fmt::Debug for BlockCommitmentCache {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BlockCommitmentCache")
.field("block_commitment", &self.block_commitment)
.field("total_stake", &self.total_stake)
.field(
"bank",
&format_args!("Bank({{current_slot: {:?}}})", self.bank.slot()),
)
.field("root", &self.root)
.finish()
}
}

impl BlockCommitmentCache {
pub fn new(block_commitment: HashMap<Slot, BlockCommitment>, total_stake: u64) -> Self {
pub fn new(
block_commitment: HashMap<Slot, BlockCommitment>,
total_stake: u64,
bank: Arc<Bank>,
root: Slot,
) -> Self {
Self {
block_commitment,
total_stake,
bank,
root,
}
}

Expand All @@ -53,38 +79,62 @@ impl BlockCommitmentCache {
self.total_stake
}

pub fn get_block_with_depth_commitment(
&self,
minimum_depth: usize,
minimum_stake_percentage: f64,
) -> Option<Slot> {
self.block_commitment
.iter()
.filter(|&(_, block_commitment)| {
let fork_stake_minimum_depth: u64 = block_commitment.commitment[minimum_depth..]
.iter()
.cloned()
.sum();
fork_stake_minimum_depth as f64 / self.total_stake as f64
>= minimum_stake_percentage
})
.map(|(slot, _)| *slot)
.max()
pub fn bank(&self) -> Arc<Bank> {
self.bank.clone()
}

pub fn get_rooted_block_with_commitment(&self, minimum_stake_percentage: f64) -> Option<u64> {
self.get_block_with_depth_commitment(MAX_LOCKOUT_HISTORY - 1, minimum_stake_percentage)
pub fn slot(&self) -> Slot {
self.bank.slot()
}

pub fn root(&self) -> Slot {
self.root
}

pub fn get_confirmation_count(&self, slot: Slot) -> Option<usize> {
self.get_lockout_count(slot, VOTE_THRESHOLD_SIZE)
}

// Returns the lowest level at which at least `minimum_stake_percentage` of the total epoch
// stake is locked out
fn get_lockout_count(&self, slot: Slot, minimum_stake_percentage: f64) -> Option<usize> {
self.get_block_commitment(slot).map(|block_commitment| {
let iterator = block_commitment.commitment.iter().enumerate().rev();
let mut sum = 0;
for (i, stake) in iterator {
sum += stake;
if (sum as f64 / self.total_stake as f64) > minimum_stake_percentage {
return i + 1;
}
}
0
})
}
#[cfg(test)]
pub fn new_for_tests() -> Self {
let mut block_commitment: HashMap<Slot, BlockCommitment> = HashMap::new();
block_commitment.insert(0, BlockCommitment::default());
Self {
block_commitment,
total_stake: 42,
..Self::default()
}
}
}

pub struct CommitmentAggregationData {
bank: Arc<Bank>,
root: Slot,
total_staked: u64,
}

impl CommitmentAggregationData {
pub fn new(bank: Arc<Bank>, total_staked: u64) -> Self {
Self { bank, total_staked }
pub fn new(bank: Arc<Bank>, root: Slot, total_staked: u64) -> Self {
Self {
bank,
root,
total_staked,
}
}
}

Expand Down Expand Up @@ -144,14 +194,24 @@ impl AggregateCommitmentService {
continue;
}

let mut aggregate_commitment_time = Measure::start("aggregate-commitment-ms");
let block_commitment = Self::aggregate_commitment(&ancestors, &aggregation_data.bank);

let mut new_block_commitment =
BlockCommitmentCache::new(block_commitment, aggregation_data.total_staked);
let mut new_block_commitment = BlockCommitmentCache::new(
block_commitment,
aggregation_data.total_staked,
aggregation_data.bank,
aggregation_data.root,
);

let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();

std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
aggregate_commitment_time.stop();
inc_new_counter_info!(
"aggregate-commitment-ms",
aggregate_commitment_time.as_ms() as usize
);
}
}

Expand Down Expand Up @@ -246,84 +306,31 @@ mod tests {
}

#[test]
fn test_get_block_with_depth_commitment() {
fn test_get_confirmations() {
let bank = Arc::new(Bank::default());
// Build BlockCommitmentCache with votes at depths 0 and 1 for 2 slots
let mut cache0 = BlockCommitment::default();
cache0.increase_confirmation_stake(1, 15);
cache0.increase_confirmation_stake(2, 25);
cache0.increase_confirmation_stake(1, 5);
cache0.increase_confirmation_stake(2, 40);

let mut cache1 = BlockCommitment::default();
cache1.increase_confirmation_stake(1, 10);
cache1.increase_confirmation_stake(2, 20);
cache1.increase_confirmation_stake(1, 40);
cache1.increase_confirmation_stake(2, 5);

let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
block_commitment.entry(1).or_insert(cache1.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50);

// Neither slot has rooted votes
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.1),
None
);
// Neither slot meets the minimum level of commitment 0.6 at depth 1
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.6),
None
);
// Only slot 0 meets the minimum level of commitment 0.5 at depth 1
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.5),
Some(0)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(1, 0.4),
Some(1)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(0, 0.6),
Some(1)
);
// Neither slot meets the minimum level of commitment 0.9 at depth 0
assert_eq!(
block_commitment_cache.get_block_with_depth_commitment(0, 0.9),
None
);
}

#[test]
fn test_get_rooted_block_with_commitment() {
// Build BlockCommitmentCache with rooted votes
let mut cache0 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]);
cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 40);
cache0.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10);
let mut cache1 = BlockCommitment::new([0; MAX_LOCKOUT_HISTORY]);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY, 30);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 1, 10);
cache1.increase_confirmation_stake(MAX_LOCKOUT_HISTORY - 2, 10);
let mut cache2 = BlockCommitment::default();
cache2.increase_confirmation_stake(1, 20);
cache2.increase_confirmation_stake(2, 5);

let mut block_commitment = HashMap::new();
block_commitment.entry(0).or_insert(cache0.clone());
block_commitment.entry(1).or_insert(cache1.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50);
block_commitment.entry(2).or_insert(cache2.clone());
let block_commitment_cache = BlockCommitmentCache::new(block_commitment, 50, bank, 0);

// Only slot 0 meets the minimum level of commitment 0.66 at root
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.66),
Some(0)
);
// If multiple slots meet the minimum level of commitment, method should return the most recent
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.6),
Some(1)
);
// Neither slot meets the minimum level of commitment 0.9 at root
assert_eq!(
block_commitment_cache.get_rooted_block_with_commitment(0.9),
None
);
assert_eq!(block_commitment_cache.get_confirmation_count(0), Some(2));
assert_eq!(block_commitment_cache.get_confirmation_count(1), Some(1));
assert_eq!(block_commitment_cache.get_confirmation_count(2), Some(0),);
assert_eq!(block_commitment_cache.get_confirmation_count(3), None,);
}

#[test]
Expand Down
39 changes: 32 additions & 7 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl ReplayStage {
// Start the replay stage loop

let (lockouts_sender, commitment_service) =
AggregateCommitmentService::new(&exit, block_commitment_cache);
AggregateCommitmentService::new(&exit, block_commitment_cache.clone());

#[allow(clippy::cognitive_complexity)]
let t_replay = Builder::new()
Expand Down Expand Up @@ -309,7 +309,10 @@ impl ReplayStage {
let start = allocated.get();
if !is_locked_out && vote_threshold {
info!("voting: {} {}", bank.slot(), fork_weight);
subscriptions.notify_subscribers(bank.slot(), &bank_forks);
subscriptions.notify_subscribers(
block_commitment_cache.read().unwrap().slot(),
&bank_forks,
);
if let Some(votable_leader) =
leader_schedule_cache.slot_leader_at(bank.slot(), Some(&bank))
{
Expand Down Expand Up @@ -647,7 +650,13 @@ impl ReplayStage {
return Err(e.into());
}
}
Self::update_commitment_cache(bank.clone(), total_staked, lockouts_sender);

Self::update_commitment_cache(
bank.clone(),
bank_forks.read().unwrap().root(),
total_staked,
lockouts_sender,
);

if let Some(ref voting_keypair) = voting_keypair {
let node_keypair = cluster_info.read().unwrap().keypair.clone();
Expand Down Expand Up @@ -675,10 +684,13 @@ impl ReplayStage {

fn update_commitment_cache(
bank: Arc<Bank>,
root: Slot,
total_staked: u64,
lockouts_sender: &Sender<CommitmentAggregationData>,
) {
if let Err(e) = lockouts_sender.send(CommitmentAggregationData::new(bank, total_staked)) {
if let Err(e) =
lockouts_sender.send(CommitmentAggregationData::new(bank, root, total_staked))
{
trace!("lockouts_sender failed: {:?}", e);
}
}
Expand Down Expand Up @@ -1397,7 +1409,10 @@ pub(crate) mod tests {
let bank0 = Bank::new(&genesis_config);
let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank0));
let exit = Arc::new(AtomicBool::new(false));
let subscriptions = Arc::new(RpcSubscriptions::new(&exit));
let subscriptions = Arc::new(RpcSubscriptions::new(
&exit,
Arc::new(RwLock::new(BlockCommitmentCache::default())),
));
let bank_forks = BankForks::new(0, bank0);
bank_forks.working_bank().freeze();

Expand Down Expand Up @@ -1766,7 +1781,12 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(bank1);
let arc_bank1 = bank_forks.read().unwrap().get(1).unwrap().clone();
leader_vote(&arc_bank1, &leader_voting_pubkey);
ReplayStage::update_commitment_cache(arc_bank1.clone(), leader_lamports, &lockouts_sender);
ReplayStage::update_commitment_cache(
arc_bank1.clone(),
0,
leader_lamports,
&lockouts_sender,
);

let bank2 = Bank::new_from_parent(&arc_bank1, &Pubkey::default(), arc_bank1.slot() + 1);
let _res = bank2.transfer(10, &genesis_config_info.mint_keypair, &Pubkey::new_rand());
Expand All @@ -1777,7 +1797,12 @@ pub(crate) mod tests {
bank_forks.write().unwrap().insert(bank2);
let arc_bank2 = bank_forks.read().unwrap().get(2).unwrap().clone();
leader_vote(&arc_bank2, &leader_voting_pubkey);
ReplayStage::update_commitment_cache(arc_bank2.clone(), leader_lamports, &lockouts_sender);
ReplayStage::update_commitment_cache(
arc_bank2.clone(),
0,
leader_lamports,
&lockouts_sender,
);
thread::sleep(Duration::from_millis(200));

let mut expected0 = BlockCommitment::default();
Expand Down
Loading

0 comments on commit 251054d

Please sign in to comment.