Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl ClusterInfo {
let mut gossip_crds = self.gossip.crds.write().unwrap();
for node in nodes {
if let Err(err) = gossip_crds.insert(node, now, GossipRoute::LocalMessage) {
warn!("crds insert failed {:?}", err);
warn!("crds insert failed {err:?}");
}
}
}
Expand Down Expand Up @@ -544,15 +544,20 @@ impl ClusterInfo {
}
let ip_addr = node.gossip().as_ref().map(SocketAddr::ip);
Some(format!(
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| {}\n",
"{:15} {:2}| {:5} | {:44} |{:^9}| {:5}| {:5}| {:5}| {:5}| {:5}| {:5}| \
{:5}| {}\n",
node.gossip()
.filter(|addr| self.socket_addr_space.check(addr))
.as_ref()
.map(SocketAddr::ip)
.as_ref()
.map(IpAddr::to_string)
.unwrap_or_else(|| String::from("none")),
if node.pubkey() == &my_pubkey { "me" } else { "" },
if node.pubkey() == &my_pubkey {
"me"
} else {
""
},
now.saturating_sub(last_updated),
node.pubkey().to_string(),
if let Some(node_version) = node_version {
Expand All @@ -563,10 +568,16 @@ impl ClusterInfo {
self.addr_to_string(&ip_addr, &node.gossip()),
self.addr_to_string(&ip_addr, &node.tpu_vote(contact_info::Protocol::UDP)),
self.addr_to_string(&ip_addr, &node.tpu(contact_info::Protocol::UDP)),
self.addr_to_string(&ip_addr, &node.tpu_forwards(contact_info::Protocol::UDP)),
self.addr_to_string(
&ip_addr,
&node.tpu_forwards(contact_info::Protocol::UDP)
),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::UDP)),
self.addr_to_string(&ip_addr, &node.tvu(contact_info::Protocol::QUIC)),
self.addr_to_string(&ip_addr, &node.serve_repair(contact_info::Protocol::UDP)),
self.addr_to_string(
&ip_addr,
&node.serve_repair(contact_info::Protocol::UDP)
),
node.shred_version(),
))
}
Expand Down Expand Up @@ -679,7 +690,7 @@ impl ClusterInfo {
let now = timestamp();
for entry in entries {
if let Err(err) = gossip_crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_epoch_slots failed: {:?}", err);
error!("push_epoch_slots failed: {err:?}");
}
}
}
Expand Down Expand Up @@ -768,7 +779,7 @@ impl ClusterInfo {
let vote = CrdsValue::new(vote, &self.keypair());
let mut gossip_crds = self.gossip.crds.write().unwrap();
if let Err(err) = gossip_crds.insert(vote, now, GossipRoute::LocalMessage) {
error!("push_vote failed: {:?}", err);
error!("push_vote failed: {err:?}");
}
}

Expand Down Expand Up @@ -869,8 +880,8 @@ impl ClusterInfo {
// restarted, and need to repush and evict the oldest vote
let Some(vote_index) = self.find_vote_index_to_evict(refresh_vote_slot) else {
warn!(
"trying to refresh slot {} but all votes in gossip table are for newer slots",
refresh_vote_slot,
"trying to refresh slot {refresh_vote_slot} but all votes in gossip table are \
for newer slots",
);
return;
};
Expand Down Expand Up @@ -1400,7 +1411,7 @@ impl ClusterInfo {
self.stats.trim_crds_table_failed.add_relaxed(1);
// TODO: Stakes are coming from the root-bank. Debug why/when
// they are empty/zero.
debug!("crds table trim failed: {:?}", err);
debug!("crds table trim failed: {err:?}");
}
Ok(num_purged) => {
self.stats
Expand Down Expand Up @@ -2213,7 +2224,7 @@ impl ClusterInfo {
// A send operation can only fail if the receiving end of a
// channel is disconnected.
Err(GossipError::SendError) => break,
Err(err) => error!("gossip consume: {}", err),
Err(err) => error!("gossip consume: {err}"),
Ok(()) => (),
}
}
Expand Down Expand Up @@ -2264,7 +2275,7 @@ impl ClusterInfo {
// that this will exit cleanly.
std::process::exit(1);
}
_ => error!("gossip run_listen failed: {}", err),
_ => error!("gossip run_listen failed: {err}"),
}
}
}
Expand Down Expand Up @@ -2386,8 +2397,8 @@ impl BindIpAddrs {
for ip in &addrs {
if ip.is_loopback() || ip.is_unspecified() || ip.is_multicast() {
return Err(format!(
"Invalid configuration: {:?} is not allowed with multiple --bind-address values (loopback, unspecified, or multicast)",
ip
"Invalid configuration: {ip:?} is not allowed with multiple \
--bind-address values (loopback, unspecified, or multicast)"
));
}
}
Expand Down Expand Up @@ -2645,7 +2656,7 @@ impl Node {
});

