Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions dev-bins/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ socket2 = { workspace = true }
solana-keypair = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-net-utils = { workspace = true }
solana-native-token = { workspace = true }
solana-net-utils = { workspace = true, features = ["agave-unstable-api"] }
solana-packet = { workspace = true }
solana-perf = { workspace = true }
solana-pubkey = { workspace = true }
Expand All @@ -55,6 +56,7 @@ solana-time-utils = { workspace = true }
solana-tls-utils = { workspace = true }
solana-transaction-error = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
Expand All @@ -66,5 +68,10 @@ anyhow = { workspace = true }
assert_matches = { workspace = true }
chrono = { workspace = true, features = ["now"] }
clap = { version = "4.5.31", features = ["cargo", "derive", "error-context"] }
criterion = { workspace = true, features = ["async", "async_tokio"] }
solana-net-utils = { workspace = true, features = ["dev-context-only-utils"] }
solana-streamer = { path = ".", features = ["agave-unstable-api", "dev-context-only-utils"] }

[[bench]]
name = "bench_refiller"
harness = false
51 changes: 51 additions & 0 deletions streamer/benches/bench_refiller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use {
criterion::{criterion_group, criterion_main, Criterion},
solana_streamer::{
nonblocking::{
stream_throttle::{Refiller, StreamRateLimiter},
testing_utilities::fill_connection_table,
},
quic::StreamerStats,
},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
},
tokio::sync::Mutex,
};

const NUM_CLIENTS: usize = 10000;
fn bench_refiller(c: &mut Criterion) {
let stats = Arc::new(StreamerStats::default());
let sockets: Vec<_> = (0..NUM_CLIENTS as u32)
.map(|i| SocketAddr::new(IpAddr::V4(Ipv4Addr::from_bits(i)), 0))
.collect();

let rate_limiters: Vec<_> = sockets
.iter()
.map(|_| Arc::new(StreamRateLimiter::new_unstaked()))
.collect();
let connection_table1 = fill_connection_table(&sockets, &rate_limiters, stats.clone());
let connection_table2 = fill_connection_table(&sockets, &rate_limiters, stats);
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

// Using mutexes here is obviously not ideal,
// but we are looking to prove that refilling is not expensive,
// rather than to get super accurate data
let refiller =
Arc::new(Mutex::new(rt.block_on(async {
Refiller::new(connection_table1, connection_table2).await
})));

c.bench_function(&format!("do_refill_{NUM_CLIENTS}"), |b| {
b.to_async(&rt).iter(|| async {
refiller.lock().await.do_refill(100000).await;
});
});
}

criterion_group!(benches, bench_refiller);

criterion_main!(benches);
5 changes: 4 additions & 1 deletion streamer/examples/swqos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ struct Cli {

#[arg(short, long)]
stake_amounts: String,

#[arg(short, long, default_value_t = 100_000)]
max_tps: u64,
}

// number of threads as in fn default_num_tpu_transaction_forward_receive_threads
Expand All @@ -108,7 +111,6 @@ async fn main() -> anyhow::Result<()> {
);
Arc::new(RwLock::new(nodes))
};

let cancel = CancellationToken::new();
let SpawnNonBlockingServerResult {
endpoints,
Expand All @@ -127,6 +129,7 @@ async fn main() -> anyhow::Result<()> {
SwQosConfig {
max_connections_per_staked_peer: cli.max_connections_per_staked_peer,
max_connections_per_unstaked_peer: cli.max_connections_per_unstaked_peer,
max_streams_per_ms: cli.max_tps / 1000,
..Default::default()
},
cancel.clone(),
Expand Down
2 changes: 1 addition & 1 deletion streamer/src/nonblocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub mod quic;
pub mod recvmmsg;
pub mod sendmmsg;
pub mod simple_qos;
mod stream_throttle;
pub mod stream_throttle;
pub mod swqos;
#[cfg(feature = "dev-context-only-utils")]
pub mod testing_utilities;
9 changes: 7 additions & 2 deletions streamer/src/nonblocking/qos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ pub(crate) trait ConnectionContext: Clone + Send + Sync {
/// A trait to manage QoS for connections. This includes
/// 1) deriving the ConnectionContext for a connection
/// 2) managing connection caching and connection limits, stream limits
pub(crate) trait QosController<C: ConnectionContext> {
pub(crate) trait QosController<C: ConnectionContext + Send + Sync> {
/// Initialize the controller's async logic (if any)
fn async_init(&mut self) -> impl std::future::Future<Output = ()> + std::marker::Send {
async {}
}

/// Build the ConnectionContext for a connection
fn build_connection_context(&self, connection: &Connection) -> C;

Expand Down Expand Up @@ -57,7 +62,7 @@ pub(crate) trait QosController<C: ConnectionContext> {
}

/// Marker trait to indicate what is the shared state for connections
pub(crate) trait OpaqueStreamerCounter: Send + Sync + 'static {}
pub trait OpaqueStreamerCounter: Send + Sync + 'static {}

#[cfg(test)]
pub(crate) struct NullStreamerCounter;
Expand Down
Loading
Loading