Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ impl Tower {
bank_forks.frozen_banks().values().cloned().collect(),
node_pubkey,
vote_account,
vec![],
);
let root = root_bank.slot();

Expand Down
73 changes: 70 additions & 3 deletions core/src/replay_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ impl ReplayStage {
&bank_forks,
&my_pubkey,
&vote_account,
&blockstore,
);
let mut current_leader = None;
let mut last_reset = Hash::default();
Expand Down Expand Up @@ -1230,23 +1231,37 @@ impl ReplayStage {
bank_forks: &RwLock<BankForks>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
blockstore: &Blockstore,
) -> (ProgressMap, HeaviestSubtreeForkChoice) {
let (root_bank, frozen_banks) = {
let (root_bank, frozen_banks, duplicate_slot_hashes) = {
let bank_forks = bank_forks.read().unwrap();
let duplicate_slots = blockstore
.duplicate_slots_iterator(bank_forks.root_bank().slot())
.unwrap();
let duplicate_slot_hashes = duplicate_slots
.filter_map(|slot| bank_forks.bank_hash(slot).map(|hash| (slot, hash)));
Comment on lines +1238 to +1242
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be moved outside of this block scope using just the returned root_bank, since it doesn't rely on holding the bank_forks lock

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You still need the bank forks lock to get the hashes for the duplicate slots:
.filter_map(|slot| bank_forks.bank_hash(slot).map(|hash| (slot, hash)));

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeah, I'm dumb, ignore this

(
bank_forks.root_bank(),
bank_forks.frozen_banks().values().cloned().collect(),
duplicate_slot_hashes.collect::<Vec<(Slot, Hash)>>(),
)
};

Self::initialize_progress_and_fork_choice(&root_bank, frozen_banks, my_pubkey, vote_account)
Self::initialize_progress_and_fork_choice(
&root_bank,
frozen_banks,
my_pubkey,
vote_account,
duplicate_slot_hashes,
)
}

pub fn initialize_progress_and_fork_choice(
root_bank: &Bank,
mut frozen_banks: Vec<Arc<Bank>>,
my_pubkey: &Pubkey,
vote_account: &Pubkey,
duplicate_slot_hashes: Vec<(Slot, Hash)>,
) -> (ProgressMap, HeaviestSubtreeForkChoice) {
let mut progress = ProgressMap::default();

Expand All @@ -1261,11 +1276,15 @@ impl ReplayStage {
);
}
let root = root_bank.slot();
let heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
let mut heaviest_subtree_fork_choice = HeaviestSubtreeForkChoice::new_from_frozen_banks(
(root, root_bank.hash()),
&frozen_banks,
);

for slot_hash in duplicate_slot_hashes {
heaviest_subtree_fork_choice.mark_fork_invalid_candidate(&slot_hash);
}

(progress, heaviest_subtree_fork_choice)
}

Expand Down Expand Up @@ -2086,6 +2105,30 @@ impl ReplayStage {
purge_repair_slot_counter,
SlotStateUpdate::Dead(dead_state),
);

// If we previously marked this slot as duplicate in blockstore, let the state machine know
if !duplicate_slots_tracker.contains(&slot) && blockstore.get_duplicate_slot(slot).is_some()
{
let duplicate_state = DuplicateState::new_from_state(
slot,
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
|| true,
|| None,
);
check_slot_agrees_with_cluster(
slot,
root,
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state),
);
}
}

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -2827,6 +2870,30 @@ impl ReplayStage {
purge_repair_slot_counter,
SlotStateUpdate::BankFrozen(bank_frozen_state),
);
// If we previously marked this slot as duplicate in blockstore, let the state machine know
Comment thread
bw-solana marked this conversation as resolved.
if !duplicate_slots_tracker.contains(&bank.slot())
&& blockstore.get_duplicate_slot(bank.slot()).is_some()
{
let duplicate_state = DuplicateState::new_from_state(
bank.slot(),
gossip_duplicate_confirmed_slots,
heaviest_subtree_fork_choice,
|| false,
|| Some(bank.hash()),
);
check_slot_agrees_with_cluster(
bank.slot(),
bank_forks.read().unwrap().root(),
blockstore,
duplicate_slots_tracker,
epoch_slots_frozen_slots,
heaviest_subtree_fork_choice,
duplicate_slots_to_repair,
ancestor_hashes_replay_update_sender,
purge_repair_slot_counter,
SlotStateUpdate::Duplicate(duplicate_state),
);
}
if let Some(sender) = bank_notification_sender {
sender
.sender
Expand Down
18 changes: 18 additions & 0 deletions local-cluster/src/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use {
solana_ledger::{
ancestor_iterator::AncestorIterator,
blockstore::{Blockstore, PurgeType},
blockstore_meta::DuplicateSlotProof,
blockstore_options::{AccessType, BlockstoreOptions},
leader_schedule::{FixedSchedule, LeaderSchedule},
},
Expand Down Expand Up @@ -153,6 +154,23 @@ pub fn wait_for_last_vote_in_tower_to_land_in_ledger(
})
}

