diff --git a/Cargo.lock b/Cargo.lock index dc586e841b510..c439728dd3047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "futures", "futures-timer", "hyper 1.6.0", + "indexmap 2.10.0", "itertools 0.14.0", "jsonrpsee", "libsecp256k1", diff --git a/crates/anvil-polkadot/Cargo.toml b/crates/anvil-polkadot/Cargo.toml index 8cca5e23cc788..d9aeeb27c3dcd 100644 --- a/crates/anvil-polkadot/Cargo.toml +++ b/crates/anvil-polkadot/Cargo.toml @@ -126,6 +126,7 @@ itertools.workspace = true rand_08.workspace = true eyre.workspace = true lru = "0.16.0" +indexmap = "2.0" # cli clap = { version = "4", features = [ diff --git a/crates/anvil-polkadot/src/api_server/server.rs b/crates/anvil-polkadot/src/api_server/server.rs index b2caf86f20c2b..9f012ddd34dc9 100644 --- a/crates/anvil-polkadot/src/api_server/server.rs +++ b/crates/anvil-polkadot/src/api_server/server.rs @@ -15,7 +15,7 @@ use crate::{ in_mem_rpc::InMemoryRpcClient, mining_engine::MiningEngine, service::{ - BackendError, BackendWithOverlay, Client, Service, + BackendError, BackendWithOverlay, Client, Service, TransactionPoolHandle, storage::{ AccountType, ByteCodeType, CodeInfo, ContractInfo, ReviveAccountInfo, SystemAccountInfo, @@ -26,12 +26,13 @@ use crate::{ }; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, B256, U64, U256}; -use alloy_rpc_types::{Filter, TransactionRequest, anvil::MineOptions}; +use alloy_rpc_types::{Filter, TransactionRequest, anvil::MineOptions, txpool::TxpoolStatus}; use alloy_serde::WithOtherFields; use anvil_core::eth::{EthRequest, Params as MineParams}; use anvil_rpc::response::ResponseResult; -use codec::{Decode, Encode}; +use codec::{Decode, DecodeLimit, Encode}; use futures::{StreamExt, channel::mpsc}; +use indexmap::IndexMap; use pallet_revive_eth_rpc::{ BlockInfoProvider, EthRpcError, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider, client::{Client as EthRpcClient, ClientError, SubscriptionType}, @@ -48,7 +49,7 @@ use polkadot_sdk::{ parachains_common::{AccountId, Hash, Nonce}, polkadot_sdk_frame::runtime::types_common::OpaqueBlock, sc_client_api::HeaderBackend, - sc_service::SpawnTaskHandle, + sc_service::{InPoolTransaction, SpawnTaskHandle, TransactionPool}, sp_api::{Metadata, ProvideRuntimeApi}, sp_arithmetic::Permill, sp_blockchain::Info, @@ -57,7 +58,7 @@ use polkadot_sdk::{ }; use sqlx::sqlite::SqlitePoolOptions; use std::{collections::HashSet, sync::Arc, time::Duration}; -use substrate_runtime::Balance; +use substrate_runtime::{Balance, RuntimeCall, UncheckedExtrinsic}; use subxt::{ Metadata as SubxtMetadata, OnlineClient, backend::rpc::RpcClient, client::RuntimeVersion as SubxtRuntimeVersion, config::substrate::H256, @@ -65,6 +66,7 @@ use subxt::{ }; pub const CLIENT_VERSION: &str = concat!("anvil-polkadot/v", env!("CARGO_PKG_VERSION")); +const MAX_EXTRINSIC_DEPTH: u32 = 256; pub struct Wallet { accounts: Vec, @@ -81,6 +83,7 @@ pub struct ApiServer { wallet: Wallet, snapshot_manager: SnapshotManager, impersonation_manager: ImpersonationManager, + tx_pool: Arc, } impl ApiServer { @@ -117,6 +120,7 @@ impl ApiServer { eth_rpc_client, snapshot_manager, impersonation_manager, + tx_pool: substrate_service.tx_pool.clone(), wallet: Wallet { accounts: vec![ Account::from(subxt_signer::eth::dev::baltathar()), @@ -283,6 +287,19 @@ impl ApiServer { node_info!("eth_getLogs"); self.get_logs(filter).await.to_rpc_result() } + //------- Transaction Pool --------- + EthRequest::TxPoolStatus(_) => { + node_info!("txpool_status"); + self.txpool_status().await.to_rpc_result() + } + EthRequest::DropAllTransactions() => { + node_info!("anvil_dropAllTransactions"); + self.anvil_drop_all_transactions().await.to_rpc_result() + } + EthRequest::DropTransaction(eth_hash) => { + node_info!("anvil_dropTransaction"); + self.anvil_drop_transaction(eth_hash).await.to_rpc_result() + } _ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(), }; @@ -1031,6 +1048,82 @@ impl ApiServer { Ok(()) } + + /// Returns transaction pool status + async fn txpool_status(&self) -> Result { + let pool_status = self.tx_pool.status(); + Ok(TxpoolStatus { pending: pool_status.ready as u64, queued: pool_status.future as u64 }) + } + + /// Drop all transactions from pool + async fn anvil_drop_all_transactions(&self) -> Result<()> { + let ready_txs = self.tx_pool.ready(); + let future_txs = self.tx_pool.futures(); + + let mut invalid_txs = IndexMap::new(); + + for tx in ready_txs { + invalid_txs.insert(*tx.hash(), None); + } + + for tx in future_txs { + invalid_txs.insert(*tx.hash(), None); + } + + self.tx_pool.report_invalid(None, invalid_txs).await; + + Ok(()) + } + + /// Drop a specific transaction from the pool by its ETH hash + async fn anvil_drop_transaction(&self, eth_hash: B256) -> Result> { + // Search in ready transactions + for tx in self.tx_pool.ready() { + if transaction_matches_eth_hash(tx.data(), eth_hash) { + let mut invalid_txs = IndexMap::new(); + invalid_txs.insert(*tx.hash(), None); + self.tx_pool.report_invalid(None, invalid_txs).await; + return Ok(Some(eth_hash)); + } + } + + // Search in future transactions + for tx in self.tx_pool.futures() { + if transaction_matches_eth_hash(tx.data(), eth_hash) { + let mut invalid_txs = IndexMap::new(); + invalid_txs.insert(*tx.hash(), None); + self.tx_pool.report_invalid(None, invalid_txs).await; + return Ok(Some(eth_hash)); + } + } + + // Transaction not found + Ok(None) + } +} + +/// Helper function to check if transaction matches ETH hash +fn transaction_matches_eth_hash( + tx_data: &Arc, + target_eth_hash: B256, +) -> bool { + let encoded = tx_data.encode(); + let Ok(ext) = + UncheckedExtrinsic::decode_all_with_depth_limit(MAX_EXTRINSIC_DEPTH, &mut &encoded[..]) + else { + return false; + }; + + let polkadot_sdk::sp_runtime::generic::UncheckedExtrinsic { + function: RuntimeCall::Revive(polkadot_sdk::pallet_revive::Call::eth_transact { payload }), + .. + } = ext.0 + else { + return false; + }; + + let tx_eth_hash = keccak_256(&payload); + B256::from_slice(&tx_eth_hash) == target_eth_hash } fn new_contract_info(address: &Address, code_hash: H256, nonce: Nonce) -> ContractInfo { diff --git a/crates/anvil-polkadot/tests/it/main.rs b/crates/anvil-polkadot/tests/it/main.rs index 9123fb8cd08a8..c1a30bc847992 100644 --- a/crates/anvil-polkadot/tests/it/main.rs +++ b/crates/anvil-polkadot/tests/it/main.rs @@ -6,4 +6,5 @@ mod snapshot; mod standard_rpc; mod state_injector; mod time_machine; +mod txpool; mod utils; diff --git a/crates/anvil-polkadot/tests/it/txpool.rs b/crates/anvil-polkadot/tests/it/txpool.rs new file mode 100644 index 0000000000000..42c6d6cffe210 --- /dev/null +++ b/crates/anvil-polkadot/tests/it/txpool.rs @@ -0,0 +1,148 @@ +use crate::utils::{TestNode, unwrap_response}; +use alloy_primitives::{Address, B256, U256}; +use alloy_rpc_types::{TransactionRequest, txpool::TxpoolStatus}; +use anvil_core::eth::EthRequest; +use anvil_polkadot::{ + api_server::revive_conversions::ReviveAddress, + config::{AnvilNodeConfig, SubstrateNodeConfig}, +}; +use polkadot_sdk::pallet_revive::evm::Account; + +#[tokio::test(flavor = "multi_thread")] +async fn test_txpool_status() { + let anvil_node_config = AnvilNodeConfig::test_config(); + let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config); + let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap(); + + let alith = Account::from(subxt_signer::eth::dev::alith()); + let alith_addr = Address::from(ReviveAddress::new(alith.address())); + let recipient_addr = Address::repeat_byte(0x42); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 0); + assert_eq!(status.queued, 0); + + for i in 0..3 { + let tx = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(1000 * (i + 1))) + .nonce(i); + node.send_transaction(tx, None).await.unwrap(); + } + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 3); + assert_eq!(status.queued, 0); + + let tx_future = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(5000)) + .nonce(5); + node.send_transaction(tx_future, None).await.unwrap(); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 3); + assert_eq!(status.queued, 1); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_drop_transaction() { + let anvil_node_config = AnvilNodeConfig::test_config(); + let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config); + let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap(); + + let alith = Account::from(subxt_signer::eth::dev::alith()); + let alith_addr = Address::from(ReviveAddress::new(alith.address())); + let recipient_addr = Address::repeat_byte(0x42); + + let tx1 = + TransactionRequest::default().from(alith_addr).to(recipient_addr).value(U256::from(1000)); + node.send_transaction(tx1, None).await.unwrap(); + + let tx2 = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(2000)) + .nonce(1); + let tx2_hash = node.send_transaction(tx2, None).await.unwrap(); + + let tx_future = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(5000)) + .nonce(5); + let tx_future_hash = node.send_transaction(tx_future, None).await.unwrap(); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 2); + assert_eq!(status.queued, 1); + + let tx2_hash_b256 = B256::from_slice(tx2_hash.0.as_ref()); + let dropped_hash = unwrap_response::>( + node.eth_rpc(EthRequest::DropTransaction(tx2_hash_b256)).await.unwrap(), + ) + .unwrap(); + assert_eq!(dropped_hash, Some(tx2_hash_b256)); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 1); + assert_eq!(status.queued, 1); + + let tx_future_hash_b256 = B256::from_slice(tx_future_hash.0.as_ref()); + let dropped_hash = unwrap_response::>( + node.eth_rpc(EthRequest::DropTransaction(tx_future_hash_b256)).await.unwrap(), + ) + .unwrap(); + assert_eq!(dropped_hash, Some(tx_future_hash_b256)); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 1); + assert_eq!(status.queued, 0); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_drop_all_transactions() { + let anvil_node_config = AnvilNodeConfig::test_config(); + let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config); + let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap(); + + let alith = Account::from(subxt_signer::eth::dev::alith()); + let alith_addr = Address::from(ReviveAddress::new(alith.address())); + let recipient_addr = Address::repeat_byte(0x42); + + for i in 0..3 { + let tx = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(1000 * (i + 1))) + .nonce(i); + node.send_transaction(tx, None).await.unwrap(); + } + + let tx_future = TransactionRequest::default() + .from(alith_addr) + .to(recipient_addr) + .value(U256::from(5000)) + .nonce(5); + node.send_transaction(tx_future, None).await.unwrap(); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 3); + assert_eq!(status.queued, 1); + + unwrap_response::<()>(node.eth_rpc(EthRequest::DropAllTransactions()).await.unwrap()).unwrap(); + + let status: TxpoolStatus = + unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap(); + assert_eq!(status.pending, 0); + assert_eq!(status.queued, 0); +}