Skip to content

Commit d89d53a

Browse files
authored
Reapply PR#31: optimize retry pool (anza-xyz#5113)
This PR fixes three subtle problems with SendTransactionService: * do not insert transactions with zero max_retries to the retry pool * remove transactions reached max_retries in the same iteration of the loop * dynamically select sleep time between iterations based on last_sent_time in TransactionInfo
1 parent 49fb512 commit d89d53a

File tree

1 file changed

+84
-43
lines changed

1 file changed

+84
-43
lines changed

send-transaction-service/src/send_transaction_service.rs

+84-43
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@ use {
2121
hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature,
2222
},
2323
std::{
24-
collections::{
25-
hash_map::{Entry, HashMap},
26-
HashSet,
27-
},
24+
collections::hash_map::{Entry, HashMap},
2825
net::SocketAddr,
2926
sync::{
3027
atomic::{AtomicBool, Ordering},
@@ -99,6 +96,16 @@ impl TransactionInfo {
9996
last_sent_time,
10097
}
10198
}
99+
100+
fn get_max_retries(
101+
&self,
102+
default_max_retries: Option<usize>,
103+
service_max_retries: usize,
104+
) -> Option<usize> {
105+
self.max_retries
106+
.or(default_max_retries)
107+
.map(|max_retries| max_retries.min(service_max_retries))
108+
}
102109
}
103110

104111
#[derive(Default, Debug, PartialEq, Eq)]
@@ -109,6 +116,7 @@ struct ProcessTransactionsResult {
109116
max_retries_elapsed: u64,
110117
failed: u64,
111118
retained: u64,
119+
last_sent_time: Option<Instant>,
112120
}
113121

