Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 253 additions & 12 deletions ledger/src/blockstore_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use solana_runtime::{
bank::{Bank, TransactionBalancesSet, TransactionProcessResult, TransactionResults},
bank_forks::BankForks,
bank_utils,
commitment::VOTE_THRESHOLD_SIZE,
transaction_batch::TransactionBatch,
transaction_utils::OrderedIterator,
vote_sender_types::ReplayVoteSender,
Expand All @@ -31,9 +32,10 @@ use solana_sdk::{
timing::duration_as_ms,
transaction::{Result, Transaction, TransactionError},
};
use solana_vote_program::vote_state::VoteState;
use std::{
cell::RefCell,
collections::HashMap,
collections::{BTreeMap, HashMap},
path::PathBuf,
result,
sync::Arc,
Expand Down Expand Up @@ -454,6 +456,7 @@ pub fn verify_ticks(
) -> std::result::Result<(), BlockError> {
let next_bank_tick_height = bank.tick_height() + entries.tick_count();
let max_bank_tick_height = bank.max_tick_height();

if next_bank_tick_height > max_bank_tick_height {
warn!("Too many entry ticks found in slot: {}", bank.slot());
return Err(BlockError::InvalidTickCount);
Expand Down Expand Up @@ -757,11 +760,18 @@ fn load_frozen_forks(
transaction_status_sender: Option<TransactionStatusSender>,
) -> result::Result<Vec<Arc<Bank>>, BlockstoreProcessorError> {
let mut initial_forks = HashMap::new();
let mut all_banks = HashMap::new();
let mut last_status_report = Instant::now();
let mut pending_slots = vec![];
let mut last_root_slot = root_bank.slot();
let mut slots_elapsed = 0;
let mut txs = 0;
let blockstore_max_root = blockstore.max_root();
let max_root = std::cmp::max(root_bank.slot(), blockstore_max_root);
info!(
"load_frozen_forks() latest root from blockstore: {}, max_root: {}",
blockstore_max_root, max_root,
);
process_next_slots(
root_bank,
root_meta,
Expand Down Expand Up @@ -810,14 +820,49 @@ fn load_frozen_forks(
}
txs += progress.num_txs;

if blockstore.is_root(slot) {
*root = slot;
leader_schedule_cache.set_root(&bank);
bank.squash();
pending_slots.clear();
initial_forks.clear();
last_root_slot = slot;
// Block must be frozen by this point, otherwise `process_single_slot` would
// have errored above
assert!(bank.is_frozen());
all_banks.insert(bank.slot(), bank.clone());

// If we've reached the last known root in blockstore, start looking
// for newer cluster confirmed roots
let new_root_bank = {
if *root == max_root {
supermajority_root_from_bank(&bank).and_then(|supermajority_root| {
if supermajority_root > *root {
// If there's a cluster confirmed root greater than our last
// replayed root, then beccause the cluster confirmed root should
// be descended from our last root, it must exist in `all_banks`
let cluster_root_bank = all_banks.get(&supermajority_root).unwrap();

// cluster root must be a descendant of our root, otherwise something
// is drastically wrong
assert!(cluster_root_bank.ancestors.contains_key(root));
info!("blockstore processor found new cluster confirmed root: {}, observed in bank: {}", cluster_root_bank.slot(), bank.slot());
Some(cluster_root_bank)
} else {
None
}
})
} else if blockstore.is_root(slot) {
Some(&bank)
} else {
None
}
};

if let Some(new_root_bank) = new_root_bank {
*root = new_root_bank.slot();
last_root_slot = new_root_bank.slot();
leader_schedule_cache.set_root(&new_root_bank);
new_root_bank.squash();

// Filter out all non descendants of the new root
pending_slots.retain(|(_, pending_bank, _)| pending_bank.ancestors.contains_key(root));
initial_forks.retain(|_, fork_tip_bank| fork_tip_bank.ancestors.contains_key(root));
}

slots_elapsed += 1;

trace!(
Expand All @@ -844,6 +889,51 @@ fn load_frozen_forks(
Ok(initial_forks.values().cloned().collect::<Vec<_>>())
}

fn supermajority_root(roots: &BTreeMap<Slot, u64>, total_epoch_stake: u64) -> Option<Slot> {
// Find latest root
let mut total = 0;
for (root, stake) in roots.iter().rev() {
total += stake;
if total as f64 / total_epoch_stake as f64 > VOTE_THRESHOLD_SIZE {
return Some(*root);
}
}

None
}

fn supermajority_root_from_bank(bank: &Bank) -> Option<Slot> {
let roots: BTreeMap<Slot, u64> = bank
.vote_accounts()
.into_iter()
.filter_map(|(key, (stake, account))| {
if stake == 0 {
return None;
}

let vote_state = VoteState::from(&account);
if vote_state.is_none() {
warn!(
"Unable to get vote_state from account {} in bank: {}",
key,
bank.slot()
);
return None;
}

vote_state
.unwrap()
.root_slot
.map(|root_slot| (root_slot, stake))
})
.collect();

let total_epoch_stake = bank.total_epoch_stake();

// Find latest root
supermajority_root(&roots, total_epoch_stake)
}

// Processes and replays the contents of a single slot, returns Error
// if failed to play the slot
fn process_single_slot(
Expand Down Expand Up @@ -871,6 +961,7 @@ fn process_single_slot(
})?;

bank.freeze(); // all banks handled by this routine are created from complete slots

Ok(())
}

Expand Down Expand Up @@ -951,7 +1042,7 @@ pub mod tests {
use matches::assert_matches;
use rand::{thread_rng, Rng};
use solana_runtime::genesis_utils::{
create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
self, create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs,
};
use solana_sdk::account::Account;
use solana_sdk::{
Expand All @@ -963,8 +1054,9 @@ pub mod tests {
system_transaction,
transaction::{Transaction, TransactionError},
};
use solana_vote_program::vote_transaction;
use solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction};
use std::{collections::BTreeSet, sync::RwLock};
use trees::tr;

#[test]
fn test_process_blockstore_with_missing_hashes() {
Expand Down Expand Up @@ -1535,11 +1627,11 @@ pub mod tests {
let blockstore =
Blockstore::open(&ledger_path).expect("Expected to successfully open database ledger");

// Let last_slot be the number of slots in the first two epochs
// Let `last_slot` be the number of slots in the first two epochs
let epoch_schedule = get_epoch_schedule(&genesis_config, Vec::new());
let last_slot = epoch_schedule.get_last_slot_in_epoch(1);

// Create a single chain of slots with all indexes in the range [0, last_slot + 1]
// Create a single chain of slots with all indexes in the range [0, v + 1]
for i in 1..=last_slot + 1 {
last_entry_hash = fill_blockstore_slot_with_ticks(
&blockstore,
Expand Down Expand Up @@ -2824,4 +2916,153 @@ pub mod tests {
.collect();
assert_eq!(successes, expected_successful_voter_pubkeys);
}

fn make_slot_with_vote_tx(
blockstore: &Blockstore,
ticks_per_slot: u64,
tx_landed_slot: Slot,
parent_slot: Slot,
parent_blockhash: &Hash,
vote_tx: Transaction,
slot_leader_keypair: &Arc<Keypair>,
) {
// Add votes to `last_slot` so that `root` will be confirmed
let vote_entry = next_entry(&parent_blockhash, 1, vec![vote_tx]);
let mut entries = create_ticks(ticks_per_slot, 0, vote_entry.hash);
entries.insert(0, vote_entry);
blockstore
.write_entries(
tx_landed_slot,
0,
0,
ticks_per_slot,
Some(parent_slot),
true,
&slot_leader_keypair,
entries,
0,
)
.unwrap();
}

fn run_test_process_blockstore_with_supermajority_root(blockstore_root: Option<Slot>) {
solana_logger::setup();
/*
Build fork structure:
slot 0
|
slot 1 <- (blockstore root)
/ \
slot 2 |
| |
slot 4 |
slot 5
|
`expected_root_slot`
/ \
... minor fork
/
`last_slot`
*/
let starting_fork_slot = 5;
let mut main_fork = tr(starting_fork_slot);
let mut main_fork_ref = main_fork.root_mut();

// Make enough slots to make a root slot > blockstore_root
let expected_root_slot = starting_fork_slot + blockstore_root.unwrap_or(0);
let last_main_fork_slot = expected_root_slot + MAX_LOCKOUT_HISTORY as u64 + 1;

// Make `minor_fork`
let last_minor_fork_slot = last_main_fork_slot + 1;
let minor_fork = tr(last_minor_fork_slot);

// Make 'main_fork`
for slot in starting_fork_slot + 1..last_main_fork_slot {
if slot - 1 == expected_root_slot {
main_fork_ref.push_front(minor_fork.clone());
}
main_fork_ref.push_front(tr(slot));
main_fork_ref = main_fork_ref.first_mut().unwrap();
}
let forks = tr(0) / (tr(1) / (tr(2) / (tr(4))) / main_fork);
let validator_keypairs = ValidatorVoteKeypairs::new_rand();
let GenesisConfigInfo { genesis_config, .. } =
genesis_utils::create_genesis_config_with_vote_accounts(
10_000,
&[&validator_keypairs],
vec![100],
);
let ticks_per_slot = genesis_config.ticks_per_slot();
let ledger_path = get_tmp_ledger_path!();
let blockstore = Blockstore::open(&ledger_path).unwrap();
blockstore.add_tree(forks, false, true, ticks_per_slot, genesis_config.hash());

if let Some(blockstore_root) = blockstore_root {
blockstore.set_roots(&[blockstore_root]).unwrap();
}

let opts = ProcessOptions {
poh_verify: true,
..ProcessOptions::default()
};
let (bank_forks, _leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts.clone()).unwrap();

let last_vote_bank_hash = bank_forks.get(last_main_fork_slot - 1).unwrap().hash();
let last_vote_blockhash = bank_forks
.get(last_main_fork_slot - 1)
.unwrap()
.last_blockhash();
let slots: Vec<_> = (expected_root_slot..last_main_fork_slot).collect();
let vote_tx = vote_transaction::new_vote_transaction(
slots,
last_vote_bank_hash,
last_vote_blockhash,
&validator_keypairs.node_keypair,
&validator_keypairs.vote_keypair,
&validator_keypairs.vote_keypair,
None,
);

// Add votes to `last_slot` so that `root` will be confirmed
let leader_keypair = Arc::new(validator_keypairs.node_keypair);
make_slot_with_vote_tx(
&blockstore,
ticks_per_slot,
last_main_fork_slot,
last_main_fork_slot - 1,
&last_vote_blockhash,
vote_tx,
&leader_keypair,
);

let (bank_forks, _leader_schedule) =
process_blockstore(&genesis_config, &blockstore, Vec::new(), opts).unwrap();

assert_eq!(bank_forks.root(), expected_root_slot);
assert_eq!(
bank_forks.frozen_banks().len() as u64,
last_minor_fork_slot - expected_root_slot + 1
);

// Minor fork at `last_main_fork_slot + 1` was above the `expected_root_slot`
// so should not have been purged
//
// Fork at slot 2 was purged because it was below the `expected_root_slot`
for slot in 0..=last_minor_fork_slot {
if slot >= expected_root_slot {
let bank = bank_forks.get(slot).unwrap();
assert_eq!(bank.slot(), slot);
assert!(bank.is_frozen());
} else {
assert!(bank_forks.get(slot).is_none());
}
}
}

#[test]
fn test_process_blockstore_with_supermajority_root() {
run_test_process_blockstore_with_supermajority_root(None);
run_test_process_blockstore_with_supermajority_root(Some(1))
}
}