Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions wen-restart/src/heaviest_fork_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,17 @@ impl HeaviestForkAggregate {
let from = &received_heaviest_fork.from;
let sender_stake = self.epoch_stakes.node_id_to_stake(from).unwrap_or(0);
if sender_stake == 0 {
warn!(
"Gossip should not accept zero-stake RestartLastVotedFork from {:?}",
from
);
warn!("Gossip should not accept zero-stake RestartLastVotedFork from {from:?}");
return HeaviestForkAggregateResult::ZeroStakeIgnored;
}
if from == &self.my_pubkey {
return HeaviestForkAggregateResult::AlreadyExists;
}
if received_heaviest_fork.shred_version != self.my_shred_version {
warn!(
"Gossip should not accept RestartLastVotedFork with different shred version {} from {:?}",
received_heaviest_fork.shred_version, from
"Gossip should not accept RestartLastVotedFork with different shred version {} \
from {from:?}",
received_heaviest_fork.shred_version
);
return HeaviestForkAggregateResult::Malformed;
}
Expand Down
134 changes: 68 additions & 66 deletions wen-restart/src/wen_restart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,23 @@ impl std::fmt::Display for WenRestartError {
WenRestartError::BankHashMismatch(slot, expected, actual) => {
write!(
f,
"Bank hash mismatch for slot: {} expected: {} actual: {}",
slot, expected, actual
"Bank hash mismatch for slot: {slot} expected: {expected} actual: {actual}"
)
}
WenRestartError::BlockNotFound(slot) => {
write!(f, "Block not found: {}", slot)
write!(f, "Block not found: {slot}")
}
WenRestartError::BlockNotFull(slot) => {
write!(f, "Block not full: {}", slot)
write!(f, "Block not full: {slot}")
}
WenRestartError::BlockNotFrozenAfterReplay(slot, err) => {
write!(f, "Block not frozen after replay: {} {:?}", slot, err)
write!(f, "Block not frozen after replay: {slot} {err:?}")
}
WenRestartError::BlockNotLinkedToExpectedParent(slot, parent, expected_parent) => {
write!(
f,
"Block {} is not linked to expected parent {} but to {:?}",
slot, expected_parent, parent
"Block {slot} is not linked to expected parent {expected_parent} but to \
{parent:?}"
)
}
WenRestartError::ChildStakeLargerThanParent(
Expand All @@ -130,15 +129,16 @@ impl std::fmt::Display for WenRestartError {
) => {
write!(
f,
"Block {} has more stake {} than its parent {} with stake {}",
slot, child_stake, parent, parent_stake
"Block {slot} has more stake {child_stake} than its parent {parent} with \
stake {parent_stake}"
)
}
WenRestartError::Exiting => write!(f, "Exiting"),
WenRestartError::FutureSnapshotExists(slot, highest_slot, directory) => {
write!(
f,
"Future snapshot exists for slot: {slot} highest slot: {highest_slot} in directory: {directory}",
"Future snapshot exists for slot: {slot} highest slot: {highest_slot} in \
directory: {directory}",
)
}
WenRestartError::GenerateSnapshotWhenOneExists(slot, directory) => {
Expand All @@ -156,14 +156,15 @@ impl std::fmt::Display for WenRestartError {
) => {
write!(
f,
"Heaviest fork on coordinator on different fork: heaviest: {coordinator_heaviest_slot} does not include: {should_include_slot}",
"Heaviest fork on coordinator on different fork: heaviest: \
{coordinator_heaviest_slot} does not include: {should_include_slot}",
)
}
WenRestartError::MalformedLastVotedForkSlotsProtobuf(record) => {
write!(f, "Malformed last voted fork slots protobuf: {:?}", record)
write!(f, "Malformed last voted fork slots protobuf: {record:?}")
}
WenRestartError::MalformedProgress(state, missing) => {
write!(f, "Malformed progress: {:?} missing {}", state, missing)
write!(f, "Malformed progress: {state:?} missing {missing}")
}
WenRestartError::MissingLastVotedForkSlots => {
write!(f, "Missing last voted fork slots")
Expand All @@ -174,12 +175,12 @@ impl std::fmt::Display for WenRestartError {
WenRestartError::NotEnoughStakeAgreeingWithUs(slot, hash, block_stake_map) => {
write!(
f,
"Not enough stake agreeing with our slot: {} hash: {}\n {:?}",
slot, hash, block_stake_map,
"Not enough stake agreeing with our slot: {slot} hash: {hash}\n \
{block_stake_map:?}",
)
}
WenRestartError::UnexpectedState(state) => {
write!(f, "Unexpected state: {:?}", state)
write!(f, "Unexpected state: {state:?}")
}
}
}
Expand Down Expand Up @@ -257,7 +258,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
if let Err(e) =
last_voted_fork_slots_aggregate.aggregate_from_record(key_string, message)
{
error!("Failed to aggregate from record: {:?}", e);
error!("Failed to aggregate from record: {e:?}");
}
}
} else {
Expand Down Expand Up @@ -290,12 +291,15 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
old_record,
new_record,
) => {
info!("Different LastVotedForkSlots message exists from {from}: {old_record:#?} vs {new_record:#?}");
info!(
"Different LastVotedForkSlots message exists from {from}: {old_record:#?} \
vs {new_record:#?}"
);
progress.conflict_message.insert(
from,
ConflictMessage {
old_message: format!("{:?}", old_record),
new_message: format!("{:?}", new_record),
old_message: format!("{old_record:?}"),
new_message: format!("{new_record:?}"),
},
);
}
Expand Down Expand Up @@ -405,12 +409,15 @@ pub(crate) fn find_heaviest_fork(
if let Ok(Some(block_meta)) = blockstore.meta(*slot) {
if block_meta.parent_slot != Some(expected_parent) {
if expected_parent == root_slot {
error!("First block {} in repair list not linked to local root {}, this could mean our root is too old",
slot, root_slot);
error!(
"First block {slot} in repair list not linked to local root {root_slot}, \
this could mean our root is too old"
);
} else {
error!(
"Block {} in blockstore is not linked to expected parent from Wen Restart {} but to Block {:?}",
slot, expected_parent, block_meta.parent_slot
"Block {slot} in blockstore is not linked to expected parent from Wen \
Restart {expected_parent} but to Block {:?}",
block_meta.parent_slot
);
}
return Err(WenRestartError::BlockNotLinkedToExpectedParent(
Expand All @@ -435,10 +442,7 @@ pub(crate) fn find_heaviest_fork(
bank_forks.clone(),
&exit,
)?;
info!(
"Heaviest fork found: slot: {}, bankhash: {:?}",
heaviest_fork_slot, heaviest_fork_bankhash
);
info!("Heaviest fork found: slot: {heaviest_fork_slot}, bankhash: {heaviest_fork_bankhash:?}");
Ok((heaviest_fork_slot, heaviest_fork_bankhash))
}

Expand Down Expand Up @@ -720,7 +724,7 @@ pub(crate) fn aggregate_restart_heaviest_fork(
for message in &aggregate_record.received {
if let Err(e) = heaviest_fork_aggregate.aggregate_from_record(message) {
// Do not abort wen_restart if we got one malformed message.
error!("Failed to aggregate from record: {:?}", e);
error!("Failed to aggregate from record: {e:?}");
}
}
} else {
Expand All @@ -740,11 +744,11 @@ pub(crate) fn aggregate_restart_heaviest_fork(
}
let start = timestamp();
for new_heaviest_fork in cluster_info.get_restart_heaviest_fork(&mut cursor) {
info!("Received new heaviest fork: {:?}", new_heaviest_fork);
info!("Received new heaviest fork: {new_heaviest_fork:?}");
let from = new_heaviest_fork.from.to_string();
match heaviest_fork_aggregate.aggregate(new_heaviest_fork) {
HeaviestForkAggregateResult::Inserted(record) => {
info!("Successfully aggregated new heaviest fork: {:?}", record);
info!("Successfully aggregated new heaviest fork: {record:?}");
progress
.heaviest_fork_aggregate
.as_mut()
Expand All @@ -753,12 +757,15 @@ pub(crate) fn aggregate_restart_heaviest_fork(
.push(record);
}
HeaviestForkAggregateResult::DifferentVersionExists(old_record, new_record) => {
warn!("Different version from {from} exists old {old_record:#?} vs new {new_record:#?}");
warn!(
"Different version from {from} exists old {old_record:#?} vs new \
{new_record:#?}"
);
progress.conflict_message.insert(
from,
ConflictMessage {
old_message: format!("{:?}", old_record),
new_message: format!("{:?}", new_record),
old_message: format!("{old_record:?}"),
new_message: format!("{new_record:?}"),
},
);
}
Expand Down Expand Up @@ -820,7 +827,7 @@ pub(crate) fn repair_heaviest_fork(
} else {
vec![heaviest_slot]
};
info!("wen_restart repair slots: {:?}", to_repair);
info!("wen_restart repair slots: {to_repair:?}");
if to_repair.is_empty() {
return Ok(()); // All blocks are full
}
Expand Down Expand Up @@ -918,8 +925,8 @@ pub(crate) fn receive_restart_heaviest_fork(
for new_heaviest_fork in cluster_info.get_restart_heaviest_fork(&mut cursor) {
if new_heaviest_fork.from == wen_restart_coordinator {
info!(
"Received new heaviest fork from coordinator: {} {:?}",
wen_restart_coordinator, new_heaviest_fork
"Received new heaviest fork from coordinator: {wen_restart_coordinator} \
{new_heaviest_fork:?}"
);
let coordinator_heaviest_slot = new_heaviest_fork.last_slot;
let coordinator_heaviest_hash = new_heaviest_fork.last_slot_hash;
Expand Down Expand Up @@ -966,10 +973,7 @@ pub(crate) fn send_and_receive_heaviest_fork(
) {
Ok(()) => pushfn(coordinator_slot, coordinator_hash),
Err(e) => {
warn!(
"Failed to verify coordinator heaviest fork: {:?}, exit soon",
e
);
warn!("Failed to verify coordinator heaviest fork: {e:?}, exit soon");
pushfn(my_heaviest_fork_slot, my_heaviest_fork_hash);
// flush_push_queue only flushes the messages to crds, doesn't guarantee
// sending them out, so we still need to wait for a while before exiting.
Expand Down Expand Up @@ -1056,10 +1060,7 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
config.blockstore.clone(),
config.exit.clone(),
)?;
info!(
"Heaviest fork found: slot: {}, bankhash: {}",
slot, bankhash
);
info!("Heaviest fork found: slot: {slot}, bankhash: {bankhash}");
HeaviestForkRecord {
slot,
bankhash: bankhash.to_string(),
Expand Down Expand Up @@ -1128,9 +1129,8 @@ pub fn wait_for_wen_restart(config: WenRestartConfig) -> Result<()> {
} => {
error!(
"Wen start finished, please remove --wen_restart and restart with \
--wait-for-supermajority {} --expected-bank-hash {} --expected-shred-version {} \
--no-snapshot-fetch",
slot, hash, shred_version,
--wait-for-supermajority {slot} --expected-bank-hash {hash} \
--expected-shred-version {shred_version} --no-snapshot-fetch",
);
if config.cluster_info.id() == config.wen_restart_coordinator {
aggregate_restart_heaviest_fork(
Expand Down Expand Up @@ -1258,10 +1258,7 @@ pub(crate) fn initialize(
Err(e) => {
let stdio_err = e.downcast_ref::<std::io::Error>();
if stdio_err.is_some_and(|e| e.kind() == std::io::ErrorKind::NotFound) {
info!(
"wen restart proto file not found at {:?}, write init state",
records_path
);
info!("wen restart proto file not found at {records_path:?}, write init state");
let progress = WenRestartProgress {
state: RestartState::Init.into(),
..Default::default()
Expand Down Expand Up @@ -1306,12 +1303,15 @@ pub(crate) fn initialize(
.take(RestartLastVotedForkSlots::MAX_SLOTS)
.collect();
} else {
error!("
Cannot find last voted slot in the tower storage, it either means that this node has never \
voted or the tower storage is corrupted. Unfortunately, since WenRestart is a consensus protocol \
depending on each participant to send their last voted fork slots, your validator cannot participate.\
Please check discord for the conclusion of the WenRestart protocol, then generate a snapshot and use \
--wait-for-supermajority to restart the validator.");
error!(
"Cannot find last voted slot in the tower storage, it either means \
that this node has never voted or the tower storage is corrupted. \
Unfortunately, since WenRestart is a consensus protocol depending on \
each participant to send their last voted fork slots, your validator \
cannot participate.Please check discord for the conclusion of the \
WenRestart protocol, then generate a snapshot and use \
--wait-for-supermajority to restart the validator."
);
return Err(WenRestartError::MissingLastVotedForkSlots.into());
}
}
Expand Down Expand Up @@ -1408,7 +1408,7 @@ pub(crate) fn initialize(
fn read_wen_restart_records(records_path: &PathBuf) -> Result<WenRestartProgress> {
let buffer = read(records_path)?;
let progress = WenRestartProgress::decode(&mut Cursor::new(buffer))?;
info!("read record {:?}", progress);
info!("read record {progress:?}");
Ok(progress)
}

Expand All @@ -1418,7 +1418,7 @@ pub(crate) fn write_wen_restart_records(
) -> Result<()> {
// overwrite anything if exists
let mut file = File::create(records_path)?;
info!("writing new record {:?}", new_progress);
info!("writing new record {new_progress:?}");
let mut buf = Vec::with_capacity(new_progress.encoded_len());
new_progress.encode(&mut buf)?;
file.write_all(&buf)?;
Expand Down Expand Up @@ -1683,9 +1683,9 @@ mod tests {
expected_progress.last_voted_fork_slots_aggregate
);
panic!(
"wait_on_expected_progress_with_timeout failed to get expected progress {:?} expected {:?}",
&progress,
expected_progress
"wait_on_expected_progress_with_timeout failed to get expected progress {:?} \
expected {:?}",
&progress, expected_progress
);
}
sleep(Duration::from_millis(10));
Expand Down Expand Up @@ -2052,7 +2052,7 @@ mod tests {
None,
&mut timing,
) {
panic!("process_single_slot failed: {:?}", e);
panic!("process_single_slot failed: {e:?}");
}

{
Expand Down Expand Up @@ -2219,10 +2219,12 @@ mod tests {
&test_state.wen_restart_proto_path,
VoteTransaction::from(Vote::new(last_voted_fork_slots.clone(), last_vote_bankhash)),
test_state.blockstore.clone()
).err()
)
.err()
.unwrap()
.to_string(),
"Malformed progress: HeaviestFork missing final_result in last_voted_fork_slots_aggregate",
"Malformed progress: HeaviestFork missing final_result in \
last_voted_fork_slots_aggregate",
);
let progress_missing_my_heaviestfork = WenRestartProgress {
state: RestartState::GenerateSnapshot.into(),
Expand Down
Loading