Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/rpc/rpc-eth-types/src/logs_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub fn append_matching_block_logs<P>(
where
P: BlockReader<Transaction: SignedTransaction>,
{
if !filter.matches_block(&block_num_hash) {
return Ok(());
}

// Tracks the index of a log in the entire block.
let mut log_index: u64 = 0;

Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/rpc/src/eth/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,11 @@ where
#[inline]
pub async fn add_pool_transaction(
&self,
origin: reth_transaction_pool::TransactionOrigin,
transaction: <N::Pool as TransactionPool>::Transaction,
) -> Result<AddedTransactionOutcome, EthApiError> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx);
let request = reth_transaction_pool::BatchTxRequest::new(transaction, origin, response_tx);

self.tx_batch_sender()
.send(request)
Expand Down
12 changes: 7 additions & 5 deletions crates/rpc/rpc/src/eth/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_rpc_eth_types::{error::RpcPoolError, EthApiError};
use reth_storage_api::BlockReaderIdExt;
use reth_transaction_pool::{
error::Eip4844PoolTransactionError, AddedTransactionOutcome, EthBlobTransactionSidecar,
EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionPool,
EthPoolTransaction, PoolPooledTx, PoolTransaction, TransactionOrigin, TransactionPool,
};

impl<N, Rpc> EthTransactions for EthApi<N, Rpc>
Expand Down Expand Up @@ -105,18 +105,20 @@ where
tracing::debug!(target: "rpc::eth", %err, hash=% *pool_transaction.hash(), "failed to forward raw transaction");
}).map_err(EthApiError::other)?;

// Retain tx in local tx pool after forwarding, for local RPC usage.
let _ = self.inner.add_pool_transaction(pool_transaction).await;
// Retain tx in local tx pool after forwarding.
let _ = self
.inner
.add_pool_transaction(TransactionOrigin::External, pool_transaction)
.await;

return Ok(hash);
}

// broadcast raw transaction to subscribers if there is any.
self.broadcast_raw_transaction(tx);

// submit the transaction to the pool with a `Local` origin
let AddedTransactionOutcome { hash, .. } =
self.inner.add_pool_transaction(pool_transaction).await?;
self.inner.add_pool_transaction(TransactionOrigin::External, pool_transaction).await?;

Ok(hash)
}
Expand Down
52 changes: 37 additions & 15 deletions crates/transaction-pool/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ use std::{
use tokio::sync::{mpsc, oneshot};

/// A single batch transaction request
/// All transactions processed through the batcher are considered local
/// transactions (`TransactionOrigin::Local`) when inserted into the pool.
#[derive(Debug)]
pub struct BatchTxRequest<T: PoolTransaction> {
/// Tx to be inserted in to the pool
pool_tx: T,
/// The origin of the transaction
origin: TransactionOrigin,
/// Channel to send result back to caller
response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
}
Expand All @@ -32,9 +32,10 @@ where
/// Create a new batch transaction request
pub const fn new(
pool_tx: T,
origin: TransactionOrigin,
response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
) -> Self {
Self { pool_tx, response_tx }
Self { pool_tx, origin, response_tx }
}
}

Expand Down Expand Up @@ -66,25 +67,44 @@ where
}

async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
let BatchTxRequest { pool_tx, response_tx } = req;
let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await;
let BatchTxRequest { pool_tx, origin, response_tx } = req;
let pool_result = pool.add_transaction(origin, pool_tx).await;
let _ = response_tx.send(pool_result);
}

/// Process a batch of transaction requests, grouped by origin
async fn process_batch(pool: &Pool, mut batch: Vec<BatchTxRequest<Pool::Transaction>>) {
if batch.len() == 1 {
Self::process_request(pool, batch.remove(0)).await;
return
}

let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
let mut local_txs = Vec::new();
let mut local_responses = Vec::new();
let mut external_txs = Vec::new();
let mut external_responses = Vec::new();

for req in batch {
if req.origin.is_local() {
local_txs.push(req.pool_tx);
local_responses.push(req.response_tx);
} else {
external_txs.push(req.pool_tx);
external_responses.push(req.response_tx);
}
}

let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;
if !local_txs.is_empty() {
let results = pool.add_transactions(TransactionOrigin::Local, local_txs).await;
for (tx, result) in local_responses.into_iter().zip(results) {
let _ = tx.send(result);
}
}

for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
let _ = response_tx.send(pool_result);
if !external_txs.is_empty() {
let results = pool.add_transactions(TransactionOrigin::External, external_txs).await;
for (tx, result) in external_responses.into_iter().zip(results) {
let _ = tx.send(result);
}
}
}
}
Expand Down Expand Up @@ -138,7 +158,7 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();

batch_requests.push(BatchTxRequest::new(tx, response_tx));
batch_requests.push(BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx));
responses.push(response_rx);
}

Expand Down Expand Up @@ -167,7 +187,9 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();

request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
request_tx
.send(BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx))
.expect("Could not send batch tx");
responses.push(response_rx);
}

Expand Down Expand Up @@ -197,7 +219,7 @@ mod tests {
for i in 0..10 {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx);
request_tx.send(request).expect("Could not send batch tx");
results.push(response_rx);
}
Expand Down Expand Up @@ -225,7 +247,7 @@ mod tests {
for i in 0..max_batch_size {
let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(tx, TransactionOrigin::Local, response_tx);
let request_tx_clone = request_tx.clone();

let tx_fut = async move {
Expand Down
Loading