diff --git a/prdoc/pr_8923.prdoc b/prdoc/pr_8923.prdoc new file mode 100644 index 0000000000000..c2bb6982aa6fb --- /dev/null +++ b/prdoc/pr_8923.prdoc @@ -0,0 +1,11 @@ +title: '`fatxpool`: fix: remove invalid txs from the dropped stream controller' +doc: +- audience: Node Dev + description: |- + While testing mortal transaction I encountered exactly the same problem as in #8490. + This PR should fix the problem. + + fixes: #8490 +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index cc655db91f0f9..04a6cd4365ad3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -256,8 +256,12 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::::new(); - let view_store = - Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let view_store = Arc::new(ViewStore::new( + pool_api.clone(), + listener, + dropped_stream_controller, + import_notification_sink.clone(), + )); let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, @@ -404,8 +408,12 @@ where let (dropped_stream_controller, dropped_stream) = MultiViewDroppedWatcherController::::new(); - let view_store = - Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller)); + let view_store = Arc::new(ViewStore::new( + pool_api.clone(), + listener, + dropped_stream_controller, + import_notification_sink.clone(), + )); let dropped_monitor_task = Self::dropped_monitor_task( dropped_stream, @@ -837,6 +845,13 @@ where Ok(final_results) } + + /// Number of notified items in import_notification_sink. + /// + /// Internal detail, exposed only for testing. + pub fn import_notification_sink_len(&self) -> usize { + self.import_notification_sink.notified_items_len() + } } /// Converts the input view-to-statuses map into the output vector of statuses. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs index 1ca287fa23715..a9a8eb811e4d3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs @@ -245,6 +245,13 @@ where already_notified_items.remove(i); }); } + + /// Lenght of the `already_notified_items` set. + /// + /// Exposed for testing only. + pub fn notified_items_len(&self) -> usize { + self.already_notified_items.read().len() + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index e67566baa8b33..0c9273256b045 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -697,8 +697,18 @@ where let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len(); let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len(); - self.listener - .transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::>()); + let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::>(); + + //note: here the consistency is assumed: it is expected that transaction will be + // actually removed from the listener with Invalid event. This means assumption that no view + // is referencing tx as ready. + self.listener.transactions_invalidated(&invalid_hashes_subtrees); + view_store + .import_notification_sink + .clean_notified_items(&invalid_hashes_subtrees); + view_store + .dropped_stream_controller + .remove_transactions(invalid_hashes_subtrees); trace!( target: LOG_TARGET, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 91ef881a397c6..c392bd80d38d7 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -19,6 +19,7 @@ //! Transaction pool view store. Basically block hash to view map with some utility methods. use super::{ + import_notification_sink::MultiViewImportNotificationSink, multi_view_listener::{MultiViewListener, TxStatusStream}, view::{View, ViewPoolObserver}, }; @@ -171,6 +172,10 @@ where pub(super) most_recent_view: RwLock>>>, /// The controller of multi view dropped stream. pub(super) dropped_stream_controller: MultiViewDroppedWatcherController, + /// Util providing an aggregated stream of transactions that were imported to ready queue in + /// any view. Reference kept here for clean up purposes. + pub(super) import_notification_sink: + MultiViewImportNotificationSink>, /// The map used to synchronize replacement of transactions between maintain and dropped /// notifcication threads. It is meant to assure that replaced transaction is also removed from /// newly built views in maintain process. @@ -202,6 +207,10 @@ where api: Arc, listener: Arc>, dropped_stream_controller: MultiViewDroppedWatcherController, + import_notification_sink: MultiViewImportNotificationSink< + Block::Hash, + ExtrinsicHash, + >, ) -> Self { Self { api, @@ -210,6 +219,7 @@ where listener, most_recent_view: RwLock::from(None), dropped_stream_controller, + import_notification_sink, pending_txs_tasks: Default::default(), } } diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index e4e33111a1006..fdd2f280dc205 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -36,7 +36,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tracing::{trace, warn, Level}; +use tracing::{debug, trace, warn, Level}; use super::{ base_pool::{self as base, PruneStatus}, @@ -722,6 +722,12 @@ impl> ValidatedPool { } hashes }; + debug!( + target:LOG_TARGET, + to_remove_len=to_remove.len(), + futures_to_remove_len=futures_to_remove.len(), + "clear_stale" + ); // removing old transactions self.remove_invalid(&to_remove); self.remove_invalid(&futures_to_remove); diff --git a/substrate/client/transaction-pool/tests/fatp.rs b/substrate/client/transaction-pool/tests/fatp.rs index f0e12dfb6a749..49adc9ccc9639 100644 --- a/substrate/client/transaction-pool/tests/fatp.rs +++ b/substrate/client/transaction-pool/tests/fatp.rs @@ -484,6 +484,51 @@ fn fatp_linear_old_ready_becoming_stale() { } } +#[test] +fn fatp_proper_cleanup_after_mortal_tx_becoming_invalid() { + sp_tracing::try_init_simple(); + + let (pool, api, _) = pool(); + + let xts = vec![uxt(Alice, 200), uxt(Alice, 201), uxt(Alice, 202)]; + + api.set_valid_till(&xts[0], 66); + api.set_valid_till(&xts[1], 66); + api.set_valid_till(&xts[2], 66); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + xts.into_iter().for_each(|xt| { + block_on(pool.submit_one(invalid_hash(), SOURCE, xt)).unwrap(); + }); + assert_eq!(pool.status_all()[&header01.hash()].ready, 3); + assert_eq!(pool.status_all()[&header01.hash()].future, 0); + + // Import enough blocks to make our transactions stale (longevity is 64) + let mut prev_header = header01; + for n in 2..67 { + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = new_best_block_event(&pool, Some(prev_header.hash()), header.hash()); + block_on(pool.maintain(event)); + + if n == 66 { + assert_eq!(pool.status_all()[&header.hash()].ready, 0); + assert_eq!(pool.status_all()[&header.hash()].future, 0); + } else { + assert_eq!(pool.status_all()[&header.hash()].ready, 3); + assert_eq!(pool.status_all()[&header.hash()].future, 0); + } + prev_header = header; + } + + let header = api.push_block_with_parent(prev_header.hash(), vec![], true); + let event = finalized_block_event(&pool, prev_header.hash(), header.hash()); + block_on(pool.maintain(event)); + assert_eq!(pool.import_notification_sink_len(), 0); +} + #[test] fn fatp_fork_reorg() { sp_tracing::try_init_simple(); diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 01af801fae942..d0518bd7fa1a3 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -85,6 +85,7 @@ pub struct ChainState { pub nonces: HashMap>, pub invalid_hashes: HashSet, pub priorities: HashMap, + pub valid_till_blocks: HashMap, } /// Test Api for transaction pool. @@ -269,6 +270,14 @@ impl TestApi { .insert(Self::hash_and_length_inner(xts).0, priority); } + /// Set a transaction mortality (block at which it will expire). + pub fn set_valid_till(&self, xts: &Extrinsic, valid_till: u64) { + self.chain + .write() + .valid_till_blocks + .insert(Self::hash_and_length_inner(xts).0, valid_till); + } + /// Query validation requests received. pub fn validation_requests(&self) -> Vec { self.validation_requests.read().clone() @@ -443,11 +452,24 @@ impl ChainApi for TestApi { } let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned(); + let longevity = self + .chain + .read() + .valid_till_blocks + .get(&self.hash_and_length(&uxt).0) + .cloned() + .map(|valid_till| valid_till.saturating_sub(block_number.unwrap())) + .unwrap_or(64); + + if longevity == 0 { + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::BadProof))) + } + let mut validity = ValidTransaction { priority: priority.unwrap_or(1), requires, provides, - longevity: 64, + longevity, propagate: true, };