turbine/src/broadcast_stage.rs (8 changes: 3 additions & 5 deletions)

@@ -250,7 +250,7 @@ impl BroadcastStage {
             | Error::ClusterInfo(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these?
             _ => {
                 inc_new_counter_error!("streamer-broadcaster-error", 1, 1);
-                error!("{} broadcaster error: {:?}", name, e);
+                error!("{name} broadcaster error: {e:?}");
             }
         }
     }
@@ -745,10 +745,8 @@ pub mod test {
     }

     trace!(
-        "[broadcast_ledger] max_tick_height: {}, start_tick_height: {}, ticks_per_slot: {}",
-        max_tick_height,
-        start_tick_height,
-        ticks_per_slot,
+        "[broadcast_ledger] max_tick_height: {max_tick_height}, start_tick_height: \
+         {start_tick_height}, ticks_per_slot: {ticks_per_slot}",
     );

     let mut entries = vec![];
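Taken together, these hunks apply one mechanical change: inline format arguments (stabilized in Rust 1.58), where an identifier in scope is captured directly inside the format string instead of being passed as a trailing positional argument. A minimal, self-contained sketch of the before and after; the variable names here are illustrative, not taken from the PR:

    fn main() {
        let name = "broadcast-thread";
        let e = std::io::Error::from(std::io::ErrorKind::BrokenPipe);
        // Before: positional `{}` placeholders with arguments listed after the string.
        println!("{} broadcaster error: {:?}", name, e);
        // After: the identifiers are captured directly in the format string.
        println!("{name} broadcaster error: {e:?}");
    }

Note that only bare identifiers can be captured this way; method calls such as shred.index() in broadcast_duplicates_run.rs below still have to stay as positional arguments.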
turbine/src/broadcast_stage/broadcast_duplicates_run.rs (11 changes: 7 additions & 4 deletions)

@@ -356,7 +356,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         {
             if cluster_partition.contains(node.pubkey()) {
                 info!(
-                    "Not broadcasting original shred index {}, slot {} to partition node {}",
+                    "Not broadcasting original shred index {}, slot {} to partition node \
+                     {}",
                     shred.index(),
                     shred.slot(),
                     node.pubkey(),
@@ -378,13 +379,15 @@
                 .iter()
                 .filter_map(|pubkey| {
                     info!(
-                        "Broadcasting partition shred index {}, slot {} to partition node {}",
+                        "Broadcasting partition shred index {}, slot {} to partition \
+                         node {}",
                         shred.index(),
                         shred.slot(),
                         pubkey,
                     );
-                    let tvu = cluster_info
-                        .lookup_contact_info(pubkey, |node| node.tvu(Protocol::UDP))??;
+                    let tvu = cluster_info.lookup_contact_info(pubkey, |node| {
+                        node.tvu(Protocol::UDP)
+                    })??;
                     Some((shred.payload(), tvu))
                 })
                 .collect(),
turbine/src/broadcast_stage/standard_broadcast_run.rs (2 changes: 1 addition & 1 deletion)

@@ -834,7 +834,7 @@ mod test {
         assert!(!coding.is_empty());

         let r = bs.entries_to_shreds(&keypair, &entries, 0, false, &mut stats, 10, 10);
-        info!("{:?}", r);
+        info!("{r:?}");
         assert_matches!(r, Err(BroadcastError::TooManyShreds));
     }
 }
turbine/src/cluster_nodes.rs (4 changes: 2 additions & 2 deletions)

@@ -561,8 +561,8 @@ impl<T: 'static> ClusterNodesCache<T> {
             .find_map(|bank| bank.epoch_staked_nodes(epoch))
             .unwrap_or_else(|| {
                 error!(
-                    "ClusterNodesCache::get: unknown Bank::epoch_staked_nodes \
-                     for epoch: {epoch}, slot: {shred_slot}"
+                    "ClusterNodesCache::get: unknown Bank::epoch_staked_nodes for epoch: \
+                     {epoch}, slot: {shred_slot}"
                 );
                 inc_new_counter_error!("cluster_nodes-unknown_epoch_staked_nodes", 1);
                 Arc::<HashMap<Pubkey, /*stake:*/ u64>>::default()
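The cluster_nodes.rs hunk above only re-wraps the long message: a trailing backslash inside a Rust string literal elides the newline and the leading whitespace of the following line, so the old and new wrappings produce the same single-line log output. A small sketch demonstrating the behavior, with made-up values:

    fn main() {
        let epoch = 42;
        let shred_slot = 1234;
        // The trailing `\` swallows the newline and the indentation after it,
        // so this literal is one continuous line at runtime.
        let msg = format!(
            "ClusterNodesCache::get: unknown Bank::epoch_staked_nodes for epoch: \
             {epoch}, slot: {shred_slot}"
        );
        assert_eq!(
            msg,
            "ClusterNodesCache::get: unknown Bank::epoch_staked_nodes for epoch: 42, slot: 1234"
        );
    }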
turbine/src/quic_endpoint.rs (5 changes: 1 addition & 4 deletions)

@@ -227,10 +227,7 @@ async fn run_server(
             ));
         }
         Err(error) => {
-            debug!(
-                "Error while accepting incoming connection: {error:?} from {}",
-                remote_addr
-            );
+            debug!("Error while accepting incoming connection: {error:?} from {remote_addr}");
         }
     }
 }
turbine/src/retransmit_stage.rs (58 changes: 45 additions & 13 deletions)

@@ -443,7 +443,10 @@ fn retransmit_shred(
     RetransmitSocket::Socket(socket) => match multi_target_send(socket, shred, &addrs) {
         Ok(()) => num_addrs,
         Err(SendPktsError::IoError(ioerr, num_failed)) => {
-            error!("retransmit_to multi_target_send error: {ioerr:?}, {num_failed}/{} packets failed", num_addrs);
+            error!(
+                "retransmit_to multi_target_send error: {ioerr:?}, \
+                 {num_failed}/{num_addrs} packets failed"
+            );
             num_addrs - num_failed
         }
     },
@@ -915,18 +918,24 @@ mod tests {
         // Pick a shred with same index as `shred` but different parent offset
         let shred_dup = shreds_data_5_3.last().unwrap().clone();
         // first shred passed through
-        assert!(!shred_deduper.dedup(shred_dup.id(), shred_dup.payload(), MAX_DUPLICATE_COUNT),
-            "First time seeing shred X with different parent slot (3 instead of 4) => Not dup because common header is unique & shred ID only seen once"
+        assert!(
+            !shred_deduper.dedup(shred_dup.id(), shred_dup.payload(), MAX_DUPLICATE_COUNT),
+            "First time seeing shred X with different parent slot (3 instead of 4) => Not dup \
+             because common header is unique & shred ID only seen once"
         );
         // then blocked
-        assert!(shred_deduper.dedup(shred_dup.id(), shred_dup.payload(), MAX_DUPLICATE_COUNT),
-            "Second time seeing shred X with parent slot 3 => Dup because common header is not unique & shred ID seen twice"
+        assert!(
+            shred_deduper.dedup(shred_dup.id(), shred_dup.payload(), MAX_DUPLICATE_COUNT),
+            "Second time seeing shred X with parent slot 3 => Dup because common header is not \
+             unique & shred ID seen twice"
        );

         let shred_dup2 = shreds_data_5_2.last().unwrap().clone();

-        assert!(shred_deduper.dedup(shred_dup2.id(), shred_dup2.payload(), MAX_DUPLICATE_COUNT),
-            "First time seeing shred X with parent slot 2 => Dup because common header is unique but shred ID seen twice already"
+        assert!(
+            shred_deduper.dedup(shred_dup2.id(), shred_dup2.payload(), MAX_DUPLICATE_COUNT),
+            "First time seeing shred X with parent slot 2 => Dup because common header is unique \
+             but shred ID seen twice already"
         );

         /* Coding shreds */
@@ -957,11 +966,26 @@ mod tests {
             "we want a shred with same index but different FEC set index"
         );
         // 2nd unique coding passes
-        assert!(!shred_deduper.dedup(shred_inv_code_1.id(), shred_inv_code_1.payload(), MAX_DUPLICATE_COUNT),
-            "First time seeing shred Y w/ changed header (FEC Set index 2) => Not dup because common header is unique & shred ID only seen once");
+        assert!(
+            !shred_deduper.dedup(
+                shred_inv_code_1.id(),
+                shred_inv_code_1.payload(),
+                MAX_DUPLICATE_COUNT
+            ),
+            "First time seeing shred Y w/ changed header (FEC Set index 2) => Not dup because \
+             common header is unique & shred ID only seen once"
+        );
         // same again is blocked
-        assert!(shred_deduper.dedup(shred_inv_code_1.id(), shred_inv_code_1.payload(), MAX_DUPLICATE_COUNT),"
-            Second time seeing shred Y w/ changed header (FEC Set index 2) => Dup because common header is not unique & shred ID seen twice ");
+        assert!(
+            shred_deduper.dedup(
+                shred_inv_code_1.id(),
+                shred_inv_code_1.payload(),
+                MAX_DUPLICATE_COUNT
+            ),
+            "
+            Second time seeing shred Y w/ changed header (FEC Set index 2) => Dup because common \
+             header is not unique & shred ID seen twice "
+        );
         // Make a coding shred at index 4 based off FEC set index 3
         let (_, shreds_code_invalid) = make_shreds_for_slot(5, 4, 3);

@@ -971,7 +995,15 @@
             shred_inv_code_2.index(),
             "we want a shred with same index but different FEC set index"
         );
-        assert!(shred_deduper.dedup(shred_inv_code_2.id(), shred_inv_code_2.payload(), MAX_DUPLICATE_COUNT),"
-            First time seeing shred Y w/ changed header (FEC Set index 3) => Dup because common header is unique but shred ID seen twice already");
+        assert!(
+            shred_deduper.dedup(
+                shred_inv_code_2.id(),
+                shred_inv_code_2.payload(),
+                MAX_DUPLICATE_COUNT
+            ),
+            "
+            First time seeing shred Y w/ changed header (FEC Set index 3) => Dup because common \
+             header is unique but shred ID seen twice already"
+        );
     }
 }
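The assertion messages in this test describe two independent checks inside the deduper: an exact-payload filter (has this byte payload been seen before?) and a per-shred-ID admission budget (has this ShredId already been let through MAX_DUPLICATE_COUNT times?). A shred is reported as a duplicate if either check fires. The sketch below is a hypothetical model of that behavior built on std collections; the crate's actual ShredDeduper uses different types internally, so treat this as an illustration of the logic only:

    use std::collections::{HashMap, HashSet};

    /// Hypothetical stand-in for ShredDeduper, modeling only the behavior
    /// the test messages describe; not the real implementation.
    struct ModelDeduper {
        seen_payloads: HashSet<Vec<u8>>, // exact-payload filter
        id_counts: HashMap<u64, usize>,  // admissions per shred ID
    }

    impl ModelDeduper {
        fn new() -> Self {
            Self {
                seen_payloads: HashSet::new(),
                id_counts: HashMap::new(),
            }
        }

        /// Returns true if the shred should be dropped as a duplicate.
        fn dedup(&mut self, shred_id: u64, payload: &[u8], max_duplicate_count: usize) -> bool {
            // A payload seen before is always a duplicate.
            if !self.seen_payloads.insert(payload.to_vec()) {
                return true;
            }
            // A unique payload still spends the per-ID budget, which is why a
            // shred with a unique common header can be blocked once its ID has
            // been admitted enough times.
            let count = self.id_counts.entry(shred_id).or_insert(0);
            *count += 1;
            *count > max_duplicate_count
        }
    }

    fn main() {
        const MAX_DUPLICATE_COUNT: usize = 1;
        let mut deduper = ModelDeduper::new();
        assert!(!deduper.dedup(5, b"payload-a", MAX_DUPLICATE_COUNT)); // first sighting passes
        assert!(deduper.dedup(5, b"payload-a", MAX_DUPLICATE_COUNT)); // repeated payload blocked
        assert!(deduper.dedup(5, b"payload-b", MAX_DUPLICATE_COUNT)); // unique payload, ID budget spent
    }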