Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions prdoc/pr_8923.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
title: '`fatxpool`: fix: remove invalid txs from the dropped stream controller'
doc:
- audience: Node Dev
description: |-
While testing mortal transaction I encountered exactly the same problem as in #8490.
This PR should fix the problem.

fixes: #8490
crates:
- name: sc-transaction-pool
bump: minor
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,12 @@ where
let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();

let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
let view_store = Arc::new(ViewStore::new(
pool_api.clone(),
listener,
dropped_stream_controller,
import_notification_sink.clone(),
));

let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
Expand Down Expand Up @@ -404,8 +408,12 @@ where
let (dropped_stream_controller, dropped_stream) =
MultiViewDroppedWatcherController::<ChainApi>::new();

let view_store =
Arc::new(ViewStore::new(pool_api.clone(), listener, dropped_stream_controller));
let view_store = Arc::new(ViewStore::new(
pool_api.clone(),
listener,
dropped_stream_controller,
import_notification_sink.clone(),
));

let dropped_monitor_task = Self::dropped_monitor_task(
dropped_stream,
Expand Down Expand Up @@ -837,6 +845,13 @@ where

Ok(final_results)
}

/// Number of notified items in import_notification_sink.
///
/// Internal detail, exposed only for testing.
pub fn import_notification_sink_len(&self) -> usize {
self.import_notification_sink.notified_items_len()
}
}

/// Converts the input view-to-statuses map into the output vector of statuses.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ where
already_notified_items.remove(i);
});
}

/// Lenght of the `already_notified_items` set.
///
/// Exposed for testing only.
pub fn notified_items_len(&self) -> usize {
self.already_notified_items.read().len()
}
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,18 @@ where
let revalidated_invalid_hashes_len = revalidated_invalid_hashes.len();
let invalid_hashes_subtrees_len = invalid_hashes_subtrees.len();

self.listener
.transactions_invalidated(&invalid_hashes_subtrees.into_iter().collect::<Vec<_>>());
let invalid_hashes_subtrees = invalid_hashes_subtrees.into_iter().collect::<Vec<_>>();

//note: here the consistency is assumed: it is expected that transaction will be
// actually removed from the listener with Invalid event. This means assumption that no view
// is referencing tx as ready.
self.listener.transactions_invalidated(&invalid_hashes_subtrees);
view_store
.import_notification_sink
.clean_notified_items(&invalid_hashes_subtrees);
view_store
.dropped_stream_controller
.remove_transactions(invalid_hashes_subtrees);

