Skip to content

Commit 106024f

Browse files
feat(anvil-polkadot): add transaction pool RPCs 1/2 (#356)
* Implement Drop RPCs * tx pool status RPC * extract helper for ETH hash matching in transaction pool * Add basic tests * update drop_transaction return type and simplify helper * update tests * extend txpool tests to cover queued transactions
1 parent 90400aa commit 106024f

File tree

5 files changed

+249
-5
lines changed

5 files changed

+249
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/anvil-polkadot/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ itertools.workspace = true
126126
rand_08.workspace = true
127127
eyre.workspace = true
128128
lru = "0.16.0"
129+
indexmap = "2.0"
129130

130131
# cli
131132
clap = { version = "4", features = [

crates/anvil-polkadot/src/api_server/server.rs

Lines changed: 98 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
in_mem_rpc::InMemoryRpcClient,
1616
mining_engine::MiningEngine,
1717
service::{
18-
BackendError, BackendWithOverlay, Client, Service,
18+
BackendError, BackendWithOverlay, Client, Service, TransactionPoolHandle,
1919
storage::{
2020
AccountType, ByteCodeType, CodeInfo, ContractInfo, ReviveAccountInfo,
2121
SystemAccountInfo,
@@ -26,12 +26,13 @@ use crate::{
2626
};
2727
use alloy_eips::{BlockId, BlockNumberOrTag};
2828
use alloy_primitives::{Address, B256, U64, U256};
29-
use alloy_rpc_types::{Filter, TransactionRequest, anvil::MineOptions};
29+
use alloy_rpc_types::{Filter, TransactionRequest, anvil::MineOptions, txpool::TxpoolStatus};
3030
use alloy_serde::WithOtherFields;
3131
use anvil_core::eth::{EthRequest, Params as MineParams};
3232
use anvil_rpc::response::ResponseResult;
33-
use codec::{Decode, Encode};
33+
use codec::{Decode, DecodeLimit, Encode};
3434
use futures::{StreamExt, channel::mpsc};
35+
use indexmap::IndexMap;
3536
use pallet_revive_eth_rpc::{
3637
BlockInfoProvider, EthRpcError, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider,
3738
client::{Client as EthRpcClient, ClientError, SubscriptionType},
@@ -48,7 +49,7 @@ use polkadot_sdk::{
4849
parachains_common::{AccountId, Hash, Nonce},
4950
polkadot_sdk_frame::runtime::types_common::OpaqueBlock,
5051
sc_client_api::HeaderBackend,
51-
sc_service::SpawnTaskHandle,
52+
sc_service::{InPoolTransaction, SpawnTaskHandle, TransactionPool},
5253
sp_api::{Metadata, ProvideRuntimeApi},
5354
sp_arithmetic::Permill,
5455
sp_blockchain::Info,
@@ -57,14 +58,15 @@ use polkadot_sdk::{
5758
};
5859
use sqlx::sqlite::SqlitePoolOptions;
5960
use std::{collections::HashSet, sync::Arc, time::Duration};
60-
use substrate_runtime::Balance;
61+
use substrate_runtime::{Balance, RuntimeCall, UncheckedExtrinsic};
6162
use subxt::{
6263
Metadata as SubxtMetadata, OnlineClient, backend::rpc::RpcClient,
6364
client::RuntimeVersion as SubxtRuntimeVersion, config::substrate::H256,
6465
ext::subxt_rpcs::LegacyRpcMethods, utils::H160,
6566
};
6667

6768
pub const CLIENT_VERSION: &str = concat!("anvil-polkadot/v", env!("CARGO_PKG_VERSION"));
69+
const MAX_EXTRINSIC_DEPTH: u32 = 256;
6870

6971
pub struct Wallet {
7072
accounts: Vec<Account>,
@@ -81,6 +83,7 @@ pub struct ApiServer {
8183
wallet: Wallet,
8284
snapshot_manager: SnapshotManager,
8385
impersonation_manager: ImpersonationManager,
86+
tx_pool: Arc<TransactionPoolHandle>,
8487
}
8588

8689
impl ApiServer {
@@ -117,6 +120,7 @@ impl ApiServer {
117120
eth_rpc_client,
118121
snapshot_manager,
119122
impersonation_manager,
123+
tx_pool: substrate_service.tx_pool.clone(),
120124
wallet: Wallet {
121125
accounts: vec![
122126
Account::from(subxt_signer::eth::dev::baltathar()),
@@ -283,6 +287,19 @@ impl ApiServer {
283287
node_info!("eth_getLogs");
284288
self.get_logs(filter).await.to_rpc_result()
285289
}
290+
//------- Transaction Pool ---------
291+
EthRequest::TxPoolStatus(_) => {
292+
node_info!("txpool_status");
293+
self.txpool_status().await.to_rpc_result()
294+
}
295+
EthRequest::DropAllTransactions() => {
296+
node_info!("anvil_dropAllTransactions");
297+
self.anvil_drop_all_transactions().await.to_rpc_result()
298+
}
299+
EthRequest::DropTransaction(eth_hash) => {
300+
node_info!("anvil_dropTransaction");
301+
self.anvil_drop_transaction(eth_hash).await.to_rpc_result()
302+
}
286303
_ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
287304
};
288305

@@ -1031,6 +1048,82 @@ impl ApiServer {
10311048

10321049
Ok(())
10331050
}
1051+
1052+
/// Returns transaction pool status
1053+
async fn txpool_status(&self) -> Result<TxpoolStatus> {
1054+
let pool_status = self.tx_pool.status();
1055+
Ok(TxpoolStatus { pending: pool_status.ready as u64, queued: pool_status.future as u64 })
1056+
}
1057+
1058+
/// Drop all transactions from pool
1059+
async fn anvil_drop_all_transactions(&self) -> Result<()> {
1060+
let ready_txs = self.tx_pool.ready();
1061+
let future_txs = self.tx_pool.futures();
1062+
1063+
let mut invalid_txs = IndexMap::new();
1064+
1065+
for tx in ready_txs {
1066+
invalid_txs.insert(*tx.hash(), None);
1067+
}
1068+
1069+
for tx in future_txs {
1070+
invalid_txs.insert(*tx.hash(), None);
1071+
}
1072+
1073+
self.tx_pool.report_invalid(None, invalid_txs).await;
1074+
1075+
Ok(())
1076+
}
1077+
1078+
/// Drop a specific transaction from the pool by its ETH hash
1079+
async fn anvil_drop_transaction(&self, eth_hash: B256) -> Result<Option<B256>> {
1080+
// Search in ready transactions
1081+
for tx in self.tx_pool.ready() {
1082+
if transaction_matches_eth_hash(tx.data(), eth_hash) {
1083+
let mut invalid_txs = IndexMap::new();
1084+
invalid_txs.insert(*tx.hash(), None);
1085+
self.tx_pool.report_invalid(None, invalid_txs).await;
1086+
return Ok(Some(eth_hash));
1087+
}
1088+
}
1089+
1090+
// Search in future transactions
1091+
for tx in self.tx_pool.futures() {
1092+
if transaction_matches_eth_hash(tx.data(), eth_hash) {
1093+
let mut invalid_txs = IndexMap::new();
1094+
invalid_txs.insert(*tx.hash(), None);
1095+
self.tx_pool.report_invalid(None, invalid_txs).await;
1096+
return Ok(Some(eth_hash));
1097+
}
1098+
}
1099+
1100+
// Transaction not found
1101+
Ok(None)
1102+
}
1103+
}
1104+
1105+
/// Helper function to check if transaction matches ETH hash
1106+
fn transaction_matches_eth_hash(
1107+
tx_data: &Arc<polkadot_sdk::sp_runtime::OpaqueExtrinsic>,
1108+
target_eth_hash: B256,
1109+
) -> bool {
1110+
let encoded = tx_data.encode();
1111+
let Ok(ext) =
1112+
UncheckedExtrinsic::decode_all_with_depth_limit(MAX_EXTRINSIC_DEPTH, &mut &encoded[..])
1113+
else {
1114+
return false;
1115+
};
1116+
1117+
let polkadot_sdk::sp_runtime::generic::UncheckedExtrinsic {
1118+
function: RuntimeCall::Revive(polkadot_sdk::pallet_revive::Call::eth_transact { payload }),
1119+
..
1120+
} = ext.0
1121+
else {
1122+
return false;
1123+
};
1124+
1125+
let tx_eth_hash = keccak_256(&payload);
1126+
B256::from_slice(&tx_eth_hash) == target_eth_hash
10341127
}
10351128

10361129
fn new_contract_info(address: &Address, code_hash: H256, nonce: Nonce) -> ContractInfo {

crates/anvil-polkadot/tests/it/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@ mod snapshot;
66
mod standard_rpc;
77
mod state_injector;
88
mod time_machine;
9+
mod txpool;
910
mod utils;
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use crate::utils::{TestNode, unwrap_response};
2+
use alloy_primitives::{Address, B256, U256};
3+
use alloy_rpc_types::{TransactionRequest, txpool::TxpoolStatus};
4+
use anvil_core::eth::EthRequest;
5+
use anvil_polkadot::{
6+
api_server::revive_conversions::ReviveAddress,
7+
config::{AnvilNodeConfig, SubstrateNodeConfig},
8+
};
9+
use polkadot_sdk::pallet_revive::evm::Account;
10+
11+
#[tokio::test(flavor = "multi_thread")]
12+
async fn test_txpool_status() {
13+
let anvil_node_config = AnvilNodeConfig::test_config();
14+
let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
15+
let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap();
16+
17+
let alith = Account::from(subxt_signer::eth::dev::alith());
18+
let alith_addr = Address::from(ReviveAddress::new(alith.address()));
19+
let recipient_addr = Address::repeat_byte(0x42);
20+
21+
let status: TxpoolStatus =
22+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
23+
assert_eq!(status.pending, 0);
24+
assert_eq!(status.queued, 0);
25+
26+
for i in 0..3 {
27+
let tx = TransactionRequest::default()
28+
.from(alith_addr)
29+
.to(recipient_addr)
30+
.value(U256::from(1000 * (i + 1)))
31+
.nonce(i);
32+
node.send_transaction(tx, None).await.unwrap();
33+
}
34+
35+
let status: TxpoolStatus =
36+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
37+
assert_eq!(status.pending, 3);
38+
assert_eq!(status.queued, 0);
39+
40+
let tx_future = TransactionRequest::default()
41+
.from(alith_addr)
42+
.to(recipient_addr)
43+
.value(U256::from(5000))
44+
.nonce(5);
45+
node.send_transaction(tx_future, None).await.unwrap();
46+
47+
let status: TxpoolStatus =
48+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
49+
assert_eq!(status.pending, 3);
50+
assert_eq!(status.queued, 1);
51+
}
52+
53+
#[tokio::test(flavor = "multi_thread")]
54+
async fn test_drop_transaction() {
55+
let anvil_node_config = AnvilNodeConfig::test_config();
56+
let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
57+
let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap();
58+
59+
let alith = Account::from(subxt_signer::eth::dev::alith());
60+
let alith_addr = Address::from(ReviveAddress::new(alith.address()));
61+
let recipient_addr = Address::repeat_byte(0x42);
62+
63+
let tx1 =
64+
TransactionRequest::default().from(alith_addr).to(recipient_addr).value(U256::from(1000));
65+
node.send_transaction(tx1, None).await.unwrap();
66+
67+
let tx2 = TransactionRequest::default()
68+
.from(alith_addr)
69+
.to(recipient_addr)
70+
.value(U256::from(2000))
71+
.nonce(1);
72+
let tx2_hash = node.send_transaction(tx2, None).await.unwrap();
73+
74+
let tx_future = TransactionRequest::default()
75+
.from(alith_addr)
76+
.to(recipient_addr)
77+
.value(U256::from(5000))
78+
.nonce(5);
79+
let tx_future_hash = node.send_transaction(tx_future, None).await.unwrap();
80+
81+
let status: TxpoolStatus =
82+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
83+
assert_eq!(status.pending, 2);
84+
assert_eq!(status.queued, 1);
85+
86+
let tx2_hash_b256 = B256::from_slice(tx2_hash.0.as_ref());
87+
let dropped_hash = unwrap_response::<Option<B256>>(
88+
node.eth_rpc(EthRequest::DropTransaction(tx2_hash_b256)).await.unwrap(),
89+
)
90+
.unwrap();
91+
assert_eq!(dropped_hash, Some(tx2_hash_b256));
92+
93+
let status: TxpoolStatus =
94+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
95+
assert_eq!(status.pending, 1);
96+
assert_eq!(status.queued, 1);
97+
98+
let tx_future_hash_b256 = B256::from_slice(tx_future_hash.0.as_ref());
99+
let dropped_hash = unwrap_response::<Option<B256>>(
100+
node.eth_rpc(EthRequest::DropTransaction(tx_future_hash_b256)).await.unwrap(),
101+
)
102+
.unwrap();
103+
assert_eq!(dropped_hash, Some(tx_future_hash_b256));
104+
105+
let status: TxpoolStatus =
106+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
107+
assert_eq!(status.pending, 1);
108+
assert_eq!(status.queued, 0);
109+
}
110+
111+
#[tokio::test(flavor = "multi_thread")]
112+
async fn test_drop_all_transactions() {
113+
let anvil_node_config = AnvilNodeConfig::test_config();
114+
let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
115+
let mut node = TestNode::new(anvil_node_config, substrate_node_config).await.unwrap();
116+
117+
let alith = Account::from(subxt_signer::eth::dev::alith());
118+
let alith_addr = Address::from(ReviveAddress::new(alith.address()));
119+
let recipient_addr = Address::repeat_byte(0x42);
120+
121+
for i in 0..3 {
122+
let tx = TransactionRequest::default()
123+
.from(alith_addr)
124+
.to(recipient_addr)
125+
.value(U256::from(1000 * (i + 1)))
126+
.nonce(i);
127+
node.send_transaction(tx, None).await.unwrap();
128+
}
129+
130+
let tx_future = TransactionRequest::default()
131+
.from(alith_addr)
132+
.to(recipient_addr)
133+
.value(U256::from(5000))
134+
.nonce(5);
135+
node.send_transaction(tx_future, None).await.unwrap();
136+
137+
let status: TxpoolStatus =
138+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
139+
assert_eq!(status.pending, 3);
140+
assert_eq!(status.queued, 1);
141+
142+
unwrap_response::<()>(node.eth_rpc(EthRequest::DropAllTransactions()).await.unwrap()).unwrap();
143+
144+
let status: TxpoolStatus =
145+
unwrap_response(node.eth_rpc(EthRequest::TxPoolStatus(())).await.unwrap()).unwrap();
146+
assert_eq!(status.pending, 0);
147+
assert_eq!(status.queued, 0);
148+
}

0 commit comments

Comments
 (0)