diff --git a/Cargo.lock b/Cargo.lock index e9cd7ef29daf9..b0bed795e7463 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24902,6 +24902,7 @@ dependencies = [ name = "substrate-test-runtime-transaction-pool" version = "2.0.0" dependencies = [ + "async-trait", "futures", "log", "parity-scale-codec", diff --git a/prdoc/pr_8875.prdoc b/prdoc/pr_8875.prdoc new file mode 100644 index 0000000000000..2c6a9b38eb9be --- /dev/null +++ b/prdoc/pr_8875.prdoc @@ -0,0 +1,7 @@ +title: '`fatxpool`: `ChainApi` is now async' +doc: +- audience: Node Dev + description: Internal `ChainApi` is now `async_trait`, `validate_transaction` and `block_body` are now `async` methods. This is just cleanup - migrating from returning `Future` to `async` method'. +crates: +- name: sc-transaction-pool + bump: major diff --git a/substrate/client/transaction-pool/benches/basics.rs b/substrate/client/transaction-pool/benches/basics.rs index 0672d47f70431..bbd8b536abd1d 100644 --- a/substrate/client/transaction-pool/benches/basics.rs +++ b/substrate/client/transaction-pool/benches/basics.rs @@ -16,13 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use criterion::{criterion_group, criterion_main, Criterion}; - +use async_trait::async_trait; use codec::Encode; -use futures::{ - executor::block_on, - future::{ready, Ready}, -}; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::executor::block_on; use sc_transaction_pool::*; use sp_blockchain::HashAndNumber; use sp_crypto_hashing::blake2_256; @@ -55,19 +52,18 @@ fn to_tag(nonce: u64, from: AccountId) -> Tag { data.to_vec() } +#[async_trait] impl ChainApi for TestApi { type Block = Block; type Error = sc_transaction_pool_api::error::Error; - type ValidationFuture = Ready>; - type BodyFuture = Ready>>>; - fn validate_transaction( + async fn validate_transaction( &self, at: ::Hash, _: TransactionSource, uxt: Arc<::Extrinsic>, _: ValidateTransactionPriority, - ) -> Self::ValidationFuture { + ) -> Result { let uxt = (*uxt).clone(); let transfer = TransferData::try_from(&uxt) .expect("uxt is expected to be bench_call (carrying TransferData)"); @@ -75,11 +71,11 @@ impl ChainApi for TestApi { let from = transfer.from; match self.block_id_to_number(&BlockId::Hash(at)) { - Ok(Some(num)) if num > 5 => return ready(Ok(Err(InvalidTransaction::Stale.into()))), + Ok(Some(num)) if num > 5 => return Ok(Err(InvalidTransaction::Stale.into())), _ => {}, } - ready(Ok(Ok(ValidTransaction { + Ok(Ok(ValidTransaction { priority: 4, requires: if nonce > 1 && self.nonce_dependant { vec![to_tag(nonce - 1, from)] @@ -89,7 +85,7 @@ impl ChainApi for TestApi { provides: vec![to_tag(nonce, from)], longevity: 10, propagate: true, - }))) + })) } fn validate_transaction_blocking( @@ -97,7 +93,7 @@ impl ChainApi for TestApi { _at: ::Hash, _source: TransactionSource, _uxt: Arc<::Extrinsic>, - ) -> sc_transaction_pool_api::error::Result { + ) -> Result { unimplemented!(); } @@ -128,8 +124,11 @@ impl ChainApi for TestApi { (blake2_256(&encoded).into(), encoded.len()) } - fn block_body(&self, _id: ::Hash) -> Self::BodyFuture { - ready(Ok(None)) + async fn block_body( + &self, + _id: ::Hash, + ) -> Result::Extrinsic>>, Self::Error> { + Ok(None) } fn block_header( diff --git a/substrate/client/transaction-pool/src/common/api.rs b/substrate/client/transaction-pool/src/common/api.rs index b428582a01803..b77c390078fe7 100644 --- a/substrate/client/transaction-pool/src/common/api.rs +++ b/substrate/client/transaction-pool/src/common/api.rs @@ -23,8 +23,9 @@ use crate::{ graph::ValidateTransactionPriority, insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT, }; +use async_trait::async_trait; use codec::Encode; -use futures::future::{ready, Future, FutureExt, Ready}; +use futures::future::{Future, FutureExt}; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_client_api::{blockchain::HeaderBackend, BlockBackend}; use sp_api::{ApiExt, ProvideRuntimeApi}; @@ -181,6 +182,7 @@ impl FullChainApi { } } +#[async_trait] impl graph::ChainApi for FullChainApi where Block: BlockT, @@ -194,21 +196,21 @@ where { type Block = Block; type Error = error::Error; - type ValidationFuture = - Pin> + Send>>; - type BodyFuture = Ready::Extrinsic>>>>; - fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture { - ready(self.client.block_body(hash).map_err(error::Error::from)) + async fn block_body( + &self, + hash: Block::Hash, + ) -> Result::Extrinsic>>, Self::Error> { + self.client.block_body(hash).map_err(error::Error::from) } - fn validate_transaction( + async fn validate_transaction( &self, at: ::Hash, source: TransactionSource, uxt: graph::ExtrinsicFor, validation_priority: ValidateTransactionPriority, - ) -> Self::ValidationFuture { + ) -> Result { let start = Instant::now(); let (tx, rx) = oneshot::channel(); let client = self.client.clone(); @@ -228,39 +230,36 @@ where }; let metrics = self.metrics.clone(); - async move { - metrics.report(|m| m.validations_scheduled.inc()); - - { - validation_pool - .send( - async move { - let res = validate_transaction_blocking(&*client, at, source, uxt); - let _ = tx.send(res); - metrics.report(|m| m.validations_finished.inc()); - } - .boxed(), - ) - .await - .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?; - } + metrics.report(|m| m.validations_scheduled.inc()); - let validity = match rx.await { - Ok(r) => r, - Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())), - }; + { + validation_pool + .send( + async move { + let res = validate_transaction_blocking(&*client, at, source, uxt); + let _ = tx.send(res); + metrics.report(|m| m.validations_finished.inc()); + } + .boxed(), + ) + .await + .map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?; + } - insert_and_log_throttled!( - Level::DEBUG, - target:LOG_TARGET_STAT, - prefix:prefix, - stats, - start.elapsed().into() - ); + let validity = match rx.await { + Ok(r) => r, + Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())), + }; + + insert_and_log_throttled!( + Level::DEBUG, + target:LOG_TARGET_STAT, + prefix:prefix, + stats, + start.elapsed().into() + ); - validity - } - .boxed() + validity } /// Validates a transaction by calling into the runtime. @@ -271,21 +270,21 @@ where at: Block::Hash, source: TransactionSource, uxt: graph::ExtrinsicFor, - ) -> error::Result { + ) -> Result { validate_transaction_blocking(&*self.client, at, source, uxt) } fn block_id_to_number( &self, at: &BlockId, - ) -> error::Result>> { + ) -> Result>, Self::Error> { self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string())) } fn block_id_to_hash( &self, at: &BlockId, - ) -> error::Result>> { + ) -> Result>, Self::Error> { self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string())) } diff --git a/substrate/client/transaction-pool/src/common/tests.rs b/substrate/client/transaction-pool/src/common/tests.rs index 71caf005333de..bd36bc8adbe4a 100644 --- a/substrate/client/transaction-pool/src/common/tests.rs +++ b/substrate/client/transaction-pool/src/common/tests.rs @@ -22,6 +22,7 @@ use crate::{ graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor}, ValidateTransactionPriority, }; +use async_trait::async_trait; use codec::Encode; use parking_lot::Mutex; use sc_transaction_pool_api::error; @@ -69,20 +70,19 @@ impl TestApi { } } +#[async_trait] impl ChainApi for TestApi { type Block = Block; type Error = error::Error; - type ValidationFuture = futures::future::Ready>; - type BodyFuture = futures::future::Ready>>>; /// Verify extrinsic at given block. - fn validate_transaction( + async fn validate_transaction( &self, at: ::Hash, _source: TransactionSource, uxt: ExtrinsicFor, _: ValidateTransactionPriority, - ) -> Self::ValidationFuture { + ) -> Result { let uxt = (*uxt).clone(); self.validation_requests.lock().push(uxt.clone()); let hash = self.hash_and_length(&uxt).0; @@ -159,7 +159,7 @@ impl ChainApi for TestApi { _ => unimplemented!(), }; - futures::future::ready(Ok(res)) + Ok(res) } fn validate_transaction_blocking( @@ -167,7 +167,7 @@ impl ChainApi for TestApi { _at: ::Hash, _source: TransactionSource, _uxt: Arc<::Extrinsic>, - ) -> error::Result { + ) -> Result { unimplemented!(); } @@ -202,8 +202,11 @@ impl ChainApi for TestApi { (Hashing::hash(&encoded), len) } - fn block_body(&self, _id: ::Hash) -> Self::BodyFuture { - futures::future::ready(Ok(None)) + async fn block_body( + &self, + _id: ::Hash, + ) -> Result::Extrinsic>>, Self::Error> { + Ok(None) } fn block_header( diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index d6cbf7ccdef64..8c431d82082ce 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -17,7 +17,8 @@ // along with this program. If not, see . use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET}; -use futures::{channel::mpsc::Receiver, Future}; +use async_trait::async_trait; +use futures::channel::mpsc::Receiver; use indexmap::IndexMap; use sc_transaction_pool_api::error; use sp_blockchain::{HashAndNumber, TreeRoute}; @@ -74,27 +75,21 @@ pub enum ValidateTransactionPriority { } /// Concrete extrinsic validation and query logic. +#[async_trait] pub trait ChainApi: Send + Sync { /// Block type. type Block: BlockT; /// Error type. type Error: From + error::IntoPoolError; - /// Validate transaction future. - type ValidationFuture: Future> + Send + Unpin; - /// Body future (since block body might be remote) - type BodyFuture: Future::Extrinsic>>, Self::Error>> - + Unpin - + Send - + 'static; /// Asynchronously verify extrinsic at given block. - fn validate_transaction( + async fn validate_transaction( &self, at: ::Hash, source: TransactionSource, uxt: ExtrinsicFor, validation_priority: ValidateTransactionPriority, - ) -> Self::ValidationFuture; + ) -> Result; /// Synchronously verify given extrinsic at given block. /// @@ -123,7 +118,10 @@ pub trait ChainApi: Send + Sync { fn hash_and_length(&self, uxt: &RawExtrinsicFor) -> (ExtrinsicHash, usize); /// Returns a block body given the block. - fn block_body(&self, at: ::Hash) -> Self::BodyFuture; + async fn block_body( + &self, + at: ::Hash, + ) -> Result::Extrinsic>>, Self::Error>; /// Returns a block header given the block id. fn block_header( diff --git a/substrate/test-utils/runtime/transaction-pool/Cargo.toml b/substrate/test-utils/runtime/transaction-pool/Cargo.toml index 501c9f99ebf11..e48f1136f3e51 100644 --- a/substrate/test-utils/runtime/transaction-pool/Cargo.toml +++ b/substrate/test-utils/runtime/transaction-pool/Cargo.toml @@ -15,6 +15,7 @@ workspace = true targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-trait = { workspace = true } codec = { workspace = true, default-features = true } futures = { workspace = true } log = { workspace = true } diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index d0518bd7fa1a3..8e77689180c47 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -19,8 +19,8 @@ //! //! See [`TestApi`] for more information. +use async_trait::async_trait; use codec::Encode; -use futures::future::ready; use parking_lot::RwLock; use sc_transaction_pool::{ChainApi, ValidateTransactionPriority}; use sp_blockchain::{CachedHeaderMetadata, HashAndNumber, TreeRoute}; @@ -352,20 +352,19 @@ impl TagFrom for AccountId { } } +#[async_trait] impl ChainApi for TestApi { type Block = Block; type Error = Error; - type ValidationFuture = futures::future::Ready>; - type BodyFuture = futures::future::Ready>, Error>>; - fn validate_transaction( + async fn validate_transaction( &self, at: ::Hash, source: TransactionSource, uxt: Arc<::Extrinsic>, _: ValidateTransactionPriority, - ) -> Self::ValidationFuture { - ready(self.validate_transaction_blocking(at, source, uxt)) + ) -> Result { + self.validate_transaction_blocking(at, source, uxt) } fn validate_transaction_blocking( @@ -506,13 +505,11 @@ impl ChainApi for TestApi { Self::hash_and_length_inner(ex) } - fn block_body(&self, hash: ::Hash) -> Self::BodyFuture { - futures::future::ready(Ok(self - .chain - .read() - .block_by_hash - .get(&hash) - .map(|b| b.extrinsics().to_vec()))) + async fn block_body( + &self, + hash: ::Hash, + ) -> Result>, Error> { + Ok(self.chain.read().block_by_hash.get(&hash).map(|b| b.extrinsics().to_vec())) } fn block_header(