Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions local-cluster/src/cluster_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/// discover the rest of the network.
use log::*;
use {
crate::cluster::QuicTpuClient,
crate::{cluster::QuicTpuClient, local_cluster::LocalCluster},
rand::{thread_rng, Rng},
rayon::{prelude::*, ThreadPool},
solana_client::connection_cache::{ConnectionCache, Protocol},
Expand Down Expand Up @@ -101,12 +101,17 @@ pub fn spend_and_verify_all_nodes<S: ::std::hash::BuildHasher + Sync + Send>(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::confirmed())
.unwrap();
let transaction =
let mut transaction =
system_transaction::transfer(funding_keypair, &random_keypair.pubkey(), 1, blockhash);
let confs = VOTE_THRESHOLD_DEPTH + 1;
client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
10,
confs,
)
.unwrap();
for validator in &cluster_nodes {
if ignore_nodes.contains(validator.pubkey()) {
continue;
Expand Down Expand Up @@ -160,16 +165,21 @@ pub fn send_many_transactions(
.unwrap();
let transfer_amount = thread_rng().gen_range(1..max_tokens_per_transfer);

let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
transfer_amount,
blockhash,
);

client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

expected_balances.insert(random_keypair.pubkey(), transfer_amount);
}
Expand Down Expand Up @@ -292,24 +302,37 @@ pub fn kill_entry_and_spend_and_verify_rest(
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
funding_keypair,
&random_keypair.pubkey(),
1,
blockhash,
);

let confs = VOTE_THRESHOLD_DEPTH + 1;
if let Err(e) = client.send_transaction_to_upcoming_leaders(&transaction) {
result = Err(e);
continue;
}
let sig = {
let sig = LocalCluster::send_transaction_with_retries(
&client,
&[funding_keypair],
&mut transaction,
5,
confs,
);
match sig {
Err(e) => {
result = Err(e);
continue;
}

Ok(sig) => sig,
}
};
info!("poll_all_nodes_for_signature()");
match poll_all_nodes_for_signature(
entry_point_info,
&cluster_nodes,
connection_cache,
&transaction.signatures[0],
&sig,
confs,
) {
Err(e) => {
Expand Down
85 changes: 72 additions & 13 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,23 @@ use {
},
solana_sdk::{
account::{Account, AccountSharedData},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT},
clock::{Slot, DEFAULT_DEV_SLOTS_PER_EPOCH, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE},
commitment_config::CommitmentConfig,
epoch_schedule::EpochSchedule,
feature_set,
genesis_config::{ClusterType, GenesisConfig},
message::Message,
poh_config::PohConfig,
pubkey::Pubkey,
signature::{Keypair, Signer},
signature::{Keypair, Signature, Signer},
signers::Signers,
stake::{
instruction as stake_instruction,
state::{Authorized, Lockup},
},
system_transaction,
transaction::Transaction,
transport::TransportError,
},
solana_stake_program::stake_state,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
Expand All @@ -61,6 +63,7 @@ use {
net::{IpAddr, Ipv4Addr, UdpSocket},
path::{Path, PathBuf},
sync::{Arc, RwLock},
time::Instant,
},
};

Expand Down Expand Up @@ -665,6 +668,53 @@ impl LocalCluster {
info!("{} done waiting for roots", test_name);
}

/// Sends `transaction` to the upcoming leaders and blocks until it is confirmed,
/// retrying up to `attempts` times.
///
/// Within one attempt the transaction is re-sent for as long as the RPC node has
/// reported zero confirmations for it. Once at least one confirmation has been
/// observed, the attempt only keeps polling (and may extend its time budget)
/// instead of re-sending, because a duplicate submission could cost extra fees.
/// When the time budget of an attempt runs out, the transaction is re-signed by
/// `keypairs` with a freshly fetched blockhash before the next attempt starts.
///
/// NOTE(review): the time budget compares elapsed *seconds* against
/// `MAX_PROCESSING_AGE`, which is presumably a slot count rather than a
/// duration, so each attempt may wait longer than needed — confirm before
/// changing.
///
/// # Returns
///
/// The first signature of `transaction` once the polled confirmation count
/// reaches `pending_confirmations`.
///
/// # Errors
///
/// * Any error from `send_transaction_to_upcoming_leaders` is returned at once.
/// * Any error from fetching the latest blockhash between attempts is returned.
/// * An `std::io::Error` of kind `Other` (converted into `TransportError`) is
///   returned when every attempt ends without enough confirmations.
///
/// # Panics
///
/// Panics if `transaction` carries no signatures.
pub fn send_transaction_with_retries<T: Signers + ?Sized>(
    client: &QuicTpuClient,
    keypairs: &T,
    transaction: &mut Transaction,
    attempts: usize,
    pending_confirmations: usize,
) -> std::result::Result<Signature, TransportError> {
    for attempt in 0..attempts {
        let started = Instant::now();
        let mut confirmations_seen = 0;
        let mut max_wait = MAX_PROCESSING_AGE;

        while started.elapsed().as_secs() < max_wait as u64 {
            // Keep (re)submitting only while nothing has been confirmed yet.
            if confirmations_seen == 0 {
                client.send_transaction_to_upcoming_leaders(transaction)?;
            }

            let signature = transaction.signatures[0];
            match client
                .rpc_client()
                .poll_for_signature_confirmation(&signature, pending_confirmations)
            {
                Ok(confirmed) if confirmed >= pending_confirmations => {
                    return Ok(signature);
                }
                Ok(confirmed) => {
                    confirmations_seen = confirmed;
                    // Since network has seen the transaction, wait longer to receive
                    // all pending confirmations. Resending the transaction could result into
                    // extra transaction fees
                    let outstanding = pending_confirmations.saturating_sub(confirmed);
                    max_wait = max_wait.max(MAX_PROCESSING_AGE * outstanding);
                }
                // A failed poll is not fatal; loop again until the budget is used up.
                Err(_) => {}
            }
        }

        info!("{attempt} tries failed transfer");
        // Refresh the blockhash and re-sign so the next attempt submits a fresh
        // version of the transaction.
        let fresh_blockhash = client.rpc_client().get_latest_blockhash()?;
        transaction.sign(keypairs, fresh_blockhash);
    }

    let exhausted = std::io::Error::new(
        std::io::ErrorKind::Other,
        "failed to confirm transaction".to_string(),
    );
    Err(exhausted.into())
}
Comment on lines +674 to +716
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part is similar to thin_client, and I think it is closer to the original behavior we had before we switched to QuicTpuClient.

pub fn send_and_confirm_transaction<T: Signers + ?Sized>(
&self,
keypairs: &T,
transaction: &mut Transaction,
tries: usize,
pending_confirmations: usize,
) -> TransportResult<Signature> {
for x in 0..tries {
let now = Instant::now();
let mut num_confirmed = 0;
let mut wait_time = MAX_PROCESSING_AGE;
// resend the same transaction until the transaction has no chance of succeeding
let wire_transaction =
bincode::serialize(&transaction).expect("transaction serialization failed");
while now.elapsed().as_secs() < wait_time as u64 {
if num_confirmed == 0 {
let conn = self.connection_cache.get_connection(self.tpu_addr());
// Send the transaction if there has been no confirmation (e.g. the first time)
#[allow(clippy::needless_borrow)]
conn.send_data(&wire_transaction)?;
}
if let Ok(confirmed_blocks) = self.poll_for_signature_confirmation(
&transaction.signatures[0],
pending_confirmations,
) {
num_confirmed = confirmed_blocks;
if confirmed_blocks >= pending_confirmations {
return Ok(transaction.signatures[0]);
}
// Since network has seen the transaction, wait longer to receive
// all pending confirmations. Resending the transaction could result into
// extra transaction fees
wait_time = wait_time.max(
MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
);
}
}
info!("{} tries failed transfer to {}", x, self.tpu_addr());
let blockhash = self.get_latest_blockhash()?;
transaction.sign(keypairs, blockhash);
}
Err(io::Error::new(
io::ErrorKind::Other,
format!("retry_transfer failed in {tries} retries"),
)
.into())
}

I'm a little confused about why it compares MAX_PROCESSING_AGE with now.elapsed().as_secs(), but I think we can update this in a different PR!

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Valid concern. The idea of the original code is that if we waited longer than MAX_PROCESSING_AGE, it means that the blockhash has expired, so we must update the blockhash and re-sign, which might be a desired behavior for production applications.
Yet I'm concerned about using this code for integration tests, since in the worst-case scenario we can wait for minutes for a transaction to be delivered. I'm not sure if it is worth it, or whether it is better to accept failure if we cannot send within several leader slots (as opposed to hundreds).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think yihau's point was that MAX_PROCESSING_AGE has a unit of slots, not seconds. So the code waits more than twice as long as necessary.


fn transfer_with_client(
client: &QuicTpuClient,
source_keypair: &Keypair,
Expand All @@ -676,16 +726,15 @@ impl LocalCluster {
.rpc_client()
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();
let tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
let mut tx = system_transaction::transfer(source_keypair, dest_pubkey, lamports, blockhash);
info!(
"executing transfer of {} from {} to {}",
lamports,
source_keypair.pubkey(),
*dest_pubkey
);

client
.send_transaction_to_upcoming_leaders(&tx)
LocalCluster::send_transaction_with_retries(client, &[source_keypair], &mut tx, 10, 0)
.expect("client transfer should succeed");
client
.rpc_client()
Expand Down Expand Up @@ -749,7 +798,7 @@ impl LocalCluster {
},
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let transaction = Transaction::new(
let mut transaction = Transaction::new(
&[from_account.as_ref(), vote_account],
message,
client
Expand All @@ -758,9 +807,14 @@ impl LocalCluster {
.unwrap()
.0,
);
client
.send_transaction_to_upcoming_leaders(&transaction)
.expect("fund vote");
LocalCluster::send_transaction_with_retries(
client,
&[from_account],
&mut transaction,
10,
0,
)
.expect("should fund vote");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand All @@ -779,7 +833,7 @@ impl LocalCluster {
amount,
);
let message = Message::new(&instructions, Some(&from_account.pubkey()));
let transaction = Transaction::new(
let mut transaction = Transaction::new(
&[from_account.as_ref(), &stake_account_keypair],
message,
client
Expand All @@ -789,9 +843,14 @@ impl LocalCluster {
.0,
);

client
.send_transaction_to_upcoming_leaders(&transaction)
.expect("delegate stake");
LocalCluster::send_transaction_with_retries(
client,
&[from_account.as_ref(), &stake_account_keypair],
&mut transaction,
5,
0,
)
.expect("should delegate stake");
client
.rpc_client()
.wait_for_balance_with_commitment(
Expand Down
27 changes: 19 additions & 8 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn test_local_cluster_signature_subscribe() {
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.unwrap();

let transaction = system_transaction::transfer(
let mut transaction = system_transaction::transfer(
&cluster.funding_keypair,
&solana_sdk::pubkey::new_rand(),
10,
Expand All @@ -239,9 +239,14 @@ fn test_local_cluster_signature_subscribe() {
)
.unwrap();

tx_client
.send_transaction_to_upcoming_leaders(&transaction)
.unwrap();
LocalCluster::send_transaction_with_retries(
&tx_client,
&[&cluster.funding_keypair],
&mut transaction,
5,
0,
)
.unwrap();

let mut got_received_notification = false;
loop {
Expand Down Expand Up @@ -2669,6 +2674,7 @@ fn test_oc_bad_signatures() {

// 3) Start up a spy to listen for and push votes to leader TPU
let client = cluster.build_tpu_quic_client().unwrap();
let cluster_funding_keypair = cluster.funding_keypair.insecure_clone();
let voter_thread_sleep_ms: usize = 100;
let num_votes_simulated = Arc::new(AtomicUsize::new(0));
let gossip_voter = cluster_tests::start_gossip_voter(
Expand Down Expand Up @@ -2703,7 +2709,7 @@ fn test_oc_bad_signatures() {
let vote_slots: Vec<Slot> = vec![vote_slot];

let bad_authorized_signer_keypair = Keypair::new();
let vote_tx = vote_transaction::new_vote_transaction(
let mut vote_tx = vote_transaction::new_vote_transaction(
vote_slots,
vote_hash,
leader_vote_tx.message.recent_blockhash,
Expand All @@ -2713,9 +2719,14 @@ fn test_oc_bad_signatures() {
&bad_authorized_signer_keypair,
None,
);
client
.send_transaction_to_upcoming_leaders(&vote_tx)
.unwrap();
LocalCluster::send_transaction_with_retries(
&client,
&[&cluster_funding_keypair],
&mut vote_tx,
5,
0,
)
.unwrap();

num_votes_simulated.fetch_add(1, Ordering::Relaxed);
}
Expand Down