Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/rpc/rpc-eth-api/src/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
) -> impl Future<Output = Result<B256, Self::Error>> + Send {
async move {
let recovered = recover_raw_transaction::<PoolPooledTx<Self::Pool>>(&tx)?;
self.send_transaction(WithEncoded::new(tx, recovered)).await
self.send_transaction(TransactionOrigin::External, WithEncoded::new(tx, recovered))
.await
}
}

/// Submits the transaction to the pool.
/// Submits the transaction to the pool with the given [`TransactionOrigin`].
fn send_transaction(
&self,
origin: TransactionOrigin,
tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
) -> impl Future<Output = Result<B256, Self::Error>> + Send;

Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/rpc/src/eth/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,10 +556,11 @@ where
#[inline]
pub async fn add_pool_transaction(
&self,
origin: reth_transaction_pool::TransactionOrigin,
transaction: <N::Pool as TransactionPool>::Transaction,
) -> Result<AddedTransactionOutcome, EthApiError> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx);
let request = reth_transaction_pool::BatchTxRequest::new(origin, transaction, response_tx);

self.tx_batch_sender()
.send(request)
Expand Down
6 changes: 3 additions & 3 deletions crates/rpc/rpc/src/eth/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ where

async fn send_transaction(
&self,
origin: reth_transaction_pool::TransactionOrigin,
tx: WithEncoded<Recovered<PoolPooledTx<Self::Pool>>>,
) -> Result<B256, Self::Error> {
let (tx, recovered) = tx.split();
Expand Down Expand Up @@ -106,17 +107,16 @@ where
}).map_err(EthApiError::other)?;

// Retain tx in local tx pool after forwarding, for local RPC usage.
let _ = self.inner.add_pool_transaction(pool_transaction).await;
let _ = self.inner.add_pool_transaction(origin, pool_transaction).await;

return Ok(hash);
}

// broadcast raw transaction to subscribers if there is any.
self.broadcast_raw_transaction(tx);

// submit the transaction to the pool with a `Local` origin
let AddedTransactionOutcome { hash, .. } =
self.inner.add_pool_transaction(pool_transaction).await?;
self.inner.add_pool_transaction(origin, pool_transaction).await?;

Ok(hash)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/transaction-pool/benches/insertion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ fn txpool_batch_insertion(c: &mut Criterion) {
let mut response_futures = Vec::with_capacity(tx_count);
for tx in txs {
let (response_tx, response_rx) = oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request =
BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
batch_requests.push(request);
response_futures.push(response_rx);
}
Expand Down
37 changes: 20 additions & 17 deletions crates/transaction-pool/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use std::{
use tokio::sync::{mpsc, oneshot};

/// A single batch transaction request
/// All transactions processed through the batcher are considered local
/// transactions (`TransactionOrigin::Local`) when inserted into the pool.
#[derive(Debug)]
pub struct BatchTxRequest<T: PoolTransaction> {
/// Origin of the transaction (e.g. Local, External)
origin: TransactionOrigin,
/// Tx to be inserted in to the pool
pool_tx: T,
/// Channel to send result back to caller
Expand All @@ -31,10 +31,11 @@ where
{
/// Create a new batch transaction request
pub const fn new(
origin: TransactionOrigin,
pool_tx: T,
response_tx: oneshot::Sender<Result<AddedTransactionOutcome, PoolError>>,
) -> Self {
Self { pool_tx, response_tx }
Self { origin, pool_tx, response_tx }
}
}

Expand Down Expand Up @@ -66,24 +67,24 @@ where
}

async fn process_request(pool: &Pool, req: BatchTxRequest<Pool::Transaction>) {
let BatchTxRequest { pool_tx, response_tx } = req;
let pool_result = pool.add_transaction(TransactionOrigin::Local, pool_tx).await;
let BatchTxRequest { origin, pool_tx, response_tx } = req;
let pool_result = pool.add_transaction(origin, pool_tx).await;
let _ = response_tx.send(pool_result);
}

/// Process a batch of transaction requests, grouped by origin
async fn process_batch(pool: &Pool, mut batch: Vec<BatchTxRequest<Pool::Transaction>>) {
/// Process a batch of transaction requests with per-transaction origins
async fn process_batch(pool: &Pool, batch: Vec<BatchTxRequest<Pool::Transaction>>) {
if batch.len() == 1 {
Self::process_request(pool, batch.remove(0)).await;
Self::process_request(pool, batch.into_iter().next().expect("batch is not empty"))
.await;
return
}

let (pool_transactions, response_tx): (Vec<_>, Vec<_>) =
batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip();
let (transactions, response_txs): (Vec<_>, Vec<_>) =
batch.into_iter().map(|req| ((req.origin, req.pool_tx), req.response_tx)).unzip();

let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await;

for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) {
let pool_results = pool.add_transactions_with_origins(transactions).await;
for (response_tx, pool_result) in response_txs.into_iter().zip(pool_results) {
let _ = response_tx.send(pool_result);
}
}
Expand Down Expand Up @@ -138,7 +139,7 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();

batch_requests.push(BatchTxRequest::new(tx, response_tx));
batch_requests.push(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx));
responses.push(response_rx);
}

Expand Down Expand Up @@ -167,7 +168,9 @@ mod tests {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();

request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx");
request_tx
.send(BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx))
.expect("Could not send batch tx");
responses.push(response_rx);
}

