From 0fc865dd8253c08e9c7bcf73ec4839b3db7317a1 Mon Sep 17 00:00:00 2001
From: DenzelPenzel
Date: Fri, 13 Feb 2026 22:00:12 +0000
Subject: [PATCH] statement-store: fix benchmark EMFILE by pooling RPC connections

---
 .../tests/zombie_ci/statement_store_bench.rs | 87 +++++++++++++++----
 1 file changed, 69 insertions(+), 18 deletions(-)

diff --git a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs
index 96a27982f4255..759f6e0076058 100644
--- a/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs
+++ b/cumulus/zombienet/zombienet-sdk/tests/zombie_ci/statement_store_bench.rs
@@ -32,6 +32,7 @@ const MESSAGE_SIZE: usize = 512;
 const MESSAGE_COUNT: usize = 1;
 const RETRY_DELAY_MS: u64 = 500;
 const SUBSCRIBE_TIMEOUT_SECS: u64 = 200;
+const RPC_POOL_SIZE: usize = 10000;
 
 /// Single-node benchmark.
 ///
@@ -58,11 +59,16 @@ async fn statement_store_one_node_bench() -> Result<(), anyhow::Error> {
 
 	let target_node = collator_names[0];
 	let node = network.get_node(target_node)?;
-	info!("Created single RPC client for target node: {}", target_node);
+
+	let mut rpc_pool = Vec::with_capacity(RPC_POOL_SIZE);
+	for _ in 0..RPC_POOL_SIZE {
+		rpc_pool.push(node.rpc().await?);
+	}
+	info!("Created RPC connection pool with {} connections to {}", RPC_POOL_SIZE, target_node);
 
 	let mut participants = Vec::with_capacity(PARTICIPANT_SIZE as usize);
 	for i in 0..(PARTICIPANT_SIZE) as usize {
-		participants.push(Participant::new(i as u32, node.rpc().await?));
+		participants.push(Participant::new(i as u32, rpc_pool[i % RPC_POOL_SIZE].clone()));
 	}
 
 	let handles: Vec<_> = participants
@@ -105,17 +111,29 @@ async fn statement_store_many_nodes_bench() -> Result<(), anyhow::Error> {
 
 	info!("Starting statement store benchmark with {} participants", PARTICIPANT_SIZE);
 
-	let mut rpc_clients = Vec::new();
+	let mut rpc_pools: Vec<Vec<RpcClient>> = Vec::new();
+
 	for &name in &collator_names {
 		let node = network.get_node(name)?;
-		rpc_clients.push(node);
+		let mut pool = Vec::with_capacity(RPC_POOL_SIZE);
+		for _ in 0..RPC_POOL_SIZE {
+			pool.push(node.rpc().await?);
+		}
+		rpc_pools.push(pool);
 	}
-	info!("Created RPC clients for {} collator nodes", rpc_clients.len());
+
+	info!(
+		"Created RPC connection pool: {} connections x {} nodes = {} total",
+		RPC_POOL_SIZE,
+		collator_names.len(),
+		RPC_POOL_SIZE * collator_names.len()
+	);
 
 	let mut participants = Vec::with_capacity(PARTICIPANT_SIZE as usize);
 	for i in 0..(PARTICIPANT_SIZE) as usize {
-		let client_idx = i % collator_names.len();
-		participants.push(Participant::new(i as u32, rpc_clients[client_idx].rpc().await?));
+		let node_idx = i % collator_names.len();
+		let conn_idx = (i / collator_names.len()) % RPC_POOL_SIZE;
+		participants.push(Participant::new(i as u32, rpc_pools[node_idx][conn_idx].clone()));
 	}
 	info!(
 		"{} participants were distributed across {} nodes: {} participants per node",
@@ -170,17 +188,23 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {
 
 	let target_node = collator_names[0];
 	let node = network.get_node(target_node)?;
-	info!("Created single RPC client for target node: {}", target_node);
+	let mut rpc_pool = Vec::with_capacity(RPC_POOL_SIZE);
+	for _ in 0..RPC_POOL_SIZE {
+		rpc_pool.push(node.rpc().await?);
+	}
+	info!("Created RPC connection pool with {} connections to {}", RPC_POOL_SIZE, target_node);
 
 	let num_collators = collator_names.len() as u64;
 	let propogation_capacity = submit_capacity * (num_collators - 1); // 5x per node
 	let start_time = std::time::Instant::now();
 
-	info!("Starting memory stress benchmark with {} tasks, each submitting {} statements of {}B payload, total submit capacity per node: {}, total propagation capacity: {}",
-		total_tasks, statements_per_task, payload_size, submit_capacity, propogation_capacity);
+	info!(
+		"Starting memory stress benchmark with {} tasks, each submitting {} statements of {}B payload, total submit capacity per node: {}, total propagation capacity: {}",
+		total_tasks, statements_per_task, payload_size, submit_capacity, propogation_capacity
+	);
 
 	for idx in 0..total_tasks {
-		let rpc_client = node.rpc().await?;
+		let rpc_client = rpc_pool[idx as usize % RPC_POOL_SIZE].clone();
 		tokio::spawn(async move {
 			let keyring = get_keypair(idx);
 			let public = keyring.public().0;
@@ -208,7 +232,10 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {
 				};
 
 				if err.to_string().contains("Statement store error: Store is full") {
-					info!("Statement store is full, {}/{} statements submitted, `statements_per_task` overestimated", statement_count, statements_per_task);
+					info!(
+						"Statement store is full, {}/{} statements submitted, `statements_per_task` overestimated",
+						statement_count, statements_per_task
+					);
 					break;
 				}
 
@@ -307,7 +334,10 @@ async fn statement_store_memory_stress_bench() -> Result<(), anyhow::Error> {
 		let total_submitted: u64 = submitted_metrics.iter().map(|(_, count, _)| *count).sum();
 
 		if total_submitted == submit_capacity * num_collators {
-			info!("Reached total submit capacity of {} statements per node in {}s, benchmark completed successfully", submit_capacity, elapsed);
+			info!(
+				"Reached total submit capacity of {} statements per node in {}s, benchmark completed successfully",
+				submit_capacity, elapsed
+			);
 			break;
 		}
 	}
@@ -377,6 +407,9 @@ pub async fn spawn_network(
 		.map_err(|e| anyhow!("Failed to create base directory: {}", e))?;
 
 	let chain_spec_path = create_chain_spec_with_allowances(participant_count, &base_dir)?;
+	// Headroom for the ~5,000 subscriptions that
+	// actually end up on each pooled conn (500 participants * 10 subscriptions each)
+	let max_subs_per_conn = PARTICIPANT_SIZE / RPC_POOL_SIZE as u32 * 16;
 
 	let config = NetworkConfigBuilder::new()
 		.with_relaychain(|r| {
@@ -399,6 +432,9 @@
 					"-linfo,statement-store=info,statement-gossip=info".into(),
 					"--enable-statement-store".into(),
 					format!("--rpc-max-connections={}", PARTICIPANT_SIZE + 1000).as_str().into(),
+					format!("--rpc-max-subscriptions-per-connection={max_subs_per_conn}")
+						.as_str()
+						.into(),
 				])
 				// Have to set outside of the loop below, so that `p` has the right type.
 				.with_collator(|n| n.with_name(collators[0]));
@@ -863,11 +899,23 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
 	}
 	info!("");
 
-	let mut rpc_clients = Vec::new();
+	let clients_per_node = config.num_clients as usize / config.num_nodes;
+	let pool_size_per_node = RPC_POOL_SIZE.min(clients_per_node);
+	let mut rpc_pools: Vec<Vec<RpcClient>> = Vec::new();
 	for &name in &collator_names {
 		let node = network.get_node(name)?;
-		rpc_clients.push(node);
+		let mut pool = Vec::with_capacity(pool_size_per_node);
+		for _ in 0..pool_size_per_node {
+			pool.push(node.rpc().await?);
+		}
+		rpc_pools.push(pool);
 	}
+	info!(
+		"Created RPC connection pool: {} connections x {} nodes = {} total",
+		pool_size_per_node,
+		collator_names.len(),
+		pool_size_per_node * collator_names.len()
+	);
 
 	let barrier = Arc::new(Barrier::new(config.num_clients as usize));
 	let sync_start = std::time::Instant::now();
@@ -884,7 +932,8 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
 		let barrier = Arc::clone(&barrier);
 		let keyring = get_keypair(client_id);
 		let node_idx = (client_id as usize) % config.num_nodes;
-		let rpc_node = rpc_clients[node_idx].clone();
+		let conn_idx = (client_id as usize / config.num_nodes) % pool_size_per_node;
+		let rpc_client = rpc_pools[node_idx][conn_idx].clone();
 		let neighbour_id = (client_id + 1) % config.num_clients;
 		let neighbour_node_idx = (neighbour_id as usize) % config.num_nodes;
 		if node_idx == neighbour_node_idx && config.num_nodes > 1 {
@@ -895,7 +944,6 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
 		tokio::spawn(async move {
 			barrier.wait().await;
 
-			let rpc_client = rpc_node.rpc().await?;
 
 			if client_id == 0 {
 				let sync_time = sync_start.elapsed();
@@ -940,7 +988,10 @@ async fn statement_store_latency_bench() -> Result<(), anyhow::Error> {
 					.map_err(|e| {
 						anyhow!(
 							"Client {}: Failed to subscribe for message {} from neighbour {}: {}",
-							client_id, msg_idx, neighbour_id, e
+							client_id,
+							msg_idx,
+							neighbour_id,
+							e
 						)
 					})?;
 				subscriptions.push((msg_idx, topic_str, subscription));
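
Note on the pattern: the change above is plain round-robin connection pooling, opening a bounded number of RPC connections up front and letting every participant or task reuse entry `i % pool_size`, instead of opening one connection (and one file descriptor) per participant. The sketch below restates that indexing outside the benchmark so it is easy to see in isolation. It is illustrative only: `Connection` and `connect` are placeholders, not zombienet-sdk API, and stand in for whatever `node.rpc().await?` actually returns.

// Illustrative sketch, not part of the patch: `Connection`/`connect` are
// stand-ins for the real RPC client type used by the benchmark.
#[derive(Clone)]
struct Connection {
    id: usize,
}

fn connect(id: usize) -> Connection {
    // In the benchmark this is where a real connection (one file descriptor)
    // would be opened; the pool bounds how many of these ever exist.
    Connection { id }
}

fn build_pool(pool_size: usize) -> Vec<Connection> {
    // Open a fixed number of connections once, up front.
    (0..pool_size).map(connect).collect()
}

fn main() {
    let pool_size = 4;
    let participant_count = 10;
    let pool = build_pool(pool_size);

    // Each participant reuses connection (i % pool_size) instead of opening
    // its own, so the descriptor count stays at pool_size no matter how many
    // participants there are.
    for i in 0..participant_count {
        let conn = pool[i % pool_size].clone();
        println!("participant {i} -> connection {}", conn.id);
    }
}

With this shape, raising the participant count raises the per-connection subscription load rather than the connection count, which is why the patch also raises --rpc-max-subscriptions-per-connection on the collators.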