trace!(
target: LOG_TARGET,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! Transaction pool view store. Basically block hash to view map with some utility methods.

use super::{
import_notification_sink::MultiViewImportNotificationSink,
multi_view_listener::{MultiViewListener, TxStatusStream},
view::{View, ViewPoolObserver},
};
Expand Down Expand Up @@ -171,6 +172,10 @@ where
pub(super) most_recent_view: RwLock<Option<Arc<View<ChainApi>>>>,
/// The controller of multi view dropped stream.
pub(super) dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
/// Util providing an aggregated stream of transactions that were imported to ready queue in
/// any view. Reference kept here for clean up purposes.
pub(super) import_notification_sink:
MultiViewImportNotificationSink<Block::Hash, ExtrinsicHash<ChainApi>>,
/// The map used to synchronize replacement of transactions between maintain and dropped
/// notifcication threads. It is meant to assure that replaced transaction is also removed from
/// newly built views in maintain process.
Expand Down Expand Up @@ -202,6 +207,10 @@ where
api: Arc<ChainApi>,
listener: Arc<MultiViewListener<ChainApi>>,
dropped_stream_controller: MultiViewDroppedWatcherController<ChainApi>,
import_notification_sink: MultiViewImportNotificationSink<
Block::Hash,
ExtrinsicHash<ChainApi>,
>,
) -> Self {
Self {
api,
Expand All @@ -210,6 +219,7 @@ where
listener,
most_recent_view: RwLock::from(None),
dropped_stream_controller,
import_notification_sink,
pending_txs_tasks: Default::default(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tracing::{trace, warn, Level};
use tracing::{debug, trace, warn, Level};

use super::{
base_pool::{self as base, PruneStatus},
Expand Down Expand Up @@ -722,6 +722,12 @@ impl<B: ChainApi, L: EventHandler<B>> ValidatedPool<B, L> {
}
hashes
};
debug!(
target:LOG_TARGET,
to_remove_len=to_remove.len(),
futures_to_remove_len=futures_to_remove.len(),
"clear_stale"
);
// removing old transactions
self.remove_invalid(&to_remove);
self.remove_invalid(&futures_to_remove);
Expand Down
45 changes: 45 additions & 0 deletions substrate/client/transaction-pool/tests/fatp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,51 @@ fn fatp_linear_old_ready_becoming_stale() {
}
}

#[test]
fn fatp_proper_cleanup_after_mortal_tx_becoming_invalid() {
sp_tracing::try_init_simple();

let (pool, api, _) = pool();

let xts = vec![uxt(Alice, 200), uxt(Alice, 201), uxt(Alice, 202)];

api.set_valid_till(&xts[0], 66);
api.set_valid_till(&xts[1], 66);
api.set_valid_till(&xts[2], 66);

let header01 = api.push_block(1, vec![], true);
let event = new_best_block_event(&pool, None, header01.hash());
block_on(pool.maintain(event));

xts.into_iter().for_each(|xt| {
block_on(pool.submit_one(invalid_hash(), SOURCE, xt)).unwrap();
});
assert_eq!(pool.status_all()[&header01.hash()].ready, 3);
assert_eq!(pool.status_all()[&header01.hash()].future, 0);

// Import enough blocks to make our transactions stale (longevity is 64)
let mut prev_header = header01;
for n in 2..67 {
let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
let event = new_best_block_event(&pool, Some(prev_header.hash()), header.hash());
block_on(pool.maintain(event));

if n == 66 {
assert_eq!(pool.status_all()[&header.hash()].ready, 0);
assert_eq!(pool.status_all()[&header.hash()].future, 0);
} else {
assert_eq!(pool.status_all()[&header.hash()].ready, 3);
assert_eq!(pool.status_all()[&header.hash()].future, 0);
}
prev_header = header;
}

let header = api.push_block_with_parent(prev_header.hash(), vec![], true);
let event = finalized_block_event(&pool, prev_header.hash(), header.hash());
block_on(pool.maintain(event));
assert_eq!(pool.import_notification_sink_len(), 0);
}

#[test]
fn fatp_fork_reorg() {
sp_tracing::try_init_simple();
Expand Down
24 changes: 23 additions & 1 deletion substrate/test-utils/runtime/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ pub struct ChainState {
pub nonces: HashMap<Hash, HashMap<AccountId, u64>>,
pub invalid_hashes: HashSet<Hash>,
pub priorities: HashMap<Hash, u64>,
pub valid_till_blocks: HashMap<Hash, u64>,
}

/// Test Api for transaction pool.
Expand Down Expand Up @@ -269,6 +270,14 @@ impl TestApi {
.insert(Self::hash_and_length_inner(xts).0, priority);
}

/// Set a transaction mortality (block at which it will expire).
pub fn set_valid_till(&self, xts: &Extrinsic, valid_till: u64) {
self.chain
.write()
.valid_till_blocks
.insert(Self::hash_and_length_inner(xts).0, valid_till);
}

/// Query validation requests received.
pub fn validation_requests(&self) -> Vec<Extrinsic> {
self.validation_requests.read().clone()
Expand Down Expand Up @@ -443,11 +452,24 @@ impl ChainApi for TestApi {
}

let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned();
let longevity = self
.chain
.read()
.valid_till_blocks
.get(&self.hash_and_length(&uxt).0)
.cloned()
.map(|valid_till| valid_till.saturating_sub(block_number.unwrap()))
.unwrap_or(64);

if longevity == 0 {
return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::BadProof)))
}

let mut validity = ValidTransaction {
priority: priority.unwrap_or(1),
requires,
provides,
longevity: 64,
longevity,
propagate: true,
};

Expand Down
Loading