From 1cdf49cb30bb42eef786c0bc208d7d631332515f Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 16 Sep 2025 14:23:09 +0000 Subject: [PATCH 1/2] use TaskTracker in streamer --- streamer/Cargo.toml | 2 +- streamer/src/nonblocking/quic.rs | 45 +++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 14 deletions(-) diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 417534a1b50..241d49664ad 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -58,7 +58,7 @@ solana-transaction-error = { workspace = true } solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } -tokio-util = { workspace = true } +tokio-util = { workspace = true, features = ["rt"]} x509-parser = { workspace = true } [dev-dependencies] diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index d6f9c11e817..4e054d3d1d8 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -62,7 +62,7 @@ use { task::{self, JoinHandle}, time::{sleep, timeout}, }, - tokio_util::sync::CancellationToken, + tokio_util::{sync::CancellationToken, task::TaskTracker}, }; pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2); @@ -240,15 +240,24 @@ pub fn spawn_server_with_cancel( }); let max_concurrent_connections = quic_server_params.max_concurrent_connections(); - let handle = tokio::spawn(run_server( - name, - endpoints.clone(), - packet_batch_sender, - staked_nodes, - stats.clone(), - quic_server_params, - cancel, - )); + let handle = tokio::spawn({ + let endpoints = endpoints.clone(); + let stats = stats.clone(); + async move { + let tasks = run_server( + name, + endpoints.clone(), + packet_batch_sender, + staked_nodes, + stats.clone(), + quic_server_params, + cancel, + ) + .await; + tasks.close(); + tasks.wait().await; + } + }); Ok(SpawnNonBlockingServerResult { endpoints, @@ -312,7 +321,7 @@ async fn run_server( stats: Arc, quic_server_params: QuicServerParams, cancel: CancellationToken, -) { +) -> TaskTracker { let rate_limiter = Arc::new(ConnectionRateLimiter::new( quic_server_params.max_connections_per_ipaddr_per_min, )); @@ -347,6 +356,7 @@ async fn run_server( }) .collect::>(); + let tasks = TaskTracker::new(); loop { let timeout_connection = select! { ready = accepts.next() => { @@ -406,7 +416,7 @@ async fn run_server( Ok(connecting) => { let rate_limiter = rate_limiter.clone(); let overall_connection_rate_limiter = overall_connection_rate_limiter.clone(); - tokio::spawn(setup_connection( + tasks.spawn(setup_connection( connecting, rate_limiter, overall_connection_rate_limiter, @@ -418,6 +428,7 @@ async fn run_server( stats.clone(), stream_load_ema.clone(), quic_server_params.clone(), + tasks.clone(), )); } Err(err) => { @@ -431,6 +442,7 @@ async fn run_server( debug!("accept(): Timed out waiting for connection"); } } + tasks } fn prune_unstaked_connection_table( @@ -563,6 +575,7 @@ fn handle_and_cache_new_connection( connection_table: Arc>, params: &NewConnectionHandlerParams, stream_load_ema: Arc, + tasks: TaskTracker, ) -> Result<(), ConnectionHandlerError> { if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( params.peer_type, @@ -601,7 +614,7 @@ fn handle_and_cache_new_connection( } connection.set_max_concurrent_uni_streams(max_uni_streams); - tokio::spawn(handle_connection( + tasks.spawn(handle_connection( connection, remote_addr, last_update, @@ -638,6 +651,7 @@ async fn prune_unstaked_connections_and_add_new_connection( connection_table: Arc>, params: &NewConnectionHandlerParams, stream_load_ema: Arc, + tasks: TaskTracker, ) -> Result<(), ConnectionHandlerError> { let stats = params.stats.clone(); if params.max_connections > 0 { @@ -651,6 +665,7 @@ async fn prune_unstaked_connections_and_add_new_connection( connection_table_clone, params, stream_load_ema, + tasks, ) } else { connection.close( @@ -717,6 +732,7 @@ async fn setup_connection( stats: Arc, stream_load_ema: Arc, quic_server_params: QuicServerParams, + tasks: TaskTracker, ) { const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2; let from = connecting.remote_address(); @@ -815,6 +831,7 @@ async fn setup_connection( staked_connection_table.clone(), ¶ms, stream_load_ema.clone(), + tasks, ) { stats .connection_added_from_staked_peer @@ -830,6 +847,7 @@ async fn setup_connection( unstaked_connection_table.clone(), ¶ms, stream_load_ema.clone(), + tasks, ) .await { @@ -853,6 +871,7 @@ async fn setup_connection( unstaked_connection_table.clone(), ¶ms, stream_load_ema.clone(), + tasks, ) .await { From ee7d17ad71f67baa259a4e24f292b7a66efd2c95 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Tue, 16 Sep 2025 14:23:29 +0000 Subject: [PATCH 2/2] Update Cargo.lock --- Cargo.lock | 1 + programs/sbf/Cargo.lock | 1 + streamer/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index a9a5ae57c1d..4ae7ba4aae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12934,6 +12934,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 11e454b9aa9..82ce4750fe2 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -11006,6 +11006,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 241d49664ad..4a3c33a5a7c 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -58,7 +58,7 @@ solana-transaction-error = { workspace = true } solana-transaction-metrics-tracker = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } -tokio-util = { workspace = true, features = ["rt"]} +tokio-util = { workspace = true, features = ["rt"] } x509-parser = { workspace = true } [dev-dependencies]