From 6471823e70703f28f3c35645d5e4dc9ac7282b22 Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Fri, 2 Feb 2024 22:35:54 -0400
Subject: [PATCH 1/3] ledger-tool: Add command to create shreds from bigtable data

---
 ledger-tool/src/bigtable.rs | 106 +++++++++++++++++++++++++++++++++++-
 1 file changed, 104 insertions(+), 2 deletions(-)

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 89a0cb6e2d7..5fad9cb47ca 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -19,11 +19,14 @@ use {
         display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
         OutputFormat,
     },
+    solana_entry::entry::Entry,
     solana_ledger::{
-        bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
+        bigtable_upload::ConfirmedBlockUploadConfig,
+        blockstore::Blockstore,
         blockstore_options::AccessType,
+        shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
     },
-    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
+    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, signer::keypair::Keypair},
     solana_storage_bigtable::CredentialType,
     solana_transaction_status::{
         BlockEncodingOptions, ConfirmedBlock, EncodeError, EncodedConfirmedBlock,
@@ -178,6 +181,63 @@ async fn entries(
     Ok(())
 }
 
+async fn shreds(
+    blockstore: Arc<Blockstore>,
+    starting_slot: Slot,
+    ending_slot: Slot,
+    config: solana_storage_bigtable::LedgerStorageConfig,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
+        .await
+        .map_err(|err| format!("Failed to connect to storage: {err:?}"))?;
+
+    // Make the range inclusive of both starting and ending slot
+    let limit = (ending_slot - starting_slot + 1) as usize;
+    let mut slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
+    slots.retain(|&slot| slot <= ending_slot);
+
+    let keypair = Keypair::from_bytes(&[0; 64]).unwrap();
+    // TODO: parse this / allow command line flag ?
+    let shred_version = 0;
+
+    for slot in slots.iter() {
+        let block = bigtable.get_confirmed_block(*slot).await?;
+        let entry_summaries = bigtable.get_entries(*slot).await?;
+        let entries: Vec<_> = entry_summaries
+            .map(|entry_summary| {
+                let num_hashes = entry_summary.num_hashes;
+                let hash = entry_summary.hash;
+                let transactions = block.transactions[entry_summary.starting_transaction_index
+                    ..entry_summary.starting_transaction_index
+                        + entry_summary.num_transactions as usize]
+                    .iter()
+                    .map(|tx_with_meta| tx_with_meta.get_transaction())
+                    .collect();
+                Entry {
+                    num_hashes,
+                    hash,
+                    transactions,
+                }
+            })
+            .collect();
+
+        let shredder = Shredder::new(*slot, block.parent_slot, 0, shred_version)?;
+        let (data_shreds, _coding_shreds) = shredder.entries_to_shreds(
+            &keypair,
+            &entries,
+            true,  // last_in_slot
+            None,  // chained_merkle_root
+            0,     // next_shred_index
+            0,     // next_code_index
+            false, // merkle_variant
+            &ReedSolomonCache::default(),
+            &mut ProcessShredsStats::default(),
+        );
+        blockstore.insert_shreds(data_shreds, None, false)?;
+    }
+    Ok(())
+}
+
 async fn blocks(
     starting_slot: Slot,
     limit: usize,
@@ -862,6 +922,31 @@ impl BigTableSubCommand for App<'_, '_> {
                         .required(true),
                 ),
         )
+        .subcommand(
+            SubCommand::with_name("shreds")
+                .about(
+                    "Get confirmed blocks, shred them and insert the shreds into the \
+                     local Blockstore",
+                )
+                .arg(
+                    Arg::with_name("starting_slot")
+                        .long("starting-slot")
+                        .validator(is_slot)
+                        .value_name("SLOT")
+                        .takes_value(true)
+                        .required(true)
+                        .help("Reconstruct starting from this slot (inclusive)"),
+                )
+                .arg(
+                    Arg::with_name("ending_slot")
+                        .long("ending-slot")
+                        .validator(is_slot)
+                        .value_name("SLOT")
+                        .takes_value(true)
+                        .required(true)
+                        .help("Reconstruct ending with this slot (inclusive)"),
+                ),
+        )
         .subcommand(
             SubCommand::with_name("confirm")
                 .about("Confirm transaction by signature")
@@ -1156,6 +1241,23 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
             };
             runtime.block_on(entries(slot, output_format, config))
         }
+        ("shreds", Some(arg_matches)) => {
+            let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
+            let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
+            let blockstore = Arc::new(crate::open_blockstore(
+                &canonicalize_ledger_path(ledger_path),
+                arg_matches,
+                AccessType::Primary,
+            ));
+
+            let config = solana_storage_bigtable::LedgerStorageConfig {
+                read_only: true,
+                instance_name,
+                app_profile_id,
+                ..solana_storage_bigtable::LedgerStorageConfig::default()
+            };
+            runtime.block_on(shreds(blockstore, starting_slot, ending_slot, config))
+        }
         ("blocks", Some(arg_matches)) => {
            let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
            let limit = value_t_or_exit!(arg_matches, "limit", usize);

From be2e052ab7973ab7e2c07aa6db0376a453f4a0e6 Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 5 Feb 2024 15:14:07 -0600
Subject: [PATCH 2/3] Add fallback in case entries data does not exist

---
 ledger-tool/src/bigtable.rs | 132 +++++++++++++++++++++++++++++-------
 1 file changed, 109 insertions(+), 23 deletions(-)

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 5fad9cb47ca..20731035c34 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -9,7 +9,7 @@ use {
     },
     crossbeam_channel::unbounded,
     futures::stream::FuturesUnordered,
-    log::{debug, error, info},
+    log::{debug, error, info, warn},
     serde_json::json,
     solana_clap_utils::{
         input_parsers::pubkey_of,
@@ -19,14 +19,16 @@ use {
         display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
         OutputFormat,
     },
-    solana_entry::entry::Entry,
+    solana_entry::entry::{create_ticks, Entry},
     solana_ledger::{
         bigtable_upload::ConfirmedBlockUploadConfig,
         blockstore::Blockstore,
         blockstore_options::AccessType,
         shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
     },
-    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature, signer::keypair::Keypair},
+    solana_sdk::{
+        clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature, signer::keypair::Keypair,
+    },
     solana_storage_bigtable::CredentialType,
     solana_transaction_status::{
         BlockEncodingOptions, ConfirmedBlock, EncodeError, EncodedConfirmedBlock,
@@ -185,6 +187,7 @@ async fn shreds(
     blockstore: Arc<Blockstore>,
     starting_slot: Slot,
     ending_slot: Slot,
+    allow_dummy_poh: bool,
     config: solana_storage_bigtable::LedgerStorageConfig,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
         .await
         .map_err(|err| format!("Failed to connect to storage: {err:?}"))?;
@@ -196,30 +199,107 @@ async fn shreds(
     let mut slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
     slots.retain(|&slot| slot <= ending_slot);
 
-    let keypair = Keypair::from_bytes(&[0; 64]).unwrap();
-    // TODO: parse this / allow command line flag ?
+    let keypair = Keypair::from_bytes(&[0; 64])?;
+    // TODO: parse this from CLI ?
     let shred_version = 0;
+    // TODO: parse from CLI OR extract from genesis
+    let num_ticks_per_slot = 64;
+    // TODO: parse from CLI OR extract from Bank; tick rate changed recently
+    let num_hashes_per_tick = 12500;
 
     for slot in slots.iter() {
         let block = bigtable.get_confirmed_block(*slot).await?;
-        let entry_summaries = bigtable.get_entries(*slot).await?;
-        let entries: Vec<_> = entry_summaries
-            .map(|entry_summary| {
-                let num_hashes = entry_summary.num_hashes;
-                let hash = entry_summary.hash;
-                let transactions = block.transactions[entry_summary.starting_transaction_index
-                    ..entry_summary.starting_transaction_index
-                        + entry_summary.num_transactions as usize]
-                    .iter()
-                    .map(|tx_with_meta| tx_with_meta.get_transaction())
-                    .collect();
-                Entry {
-                    num_hashes,
-                    hash,
-                    transactions,
+        let entry_summaries = match bigtable.get_entries(*slot).await {
+            Ok(summaries) => Some(summaries),
+            Err(err) => {
+                let err_msg = format!("Failed to get PoH entry data for {slot}: {err}");
+
+                if allow_dummy_poh {
+                    warn!("{err_msg}. Will create dummy PoH data instead.");
+                } else {
+                    return Err(format!(
+                        "{err_msg}. Try passing --allow-dummy-poh to allow \
+                         creation of shreds with dummy PoH data"
+                    ))?;
                 }
-            })
-            .collect();
+                None
+            }
+        };
+
+        let entries = match entry_summaries {
+            Some(entry_summaries) => entry_summaries
+                .map(|entry_summary| {
+                    let num_hashes = entry_summary.num_hashes;
+                    let hash = entry_summary.hash;
+                    let transactions = block.transactions[entry_summary.starting_transaction_index
+                        ..entry_summary.starting_transaction_index
+                            + entry_summary.num_transactions as usize]
+                        .iter()
+                        .map(|tx_with_meta| tx_with_meta.get_transaction())
+                        .collect();
+                    Entry {
+                        num_hashes,
+                        hash,
+                        transactions,
+                    }
+                })
+                .collect(),
+            None => {
+                let num_total_ticks = ((slot - block.parent_slot) * num_ticks_per_slot) as usize;
+                let num_total_entries = num_total_ticks + block.transactions.len();
+                let mut entries = Vec::with_capacity(num_total_entries);
+
+                // Create virtual tick entries for any skipped slots
+                //
+                // These ticks are necessary so that the tick height is
+                // advanced to the proper value when this block is processed.
+                //
+                // Additionally, a blockhash will still be inserted into the
+                // recent blockhashes sysvar for skipped slots. So, these
+                // virtual ticks will have the proper PoH hashes.
+                let num_skipped_slots = slot - block.parent_slot - 1;
+                if num_skipped_slots > 0 {
+                    let num_virtual_ticks = num_skipped_slots * num_ticks_per_slot;
+                    let parent_blockhash = Hash::from_str(&block.previous_blockhash)?;
+                    let virtual_ticks_entries =
+                        create_ticks(num_virtual_ticks, num_hashes_per_tick, parent_blockhash);
+                    entries.extend(virtual_ticks_entries.into_iter());
+                }
+
+                // Create transaction entries
+                //
+                // Keep it simple and just do one transaction per Entry
+                let transaction_entries = block.transactions.iter().map(|tx_with_meta| Entry {
+                    num_hashes: 0,
+                    hash: Hash::default(),
+                    transactions: vec![tx_with_meta.get_transaction()],
+                });
+                entries.extend(transaction_entries.into_iter());
+
+                // Create the tick entries for this slot
+                //
+                // We do not know the intermediate hashes, so just use the
+                // default hash for all ticks. The exception is the final
+                // tick; the final tick determines the blockhash, so set it
+                // to the known blockhash from the bigtable block
+                let blockhash = Hash::from_str(&block.blockhash)?;
+                let tick_entries = (0..num_ticks_per_slot).map(|idx| {
+                    let hash = if idx == num_ticks_per_slot - 1 {
+                        blockhash
+                    } else {
+                        Hash::default()
+                    };
+                    Entry {
+                        num_hashes: 0,
+                        hash,
+                        transactions: vec![],
+                    }
+                });
+                entries.extend(tick_entries.into_iter());
+
+                entries
+            }
+        };
 
         let shredder = Shredder::new(*slot, block.parent_slot, 0, shred_version)?;
         let (data_shreds, _coding_shreds) = shredder.entries_to_shreds(
@@ -1256,7 +1336,13 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
                 app_profile_id,
                 ..solana_storage_bigtable::LedgerStorageConfig::default()
             };
-            runtime.block_on(shreds(blockstore, starting_slot, ending_slot, config))
+            runtime.block_on(shreds(
+                blockstore,
+                starting_slot,
+                ending_slot,
+                false,
+                config,
+            ))
         }
         ("blocks", Some(arg_matches)) => {
             let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);

From 579360860517bdb20774219ae3a16d0c5a494928 Mon Sep 17 00:00:00 2001
From: Steven Czabaniuk
Date: Mon, 5 Feb 2024 15:41:49 -0600
Subject: [PATCH 3/3] More thought-out terminology for args and help

---
 ledger-tool/src/bigtable.rs | 35 ++++++++++++++++++++++++-----------
 1 file changed, 24 insertions(+), 11 deletions(-)

diff --git a/ledger-tool/src/bigtable.rs b/ledger-tool/src/bigtable.rs
index 20731035c34..ef35f396afa 100644
--- a/ledger-tool/src/bigtable.rs
+++ b/ledger-tool/src/bigtable.rs
@@ -187,7 +187,7 @@ async fn shreds(
     blockstore: Arc<Blockstore>,
     starting_slot: Slot,
     ending_slot: Slot,
-    allow_dummy_poh: bool,
+    allow_mock_poh: bool,
     config: solana_storage_bigtable::LedgerStorageConfig,
 ) -> Result<(), Box<dyn std::error::Error>> {
     let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
@@ -212,14 +212,14 @@ async fn shreds(
         let entry_summaries = match bigtable.get_entries(*slot).await {
             Ok(summaries) => Some(summaries),
             Err(err) => {
-                let err_msg = format!("Failed to get PoH entry data for {slot}: {err}");
+                let err_msg = format!("Failed to get PoH entries for {slot}: {err}");
 
-                if allow_dummy_poh {
-                    warn!("{err_msg}. Will create dummy PoH data instead.");
+                if allow_mock_poh {
+                    warn!("{err_msg}. Will create mock PoH entries instead.");
                 } else {
                     return Err(format!(
-                        "{err_msg}. Try passing --allow-dummy-poh to allow \
-                         creation of shreds with dummy PoH data"
+                        "{err_msg}. Try passing --allow-mock-poh to allow \
+                         creation of shreds with mocked PoH entries"
+                    ))?;
                 }
@@ -1005,8 +1005,9 @@ impl BigTableSubCommand for App<'_, '_> {
         .subcommand(
             SubCommand::with_name("shreds")
                 .about(
-                    "Get confirmed blocks, shred them and insert the shreds into the \
-                     local Blockstore",
+                    "Get confirmed blocks from BigTable, reassemble the transactions \
+                     and entries, shred the blocks, and then insert the shredded blocks \
+                     into the local Blockstore",
                 )
                 .arg(
                     Arg::with_name("starting_slot")
                         .long("starting-slot")
                         .validator(is_slot)
                         .value_name("SLOT")
                         .takes_value(true)
                         .required(true)
-                        .help("Reconstruct starting from this slot (inclusive)"),
+                        .help("Start shred creation at this slot (inclusive)"),
                 )
                 .arg(
                     Arg::with_name("ending_slot")
                         .long("ending-slot")
                         .validator(is_slot)
                         .value_name("SLOT")
                         .takes_value(true)
                         .required(true)
-                        .help("Reconstruct ending with this slot (inclusive)"),
+                        .help("Stop shred creation at this slot (inclusive)"),
+                )
+                .arg(
+                    Arg::with_name("allow_mock_poh")
+                        .long("allow-mock-poh")
+                        .takes_value(false)
+                        .help(
+                            "For slots where PoH entries are unavailable, allow the \
+                             generation of mock PoH entries. The mock PoH entries make \
+                             the shredded block(s) replayable if PoH verification is \
+                             disabled.",
+                        ),
                 ),
         )
         .subcommand(
@@ -1324,6 +1336,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
         ("shreds", Some(arg_matches)) => {
             let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
             let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
+            let allow_mock_poh = arg_matches.is_present("allow_mock_poh");
             let blockstore = Arc::new(crate::open_blockstore(
                 &canonicalize_ledger_path(ledger_path),
                 arg_matches,
@@ -1340,7 +1353,7 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
                 blockstore,
                 starting_slot,
                 ending_slot,
-                false,
+                allow_mock_poh,
                 config,
             ))
         }
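
A minimal usage sketch of the new subcommand. The slot numbers and ledger path are
placeholders, and the sketch assumes BigTable read credentials are supplied in the
usual way (e.g. via the GOOGLE_APPLICATION_CREDENTIALS environment variable); the
ledger must not be held open by a running validator, since the command opens the
Blockstore with primary access:

    solana-ledger-tool --ledger /path/to/ledger bigtable shreds \
        --starting-slot 250000000 \
        --ending-slot 250000010 \
        --allow-mock-poh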