From 3a8b740c3921c149ca66f0333785f87b1121b708 Mon Sep 17 00:00:00 2001 From: Michal Kucharczyk <1728078+michalkucharczyk@users.noreply.github.com> Date: Wed, 25 Jun 2025 08:40:14 +0200 Subject: [PATCH] BACKPORT-CONFLICT --- prdoc/pr_8923.prdoc | 11 ++ .../fork_aware_txpool/fork_aware_txpool.rs | 163 +++++++++++++++++- .../import_notification_sink.rs | 7 + .../src/fork_aware_txpool/tx_mem_pool.rs | 14 +- .../src/fork_aware_txpool/view_store.rs | 10 ++ .../src/graph/validated_pool.rs | 15 ++ .../client/transaction-pool/tests/fatp.rs | 45 +++++ .../runtime/transaction-pool/src/lib.rs | 24 ++- 8 files changed, 282 insertions(+), 7 deletions(-) create mode 100644 prdoc/pr_8923.prdoc diff --git a/prdoc/pr_8923.prdoc b/prdoc/pr_8923.prdoc new file mode 100644 index 0000000000000..c2bb6982aa6fb --- /dev/null +++ b/prdoc/pr_8923.prdoc @@ -0,0 +1,11 @@ +title: '`fatxpool`: fix: remove invalid txs from the dropped stream controller' +doc: +- audience: Node Dev + description: |- + While testing mortal transaction I encountered exactly the same problem as in #8490. + This PR should fix the problem. 
+ + fixes: #8490 +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 3553465668e31..fa9d444a8d0ae 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -238,8 +238,12 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::::new(); - let view_store = - Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let view_store = Arc::new(ViewStore::new( + pool_api.clone(), + listener, + dropped_stream_controller, + import_notification_sink.clone(), + )); let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, @@ -372,8 +376,12 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::::new(); - let view_store = - Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let view_store = Arc::new(ViewStore::new( + pool_api.clone(), + listener, + dropped_stream_controller, + import_notification_sink.clone(), + )); let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, @@ -658,6 +666,153 @@ where ); (false, pending) } +<<<<<<< HEAD +======= + + /// Refer to [`Self::submit_and_watch`] + async fn submit_and_watch_inner( + &self, + at: Block::Hash, + source: TransactionSource, + xt: TransactionFor, + ) -> Result>>, ChainApi::Error> { + let xt = Arc::from(xt); + + let insertion = match self.mempool.push_watched(source, xt.clone()).await { + Ok(result) => result, + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, true, xt.clone()).await?, + Err(e) => return Err(e.into()), + }; + + self.metrics.report(|metrics| metrics.submitted_transactions.inc()); + 
self.events_metrics_collector.report_submitted(&insertion); + + match self.view_store.submit_and_watch(at, insertion.source, xt).await { + Err(e) => { + self.mempool.remove_transactions(&[insertion.hash]).await; + Err(e.into()) + }, + Ok(mut outcome) => { + self.mempool + .update_transaction_priority(outcome.hash(), outcome.priority()) + .await; + Ok(outcome.expect_watcher()) + }, + } + } + + /// Refer to [`Self::submit_at`] + async fn submit_at_inner( + &self, + source: TransactionSource, + xts: Vec>, + ) -> Result, ChainApi::Error>>, ChainApi::Error> { + let view_store = self.view_store.clone(); + trace!( + target: LOG_TARGET, + count = xts.len(), + active_views_count = self.active_views_count(), + "fatp::submit_at" + ); + log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at"); + let xts = xts.into_iter().map(Arc::from).collect::>(); + let mempool_results = self.mempool.extend_unwatched(source, &xts).await; + + if view_store.is_empty() { + return Ok(mempool_results + .into_iter() + .map(|r| r.map(|r| r.hash).map_err(Into::into)) + .collect::>()) + } + + // Submit all the transactions to the mempool + let retries = mempool_results + .into_iter() + .zip(xts.clone()) + .map(|(result, xt)| async move { + match result { + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, false, xt).await, + _ => result, + } + }) + .collect::>(); + + let mempool_results = futures::future::join_all(retries).await; + + // Collect transactions that were successfully submitted to the mempool... + let to_be_submitted = mempool_results + .iter() + .zip(xts) + .filter_map(|(result, xt)| { + result.as_ref().ok().map(|insertion| { + self.events_metrics_collector.report_submitted(&insertion); + (insertion.source.clone(), xt) + }) + }) + .collect::>(); + + self.metrics + .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _)); + + // ... and submit them to the view_store. 
Please note that transactions rejected by mempool + // are not sent here. + let mempool = self.mempool.clone(); + let results_map = view_store.submit(to_be_submitted.into_iter()).await; + let mut submission_results = reduce_multiview_result(results_map).into_iter(); + + // Note for composing final result: + // + // For each failed insertion into the mempool, the mempool result should be placed into + // the returned vector. + // + // For each successful insertion into the mempool, the corresponding + // view_store submission result needs to be examined (merged_results): + // - If there is an error during view_store submission, the transaction is removed from + // the mempool, and the final result recorded in the vector for this transaction is the + // view_store submission error. + // + // - If the view_store submission is successful, the transaction priority is updated in the + // mempool. + // + // Finally, it collects the hashes of updated transactions or submission errors (either + // from the mempool or view_store) into a returned vector (final_results). + const RESULTS_ASSUMPTION : &str = + "The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed."; + let merged_results = mempool_results.into_iter().map(|result| { + result.map_err(Into::into).and_then(|insertion| { + Ok((insertion.hash, submission_results.next().expect(RESULTS_ASSUMPTION))) + }) + }); + + let mut final_results = vec![]; + for r in merged_results { + match r { + Ok((hash, submission_result)) => match submission_result { + Ok(r) => { + mempool.update_transaction_priority(r.hash(), r.priority()).await; + final_results.push(Ok(r.hash())); + }, + Err(e) => { + mempool.remove_transactions(&[hash]).await; + final_results.push(Err(e)); + }, + }, + Err(e) => final_results.push(Err(e)), + } + } + + Ok(final_results) + } + + /// Number of notified items in import_notification_sink. + /// + /// Internal detail, exposed only for testing. 
+ pub fn import_notification_sink_len(&self) -> usize { + self.import_notification_sink.notified_items_len() + } +>>>>>>> 63973cc (`fatxpool`: fix: remove invalid txs from the dropped stream controller (#8923)) } /// Converts the input view-to-statuses map into the output vector of statuses. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs index 1ca287fa23715..a9a8eb811e4d3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs @@ -245,6 +245,13 @@ where already_notified_items.remove(i); }); } + + /// Length of the `already_notified_items` set. + /// + /// Exposed for testing only. + pub fn notified_items_len(&self) -> usize { + self.already_notified_items.read().len() + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 559f11da4cdb2..38aa310014762 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -605,8 +605,18 @@ where let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len(); let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len(); - self.listener - .transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::>()); + let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::>(); + + //note: consistency is assumed here: it is expected that the transaction will actually + // be removed from the listener with an Invalid event. This assumes that no view + // is referencing the tx as ready. 
+ self.listener.transactions_invalidated(&invalid_hashes_subtrees); + view_store + .import_notification_sink + .clean_notified_items(&invalid_hashes_subtrees); + view_store + .dropped_stream_controller + .remove_transactions(invalid_hashes_subtrees); trace!( target: LOG_TARGET, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index bcf1e8d5ceef9..6ccb708b631c4 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -19,6 +19,7 @@ //! Transaction pool view store. Basically block hash to view map with some utility methods. use super::{ + import_notification_sink::MultiViewImportNotificationSink, multi_view_listener::{MultiViewListener, TxStatusStream}, view::{View, ViewPoolObserver}, }; @@ -171,6 +172,10 @@ where pub(super) most_recent_view: RwLock>, /// The controller of multi view dropped stream. pub(super) dropped_stream_controller: MultiViewDroppedWatcherController, + /// Util providing an aggregated stream of transactions that were imported to ready queue in + /// any view. Reference kept here for clean up purposes. + pub(super) import_notification_sink: + MultiViewImportNotificationSink>, /// The map used to synchronize replacement of transactions between maintain and dropped /// notifcication threads. It is meant to assure that replaced transaction is also removed from /// newly built views in maintain process. 
@@ -202,6 +207,10 @@ where api: Arc, listener: Arc>, dropped_stream_controller: MultiViewDroppedWatcherController, + import_notification_sink: MultiViewImportNotificationSink< + Block::Hash, + ExtrinsicHash, + >, ) -> Self { Self { api, @@ -210,6 +219,7 @@ where listener, most_recent_view: RwLock::from(None), dropped_stream_controller, + import_notification_sink, pending_txs_tasks: Default::default(), } } diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index caccc3370a0aa..45ca805f6a1db 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -31,8 +31,17 @@ use sp_runtime::{ traits::SaturatedConversion, transaction_validity::{TransactionTag as Tag, ValidTransaction}, }; +<<<<<<< HEAD use std::time::Instant; use tracing::{debug, trace, warn}; +======= +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Duration, Instant}, +}; +use tracing::{debug, trace, warn, Level}; +>>>>>>> 63973cc (`fatxpool`: fix: remove invalid txs from the dropped stream controller (#8923)) use super::{ base_pool::{self as base, PruneStatus}, @@ -704,6 +713,12 @@ impl> ValidatedPool { } hashes }; + debug!( + target:LOG_TARGET, + to_remove_len=to_remove.len(), + futures_to_remove_len=futures_to_remove.len(), + "clear_stale" + ); // removing old transactions self.remove_invalid(&to_remove); self.remove_invalid(&futures_to_remove); diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs index c1411b29dafb9..57d8b04fdd908 100644 --- a/substrate/client/transaction-pool/tests/fatp.rs +++ b/substrate/client/transaction-pool/tests/fatp.rs @@ -484,6 +484,51 @@ fn fatp_linear_old_ready_becoming_stale() { } } +#[test] +fn fatp_proper_cleanup_after_mortal_tx_becoming_invalid() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let xts 
= vec![uxt(Alice, 200), uxt(Alice, 201), uxt(Alice, 202)]; + + api.set_valid_till(&xts[0], 66); + api.set_valid_till(&xts[1], 66); + api.set_valid_till(&xts[2], 66); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + xts.into_iter().for_each(|xt| { + block_on(pool.submit_one(invalid_hash(), SOURCE, xt)).unwrap(); + }); + assert_eq!(pool.status_all()[&header01.hash()].ready, 3); + assert_eq!(pool.status_all()[&header01.hash()].future, 0); + + // Import enough blocks to make our transactions stale (longevity is 64) + let mut prev_header = header01; + for n in 2..67 { + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = new_best_block_event(&pool, Some(prev_header.hash()), header.hash()); + block_on(pool.maintain(event)); + + if n == 66 { + assert_eq!(pool.status_all()[&header.hash()].ready, 0); + assert_eq!(pool.status_all()[&header.hash()].future, 0); + } else { + assert_eq!(pool.status_all()[&header.hash()].ready, 3); + assert_eq!(pool.status_all()[&header.hash()].future, 0); + } + prev_header = header; + } + + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + assert_eq!(pool.import_notification_sink_len(), 0); +} + #[test] fn fatp_fork_reorg() { sp_tracing::try_init_simple(); diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index f88694fb1071e..27fd260569a01 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -85,6 +85,7 @@ pub struct ChainState { pub nonces: HashMap>, pub invalid_hashes: HashSet, pub priorities: HashMap, + pub valid_till_blocks: HashMap, } /// Test Api for transaction pool. 
@@ -269,6 +270,14 @@ impl TestApi { .insert(Self::hash_and_length_inner(xts).0, priority); } + /// Set a transaction mortality (block at which it will expire). + pub fn set_valid_till(&self, xts: &Extrinsic, valid_till: u64) { + self.chain + .write() + .valid_till_blocks + .insert(Self::hash_and_length_inner(xts).0, valid_till); + } + /// Query validation requests received. pub fn validation_requests(&self) -> Vec { self.validation_requests.read().clone() @@ -442,11 +451,24 @@ impl ChainApi for TestApi { } let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned(); + let longevity = self + .chain + .read() + .valid_till_blocks + .get(&self.hash_and_length(&uxt).0) + .cloned() + .map(|valid_till| valid_till.saturating_sub(block_number.unwrap())) + .unwrap_or(64); + + if longevity == 0 { + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::BadProof))) + } + let mut validity = ValidTransaction { priority: priority.unwrap_or(1), requires, provides, - longevity: 64, + longevity, propagate: true, };