Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ solana-transaction-error = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tokio-util = { workspace = true, features = ["rt"] }
x509-parser = { workspace = true }

[dev-dependencies]
Expand Down
45 changes: 32 additions & 13 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use {
task::{self, JoinHandle},
time::{sleep, timeout},
},
tokio_util::sync::CancellationToken,
tokio_util::{sync::CancellationToken, task::TaskTracker},
};

pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -240,15 +240,24 @@ pub fn spawn_server_with_cancel(
});

let max_concurrent_connections = quic_server_params.max_concurrent_connections();
let handle = tokio::spawn(run_server(
name,
endpoints.clone(),
packet_batch_sender,
staked_nodes,
stats.clone(),
quic_server_params,
cancel,
));
let handle = tokio::spawn({
let endpoints = endpoints.clone();
let stats = stats.clone();
async move {
let tasks = run_server(
name,
endpoints.clone(),
packet_batch_sender,
staked_nodes,
stats.clone(),
quic_server_params,
cancel,
)
.await;
tasks.close();
tasks.wait().await;
}
});

Ok(SpawnNonBlockingServerResult {
endpoints,
Expand Down Expand Up @@ -312,7 +321,7 @@ async fn run_server(
stats: Arc<StreamerStats>,
quic_server_params: QuicServerParams,
cancel: CancellationToken,
) {
) -> TaskTracker {
let rate_limiter = Arc::new(ConnectionRateLimiter::new(
quic_server_params.max_connections_per_ipaddr_per_min,
));
Expand Down Expand Up @@ -347,6 +356,7 @@ async fn run_server(
})
.collect::<FuturesUnordered<_>>();

let tasks = TaskTracker::new();
loop {
let timeout_connection = select! {
ready = accepts.next() => {
Expand Down Expand Up @@ -406,7 +416,7 @@ async fn run_server(
Ok(connecting) => {
let rate_limiter = rate_limiter.clone();
let overall_connection_rate_limiter = overall_connection_rate_limiter.clone();
tokio::spawn(setup_connection(
tasks.spawn(setup_connection(
connecting,
rate_limiter,
overall_connection_rate_limiter,
Expand All @@ -418,6 +428,7 @@ async fn run_server(
stats.clone(),
stream_load_ema.clone(),
quic_server_params.clone(),
tasks.clone(),
));
}
Err(err) => {
Expand All @@ -431,6 +442,7 @@ async fn run_server(
debug!("accept(): Timed out waiting for connection");
}
}
tasks
}

fn prune_unstaked_connection_table(
Expand Down Expand Up @@ -563,6 +575,7 @@ fn handle_and_cache_new_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
stream_load_ema: Arc<StakedStreamLoadEMA>,
tasks: TaskTracker,
) -> Result<(), ConnectionHandlerError> {
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
params.peer_type,
Expand Down Expand Up @@ -601,7 +614,7 @@ fn handle_and_cache_new_connection(
}
connection.set_max_concurrent_uni_streams(max_uni_streams);

tokio::spawn(handle_connection(
tasks.spawn(handle_connection(
connection,
remote_addr,
last_update,
Expand Down Expand Up @@ -638,6 +651,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
connection_table: Arc<Mutex<ConnectionTable>>,
params: &NewConnectionHandlerParams,
stream_load_ema: Arc<StakedStreamLoadEMA>,
tasks: TaskTracker,
) -> Result<(), ConnectionHandlerError> {
let stats = params.stats.clone();
if params.max_connections > 0 {
Expand All @@ -651,6 +665,7 @@ async fn prune_unstaked_connections_and_add_new_connection(
connection_table_clone,
params,
stream_load_ema,
tasks,
)
} else {
connection.close(
Expand Down Expand Up @@ -717,6 +732,7 @@ async fn setup_connection(
stats: Arc<StreamerStats>,
stream_load_ema: Arc<StakedStreamLoadEMA>,
quic_server_params: QuicServerParams,
tasks: TaskTracker,
) {
const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
let from = connecting.remote_address();
Expand Down Expand Up @@ -815,6 +831,7 @@ async fn setup_connection(
staked_connection_table.clone(),
&params,
stream_load_ema.clone(),
tasks,
) {
stats
.connection_added_from_staked_peer
Expand All @@ -830,6 +847,7 @@ async fn setup_connection(
unstaked_connection_table.clone(),
&params,
stream_load_ema.clone(),
tasks,
)
.await
{
Expand All @@ -853,6 +871,7 @@ async fn setup_connection(
unstaked_connection_table.clone(),
&params,
stream_load_ema.clone(),
tasks,
)
.await
{
Expand Down
Loading