diff --git a/Cargo.lock b/Cargo.lock index c0c2f890926..7d06a84f1dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10508,10 +10508,12 @@ dependencies = [ "auto_impl", "bitflags 2.9.1", "codspeed-criterion-compat", + "futures", "futures-util", "metrics", "parking_lot", "paste", + "pin-project", "proptest", "proptest-arbitrary-interop", "rand 0.9.2", diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index e1ecf93b5de..b6b6f344c8a 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -18,6 +18,7 @@ use reth_node_api::{ NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig, }; use reth_node_core::{ + cli::config::RethTransactionPoolConfig, node_config::NodeConfig, version::{version_metadata, CLIENT_CODE}, }; @@ -863,7 +864,8 @@ where }), ); - let ctx = EthApiCtx { components: &node, config: config.rpc.eth_config(), cache }; + let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size()); + let ctx = EthApiCtx { components: &node, config: eth_config, cache }; let eth_api = eth_api_builder.build_eth_api(ctx).await?; let auth_config = config.rpc.auth_server_config(jwt_secret)?; @@ -1040,6 +1042,7 @@ impl<'a, N: FullNodeComponents>> .fee_history_cache_config(self.config.fee_history_cache) .proof_permits(self.config.proof_permits) .gas_oracle_config(self.config.gas_oracle) + .max_batch_size(self.config.max_batch_size) } } diff --git a/crates/node/core/src/args/txpool.rs b/crates/node/core/src/args/txpool.rs index bcb033301fc..164fc83d4b1 100644 --- a/crates/node/core/src/args/txpool.rs +++ b/crates/node/core/src/args/txpool.rs @@ -132,6 +132,10 @@ pub struct TxPoolArgs { conflicts_with = "transactions_backup_path" )] pub disable_transactions_backup: bool, + + /// Max batch size for transaction pool insertions + #[arg(long = "txpool.max-batch-size", default_value_t = 1)] + pub max_batch_size: usize, } impl Default for TxPoolArgs { @@ -165,6 +169,7 @@ impl Default for TxPoolArgs { max_queued_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME, transactions_backup_path: None, disable_transactions_backup: false, + max_batch_size: 1, } } } @@ -209,6 +214,11 @@ impl RethTransactionPoolConfig for TxPoolArgs { max_queued_lifetime: self.max_queued_lifetime, } } + + /// Returns max batch size for transaction batch insertion. + fn max_batch_size(&self) -> usize { + self.max_batch_size + } } #[cfg(test)] diff --git a/crates/node/core/src/cli/config.rs b/crates/node/core/src/cli/config.rs index 186d0c7d88f..c232d8b6e23 100644 --- a/crates/node/core/src/cli/config.rs +++ b/crates/node/core/src/cli/config.rs @@ -87,4 +87,7 @@ impl RethNetworkConfig for reth_network::NetworkManager pub trait RethTransactionPoolConfig { /// Returns transaction pool configuration. fn pool_config(&self) -> PoolConfig; + + /// Returns max batch size for transaction batch insertion. + fn max_batch_size(&self) -> usize; } diff --git a/crates/optimism/rpc/src/eth/transaction.rs b/crates/optimism/rpc/src/eth/transaction.rs index f8437c12623..cb117dac5d7 100644 --- a/crates/optimism/rpc/src/eth/transaction.rs +++ b/crates/optimism/rpc/src/eth/transaction.rs @@ -46,11 +46,8 @@ where })?; // Retain tx in local tx pool after forwarding, for local RPC usage. - let _ = self - .pool() - .add_transaction(TransactionOrigin::Local, pool_transaction) - .await.inspect_err(|err| { - tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool"); + let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| { + tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool"); }); return Ok(hash) diff --git a/crates/rpc/rpc-eth-types/src/builder/config.rs b/crates/rpc/rpc-eth-types/src/builder/config.rs index 6503e83f2e5..8b2fd229ba1 100644 --- a/crates/rpc/rpc-eth-types/src/builder/config.rs +++ b/crates/rpc/rpc-eth-types/src/builder/config.rs @@ -45,6 +45,8 @@ pub struct EthConfig { pub fee_history_cache: FeeHistoryCacheConfig, /// The maximum number of getproof calls that can be executed concurrently. pub proof_permits: usize, + /// Maximum batch size for transaction pool insertions. + pub max_batch_size: usize, } impl EthConfig { @@ -72,6 +74,7 @@ impl Default for EthConfig { stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, fee_history_cache: FeeHistoryCacheConfig::default(), proof_permits: DEFAULT_PROOF_PERMITS, + max_batch_size: 1, } } } @@ -136,6 +139,12 @@ impl EthConfig { self.proof_permits = permits; self } + + /// Configures the maximum batch size for transaction pool insertions + pub const fn max_batch_size(mut self, max_batch_size: usize) -> Self { + self.max_batch_size = max_batch_size; + self + } } /// Config for the filter diff --git a/crates/rpc/rpc-eth-types/src/error/mod.rs b/crates/rpc/rpc-eth-types/src/error/mod.rs index a8e8354fed1..413e15585a8 100644 --- a/crates/rpc/rpc-eth-types/src/error/mod.rs +++ b/crates/rpc/rpc-eth-types/src/error/mod.rs @@ -24,6 +24,7 @@ use revm::context_interface::result::{ }; use revm_inspectors::tracing::MuxError; use std::convert::Infallible; +use tokio::sync::oneshot::error::RecvError; use tracing::error; /// A trait to convert an error to an RPC error. @@ -166,6 +167,12 @@ pub enum EthApiError { /// Duration that was waited before timing out duration: Duration, }, + /// Error thrown when batch tx response channel fails + #[error(transparent)] + BatchTxRecvError(#[from] RecvError), + /// Error thrown when batch tx send channel fails + #[error("Batch transaction sender channel closed")] + BatchTxSendError, /// Any other error #[error("{0}")] Other(Box), @@ -279,6 +286,10 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { EthApiError::PrunedHistoryUnavailable => rpc_error_with_code(4444, error.to_string()), EthApiError::Other(err) => err.to_rpc_error(), EthApiError::MuxTracerError(msg) => internal_rpc_err(msg.to_string()), + EthApiError::BatchTxRecvError(err) => internal_rpc_err(err.to_string()), + EthApiError::BatchTxSendError => { + internal_rpc_err("Batch transaction sender channel closed".to_string()) + } } } } diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index 2e6a6dcf91f..6ef68acbe28 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -40,6 +40,7 @@ pub struct EthApiBuilder { blocking_task_pool: Option, task_spawner: Box, next_env: NextEnv, + max_batch_size: usize, } impl @@ -78,6 +79,7 @@ impl EthApiBuilder { blocking_task_pool, task_spawner, next_env, + max_batch_size, } = self; EthApiBuilder { components, @@ -94,6 +96,7 @@ impl EthApiBuilder { blocking_task_pool, task_spawner, next_env, + max_batch_size, } } } @@ -121,6 +124,7 @@ where gas_oracle_config: Default::default(), eth_state_cache_config: Default::default(), next_env: Default::default(), + max_batch_size: 1, } } } @@ -155,6 +159,7 @@ where task_spawner, gas_oracle_config, next_env, + max_batch_size, } = self; EthApiBuilder { components, @@ -171,6 +176,7 @@ where task_spawner, gas_oracle_config, next_env, + max_batch_size, } } @@ -194,6 +200,7 @@ where task_spawner, gas_oracle_config, next_env: _, + max_batch_size, } = self; EthApiBuilder { components, @@ -210,6 +217,7 @@ where task_spawner, gas_oracle_config, next_env, + max_batch_size, } } @@ -281,6 +289,12 @@ where self } + /// Sets the max batch size for batching transaction insertions. + pub const fn max_batch_size(mut self, max_batch_size: usize) -> Self { + self.max_batch_size = max_batch_size; + self + } + /// Builds the [`EthApiInner`] instance. /// /// If not configured, this will spawn the cache backend: [`EthStateCache::spawn`]. @@ -309,6 +323,7 @@ where proof_permits, task_spawner, next_env, + max_batch_size, } = self; let provider = components.provider().clone(); @@ -345,6 +360,7 @@ where proof_permits, rpc_converter, next_env, + max_batch_size, ) } diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index ffad40b0117..6868f15a02a 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -28,8 +28,11 @@ use reth_tasks::{ pool::{BlockingTaskGuard, BlockingTaskPool}, TaskSpawner, TokioTaskExecutor, }; -use reth_transaction_pool::noop::NoopTransactionPool; -use tokio::sync::{broadcast, Mutex}; +use reth_transaction_pool::{ + noop::NoopTransactionPool, AddedTransactionOutcome, BatchTxProcessor, BatchTxRequest, + TransactionPool, +}; +use tokio::sync::{broadcast, mpsc, Mutex}; const DEFAULT_BROADCAST_CAPACITY: usize = 2000; @@ -147,6 +150,7 @@ where fee_history_cache: FeeHistoryCache>, proof_permits: usize, rpc_converter: Rpc, + max_batch_size: usize, ) -> Self { let inner = EthApiInner::new( components, @@ -161,6 +165,7 @@ where proof_permits, rpc_converter, (), + max_batch_size, ); Self { inner: Arc::new(inner) } @@ -290,6 +295,10 @@ pub struct EthApiInner { /// Builder for pending block environment. next_env_builder: Box>, + + /// Transaction batch sender for batching tx insertions + tx_batch_sender: + mpsc::UnboundedSender::Transaction>>, } impl EthApiInner @@ -312,6 +321,7 @@ where proof_permits: usize, tx_resp_builder: Rpc, next_env: impl PendingEnvBuilder, + max_batch_size: usize, ) -> Self { let signers = parking_lot::RwLock::new(Default::default()); // get the block number of the latest block @@ -327,6 +337,11 @@ where let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY); + // Create tx pool insertion batcher + let (processor, tx_batch_sender) = + BatchTxProcessor::new(components.pool().clone(), max_batch_size); + task_spawner.spawn_critical("tx-batcher", Box::pin(processor)); + Self { components, signers, @@ -344,6 +359,7 @@ where raw_tx_sender, tx_resp_builder, next_env_builder: Box::new(next_env), + tx_batch_sender, } } } @@ -473,6 +489,30 @@ where pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) { let _ = self.raw_tx_sender.send(raw_tx); } + + /// Returns the transaction batch sender + #[inline] + const fn tx_batch_sender( + &self, + ) -> &mpsc::UnboundedSender::Transaction>> { + &self.tx_batch_sender + } + + /// Adds an _unvalidated_ transaction into the pool via the transaction batch sender. + #[inline] + pub async fn add_pool_transaction( + &self, + transaction: ::Transaction, + ) -> Result { + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx); + + self.tx_batch_sender() + .send(request) + .map_err(|_| reth_rpc_eth_types::EthApiError::BatchTxSendError)?; + + Ok(response_rx.await??) + } } #[cfg(test)] diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index f82886a9beb..b3a3614447b 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -8,9 +8,7 @@ use reth_rpc_eth_api::{ FromEvmError, RpcNodeCore, }; use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError}; -use reth_transaction_pool::{ - AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool, -}; +use reth_transaction_pool::{AddedTransactionOutcome, PoolTransaction, TransactionPool}; impl EthTransactions for EthApi where @@ -34,9 +32,8 @@ where let pool_transaction = ::Transaction::from_pooled(recovered); - // submit the transaction to the pool with a `Local` origin let AddedTransactionOutcome { hash, .. } = - self.pool().add_transaction(TransactionOrigin::Local, pool_transaction).await?; + self.inner.add_pool_transaction(pool_transaction).await?; Ok(hash) } diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 5409bb102e1..02030719840 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -34,6 +34,7 @@ alloy-consensus = { workspace = true, features = ["kzg"] } # async/futures futures-util.workspace = true parking_lot.workspace = true +pin-project.workspace = true tokio = { workspace = true, features = ["sync"] } tokio-stream.workspace = true @@ -72,6 +73,7 @@ assert_matches.workspace = true tempfile.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } +futures.workspace = true [features] serde = [ @@ -133,6 +135,11 @@ name = "priority" required-features = ["arbitrary"] harness = false +[[bench]] +name = "insertion" +required-features = ["test-utils", "arbitrary"] +harness = false + [[bench]] name = "canonical_state_change" required-features = ["test-utils", "arbitrary"] diff --git a/crates/transaction-pool/benches/insertion.rs b/crates/transaction-pool/benches/insertion.rs new file mode 100644 index 00000000000..dc90d47366f --- /dev/null +++ b/crates/transaction-pool/benches/insertion.rs @@ -0,0 +1,128 @@ +#![allow(missing_docs)] +use alloy_primitives::Address; +use criterion::{criterion_group, criterion_main, Criterion}; +use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner}; +use reth_transaction_pool::{ + batcher::{BatchTxProcessor, BatchTxRequest}, + test_utils::{testing_pool, MockTransaction}, + TransactionOrigin, TransactionPool, +}; +use tokio::sync::oneshot; + +/// Generates a set of transactions for multiple senders +fn generate_transactions(num_senders: usize, txs_per_sender: usize) -> Vec { + let mut runner = TestRunner::deterministic(); + let mut txs = Vec::new(); + + for sender_idx in 0..num_senders { + // Create a unique sender address + let sender_bytes = sender_idx.to_be_bytes(); + let addr_slice = [0u8; 12].into_iter().chain(sender_bytes.into_iter()).collect::>(); + let sender = Address::from_slice(&addr_slice); + + // Generate transactions for this sender + for nonce in 0..txs_per_sender { + let mut tx = any::().new_tree(&mut runner).unwrap().current(); + tx.set_sender(sender); + tx.set_nonce(nonce as u64); + + // Ensure it's not a legacy transaction + if tx.is_legacy() || tx.is_eip2930() { + tx = MockTransaction::eip1559(); + tx.set_priority_fee(any::().new_tree(&mut runner).unwrap().current()); + tx.set_max_fee(any::().new_tree(&mut runner).unwrap().current()); + tx.set_sender(sender); + tx.set_nonce(nonce as u64); + } + + txs.push(tx); + } + } + + txs +} + +/// Benchmark individual transaction insertion +fn txpool_insertion(c: &mut Criterion) { + let mut group = c.benchmark_group("Txpool insertion"); + let scenarios = [(1000, 100), (5000, 500), (10000, 1000), (20000, 2000)]; + + for (tx_count, sender_count) in scenarios { + let group_id = format!("txs: {tx_count} | senders: {sender_count}"); + + group.bench_function(group_id, |b| { + b.iter_with_setup( + || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let pool = testing_pool(); + let txs = generate_transactions(tx_count, sender_count); + (rt, pool, txs) + }, + |(rt, pool, txs)| { + rt.block_on(async { + for tx in &txs { + let _ = + pool.add_transaction(TransactionOrigin::Local, tx.clone()).await; + } + }); + }, + ); + }); + } + + group.finish(); +} + +/// Benchmark batch transaction insertion +fn txpool_batch_insertion(c: &mut Criterion) { + let mut group = c.benchmark_group("Txpool batch insertion"); + let scenarios = [(1000, 100), (5000, 500), (10000, 1000), (20000, 2000)]; + + for (tx_count, sender_count) in scenarios { + let group_id = format!("txs: {tx_count} | senders: {sender_count}"); + + group.bench_function(group_id, |b| { + b.iter_with_setup( + || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let pool = testing_pool(); + let txs = generate_transactions(tx_count, sender_count); + let (processor, request_tx) = BatchTxProcessor::new(pool, tx_count); + let processor_handle = rt.spawn(processor); + + let mut batch_requests = Vec::with_capacity(tx_count); + let mut response_futures = Vec::with_capacity(tx_count); + for tx in txs { + let (response_tx, response_rx) = oneshot::channel(); + let request = BatchTxRequest::new(tx, response_tx); + batch_requests.push(request); + response_futures.push(response_rx); + } + + (rt, request_tx, processor_handle, batch_requests, response_futures) + }, + |(rt, request_tx, _processor_handle, batch_requests, response_futures)| { + rt.block_on(async { + // Send all transactions + for request in batch_requests { + request_tx.send(request).unwrap(); + } + + for response_rx in response_futures { + let _res = response_rx.await.unwrap(); + } + }); + }, + ); + }); + } + + group.finish(); +} + +criterion_group! { + name = insertion; + config = Criterion::default(); + targets = txpool_insertion, txpool_batch_insertion +} +criterion_main!(insertion); diff --git a/crates/transaction-pool/src/batcher.rs b/crates/transaction-pool/src/batcher.rs new file mode 100644 index 00000000000..dcf59c9ea6d --- /dev/null +++ b/crates/transaction-pool/src/batcher.rs @@ -0,0 +1,241 @@ +//! Transaction batching for `Pool` insertion for high-throughput scenarios +//! +//! This module provides transaction batching logic to reduce lock contention when processing +//! many concurrent transaction pool insertions. + +use crate::{ + error::PoolError, AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool, +}; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot}; + +/// A single batch transaction request +/// All transactions processed through the batcher are considered local +/// transactions (`TransactionOrigin::Local`) when inserted into the pool. +#[derive(Debug)] +pub struct BatchTxRequest { + /// Tx to be inserted in to the pool + pool_tx: T, + /// Channel to send result back to caller + response_tx: oneshot::Sender>, +} + +impl BatchTxRequest +where + T: PoolTransaction, +{ + /// Create a new batch transaction request + pub const fn new( + pool_tx: T, + response_tx: oneshot::Sender>, + ) -> Self { + Self { pool_tx, response_tx } + } +} + +/// Transaction batch processor that handles batch processing +#[pin_project] +#[derive(Debug)] +pub struct BatchTxProcessor { + pool: Pool, + max_batch_size: usize, + #[pin] + request_rx: mpsc::UnboundedReceiver>, +} + +impl BatchTxProcessor +where + Pool: TransactionPool + 'static, +{ + /// Create a new `BatchTxProcessor` + pub fn new( + pool: Pool, + max_batch_size: usize, + ) -> (Self, mpsc::UnboundedSender>) { + let (request_tx, request_rx) = mpsc::unbounded_channel(); + + let processor = Self { pool, max_batch_size, request_rx }; + + (processor, request_tx) + } + + /// Process a batch of transaction requests, grouped by origin + async fn process_batch(pool: &Pool, batch: Vec>) { + let (pool_transactions, response_tx): (Vec<_>, Vec<_>) = + batch.into_iter().map(|req| (req.pool_tx, req.response_tx)).unzip(); + + let pool_results = pool.add_transactions(TransactionOrigin::Local, pool_transactions).await; + + for (response_tx, pool_result) in response_tx.into_iter().zip(pool_results) { + let _ = response_tx.send(pool_result); + } + } +} + +impl Future for BatchTxProcessor +where + Pool: TransactionPool + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + // Drain all available requests from the receiver + let mut batch = Vec::with_capacity(1); + while let Poll::Ready(Some(request)) = this.request_rx.poll_recv(cx) { + batch.push(request); + + // Check if the max batch size threshold has been reached + if batch.len() >= *this.max_batch_size { + break; + } + } + + if !batch.is_empty() { + let pool = this.pool.clone(); + tokio::spawn(async move { + Self::process_batch(&pool, batch).await; + }); + + continue; + } + + // No requests available, return Pending to wait for more + return Poll::Pending; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::{testing_pool, MockTransaction}; + use futures::stream::{FuturesUnordered, StreamExt}; + use std::time::Duration; + use tokio::time::timeout; + + #[tokio::test] + async fn test_process_batch() { + let pool = testing_pool(); + + let mut batch_requests = Vec::new(); + let mut responses = Vec::new(); + + for i in 0..100 { + let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + + batch_requests.push(BatchTxRequest::new(tx, response_tx)); + responses.push(response_rx); + } + + BatchTxProcessor::process_batch(&pool, batch_requests).await; + + for response_rx in responses { + let result = timeout(Duration::from_millis(5), response_rx) + .await + .expect("Timeout waiting for response") + .expect("Response channel was closed unexpectedly"); + assert!(result.is_ok()); + } + } + + #[tokio::test] + async fn test_batch_processor() { + let pool = testing_pool(); + let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000); + + // Spawn the processor + let handle = tokio::spawn(processor); + + let mut responses = Vec::new(); + + for i in 0..50 { + let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + + request_tx.send(BatchTxRequest::new(tx, response_tx)).expect("Could not send batch tx"); + responses.push(response_rx); + } + + tokio::time::sleep(Duration::from_millis(10)).await; + + for rx in responses { + let result = timeout(Duration::from_millis(10), rx) + .await + .expect("Timeout waiting for response") + .expect("Response channel was closed unexpectedly"); + assert!(result.is_ok()); + } + + drop(request_tx); + handle.abort(); + } + + #[tokio::test] + async fn test_add_transaction() { + let pool = testing_pool(); + let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), 1000); + + // Spawn the processor + let handle = tokio::spawn(processor); + + let mut results = Vec::new(); + for i in 0..10 { + let tx = MockTransaction::legacy().with_nonce(i).with_gas_price(100); + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let request = BatchTxRequest::new(tx, response_tx); + request_tx.send(request).expect("Could not send batch tx"); + results.push(response_rx); + } + + for res in results { + let result = timeout(Duration::from_millis(10), res) + .await + .expect("Timeout waiting for transaction result"); + assert!(result.is_ok()); + } + + handle.abort(); + } + + #[tokio::test] + async fn test_max_batch_size() { + let pool = testing_pool(); + let max_batch_size = 10; + let (processor, request_tx) = BatchTxProcessor::new(pool.clone(), max_batch_size); + + // Spawn batch processor with threshold + let handle = tokio::spawn(processor); + + let mut futures = FuturesUnordered::new(); + for i in 0..max_batch_size { + let tx = MockTransaction::legacy().with_nonce(i as u64).with_gas_price(100); + let (response_tx, response_rx) = tokio::sync::oneshot::channel(); + let request = BatchTxRequest::new(tx, response_tx); + let request_tx_clone = request_tx.clone(); + + let tx_fut = async move { + request_tx_clone.send(request).expect("Could not send batch tx"); + response_rx.await.expect("Could not receive batch response") + }; + futures.push(tx_fut); + } + + while let Some(result) = timeout(Duration::from_millis(5), futures.next()) + .await + .expect("Timeout waiting for transaction result") + { + assert!(result.is_ok()); + } + + handle.abort(); + } +} diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index de923635928..e5e2df416fb 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -63,7 +63,7 @@ //! //! ## Validation Process //! -//! ### Stateless Checks +//! ### Stateless Checks //! //! Ethereum transactions undergo several stateless checks: //! @@ -271,6 +271,7 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] pub use crate::{ + batcher::{BatchTxProcessor, BatchTxRequest}, blobstore::{BlobStore, BlobStoreError}, config::{ LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit, DEFAULT_PRICE_BUMP, @@ -314,6 +315,7 @@ pub mod noop; pub mod pool; pub mod validate; +pub mod batcher; pub mod blobstore; mod config; pub mod identifier; diff --git a/docs/vocs/docs/pages/cli/reth/node.mdx b/docs/vocs/docs/pages/cli/reth/node.mdx index 83c954d886a..ed0b012dd8b 100644 --- a/docs/vocs/docs/pages/cli/reth/node.mdx +++ b/docs/vocs/docs/pages/cli/reth/node.mdx @@ -570,6 +570,11 @@ TxPool: --txpool.disable-transactions-backup Disables transaction backup to disk on node shutdown + --txpool.max-batch-size + Max batch size for transaction pool insertions + + [default: 1] + Builder: --builder.extradata Block extra data set by the payload builder