@@ -32,6 +32,7 @@ const MESSAGE_SIZE: usize = 512;
const MESSAGE_COUNT: usize = 1;
const RETRY_DELAY_MS: u64 = 500;
const SUBSCRIBE_TIMEOUT_SECS: u64 = 200;
const RPC_POOL_SIZE: usize = 10000;

/// Single-node benchmark.
///
@@ -58,11 +59,16 @@ async fn statement_store_one_node_bench() -> Result<(), anyhow::Error> {

let target_node = collator_names[0];
let node = network.get_node(target_node)?;
info!("Created single RPC client for target node: {}", target_node);

let mut rpc_pool = Vec::with_capacity(RPC_POOL_SIZE);
for _ in 0..RPC_POOL_SIZE {
rpc_pool.push(node.rpc().await?);
}
info!("Created RPC connection pool with {} connections to {}", RPC_POOL_SIZE, target_node);

let mut participants = Vec::with_capacity(PARTICIPANT_SIZE as usize);
for i in 0..(PARTICIPANT_SIZE) as usize {
participants.push(Participant::new(i as u32, node.rpc().await?));
participants.push(Participant::new(i as u32, rpc_pool[i % RPC_POOL_SIZE].clone()));
}

let handles: Vec<_> = participants
@@ -105,17 +111,29 @@ async fn statement_store_many_nodes_bench() -> Result<(), anyhow::Error> {

info!("Starting statement store benchmark with {} participants", PARTICIPANT_SIZE);

let mut rpc_clients = Vec::new();
let mut rpc_pools: Vec<Vec<RpcClient>> = Vec::new();

for &name in &collator_names {
let node = network.get_node(name)?;
rpc_clients.push(node);
let mut pool = Vec::with_capacity(RPC_POOL_SIZE);
for _ in 0..RPC_POOL_SIZE {
pool.push(node.rpc().await?);
}
rpc_pools.push(pool);
}
info!("Created RPC clients for {} collator nodes", rpc_clients.len());

info!(
"Created RPC connection pool: {} connections x {} nodes = {} total",
RPC_POOL_SIZE,
collator_names.len(),
RPC_POOL_SIZE * collator_names.len()
);

let mut participants = Vec::with_capacity(PARTICIPANT_SIZE as usize);
for i in 0..(PARTICIPANT_SIZE) as usize {
let client_idx = i % collator_names.len();
participants.push(Participant::new(i as u32, rpc_clients[client_idx].rpc().await?));
let node_idx = i % collator_names.len();
let conn_idx = (i / collator_names.len()) % RPC_POOL_SIZE;
participants.push(Participant::new(i as u32, rpc_pools[node_idx][conn_idx].clone()));
}
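A minimal standalone sketch (with made-up counts rather than the benchmark's constants) of how the loop above spreads participant index i: it round-robins over the nodes first and then walks each node's connection pool.

// Hypothetical counts for illustration only; the real values are
// collator_names.len() and RPC_POOL_SIZE.
fn main() {
    let num_nodes: usize = 3;
    let pool_size: usize = 4;
    for i in 0..12usize {
        let node_idx = i % num_nodes; // which collator this participant talks to
        let conn_idx = (i / num_nodes) % pool_size; // which pooled connection on that node
        println!("participant {i} -> node {node_idx}, connection {conn_idx}");
    }
}
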
info!(
"{} participants were distributed across {} nodes: {} participants per node",
@@ -170,17 +188,23 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {

let target_node = collator_names[0];
let node = network.get_node(target_node)?;
info!("Created single RPC client for target node: {}", target_node);
let mut rpc_pool = Vec::with_capacity(RPC_POOL_SIZE);
for _ in 0..RPC_POOL_SIZE {
rpc_pool.push(node.rpc().await?);
}
info!("Created RPC connection pool with {} connections to {}", RPC_POOL_SIZE, target_node);

let num_collators = collator_names.len() as u64;
let propogation_capacity = submit_capacity * (num_collators - 1); // 5x per node
let start_time = std::time::Instant::now();

info!("Starting memory stress benchmark with {} tasks, each submitting {} statements of {}B payload, total submit capacity per node: {}, total propagation capacity: {}",
total_tasks, statements_per_task, payload_size, submit_capacity, propogation_capacity);
info!(
"Starting memory stress benchmark with {} tasks, each submitting {} statements of {}B payload, total submit capacity per node: {}, total propagation capacity: {}",
total_tasks, statements_per_task, payload_size, submit_capacity, propogation_capacity
);

for idx in 0..total_tasks {
let rpc_client = node.rpc().await?;
let rpc_client = rpc_pool[idx as usize % RPC_POOL_SIZE].clone();
tokio::spawn(async move {
let keyring = get_keypair(idx);
let public = keyring.public().0;
@@ -208,7 +232,10 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {
};

if err.to_string().contains("Statement store error: Store is full") {
info!("Statement store is full, {}/{} statements submitted, `statements_per_task` overestimated", statement_count, statements_per_task);
info!(
"Statement store is full, {}/{} statements submitted, `statements_per_task` overestimated",
statement_count, statements_per_task
);
break;
}

@@ -307,7 +334,10 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {

let total_submitted: u64 = submitted_metrics.iter().map(|(_, count, _)| *count).sum();
if total_submitted == submit_capacity * num_collators {
info!("Reached total submit capacity of {} statements per node in {}s, benchmark completed successfully", submit_capacity, elapsed);
info!(
"Reached total submit capacity of {} statements per node in {}s, benchmark completed successfully",
submit_capacity, elapsed
);
break;
}
}
@@ -377,6 +407,9 @@ pub async fn spawn_network(
.map_err(|e| anyhow!("Failed to create base directory: {}", e))?;

let chain_spec_path = create_chain_spec_with_allowances(participant_count, &base_dir)?;
// Headroom for the ~5,000 subscriptions that
// actually end up on each pooled conn (500 participants * 10 subscriptions each)
let max_subs_per_conn = PARTICIPANT_SIZE / RPC_POOL_SIZE as u32 * 16;

let config = NetworkConfigBuilder::new()
.with_relaychain(|r| {
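A minimal sketch of the headroom arithmetic behind max_subs_per_conn above, using made-up participant and pool counts (only the ~10-subscriptions-per-participant estimate and the *16 allowance come from the comment and code in this hunk):

// Made-up values; participant_size and rpc_pool_size here are not the benchmark's constants.
fn main() {
    let participant_size: u32 = 5_000; // hypothetical
    let rpc_pool_size: u32 = 10;       // hypothetical
    let participants_per_conn = participant_size / rpc_pool_size;  // 500 participants share one connection
    let expected_subs_per_conn = participants_per_conn * 10;       // ~5_000 subscriptions actually opened
    let max_subs_per_conn = participant_size / rpc_pool_size * 16; // 8_000 allowed, leaving headroom
    assert!(max_subs_per_conn > expected_subs_per_conn);
    println!("expect ~{expected_subs_per_conn}, allow {max_subs_per_conn}");
}
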
@@ -399,6 +432,9 @@
"-linfo,statement-store=info,statement-gossip=info".into(),
"--enable-statement-store".into(),
format!("--rpc-max-connections={}", PARTICIPANT_SIZE + 1000).as_str().into(),
format!("--rpc-max-subscriptions-per-connection={max_subs_per_conn}")
.as_str()
.into(),
])
// Have to set outside of the loop below, so that `p` has the right type.
.with_collator(|n| n.with_name(collators[0]));
@@ -863,11 +899,23 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
}
info!("");

let mut rpc_clients = Vec::new();
let clients_per_node = config.num_clients as usize / config.num_nodes;
let pool_size_per_node = RPC_POOL_SIZE.min(clients_per_node);
let mut rpc_pools: Vec<Vec<RpcClient>> = Vec::new();
Contributor comment: Please check that, when running a test with 1000 submitters, we don't create 10_000 * nodes RPC clients.
(A sketch of how the pool cap bounds this follows this hunk.)

for &name in &collator_names {
let node = network.get_node(name)?;
rpc_clients.push(node);
let mut pool = Vec::with_capacity(pool_size_per_node);
for _ in 0..pool_size_per_node {
pool.push(node.rpc().await?);
}
rpc_pools.push(pool);
}
info!(
"Created RPC connection pool: {} connections x {} nodes = {} total",
pool_size_per_node,
collator_names.len(),
pool_size_per_node * collator_names.len()
);

let barrier = Arc::new(Barrier::new(config.num_clients as usize));
let sync_start = std::time::Instant::now();
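Regarding the contributor comment above: a minimal sketch (the node count is hypothetical, the 1000-submitter figure comes from the comment) of how the RPC_POOL_SIZE.min(clients_per_node) cap keeps the total number of RPC clients bounded by the number of clients rather than by RPC_POOL_SIZE * nodes:

// Hypothetical illustration of the per-node pool cap used in this hunk.
fn main() {
    const RPC_POOL_SIZE: usize = 10_000;
    let num_clients: usize = 1_000; // the reviewer's 1000 submitters
    let num_nodes: usize = 4;       // hypothetical collator count
    let clients_per_node = num_clients / num_nodes;               // 250
    let pool_size_per_node = RPC_POOL_SIZE.min(clients_per_node); // 250, not 10_000
    let total_connections = pool_size_per_node * num_nodes;       // 1_000 total, not 40_000
    println!("{pool_size_per_node} connections per node, {total_connections} in total");
}
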
@@ -884,7 +932,8 @@
let barrier = Arc::clone(&barrier);
let keyring = get_keypair(client_id);
let node_idx = (client_id as usize) % config.num_nodes;
let rpc_node = rpc_clients[node_idx].clone();
let conn_idx = (client_id as usize / config.num_nodes) % pool_size_per_node;
let rpc_client = rpc_pools[node_idx][conn_idx].clone();
let neighbour_id = (client_id + 1) % config.num_clients;
let neighbour_node_idx = (neighbour_id as usize) % config.num_nodes;
if node_idx == neighbour_node_idx && config.num_nodes > 1 {
@@ -895,7 +944,6 @@

tokio::spawn(async move {
barrier.wait().await;
let rpc_client = rpc_node.rpc().await?;

if client_id == 0 {
let sync_time = sync_start.elapsed();
@@ -940,7 +988,10 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
.map_err(|e| {
anyhow!(
"Client {}: Failed to subscribe for message {} from neighbour {}: {}",
client_id, msg_idx, neighbour_id, e
client_id,
msg_idx,
neighbour_id,
e
)
})?;
subscriptions.push((msg_idx, topic_str, subscription));