Expand Down Expand Up @@ -197,7 +200,7 @@ mod tests {
for i in 0..10 {
let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
request_tx.send(request).expect("Could not send batch tx");
results.push(response_rx);
}
Expand Down Expand Up @@ -225,7 +228,7 @@ mod tests {
for i in 0..max_batch_size {
let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100);
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = BatchTxRequest::new(tx, response_tx);
let request = BatchTxRequest::new(TransactionOrigin::Local, tx, response_tx);
let request_tx_clone = request_tx.clone();

let tx_fut = async move {
Expand Down
23 changes: 6 additions & 17 deletions crates/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,17 +389,6 @@ where
self.pool.validator().validate_transaction(origin, transaction).await
}

/// Returns future that validates all transactions in the given iterator.
///
/// This returns the validated transactions in the iterator's order.
async fn validate_all(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = V::Transaction> + Send,
) -> Vec<TransactionValidationOutcome<V::Transaction>> {
self.pool.validator().validate_transactions_with_origin(origin, transactions).await
}

/// Number of transactions in the entire pool
pub fn len(&self) -> usize {
self.pool.len()
Expand Down Expand Up @@ -513,17 +502,17 @@ where
results.pop().expect("result length is the same as the input")
}

async fn add_transactions(
async fn add_transactions_with_origins(
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
let transactions: Vec<_> = transactions.into_iter().collect();
if transactions.is_empty() {
return Vec::new()
}
let validated = self.validate_all(origin, transactions).await;

self.pool.add_transactions(origin, validated.into_iter())
let origins: Vec<_> = transactions.iter().map(|(origin, _)| *origin).collect();
let validated = self.pool.validator().validate_transactions(transactions).await;
self.pool.add_transactions_with_origins(origins.into_iter().zip(validated))
}

fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
Expand Down
7 changes: 3 additions & 4 deletions crates/transaction-pool/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,13 @@ impl<T: EthPoolTransaction> TransactionPool for NoopTransactionPool<T> {
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
}

async fn add_transactions(
async fn add_transactions_with_origins(
&self,
_origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
transactions
.into_iter()
.map(|transaction| {
.map(|(_, transaction)| {
let hash = *transaction.hash();
Err(PoolError::other(hash, Box::new(NoopInsertError::new(transaction))))
})
Expand Down
16 changes: 15 additions & 1 deletion crates/transaction-pool/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,10 +630,24 @@ where
}

/// Adds all transactions in the iterator to the pool, returning a list of results.
///
/// Convenience method that assigns the same origin to all transactions. Delegates to
/// [`Self::add_transactions_with_origins`].
pub fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
}

/// Adds all transactions in the iterator to the pool, each with its own
/// [`TransactionOrigin`], returning a list of results.
pub fn add_transactions_with_origins(
&self,
transactions: impl IntoIterator<
Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
>,
) -> Vec<PoolResult<AddedTransactionOutcome>> {
// Collect results and metadata while holding the pool write lock
let (mut results, added_metas, discarded) = {
Expand All @@ -642,7 +656,7 @@ where

let results = transactions
.into_iter()
.map(|tx| {
.map(|(origin, tx)| {
let (result, meta) = self.add_transaction(&mut pool, origin, tx);

// Only collect metadata for successful insertions
Expand Down
14 changes: 14 additions & 0 deletions crates/transaction-pool/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,20 @@ pub trait TransactionPool: Clone + Debug + Send + Sync {
&self,
origin: TransactionOrigin,
transactions: Vec<Self::Transaction>,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send {
self.add_transactions_with_origins(transactions.into_iter().map(move |tx| (origin, tx)))
}

/// Adds the given _unvalidated_ transactions into the pool.
///
/// Each transaction is paired with its own [`TransactionOrigin`].
///
/// Returns a list of results.
///
/// Consumer: RPC
fn add_transactions_with_origins(
&self,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;

/// Submit a consensus transaction directly to the pool
Expand Down
4 changes: 2 additions & 2 deletions crates/transaction-pool/src/validate/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ where
/// Validates all given transactions.
fn validate_batch(
&self,
transactions: Vec<(TransactionOrigin, Tx)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Tx)>,
) -> Vec<TransactionValidationOutcome<Tx>> {
let mut provider = None;
transactions
Expand Down Expand Up @@ -853,7 +853,7 @@ where

async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
self.validate_batch(transactions)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/transaction-pool/src/validate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ pub trait TransactionValidator: Debug + Send + Sync {
/// See also [`Self::validate_transaction`].
fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> impl Future<Output = Vec<TransactionValidationOutcome<Self::Transaction>>> + Send {
futures_util::future::join_all(
transactions.into_iter().map(|(origin, tx)| self.validate_transaction(origin, tx)),
Expand Down Expand Up @@ -260,7 +260,7 @@ where

async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
match self {
Self::Left(v) => v.validate_transactions(transactions).await,
Expand Down
6 changes: 4 additions & 2 deletions crates/transaction-pool/src/validate/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,9 @@ where

async fn validate_transactions(
&self,
transactions: Vec<(TransactionOrigin, Self::Transaction)>,
transactions: impl IntoIterator<Item = (TransactionOrigin, Self::Transaction)> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
let transactions: Vec<_> = transactions.into_iter().collect();
let hashes: Vec<_> = transactions.iter().map(|(_, tx)| *tx.hash()).collect();
let (tx, rx) = oneshot::channel();
{
Expand Down Expand Up @@ -300,7 +301,8 @@ where
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = Self::Transaction> + Send,
) -> Vec<TransactionValidationOutcome<Self::Transaction>> {
self.validate_transactions(transactions.into_iter().map(|tx| (origin, tx)).collect()).await
let transactions: Vec<_> = transactions.into_iter().map(|tx| (origin, tx)).collect();
self.validate_transactions(transactions).await
}

fn on_new_head_block(&self, new_tip_block: &SealedBlock<Self::Block>) {
Expand Down
Loading