Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions streamer/src/nonblocking/connection_rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ impl ConnectionRateLimiter {
pub fn is_allowed(&self, ip: &IpAddr) -> bool {
// Acquire a permit from the rate limiter for the given IP address
if self.limiter.check_key(ip).is_ok() {
debug!("Request from IP {:?} allowed", ip);
debug!("Request from IP {ip:?} allowed");
true // Request allowed
} else {
debug!("Request from IP {:?} blocked", ip);
debug!("Request from IP {ip:?} blocked");
false // Request blocked
}
}
Expand Down
43 changes: 21 additions & 22 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ impl ClientConnectionTracker {
if open_connections >= max_concurrent_connections {
stats.open_connections.fetch_sub(1, Ordering::Relaxed);
debug!(
"There are too many concurrent connections opened already: open: {}, max: {}",
open_connections, max_concurrent_connections
"There are too many concurrent connections opened already: open: \
{open_connections}, max: {max_concurrent_connections}"
);
return Err(());
}
Expand Down Expand Up @@ -408,7 +408,7 @@ async fn run_server(
stats
.outstanding_incoming_connection_attempts
.fetch_sub(1, Ordering::Relaxed);
debug!("Incoming::accept(): error {:?}", err);
debug!("Incoming::accept(): error {err:?}");
}
}
} else {
Expand Down Expand Up @@ -465,8 +465,8 @@ pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stak
// No checked math for f64 type. So let's explicitly check for 0 here
if total_stake == 0 || peer_stake > total_stake {
warn!(
"Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
peer_stake, total_stake,
"Invalid stake values: peer_stake: {peer_stake:?}, total_stake: \
{total_stake:?}"
);

QUIC_MIN_STAKED_CONCURRENT_STREAMS
Expand Down Expand Up @@ -708,10 +708,7 @@ async fn setup_connection(
Ok(new_connection) => {
debug!("Got a connection {from:?}");
if !rate_limiter.is_allowed(&from.ip()) {
debug!(
"Reject connection from {:?} -- rate limiting exceeded",
from
);
debug!("Reject connection from {from:?} -- rate limiting exceeded");
stats
.connection_rate_limited_per_ipaddr
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -856,7 +853,7 @@ async fn setup_connection(
}

fn handle_connection_error(e: quinn::ConnectionError, stats: &StreamerStats, from: SocketAddr) {
debug!("error: {:?} from: {:?}", e, from);
debug!("error: {e:?} from: {from:?}");
stats.connection_setup_error.fetch_add(1, Ordering::Relaxed);
match e {
quinn::ConnectionError::TimedOut => {
Expand Down Expand Up @@ -931,7 +928,7 @@ fn packet_batch_sender(
stats
.total_packet_batch_send_err
.fetch_add(1, Ordering::Relaxed);
trace!("Send error: {}", e);
trace!("Send error: {e}");

// The downstream channel is disconnected, this error is not recoverable.
if matches!(e, TrySendError::Disconnected(_)) {
Expand All @@ -951,7 +948,7 @@ fn packet_batch_sender(
.total_bytes_sent_to_consumer
.fetch_add(total_bytes, Ordering::Relaxed);

trace!("Sent {} packet batch", len);
trace!("Sent {len} packet batch");
}
break;
}
Expand Down Expand Up @@ -1079,7 +1076,7 @@ async fn handle_connection(
stream = connection.accept_uni() => match stream {
Ok(stream) => stream,
Err(e) => {
debug!("stream error: {:?}", e);
debug!("stream error: {e:?}");
break;
}
},
Expand All @@ -1098,10 +1095,12 @@ async fn handle_connection(
STREAM_THROTTLING_INTERVAL.saturating_sub(throttle_interval_start.elapsed());

if !throttle_duration.is_zero() {
debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \
max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \
throttle_duration: {throttle_duration:?}",
peer_type, total_stake);
debug!(
"Throttling stream from {remote_addr:?}, peer type: {peer_type:?}, total \
stake: {total_stake}, max_streams_per_interval: \
{max_streams_per_throttling_interval}, read_interval_streams: \
{streams_read_in_throttle_interval} throttle_duration: {throttle_duration:?}"
);
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
match peer_type {
ConnectionPeerType::Unstaked => {
Expand Down Expand Up @@ -1154,7 +1153,7 @@ async fn handle_connection(
Ok(Ok(chunk)) => chunk.unwrap_or(0),
// read_chunk returned error
Ok(Err(e)) => {
debug!("Received stream error: {:?}", e);
debug!("Received stream error: {e:?}");
stats
.total_stream_read_errors
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1296,7 +1295,7 @@ async fn handle_chunks(
.fetch_add(1, Ordering::Relaxed);
}
}
trace!("packet batch send error {:?}", err);
trace!("packet batch send error {err:?}");
} else {
stats
.total_packets_sent_for_batching
Expand All @@ -1321,7 +1320,7 @@ async fn handle_chunks(
}
}

trace!("sent {} byte packet for batching", bytes_sent);
trace!("sent {bytes_sent} byte packet for batching");
}

Ok(StreamState::Finished)
Expand Down Expand Up @@ -1589,14 +1588,14 @@ pub mod test {
let mut s1 = conn1.open_uni().await.unwrap();
s1.write_all(&[0u8]).await.unwrap();
s1.finish().unwrap();
info!("done {}", i);
info!("done {i}");
sleep(Duration::from_millis(1000)).await;
}
let mut received = 0;
loop {
if let Ok(_x) = receiver.try_recv() {
received += 1;
info!("got {}", received);
info!("got {received}");
} else {
sleep(Duration::from_millis(500)).await;
}
Expand Down
4 changes: 2 additions & 2 deletions streamer/src/nonblocking/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod tests {

match test_setup_reader_sender("::1:0").await {
Ok(config) => test_one_iter(config).await,
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
Err(e) => warn!("Failed to configure IPv6: {e:?}"),
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ mod tests {

match test_setup_reader_sender("::1:0").await {
Ok(config) => test_multi_iter(config).await,
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
Err(e) => warn!("Failed to configure IPv6: {e:?}"),
}
}

Expand Down
9 changes: 3 additions & 6 deletions streamer/src/nonblocking/stream_throttle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,7 @@ impl StakedStreamLoadEMA {
}

let Ok(updated_load_ema) = u64::try_from(updated_load_ema) else {
error!(
"Failed to convert EMA {} to a u64. Not updating the load EMA",
updated_load_ema
);
error!("Failed to convert EMA {updated_load_ema} to a u64. Not updating the load EMA");
self.stats
.stream_load_ema_overflow
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -164,8 +161,8 @@ impl StakedStreamLoadEMA {
/ u128::from(EMA_WINDOW_MS);
let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| {
error!(
"Failed to convert stream capacity {} to u64. Using minimum load capacity",
calculated_capacity
"Failed to convert stream capacity {calculated_capacity} to u64. Using \
minimum load capacity"
);
self.stats
.stream_load_capacity_overflow
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/nonblocking/testing_utilities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub async fn check_multiple_streams(
let conn2 = Arc::new(make_client_endpoint(&server_address, client_keypair).await);
let mut num_expected_packets = 0;
for i in 0..10 {
info!("sending: {}", i);
info!("sending: {i}");
let c1 = conn1.clone();
let c2 = conn2.clone();
let mut s1 = c1.open_uni().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ pub fn spawn_server_multi(
.name(thread_name.into())
.spawn(move || {
if let Err(e) = runtime.block_on(result.thread) {
warn!("error from runtime.block_on: {:?}", e);
warn!("error from runtime.block_on: {e:?}");
}
})
.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions streamer/src/recvmmsg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ mod tests {

match test_setup_reader_sender(IpAddr::V6(Ipv6Addr::LOCALHOST)) {
Ok(config) => test_one_iter(config),
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
Err(e) => warn!("Failed to configure IPv6: {e:?}"),
}
}

Expand Down Expand Up @@ -262,7 +262,7 @@ mod tests {

match test_setup_reader_sender(IpAddr::V6(Ipv6Addr::LOCALHOST)) {
Ok(config) => test_multi_iter(config),
Err(e) => warn!("Failed to configure IPv6: {:?}", e),
Err(e) => warn!("Failed to configure IPv6: {e:?}"),
}
}

Expand Down
7 changes: 2 additions & 5 deletions streamer/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,7 @@ impl StreamerSendStats {
});
entries.truncate(MAX_REPORT_ENTRIES);
}
info!(
"streamer send {} hosts: count:{} {:?}",
name, num_entries, entries,
);
info!("streamer send {name} hosts: count:{num_entries} {entries:?}");
}

fn maybe_submit(&mut self, name: &'static str, sender: &Sender<Box<dyn FnOnce() + Send>>) {
Expand Down Expand Up @@ -610,7 +607,7 @@ fn responder_loop<P: SocketProvider>(
let now = timestamp();
if now - last_print > 1000 && errors != 0 {
datapoint_info!(name, ("errors", errors, i64),);
info!("{} last-error: {:?} count: {}", name, last_error, errors);
info!("{name} last-error: {last_error:?} count: {errors}");
last_print = now;
errors = 0;
}
Expand Down
Loading