diff --git a/crates/rpc/rpc-eth-types/src/logs_utils.rs b/crates/rpc/rpc-eth-types/src/logs_utils.rs index 6562c3043df..4eb2ee7c001 100644 --- a/crates/rpc/rpc-eth-types/src/logs_utils.rs +++ b/crates/rpc/rpc-eth-types/src/logs_utils.rs @@ -79,6 +79,10 @@ pub fn append_matching_block_logs

( where P: BlockReader, { + if !filter.matches_block(&block_num_hash) { + return Ok(()); + } + // Tracks the index of a log in the entire block. let mut log_index: u64 = 0; diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 28fc38e0783..83cefe290f3 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -556,10 +556,11 @@ where #[inline] pub async fn add_pool_transaction( &self, + origin: reth_transaction_pool::TransactionOrigin, transaction: ::Transaction, ) -> Result { let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx); + let request = reth_transaction_pool::BatchTxRequest::new(transaction, origin, response_tx); self.tx_batch_sender() .send(request) diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 5aee1387353..fc9ede87fc4 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -17,7 +17,7 @@ use reth_rpc_eth_types::{error::RpcPoolError, EthApiError}; use reth_storage_api::BlockReaderIdExt; use reth_transaction_pool::{ error::Eip4844PoolTransactionError, AddedTransactionOutcome, EthBlobTransactionSidecar, - EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool, + EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool, }; impl EthTransactions for EthApi @@ -105,8 +105,11 @@ where tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction"); }).map_err(EthApiError::other)?; - // Retain tx in local tx pool after forwarding, for local RPC usage. - let _ = self.inner.add_pool_transaction(pool_transaction).await; + // Retain tx in local tx pool after forwarding. + let _ = self + .inner + .add_pool_transaction(TransactionOrigin::External, pool_transaction) + .await; return Ok(hash); } @@ -114,9 +117,8 @@ where // broadcast raw transaction to subscribers if there is any. self.broadcast_raw_transaction(tx); - // submit the transaction to the pool with a `Local` origin let AddedTransactionOutcome { hash, .. } = - self.inner.add_pool_transaction(pool_transaction).await?; + self.inner.add_pool_transaction(TransactionOrigin::External, pool_transaction).await?; Ok(hash) } diff --git a/crates/transaction-pool/src/batcher.rs b/crates/transaction-pool/src/batcher.rs index 75280e68b3c..d43c7d9d98b 100644 --- a/crates/transaction-pool/src/batcher.rs +++ b/crates/transaction-pool/src/batcher.rs @@ -15,12 +15,12 @@ use std::{ use tokio::sync::{mpsc, oneshot}; /// A single batch transaction request -/// All transactions processed through the batcher are considered local -/// transactions (`TransactionOrigin::Local`) when inserted into the pool. #[derive(Debug)] pub struct BatchTxRequest { /// Tx to be inserted in to the pool pool_tx: T, + /// The origin of the transaction + origin: TransactionOrigin, /// Channel to send result back to caller response_tx: oneshot::Sender>, } @@ -32,9 +32,10 @@ where /// Create a new batch transaction request pub const fn new( pool_tx: T, + origin: TransactionOrigin, response_tx: oneshot::Sender>, ) -> Self { - Self { pool_tx, response_tx } + Self { pool_tx, origin, response_tx } } } @@ -66,25 +67,44 @@ where } async fn process_request(pool: &Pool, req: BatchTxRequest) { - let BatchTxRequest { pool_tx, response_tx } = req; - let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await; + let BatchTxRequest { pool_tx, origin, response_tx } = req; + let pool_result = pool.add_transaction(origin, pool_tx).await; let _ = response_tx.send(pool_result); } - /// Process a batch of transaction requests, grouped by origin async fn process_batch(pool: &Pool, mut batch: Vec>) { if batch.len() == 1 { Self::process_request(pool, batch.remove(0)).await; return } - let (pool_transactions, response_tx): (Vec<_>, Vec<_>) = - batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip(); + let mut local_txs = Vec::new(); + let mut local_responses = Vec::new(); + let mut external_txs = Vec::new(); + let mut external_responses = Vec::new(); + + for req in batch { + if req.origin.is_local() { + local_txs.push(req.pool_tx); + local_responses.push(req.response_tx); + } else { + external_txs.push(req.pool_tx); + external_responses.push(req.response_tx); + } + } - let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await; + if !local_txs.is_empty() { + let results = pool.add_transactions(TransactionOrigin::Local, local_txs).await; + for (tx, result) in local_responses.into_iter().zip(results) { + let _ = tx.send(result); + } + } - for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) { - let _ = response_tx.send(pool_result); + if !external_txs.is_empty() { + let results = pool.add_transactions(TransactionOrigin::External, external_txs).await; + for (tx, result) in external_responses.into_iter().zip(results) { + let _ = tx.send(result); + } } } } @@ -138,7 +158,7 @@ mod tests { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - batch_requests.push(BatchTxRequest::new(tx, response_tx)); + batch_requests.push(BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx)); responses.push(response_rx); } @@ -167,7 +187,9 @@ mod tests { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx"); + request_tx + .send(BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx)) + .expect("Could not send batch tx"); responses.push(response_rx); } @@ -197,7 +219,7 @@ mod tests { for i in 0..10 { let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = BatchTxRequest::new(tx, response_tx); + let request = BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx); request_tx.send(request).expect("Could not send batch tx"); results.push(response_rx); } @@ -225,7 +247,7 @@ mod tests { for i in 0..max_batch_size { let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100); let (response_tx, response_rx) = tokio::sync::oneshot::channel(); - let request = BatchTxRequest::new(tx, response_tx); + let request = BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx); let request_tx_clone = request_tx.clone(); let tx_fut = async move {