Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make transaction status service multi-threaded. #4032

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 66 additions & 42 deletions rpc/src/transaction_status_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ use {
std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
Arc, Mutex,
},
thread::{self, Builder, JoinHandle},
time::Duration,
},
};

const NUM_TSS_WORKER_THREADS: usize = 4;

pub struct TransactionStatusService {
thread_hdl: JoinHandle<()>,
thread_handles: Vec<JoinHandle<()>>,
}

impl TransactionStatusService {
Expand All @@ -34,47 +36,63 @@ impl TransactionStatusService {
enable_extended_tx_metadata_storage: bool,
exit: Arc<AtomicBool>,
) -> Self {
let thread_hdl = Builder::new()
.name("solTxStatusWrtr".to_string())
.spawn(move || {
info!("TransactionStatusService has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let message = match write_transaction_status_receiver
.recv_timeout(Duration::from_secs(1))
{
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
let mut thread_handles = vec![];
let write_transaction_status_receiver =
Arc::new(Mutex::new(write_transaction_status_receiver));

for worker in 0..NUM_TSS_WORKER_THREADS {
let write_transaction_status_receiver = Arc::clone(&write_transaction_status_receiver);
let max_complete_transaction_status_slot =
Arc::clone(&max_complete_transaction_status_slot);
let transaction_notifier = transaction_notifier.clone();
let blockstore = Arc::clone(&blockstore);
let exit = Arc::clone(&exit);

let thread_hdl = Builder::new()
.name(format!("solTxStatusWrtr-{}", worker))
.spawn(move || {
info!("TransactionStatusService worker {worker} has started");
loop {
if exit.load(Ordering::Relaxed) {
break;
}

let message = {
let tss_receiver = write_transaction_status_receiver.lock().unwrap();
match tss_receiver.recv_timeout(Duration::from_secs(1)) {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => {
break;
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
}
};

match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService worker {worker} stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
}
}
Err(RecvTimeoutError::Timeout) => {
continue;
}
};
info!("TransactionStatusService worker {worker} has stopped");
})
.unwrap();

match Self::write_transaction_status_batch(
message,
&max_complete_transaction_status_slot,
enable_rpc_transaction_history,
transaction_notifier.clone(),
&blockstore,
enable_extended_tx_metadata_storage,
) {
Ok(_) => {}
Err(err) => {
error!("TransactionStatusService stopping due to error: {err}");
exit.store(true, Ordering::Relaxed);
break;
}
}
}
info!("TransactionStatusService has stopped");
})
.unwrap();
Self { thread_hdl }
thread_handles.push(thread_hdl);
}
Self { thread_handles }
}

fn write_transaction_status_batch(
Expand Down Expand Up @@ -219,7 +237,13 @@ impl TransactionStatusService {
}

pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
let mut result = Ok(());
for handle in self.thread_handles {
if let Err(err) = handle.join() {
result = Err(err);
}
}
result
}
}

Expand Down
Loading