Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 38 additions & 34 deletions wen-restart/src/last_voted_fork_slots_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ pub struct LastVotedForkSlotsAggregate {
slots_to_repair: HashSet<Slot>,
}

pub struct LastVotedForkSlotsAggregateResult {
pub slots_to_repair: Vec<Slot>,
pub active_percent: f64, /* 0 ~ 100.0 */
}

impl LastVotedForkSlotsAggregate {
pub(crate) fn new(
root_slot: Slot,
Expand Down Expand Up @@ -131,16 +126,16 @@ impl LastVotedForkSlotsAggregate {
Some(record)
}

pub(crate) fn get_aggregate_result(&self) -> LastVotedForkSlotsAggregateResult {
pub(crate) fn active_percent(&self) -> f64 {
let total_stake = self.epoch_stakes.total_stake();
let total_active_stake = self.active_peers.iter().fold(0, |sum: u64, pubkey| {
sum.saturating_add(Self::validator_stake(&self.epoch_stakes, pubkey))
});
let active_percent = total_active_stake as f64 / total_stake as f64 * 100.0;
LastVotedForkSlotsAggregateResult {
slots_to_repair: self.slots_to_repair.iter().cloned().collect(),
active_percent,
}
total_active_stake as f64 / total_stake as f64 * 100.0
}

pub(crate) fn slots_to_repair_iter(&self) -> impl Iterator<Item = &Slot> {
self.slots_to_repair.iter()
}
}

Expand Down Expand Up @@ -237,11 +232,15 @@ mod tests {
}),
);
}
let result = test_state.slots_aggregate.get_aggregate_result();
let mut expected_active_percent =
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
assert!(result.slots_to_repair.is_empty());
assert_eq!(
test_state.slots_aggregate.active_percent(),
(initial_num_active_validators + 1) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0
);
assert!(test_state
.slots_aggregate
.slots_to_repair_iter()
.next()
.is_none());

let new_active_validator = test_state.validator_voting_keypairs
[initial_num_active_validators + 1]
Expand All @@ -267,11 +266,14 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
expected_active_percent =
let expected_active_percent =
(initial_num_active_validators + 2) as f64 / TOTAL_VALIDATOR_COUNT as f64 * 100.0;
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, test_state.last_voted_fork_slots);

Expand Down Expand Up @@ -299,9 +301,12 @@ mod tests {
wallclock: now,
}),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);

Expand All @@ -320,9 +325,12 @@ mod tests {
),
None,
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, expected_active_percent);
let mut actual_slots = Vec::from_iter(result.slots_to_repair);
assert_eq!(
test_state.slots_aggregate.active_percent(),
expected_active_percent
);
let mut actual_slots =
Vec::from_iter(test_state.slots_aggregate.slots_to_repair_iter().cloned());
actual_slots.sort();
assert_eq!(actual_slots, vec![root_slot + 1]);
}
Expand All @@ -339,8 +347,7 @@ mod tests {
last_vote_bankhash: last_vote_bankhash.to_string(),
shred_version: SHRED_VERSION as u32,
};
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 10.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 10.0);
assert_eq!(
test_state
.slots_aggregate
Expand All @@ -354,8 +361,7 @@ mod tests {
.unwrap(),
Some(record.clone()),
);
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
// Now if you get the same result from Gossip again, it should be ignored.
assert_eq!(
test_state.slots_aggregate.aggregate(
Expand Down Expand Up @@ -399,8 +405,7 @@ mod tests {
}),
);
// percentage doesn't change since it's a replace.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);

// Record from validator with zero stake should be ignored.
assert_eq!(
Expand All @@ -419,8 +424,7 @@ mod tests {
None,
);
// percentage doesn't change since the previous aggregate is ignored.
let result = test_state.slots_aggregate.get_aggregate_result();
assert_eq!(result.active_percent, 20.0);
assert_eq!(test_state.slots_aggregate.active_percent(), 20.0);
}

#[test]
Expand Down
21 changes: 12 additions & 9 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,34 +154,37 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
.insert(from, record);
}
}
let result = last_voted_fork_slots_aggregate.get_aggregate_result();
// Because all operations on the aggregate are called from this single thread, we can
// fetch all results separately without worrying about them being out of sync. We can
// also use returned iterator without the vector changing underneath us.
let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>;
{
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = result
.slots_to_repair
.into_iter()
filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter()
.filter(|slot| {
if slot <= &root_slot || is_full_slots.contains(slot) {
if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false;
}
let is_full = my_bank_forks
.get(*slot)
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
is_full_slots.insert(*slot);
is_full_slots.insert(**slot);
}
!is_full
})
.cloned()
.collect();
}
filtered_slots.sort();
info!(
"Active peers: {} Slots to repair: {:?}",
result.active_percent, &filtered_slots
active_percent, &filtered_slots
);
if filtered_slots.is_empty()
&& result.active_percent > wait_for_supermajority_threshold_percent as f64
&& active_percent > wait_for_supermajority_threshold_percent as f64
{
*wen_restart_repair_slots.write().unwrap() = vec![];
break;
Expand Down