info!("vortexor_receivers is {vortexor_receivers:?}");
trace!("new ContactInfo: {:?}", info);
trace!("new ContactInfo: {info:?}");
let sockets = Sockets {
gossip: AtomicUdpSocket::new(gossip),
tvu: tvu_sockets,
Expand Down Expand Up @@ -4055,7 +4066,7 @@ mod tests {

let cluster_info44 = Arc::new({
let node = Node::new_localhost_with_pubkey(&keypair44.pubkey());
info!("{:?}", node);
info!("{node:?}");
ClusterInfo::new(node.info, keypair44.clone(), SocketAddrSpace::Unspecified)
});
let cluster_info43 = Arc::new({
Expand All @@ -4067,19 +4078,19 @@ mod tests {
assert_eq!(keypair44.pubkey().to_string().len(), 44);

let trace = cluster_info44.contact_info_trace();
info!("cluster:\n{}", trace);
info!("cluster:\n{trace}");
assert_eq!(trace.len(), 431);

let trace = cluster_info44.rpc_info_trace();
info!("rpc:\n{}", trace);
info!("rpc:\n{trace}");
assert_eq!(trace.len(), 335);

let trace = cluster_info43.contact_info_trace();
info!("cluster:\n{}", trace);
info!("cluster:\n{trace}");
assert_eq!(trace.len(), 431);

let trace = cluster_info43.rpc_info_trace();
info!("rpc:\n{}", trace);
info!("rpc:\n{trace}");
assert_eq!(trace.len(), 335);
}
}
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl CrdsGossip {
let now = timestamp();
for entry in entries {
if let Err(err) = crds.insert(entry, now, GossipRoute::LocalMessage) {
error!("push_duplicate_shred failed: {:?}", err);
error!("push_duplicate_shred failed: {err:?}");
}
}
Ok(())
Expand Down
10 changes: 8 additions & 2 deletions gossip/src/duplicate_shred_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,15 @@ impl DuplicateShredHandlerTrait for DuplicateShredHandler {
let pubkey = shred_data.from;
if let Err(error) = self.handle_shred_data(shred_data) {
if error.is_non_critical() {
info!("Received invalid duplicate shred proof from {pubkey} for slot {slot}: {error:?}");
info!(
"Received invalid duplicate shred proof from {pubkey} for slot {slot}: \
{error:?}"
);
} else {
error!("Unable to process duplicate shred proof from {pubkey} for slot {slot}: {error:?}");
error!(
"Unable to process duplicate shred proof from {pubkey} for slot {slot}: \
{error:?}"
);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ pub fn discover(
);

let id = spy_ref.id();
info!("Entrypoint: {:?}", entrypoint);
info!("Node Id: {:?}", id);
info!("Entrypoint: {entrypoint:?}");
info!("Node Id: {id:?}");
if let Some(my_gossip_addr) = my_gossip_addr {
info!("Gossip Address: {:?}", my_gossip_addr);
info!("Gossip Address: {my_gossip_addr:?}");
}

let _ip_echo_server = ip_echo.map(|tcp_listener| {
Expand Down
48 changes: 11 additions & 37 deletions gossip/tests/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,20 +276,15 @@ fn network_simulator_pull_only(thread_pool: &ThreadPool, network: &Network) {
let _ = crds.insert(entry, timestamp(), GossipRoute::LocalMessage);
}
let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, num * 2, 0.9);
trace!(
"network_simulator_pull_{}: converged: {} total_bytes: {}",
num,
converged,
bytes_tx
);
trace!("network_simulator_pull_{num}: converged: {converged} total_bytes: {bytes_tx}");
assert!(converged >= 0.9);
}

fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_convergance: f64) {
let num = network.len();
// run for a small amount of time
let (converged, bytes_tx) = network_run_pull(thread_pool, network, 0, 10, 1.0);
trace!("network_simulator_push_{}: converged: {}", num, converged);
trace!("network_simulator_push_{num}: converged: {converged}");
// make sure there is someone in the active set
let network_values: Vec<Node> = network.values().cloned().collect();
network_values.par_iter().for_each(|node| {
Expand Down Expand Up @@ -329,21 +324,13 @@ fn network_simulator(thread_pool: &ThreadPool, network: &mut Network, max_conver
// push for a bit
let (queue_size, bytes_tx) = network_run_push(thread_pool, network, start, end);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: queue_size: {} bytes: {}",
num,
queue_size,
bytes_tx
);
trace!("network_simulator_push_{num}: queue_size: {queue_size} bytes: {bytes_tx}");
// pull for a bit
let (converged, bytes_tx) = network_run_pull(thread_pool, network, start, end, 1.0);
total_bytes += bytes_tx;
trace!(
"network_simulator_push_{}: converged: {} bytes: {} total_bytes: {}",
num,
converged,
bytes_tx,
total_bytes
"network_simulator_push_{num}: converged: {converged} bytes: {bytes_tx} total_bytes: \
{total_bytes}"
);
if converged > max_convergance {
break;
Expand Down Expand Up @@ -489,16 +476,9 @@ fn network_run_push(
.map(|node| node.gossip.push.num_pending(&node.gossip.crds))
.sum();
trace!(
"network_run_push_{}: now: {} queue: {} bytes: {} num_msgs: {} prunes: {} stake_pruned: {} delivered: {}",
num,
now,
total,
bytes,
num_msgs,
prunes,
stake_pruned,
delivered,
);
"network_run_push_{num}: now: {now} queue: {total} bytes: {bytes} num_msgs: \
{num_msgs} prunes: {prunes} stake_pruned: {stake_pruned} delivered: {delivered}"
);
}

network.stake_pruned += stake_pruned;
Expand Down Expand Up @@ -664,15 +644,9 @@ fn network_run_pull(
break;
}
trace!(
"network_run_pull_{}: now: {} connections: {} convergance: {} bytes: {} msgs: {} overhead: {}",
num,
now,
total,
convergance,
bytes,
msgs,
overhead
);
"network_run_pull_{num}: now: {now} connections: {total} convergance: {convergance} \
bytes: {bytes} msgs: {msgs} overhead: {overhead}"
);
}
(convergance, bytes)
}
Expand Down
15 changes: 6 additions & 9 deletions gossip/tests/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ pub fn cluster_info_scale() {
sleep(Duration::from_secs(1));
}
time.stop();
warn!("found {} nodes in {} success: {}", num_nodes, time, success);
warn!("found {num_nodes} nodes in {time} success: {success}");

for num_votes in 1..1000 {
let mut time = Measure::start("votes");
Expand Down Expand Up @@ -390,21 +390,18 @@ pub fn cluster_info_scale() {
}
}
warn!("not_done: {}/{}", not_done, nodes.len());
warn!("num_old: {}", num_old);
warn!("num_push_total: {}", num_push_total);
warn!("num_pushes: {}", num_pushes);
warn!("num_pulls: {}", num_pulls);
warn!("num_old: {num_old}");
warn!("num_push_total: {num_push_total}");
warn!("num_pushes: {num_pushes}");
warn!("num_pulls: {num_pulls}");
success = not_done < (nodes.len() / 20);
if success {
break;
}
sleep(Duration::from_millis(200));
}
time.stop();
warn!(
"propagated vote {} in {} success: {}",
num_votes, time, success
);
warn!("propagated vote {num_votes} in {time} success: {success}");
sleep(Duration::from_millis(200));
for (node, _, _) in nodes.iter() {
node.gossip.push.num_old.store(0, Ordering::Relaxed);
Expand Down
Loading