diff --git a/Cargo.lock b/Cargo.lock index 6fafcd4fa1f1ff..06008d3aae29fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5809,6 +5809,7 @@ dependencies = [ "log", "rand 0.7.3", "rayon", + "scopeguard", "serial_test", "solana-client", "solana-config-program", diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index e7be65f88fb9df..ef87aebfce18f2 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -16,6 +16,7 @@ itertools = "0.10.5" log = "0.4.17" rand = "0.7.0" rayon = "1.5.3" +scopeguard = "1.1.0" solana-client = { path = "../client", version = "=1.16.0" } solana-config-program = { path = "../programs/config", version = "=1.16.0" } solana-core = { path = "../core", version = "=1.16.0" } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index 51be49d7e163da..43987ec2f68e53 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -345,9 +345,7 @@ impl LocalCluster { (0..config.num_listeners).for_each(|_| { cluster.add_validator_listener( &listener_config, - 0, Arc::new(Keypair::new()), - None, socket_addr_space, ); }); @@ -390,17 +388,15 @@ impl LocalCluster { pub fn add_validator_listener( &mut self, validator_config: &ValidatorConfig, - stake: u64, validator_keypair: Arc<Keypair>, - voting_keypair: Option<Arc<Keypair>>, socket_addr_space: SocketAddrSpace, ) -> Pubkey { self.do_add_validator( validator_config, true, - stake, + 0u64, validator_keypair, - voting_keypair, + None, socket_addr_space, ) } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 4b5178e61fd297..a099ce7b84f798 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1,10 +1,12 @@ #![allow(clippy::integer_arithmetic)] + use { assert_matches::assert_matches, common::*, - crossbeam_channel::{unbounded, Receiver}, + crossbeam_channel::{unbounded, Receiver, RecvTimeoutError}, gag::BufferRedirect, log::*, + scopeguard::defer,
serial_test::serial, solana_client::thin_client::ThinClient, solana_core::{ @@ -30,11 +32,12 @@ use { solana_pubsub_client::pubsub_client::PubsubClient, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{ + self, config::{ RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcProgramAccountsConfig, RpcSignatureSubscribeConfig, }, - response::RpcSignatureResult, + response::{ProcessedSignatureResult, ReceivedSignatureResult, RpcSignatureResult}, }, solana_runtime::{ commitment::VOTE_THRESHOLD_SIZE, @@ -180,18 +183,34 @@ fn test_spend_and_verify_all_nodes_3() { #[test] #[serial] -#[ignore] fn test_local_cluster_signature_subscribe() { solana_logger::setup_with_default(RUST_LOG_FILTER); + let num_nodes = 2; - let cluster = LocalCluster::new_with_equal_stakes( + let mut cluster = LocalCluster::new_with_equal_stakes( num_nodes, DEFAULT_CLUSTER_LAMPORTS, DEFAULT_NODE_STAKE, SocketAddrSpace::Unspecified, ); + let funding_keypair = cluster.funding_keypair.insecure_clone(); let nodes = cluster.get_node_pubkeys(); + /* + * `RpcSignatureResult::ReceivedSignature` is not sent by a TPU, only by TVUs. But as the + * leader role is rotating, we cannot know which validator we are talking to: the TPU or a TVU. + * + * In real setups, RPC subscription is not accepted by validators at all, so we deploy a + * listener, in order to avoid a race condition as well as to make sure that + * `RpcSignatureResult::ReceivedSignature` is indeed properly generated.
+ */ + let listener_pubkey = cluster.add_validator_listener( + &ValidatorConfig::default_for_test(), + Arc::new(Keypair::new()), + SocketAddrSpace::Unspecified, + ); + let listener_info = cluster.get_contact_info(&listener_pubkey).unwrap(); + // Get non leader let non_bootstrap_id = nodes .into_iter() @@ -209,7 +228,7 @@ fn test_local_cluster_signature_subscribe() { .unwrap(); let mut transaction = system_transaction::transfer( - &cluster.funding_keypair, + &funding_keypair, &solana_sdk::pubkey::new_rand(), 10, blockhash, @@ -218,7 +237,7 @@ fn test_local_cluster_signature_subscribe() { let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe( &format!( "ws://{}", - &non_bootstrap_info.rpc_pubsub().unwrap().to_string() + &listener_info.rpc_pubsub().unwrap().to_string() ), &transaction.signatures[0], Some(RpcSignatureSubscribeConfig { @@ -228,37 +247,70 @@ fn test_local_cluster_signature_subscribe() { ) .unwrap(); + defer! { + // If we don't drop the cluster, the blocking web socket service + // won't return, and the `sig_subscribe_client` won't shut down + drop(cluster); + + if let Err(_err) = sig_subscribe_client.shutdown() { + if std::thread::panicking() { + // Do not hide the original problem that failed the test. + } else { + // `_err` is a `Box<dyn Any + Send>` (TODO confirm exact type), so there is no + // point in printing it.
+ panic!("`sig_subscribe_client.shutdown()` failed"); + } + } + } + tx_client - .retry_transfer(&cluster.funding_keypair, &mut transaction, 5) + .retry_transfer(&funding_keypair, &mut transaction, 5) .unwrap(); + let timeout = Duration::from_secs(60); + let check_start = Instant::now(); + let deadline = check_start + timeout; let mut got_received_notification = false; loop { - let responses: Vec<_> = receiver.try_iter().collect(); - let mut should_break = false; - for response in responses { - match response.value { - RpcSignatureResult::ProcessedSignature(_) => { - should_break = true; - break; - } - RpcSignatureResult::ReceivedSignature(_) => { - got_received_notification = true; - } + let value = match receiver.recv_deadline(deadline) { + Ok(solana_rpc_client_api::response::Response { context: _, value }) => value, + Err(RecvTimeoutError::Timeout) => { + assert!( + got_received_notification, + "No RpcSignatureResult::ReceivedSignature in {:?}", + check_start.elapsed() + ); + panic!( + "No RpcSignatureResult::ProcessedSignature in {:?}", + check_start.elapsed() + ); } - } + Err(RecvTimeoutError::Disconnected) => panic!("Client disconnected"), + }; - if should_break { - break; + use RpcSignatureResult::{ProcessedSignature, ReceivedSignature}; + match value { + ReceivedSignature(ReceivedSignatureResult::ReceivedSignature) => { + assert!( + !got_received_notification, + "Second RpcSignatureResult::ReceivedSignature before \ + RpcSignatureResult::ProcessedSignature" + ); + got_received_notification = true; + } + ProcessedSignature(ProcessedSignatureResult { err: None }) => { + assert!( + got_received_notification, + "No RpcSignatureResult::ReceivedSignature before \ + RpcSignatureResult::ProcessedSignature" + ); + break; + } + ProcessedSignature(ProcessedSignatureResult { err: Some(err) }) => { + panic!("ProcessedSignature() failed: {:?}", err); + } } - } + sleep(Duration::from_millis(100)); } - - // If we don't drop the cluster, the blocking web socket service - // won't
return, and the `sig_subscribe_client` won't shut down - drop(cluster); - sig_subscribe_client.shutdown().unwrap(); - assert!(got_received_notification); } #[test]