/// Polls the blockstore at `ledger_path` roughly once per second, for up to ~10 seconds,
/// looking for a duplicate-slot proof recorded at `dup_slot`.
/// Returns the proof if one appears within the window, otherwise `None`.
pub fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
    let mut attempts_remaining = 10;
    while attempts_remaining > 0 {
        // Re-open the blockstore on every attempt so each check observes the
        // latest on-disk state.
        let blockstore = open_blockstore(ledger_path);
        match blockstore.get_first_duplicate_proof() {
            Some((found_slot, proof)) if found_slot == dup_slot => return Some(proof),
            _ => {}
        }

        sleep(Duration::from_secs(1));
        attempts_remaining -= 1;
    }
    None
}

pub fn copy_blocks(end_slot: Slot, source: &Blockstore, dest: &Blockstore) {
for slot in std::iter::once(end_slot).chain(AncestorIterator::new(end_slot, source)) {
let source_meta = source.meta(slot).unwrap().unwrap();
Expand Down
176 changes: 156 additions & 20 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ use {
ancestor_iterator::AncestorIterator,
bank_forks_utils,
blockstore::{entries_to_test_shreds, Blockstore},
blockstore_meta::DuplicateSlotProof,
blockstore_processor::ProcessOptions,
leader_schedule::FixedSchedule,
shred::Shred,
shred::{ProcessShredsStats, ReedSolomonCache, Shred, Shredder},
use_snapshot_archives_at_startup::UseSnapshotArchivesAtStartup,
},
solana_local_cluster::{
Expand All @@ -39,7 +38,7 @@ use {
last_root_in_tower, last_vote_in_tower, ms_for_n_slots, open_blockstore,
purge_slots_with_count, remove_tower, remove_tower_if_exists, restore_tower,
run_cluster_partition, run_kill_partition_switch_threshold, save_tower,
setup_snapshot_validator_config, test_faulty_node,
setup_snapshot_validator_config, test_faulty_node, wait_for_duplicate_proof,
wait_for_last_vote_in_tower_to_land_in_ledger, SnapshotValidatorConfig,
ValidatorTestConfig, DEFAULT_CLUSTER_LAMPORTS, DEFAULT_NODE_STAKE, RUST_LOG_FILTER,
},
Expand Down Expand Up @@ -69,7 +68,7 @@ use {
client::{AsyncClient, SyncClient},
clock::{self, Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::CommitmentConfig,
epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
epoch_schedule::{DEFAULT_SLOTS_PER_EPOCH, MINIMUM_SLOTS_PER_EPOCH},
genesis_config::ClusterType,
hard_forks::HardForks,
hash::Hash,
Expand Down Expand Up @@ -5145,22 +5144,6 @@ fn test_duplicate_shreds_switch_failure() {
}
}

fn wait_for_duplicate_proof(ledger_path: &Path, dup_slot: Slot) -> Option<DuplicateSlotProof> {
for _ in 0..10 {
let duplicate_fork_validator_blockstore = open_blockstore(ledger_path);
if let Some((found_dup_slot, found_duplicate_proof)) =
duplicate_fork_validator_blockstore.get_first_duplicate_proof()
{
if found_dup_slot == dup_slot {
return Some(found_duplicate_proof);
};
}

sleep(Duration::from_millis(1000));
}
None
}

solana_logger::setup_with_default(RUST_LOG_FILTER);
let validator_keypairs = [
"28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
Expand Down Expand Up @@ -5506,3 +5489,156 @@ fn test_duplicate_shreds_switch_failure() {
SocketAddrSpace::Unspecified,
);
}

/// Forks previously marked invalid should be marked as such in fork choice on restart
#[test]
#[serial]
fn test_invalid_forks_persisted_on_restart() {
    solana_logger::setup_with("info,solana_metrics=off,solana_ledger=off");

    let dup_slot = 10;
    let validator_keypairs = [
        "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4",
        "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8",
    ]
    .iter()
    .map(|s| (Arc::new(Keypair::from_base58_string(s)), true))
    .collect::<Vec<_>>();
    let majority_keypair = validator_keypairs[1].0.clone();

    let validators = validator_keypairs
        .iter()
        .map(|(kp, _)| kp.pubkey())
        .collect::<Vec<_>>();

    // Majority holds 100x the stake of the target so its fork is always heaviest.
    let node_stakes = vec![DEFAULT_NODE_STAKE, 100 * DEFAULT_NODE_STAKE];
    let (target_pubkey, majority_pubkey) = (validators[0], validators[1]);
    // Need majority validator to make the dup_slot
    let validator_to_slots = vec![
        (majority_pubkey, dup_slot as usize + 5),
        (target_pubkey, DEFAULT_SLOTS_PER_EPOCH as usize),
    ];
    let leader_schedule = create_custom_leader_schedule(validator_to_slots.into_iter());
    let mut default_config = ValidatorConfig::default_for_test();
    default_config.fixed_leader_schedule = Some(FixedSchedule {
        leader_schedule: Arc::new(leader_schedule),
    });
    let mut validator_configs = make_identical_validator_configs(&default_config, 2);
    // Majority shouldn't duplicate confirm anything
    validator_configs[1].voting_disabled = true;

    let mut cluster = LocalCluster::new(
        &mut ClusterConfig {
            cluster_lamports: DEFAULT_CLUSTER_LAMPORTS + node_stakes.iter().sum::<u64>(),
            validator_configs,
            node_stakes,
            validator_keys: Some(validator_keypairs),
            skip_warmup_slots: true,
            ..ClusterConfig::default()
        },
        SocketAddrSpace::Unspecified,
    );

    let target_ledger_path = cluster.ledger_path(&target_pubkey);

    // Wait for us to vote past duplicate slot
    let timer = Instant::now();
    loop {
        if let Some(slot) =
            wait_for_last_vote_in_tower_to_land_in_ledger(&target_ledger_path, &target_pubkey)
        {
            if slot > dup_slot {
                break;
            }
        }

        assert!(
            timer.elapsed() < Duration::from_secs(30),
            "Did not make more than 10 blocks in 30 seconds"
        );
        sleep(Duration::from_millis(100));
    }

    // Send duplicate
    let parent = {
        let blockstore = open_blockstore(&target_ledger_path);
        let parent = blockstore
            .meta(dup_slot)
            .unwrap()
            .unwrap()
            .parent_slot
            .unwrap();

        // Build an alternate (duplicate) version of `dup_slot`, signed by the
        // majority node (the slot's leader), so the target records a proof.
        let entries = create_ticks(
            64 * (std::cmp::max(1, dup_slot - parent)),
            0,
            cluster.genesis_config.hash(),
        );
        let last_hash = entries.last().unwrap().hash;
        let version = solana_sdk::shred_version::version_from_hash(&last_hash);
        let dup_shreds = Shredder::new(dup_slot, parent, 0, version)
            .unwrap()
            .entries_to_shreds(
                &majority_keypair,
                &entries,
                true,  // is_full_slot
                0,     // next_shred_index,
                0,     // next_code_index
                false, // merkle_variant
                &ReedSolomonCache::default(),
                &mut ProcessShredsStats::default(),
            )
            .0;

        info!("Sending duplicate shreds for {dup_slot}");
        cluster.send_shreds_to_validator(dup_shreds.iter().collect(), &target_pubkey);
        // NOTE: `expect` takes a plain string, so use `panic!` to actually
        // interpolate `dup_slot` into the failure message.
        wait_for_duplicate_proof(&target_ledger_path, dup_slot)
            .unwrap_or_else(|| panic!("Duplicate proof for {dup_slot} not found"));
        parent
    };

    info!("Duplicate proof for {dup_slot} has landed, restarting node");
    let info = cluster.exit_node(&target_pubkey);

    {
        let blockstore = open_blockstore(&target_ledger_path);
        purge_slots_with_count(&blockstore, dup_slot + 5, 100);
    }

    // Restart, should create an entirely new fork
    cluster.restart_node(&target_pubkey, info, SocketAddrSpace::Unspecified);

    info!("Waiting for fork built off {parent}");
    let timer = Instant::now();
    let mut checked_children: HashSet<Slot> = HashSet::default();
    let mut done = false;
    while !done {
        let blockstore = open_blockstore(&target_ledger_path);
        let parent_meta = blockstore.meta(parent).unwrap().expect("Meta must exist");
        for child in parent_meta.next_slots {
            if checked_children.contains(&child) {
                continue;
            }

            if blockstore.is_full(child) {
                let shreds = blockstore
                    .get_data_shreds_for_slot(child, 0)
                    .expect("Child is full");
                // The new fork counts only if the target itself produced it,
                // i.e. every shred verifies against the target's pubkey.
                let mut is_our_block = true;
                for shred in shreds {
                    is_our_block &= shred.verify(&target_pubkey);
                }
                if is_our_block {
                    done = true;
                }
                checked_children.insert(child);
            }
        }

        assert!(
            timer.elapsed() < Duration::from_secs(30),
            "Did not create a new fork off parent {parent} in 30 seconds after restart"
        );
        sleep(Duration::from_millis(100));
    }
}
}