Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions prdoc/pr_8836.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
title: '`fatxpool`: `report_invalid` is now aync'
doc:
- audience: Node Dev
description: '[`TransactionPool::report_invalid`](https://github.com/paritytech/polkadot-sdk/blob/74dafaee5c600fd2c8a59a280f647f94ccf0a755/substrate/client/transaction-pool/api/src/lib.rs#L314)
is now `async`, function was typically used in async context, seems right to be
fully async.'
crates:
- name: sc-basic-authorship
bump: major
- name: sc-rpc-api
bump: major
- name: sc-rpc-spec-v2
bump: major
- name: sc-rpc
bump: major
- name: sc-transaction-pool-api
bump: major
- name: sc-transaction-pool
bump: major
2 changes: 1 addition & 1 deletion substrate/bin/node/bench/src/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ impl sc_transaction_pool_api::TransactionPool for Transactions {
unimplemented!()
}

fn report_invalid(
async fn report_invalid(
&self,
_at: Option<Self::Hash>,
_invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
Expand Down
4 changes: 3 additions & 1 deletion substrate/client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,9 @@ where
);
}

self.transaction_pool.report_invalid(Some(self.parent_hash), unqueue_invalid);
self.transaction_pool
.report_invalid(Some(self.parent_hash), unqueue_invalid)
.await;
Ok(end_reason)
}

