From 055a231d9657e8c6959ea3009c238f44cf101224 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Wed, 1 Oct 2025 19:04:27 +0300 Subject: [PATCH 01/11] Wait for transaction receipt if instant seal is enabled. Add a new revive-dev-node RPC API to check whether the node has instant seal enabled. The query is performed once during client initialization. --- .../frame/revive/dev-node/node/src/cli.rs | 2 +- .../frame/revive/dev-node/node/src/rpc.rs | 40 +++++++++++- .../frame/revive/dev-node/node/src/service.rs | 6 +- .../frame/revive/rpc/src/apis/debug_apis.rs | 10 +++ substrate/frame/revive/rpc/src/client.rs | 65 ++++++++++++++++++- substrate/frame/revive/rpc/src/lib.rs | 12 +++- 6 files changed, 127 insertions(+), 8 deletions(-) diff --git a/substrate/frame/revive/dev-node/node/src/cli.rs b/substrate/frame/revive/dev-node/node/src/cli.rs index a2d84e1069575..1d480067465d0 100644 --- a/substrate/frame/revive/dev-node/node/src/cli.rs +++ b/substrate/frame/revive/dev-node/node/src/cli.rs @@ -17,7 +17,7 @@ use polkadot_sdk::{sc_cli::RunCmd, *}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy)] pub enum Consensus { ManualSeal(u64), InstantSeal, diff --git a/substrate/frame/revive/dev-node/node/src/rpc.rs b/substrate/frame/revive/dev-node/node/src/rpc.rs index 1369cf9381763..09c574ab8b685 100644 --- a/substrate/frame/revive/dev-node/node/src/rpc.rs +++ b/substrate/frame/revive/dev-node/node/src/rpc.rs @@ -22,7 +22,8 @@ #![warn(missing_docs)] -use jsonrpsee::RpcModule; +use crate::cli::Consensus; +use jsonrpsee::{core::RpcResult, proc_macros::rpc, RpcModule}; use polkadot_sdk::{ sc_transaction_pool_api::TransactionPool, sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata}, @@ -37,6 +38,40 @@ pub struct FullDeps { pub client: Arc, /// Transaction pool instance. pub pool: Arc

, + /// The consensus type of the node. + /// TODO: Should be Arc? + pub consensus: Consensus, +} + +/// AutoMine JSON-RPC api. +#[rpc(server, client)] +pub trait AutoMineRpc { + /// API to get the automine status. + #[method(name = "getAutomine")] + fn get_automine(&self) -> RpcResult; +} + +/// Implementation of the AutoMine RPC api. +pub struct AutoMineRpcImpl { + /// The consensus type of the node. + consensus: Consensus, +} + +impl AutoMineRpcImpl { + /// Create new `AutoMineRpcImpl` instance. + pub fn new(consensus: Consensus) -> Self { + Self { consensus } + } +} + +impl AutoMineRpcServer for AutoMineRpcImpl { + /// Returns `true` if block production is set to `instant`. + fn get_automine(&self) -> RpcResult { + Ok(match self.consensus { + Consensus::InstantSeal => true, + _ => false, + }) + } } #[docify::export] @@ -58,8 +93,9 @@ where { use polkadot_sdk::substrate_frame_rpc_system::{System, SystemApiServer}; let mut module = RpcModule::new(()); - let FullDeps { client, pool } = deps; + let FullDeps { client, pool, consensus } = deps; + module.merge(AutoMineRpcImpl::new(consensus).into_rpc())?; module.merge(System::new(client.clone(), pool.clone()).into_rpc())?; Ok(module) diff --git a/substrate/frame/revive/dev-node/node/src/service.rs b/substrate/frame/revive/dev-node/node/src/service.rs index 5cee6586da51d..4b48e221c57c1 100644 --- a/substrate/frame/revive/dev-node/node/src/service.rs +++ b/substrate/frame/revive/dev-node/node/src/service.rs @@ -168,7 +168,11 @@ pub fn new_full::Ha let pool = transaction_pool.clone(); Box::new(move |_| { - let deps = crate::rpc::FullDeps { client: client.clone(), pool: pool.clone() }; + let deps = crate::rpc::FullDeps { + client: client.clone(), + pool: pool.clone(), + consensus: consensus.clone(), + }; crate::rpc::create_full(deps).map_err(Into::into) }) }; diff --git a/substrate/frame/revive/rpc/src/apis/debug_apis.rs b/substrate/frame/revive/rpc/src/apis/debug_apis.rs index 5d2c61458c55a..6166b89306f2e 100644 --- a/substrate/frame/revive/rpc/src/apis/debug_apis.rs +++ b/substrate/frame/revive/rpc/src/apis/debug_apis.rs @@ -56,6 +56,9 @@ pub trait DebugRpc { block: BlockNumberOrTagOrHash, tracer_config: TracerConfig, ) -> RpcResult; + + #[method(name = "debug_getAutomine")] + async fn get_automine(&self) -> RpcResult; } pub struct DebugRpcServerImpl { @@ -115,4 +118,11 @@ impl DebugRpcServer for DebugRpcServerImpl { let TracerConfig { config, timeout } = tracer_config; with_timeout(timeout, self.client.trace_call(transaction, block, config)).await } + + async fn get_automine(&self) -> RpcResult { + self.client.get_automine().await.map_err(|err| { + log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); + ErrorCode::InternalError.into() + }) + } } diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 36945ba5bc1bf..a6a510289393d 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -53,6 +53,7 @@ use subxt::{ Config, OnlineClient, }; use thiserror::Error; +use tokio::time::sleep; /// The substrate block type. pub type SubstrateBlock = subxt::blocks::Block>; @@ -121,6 +122,9 @@ pub enum ClientError { /// Failed to filter logs. #[error("Failed to filter logs")] LogFilterFailed(#[from] anyhow::Error), + /// The transaction receipt was not found. + #[error("Transaction receipt not found")] + TransactReceiptNotFound, } const REVERT_CODE: i32 = 3; @@ -160,6 +164,7 @@ pub struct Client { fee_history_provider: FeeHistoryProvider, chain_id: u64, max_block_weight: Weight, + automine: bool, } /// Fetch the chain ID from the substrate chain. @@ -217,7 +222,7 @@ impl Client { let (chain_id, max_block_weight) = tokio::try_join!(chain_id(&api), max_block_weight(&api))?; - Ok(Self { + let mut client = Self { api, rpc_client, rpc, @@ -226,7 +231,10 @@ impl Client { fee_history_provider: FeeHistoryProvider::default(), chain_id, max_block_weight, - }) + automine: false, + }; + client.update_automine().await; + Ok(client) } /// Subscribe to past blocks executing the callback for each block in `range`. @@ -703,4 +711,57 @@ impl Client { .fee_history(block_count, latest_block.number(), reward_percentiles) .await } + + /// Wait for a transaction receipt to be available in the database for the given transaction + /// hash. + pub async fn wait_for_transaction_receipt(&self, hash: H256) -> Result<(), ClientError> { + // It retries for a maximum of 10 times, waiting 100 milliseconds between each attempt + let max_retries = 10; + let wait_time_millis = 100; + for attempt in 0..max_retries { + if let Some(_receipt) = self.receipt(&hash).await { + log::debug!(target: LOG_TARGET, "Transaction receipt found for hash: {hash:?} after {attempt} attempts."); + return Ok(()); + } + + sleep(Duration::from_millis(wait_time_millis)).await; + } + + let total_wait = wait_time_millis * max_retries; + log::error!(target: LOG_TARGET, "Transaction {hash:?} receipt not found after {total_wait}ms"); + Err(ClientError::TransactReceiptNotFound) + } + + /// Check if automine is enabled. + pub fn is_automine(&self) -> bool { + self.automine + } + + /// Get the automine status from the node. + pub async fn get_automine(&self) -> Result { + let result = self.rpc_client.request("getAutomine", rpc_params![]).await; + + match &result { + Ok(automine) => { + log::debug!(target: LOG_TARGET, "Automine status fetched successfully: {automine}"); + Ok(*automine) + }, + Err(err) => { + log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); + Err(ClientError::TransactError(EthTransactError::Message( + "Get automine failed".to_string(), + ))) + }, + } + } + + /// Update the automine status by querying the node. + /// If the query fails, automine is set to false. + async fn update_automine(&mut self) { + if let Ok(automine) = self.get_automine().await { + self.automine = automine; + } else { + self.automine = false; + } + } } diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 3cdb55f6f081a..c5423500c18e7 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -169,8 +169,16 @@ impl EthRpcServer for EthRpcServerImpl { err })?; - log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}"); - Ok(hash) + // If automine is enabled, wait for the transaction receipt to be available. + // This ensures that the transaction is mined before returning the hash. + if self.client.is_automine() { + self.client.wait_for_transaction_receipt(hash).await.map_err(|err| { + log::error!(target: LOG_TARGET, "Wait for receipt failed: {err:?}"); + err + })?; + } + + return Ok(hash); } async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult { From 94bf5aede9d1f867dd98012ab0837bb948840140 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Fri, 3 Oct 2025 09:59:30 +0300 Subject: [PATCH 02/11] Return a more specific error when automine status retrieval fails. --- substrate/frame/revive/rpc/src/client.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index a6a510289393d..624053597a99a 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -125,6 +125,9 @@ pub enum ClientError { /// The transaction receipt was not found. #[error("Transaction receipt not found")] TransactReceiptNotFound, + /// Failed to check instant seal status. + #[error("Failed to check instant seal status")] + GetAutomineFailed, } const REVERT_CODE: i32 = 3; @@ -748,9 +751,7 @@ impl Client { }, Err(err) => { log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); - Err(ClientError::TransactError(EthTransactError::Message( - "Get automine failed".to_string(), - ))) + Err(ClientError::GetAutomineFailed) }, } } From d9eada1d6cdc0c66e306ec8db5b53001f81f1e98 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Tue, 7 Oct 2025 16:21:33 +0300 Subject: [PATCH 03/11] Address review comments. --- .../frame/revive/dev-node/node/src/rpc.rs | 3 +- substrate/frame/revive/rpc/src/client.rs | 29 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/substrate/frame/revive/dev-node/node/src/rpc.rs b/substrate/frame/revive/dev-node/node/src/rpc.rs index 09c574ab8b685..5a3f92ee2a0ca 100644 --- a/substrate/frame/revive/dev-node/node/src/rpc.rs +++ b/substrate/frame/revive/dev-node/node/src/rpc.rs @@ -39,11 +39,12 @@ pub struct FullDeps { /// Transaction pool instance. pub pool: Arc

, /// The consensus type of the node. - /// TODO: Should be Arc? pub consensus: Consensus, } /// AutoMine JSON-RPC api. +/// Automine is a feature of the Hardhat Network where a new block is automatically mined after each +/// transaction. #[rpc(server, client)] pub trait AutoMineRpc { /// API to get the automine status. diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 624053597a99a..f6e5c6a2f55c7 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -125,9 +125,6 @@ pub enum ClientError { /// The transaction receipt was not found. #[error("Transaction receipt not found")] TransactReceiptNotFound, - /// Failed to check instant seal status. - #[error("Failed to check instant seal status")] - GetAutomineFailed, } const REVERT_CODE: i32 = 3; @@ -236,7 +233,18 @@ impl Client { max_block_weight, automine: false, }; - client.update_automine().await; + + // Update the automine status by querying the node. + // If the query fails, automine is set to false. + let update_automine = async { + if let Ok(automine) = client.get_automine().await { + client.automine = automine; + } else { + client.automine = false; + } + }; + update_automine.await; + Ok(client) } @@ -751,18 +759,9 @@ impl Client { }, Err(err) => { log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); - Err(ClientError::GetAutomineFailed) + // Return false if the query fails, since only revive-dev-node implements automine. + return Ok(false) }, } } - - /// Update the automine status by querying the node. - /// If the query fails, automine is set to false. - async fn update_automine(&mut self) { - if let Ok(automine) = self.get_automine().await { - self.automine = automine; - } else { - self.automine = false; - } - } } From 004f8c5cfc9b3aa3bf936b7819ee3cfa6894ae23 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Wed, 8 Oct 2025 14:56:56 +0300 Subject: [PATCH 04/11] Switch to using tokio::broadcast::channel to wait for transaction to be mined into a block. --- substrate/frame/revive/rpc/src/cli.rs | 20 ++++-- substrate/frame/revive/rpc/src/client.rs | 83 ++++++++++++++++-------- substrate/frame/revive/rpc/src/lib.rs | 64 +++++++++++++++--- 3 files changed, 125 insertions(+), 42 deletions(-) diff --git a/substrate/frame/revive/rpc/src/cli.rs b/substrate/frame/revive/rpc/src/cli.rs index c603947e33a95..45d52bdb23a85 100644 --- a/substrate/frame/revive/rpc/src/cli.rs +++ b/substrate/frame/revive/rpc/src/cli.rs @@ -16,7 +16,7 @@ // limitations under the License. //! The Ethereum JSON-RPC server. use crate::{ - client::{connect, Client, SubscriptionType, SubstrateBlockNumber}, + client::{connect, Client, SubscriptionType, SubstrateBlockNumber, TransactHashes}, DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl, LOG_TARGET, @@ -106,6 +106,7 @@ fn build_client( node_rpc_url: &str, database_url: &str, abort_signal: Signals, + transact_hashes_sender: tokio::sync::broadcast::Sender, ) -> anyhow::Result { let fut = async { let (api, rpc_client, rpc) = connect(node_rpc_url).await?; @@ -139,7 +140,7 @@ fn build_client( .await?; let client = - Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?; + Client::new(api, rpc_client, rpc, block_provider, receipt_provider, transact_hashes_sender).await?; Ok(client) } @@ -199,6 +200,10 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { let tokio_handle = tokio_runtime.handle(); let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?; + // Channel used to notify when a transaction hash is included in a block. + // Capacity of 32 should be enough since blocks are mined at a relatively low rate. + let (transact_hashes_sender, _) = tokio::sync::broadcast::channel::(32); + let client = build_client( tokio_handle, cache_size, @@ -206,6 +211,7 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { &node_rpc_url, &database_url, tokio_runtime.block_on(async { Signals::capture() })?, + transact_hashes_sender.clone(), )?; // Prometheus metrics. @@ -221,7 +227,7 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { &rpc_config, prometheus_registry, tokio_handle, - || rpc_module(is_dev, client.clone()), + || rpc_module(is_dev, client.clone(), transact_hashes_sender.clone()), None, )?; @@ -250,8 +256,12 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { } /// Create the JSON-RPC module. -fn rpc_module(is_dev: bool, client: Client) -> Result, sc_service::Error> { - let eth_api = EthRpcServerImpl::new(client.clone()) +fn rpc_module( + is_dev: bool, + client: Client, + transact_hashes_sender: tokio::sync::broadcast::Sender, +) -> Result, sc_service::Error> { + let eth_api = EthRpcServerImpl::new(client.clone(), transact_hashes_sender) .with_accounts(if is_dev { vec![crate::Account::default()] } else { vec![] }) .into_rpc(); diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index f6e5c6a2f55c7..a13353885311c 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -20,9 +20,6 @@ mod runtime_api; mod storage_api; -use runtime_api::RuntimeApi; -use storage_api::StorageApi; - use crate::{ subxt_client::{self, revive::calls::types::EthTransact, SrcChainConfig}, BlockInfoProvider, BlockTag, FeeHistoryProvider, ReceiptProvider, SubxtBlockInfoProvider, @@ -37,9 +34,11 @@ use pallet_revive::{ }, EthTransactError, }; +use runtime_api::RuntimeApi; use sp_runtime::traits::Block as BlockT; use sp_weights::Weight; -use std::{ops::Range, sync::Arc, time::Duration}; +use std::{collections::HashSet, ops::Range, sync::Arc, time::Duration}; +use storage_api::StorageApi; use subxt::{ backend::{ legacy::{rpc_methods::SystemHealth, LegacyRpcMethods}, @@ -53,7 +52,6 @@ use subxt::{ Config, OnlineClient, }; use thiserror::Error; -use tokio::time::sleep; /// The substrate block type. pub type SubstrateBlock = subxt::blocks::Block>; @@ -70,8 +68,10 @@ pub type SubstrateBlockHash = HashFor; /// The runtime balance type. pub type Balance = u128; +pub type TransactHashes = HashSet; + /// The subscription type used to listen to new blocks. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum SubscriptionType { /// Subscribe to best blocks. BestBlocks, @@ -165,6 +165,7 @@ pub struct Client { chain_id: u64, max_block_weight: Weight, automine: bool, + transact_hashes_sender: tokio::sync::broadcast::Sender, } /// Fetch the chain ID from the substrate chain. @@ -218,6 +219,7 @@ impl Client { rpc: LegacyRpcMethods, block_provider: SubxtBlockInfoProvider, receipt_provider: ReceiptProvider, + transact_hashes_sender: tokio::sync::broadcast::Sender, ) -> Result { let (chain_id, max_block_weight) = tokio::try_join!(chain_id(&api), max_block_weight(&api))?; @@ -232,6 +234,7 @@ impl Client { chain_id, max_block_weight, automine: false, + transact_hashes_sender, }; // Update the automine status by querying the node. @@ -343,6 +346,10 @@ impl Client { let (signed_txs, receipts): (Vec<_>, Vec<_>) = self.receipt_provider.insert_block_receipts(&block).await?.into_iter().unzip(); + // Broadcast transaction hashes + self.broadcast_transact_hashes(subscription_type, &receipts, block.number()) + .await; + let evm_block = self.evm_block_from_receipts(&block, &receipts, signed_txs, false).await; self.block_provider.update_latest(block, subscription_type).await; @@ -371,6 +378,47 @@ impl Client { Ok(()) } + /// Broadcast transaction hashes to subscribers. + async fn broadcast_transact_hashes( + &self, + subscription_type: SubscriptionType, + receipts: &[ReceiptInfo], + block_number: SubstrateBlockNumber, + ) { + // Closure to extract the transaction hashes from the receipts. + let extract_transact_hashes = || { + let mut transact_hashes = HashSet::new(); + for receipt in receipts { + transact_hashes.insert(receipt.transaction_hash); + } + transact_hashes + }; + + // Only broadcast for best blocks to avoid duplicate notifications. + if subscription_type != SubscriptionType::BestBlocks { + return; + } + + // Only broadcast if there are subscribers. + if self.transact_hashes_sender.receiver_count() == 0 { + return; + } + + let transact_hashes = extract_transact_hashes(); + if transact_hashes.is_empty() { + return; + } + + if let Err(err) = self.transact_hashes_sender.send(transact_hashes) { + log::warn!( + target: LOG_TARGET, + "Failed to broadcast transaction hashes for block: {:?}, error: {:?}", + block_number, + err + ); + } + } + /// Get the block hash for the given block number or tag. pub async fn block_hash_for_tag( &self, @@ -723,26 +771,6 @@ impl Client { .await } - /// Wait for a transaction receipt to be available in the database for the given transaction - /// hash. - pub async fn wait_for_transaction_receipt(&self, hash: H256) -> Result<(), ClientError> { - // It retries for a maximum of 10 times, waiting 100 milliseconds between each attempt - let max_retries = 10; - let wait_time_millis = 100; - for attempt in 0..max_retries { - if let Some(_receipt) = self.receipt(&hash).await { - log::debug!(target: LOG_TARGET, "Transaction receipt found for hash: {hash:?} after {attempt} attempts."); - return Ok(()); - } - - sleep(Duration::from_millis(wait_time_millis)).await; - } - - let total_wait = wait_time_millis * max_retries; - log::error!(target: LOG_TARGET, "Transaction {hash:?} receipt not found after {total_wait}ms"); - Err(ClientError::TransactReceiptNotFound) - } - /// Check if automine is enabled. pub fn is_automine(&self) -> bool { self.automine @@ -759,7 +787,8 @@ impl Client { }, Err(err) => { log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); - // Return false if the query fails, since only revive-dev-node implements automine. + // Return false if the query fails, as some nodes may not support the getAutomine + // RPC method. return Ok(false) }, } diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index c5423500c18e7..db588284e476b 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -17,7 +17,7 @@ //! The [`EthRpcServer`] RPC server implementation #![cfg_attr(docsrs, feature(doc_cfg))] -use client::ClientError; +use client::{ClientError, TransactHashes}; use jsonrpsee::{ core::{async_trait, RpcResult}, types::{ErrorCode, ErrorObjectOwned}, @@ -26,6 +26,7 @@ use pallet_revive::evm::*; use sp_arithmetic::Permill; use sp_core::{keccak_256, H160, H256, U256}; use thiserror::Error; +use tokio::time::Duration; pub mod cli; pub mod client; @@ -50,6 +51,8 @@ pub use receipt_extractor::*; mod apis; pub use apis::*; +use tokio::sync::broadcast; + pub const LOG_TARGET: &str = "eth-rpc"; /// An EVM RPC server implementation. @@ -59,12 +62,18 @@ pub struct EthRpcServerImpl { /// The accounts managed by the server. accounts: Vec, + + /// The transaction hashes sender. + transact_hashes_sender: broadcast::Sender, } impl EthRpcServerImpl { /// Creates a new [`EthRpcServerImpl`]. - pub fn new(client: client::Client) -> Self { - Self { client, accounts: vec![] } + pub fn new( + client: client::Client, + transact_hashes_sender: broadcast::Sender, + ) -> Self { + Self { client, accounts: vec![], transact_hashes_sender } } /// Sets the accounts managed by the server. @@ -110,6 +119,31 @@ impl From for ErrorObjectOwned { } } +async fn wait_for_transaction_to_be_mined( + mut transact_hashes_receiver: broadcast::Receiver, + hash: H256, + timeout: Duration, +) -> Result { + let timeout_fut = tokio::time::sleep(timeout); + tokio::pin!(timeout_fut); + + loop { + tokio::select! { + _ = &mut timeout_fut => { + return Err(format!("Timed out waiting for transaction {hash:?} to be mined in a block").into()); + } + result = transact_hashes_receiver.recv() => { + let transact_hashes = result + .map_err(|err| format!("Failed to receive transaction hashes: {err}"))?; + if transact_hashes.contains(&hash) { + return Ok(true); + } + // otherwise keep waiting + } + } + } +} + #[async_trait] impl EthRpcServer for EthRpcServerImpl { async fn net_version(&self) -> RpcResult { @@ -164,20 +198,30 @@ impl EthRpcServer for EthRpcServerImpl { async fn send_raw_transaction(&self, transaction: Bytes) -> RpcResult { let hash = H256(keccak_256(&transaction.0)); let call = subxt_client::tx().revive().eth_transact(transaction.0); + + // Subscribe to transaction hashes only when automine is enabled. + let transact_hashes_receiver = if self.client.is_automine() { + Some(self.transact_hashes_sender.subscribe()) // each task gets its own receiver + } else { + None + }; + + // Submit the transaction self.client.submit(call).await.map_err(|err| { log::debug!(target: LOG_TARGET, "submit call failed: {err:?}"); err })?; - // If automine is enabled, wait for the transaction receipt to be available. - // This ensures that the transaction is mined before returning the hash. - if self.client.is_automine() { - self.client.wait_for_transaction_receipt(hash).await.map_err(|err| { - log::error!(target: LOG_TARGET, "Wait for receipt failed: {err:?}"); - err - })?; + // Wait for the transaction to be included in a block if automine is enabled + if let Some(hashes_receiver) = transact_hashes_receiver { + let _ = wait_for_transaction_to_be_mined(hashes_receiver, hash, Duration::from_secs(1)) + .await + .map_err(|err| { + log::warn!(target : LOG_TARGET, "Waiting for tx receipt failed: {err}"); + }); } + log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}"); return Ok(hash); } From eaed42967b5a4312f529ab8fe396ac31932b34ec Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Thu, 9 Oct 2025 14:05:54 +0300 Subject: [PATCH 05/11] Address review comments. --- .../frame/revive/dev-node/node/src/rpc.rs | 11 +- .../frame/revive/rpc/src/apis/debug_apis.rs | 5 +- substrate/frame/revive/rpc/src/cli.rs | 20 +-- substrate/frame/revive/rpc/src/client.rs | 147 +++++++++--------- substrate/frame/revive/rpc/src/lib.rs | 24 ++- 5 files changed, 91 insertions(+), 116 deletions(-) diff --git a/substrate/frame/revive/dev-node/node/src/rpc.rs b/substrate/frame/revive/dev-node/node/src/rpc.rs index 5a3f92ee2a0ca..48e435825f49f 100644 --- a/substrate/frame/revive/dev-node/node/src/rpc.rs +++ b/substrate/frame/revive/dev-node/node/src/rpc.rs @@ -54,24 +54,21 @@ pub trait AutoMineRpc { /// Implementation of the AutoMine RPC api. pub struct AutoMineRpcImpl { - /// The consensus type of the node. - consensus: Consensus, + /// Whether the node is running in auto-mine mode. + is_auto_mine: bool, } impl AutoMineRpcImpl { /// Create new `AutoMineRpcImpl` instance. pub fn new(consensus: Consensus) -> Self { - Self { consensus } + Self { is_auto_mine: matches!(consensus, Consensus::InstantSeal) } } } impl AutoMineRpcServer for AutoMineRpcImpl { /// Returns `true` if block production is set to `instant`. fn get_automine(&self) -> RpcResult { - Ok(match self.consensus { - Consensus::InstantSeal => true, - _ => false, - }) + Ok(self.is_auto_mine) } } diff --git a/substrate/frame/revive/rpc/src/apis/debug_apis.rs b/substrate/frame/revive/rpc/src/apis/debug_apis.rs index 6166b89306f2e..80b6f292d1c9d 100644 --- a/substrate/frame/revive/rpc/src/apis/debug_apis.rs +++ b/substrate/frame/revive/rpc/src/apis/debug_apis.rs @@ -120,9 +120,6 @@ impl DebugRpcServer for DebugRpcServerImpl { } async fn get_automine(&self) -> RpcResult { - self.client.get_automine().await.map_err(|err| { - log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); - ErrorCode::InternalError.into() - }) + sc_service::Result::Ok(self.client.get_automine().await.unwrap_or_default()) } } diff --git a/substrate/frame/revive/rpc/src/cli.rs b/substrate/frame/revive/rpc/src/cli.rs index 45d52bdb23a85..c603947e33a95 100644 --- a/substrate/frame/revive/rpc/src/cli.rs +++ b/substrate/frame/revive/rpc/src/cli.rs @@ -16,7 +16,7 @@ // limitations under the License. //! The Ethereum JSON-RPC server. use crate::{ - client::{connect, Client, SubscriptionType, SubstrateBlockNumber, TransactHashes}, + client::{connect, Client, SubscriptionType, SubstrateBlockNumber}, DebugRpcServer, DebugRpcServerImpl, EthRpcServer, EthRpcServerImpl, ReceiptExtractor, ReceiptProvider, SubxtBlockInfoProvider, SystemHealthRpcServer, SystemHealthRpcServerImpl, LOG_TARGET, @@ -106,7 +106,6 @@ fn build_client( node_rpc_url: &str, database_url: &str, abort_signal: Signals, - transact_hashes_sender: tokio::sync::broadcast::Sender, ) -> anyhow::Result { let fut = async { let (api, rpc_client, rpc) = connect(node_rpc_url).await?; @@ -140,7 +139,7 @@ fn build_client( .await?; let client = - Client::new(api, rpc_client, rpc, block_provider, receipt_provider, transact_hashes_sender).await?; + Client::new(api, rpc_client, rpc, block_provider, receipt_provider).await?; Ok(client) } @@ -200,10 +199,6 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { let tokio_handle = tokio_runtime.handle(); let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?; - // Channel used to notify when a transaction hash is included in a block. - // Capacity of 32 should be enough since blocks are mined at a relatively low rate. - let (transact_hashes_sender, _) = tokio::sync::broadcast::channel::(32); - let client = build_client( tokio_handle, cache_size, @@ -211,7 +206,6 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { &node_rpc_url, &database_url, tokio_runtime.block_on(async { Signals::capture() })?, - transact_hashes_sender.clone(), )?; // Prometheus metrics. @@ -227,7 +221,7 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { &rpc_config, prometheus_registry, tokio_handle, - || rpc_module(is_dev, client.clone(), transact_hashes_sender.clone()), + || rpc_module(is_dev, client.clone()), None, )?; @@ -256,12 +250,8 @@ pub fn run(cmd: CliCommand) -> anyhow::Result<()> { } /// Create the JSON-RPC module. -fn rpc_module( - is_dev: bool, - client: Client, - transact_hashes_sender: tokio::sync::broadcast::Sender, -) -> Result, sc_service::Error> { - let eth_api = EthRpcServerImpl::new(client.clone(), transact_hashes_sender) +fn rpc_module(is_dev: bool, client: Client) -> Result, sc_service::Error> { + let eth_api = EthRpcServerImpl::new(client.clone()) .with_accounts(if is_dev { vec![crate::Account::default()] } else { vec![] }) .into_rpc(); diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index a13353885311c..26ddf8d1c7d1c 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -165,7 +165,7 @@ pub struct Client { chain_id: u64, max_block_weight: Weight, automine: bool, - transact_hashes_sender: tokio::sync::broadcast::Sender, + transact_hashes_sender: Option>, } /// Fetch the chain ID from the substrate chain. @@ -182,6 +182,17 @@ async fn max_block_weight(api: &OnlineClient) -> Result Result { + match rpc_client.request::("getAutomine", rpc_params![]).await { + Ok(val) => Ok(val), + Err(err) => { + log::warn!(target: LOG_TARGET, "getAutomine RPC call failed: {err:?}"); + Ok(false) + }, + } +} + /// Extract the block timestamp. async fn extract_block_timestamp(block: &SubstrateBlock) -> Option { let extrinsics = block.extrinsics().await.ok()?; @@ -211,6 +222,41 @@ pub async fn connect( Ok((api, rpc_client, rpc)) } +/// Broadcast transaction hashes to subscribers. +async fn broadcast_transact_hashes( + transact_hashes_sender: &tokio::sync::broadcast::Sender, + receipts: &[ReceiptInfo], + block_number: SubstrateBlockNumber, +) { + // Closure to extract the transaction hashes from the receipts. + let extract_transact_hashes = || { + let mut transact_hashes = HashSet::new(); + for receipt in receipts { + transact_hashes.insert(receipt.transaction_hash); + } + transact_hashes + }; + + // Only broadcast if there are subscribers. + if transact_hashes_sender.receiver_count() == 0 { + return; + } + + let transact_hashes = extract_transact_hashes(); + if transact_hashes.is_empty() { + return; + } + + if let Err(err) = transact_hashes_sender.send(transact_hashes) { + log::warn!( + target: LOG_TARGET, + "Failed to broadcast transaction hashes for block: {:?}, error: {:?}", + block_number, + err + ); + } +} + impl Client { /// Create a new client instance. pub async fn new( @@ -219,12 +265,18 @@ impl Client { rpc: LegacyRpcMethods, block_provider: SubxtBlockInfoProvider, receipt_provider: ReceiptProvider, - transact_hashes_sender: tokio::sync::broadcast::Sender, ) -> Result { - let (chain_id, max_block_weight) = - tokio::try_join!(chain_id(&api), max_block_weight(&api))?; + let (chain_id, max_block_weight, automine) = + tokio::try_join!(chain_id(&api), max_block_weight(&api), get_automine(&rpc_client))?; + + // Create the transaction hashes sender if automine is enabled. + let transact_hashes_sender = automine.then(|| { + // Channel used to notify when a transaction hash is included in a block. + // Capacity of 32 should be enough since blocks are mined at a relatively low rate. + tokio::sync::broadcast::channel::(32).0 + }); - let mut client = Self { + let client = Self { api, rpc_client, rpc, @@ -233,21 +285,10 @@ impl Client { fee_history_provider: FeeHistoryProvider::default(), chain_id, max_block_weight, - automine: false, + automine, transact_hashes_sender, }; - // Update the automine status by querying the node. - // If the query fails, automine is set to false. - let update_automine = async { - if let Ok(automine) = client.get_automine().await { - client.automine = automine; - } else { - client.automine = false; - } - }; - update_automine.await; - Ok(client) } @@ -346,9 +387,13 @@ impl Client { let (signed_txs, receipts): (Vec<_>, Vec<_>) = self.receipt_provider.insert_block_receipts(&block).await?.into_iter().unzip(); - // Broadcast transaction hashes - self.broadcast_transact_hashes(subscription_type, &receipts, block.number()) - .await; + // Only broadcast for best blocks to avoid duplicate notifications. + match (subscription_type, &self.transact_hashes_sender) { + (SubscriptionType::BestBlocks, Some(sender)) => { + broadcast_transact_hashes(sender, &receipts, block.number()).await; + }, + _ => {}, + } let evm_block = self.evm_block_from_receipts(&block, &receipts, signed_txs, false).await; @@ -378,47 +423,6 @@ impl Client { Ok(()) } - /// Broadcast transaction hashes to subscribers. - async fn broadcast_transact_hashes( - &self, - subscription_type: SubscriptionType, - receipts: &[ReceiptInfo], - block_number: SubstrateBlockNumber, - ) { - // Closure to extract the transaction hashes from the receipts. - let extract_transact_hashes = || { - let mut transact_hashes = HashSet::new(); - for receipt in receipts { - transact_hashes.insert(receipt.transaction_hash); - } - transact_hashes - }; - - // Only broadcast for best blocks to avoid duplicate notifications. - if subscription_type != SubscriptionType::BestBlocks { - return; - } - - // Only broadcast if there are subscribers. - if self.transact_hashes_sender.receiver_count() == 0 { - return; - } - - let transact_hashes = extract_transact_hashes(); - if transact_hashes.is_empty() { - return; - } - - if let Err(err) = self.transact_hashes_sender.send(transact_hashes) { - log::warn!( - target: LOG_TARGET, - "Failed to broadcast transaction hashes for block: {:?}, error: {:?}", - block_number, - err - ); - } - } - /// Get the block hash for the given block number or tag. pub async fn block_hash_for_tag( &self, @@ -749,6 +753,12 @@ impl Client { self.max_block_weight } + pub fn get_transact_hashes_sender( + &self, + ) -> Option> { + self.transact_hashes_sender.clone() + } + /// Get the logs matching the given filter. pub async fn logs(&self, filter: Option) -> Result, ClientError> { let logs = @@ -778,19 +788,6 @@ impl Client { /// Get the automine status from the node. pub async fn get_automine(&self) -> Result { - let result = self.rpc_client.request("getAutomine", rpc_params![]).await; - - match &result { - Ok(automine) => { - log::debug!(target: LOG_TARGET, "Automine status fetched successfully: {automine}"); - Ok(*automine) - }, - Err(err) => { - log::error!(target: LOG_TARGET, "Get automine failed: {err:?}"); - // Return false if the query fails, as some nodes may not support the getAutomine - // RPC method. - return Ok(false) - }, - } + get_automine(&self.rpc_client).await } } diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index db588284e476b..37a22c3379b34 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -62,18 +62,12 @@ pub struct EthRpcServerImpl { /// The accounts managed by the server. accounts: Vec, - - /// The transaction hashes sender. - transact_hashes_sender: broadcast::Sender, } impl EthRpcServerImpl { /// Creates a new [`EthRpcServerImpl`]. - pub fn new( - client: client::Client, - transact_hashes_sender: broadcast::Sender, - ) -> Self { - Self { client, accounts: vec![], transact_hashes_sender } + pub fn new(client: client::Client) -> Self { + Self { client, accounts: vec![] } } /// Sets the accounts managed by the server. @@ -120,8 +114,9 @@ impl From for ErrorObjectOwned { } async fn wait_for_transaction_to_be_mined( - mut transact_hashes_receiver: broadcast::Receiver, hash: H256, + mut transact_hashes_receiver: broadcast::Receiver, + timeout: Duration, ) -> Result { let timeout_fut = tokio::time::sleep(timeout); @@ -200,11 +195,10 @@ impl EthRpcServer for EthRpcServerImpl { let call = subxt_client::tx().revive().eth_transact(transaction.0); // Subscribe to transaction hashes only when automine is enabled. - let transact_hashes_receiver = if self.client.is_automine() { - Some(self.transact_hashes_sender.subscribe()) // each task gets its own receiver - } else { - None - }; + let transact_hashes_receiver = + self.client.get_transact_hashes_sender().map(|transact_hashes_sender| { + transact_hashes_sender.subscribe() // each task gets its own receiver + }); // Submit the transaction self.client.submit(call).await.map_err(|err| { @@ -214,7 +208,7 @@ impl EthRpcServer for EthRpcServerImpl { // Wait for the transaction to be included in a block if automine is enabled if let Some(hashes_receiver) = transact_hashes_receiver { - let _ = wait_for_transaction_to_be_mined(hashes_receiver, hash, Duration::from_secs(1)) + let _ = wait_for_transaction_to_be_mined(hash, hashes_receiver, Duration::from_secs(1)) .await .map_err(|err| { log::warn!(target : LOG_TARGET, "Waiting for tx receipt failed: {err}"); From 3b1d648b04701c7daa25e2e93b18f8365b7a6eb5 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 9 Oct 2025 12:28:58 +0000 Subject: [PATCH 06/11] Update from github-actions[bot] running command 'prdoc --audience runtime_dev --bump patch' --- prdoc/pr_9914.prdoc | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 prdoc/pr_9914.prdoc diff --git a/prdoc/pr_9914.prdoc b/prdoc/pr_9914.prdoc new file mode 100644 index 0000000000000..b5414b287faf9 --- /dev/null +++ b/prdoc/pr_9914.prdoc @@ -0,0 +1,12 @@ +title: ' Wait for transaction receipt if instant seal is enabled' +doc: +- audience: Runtime Dev + description: "Fixes https://github.com/paritytech/contract-issues/issues/165\n\n\ + The main changes in this PR are:\n\n1. Add a new API to revive-dev-node to check\ + \ whether the node has instant seal enabled.\n2. Add a new debug API to eth-rpc\ + \ to check whether the node has instant seal enabled. (optional)\n3. Query and\ + \ cache the node\u2019s instant seal status during eth-rpc initialization.\n4.\ + \ If instant seal is enabled, wait for the transaction receipt to be available" +crates: +- name: pallet-revive-eth-rpc + bump: patch From 740f5f64a31aa50082a61b1f6067260daedeacf7 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Thu, 9 Oct 2025 16:59:59 +0300 Subject: [PATCH 07/11] eth-rpc: Handle RecvError::Lagged, remove unused ClientError, fix clippy - Treat broadcast::RecvError::Lagged appropriately in wait_for_transaction_to_be_mined. - Remove unused ClientError variant. - Fix clippy warnings. --- .../frame/revive/dev-node/node/src/service.rs | 7 ++----- substrate/frame/revive/rpc/src/client.rs | 3 --- substrate/frame/revive/rpc/src/lib.rs | 20 +++++++++---------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/substrate/frame/revive/dev-node/node/src/service.rs b/substrate/frame/revive/dev-node/node/src/service.rs index 4b48e221c57c1..1084ce3211612 100644 --- a/substrate/frame/revive/dev-node/node/src/service.rs +++ b/substrate/frame/revive/dev-node/node/src/service.rs @@ -168,11 +168,8 @@ pub fn new_full::Ha let pool = transaction_pool.clone(); Box::new(move |_| { - let deps = crate::rpc::FullDeps { - client: client.clone(), - pool: pool.clone(), - consensus: consensus.clone(), - }; + let deps = + crate::rpc::FullDeps { client: client.clone(), pool: pool.clone(), consensus }; crate::rpc::create_full(deps).map_err(Into::into) }) }; diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 26ddf8d1c7d1c..3073008f85bda 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -122,9 +122,6 @@ pub enum ClientError { /// Failed to filter logs. #[error("Failed to filter logs")] LogFilterFailed(#[from] anyhow::Error), - /// The transaction receipt was not found. - #[error("Transaction receipt not found")] - TransactReceiptNotFound, } const REVERT_CODE: i32 = 3; diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 37a22c3379b34..1b3b202805346 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -124,17 +124,15 @@ async fn wait_for_transaction_to_be_mined( loop { tokio::select! { - _ = &mut timeout_fut => { - return Err(format!("Timed out waiting for transaction {hash:?} to be mined in a block").into()); - } - result = transact_hashes_receiver.recv() => { - let transact_hashes = result - .map_err(|err| format!("Failed to receive transaction hashes: {err}"))?; - if transact_hashes.contains(&hash) { - return Ok(true); - } - // otherwise keep waiting - } + _ = &mut timeout_fut => return Err(format!("Timed out waiting for transaction {hash:?} to be mined.")), + recv = transact_hashes_receiver.recv() => match recv { + Ok(hashes) => if hashes.contains(&hash) { return Ok(true); }, + Err(broadcast::error::RecvError::Lagged(n)) => { + log::warn!(target : LOG_TARGET, "Missed {n} notifications while waiting for transaction {hash:?} to be mined."); + continue; + }, + Err(err) => return Err(format!("Failed to receive transaction hashes: {err}.").into()), + }, } } } From 4869a7cedb9d19e56498505ba420d3cd0fb5e558 Mon Sep 17 00:00:00 2001 From: Marian Radu Date: Thu, 9 Oct 2025 20:51:53 +0300 Subject: [PATCH 08/11] Address review comments. --- .../frame/revive/rpc/src/apis/debug_apis.rs | 2 +- substrate/frame/revive/rpc/src/client.rs | 39 +++++++------------ substrate/frame/revive/rpc/src/lib.rs | 32 ++++++++------- 3 files changed, 32 insertions(+), 41 deletions(-) diff --git a/substrate/frame/revive/rpc/src/apis/debug_apis.rs b/substrate/frame/revive/rpc/src/apis/debug_apis.rs index 80b6f292d1c9d..6704d74b13f8d 100644 --- a/substrate/frame/revive/rpc/src/apis/debug_apis.rs +++ b/substrate/frame/revive/rpc/src/apis/debug_apis.rs @@ -120,6 +120,6 @@ impl DebugRpcServer for DebugRpcServerImpl { } async fn get_automine(&self) -> RpcResult { - sc_service::Result::Ok(self.client.get_automine().await.unwrap_or_default()) + sc_service::Result::Ok(self.client.get_automine().await) } } diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 3073008f85bda..014c48730bdeb 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -180,12 +180,12 @@ async fn max_block_weight(api: &OnlineClient) -> Result Result { +async fn get_automine(rpc_client: &RpcClient) -> bool { match rpc_client.request::("getAutomine", rpc_params![]).await { - Ok(val) => Ok(val), + Ok(val) => val, Err(err) => { - log::warn!(target: LOG_TARGET, "getAutomine RPC call failed: {err:?}"); - Ok(false) + log::info!(target: LOG_TARGET, "Node does not have getAutomine RPC. Defaulting to automine=false. error: {err:?}"); + false }, } } @@ -223,33 +223,18 @@ pub async fn connect( async fn broadcast_transact_hashes( transact_hashes_sender: &tokio::sync::broadcast::Sender, receipts: &[ReceiptInfo], - block_number: SubstrateBlockNumber, ) { - // Closure to extract the transaction hashes from the receipts. - let extract_transact_hashes = || { - let mut transact_hashes = HashSet::new(); - for receipt in receipts { - transact_hashes.insert(receipt.transaction_hash); - } - transact_hashes - }; - - // Only broadcast if there are subscribers. - if transact_hashes_sender.receiver_count() == 0 { + if transact_hashes_sender.receiver_count() == 0 || receipts.is_empty() { return; } - let transact_hashes = extract_transact_hashes(); - if transact_hashes.is_empty() { - return; - } + let transact_hashes: HashSet<_> = + receipts.iter().map(|receipt| receipt.transaction_hash).collect(); if let Err(err) = transact_hashes_sender.send(transact_hashes) { log::warn!( target: LOG_TARGET, - "Failed to broadcast transaction hashes for block: {:?}, error: {:?}", - block_number, - err + "Failed to broadcast transaction hashes. error: {err:?}", ); } } @@ -264,7 +249,9 @@ impl Client { receipt_provider: ReceiptProvider, ) -> Result { let (chain_id, max_block_weight, automine) = - tokio::try_join!(chain_id(&api), max_block_weight(&api), get_automine(&rpc_client))?; + tokio::try_join!(chain_id(&api), max_block_weight(&api), async { + Ok(get_automine(&rpc_client).await) + },)?; // Create the transaction hashes sender if automine is enabled. let transact_hashes_sender = automine.then(|| { @@ -387,7 +374,7 @@ impl Client { // Only broadcast for best blocks to avoid duplicate notifications. match (subscription_type, &self.transact_hashes_sender) { (SubscriptionType::BestBlocks, Some(sender)) => { - broadcast_transact_hashes(sender, &receipts, block.number()).await; + broadcast_transact_hashes(sender, &receipts).await; }, _ => {}, } @@ -784,7 +771,7 @@ impl Client { } /// Get the automine status from the node. - pub async fn get_automine(&self) -> Result { + pub async fn get_automine(&self) -> bool { get_automine(&self.rpc_client).await } } diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 1b3b202805346..65f74c3d84ead 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -119,22 +119,26 @@ async fn wait_for_transaction_to_be_mined( timeout: Duration, ) -> Result { - let timeout_fut = tokio::time::sleep(timeout); - tokio::pin!(timeout_fut); - - loop { - tokio::select! { - _ = &mut timeout_fut => return Err(format!("Timed out waiting for transaction {hash:?} to be mined.")), - recv = transact_hashes_receiver.recv() => match recv { - Ok(hashes) => if hashes.contains(&hash) { return Ok(true); }, + tokio::time::timeout(timeout, async { + loop { + match transact_hashes_receiver.recv().await { + Ok(hashes) => { + if hashes.contains(&hash) { + return Ok(true); + } + } Err(broadcast::error::RecvError::Lagged(n)) => { - log::warn!(target : LOG_TARGET, "Missed {n} notifications while waiting for transaction {hash:?} to be mined."); + log::warn!(target: LOG_TARGET, "Missed {n} notifications while waiting for transaction {hash:?} to be mined."); continue; - }, - Err(err) => return Err(format!("Failed to receive transaction hashes: {err}.").into()), - }, + } + Err(err) => { + return Err(format!("Failed to receive transaction hashes: error: {err}.").into()); + } + } } - } + }) + .await + .map_err(|_| format!("Timed out waiting for transaction {hash:?} to be mined."))? } #[async_trait] @@ -214,7 +218,7 @@ impl EthRpcServer for EthRpcServerImpl { } log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}"); - return Ok(hash); + Ok(hash) } async fn send_transaction(&self, mut transaction: GenericTransaction) -> RpcResult { From acfabc1ab0e0127419405c07ad49ea5047abf716 Mon Sep 17 00:00:00 2001 From: pgherveou Date: Thu, 9 Oct 2025 21:26:49 +0200 Subject: [PATCH 09/11] Simplifications I think it's good enough to wait for the next block, no need to check if the actual transaction has been mined, this should be enough for test framework that assume that the next block is mined when eth_sendRawTransaction returns --- substrate/frame/revive/rpc/src/client.rs | 60 +++++++----------------- substrate/frame/revive/rpc/src/lib.rs | 53 +++++---------------- 2 files changed, 27 insertions(+), 86 deletions(-) diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index 014c48730bdeb..a538d548c9688 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -37,7 +37,7 @@ use pallet_revive::{ use runtime_api::RuntimeApi; use sp_runtime::traits::Block as BlockT; use sp_weights::Weight; -use std::{collections::HashSet, ops::Range, sync::Arc, time::Duration}; +use std::{ops::Range, sync::Arc, time::Duration}; use storage_api::StorageApi; use subxt::{ backend::{ @@ -68,8 +68,6 @@ pub type SubstrateBlockHash = HashFor; /// The runtime balance type. pub type Balance = u128; -pub type TransactHashes = HashSet; - /// The subscription type used to listen to new blocks. #[derive(Debug, Clone, Copy, PartialEq)] pub enum SubscriptionType { @@ -161,8 +159,10 @@ pub struct Client { fee_history_provider: FeeHistoryProvider, chain_id: u64, max_block_weight: Weight, + /// Whether the node has automine enabled. automine: bool, - transact_hashes_sender: Option>, + /// A notifier for new blocks when automine is enabled. + block_notifier: Option>, } /// Fetch the chain ID from the substrate chain. @@ -219,26 +219,6 @@ pub async fn connect( Ok((api, rpc_client, rpc)) } -/// Broadcast transaction hashes to subscribers. -async fn broadcast_transact_hashes( - transact_hashes_sender: &tokio::sync::broadcast::Sender, - receipts: &[ReceiptInfo], -) { - if transact_hashes_sender.receiver_count() == 0 || receipts.is_empty() { - return; - } - - let transact_hashes: HashSet<_> = - receipts.iter().map(|receipt| receipt.transaction_hash).collect(); - - if let Err(err) = transact_hashes_sender.send(transact_hashes) { - log::warn!( - target: LOG_TARGET, - "Failed to broadcast transaction hashes. error: {err:?}", - ); - } -} - impl Client { /// Create a new client instance. pub async fn new( @@ -253,13 +233,6 @@ impl Client { Ok(get_automine(&rpc_client).await) },)?; - // Create the transaction hashes sender if automine is enabled. - let transact_hashes_sender = automine.then(|| { - // Channel used to notify when a transaction hash is included in a block. - // Capacity of 32 should be enough since blocks are mined at a relatively low rate. - tokio::sync::broadcast::channel::(32).0 - }); - let client = Self { api, rpc_client, @@ -270,7 +243,7 @@ impl Client { chain_id, max_block_weight, automine, - transact_hashes_sender, + block_notifier: automine.then(|| tokio::sync::broadcast::channel::<()>(1).0), }; Ok(client) @@ -371,19 +344,19 @@ impl Client { let (signed_txs, receipts): (Vec<_>, Vec<_>) = self.receipt_provider.insert_block_receipts(&block).await?.into_iter().unzip(); - // Only broadcast for best blocks to avoid duplicate notifications. - match (subscription_type, &self.transact_hashes_sender) { - (SubscriptionType::BestBlocks, Some(sender)) => { - broadcast_transact_hashes(sender, &receipts).await; - }, - _ => {}, - } - let evm_block = self.evm_block_from_receipts(&block, &receipts, signed_txs, false).await; self.block_provider.update_latest(block, subscription_type).await; self.fee_history_provider.update_fee_history(&evm_block, &receipts).await; + + // Only broadcast for best blocks to avoid duplicate notifications. + match (subscription_type, &self.block_notifier) { + (SubscriptionType::BestBlocks, Some(sender)) if sender.receiver_count() > 0 => { + let _ = sender.send(()); + }, + _ => {}, + } Ok(()) }) .await @@ -737,10 +710,9 @@ impl Client { self.max_block_weight } - pub fn get_transact_hashes_sender( - &self, - ) -> Option> { - self.transact_hashes_sender.clone() + /// Get the block notifier, if automine is enabled. + pub fn block_notifier(&self) -> Option> { + self.block_notifier.clone() } /// Get the logs matching the given filter. diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index 65f74c3d84ead..d421230ef499a 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -17,7 +17,7 @@ //! The [`EthRpcServer`] RPC server implementation #![cfg_attr(docsrs, feature(doc_cfg))] -use client::{ClientError, TransactHashes}; +use client::ClientError; use jsonrpsee::{ core::{async_trait, RpcResult}, types::{ErrorCode, ErrorObjectOwned}, @@ -51,8 +51,6 @@ pub use receipt_extractor::*; mod apis; pub use apis::*; -use tokio::sync::broadcast; - pub const LOG_TARGET: &str = "eth-rpc"; /// An EVM RPC server implementation. @@ -113,34 +111,6 @@ impl From for ErrorObjectOwned { } } -async fn wait_for_transaction_to_be_mined( - hash: H256, - mut transact_hashes_receiver: broadcast::Receiver, - - timeout: Duration, -) -> Result { - tokio::time::timeout(timeout, async { - loop { - match transact_hashes_receiver.recv().await { - Ok(hashes) => { - if hashes.contains(&hash) { - return Ok(true); - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - log::warn!(target: LOG_TARGET, "Missed {n} notifications while waiting for transaction {hash:?} to be mined."); - continue; - } - Err(err) => { - return Err(format!("Failed to receive transaction hashes: error: {err}.").into()); - } - } - } - }) - .await - .map_err(|_| format!("Timed out waiting for transaction {hash:?} to be mined."))? -} - #[async_trait] impl EthRpcServer for EthRpcServerImpl { async fn net_version(&self) -> RpcResult { @@ -196,11 +166,8 @@ impl EthRpcServer for EthRpcServerImpl { let hash = H256(keccak_256(&transaction.0)); let call = subxt_client::tx().revive().eth_transact(transaction.0); - // Subscribe to transaction hashes only when automine is enabled. - let transact_hashes_receiver = - self.client.get_transact_hashes_sender().map(|transact_hashes_sender| { - transact_hashes_sender.subscribe() // each task gets its own receiver - }); + // Subscribe to new block only when automine is enabled. + let receiver = self.client.block_notifier().map(|sender| sender.subscribe()); // Submit the transaction self.client.submit(call).await.map_err(|err| { @@ -209,12 +176,14 @@ impl EthRpcServer for EthRpcServerImpl { })?; // Wait for the transaction to be included in a block if automine is enabled - if let Some(hashes_receiver) = transact_hashes_receiver { - let _ = wait_for_transaction_to_be_mined(hash, hashes_receiver, Duration::from_secs(1)) - .await - .map_err(|err| { - log::warn!(target : LOG_TARGET, "Waiting for tx receipt failed: {err}"); - }); + if let Some(mut receiver) = receiver { + let _ = tokio::time::timeout(Duration::from_millis(500), async { + let _ = receiver.recv().await; + }) + .inspect_err(|_| { + log::debug!(target: LOG_TARGET, "timeout waiting for new block"); + }) + .await; } log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}"); From d9e78aac72af2dbfc16062dfcd68c8ba40c9b608 Mon Sep 17 00:00:00 2001 From: pgherveou Date: Fri, 10 Oct 2025 14:00:03 +0200 Subject: [PATCH 10/11] fix --- substrate/frame/revive/rpc/src/client.rs | 20 +++++++++++--------- substrate/frame/revive/rpc/src/lib.rs | 14 ++++++++++---- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index a538d548c9688..0477f3d0b7f5d 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -161,8 +161,9 @@ pub struct Client { max_block_weight: Weight, /// Whether the node has automine enabled. automine: bool, - /// A notifier for new blocks when automine is enabled. - block_notifier: Option>, + /// A notifier, that informs subscribers of new transaction hashes that are included in a + /// block, when automine is enabled. + tx_notifier: Option>, } /// Fetch the chain ID from the substrate chain. @@ -243,7 +244,7 @@ impl Client { chain_id, max_block_weight, automine, - block_notifier: automine.then(|| tokio::sync::broadcast::channel::<()>(1).0), + tx_notifier: automine.then(|| tokio::sync::broadcast::channel::(10).0), }; Ok(client) @@ -351,10 +352,11 @@ impl Client { self.fee_history_provider.update_fee_history(&evm_block, &receipts).await; // Only broadcast for best blocks to avoid duplicate notifications. - match (subscription_type, &self.block_notifier) { - (SubscriptionType::BestBlocks, Some(sender)) if sender.receiver_count() > 0 => { - let _ = sender.send(()); - }, + match (subscription_type, &self.tx_notifier) { + (SubscriptionType::BestBlocks, Some(sender)) if sender.receiver_count() > 0 => + for receipt in &receipts { + let _ = sender.send(receipt.transaction_hash); + }, _ => {}, } Ok(()) @@ -711,8 +713,8 @@ impl Client { } /// Get the block notifier, if automine is enabled. - pub fn block_notifier(&self) -> Option> { - self.block_notifier.clone() + pub fn tx_notifier(&self) -> Option> { + self.tx_notifier.clone() } /// Get the logs matching the given filter. diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index d421230ef499a..e1777c8d110cc 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -167,7 +167,7 @@ impl EthRpcServer for EthRpcServerImpl { let call = subxt_client::tx().revive().eth_transact(transaction.0); // Subscribe to new block only when automine is enabled. - let receiver = self.client.block_notifier().map(|sender| sender.subscribe()); + let receiver = self.client.tx_notifier().map(|sender| sender.subscribe()); // Submit the transaction self.client.submit(call).await.map_err(|err| { @@ -178,12 +178,18 @@ impl EthRpcServer for EthRpcServerImpl { // Wait for the transaction to be included in a block if automine is enabled if let Some(mut receiver) = receiver { let _ = tokio::time::timeout(Duration::from_millis(500), async { - let _ = receiver.recv().await; + loop { + if let Ok(mined_hash) = receiver.recv().await { + if mined_hash == hash { + break; + } + } + } }) + .await .inspect_err(|_| { log::debug!(target: LOG_TARGET, "timeout waiting for new block"); - }) - .await; + }); } log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}"); From 7be91ef6ebd570c886cffcc056ee9cd6697a8b2c Mon Sep 17 00:00:00 2001 From: pgherveou Date: Fri, 10 Oct 2025 14:42:23 +0200 Subject: [PATCH 11/11] add log --- substrate/frame/revive/rpc/src/lib.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/substrate/frame/revive/rpc/src/lib.rs b/substrate/frame/revive/rpc/src/lib.rs index e1777c8d110cc..686a2549b8822 100644 --- a/substrate/frame/revive/rpc/src/lib.rs +++ b/substrate/frame/revive/rpc/src/lib.rs @@ -177,19 +177,20 @@ impl EthRpcServer for EthRpcServerImpl { // Wait for the transaction to be included in a block if automine is enabled if let Some(mut receiver) = receiver { - let _ = tokio::time::timeout(Duration::from_millis(500), async { + if let Err(err) = tokio::time::timeout(Duration::from_millis(500), async { loop { if let Ok(mined_hash) = receiver.recv().await { if mined_hash == hash { + log::debug!(target: LOG_TARGET, "{hash:} was included in a block"); break; } } } }) .await - .inspect_err(|_| { - log::debug!(target: LOG_TARGET, "timeout waiting for new block"); - }); + { + log::debug!(target: LOG_TARGET, "timeout waiting for new block: {err:?}"); + } } log::debug!(target: LOG_TARGET, "send_raw_transaction hash: {hash:?}");