diff --git a/Cargo.lock b/Cargo.lock index 86351ab2029ca..ce4762627994c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13383,6 +13383,7 @@ dependencies = [ "tempfile", "thiserror 1.0.65", "tokio", + "tokio-stream", ] [[package]] diff --git a/prdoc/pr_11081.prdoc b/prdoc/pr_11081.prdoc new file mode 100644 index 0000000000000..dfd693272e430 --- /dev/null +++ b/prdoc/pr_11081.prdoc @@ -0,0 +1,12 @@ +title: Implement `eth_subscribe` +doc: +- audience: Runtime Dev + description: |- + # Description + + Implemented `eth_subscribe` in the eth-rpc. The subscription kinds implemented is `newHeads` and `logs`. +crates: +- name: pallet-revive-eth-rpc + bump: minor +- name: pallet-revive + bump: minor diff --git a/substrate/frame/revive/rpc/Cargo.toml b/substrate/frame/revive/rpc/Cargo.toml index 87d3eaa32361b..c492e062bb05e 100644 --- a/substrate/frame/revive/rpc/Cargo.toml +++ b/substrate/frame/revive/rpc/Cargo.toml @@ -35,7 +35,10 @@ sc-cli = { workspace = true, default-features = true } sc-rpc = { workspace = true, default-features = true } sc-rpc-api = { workspace = true, default-features = true } sc-service = { workspace = true, default-features = true } -serde = { workspace = true, default-features = true, features = ["alloc", "derive"] } +serde = { workspace = true, default-features = true, features = [ + "alloc", + "derive", +] } serde_json = { workspace = true } sp-arithmetic = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } @@ -45,10 +48,13 @@ sp-runtime = { workspace = true, default-features = true } sp-timestamp = { workspace = true } sp-weights = { workspace = true, default-features = true } sqlx = { workspace = true, features = ["macros", "runtime-tokio", "sqlite"] } -subxt = { workspace = true, default-features = true, features = ["reconnecting-rpc-client"] } +subxt = { workspace = true, default-features = true, features = [ + "reconnecting-rpc-client", +] } subxt-signer = { workspace = true, features = ["unstable-eth"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-stream = { workspace = true } [dev-dependencies] env_logger = { workspace = true } diff --git a/substrate/frame/revive/rpc/src/apis/execution_apis.rs b/substrate/frame/revive/rpc/src/apis/execution_apis.rs index 350b661d00312..54e2025749be2 100644 --- a/substrate/frame/revive/rpc/src/apis/execution_apis.rs +++ b/substrate/frame/revive/rpc/src/apis/execution_apis.rs @@ -186,4 +186,14 @@ pub trait EthRpc { newest_block: BlockNumberOrTag, reward_percentiles: Option>, ) -> RpcResult; + + /// Creates a subscription to specific events, returning a subscription ID. + /// Notifications are sent for each event matching the subscription via + /// `eth_subscription`. + #[subscription( + name = "eth_subscribe" => "eth_subscription", + unsubscribe = "eth_unsubscribe", + item = SubscriptionItem + )] + async fn eth_subscribe(&self, kind: SubscriptionKind, options: Option); } diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 05e84d7daae02..4d5958e47c909 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -249,6 +249,11 @@ pub struct Client { block_notifier: Option>, /// A lock to ensure only one subscription can perform write operations at a time. subscription_lock: Arc>, + + /// Block subscription sender side. + block_subscription_tx: tokio::sync::broadcast::Sender, + /// Log subscription sender side. + log_subscription_tx: tokio::sync::broadcast::Sender, /// Whether archive mode is enabled is_archive: bool, /// Whether historic backfill has completed. `false` if not started or in progress. @@ -331,6 +336,8 @@ impl Client { block_notifier: automine .then(|| tokio::sync::broadcast::channel::(NOTIFIER_CAPACITY).0), subscription_lock: Arc::new(Mutex::new(())), + block_subscription_tx: tokio::sync::broadcast::channel(256).0, + log_subscription_tx: tokio::sync::broadcast::channel(1000).0, is_archive, backfill_complete: Arc::new(AtomicBool::new(false)), }; @@ -464,6 +471,24 @@ impl Client { }, _ => {}, } + + // Broadcast the best blocks + if let SubscriptionType::BestBlocks = subscription_type && + self.block_subscription_tx.receiver_count() > 0 + { + let _ = self.block_subscription_tx.send(evm_block); + } + + // Broadcast the logs, we require a finalized subscription for this so that all of the + // events we broadcast are finalized and not prone to reorgs. + if let SubscriptionType::FinalizedBlocks = subscription_type && + self.log_subscription_tx.receiver_count() > 0 + { + receipts.iter().flat_map(|receipt| receipt.logs.iter()).for_each(|log| { + let _ = self.log_subscription_tx.send(log.clone()); + }); + } + Ok(()) }) .await @@ -920,6 +945,16 @@ impl Client { pub async fn get_automine(&self) -> bool { get_automine(&self.rpc_client).await } + + /// Gets the block subscription rx side of the channel. + pub fn get_block_subscription_rx(&self) -> tokio::sync::broadcast::Receiver { + self.block_subscription_tx.subscribe() + } + + /// Gets the log subscription rx side of the channel. + pub fn get_log_subscription_rx(&self) -> tokio::sync::broadcast::Receiver { + self.log_subscription_tx.subscribe() + } } fn to_hex(bytes: impl AsRef<[u8]>) -> String { diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 0e87fa0d739c5..ab35fc52dbd73 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -18,14 +18,18 @@ #![cfg_attr(docsrs, feature(doc_cfg))] use client::ClientError; +use futures::{Stream, StreamExt, TryStreamExt}; use jsonrpsee::{ + PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, core::{RpcResult, async_trait}, types::{ErrorCode, ErrorObjectOwned}, }; use pallet_revive::evm::*; use sp_core::{H160, H256, U256, keccak_256}; use subxt::backend::legacy::rpc_methods::TransactionStatus; +use subxt_signer::bip39::core::pin::Pin; use thiserror::Error; +use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; mod block_sync; pub(crate) use block_sync::{ChainMetadata, SyncLabel, SyncStateKey}; @@ -492,6 +496,41 @@ impl EthRpcServer for EthRpcServerImpl { let result = self.client.fee_history(block_count, newest_block, reward_percentiles).await?; Ok(result) } + + async fn eth_subscribe( + &self, + pending: PendingSubscriptionSink, + kind: SubscriptionKind, + options: Option, + ) { + let Some(subscription_parameters) = SubscriptionParameters::new(kind, options) else { + return pending + .reject(ErrorObjectOwned::owned( + jsonrpsee::types::error::INVALID_PARAMS_CODE, + "Invalid subscription parameters", + None::<()>, + )) + .await; + }; + let Ok(sink) = pending.accept().await else { + return; + }; + + let stream: Pin< + Box> + Send>, + > = match subscription_parameters { + SubscriptionParameters::NewBlockHeaders => Box::pin( + BroadcastStream::new(self.client.get_block_subscription_rx()) + .map_ok(|block| SubscriptionItem::BlockHeader(BlockHeader::from(block))), + ) as _, + SubscriptionParameters::Logs(filter) => Box::pin( + BroadcastStream::new(self.client.get_log_subscription_rx()) + .try_filter(move |log| futures::future::ready(filter.matches(log))) + .map_ok(SubscriptionItem::Log), + ) as _, + }; + let _ = tokio::spawn(Self::handle_subscription_forwarding(sink, stream)); + } } impl EthRpcServerImpl { @@ -516,4 +555,38 @@ impl EthRpcServerImpl { Ok(Some(TransactionInfo::new(&receipt, signed_tx))) } + + async fn handle_subscription_forwarding( + sink: SubscriptionSink, + mut stream: Pin< + Box> + Send>, + >, + ) { + loop { + tokio::select! { + _ = sink.closed() => break, + item = stream.next() => { + match item { + // Stream ended. + None => break, + // Send the item to the subscriber. + Some(Ok(sub_item)) => { + let msg = SubscriptionMessage::from_json(&sub_item) + .expect("SubscriptionItem is serializable; qed"); + if sink.send(msg).await.is_err() { + break; + } + }, + // Broadcast receiver lagged behind — missed messages. + Some(Err(BroadcastStreamRecvError::Lagged(count))) => { + log::warn!( + target: LOG_TARGET, + "Subscription lagged, skipped {count} messages" + ); + }, + } + } + } + } + } } diff --git a/substrate/frame/revive/rpc/src/tests.rs b/substrate/frame/revive/rpc/src/tests.rs index a53af72df3b31..8060da3dc56bb 100644 --- a/substrate/frame/revive/rpc/src/tests.rs +++ b/substrate/frame/revive/rpc/src/tests.rs @@ -37,10 +37,13 @@ use jsonrpsee::{ use pallet_revive::{ create1, evm::{ - Account, Block, BlockNumberOrTag, BlockNumberOrTagOrHash, BlockTag, GenericTransaction, - H256, HashesOrTransactionInfos, TransactionInfo, TransactionUnsigned, U256, + Account, Block, BlockHeader, BlockNumberOrTag, BlockNumberOrTagOrHash, BlockTag, + BoundedOneOrMany, Filter, FilterResults, GenericTransaction, H256, + HashesOrTransactionInfos, Log, SubscriptionItem, SubscriptionKind, SubscriptionOptions, + TransactionInfo, TransactionUnsigned, U256, }, }; +use sp_runtime::BoundedVec; use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions}; use std::{sync::Arc, thread}; use subxt::{ @@ -325,6 +328,15 @@ async fn run_all_eth_rpc_tests_inner() -> anyhow::Result<()> { test_multiple_transactions_in_block, test_mixed_evm_substrate_transactions, test_runtime_pallets_address_upload_code, + test_subscribe_new_heads, + test_subscribe_new_heads_multiple_blocks, + test_subscribe_logs, + test_subscribe_logs_with_address_filter, + test_subscribe_logs_with_topic_filter, + test_subscribe_logs_address_filter_excludes_non_matching, + test_subscribe_logs_with_multiple_addresses_filter, + test_subscribe_logs_no_event_transaction_ignored, + test_subscribe_with_invalid_params_rejected, test_estimate_gas_of_contract_with_consume_all_gas, test_gas_estimation_for_contract_requiring_binary_search, test_gas_estimation_with_no_funds_no_gas_specified, @@ -853,6 +865,529 @@ async fn test_runtime_pallets_address_upload_code() -> anyhow::Result<()> { Ok(()) } +/// Verify that subscribing to `newHeads` delivers a block header matching the +/// corresponding block fetched via `eth_getBlockByNumber` after a transaction +/// triggers a new block. +async fn test_subscribe_new_heads() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let ethan = Account::from(subxt_signer::eth::dev::ethan()); + let value = U256::from(1_000_000_000_000u128); + + let mut sub = client.eth_subscribe(SubscriptionKind::NewBlockHeaders, None).await?; + + // Act + let tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(ethan.address()) + .send() + .await?; + tx.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for newHeads notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let header = match notification { + SubscriptionItem::BlockHeader(header) => header, + other => panic!("Expected BlockHeader, got: {other:?}"), + }; + + let block = client + .get_block_by_number(BlockNumberOrTag::U256(header.number), false) + .await? + .expect("Block should exist"); + + // Assert + assert!(header.number > U256::zero(), "Block number should be > 0"); + assert_ne!(header.hash, H256::zero(), "Block hash should not be zero"); + assert_ne!(header.parent_hash, H256::zero(), "Parent hash should not be zero"); + + let expected_header = BlockHeader::from(block); + assert_eq!( + header, expected_header, + "Subscription header should match the block header from RPC" + ); + + drop(sub); + + Ok(()) +} + +/// Verify that subscribing to `logs` delivers a log matching the corresponding +/// log fetched via `eth_getLogs` after a contract emits an event. +async fn test_subscribe_logs() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + let nonce = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt = tx.wait_for_receipt().await?; + let contract_address = create1(&account.address(), nonce.try_into().unwrap()); + assert_eq!(Some(contract_address), receipt.contract_address); + + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, None).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_address) + .send() + .await?; + let call_receipt = tx.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for logs notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let log = match notification { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + let filter = Filter { block_hash: Some(call_receipt.block_hash), ..Default::default() }; + let rpc_logs = client.get_logs(Some(filter)).await?; + let rpc_logs: Vec = match rpc_logs { + FilterResults::Logs(logs) => logs, + other => panic!("Expected Logs from eth_getLogs, got: {other:?}"), + }; + + // Assert + let event_signature = H256(sp_io::hashing::keccak_256(b"Received(address,uint256)")); + assert_eq!(log.address, contract_address, "Log address should be the contract address"); + assert!(!log.topics.is_empty(), "Log should have at least one topic"); + assert_eq!(log.topics[0], event_signature, "First topic should be the event signature hash"); + assert_eq!( + log.block_hash, call_receipt.block_hash, + "Log block hash should match receipt block hash" + ); + assert_eq!( + log.transaction_hash, call_receipt.transaction_hash, + "Log tx hash should match receipt tx hash" + ); + assert!(rpc_logs.contains(&log), "Subscription log should match eth_getLogs result"); + + drop(sub); + Ok(()) +} + +/// Verify that subscribing to `logs` with an address filter only delivers logs +/// emitted from the specified contract address. +async fn test_subscribe_logs_with_address_filter() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + let nonce = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt = tx.wait_for_receipt().await?; + let contract_address = create1(&account.address(), nonce.try_into().unwrap()); + assert_eq!(Some(contract_address), receipt.contract_address); + + let options = SubscriptionOptions::LogsOptions { + address: Some(BoundedOneOrMany::One(contract_address)), + topics: None, + }; + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, Some(options)).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_address) + .send() + .await?; + tx.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for logs notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let log = match notification { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + // Assert + assert_eq!(log.address, contract_address, "Log address should match the filter address"); + + drop(sub); + Ok(()) +} + +/// Verify that subscribing to `logs` with a topic filter delivers logs whose +/// first topic matches the computed event signature hash. +async fn test_subscribe_logs_with_topic_filter() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + let nonce = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt = tx.wait_for_receipt().await?; + let contract_address = create1(&account.address(), nonce.try_into().unwrap()); + assert_eq!(Some(contract_address), receipt.contract_address); + + let event_signature = H256(sp_io::hashing::keccak_256(b"Received(address,uint256)")); + let options = SubscriptionOptions::LogsOptions { + address: None, + topics: Some( + BoundedVec::try_from(vec![Some(BoundedOneOrMany::One(event_signature))]) + .expect("Single topic filter is within bounds"), + ), + }; + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, Some(options)).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_address) + .send() + .await?; + tx.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for logs notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let log = match notification { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + // Assert + assert_eq!( + log.topics[0], event_signature, + "First topic should match the computed event signature" + ); + assert_eq!(log.address, contract_address, "Log should come from the deployed contract"); + + drop(sub); + Ok(()) +} + +/// Verify that sending two sequential transactions yields two `newHeads` +/// notifications whose block numbers are increasing and whose parent hashes +/// chain correctly (the second header's `parent_hash` equals the first +/// header's `hash`). +async fn test_subscribe_new_heads_multiple_blocks() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let ethan = Account::from(subxt_signer::eth::dev::ethan()); + let value = U256::from(1_000_000_000_000u128); + + let mut sub = client.eth_subscribe(SubscriptionKind::NewBlockHeaders, None).await?; + + // Act + let tx1 = TransactionBuilder::new(client.clone()) + .value(value) + .to(ethan.address()) + .send() + .await?; + tx1.wait_for_receipt().await?; + + let tx2 = TransactionBuilder::new(client.clone()) + .value(value) + .to(ethan.address()) + .send() + .await?; + tx2.wait_for_receipt().await?; + + let header1 = match tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for first newHeads notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error") + { + SubscriptionItem::BlockHeader(h) => h, + other => panic!("Expected BlockHeader, got: {other:?}"), + }; + + let header2 = match tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for second newHeads notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error") + { + SubscriptionItem::BlockHeader(h) => h, + other => panic!("Expected BlockHeader, got: {other:?}"), + }; + + // Assert + assert!( + header2.number > header1.number, + "Second block number ({}) should be greater than first ({})", + header2.number, + header1.number, + ); + assert_eq!( + header2.parent_hash, header1.hash, + "Second header's parent_hash should equal first header's hash" + ); + + drop(sub); + Ok(()) +} + +/// Verify that a `logs` subscription with an address filter does NOT deliver +/// logs emitted by a different contract. Two `SimpleReceiver` contracts are +/// deployed. The subscription is filtered to contract A's address. An event +/// is triggered on contract B first, then on contract A. The first +/// notification received must be from contract A, proving B's log was +/// correctly filtered out. +async fn test_subscribe_logs_address_filter_excludes_non_matching() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + + let nonce_a = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx_a = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt_a = tx_a.wait_for_receipt().await?; + let contract_a = create1(&account.address(), nonce_a.try_into().unwrap()); + assert_eq!(Some(contract_a), receipt_a.contract_address); + + let nonce_b = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx_b = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt_b = tx_b.wait_for_receipt().await?; + let contract_b = create1(&account.address(), nonce_b.try_into().unwrap()); + assert_eq!(Some(contract_b), receipt_b.contract_address); + assert_ne!(contract_a, contract_b, "The two contracts must have different addresses"); + + let options = SubscriptionOptions::LogsOptions { + address: Some(BoundedOneOrMany::One(contract_a)), + topics: None, + }; + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, Some(options)).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let tx_b_call = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_b) + .send() + .await?; + tx_b_call.wait_for_receipt().await?; + + let tx_a_call = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_a) + .send() + .await?; + tx_a_call.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for logs notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let log = match notification { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + // Assert + assert_eq!(log.address, contract_a, "Log must come from contract A, not contract B"); + assert_ne!(log.address, contract_b, "Log should NOT come from contract B"); + + drop(sub); + Ok(()) +} + +/// Verify that a `logs` subscription with a multiple-address filter (OR +/// semantics) delivers logs from both specified contracts. Two +/// `SimpleReceiver` contracts are deployed and the subscription filter +/// includes both addresses. An event is triggered on each contract and +/// both logs must be received. +async fn test_subscribe_logs_with_multiple_addresses_filter() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + + let nonce_a = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx_a = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt_a = tx_a.wait_for_receipt().await?; + let contract_a = create1(&account.address(), nonce_a.try_into().unwrap()); + assert_eq!(Some(contract_a), receipt_a.contract_address); + + let nonce_b = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx_b = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt_b = tx_b.wait_for_receipt().await?; + let contract_b = create1(&account.address(), nonce_b.try_into().unwrap()); + assert_eq!(Some(contract_b), receipt_b.contract_address); + + let options = SubscriptionOptions::LogsOptions { + address: Some(BoundedOneOrMany::Many( + BoundedVec::try_from(vec![contract_a, contract_b]) + .expect("Two addresses is within bounds"), + )), + topics: None, + }; + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, Some(options)).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let tx_a_call = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_a) + .send() + .await?; + tx_a_call.wait_for_receipt().await?; + + let tx_b_call = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_b) + .send() + .await?; + tx_b_call.wait_for_receipt().await?; + + let log1 = match tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for first log notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error") + { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + let log2 = match tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for second log notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error") + { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + // Assert + let mut received_addresses = vec![log1.address, log2.address]; + received_addresses.sort(); + let mut expected_addresses = vec![contract_a, contract_b]; + expected_addresses.sort(); + assert_eq!(received_addresses, expected_addresses, "Should receive one log from each contract"); + + drop(sub); + Ok(()) +} + +/// Verify that a plain ETH transfer between EOAs (which emits no events) +/// does not produce a log subscription notification. The subscription must +/// only deliver the log triggered by the subsequent contract call. +async fn test_subscribe_logs_no_event_transaction_ignored() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + let account = Account::default(); + let ethan = Account::from(subxt_signer::eth::dev::ethan()); + + let (bytes, _) = pallet_revive_fixtures::compile_module_with_type( + "SimpleReceiver", + pallet_revive_fixtures::FixtureType::Solc, + )?; + let nonce = client.get_transaction_count(account.address(), BlockTag::Latest.into()).await?; + let tx = TransactionBuilder::new(client.clone()).input(bytes.to_vec()).send().await?; + let receipt = tx.wait_for_receipt().await?; + let contract_address = create1(&account.address(), nonce.try_into().unwrap()); + assert_eq!(Some(contract_address), receipt.contract_address); + + let mut sub = client.eth_subscribe(SubscriptionKind::Logs, None).await?; + + // Act + let value = U256::from(1_000_000_000_000u128); + let plain_tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(ethan.address()) + .send() + .await?; + plain_tx.wait_for_receipt().await?; + + let contract_tx = TransactionBuilder::new(client.clone()) + .value(value) + .to(contract_address) + .send() + .await?; + contract_tx.wait_for_receipt().await?; + + let notification = tokio::time::timeout(tokio::time::Duration::from_secs(10), sub.next()) + .await + .expect("Timed out waiting for log notification") + .expect("Subscription stream ended unexpectedly") + .expect("Subscription returned an error"); + + let log = match notification { + SubscriptionItem::Log(log) => log, + other => panic!("Expected Log, got: {other:?}"), + }; + + // Assert + assert_eq!( + log.address, contract_address, + "First log notification must come from the contract call, not the plain transfer" + ); + assert_eq!( + log.transaction_hash, + contract_tx.hash(), + "Log transaction hash must match the contract call, not the plain transfer" + ); + + drop(sub); + Ok(()) +} + +/// Verify that calling `eth_subscribe("newHeads")` with `LogsOptions` +/// returns an error, since `newHeads` does not accept filter options. +async fn test_subscribe_with_invalid_params_rejected() -> anyhow::Result<()> { + // Arrange + let client = Arc::new(SharedResources::client().await); + + let options = SubscriptionOptions::LogsOptions { + address: Some(BoundedOneOrMany::One(Account::default().address())), + topics: None, + }; + + // Act + let result = client.eth_subscribe(SubscriptionKind::NewBlockHeaders, Some(options)).await; + + // Assert + assert!(result.is_err(), "newHeads with LogsOptions should be rejected"); + + Ok(()) +} + async fn test_estimate_gas_of_contract_with_consume_all_gas() -> anyhow::Result<()> { // Arrange let code = pallet_revive_fixtures::compile_module_with_type( diff --git a/substrate/frame/revive/src/evm/api/rpc_types_gen.rs b/substrate/frame/revive/src/evm/api/rpc_types_gen.rs index 3c1298b67d258..f6251b8b95d81 100644 --- a/substrate/frame/revive/src/evm/api/rpc_types_gen.rs +++ b/substrate/frame/revive/src/evm/api/rpc_types_gen.rs @@ -18,12 +18,14 @@ #![allow(missing_docs)] use super::{TypeEip1559, TypeEip2930, TypeEip4844, TypeEip7702, TypeLegacy, byte::*}; -use alloc::vec::Vec; +use alloc::{boxed::Box, collections::BTreeSet, vec::Vec}; use codec::{Decode, DecodeWithMemTracking, Encode}; use derive_more::{From, TryInto}; pub use ethereum_types::*; use scale_info::TypeInfo; use serde::{Deserialize, Deserializer, Serialize, de::Error}; +use sp_core::ConstU32; +use sp_runtime::BoundedVec; /// Input of a `GenericTransaction` #[derive( @@ -140,6 +142,61 @@ pub struct Block { pub withdrawals_root: H256, } +/// Block header object returned by `newHeads` subscriptions. +#[derive(Debug, Default, Clone, Serialize, Deserialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct BlockHeader { + /// Number + pub number: U256, + /// Hash + pub hash: H256, + /// Parent block hash + pub parent_hash: H256, + /// Nonce + pub nonce: Bytes8, + /// Ommers hash + pub sha_3_uncles: H256, + /// Bloom filter + pub logs_bloom: Bytes256, + /// Transactions root + pub transactions_root: H256, + /// State root + pub state_root: H256, + /// Receipts root + pub receipts_root: H256, + /// Coinbase + pub miner: Address, + /// Extra data + pub extra_data: Bytes, + /// Gas limit + pub gas_limit: U256, + /// Gas used + pub gas_used: U256, + /// Timestamp + pub timestamp: U256, +} + +impl From for BlockHeader { + fn from(block: Block) -> Self { + Self { + number: block.number, + hash: block.hash, + parent_hash: block.parent_hash, + nonce: block.nonce, + sha_3_uncles: block.sha_3_uncles, + logs_bloom: block.logs_bloom, + transactions_root: block.transactions_root, + state_root: block.state_root, + receipts_root: block.receipts_root, + miner: block.miner, + extra_data: block.extra_data, + gas_limit: block.gas_limit, + gas_used: block.gas_used, + timestamp: block.timestamp, + } + } +} + /// Block number or tag #[derive(Debug, Copy, Clone, Serialize, Deserialize, From, TryInto, Eq, PartialEq)] #[serde(untagged)] @@ -1106,6 +1163,164 @@ pub struct FeeHistoryResult { pub reward: Vec>, } +/// The kind of subscription the user is requesting from the eth-rpc. +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum SubscriptionKind { + NewBlockHeaders, + Logs, +} + +/// Options passed by the user for their subscription to make it more specific. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(untagged)] +pub enum SubscriptionOptions { + /// Options passed when subscribing for logs. + LogsOptions { + /// An optional address to use to filter the logs. + /// + /// If specified, then only logs where this address is the emitter will be returned in the + /// subscription. If not specified, then it means that there's no filtering based on the + /// address of the emitter. + /// + /// If it's specified as a vector of addresses then all of the addresses specified in the + /// vector pass the filter. + #[serde(default, skip_serializing_if = "Option::is_none")] + address: Option>, + + /// An optional set of topics to filter the logs by. + /// + /// If not specified, then logs with any topic would match the filter. If specified, then + /// only logs which match the specified topics pass the filter. + #[serde(default, skip_serializing_if = "Option::is_none")] + topics: Option>, ConstU32<4>>>, + }, +} + +/// A type used as a filter for logs in subscriptions. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct LogsSubscriptionFilter { + /// Defines if the filter is configured to make use of addresses or not. + addresses: Option>, + + /// Defines if the filter is configured to filter based on the topics. + topics: Option<[Option>; 4]>, +} + +impl LogsSubscriptionFilter { + /// Constructs a new logs filter. + pub fn new( + address: Option>, + topics: Option>, ConstU32<4>>>, + ) -> Self { + Self { + addresses: address.map(|addresses| addresses.into_iter().collect()), + topics: topics.map(|topics| { + let mut resolved_topics = [None, None, None, None]; + for (index, topic) in topics.into_iter().enumerate() { + resolved_topics[index] = + topic.map(|topic_filter| topic_filter.into_iter().collect()); + } + resolved_topics + }), + } + } + + /// Checks if a certain log matches this filter. + pub fn matches(&self, log: &Log) -> bool { + // Check the emitter address. If it doesn't match, then we return. + if let Some(ref address_filter) = self.addresses && + !address_filter.contains(&log.address) && + !address_filter.is_empty() + { + return false; + } + + // Check the topics filter to ensure that the log matches the topics filter. + if let Some(ref topics_filters) = self.topics { + let mut event_topics = log.topics.iter(); + for topics_filter in topics_filters { + let event_topic = event_topics.next(); + + match (topics_filter, event_topic) { + // Wildcard filters. + (None, _) => {}, + (Some(topic_filters), _) if topic_filters.is_empty() => {}, + // There's a filter but there's no topic at this index, return false at this + // point. + (Some(..), None) => return false, + // There's a filter and there's also a topic at this index. So filter based on + // it. + (Some(topics_filter), Some(topic)) => { + if !topics_filter.contains(topic) { + return false; + } + }, + } + } + } + + true + } +} + +/// Resolved parameters for the subscription request which contains both the request type and the +/// options. +#[derive(Clone, Debug)] +pub enum SubscriptionParameters { + NewBlockHeaders, + Logs(LogsSubscriptionFilter), +} + +impl SubscriptionParameters { + pub fn new( + subscription_kind: SubscriptionKind, + subscription_options: Option, + ) -> Option { + match (subscription_kind, subscription_options) { + (SubscriptionKind::Logs, None) => { + Some(Self::Logs(LogsSubscriptionFilter::new(None, None))) + }, + ( + SubscriptionKind::Logs, + Some(SubscriptionOptions::LogsOptions { address, topics }), + ) => Some(Self::Logs(LogsSubscriptionFilter::new(address, topics))), + (SubscriptionKind::NewBlockHeaders, None) => Some(Self::NewBlockHeaders), + _ => None, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(untagged)] +pub enum SubscriptionItem { + BlockHeader(BlockHeader), + Log(Log), +} + +/// A helper type used when a type can be serialized and deserialized as either being one or as an +/// array. +#[derive( + Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Encode, Decode, TypeInfo, +)] +#[serde(untagged)] +pub enum BoundedOneOrMany { + One(T), + Many(BoundedVec>), +} + +impl IntoIterator for BoundedOneOrMany { + type IntoIter = Box>; + type Item = T; + + fn into_iter(self) -> Self::IntoIter { + match self { + BoundedOneOrMany::One(item) => Box::new(core::iter::once(item)) as _, + BoundedOneOrMany::Many(bounded_vec) => Box::new(bounded_vec.into_iter()) as _, + } + } +} + #[cfg(test)] mod tests { use super::*;