Expand Down
2 changes: 1 addition & 1 deletion substrate/client/rpc-api/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub trait AuthorApi<Hash, BlockHash> {

/// Remove given extrinsic from the pool and temporarily ban it to prevent reimporting.
#[method(name = "author_removeExtrinsic", with_extensions)]
fn remove_extrinsic(
async fn remove_extrinsic(
&self,
bytes_or_hash: Vec<hash::ExtrinsicOrHash<Hash>>,
) -> Result<Vec<Hash>, Error>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,12 @@ impl TransactionPool for MiddlewarePool {
Ok(watcher.boxed())
}

fn report_invalid(
async fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
) -> Vec<Arc<Self::InPoolTransaction>> {
self.inner_pool.report_invalid(at, invalid_tx_errors)
self.inner_pool.report_invalid(at, invalid_tx_errors).await
}

fn status(&self) -> PoolStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,20 +226,22 @@ where
let pool = self.pool.clone();
// The future expected by the executor must be `Future<Output = ()>` instead of
// `Future<Output = Result<(), Aborted>>`.
let fut = fut.map(move |result| {
// Connection space is cleaned when this object is dropped.
drop(reserved_identifier);
let fut = fut.then(move |result| {
async move {
// Connection space is cleaned when this object is dropped.
drop(reserved_identifier);

// Remove the entry from the broadcast IDs map.
let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };
// Remove the entry from the broadcast IDs map.
let Some(broadcast_state) = broadcast_ids.write().remove(&drop_id) else { return };

// The broadcast was not stopped.
if result.is_ok() {
return
}
// The broadcast was not stopped.
if result.is_ok() {
return
}

// Best effort pool removal (tx can already be finalized).
pool.report_invalid(None, [(broadcast_state.tx_hash, None)].into());
// Best effort pool removal (tx can already be finalized).
pool.report_invalid(None, [(broadcast_state.tx_hash, None)].into()).await;
}
});

// Keep track of this entry and the abortable handle.
Expand Down
3 changes: 2 additions & 1 deletion substrate/client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ where
Ok(self.pool.ready().map(|tx| tx.data().encode().into()).collect())
}

fn remove_extrinsic(
async fn remove_extrinsic(
&self,
ext: &Extensions,
bytes_or_hash: Vec<hash::ExtrinsicOrHash<TxHash<P>>>,
Expand All @@ -173,6 +173,7 @@ where
Ok(self
.pool
.report_invalid(None, hashes)
.await
.into_iter()
.map(|tx| tx.hash().clone())
.collect())
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/transaction-pool/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ pub trait TransactionPool: Send + Sync {
/// occurred.
///
/// Function returns the transactions actually removed from the pool.
fn report_invalid(
async fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ where
/// The transaction pool implementation will determine which transactions should be
/// removed from the pool. Transactions that depend on invalid transactions will also
/// be removed.
fn report_invalid(
async fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
Expand All @@ -1018,7 +1018,7 @@ where
let removed = self.view_store.report_invalid(at, invalid_tx_errors);

let removed_hashes = removed.iter().map(|tx| tx.hash).collect::<Vec<_>>();
self.mempool.clone().remove_transactions_sync(removed_hashes.clone());
self.mempool.remove_transactions(&removed_hashes).await;
self.import_notification_sink.clean_notified_items(&removed_hashes);

self.metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ where
.map(|mut outcome| outcome.expect_watcher().into_stream().boxed())
}

fn report_invalid(
async fn report_invalid(
&self,
_at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ where
self.0.ready()
}

fn report_invalid(
async fn report_invalid(
&self,
at: Option<<Self::Block as BlockT>::Hash>,
invalid_tx_errors: TxInvalidityReportMap<TxHash<Self>>,
) -> Vec<Arc<Self::InPoolTransaction>> {
self.0.report_invalid(at, invalid_tx_errors)
self.0.report_invalid(at, invalid_tx_errors).await
}

fn futures(&self) -> Vec<Self::InPoolTransaction> {
Expand Down
12 changes: 6 additions & 6 deletions substrate/client/transaction-pool/tests/fatp_invalid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ fn fatp_invalid_report_stale_or_future_works_as_expected() {
Some(TransactionValidityError::Invalid(InvalidTransaction::Future)),
);
let invalid_txs = [xt0_report].into();
let result = pool.report_invalid(None, invalid_txs);
let result = block_on(pool.report_invalid(None, invalid_txs));
assert!(result.is_empty());
assert_ready_iterator!(header01.hash(), pool, [xt0, xt1, xt2, xt3]);

Expand All @@ -459,7 +459,7 @@ fn fatp_invalid_report_stale_or_future_works_as_expected() {
Some(TransactionValidityError::Invalid(InvalidTransaction::Stale)),
);
let invalid_txs = [xt0_report, xt1_report].into();
let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
let result = block_on(pool.report_invalid(Some(header01.hash()), invalid_txs));
// stale/future does not cause tx to be removed from the pool
assert!(result.is_empty());
// assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0);
Expand Down Expand Up @@ -521,7 +521,7 @@ fn fatp_invalid_report_future_dont_remove_from_pool() {
Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
);
let invalid_txs = [xt0_report, xt1_report, xt4_report].into();
let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
let result = block_on(pool.report_invalid(Some(header01.hash()), invalid_txs));

assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready, TransactionStatus::Invalid]);

Expand Down Expand Up @@ -570,7 +570,7 @@ fn fatp_invalid_tx_is_removed_from_the_pool() {
);
let xt1_report = (pool.api().hash_and_length(&xt1).0, None);
let invalid_txs = [xt0_report, xt1_report].into();
let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
let result = block_on(pool.report_invalid(Some(header01.hash()), invalid_txs));
assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0));
assert_pool_status!(header01.hash(), &pool, 2, 0);
assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
Expand Down Expand Up @@ -613,7 +613,7 @@ fn fatp_invalid_tx_is_removed_from_the_pool_future_subtree_stays() {
Some(TransactionValidityError::Invalid(InvalidTransaction::BadProof)),
);
let invalid_txs = [xt0_report].into();
let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
let result = block_on(pool.report_invalid(Some(header01.hash()), invalid_txs));
assert_eq!(result[0].hash, pool.api().hash_and_length(&xt0).0);
assert_pool_status!(header01.hash(), &pool, 0, 0);
assert_ready_iterator!(header01.hash(), pool, []);
Expand Down Expand Up @@ -671,7 +671,7 @@ fn fatp_invalid_tx_is_removed_from_the_pool2() {
);
let xt1_report = (pool.api().hash_and_length(&xt1).0, None);
let invalid_txs = [xt0_report, xt1_report].into();
let result = pool.report_invalid(Some(header01.hash()), invalid_txs);
let result = block_on(pool.report_invalid(Some(header01.hash()), invalid_txs));
assert!(result.iter().any(|tx| tx.hash == pool.api().hash_and_length(&xt0).0));
assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]);
assert_pool_status!(header02a.hash(), &pool, 2, 0);
Expand Down