Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bench-vote/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,14 @@ fn main() -> Result<()> {
max_connections_per_ipaddr_per_min: max_connections_per_ipaddr_per_min
.try_into()
.unwrap(),
..Default::default()
};
let qos_config = SwQosConfig {
max_connections_per_unstaked_peer: max_connections_per_peer,
max_staked_connections: max_connections,
max_unstaked_connections: 0,
..Default::default()
};
let qos_config = SwQosConfig::default();
let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);

Expand Down
5 changes: 3 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,17 +589,18 @@ impl ValidatorTpuConfig {
let tpu_fwd_quic_server_config = SwQosQuicStreamerConfig {
quic_streamer_config: QuicStreamerConfig {
max_connections_per_ipaddr_per_min: 32,
..Default::default()
},
qos_config: SwQosConfig {
max_unstaked_connections: 0,
..Default::default()
},
qos_config: SwQosConfig::default(),
};

// vote and tpu_fwd share the same characteristics -- disallow non-staked connections:
let vote_quic_server_config = SimpleQosQuicStreamerConfig {
quic_streamer_config: QuicStreamerConfig {
max_connections_per_ipaddr_per_min: 32,
max_unstaked_connections: 0,
..Default::default()
},
qos_config: SimpleQosConfig::default(),
Expand Down
13 changes: 9 additions & 4 deletions streamer/examples/swqos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ pub fn load_staked_nodes_overrides(path: &String) -> anyhow::Result<HashMap<Pubk

#[derive(Debug, Parser)]
struct Cli {
#[arg(short, long, default_value_t = 1)]
max_connections_per_peer: usize,
#[arg(short, long, default_value_t = 10)]
max_connections_per_staked_peer: usize,
#[arg(short, long, default_value_t = 10)]
max_connections_per_unstaked_peer: usize,

#[arg(short, long, default_value = "0.0.0.0:8008")]
bind_to: SocketAddr,
Expand Down Expand Up @@ -120,10 +122,13 @@ async fn main() -> anyhow::Result<()> {
sender,
staked_nodes,
QuicStreamerConfig {
max_connections_per_unstaked_peer: cli.max_connections_per_peer,
..QuicStreamerConfig::default()
},
SwQosConfig::default(),
SwQosConfig {
max_connections_per_staked_peer: cli.max_connections_per_staked_peer,
max_connections_per_unstaked_peer: cli.max_connections_per_unstaked_peer,
..Default::default()
},
cancel.clone(),
)?;
info!("Server listening on {}", socket.local_addr()?);
Expand Down
5 changes: 5 additions & 0 deletions streamer/src/nonblocking/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ pub(crate) trait QosController<C: ConnectionContext> {
context: &C,
connection: Connection,
) -> impl Future<Output = usize> + Send;

/// How many concurrent
fn max_concurrent_connections(&self) -> usize;
}

pub trait QosConfig {}
29 changes: 17 additions & 12 deletions streamer/src/nonblocking/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ where
})
.collect::<Result<Vec<_>, _>>()?;

let max_concurrent_connections = quic_server_params.max_concurrent_connections();
let max_concurrent_connections = qos.max_concurrent_connections();
let handle = tokio::spawn({
let endpoints = endpoints.clone();
let stats = stats.clone();
Expand Down Expand Up @@ -334,10 +334,9 @@ where
continue;
}

let Ok(client_connection_tracker) = ClientConnectionTracker::new(
stats.clone(),
quic_server_params.max_concurrent_connections(),
) else {
let Ok(client_connection_tracker) =
ClientConnectionTracker::new(stats.clone(), qos.max_concurrent_connections())
else {
stats
.refused_connections_too_many_open_connections
.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -1351,7 +1350,7 @@ pub mod test {
} = setup_quic_server(
None,
QuicStreamerConfig::default_for_tests(),
SwQosConfig::default(),
SwQosConfig::default_for_tests(),
);
check_block_multiple_connections(server_address).await;
cancel.cancel();
Expand All @@ -1372,10 +1371,12 @@ pub mod test {
} = setup_quic_server(
None,
QuicStreamerConfig {
max_connections_per_unstaked_peer: 2,
..QuicStreamerConfig::default_for_tests()
},
SwQosConfig::default(),
SwQosConfig {
max_connections_per_unstaked_peer: 2,
..SwQosConfig::default_for_tests()
},
);

let client_socket = bind_to_localhost_unique().expect("should bind - client");
Expand Down Expand Up @@ -1582,10 +1583,12 @@ pub mod test {
sender,
staked_nodes,
QuicStreamerConfig {
max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
..QuicStreamerConfig::default_for_tests()
},
SwQosConfig::default(),
SwQosConfig {
max_unstaked_connections: 0, // Do not allow any connection from unstaked clients/nodes
..Default::default()
},
cancel.clone(),
)
.unwrap();
Expand Down Expand Up @@ -1616,10 +1619,12 @@ pub mod test {
sender,
staked_nodes,
QuicStreamerConfig {
max_connections_per_unstaked_peer: 2,
..QuicStreamerConfig::default_for_tests()
},
SwQosConfig::default(),
SwQosConfig {
max_connections_per_unstaked_peer: 2,
..Default::default()
},
cancel.clone(),
)
.unwrap();
Expand Down
Loading
Loading