From 4c807048201262d3d853b146d8fc4878bdfa8ef2 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Thu, 7 Mar 2024 14:10:35 -0800 Subject: [PATCH] use async mutex in quic --- streamer/src/nonblocking/quic.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 225412dd08b315..7b03fd1bff785c 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -35,11 +35,15 @@ use { net::{IpAddr, SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, Mutex, MutexGuard, RwLock, + Arc, RwLock, }, time::{Duration, Instant}, }, - tokio::{task::JoinHandle, time::timeout}, + tokio::{ + sync::{Mutex, MutexGuard}, + task::JoinHandle, + time::timeout, + }, }; const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); @@ -383,7 +387,7 @@ fn handle_and_cache_new_connection( } } -fn prune_unstaked_connections_and_add_new_connection( +async fn prune_unstaked_connections_and_add_new_connection( connection: Connection, connection_table: Arc>, max_connections: usize, @@ -394,7 +398,7 @@ fn prune_unstaked_connections_and_add_new_connection( let stats = params.stats.clone(); if max_connections > 0 { let connection_table_clone = connection_table.clone(); - let mut connection_table = connection_table.lock().unwrap(); + let mut connection_table = connection_table.lock().await; prune_unstaked_connection_table(&mut connection_table, max_connections, stats); handle_and_cache_new_connection( connection, @@ -504,7 +508,7 @@ async fn setup_connection( match params.peer_type { ConnectionPeerType::Staked(stake) => { - let mut connection_table_l = staked_connection_table.lock().unwrap(); + let mut connection_table_l = staked_connection_table.lock().await; if connection_table_l.total_size >= max_staked_connections { let num_pruned = connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake); @@ -535,7 +539,9 @@ async fn setup_connection( ¶ms, wait_for_chunk_timeout, stream_load_ema.clone(), - ) { + ) + .await + { stats .connection_added_from_staked_peer .fetch_add(1, Ordering::Relaxed); @@ -557,7 +563,9 @@ async fn setup_connection( ¶ms, wait_for_chunk_timeout, stream_load_ema.clone(), - ) { + ) + .await + { stats .connection_added_from_unstaked_peer .fetch_add(1, Ordering::Relaxed); @@ -800,7 +808,7 @@ async fn handle_connection( } } - let removed_connection_count = connection_table.lock().unwrap().remove_connection( + let removed_connection_count = connection_table.lock().await.remove_connection( ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), remote_addr.port(), stable_id,