Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench-vote/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ solana-streamer = { workspace = true }
solana-transaction = { workspace = true }
solana-version = { workspace = true }
solana-vote-program = { workspace = true }
tokio-util = { workspace = true }

[target.'cfg(not(any(target_env = "msvc", target_os = "freebsd")))'.dependencies]
jemallocator = { workspace = true }
14 changes: 9 additions & 5 deletions bench-vote/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use {
solana_streamer::{
packet::PacketBatchRecycler,
quic::{
spawn_server, QuicServerParams, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
spawn_server_with_cancel, QuicServerParams, DEFAULT_MAX_QUIC_CONNECTIONS_PER_PEER,
DEFAULT_MAX_STAKED_CONNECTIONS,
},
streamer::{receiver, PacketBatchReceiver, StakedNodes, StreamerReceiveStats},
Expand All @@ -36,6 +36,7 @@ use {
thread::{self, spawn, JoinHandle, Result},
time::{Duration, Instant, SystemTime},
},
tokio_util::sync::CancellationToken,
};

#[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
Expand Down Expand Up @@ -244,8 +245,9 @@ fn main() -> Result<()> {
}
});

let (exit, read_threads, sink_threads, destination) = if !client_only {
let (exit, cancel, read_threads, sink_threads, destination) = if !client_only {
let exit = Arc::new(AtomicBool::new(false));
let cancel = CancellationToken::new();

let mut read_channels = Vec::new();
let mut read_threads = Vec::new();
Expand Down Expand Up @@ -273,15 +275,15 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);

let server = spawn_server(
let server = spawn_server_with_cancel(
"solRcvrBenVote",
"bench_vote_metrics",
read_sockets,
&quic_params.identity_keypair,
s_reader,
exit.clone(),
quic_params.staked_nodes.clone(),
quic_server_params,
cancel.clone(),
)
.unwrap();
read_threads.push(server.thread);
Expand Down Expand Up @@ -316,12 +318,13 @@ fn main() -> Result<()> {
println!("Running server at {destination:?}");
(
Some(exit),
Some(cancel),
Some(read_threads),
Some(sink_threads),
destination,
)
} else {
(None, None, None, destination.unwrap())
(None, None, None, None, destination.unwrap())
};

let start = SystemTime::now();
Expand All @@ -344,6 +347,7 @@ fn main() -> Result<()> {
if !server_only {
if let Some(exit) = exit {
exit.store(true, Ordering::Relaxed);
cancel.unwrap().cancel();
}
} else {
println!("To stop the server, please press ^C");
Expand Down
16 changes: 9 additions & 7 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use {
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
},
solana_streamer::{
quic::{spawn_server, QuicServerParams, SpawnServerResult},
quic::{spawn_server_with_cancel, QuicServerParams, SpawnServerResult},
streamer::StakedNodes,
},
solana_turbine::{
Expand All @@ -65,6 +65,7 @@ use {
time::Duration,
},
tokio::sync::mpsc::Sender as AsyncSender,
tokio_util::sync::CancellationToken,
};

pub struct TpuSockets {
Expand Down Expand Up @@ -156,6 +157,7 @@ impl Tpu {
enable_block_production_forwarding: bool,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
key_notifiers: Arc<RwLock<KeyUpdaters>>,
cancel: CancellationToken,
) -> Self {
let TpuSockets {
transactions: transactions_sockets,
Expand Down Expand Up @@ -208,15 +210,15 @@ impl Tpu {
endpoints: _,
thread: tpu_vote_quic_t,
key_updater: vote_streamer_key_updater,
} = spawn_server(
} = spawn_server_with_cancel(
"solQuicTVo",
"quic_streamer_tpu_vote",
tpu_vote_quic_sockets,
keypair,
vote_packet_sender.clone(),
exit.clone(),
staked_nodes.clone(),
vote_quic_server_config,
cancel.clone(),
)
.unwrap();

Expand All @@ -226,15 +228,15 @@ impl Tpu {
endpoints: _,
thread: tpu_quic_t,
key_updater,
} = spawn_server(
} = spawn_server_with_cancel(
"solQuicTpu",
"quic_streamer_tpu",
transactions_quic_sockets,
keypair,
packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_quic_server_config,
cancel.clone(),
)
.unwrap();
(Some(tpu_quic_t), Some(key_updater))
Expand All @@ -248,15 +250,15 @@ impl Tpu {
endpoints: _,
thread: tpu_forwards_quic_t,
key_updater: forwards_key_updater,
} = spawn_server(
} = spawn_server_with_cancel(
"solQuicTpuFwd",
"quic_streamer_tpu_forwards",
transactions_forwards_quic_sockets,
keypair,
forwarded_packet_sender,
exit.clone(),
staked_nodes.clone(),
tpu_fwd_quic_server_config,
cancel,
)
.unwrap();
(Some(tpu_forwards_quic_t), Some(forwards_key_updater))
Expand Down
13 changes: 7 additions & 6 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,21 +735,21 @@ impl Validator {
timer.stop();
info!("Cleaning orphaned account snapshot directories done. {timer}");

// token used to cancel tpu-client-next.
let cancel_tpu_client_next = CancellationToken::new();
// token used to cancel tpu-client-next and streamer.
let cancel = CancellationToken::new();
{
let exit = exit.clone();
config
.validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
let cancel_tpu_client_next = cancel_tpu_client_next.clone();
let cancel = cancel.clone();
config
.validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
.register_exit(Box::new(move || cancel.cancel()));
}

let (
Expand Down Expand Up @@ -1178,7 +1178,7 @@ impl Validator {
Arc::as_ref(&identity_keypair),
node.sockets.rpc_sts_client,
runtime_handle.clone(),
cancel_tpu_client_next.clone(),
cancel.clone(),
)
} else {
let Some(connection_cache) = &connection_cache else {
Expand Down Expand Up @@ -1633,7 +1633,7 @@ impl Validator {
Arc::as_ref(&identity_keypair),
tpu_transactions_forwards_client_sockets.take().unwrap(),
runtime_handle.clone(),
cancel_tpu_client_next,
cancel.clone(),
node_multihoming.clone(),
))
};
Expand Down Expand Up @@ -1690,6 +1690,7 @@ impl Validator {
config.enable_block_production_forwarding,
config.generator_config.clone(),
key_notifiers.clone(),
cancel,
);

datapoint_info!(
Expand Down
2 changes: 1 addition & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quic-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }
solana-packet = { workspace = true }
solana-perf = { workspace = true }
solana-streamer = { workspace = true, features = ["dev-context-only-utils"] }
tokio-util = { workspace = true }
Loading
Loading