From 602b444eb8f00abe66eea6a9301622a9e7c28d34 Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 16 Jul 2025 11:48:58 -0500 Subject: [PATCH 1/3] Cargo clippy with Rust 1.88 toolchain --- .../nonblocking/connection_rate_limiter.rs | 4 +-- streamer/src/nonblocking/quic.rs | 34 ++++++++----------- streamer/src/nonblocking/recvmmsg.rs | 4 +-- streamer/src/nonblocking/stream_throttle.rs | 6 ++-- streamer/src/nonblocking/testing_utilities.rs | 2 +- streamer/src/quic.rs | 2 +- streamer/src/recvmmsg.rs | 4 +-- streamer/src/streamer.rs | 5 ++- 8 files changed, 27 insertions(+), 34 deletions(-) diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index fa781f8d6e0d44..8680feba923fda 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -22,10 +22,10 @@ impl ConnectionRateLimiter { pub fn is_allowed(&self, ip: &IpAddr) -> bool { // Acquire a permit from the rate limiter for the given IP address if self.limiter.check_key(ip).is_ok() { - debug!("Request from IP {:?} allowed", ip); + debug!("Request from IP {ip:?} allowed"); true // Request allowed } else { - debug!("Request from IP {:?} blocked", ip); + debug!("Request from IP {ip:?} blocked"); false // Request blocked } } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 6de0280ba1d560..fefb07fdededbc 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -258,8 +258,7 @@ impl ClientConnectionTracker { if open_connections >= max_concurrent_connections { stats.open_connections.fetch_sub(1, Ordering::Relaxed); debug!( - "There are too many concurrent connections opened already: open: {}, max: {}", - open_connections, max_concurrent_connections + "There are too many concurrent connections opened already: open: {open_connections}, max: {max_concurrent_connections}" ); return Err(()); } @@ -408,7 +407,7 @@ async fn run_server( stats .outstanding_incoming_connection_attempts .fetch_sub(1, Ordering::Relaxed); - debug!("Incoming::accept(): error {:?}", err); + debug!("Incoming::accept(): error {err:?}"); } } } else { @@ -465,8 +464,7 @@ pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stak // No checked math for f64 type. So let's explicitly check for 0 here if total_stake == 0 || peer_stake > total_stake { warn!( - "Invalid stake values: peer_stake: {:?}, total_stake: {:?}", - peer_stake, total_stake, + "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: {total_stake:?}", ); QUIC_MIN_STAKED_CONCURRENT_STREAMS @@ -709,8 +707,7 @@ async fn setup_connection( debug!("Got a connection {from:?}"); if !rate_limiter.is_allowed(&from.ip()) { debug!( - "Reject connection from {:?} -- rate limiting exceeded", - from + "Reject connection from {from:?} -- rate limiting exceeded" ); stats .connection_rate_limited_per_ipaddr @@ -856,7 +853,7 @@ async fn setup_connection( } fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, from: SocketAddr) { - debug!("error: {:?} from: {:?}", e, from); + debug!("error: {e:?} from: {from:?}"); stats.connection_setup_error.fetch_add(1, Ordering::Relaxed); match e { quinn::ConnectionError::TimedOut => { @@ -931,7 +928,7 @@ fn packet_batch_sender( stats .total_packet_batch_send_err .fetch_add(1, Ordering::Relaxed); - trace!("Send error: {}", e); + trace!("Send error: {e}"); // The downstream channel is disconnected, this error is not recoverable. if matches!(e, TrySendError::Disconnected(_)) { @@ -951,7 +948,7 @@ fn packet_batch_sender( .total_bytes_sent_to_consumer .fetch_add(total_bytes, Ordering::Relaxed); - trace!("Sent {} packet batch", len); + trace!("Sent {len} packet batch"); } break; } @@ -1079,7 +1076,7 @@ async fn handle_connection( stream = connection.accept_uni() => match stream { Ok(stream) => stream, Err(e) => { - debug!("stream error: {:?}", e); + debug!("stream error: {e:?}"); break; } }, @@ -1098,10 +1095,9 @@ async fn handle_connection( STREAM_THROTTLING_INTERVAL.saturating_sub(throttle_interval_start.elapsed()); if !throttle_duration.is_zero() { - debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \ + debug!("Throttling stream from {remote_addr:?}, peer type: {peer_type:?}, total stake: {total_stake}, \ max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \ - throttle_duration: {throttle_duration:?}", - peer_type, total_stake); + throttle_duration: {throttle_duration:?}"); stats.throttled_streams.fetch_add(1, Ordering::Relaxed); match peer_type { ConnectionPeerType::Unstaked => { @@ -1154,7 +1150,7 @@ async fn handle_connection( Ok(Ok(chunk)) => chunk.unwrap_or(0), // read_chunk returned error Ok(Err(e)) => { - debug!("Received stream error: {:?}", e); + debug!("Received stream error: {e:?}"); stats .total_stream_read_errors .fetch_add(1, Ordering::Relaxed); @@ -1296,7 +1292,7 @@ async fn handle_chunks( .fetch_add(1, Ordering::Relaxed); } } - trace!("packet batch send error {:?}", err); + trace!("packet batch send error {err:?}"); } else { stats .total_packets_sent_for_batching @@ -1321,7 +1317,7 @@ async fn handle_chunks( } } - trace!("sent {} byte packet for batching", bytes_sent); + trace!("sent {bytes_sent} byte packet for batching"); } Ok(StreamState::Finished) @@ -1589,14 +1585,14 @@ pub mod test { let mut s1 = conn1.open_uni().await.unwrap(); s1.write_all(&[0u8]).await.unwrap(); s1.finish().unwrap(); - info!("done {}", i); + info!("done {i}"); sleep(Duration::from_millis(1000)).await; } let mut received = 0; loop { if let Ok(_x) = receiver.try_recv() { received += 1; - info!("got {}", received); + info!("got {received}"); } else { sleep(Duration::from_millis(500)).await; } diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index fdd2675dc8a794..8f07ae7ab95d33 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -99,7 +99,7 @@ mod tests { match test_setup_reader_sender("::1:0").await { Ok(config) => test_one_iter(config).await, - Err(e) => warn!("Failed to configure IPv6: {:?}", e), + Err(e) => warn!("Failed to configure IPv6: {e:?}"), } } @@ -136,7 +136,7 @@ mod tests { match test_setup_reader_sender("::1:0").await { Ok(config) => test_multi_iter(config).await, - Err(e) => warn!("Failed to configure IPv6: {:?}", e), + Err(e) => warn!("Failed to configure IPv6: {e:?}"), } } diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 064d3ded8352f3..0c7ed5bee741b6 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -102,8 +102,7 @@ impl StakedStreamLoadEMA { let Ok(updated_load_ema) = u64::try_from(updated_load_ema) else { error!( - "Failed to convert EMA {} to a u64. Not updating the load EMA", - updated_load_ema + "Failed to convert EMA {updated_load_ema} to a u64. Not updating the load EMA" ); self.stats .stream_load_ema_overflow @@ -164,8 +163,7 @@ impl StakedStreamLoadEMA { / u128::from(EMA_WINDOW_MS); let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| { error!( - "Failed to convert stream capacity {} to u64. Using minimum load capacity", - calculated_capacity + "Failed to convert stream capacity {calculated_capacity} to u64. Using minimum load capacity" ); self.stats .stream_load_capacity_overflow diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index 5db38ecf20d22e..bd9e6afbb283c0 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -151,7 +151,7 @@ pub async fn check_multiple_streams( let conn2 = Arc::new(make_client_endpoint(&server_address, client_keypair).await); let mut num_expected_packets = 0; for i in 0..10 { - info!("sending: {}", i); + info!("sending: {i}"); let c1 = conn1.clone(); let c2 = conn2.clone(); let mut s1 = c1.open_uni().await.unwrap(); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 7ab9202008b73e..ed1021f2c2a0cf 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -674,7 +674,7 @@ pub fn spawn_server_multi( .name(thread_name.into()) .spawn(move || { if let Err(e) = runtime.block_on(result.thread) { - warn!("error from runtime.block_on: {:?}", e); + warn!("error from runtime.block_on: {e:?}"); } }) .unwrap(); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 22398f4a5bff2b..2a48aa06cc2ccc 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -226,7 +226,7 @@ mod tests { match test_setup_reader_sender(IpAddr::V6(Ipv6Addr::LOCALHOST)) { Ok(config) => test_one_iter(config), - Err(e) => warn!("Failed to configure IPv6: {:?}", e), + Err(e) => warn!("Failed to configure IPv6: {e:?}"), } } @@ -262,7 +262,7 @@ mod tests { match test_setup_reader_sender(IpAddr::V6(Ipv6Addr::LOCALHOST)) { Ok(config) => test_multi_iter(config), - Err(e) => warn!("Failed to configure IPv6: {:?}", e), + Err(e) => warn!("Failed to configure IPv6: {e:?}"), } } diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index 63c9d66fb4c18a..f4ba01da5d2ba2 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -398,8 +398,7 @@ impl StreamerSendStats { entries.truncate(MAX_REPORT_ENTRIES); } info!( - "streamer send {} hosts: count:{} {:?}", - name, num_entries, entries, + "streamer send {name} hosts: count:{num_entries} {entries:?}", ); } @@ -610,7 +609,7 @@ fn responder_loop( let now = timestamp(); if now - last_print > 1000 && errors != 0 { datapoint_info!(name, ("errors", errors, i64),); - info!("{} last-error: {:?} count: {}", name, last_error, errors); + info!("{name} last-error: {last_error:?} count: {errors}"); last_print = now; errors = 0; } From ae44c2fb7f291d78265529008015d6a17ca6a473 Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 16 Jul 2025 11:49:40 -0500 Subject: [PATCH 2/3] cargo fmt with format_strings = true --- streamer/src/nonblocking/quic.rs | 19 +++++++++++-------- streamer/src/nonblocking/stream_throttle.rs | 7 +++---- streamer/src/streamer.rs | 4 +--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index fefb07fdededbc..bb53d3903a0679 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -258,7 +258,8 @@ impl ClientConnectionTracker { if open_connections >= max_concurrent_connections { stats.open_connections.fetch_sub(1, Ordering::Relaxed); debug!( - "There are too many concurrent connections opened already: open: {open_connections}, max: {max_concurrent_connections}" + "There are too many concurrent connections opened already: open: \ + {open_connections}, max: {max_concurrent_connections}" ); return Err(()); } @@ -464,7 +465,8 @@ pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stak // No checked math for f64 type. So let's explicitly check for 0 here if total_stake == 0 || peer_stake > total_stake { warn!( - "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: {total_stake:?}", + "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \ + {total_stake:?}", ); QUIC_MIN_STAKED_CONCURRENT_STREAMS @@ -706,9 +708,7 @@ async fn setup_connection( Ok(new_connection) => { debug!("Got a connection {from:?}"); if !rate_limiter.is_allowed(&from.ip()) { - debug!( - "Reject connection from {from:?} -- rate limiting exceeded" - ); + debug!("Reject connection from {from:?} -- rate limiting exceeded"); stats .connection_rate_limited_per_ipaddr .fetch_add(1, Ordering::Relaxed); @@ -1095,9 +1095,12 @@ async fn handle_connection( STREAM_THROTTLING_INTERVAL.saturating_sub(throttle_interval_start.elapsed()); if !throttle_duration.is_zero() { - debug!("Throttling stream from {remote_addr:?}, peer type: {peer_type:?}, total stake: {total_stake}, \ - max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \ - throttle_duration: {throttle_duration:?}"); + debug!( + "Throttling stream from {remote_addr:?}, peer type: {peer_type:?}, total \ + stake: {total_stake}, max_streams_per_interval: \ + {max_streams_per_throttling_interval}, read_interval_streams: \ + {streams_read_in_throttle_interval} throttle_duration: {throttle_duration:?}" + ); stats.throttled_streams.fetch_add(1, Ordering::Relaxed); match peer_type { ConnectionPeerType::Unstaked => { diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 0c7ed5bee741b6..c4770b44b427b1 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -101,9 +101,7 @@ impl StakedStreamLoadEMA { } let Ok(updated_load_ema) = u64::try_from(updated_load_ema) else { - error!( - "Failed to convert EMA {updated_load_ema} to a u64. Not updating the load EMA" - ); + error!("Failed to convert EMA {updated_load_ema} to a u64. Not updating the load EMA"); self.stats .stream_load_ema_overflow .fetch_add(1, Ordering::Relaxed); @@ -163,7 +161,8 @@ impl StakedStreamLoadEMA { / u128::from(EMA_WINDOW_MS); let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| { error!( - "Failed to convert stream capacity {calculated_capacity} to u64. Using minimum load capacity" + "Failed to convert stream capacity {calculated_capacity} to u64. Using \ + minimum load capacity" ); self.stats .stream_load_capacity_overflow diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index f4ba01da5d2ba2..a0fbb5eda378b7 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -397,9 +397,7 @@ impl StreamerSendStats { }); entries.truncate(MAX_REPORT_ENTRIES); } - info!( - "streamer send {name} hosts: count:{num_entries} {entries:?}", - ); + info!("streamer send {name} hosts: count:{num_entries} {entries:?}",); } fn maybe_submit(&mut self, name: &'static str, sender: &Sender>) { From 5635a0422de0f381997f8da3a3bd841f7e6ab902 Mon Sep 17 00:00:00 2001 From: steviez Date: Wed, 16 Jul 2025 14:15:10 -0500 Subject: [PATCH 3/3] Manually remove some trailing commas --- streamer/src/nonblocking/quic.rs | 2 +- streamer/src/streamer.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index bb53d3903a0679..f1e5c195cc5146 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -466,7 +466,7 @@ pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stak if total_stake == 0 || peer_stake > total_stake { warn!( "Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \ - {total_stake:?}", + {total_stake:?}" ); QUIC_MIN_STAKED_CONCURRENT_STREAMS diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index a0fbb5eda378b7..afefffd0e2a29f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -397,7 +397,7 @@ impl StreamerSendStats { }); entries.truncate(MAX_REPORT_ENTRIES); } - info!("streamer send {name} hosts: count:{num_entries} {entries:?}",); + info!("streamer send {name} hosts: count:{num_entries} {entries:?}"); } fn maybe_submit(&mut self, name: &'static str, sender: &Sender>) {