Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
convert::TryFrom,
io,
net::{Ipv4Addr, SocketAddr},
sync::{Arc, RwLock},
sync::{atomic::AtomicBool, Arc, RwLock},
thread::Builder,
time::Duration,
},
Expand Down Expand Up @@ -433,6 +433,7 @@ pub async fn start_tcp_server(
bank_forks: Arc<RwLock<BankForks>>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
connection_cache: Arc<ConnectionCache>,
exit: Arc<AtomicBool>,
) -> io::Result<()> {
// Note: These settings are copied straight from the tarpc example.
let server = tcp::listen(listen_addr, Bincode::default)
Expand Down Expand Up @@ -460,6 +461,7 @@ pub async fn start_tcp_server(
&connection_cache,
5_000,
0,
exit.clone(),
);

let server = BanksServer::new(
Expand Down
1 change: 1 addition & 0 deletions banks-server/src/rpc_banks_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn start_abortable_tcp_server(
bank_forks.clone(),
block_commitment_cache.clone(),
connection_cache,
exit.clone(),
)
.fuse();
let interval = IntervalStream::new(time::interval(Duration::from_millis(100))).fuse();
Expand Down
3 changes: 3 additions & 0 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ impl JsonRpcRequestProcessor {
&connection_cache,
1000,
1,
exit.clone(),
);

Self {
Expand Down Expand Up @@ -6429,6 +6430,7 @@ pub mod tests {
&connection_cache,
1000,
1,
exit,
);

let mut bad_transaction = system_transaction::transfer(
Expand Down Expand Up @@ -6697,6 +6699,7 @@ pub mod tests {
&connection_cache,
1000,
1,
exit,
);
assert_eq!(
request_processor.get_block_commitment(0),
Expand Down
7 changes: 6 additions & 1 deletion rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ impl JsonRpcService {
prioritization_fee_cache,
);

let exit = Arc::new(AtomicBool::new(false));
let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
Expand All @@ -485,6 +486,7 @@ impl JsonRpcService {
receiver,
&connection_cache,
send_transaction_service_config,
exit.clone(),
));

#[cfg(test)]
Expand Down Expand Up @@ -556,7 +558,10 @@ impl JsonRpcService {
validator_exit
.write()
.unwrap()
.register_exit(Box::new(move || close_handle_.close()));
.register_exit(Box::new(move || {
close_handle_.close();
exit.store(true, Ordering::Relaxed);
}));
Ok(Self {
thread_hdl,
#[cfg(test)]
Expand Down
49 changes: 47 additions & 2 deletions send-transaction-service/src/send_transaction_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ impl SendTransactionService {
connection_cache: &Arc<ConnectionCache>,
retry_rate_ms: u64,
leader_forward_count: u64,
exit: Arc<AtomicBool>,
) -> Self {
let config = Config {
retry_rate_ms,
Expand All @@ -345,6 +346,7 @@ impl SendTransactionService {
receiver,
connection_cache,
config,
exit,
)
}

Expand All @@ -355,14 +357,14 @@ impl SendTransactionService {
receiver: Receiver<TransactionInfo>,
connection_cache: &Arc<ConnectionCache>,
config: Config,
exit: Arc<AtomicBool>,
) -> Self {
let stats_report = Arc::new(SendTransactionServiceStatsReport::default());

let retry_transactions = Arc::new(Mutex::new(HashMap::new()));

let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info)));

let exit = Arc::new(AtomicBool::new(false));
let receive_txn_thread = Self::receive_txn_thread(
tpu_address,
receiver,
Expand Down Expand Up @@ -776,7 +778,7 @@ mod test {
use {
super::*,
crate::tpu_info::NullTpuInfo,
crossbeam_channel::unbounded,
crossbeam_channel::{bounded, unbounded},
solana_sdk::{
account::AccountSharedData,
genesis_config::create_genesis_config,
Expand Down Expand Up @@ -804,12 +806,55 @@ mod test {
&connection_cache,
1000,
1,
Arc::new(AtomicBool::new(false)),
);

drop(sender);
send_transaction_service.join().unwrap();
}

#[test]
fn validator_exit() {
let tpu_address = "127.0.0.1:0".parse().unwrap();
let bank = Bank::default_for_tests();
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let (sender, receiver) = bounded(0);

let dummy_tx_info = || TransactionInfo {
signature: Signature::default(),
wire_transaction: vec![0; 128],
last_valid_block_height: 0,
durable_nonce_info: None,
max_retries: None,
retries: 0,
last_sent_time: None,
};

let exit = Arc::new(AtomicBool::new(false));
let connection_cache = Arc::new(ConnectionCache::default());
let _send_transaction_service = SendTransactionService::new::<NullTpuInfo>(
tpu_address,
&bank_forks,
None,
receiver,
&connection_cache,
1000,
1,
exit.clone(),
);

sender.send(dummy_tx_info()).unwrap();

thread::spawn(move || {
exit.store(true, Ordering::Relaxed);
});

let mut option = Ok(());
while option.is_ok() {
option = sender.send(dummy_tx_info());
}
Comment on lines +852 to +855
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm guessing we do this 'cause SendTransactionService::join() looks busted and that will be addressed in the alluded to followup?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naw, join() looks fine, actually. Ensuring the receiver was dropped was the best thing I thought of to show that updating exit outside of the service actually causes stuff to stop. join() already sets exit directly. Open to better ideas, though!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...post-merge ;)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out this was wrong. join() also needs to ensure the rpc Server is closed.

}

#[test]
fn process_transactions() {
solana_logger::setup();
Expand Down