Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions prdoc/pr_8875.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
title: '`fatxpool`: `ChainApi` is now async'
doc:
- audience: Node Dev
description: Internal `ChainApi` is now `async_trait`, `validate_transaction` and `block_body` are now `async` methods. This is just cleanup - migrating from returning `Future` to `async` method'.
crates:
- name: sc-transaction-pool
bump: major
31 changes: 15 additions & 16 deletions substrate/client/transaction-pool/benches/basics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use criterion::{criterion_group, criterion_main, Criterion};

use async_trait::async_trait;
use codec::Encode;
use futures::{
executor::block_on,
future::{ready, Ready},
};
use criterion::{criterion_group, criterion_main, Criterion};
use futures::executor::block_on;
use sc_transaction_pool::*;
use sp_blockchain::HashAndNumber;
use sp_crypto_hashing::blake2_256;
Expand Down Expand Up @@ -55,31 +52,30 @@ fn to_tag(nonce: u64, from: AccountId) -> Tag {
data.to_vec()
}

#[async_trait]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DQ: With async traits stabilized, do we even need this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[async_trait] also makes a future Send. I've just tried removing this and run into some lengthy errors which seems to be related to it.

impl ChainApi for TestApi {
type Block = Block;
type Error = sc_transaction_pool_api::error::Error;
type ValidationFuture = Ready<sc_transaction_pool_api::error::Result<TransactionValidity>>;
type BodyFuture = Ready<sc_transaction_pool_api::error::Result<Option<Vec<Extrinsic>>>>;

fn validate_transaction(
async fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
_: TransactionSource,
uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
_: ValidateTransactionPriority,
) -> Self::ValidationFuture {
) -> Result<TransactionValidity, Self::Error> {
let uxt = (*uxt).clone();
let transfer = TransferData::try_from(&uxt)
.expect("uxt is expected to be bench_call (carrying TransferData)");
let nonce = transfer.nonce;
let from = transfer.from;

match self.block_id_to_number(&BlockId::Hash(at)) {
Ok(Some(num)) if num > 5 => return ready(Ok(Err(InvalidTransaction::Stale.into()))),
Ok(Some(num)) if num > 5 => return Ok(Err(InvalidTransaction::Stale.into())),
_ => {},
}

ready(Ok(Ok(ValidTransaction {
Ok(Ok(ValidTransaction {
priority: 4,
requires: if nonce > 1 && self.nonce_dependant {
vec![to_tag(nonce - 1, from)]
Expand All @@ -89,15 +85,15 @@ impl ChainApi for TestApi {
provides: vec![to_tag(nonce, from)],
longevity: 10,
propagate: true,
})))
}))
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> sc_transaction_pool_api::error::Result<TransactionValidity> {
) -> Result<TransactionValidity, Self::Error> {
unimplemented!();
}

Expand Down Expand Up @@ -128,8 +124,11 @@ impl ChainApi for TestApi {
(blake2_256(&encoded).into(), encoded.len())
}

fn block_body(&self, _id: <Self::Block as BlockT>::Hash) -> Self::BodyFuture {
ready(Ok(None))
async fn block_body(
&self,
_id: <Self::Block as BlockT>::Hash,
) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
Ok(None)
}

fn block_header(
Expand Down
81 changes: 40 additions & 41 deletions substrate/client/transaction-pool/src/common/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ use crate::{
graph::ValidateTransactionPriority,
insert_and_log_throttled, LOG_TARGET, LOG_TARGET_STAT,
};
use async_trait::async_trait;
use codec::Encode;
use futures::future::{ready, Future, FutureExt, Ready};
use futures::future::{Future, FutureExt};
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_client_api::{blockchain::HeaderBackend, BlockBackend};
use sp_api::{ApiExt, ProvideRuntimeApi};
Expand Down Expand Up @@ -181,6 +182,7 @@ impl<Client, Block> FullChainApi<Client, Block> {
}
}

#[async_trait]
impl<Client, Block> graph::ChainApi for FullChainApi<Client, Block>
where
Block: BlockT,
Expand All @@ -194,21 +196,21 @@ where
{
type Block = Block;
type Error = error::Error;
type ValidationFuture =
Pin<Box<dyn Future<Output = error::Result<TransactionValidity>> + Send>>;
type BodyFuture = Ready<error::Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>>>;

fn block_body(&self, hash: Block::Hash) -> Self::BodyFuture {
ready(self.client.block_body(hash).map_err(error::Error::from))
async fn block_body(
&self,
hash: Block::Hash,
) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
self.client.block_body(hash).map_err(error::Error::from)
}

fn validate_transaction(
async fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
validation_priority: ValidateTransactionPriority,
) -> Self::ValidationFuture {
) -> Result<TransactionValidity, Self::Error> {
let start = Instant::now();
let (tx, rx) = oneshot::channel();
let client = self.client.clone();
Expand All @@ -228,39 +230,36 @@ where
};
let metrics = self.metrics.clone();

async move {
metrics.report(|m| m.validations_scheduled.inc());

{
validation_pool
.send(
async move {
let res = validate_transaction_blocking(&*client, at, source, uxt);
let _ = tx.send(res);
metrics.report(|m| m.validations_finished.inc());
}
.boxed(),
)
.await
.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
}
metrics.report(|m| m.validations_scheduled.inc());

let validity = match rx.await {
Ok(r) => r,
Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
};
{
validation_pool
.send(
async move {
let res = validate_transaction_blocking(&*client, at, source, uxt);
let _ = tx.send(res);
metrics.report(|m| m.validations_finished.inc());
}
.boxed(),
)
.await
.map_err(|e| Error::RuntimeApi(format!("Validation pool down: {:?}", e)))?;
}

insert_and_log_throttled!(
Level::DEBUG,
target:LOG_TARGET_STAT,
prefix:prefix,
stats,
start.elapsed().into()
);
let validity = match rx.await {
Ok(r) => r,
Err(_) => Err(Error::RuntimeApi("Validation was canceled".into())),
};

insert_and_log_throttled!(
Level::DEBUG,
target:LOG_TARGET_STAT,
prefix:prefix,
stats,
start.elapsed().into()
);

validity
}
.boxed()
validity
}

/// Validates a transaction by calling into the runtime.
Expand All @@ -271,21 +270,21 @@ where
at: Block::Hash,
source: TransactionSource,
uxt: graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
) -> Result<TransactionValidity, Self::Error> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}

fn block_id_to_number(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<graph::NumberFor<Self>>> {
) -> Result<Option<graph::NumberFor<Self>>, Self::Error> {
self.client.to_number(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
}

fn block_id_to_hash(
&self,
at: &BlockId<Self::Block>,
) -> error::Result<Option<graph::BlockHash<Self>>> {
) -> Result<Option<graph::BlockHash<Self>>, Self::Error> {
self.client.to_hash(at).map_err(|e| Error::BlockIdConversion(e.to_string()))
}

Expand Down
19 changes: 11 additions & 8 deletions substrate/client/transaction-pool/src/common/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
graph::{BlockHash, ChainApi, ExtrinsicFor, NumberFor, RawExtrinsicFor},
ValidateTransactionPriority,
};
use async_trait::async_trait;
use codec::Encode;
use parking_lot::Mutex;
use sc_transaction_pool_api::error;
Expand Down Expand Up @@ -69,20 +70,19 @@ impl TestApi {
}
}

#[async_trait]
impl ChainApi for TestApi {
type Block = Block;
type Error = error::Error;
type ValidationFuture = futures::future::Ready<error::Result<TransactionValidity>>;
type BodyFuture = futures::future::Ready<error::Result<Option<Vec<Extrinsic>>>>;

/// Verify extrinsic at given block.
fn validate_transaction(
async fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
uxt: ExtrinsicFor<Self>,
_: ValidateTransactionPriority,
) -> Self::ValidationFuture {
) -> Result<TransactionValidity, Self::Error> {
let uxt = (*uxt).clone();
self.validation_requests.lock().push(uxt.clone());
let hash = self.hash_and_length(&uxt).0;
Expand Down Expand Up @@ -159,15 +159,15 @@ impl ChainApi for TestApi {
_ => unimplemented!(),
};

futures::future::ready(Ok(res))
Ok(res)
}

fn validate_transaction_blocking(
&self,
_at: <Self::Block as BlockT>::Hash,
_source: TransactionSource,
_uxt: Arc<<Self::Block as BlockT>::Extrinsic>,
) -> error::Result<TransactionValidity> {
) -> Result<TransactionValidity, Self::Error> {
unimplemented!();
}

Expand Down Expand Up @@ -202,8 +202,11 @@ impl ChainApi for TestApi {
(Hashing::hash(&encoded), len)
}

fn block_body(&self, _id: <Self::Block as BlockT>::Hash) -> Self::BodyFuture {
futures::future::ready(Ok(None))
async fn block_body(
&self,
_id: <Self::Block as BlockT>::Hash,
) -> Result<Option<Vec<<Self::Block as BlockT>::Extrinsic>>, Self::Error> {
Ok(None)
}

fn block_header(
Expand Down
20 changes: 9 additions & 11 deletions substrate/client/transaction-pool/src/graph/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET};
use futures::{channel::mpsc::Receiver, Future};
use async_trait::async_trait;
use futures::channel::mpsc::Receiver;
use indexmap::IndexMap;
use sc_transaction_pool_api::error;
use sp_blockchain::{HashAndNumber, TreeRoute};
Expand Down Expand Up @@ -74,27 +75,21 @@ pub enum ValidateTransactionPriority {
}

/// Concrete extrinsic validation and query logic.
#[async_trait]
pub trait ChainApi: Send + Sync {
/// Block type.
type Block: BlockT;
/// Error type.
type Error: From<error::Error> + error::IntoPoolError;
/// Validate transaction future.
type ValidationFuture: Future<Output = Result<TransactionValidity, Self::Error>> + Send + Unpin;
Comment thread
michalkucharczyk marked this conversation as resolved.
/// Body future (since block body might be remote)
type BodyFuture: Future<Output = Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>>
+ Unpin
+ Send
+ 'static;

/// Asynchronously verify extrinsic at given block.
fn validate_transaction(
async fn validate_transaction(
&self,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
uxt: ExtrinsicFor<Self>,
validation_priority: ValidateTransactionPriority,
) -> Self::ValidationFuture;
) -> Result<TransactionValidity, Self::Error>;

/// Synchronously verify given extrinsic at given block.
///
Expand Down Expand Up @@ -123,7 +118,10 @@ pub trait ChainApi: Send + Sync {
fn hash_and_length(&self, uxt: &RawExtrinsicFor<Self>) -> (ExtrinsicHash<Self>, usize);

/// Returns a block body given the block.
fn block_body(&self, at: <Self::Block as BlockT>::Hash) -> Self::BodyFuture;
async fn block_body(
&self,
at: <Self::Block as BlockT>::Hash,
) -> Result<Option<Vec<<Self::Block as traits::Block>::Extrinsic>>, Self::Error>;

/// Returns a block header given the block id.
fn block_header(
Expand Down
1 change: 1 addition & 0 deletions substrate/test-utils/runtime/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
targets = ["x86_64-unknown-linux-gnu"]

[dependencies]
async-trait = { workspace = true }
codec = { workspace = true, default-features = true }
futures = { workspace = true }
log = { workspace = true }
Expand Down
Loading