Merged
35 changes: 35 additions & 0 deletions ledger/src/shred.rs
@@ -838,6 +838,41 @@ pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -
max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
}

// This is used in the integration tests for shredding.
#[cfg(feature = "dev-context-only-utils")]
pub fn max_entries_per_n_shred_last_or_not(
entry: &Entry,
num_shreds: u64,
is_last_in_slot: bool,
) -> u64 {
// Default 32:32 erasure batches yield 64 shreds; log2(64) = 6.
let merkle_variant_unsigned = Some((
/*proof_size:*/ 6, /*chained:*/ true, /*resigned:*/ false,
));
let merkle_variant_signed = Some((
/*proof_size:*/ 6, /*chained:*/ true, /*resigned:*/ true,
));

let vec_size = bincode::serialized_size(&vec![entry]).unwrap();
let entry_size = bincode::serialized_size(entry).unwrap();
let count_size = vec_size - entry_size;
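// With bincode's default options a Vec length prefix is a u64, so
// count_size here is expected to be 8 bytes.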

if !is_last_in_slot {
// all shreds are unsigned
let shred_data_size = ShredData::capacity(merkle_variant_unsigned).unwrap() as u64;
(shred_data_size * num_shreds - count_size) / entry_size
} else {
// the last FEC set is signed, all others are unsigned
let shred_data_size_unsigned = ShredData::capacity(merkle_variant_unsigned).unwrap() as u64;
let shred_data_size_signed = ShredData::capacity(merkle_variant_signed).unwrap() as u64;
let shreds_per_fec_block = SHREDS_PER_FEC_BLOCK as u64;
(shred_data_size_unsigned * (num_shreds - shreds_per_fec_block)
+ shred_data_size_signed * shreds_per_fec_block
- count_size)
/ entry_size
}
}
Comment thread: maheshr marked this conversation as resolved.
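For reference, the size arithmetic above can be checked in isolation. A minimal sketch with a hypothetical stand-in for Entry (assumes bincode 1.x, whose default options encode a Vec length as a u64 prefix):

use serde::Serialize;

// Hypothetical stand-in; any Serialize type shows the same Vec overhead.
#[derive(Serialize, Clone)]
struct DummyEntry {
    num_hashes: u64,
    hash: [u8; 32],
}

fn main() {
    let entry = DummyEntry { num_hashes: 1, hash: [0u8; 32] };
    let vec_size = bincode::serialized_size(&vec![entry.clone()]).unwrap();
    let entry_size = bincode::serialized_size(&entry).unwrap();
    // The difference is the Vec length prefix: 8 bytes for a u64.
    assert_eq!(vec_size - entry_size, 8);
    // Entries that fit in num_shreds shreds of capacity c:
    // (c * num_shreds - prefix) / entry_size, as computed above.
}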

pub fn max_entries_per_n_shred(
entry: &Entry,
num_shreds: u64,
158 changes: 115 additions & 43 deletions ledger/src/shred/merkle.rs
@@ -1026,7 +1026,7 @@ pub(crate) fn make_shreds_from_data(
keypair: &Keypair,
// The Merkle root of the previous erasure batch if chained.
chained_merkle_root: Option<Hash>,
mut data: &[u8], // Serialized &[Entry]
data: &[u8], // Serialized &[Entry]
slot: Slot,
parent_slot: Slot,
shred_version: u16,
@@ -1039,18 +1039,31 @@
) -> Result<Vec<Shred>, Error> {
let now = Instant::now();
let chained = chained_merkle_root.is_some();
let resigned = chained && is_last_in_slot;

// only sign the last FEC set in the slot, and only if chained
let sign_last_fec_set = chained && is_last_in_slot;
let proof_size = PROOF_ENTRIES_FOR_32_32_BATCH;
let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, resigned)?;

// unsigned data_buffer size
let data_buffer_per_shred_size = ShredData::capacity(proof_size, chained, false)?;
let data_buffer_total_size = DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size;

// signed data_buffer size
let data_buffer_per_shred_size_signed = if sign_last_fec_set {
ShredData::capacity(proof_size, chained, true)?
} else {
0
};
let data_buffer_total_size_signed =
DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size_signed;

// Common header for the data shreds.
let mut common_header_data = ShredCommonHeader {
signature: Signature::default(),
shred_variant: ShredVariant::MerkleData {
proof_size,
chained,
resigned,
resigned: false,
},
slot,
index: next_shred_index,
@@ -1063,7 +1076,7 @@
shred_variant: ShredVariant::MerkleCode {
proof_size,
chained,
resigned,
resigned: false,
},
index: next_code_index,
..common_header_data
@@ -1083,18 +1096,34 @@
}
};

// Pre-allocate shreds to avoid reallocations.
let mut shreds = {
let number_of_batches = data.len().div_ceil(data_buffer_total_size);
let total_num_shreds = SHREDS_PER_FEC_BLOCK * number_of_batches;
Vec::<Shred>::with_capacity(total_num_shreds)
let (mut unsigned_data, signed_data) = if sign_last_fec_set {
// Reserve at least one signed batch (may be empty) at the end.
if data.len() > data_buffer_total_size_signed {
// sign only the last batch; everything before it stays unsigned
let split_at = data.len() - data_buffer_total_size_signed;
data.split_at(split_at)
} else {
// only enough data for one fec set, sign the whole thing
(&[][..], data)
}
} else {
// not last fec set, so don't sign
(data, &[][..])
};
stats.data_bytes += data.len();
stats.data_bytes += unsigned_data.len() + signed_data.len();

let unsigned_sets = unsigned_data.len().div_ceil(data_buffer_total_size);
let number_of_fec_sets = if sign_last_fec_set {
unsigned_sets + 1
} else {
unsigned_sets
};
let mut shreds = Vec::<Shred>::with_capacity(SHREDS_PER_FEC_BLOCK * number_of_fec_sets);
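As a reference for the unsigned/signed split above, a minimal sketch with a hypothetical helper name (plain byte slices; not part of this PR):

// Reserve the tail of `data` for the signed last FEC set; the rest stays unsigned.
fn split_for_signing(data: &[u8], signed_capacity: usize) -> (&[u8], &[u8]) {
    if data.len() > signed_capacity {
        // More than one batch worth of data: only the tail is signed.
        data.split_at(data.len() - signed_capacity)
    } else {
        // At most one batch: sign everything.
        (&[][..], data)
    }
}

// e.g. split_for_signing(&[1, 2, 3, 4, 5], 2) yields (&[1, 2, 3], &[4, 5]).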

// Split the data into full erasure batches and initialize data and coding
// shreds for each batch.
while data.len() >= data_buffer_total_size {
let (current_batch_data_chunk, rest) = data.split_at(data_buffer_total_size);
while unsigned_data.len() >= data_buffer_total_size {
let (current_batch_data_chunk, rest) = unsigned_data.split_at(data_buffer_total_size);
debug_assert_eq!(
current_batch_data_chunk.len(),
DATA_SHREDS_PER_FEC_BLOCK * data_buffer_per_shred_size
@@ -1110,7 +1139,7 @@
.map(Shred::ShredData),
);
shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode));
data = rest;
unsigned_data = rest;
}

// Two possibilities for taking this conditional:
@@ -1120,29 +1149,33 @@
// 2.) Shreds is_empty, which only happens when we entered with zero data.
//
// In either case, we want to generate empty data shreds.
if !data.is_empty() || shreds.is_empty() {
stats.padding_bytes += data_buffer_total_size - data.len();
common_header_data.shred_variant = ShredVariant::MerkleData {
if !unsigned_data.is_empty() || (shreds.is_empty() && !sign_last_fec_set) {
stats.padding_bytes += data_buffer_total_size - unsigned_data.len();
shred_leftover_data(
proof_size,
chained,
resigned,
};
common_header_code.shred_variant = ShredVariant::MerkleCode {
false,
unsigned_data,
data_buffer_per_shred_size,
&mut common_header_data,
&mut common_header_code,
data_header,
&mut shreds,
);
}
if !signed_data.is_empty() || (shreds.is_empty() && sign_last_fec_set) {
stats.padding_bytes += data_buffer_total_size_signed - signed_data.len();
shred_leftover_data(
proof_size,
chained,
resigned,
};
common_header_data.fec_set_index = common_header_data.index;
common_header_code.fec_set_index = common_header_data.fec_set_index;
shreds.extend({
// Create data chunks out of remaining data + padding.
let chunks = data
.chunks(data_buffer_per_shred_size)
.chain(std::iter::repeat(&[][..])) // possible padding
.take(DATA_SHREDS_PER_FEC_BLOCK);
make_shreds_data(&mut common_header_data, data_header, chunks).map(Shred::ShredData)
});
shreds.extend(make_shreds_code_header_only(&mut common_header_code).map(Shred::ShredCode));
true,
signed_data,
data_buffer_per_shred_size_signed,
&mut common_header_data,
&mut common_header_code,
data_header,
&mut shreds,
);
}

// Adjust flags for the very last data shred.
@@ -1212,6 +1245,41 @@
Ok(shreds)
}

#[allow(clippy::too_many_arguments)]
fn shred_leftover_data(
proof_size: u8,
chained: bool,
resigned: bool,
data: &[u8],
data_buffer_per_shred_size: usize,
common_header_data: &mut ShredCommonHeader,
common_header_code: &mut ShredCommonHeader,
data_header: DataShredHeader,
shreds: &mut Vec<Shred>,
) {
common_header_data.shred_variant = ShredVariant::MerkleData {
proof_size,
chained,
resigned,
};
common_header_code.shred_variant = ShredVariant::MerkleCode {
proof_size,
chained,
resigned,
};
common_header_data.fec_set_index = common_header_data.index;
common_header_code.fec_set_index = common_header_data.fec_set_index;
shreds.extend({
// Create data chunks out of remaining data + padding.
let chunks = data
.chunks(data_buffer_per_shred_size)
.chain(std::iter::repeat(&[][..])) // possible padding
.take(DATA_SHREDS_PER_FEC_BLOCK);
make_shreds_data(common_header_data, data_header, chunks).map(Shred::ShredData)
});
shreds.extend(make_shreds_code_header_only(common_header_code).map(Shred::ShredCode));
}
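The chunks-plus-padding iterator in the helper above can be sketched on its own (hypothetical function; the padding behavior is the point):

// Split leftover bytes into per-shred chunks, then pad with empty slices
// until a full FEC set of data shreds is reached.
fn chunks_with_padding(data: &[u8], capacity: usize, shreds_per_set: usize) -> Vec<&[u8]> {
    data.chunks(capacity)
        .chain(std::iter::repeat(&[][..]))
        .take(shreds_per_set)
        .collect()
}

// e.g. 5 bytes with capacity 2 across 4 shreds gives chunk lengths [2, 2, 1, 0].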

// Given shreds of the same erasure batch:
// - Writes common and {data,coding} headers into shreds' payload.
// - Fills in erasure code buffers in the coding shreds.
@@ -1687,7 +1755,11 @@ mod test {
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let keypair = Keypair::new();
let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
let resigned = chained && is_last_in_slot;

// only sign last batch if it is chained and is the last in slot
// let resigned = chained && is_last_in_slot;
Comment thread:

nit: looks like this was left in. can be deleted:

// let resigned = chained && is_last_in_slot;

^ you can do this in a follow up PR

let sign_last_fec_set = chained && is_last_in_slot;

let slot = 149_745_689;
let parent_slot = slot - rng.gen_range(1..65536);
let shred_version = rng.gen();
@@ -1720,14 +1792,12 @@
})
.collect();
// Assert that the input data can be recovered from data shreds.
assert_eq!(
data,
data_shreds
.iter()
.flat_map(|shred| shred.data().unwrap())
.copied()
.collect::<Vec<_>>()
);
let data2 = data_shreds
.iter()
.flat_map(|shred| shred.data().unwrap())
.copied()
.collect::<Vec<_>>();
assert_eq!(data, data2);
// Assert that shreds sanitize and verify.
let pubkey = keypair.pubkey();
for shred in &shreds {
@@ -1775,8 +1845,10 @@
// Verify common, data and coding headers.
let mut num_data_shreds = 0;
let mut num_coding_shreds = 0;
for shred in &shreds {
for (index, shred) in shreds.iter().enumerate() {
let common_header = shred.common_header();
let resigned = sign_last_fec_set && index >= shreds.len() - 64;

assert_eq!(common_header.slot, slot);
assert_eq!(common_header.version, shred_version);
let proof_size = shred.proof_size().unwrap();
25 changes: 17 additions & 8 deletions ledger/src/shred/wire.rs
@@ -418,7 +418,9 @@ pub(crate) fn corrupt_packet<R: Rng>(
mod tests {
use {
super::*,
crate::shred::{tests::make_merkle_shreds_for_tests, traits::ShredData},
crate::shred::{
tests::make_merkle_shreds_for_tests, traits::ShredData, SHREDS_PER_FEC_BLOCK,
},
assert_matches::assert_matches,
rand::Rng,
solana_perf::packet::PacketFlags,
@@ -443,11 +445,14 @@
let mut shreds =
make_merkle_shreds_for_tests(&mut rng, slot, data_size, chained, is_last_in_slot)
.unwrap();
for shred in shreds.iter_mut() {
// enumerate the shreds so the index of each shred is available
let shreds_len = shreds.len();
for (index, shred) in shreds.iter_mut().enumerate() {
let keypair = Keypair::new();
let signature = make_dummy_signature(&mut rng);
let nonce = repaired.then(|| rng.gen::<Nonce>());
if chained && is_last_in_slot {
let is_last_batch = index >= shreds_len - SHREDS_PER_FEC_BLOCK;
if chained && is_last_in_slot && is_last_batch {
shred.set_retransmitter_signature(&signature).unwrap();

let packet = &mut shred.payload().to_packet(nonce);
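The last-batch predicate these tests rely on, sketched in isolation (64 assumes the default 32:32 erasure batch):

// Only shreds in the final erasure batch of a chained, last-in-slot block
// carry the retransmitter signature.
const SHREDS_PER_FEC_BLOCK: usize = 64; // assumed: 32 data + 32 coding
fn is_last_batch(index: usize, total_shreds: usize) -> bool {
    index >= total_shreds - SHREDS_PER_FEC_BLOCK
}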
@@ -500,9 +505,11 @@
let mut shreds =
make_merkle_shreds_for_tests(&mut rng, slot, data_size, chained, is_last_in_slot)
.unwrap();
for shred in &mut shreds {
let shreds_len = shreds.len();
for (index, shred) in shreds.iter_mut().enumerate() {
let signature = make_dummy_signature(&mut rng);
if chained && is_last_in_slot {
let is_last_batch = index >= shreds_len - SHREDS_PER_FEC_BLOCK;
if chained && is_last_in_slot && is_last_batch {
shred.set_retransmitter_signature(&signature).unwrap();
} else {
assert_matches!(
Expand All @@ -511,8 +518,10 @@ mod tests {
);
}
}
for shred in &shreds {

for (index, shred) in shreds.iter().enumerate() {
let nonce = repaired.then(|| rng.gen::<Nonce>());
let is_last_batch = index >= shreds_len - SHREDS_PER_FEC_BLOCK;
let mut packet = shred.payload().to_packet(nonce);
if repaired {
packet.meta_mut().flags |= PacketFlags::REPAIR;
@@ -571,9 +580,9 @@
}
assert_eq!(
is_retransmitter_signed_variant(bytes).unwrap(),
chained && is_last_in_slot
chained && is_last_in_slot && is_last_batch,
);
if chained && is_last_in_slot {
if chained && is_last_in_slot && is_last_batch {
assert_eq!(
get_retransmitter_signature_offset(bytes).unwrap(),
shred.retransmitter_signature_offset().unwrap(),
10 changes: 5 additions & 5 deletions ledger/tests/shred.rs
@@ -5,8 +5,9 @@ use {
solana_hash::Hash,
solana_keypair::Keypair,
solana_ledger::shred::{
self, max_entries_per_n_shred, recover, verify_test_data_shred, ProcessShredsStats,
ReedSolomonCache, Shred, ShredData, Shredder, DATA_SHREDS_PER_FEC_BLOCK,
self, max_entries_per_n_shred, max_entries_per_n_shred_last_or_not, recover,
verify_test_data_shred, ProcessShredsStats, ReedSolomonCache, Shred, ShredData, Shredder,
DATA_SHREDS_PER_FEC_BLOCK,
},
solana_signer::Signer,
solana_system_transaction as system_transaction,
@@ -33,9 +34,8 @@ fn test_multi_fec_block_coding(is_last_in_slot: bool) {
let tx0 = system_transaction::transfer(&keypair0, &keypair1.pubkey(), 1, Hash::default());
let entry = Entry::new(&Hash::default(), 1, vec![tx0]);
let chained_merkle_root = Some(Hash::default());
let merkle_capacity = ShredData::capacity(Some((6, true, is_last_in_slot))).unwrap();
let num_entries =
max_entries_per_n_shred(&entry, num_data_shreds as u64, Some(merkle_capacity));
max_entries_per_n_shred_last_or_not(&entry, num_data_shreds as u64, is_last_in_slot);

let entries: Vec<_> = (0..num_entries)
.map(|_| {
@@ -145,7 +145,7 @@ fn test_multi_fec_block_different_size_coding() {
// Necessary in order to ensure the last shred in the slot
// is part of the recovered set, and that the below `index`
// calculation in the loop is correct
assert!(fec_data_shreds.len() % 2 == 0);
assert_eq!(fec_data_shreds.len() % 2, 0);
for (i, recovered_shred) in recovered_data.into_iter().enumerate() {
let index = first_data_index + (i * 2) + 1;
verify_test_data_shred(