This repository was archived by the owner on Jan 22, 2025. It is now read-only.
207 changes: 204 additions & 3 deletions ledger-tool/src/bigtable.rs
@@ -9,7 +9,7 @@ use {
     },
     crossbeam_channel::unbounded,
     futures::stream::FuturesUnordered,
-    log::{debug, error, info},
+    log::{debug, error, info, warn},
     serde_json::json,
     solana_clap_utils::{
         input_parsers::pubkey_of,
@@ -19,11 +19,16 @@ use {
         display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
         OutputFormat,
     },
+    solana_entry::entry::{create_ticks, Entry},
     solana_ledger::{
-        bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
+        bigtable_upload::ConfirmedBlockUploadConfig,
+        blockstore::Blockstore,
+        blockstore_options::AccessType,
+        shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
     },
-    solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
+    solana_sdk::{
+        clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature, signer::keypair::Keypair,
+    },
     solana_storage_bigtable::CredentialType,
     solana_transaction_status::{
         BlockEncodingOptions, ConfirmedBlock, EncodeError, EncodedConfirmedBlock,
@@ -178,6 +183,141 @@ async fn entries(
    Ok(())
}

async fn shreds(
    blockstore: Arc<Blockstore>,
    starting_slot: Slot,
    ending_slot: Slot,
    allow_mock_poh: bool,
    config: solana_storage_bigtable::LedgerStorageConfig,
) -> Result<(), Box<dyn std::error::Error>> {
    let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
        .await
        .map_err(|err| format!("Failed to connect to storage: {err:?}"))?;

    // Make the range inclusive of both starting and ending slot
    let limit = (ending_slot - starting_slot + 1) as usize;
    let mut slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
    slots.retain(|&slot| slot <= ending_slot);

    let keypair = Keypair::from_bytes(&[0; 64])?;
    // TODO: parse this from CLI ?
    let shred_version = 0;
    // TODO: parse from CLI OR extract from genesis
    let num_ticks_per_slot = 64;
    // TODO: parse from CLI OR extract from Bank; tick rate changed recently
    let num_hashes_per_tick = 12500;
Comment on lines +203 to +208
@steviez (Contributor, author) commented on Feb 6, 2024:

Hi @CriesofCarrots - wanted to get some initial thoughts from you on these TODOs (and I guess the PR in general for context ha). Here is my current thinking:

  • shred_version: Make this an optional CLI flag; this isn't super critical IMO so fine to leave as optional
    • Also no place that we can fetch this from (don't think we want to introduce RPC calls into ledger-tool)
  • num_ticks_per_slot: This is currently a fixed value that is available from genesis or a Bank
  • num_hashes_per_tick: Until very recently, this was a fixed value that could be read from genesis. However, this value can now vary with slot so it must be determined from Bank

For num_ticks_per_slot and num_hashes_per_tick, I see two options: 1) require on CLI or 2) read from a Bank. 1) would be quicker, but more error prone. 2) will be more correct if the bank is in the same epoch as the desired slot range, but will take more time to execute as the snapshot will have to be unpacked

I'm leaning towards doing 2) so as not to introduce a foot-gun, but curious for a quick sanity check from you as well.

My thinking is that we'd extract a Bank from snapshot and use Bank helpers to confirm that the desired slot range to create shreds for is the same epoch as the Bank's slot. It could be nice to do this check before unpacking the snapshot to avoid wasted time, but we would need to re-impl some logic to determine epoch from slot ... maybe this wouldn't be so bad
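The "determine epoch from slot" logic mentioned above is easy to sketch if one assumes a fixed-length epoch schedule. The following is a hypothetical standalone illustration, not the real API: the constant, the function names, and the omission of solana-sdk's warmup-epoch handling (which `EpochSchedule` covers) are all simplifications.

```rust
// Hypothetical sketch of the pre-unpack sanity check discussed above.
// Assumes a fixed-length epoch schedule; the real EpochSchedule in
// solana-sdk also handles the shorter "warmup" epochs at cluster start.
const SLOTS_PER_EPOCH: u64 = 432_000; // mainnet-beta default

fn epoch_of(slot: u64) -> u64 {
    slot / SLOTS_PER_EPOCH
}

/// True if every slot in starting_slot..=ending_slot falls in the same
/// epoch as the snapshot bank's slot, so the bank's per-epoch values
/// (e.g. hashes per tick) are safe to reuse for the whole range.
fn range_in_bank_epoch(starting_slot: u64, ending_slot: u64, bank_slot: u64) -> bool {
    epoch_of(starting_slot) == epoch_of(bank_slot) && epoch_of(ending_slot) == epoch_of(bank_slot)
}

fn main() {
    // Slots 432_000 and 433_000 are both in epoch 1, like bank slot 432_500
    println!("{}", range_in_bank_epoch(432_000, 433_000, 432_500));
}
```

With a check like this up front, the tool could bail out before spending time unpacking a snapshot whose epoch cannot cover the requested slot range.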

Contributor replied:

I lean toward extracting those two fields from a Bank as well. The unfortunate bit is, I guess we would have to error if the starting_slot..ending_slot range extends outside that one epoch, or only shred part of the range.

> shred_version... Also no place that we can fetch this from (don't think we want to introduce RPC calls into ledger-tool)

Definitely don't want to have to depend on a running node for anything. If we put more requirements on the snapshot being used (or make greater assumptions), I guess we could actually compute the shred_version from the hard_forks in the Bank, right? Not sure it's worth it, though.
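On computing the shred_version from the bank: solana-sdk does ship a helper for this (`solana_sdk::shred_version::compute_shred_version`), whose final step folds a 32-byte hash down to a nonzero u16. A self-contained sketch of roughly that fold follows; the signature is simplified for illustration, and the real helper first extends the genesis hash with the bank's hard forks before folding.

```rust
// Standalone sketch of the hash -> shred_version fold. The real
// compute_shred_version first hashes the genesis hash together with the
// bank's (slot, count) hard-fork pairs, then applies a fold like this.
fn version_from_hash(hash: &[u8; 32]) -> u16 {
    let mut accum = [0u8; 2];
    // XOR the 32-byte hash down into two bytes
    for chunk in hash.chunks(2) {
        for (a, b) in accum.iter_mut().zip(chunk) {
            *a ^= *b;
        }
    }
    // Combine into a u16, ensuring we never return 0 (which would look
    // like an uninitialized shred version)
    let version = u16::from_be_bytes(accum);
    version.saturating_add(1)
}

fn main() {
    println!("{}", version_from_hash(&[0u8; 32]));
}
```

Since the fold only needs the bank's genesis hash and hard forks, deriving shred_version from the snapshot would avoid both a CLI flag and any dependency on a running node.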

@steviez (Contributor, author) replied:

> lean toward extracting those two fields from a Bank as well

Cool, this seems like the correct answer, but I was feeling a little lazy so wanted a heat check. Let's get it from the bank.

> I guess we could actually compute the shred_version from the hard_forks in the Bank, right? Not sure it's worth it, though

Ohh, you might be right! If we can get it from Bank easily, then I'm good with doing it that way. I bet we may have a helper for that somewhere already


    for slot in slots.iter() {
        let block = bigtable.get_confirmed_block(*slot).await?;
        let entry_summaries = match bigtable.get_entries(*slot).await {
            Ok(summaries) => Some(summaries),
            Err(err) => {
                let err_msg = format!("Failed to get PoH entries for {slot}: {err}");

                if allow_mock_poh {
                    warn!("{err_msg}. Will create mock PoH entries instead.");
                } else {
                    return Err(format!(
                        "{err_msg}. Try passing --allow-mock-poh to allow \
                         creation of shreds with mocked PoH entries"
                    ))?;
                }
                None
            }
        };

        let entries = match entry_summaries {
            Some(entry_summaries) => entry_summaries
                .map(|entry_summary| {
                    let num_hashes = entry_summary.num_hashes;
                    let hash = entry_summary.hash;
                    let transactions = block.transactions[entry_summary.starting_transaction_index
                        ..entry_summary.starting_transaction_index
                            + entry_summary.num_transactions as usize]
                        .iter()
                        .map(|tx_with_meta| tx_with_meta.get_transaction())
                        .collect();
                    Entry {
                        num_hashes,
                        hash,
                        transactions,
                    }
                })
                .collect(),
            None => {
                let num_total_ticks = ((slot - block.parent_slot) * num_ticks_per_slot) as usize;
                let num_total_entries = num_total_ticks + block.transactions.len();
                let mut entries = Vec::with_capacity(num_total_entries);

                // Create virtual tick entries for any skipped slots
                //
                // These ticks are necessary so that the tick height is
                // advanced to the proper value when this block is processed.
                //
                // Additionally, a blockhash will still be inserted into the
                // recent blockhashes sysvar for skipped slots. So, these
                // virtual ticks will have the proper PoH
                let num_skipped_slots = slot - block.parent_slot - 1;
                if num_skipped_slots > 0 {
                    let num_virtual_ticks = num_skipped_slots * num_ticks_per_slot;
                    let parent_blockhash = Hash::from_str(&block.previous_blockhash)?;
                    let virtual_ticks_entries =
                        create_ticks(num_virtual_ticks, num_hashes_per_tick, parent_blockhash);
                    entries.extend(virtual_ticks_entries.into_iter());
                }

                // Create transaction entries
                //
                // Keep it simple and just do one transaction per Entry
                let transaction_entries = block.transactions.iter().map(|tx_with_meta| Entry {
                    num_hashes: 0,
                    hash: Hash::default(),
                    transactions: vec![tx_with_meta.get_transaction()],
                });
                entries.extend(transaction_entries.into_iter());

                // Create the tick entries for this slot
                //
                // We do not know the intermediate hashes, so just use the
                // default hash for all ticks. The exception is the final tick;
                // the final tick determines the blockhash, so set it to the
                // known blockhash from the bigtable block
                let blockhash = Hash::from_str(&block.blockhash)?;
                let tick_entries = (0..num_ticks_per_slot).map(|idx| {
                    let hash = if idx == num_ticks_per_slot - 1 {
                        blockhash
                    } else {
                        Hash::default()
                    };
                    Entry {
                        num_hashes: 0,
                        hash,
                        transactions: vec![],
                    }
                });
                entries.extend(tick_entries.into_iter());

                entries
            }
        };

        let shredder = Shredder::new(*slot, block.parent_slot, 0, shred_version)?;
        let (data_shreds, _coding_shreds) = shredder.entries_to_shreds(
            &keypair,
            &entries,
            true,  // last_in_slot
            None,  // chained_merkle_root
            0,     // next_shred_index
            0,     // next_code_index
            false, // merkle_variant
            &ReedSolomonCache::default(),
            &mut ProcessShredsStats::default(),
        );
        blockstore.insert_shreds(data_shreds, None, false)?;
    }
    Ok(())
}

async fn blocks(
    starting_slot: Slot,
    limit: usize,
@@ -862,6 +1002,43 @@ impl BigTableSubCommand for App<'_, '_> {
                        .required(true),
                ),
        )
        .subcommand(
            SubCommand::with_name("shreds")
                .about(
                    "Get confirmed blocks from BigTable, reassemble the transactions \
                    and entries, shred the block and then insert the shredded blocks into \
                    the local Blockstore",
                )
                .arg(
                    Arg::with_name("starting_slot")
                        .long("starting-slot")
                        .validator(is_slot)
                        .value_name("SLOT")
                        .takes_value(true)
                        .required(true)
                        .help("Start shred creation at this slot (inclusive)"),
                )
                .arg(
                    Arg::with_name("ending_slot")
                        .long("ending-slot")
                        .validator(is_slot)
                        .value_name("SLOT")
                        .takes_value(true)
                        .required(true)
                        .help("Stop shred creation at this slot (inclusive)"),
                )
                .arg(
                    Arg::with_name("allow_mock_poh")
                        .long("allow-mock-poh")
                        .takes_value(false)
                        .help(
                            "For slots where PoH entries are unavailable, allow the \
                            generation of mock PoH entries. The mock PoH entries enable \
                            the shredded block(s) to be replayable if PoH verification is \
                            disabled.",
                        ),
                ),
        )
        .subcommand(
            SubCommand::with_name("confirm")
                .about("Confirm transaction by signature")
@@ -1156,6 +1333,30 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
            };
            runtime.block_on(entries(slot, output_format, config))
        }
        ("shreds", Some(arg_matches)) => {
            let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
            let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
            let allow_mock_poh = arg_matches.is_present("allow_mock_poh");
            let blockstore = Arc::new(crate::open_blockstore(
                &canonicalize_ledger_path(ledger_path),
                arg_matches,
                AccessType::Primary,
            ));

            let config = solana_storage_bigtable::LedgerStorageConfig {
                read_only: true,
                instance_name,
                app_profile_id,
                ..solana_storage_bigtable::LedgerStorageConfig::default()
            };
            runtime.block_on(shreds(
                blockstore,
                starting_slot,
                ending_slot,
                allow_mock_poh,
                config,
            ))
        }
        ("blocks", Some(arg_matches)) => {
            let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
            let limit = value_t_or_exit!(arg_matches, "limit", usize);