114122
#[derive(Clone, Debug)]
@@ -239,6 +247,8 @@ impl SendTransactionService {
239247
batch_send_rate_ms,
240248
batch_size,
241249
retry_pool_max_size,
250+
default_max_retries,
251+
service_max_retries,
242252
..
243253
}: Config,
244254
stats_report: Arc<SendTransactionServiceStatsReport>,
@@ -301,9 +311,17 @@ impl SendTransactionService {
301311
{
302312
// take a lock of retry_transactions and move the batch to the retry set.
303313
let mut retry_transactions = retry_transactions.lock().unwrap();
304-
let transactions_to_retry = transactions.len();
314+
let mut transactions_to_retry: usize = 0;
305315
let mut transactions_added_to_retry: usize = 0;
306316
for (signature, mut transaction_info) in transactions.drain() {
317+
// drop transactions with 0 max retries
318+
let max_retries = transaction_info
319+
.get_max_retries(default_max_retries, service_max_retries);
320+
if max_retries == Some(0) {
321+
continue;
322+
}
323+
transactions_to_retry += 1;
324+
307325
let retry_len = retry_transactions.len();
308326
let entry = retry_transactions.entry(signature);
309327
if let Entry::Vacant(_) = entry {
@@ -342,19 +360,20 @@ impl SendTransactionService {
342360
exit: Arc<AtomicBool>,
343361
) -> JoinHandle<()> {
344362
debug!("Starting send-transaction-service::retry_thread.");
363+
let retry_interval_ms_default = MAX_RETRY_SLEEP_MS.min(config.retry_rate_ms);
364+
let mut retry_interval_ms = retry_interval_ms_default;
345365
Builder::new()
346366
.name("solStxRetry".to_string())
347367
.spawn(move || loop {
348-
let retry_interval_ms = config.retry_rate_ms;
349-
let stats = &stats_report.stats;
350-
sleep(Duration::from_millis(
351-
MAX_RETRY_SLEEP_MS.min(retry_interval_ms),
352-
));
368+
sleep(Duration::from_millis(retry_interval_ms));
353369
if exit.load(Ordering::Relaxed) {
354370
break;
355371
}
356372
let mut transactions = retry_transactions.lock().unwrap();
357-
if !transactions.is_empty() {
373+
if transactions.is_empty() {
374+
retry_interval_ms = retry_interval_ms_default;
375+
} else {
376+
let stats = &stats_report.stats;
358377
stats
359378
.retry_queue_size
360379
.store(transactions.len() as u64, Ordering::Relaxed);
@@ -363,7 +382,7 @@ impl SendTransactionService {
363382
(bank_forks.root_bank(), bank_forks.working_bank())
364383
};
365384

366-
let _result = Self::process_transactions(
385+
let result = Self::process_transactions(
367386
&working_bank,
368387
&root_bank,
369388
&mut transactions,
@@ -372,6 +391,17 @@ impl SendTransactionService {
372391
stats,
373392
);
374393
stats_report.report();
394+
395+
// Adjust retry interval taking into account the time since the last send.
396+
retry_interval_ms = retry_interval_ms_default
397+
.checked_sub(
398+
result
399+
.last_sent_time
400+
.and_then(|last| Instant::now().checked_duration_since(last))
401+
.and_then(|interval| interval.as_millis().try_into().ok())
402+
.unwrap_or(0),
403+
)
404+
.unwrap_or(retry_interval_ms_default);
375405
}
376406
})
377407
.unwrap()
@@ -394,7 +424,8 @@ impl SendTransactionService {
394424
) -> ProcessTransactionsResult {
395425
let mut result = ProcessTransactionsResult::default();
396426

397-
let mut batched_transactions = HashSet::new();
427+
let mut batched_transactions = Vec::new();
428+
let mut exceeded_retries_transactions = Vec::new();
398429
let retry_rate = Duration::from_millis(retry_rate_ms);
399430

400431
transactions.retain(|signature, transaction_info| {
@@ -413,7 +444,8 @@ impl SendTransactionService {
413444
let now = Instant::now();
414445
let expired = transaction_info
415446
.last_sent_time
416-
.map(|last| now.duration_since(last) >= retry_rate)
447+
.and_then(|last| now.checked_duration_since(last))
448+
.map(|elapsed| elapsed >= retry_rate)
417449
.unwrap_or(false);
418450
let verify_nonce_account =
419451
nonce_account::verify_nonce_account(&nonce_account, &durable_nonce);
@@ -431,10 +463,8 @@ impl SendTransactionService {
431463
return false;
432464
}
433465

434-
let max_retries = transaction_info
435-
.max_retries
436-
.or(default_max_retries)
437-
.map(|max_retries| max_retries.min(service_max_retries));
466+
let max_retries =
467+
transaction_info.get_max_retries(default_max_retries, service_max_retries);
438468

439469
if let Some(max_retries) = max_retries {
440470
if transaction_info.retries >= max_retries {
@@ -452,21 +482,36 @@ impl SendTransactionService {
452482
let now = Instant::now();
453483
let need_send = transaction_info
454484
.last_sent_time
455-
.map(|last| now.duration_since(last) >= retry_rate)
485+
.and_then(|last| now.checked_duration_since(last))
486+
.map(|elapsed| elapsed >= retry_rate)
456487
.unwrap_or(true);
457488
if need_send {
458489
if transaction_info.last_sent_time.is_some() {
459490
// Transaction sent before is unknown to the working bank, it might have been
460-
// dropped or landed in another fork. Re-send it
491+
// dropped or landed in another fork. Re-send it.
461492

462493
info!("Retrying transaction: {}", signature);
463494
result.retried += 1;
464495
transaction_info.retries += 1;
465-
stats.retries.fetch_add(1, Ordering::Relaxed);
466496
}
467497

468-
batched_transactions.insert(*signature);
498+
batched_transactions.push(*signature);
469499
transaction_info.last_sent_time = Some(now);
500+
501+
let max_retries = transaction_info
502+
.get_max_retries(default_max_retries, service_max_retries);
503+
if let Some(max_retries) = max_retries {
504+
if transaction_info.retries >= max_retries {
505+
exceeded_retries_transactions.push(*signature);
506+
}
507+
}
508+
} else if let Some(last) = transaction_info.last_sent_time {
509+
result.last_sent_time = Some(
510+
result
511+
.last_sent_time
512+
.map(|result_last| result_last.min(last))
513+
.unwrap_or(last),
514+
);
470515
}
471516
true
472517
}
@@ -484,19 +529,31 @@ impl SendTransactionService {
484529
}
485530
});
486531

532+
stats.retries.fetch_add(result.retried, Ordering::Relaxed);
533+
487534
if !batched_transactions.is_empty() {
488535
// Processing the transactions in batch
489-
let wire_transactions = transactions
536+
let wire_transactions = batched_transactions
490537
.iter()
491-
.filter(|(signature, _)| batched_transactions.contains(signature))
492-
.map(|(_, transaction_info)| transaction_info.wire_transaction.clone());
538+
.filter_map(|signature| transactions.get(signature))
539+
.map(|transaction_info| transaction_info.wire_transaction.clone());
493540

494541
let iter = wire_transactions.chunks(batch_size);
495542
for chunk in &iter {
496543
let chunk = chunk.collect();
497544
client.send_transactions_in_batch(chunk, stats);
498545
}
499546
}
547+
548+
result.max_retries_elapsed += exceeded_retries_transactions.len() as u64;
549+
stats
550+
.transactions_exceeding_max_retries
551+
.fetch_add(result.max_retries_elapsed, Ordering::Relaxed);
552+
for signature in exceeded_retries_transactions {
553+
info!("Dropping transaction due to max retries: {signature}");
554+
transactions.remove(&signature);
555+
}
556+
500557
result
501558
}
502559

@@ -848,28 +905,12 @@ mod test {
848905
&config,
849906
&stats,
850907
);
851-
assert_eq!(transactions.len(), 1);
852-
assert_eq!(
853-
result,
854-
ProcessTransactionsResult {
855-
retried: 1,
856-
max_retries_elapsed: 1,
857-
..ProcessTransactionsResult::default()
858-
}
859-
);
860-
let result = SendTransactionService::process_transactions(
861-
&working_bank,
862-
&root_bank,
863-
&mut transactions,
864-
&client,
865-
&config,
866-
&stats,
867-
);
868908
assert!(transactions.is_empty());
869909
assert_eq!(
870910
result,
871911
ProcessTransactionsResult {
872-
max_retries_elapsed: 1,
912+
retried: 1,
913+
max_retries_elapsed: 2,
873914
..ProcessTransactionsResult::default()
874915
}
875916
);

0 commit comments

Comments
 (0)