Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions local-cluster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ itertools = "0.10.5"
log = "0.4.17"
rand = "0.7.0"
rayon = "1.5.3"
scopeguard = "1.1.0"
solana-client = { path = "../client", version = "=1.16.0" }
solana-config-program = { path = "../programs/config", version = "=1.16.0" }
solana-core = { path = "../core", version = "=1.16.0" }
Expand Down
8 changes: 2 additions & 6 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,7 @@ impl LocalCluster {
(0..config.num_listeners).for_each(|_| {
cluster.add_validator_listener(
&listener_config,
0,
Arc::new(Keypair::new()),
None,
socket_addr_space,
);
});
Expand Down Expand Up @@ -390,17 +388,15 @@ impl LocalCluster {
pub fn add_validator_listener(
&mut self,
validator_config: &ValidatorConfig,
stake: u64,
validator_keypair: Arc<Keypair>,
voting_keypair: Option<Arc<Keypair>>,
socket_addr_space: SocketAddrSpace,
) -> Pubkey {
self.do_add_validator(
validator_config,
true,
stake,
0u64,
validator_keypair,
voting_keypair,
None,
socket_addr_space,
)
}
Expand Down
108 changes: 80 additions & 28 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(clippy::integer_arithmetic)]

use {
assert_matches::assert_matches,
common::*,
crossbeam_channel::{unbounded, Receiver},
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError},
gag::BufferRedirect,
log::*,
scopeguard::defer,
serial_test::serial,
solana_client::thin_client::ThinClient,
solana_core::{
Expand All @@ -30,11 +32,12 @@ use {
solana_pubsub_client::pubsub_client::PubsubClient,
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::{
self,
config::{
RpcBlockSubscribeConfig, RpcBlockSubscribeFilter, RpcProgramAccountsConfig,
RpcSignatureSubscribeConfig,
},
response::RpcSignatureResult,
response::{ProcessedSignatureResult, ReceivedSignatureResult, RpcSignatureResult},
},
solana_runtime::{
commitment::VOTE_THRESHOLD_SIZE,
Expand Down Expand Up @@ -180,18 +183,34 @@ fn test_spend_and_verify_all_nodes_3() {

#[test]
#[serial]
#[ignore]
fn test_local_cluster_signature_subscribe() {
solana_logger::setup_with_default(RUST_LOG_FILTER);

let num_nodes = 2;
let cluster = LocalCluster::new_with_equal_stakes(
let mut cluster = LocalCluster::new_with_equal_stakes(
num_nodes,
DEFAULT_CLUSTER_LAMPORTS,
DEFAULT_NODE_STAKE,
SocketAddrSpace::Unspecified,
);
let funding_keypair = cluster.funding_keypair.insecure_clone();
let nodes = cluster.get_node_pubkeys();

/*
 * `RpcSignatureResult::ReceivedSignature` is not sent by a TPU, only by TVUs. But as the
 * leader role rotates, we cannot know which validator we are talking to: the TPU or a TVU.
 *
 * In real setups, validators do not accept RPC subscriptions at all, so we deploy a
 * listener in order to avoid a race condition, as well as to make sure that
 * `RpcSignatureResult::ReceivedSignature` is indeed properly generated.
 */
let listener_pubkey = cluster.add_validator_listener(
&ValidatorConfig::default_for_test(),
Arc::new(Keypair::new()),
SocketAddrSpace::Unspecified,
);
let listener_info = cluster.get_contact_info(&listener_pubkey).unwrap();

// Get non leader
let non_bootstrap_id = nodes
.into_iter()
Expand All @@ -209,7 +228,7 @@ fn test_local_cluster_signature_subscribe() {
.unwrap();

let mut transaction = system_transaction::transfer(
&cluster.funding_keypair,
&funding_keypair,
&solana_sdk::pubkey::new_rand(),
10,
blockhash,
Expand All @@ -218,7 +237,7 @@ fn test_local_cluster_signature_subscribe() {
let (mut sig_subscribe_client, receiver) = PubsubClient::signature_subscribe(
&format!(
"ws://{}",
&non_bootstrap_info.rpc_pubsub().unwrap().to_string()
&listener_info.rpc_pubsub().unwrap().to_string()
),
&transaction.signatures[0],
Some(RpcSignatureSubscribeConfig {
Expand All @@ -228,37 +247,70 @@ fn test_local_cluster_signature_subscribe() {
)
.unwrap();

defer! {
// If we don't drop the cluster, the blocking web socket service
// won't return, and the `sig_subscribe_client` won't shut down
drop(cluster);

if let Err(_err) = sig_subscribe_client.shutdown() {
if std::thread::panicking() {
// Do not hide the original problem that failed the test.
} else {
// `_err` is `Box<dyn Any + Send + 'static>`, so there is no
// point in printing it.
panic!("`sig_subscribe_client.shutdown() failed");
}
}
}

tx_client
.retry_transfer(&cluster.funding_keypair, &mut transaction, 5)
.retry_transfer(&funding_keypair, &mut transaction, 5)
.unwrap();

let timeout = Duration::from_secs(60);
let check_start = Instant::now();
let deadline = check_start + timeout;
let mut got_received_notification = false;
loop {
let responses: Vec<_> = receiver.try_iter().collect();
let mut should_break = false;
for response in responses {
match response.value {
RpcSignatureResult::ProcessedSignature(_) => {
should_break = true;
break;
}
RpcSignatureResult::ReceivedSignature(_) => {
got_received_notification = true;
}
let value = match receiver.recv_deadline(deadline) {
Ok(solana_rpc_client_api::response::Response { context: _, value }) => value,
Err(RecvTimeoutError::Timeout) => {
assert!(
got_received_notification,
"No RpcSignatureResult::ReceivedSignature in {:?}",
check_start.elapsed()
);
panic!(
"No RpcSignatureResult::ProcessedSignature in {:?}",
check_start.elapsed()
);
}
}
Err(RecvTimeoutError::Disconnected) => panic!("Client disconnected"),
};

if should_break {
break;
use RpcSignatureResult::{ProcessedSignature, ReceivedSignature};
match value {
ReceivedSignature(ReceivedSignatureResult::ReceivedSignature) => {
assert!(
!got_received_notification,
"Second RpcSignatureResult::ReceivedSignature before \
RpcSignatureResult::ProcessedSignature"
);
got_received_notification = true;
}
ProcessedSignature(ProcessedSignatureResult { err: None }) => {
assert!(
got_received_notification,
"No RpcSignatureResult::ReceivedSignature before \
RpcSignatureResult::ProcessedSignature"
);
break;
}
ProcessedSignature(ProcessedSignatureResult { err: Some(err) }) => {
panic!("ProcessedSignature() failed: {:?}", err);
}
}
sleep(Duration::from_millis(100));
}

// If we don't drop the cluster, the blocking web socket service
// won't return, and the `sig_subscribe_client` won't shut down
drop(cluster);
sig_subscribe_client.shutdown().unwrap();
assert!(got_received_notification);
}

#[test]
Expand Down