Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
2336e3d
feat: tx batcher
0xKitsune Jul 29, 2025
8a53008
chore: simplify batch processing logic
0xKitsune Jul 29, 2025
db09551
feat: add tx batcher to ethapi
0xKitsune Jul 29, 2025
c6dd4c8
chore: fix imports, add todos
0xKitsune Jul 29, 2025
c739901
wip: add batcher to ethapi
0xKitsune Jul 29, 2025
db5c589
wip: batcher initialization
0xKitsune Jul 29, 2025
a32086a
wip: update process_batches
0xKitsune Jul 29, 2025
c3a9a5c
test: test process batch
0xKitsune Jul 29, 2025
6f6e47f
wip: use local tx origin
0xKitsune Jul 29, 2025
3d0cd6b
fix: test process batch
0xKitsune Jul 29, 2025
ba1de93
test: add transaction, process batches
0xKitsune Jul 29, 2025
fc9fec3
feat: BatchTxRequest::new
0xKitsune Jul 29, 2025
68562d9
feat: add batch threshold
0xKitsune Jul 29, 2025
5167b0f
test: test_batch_threshold
0xKitsune Jul 29, 2025
517842a
chore: clippy
0xKitsune Jul 29, 2025
a36d15f
chore: move tx batcher to tx-pool
0xKitsune Jul 29, 2025
d2d816e
fix: update error types, imports
0xKitsune Jul 29, 2025
bcab49c
chore: remove unused dep
0xKitsune Jul 29, 2025
9d3a7d3
chore: clippy
0xKitsune Jul 29, 2025
5aad871
chore: remove unneeded hash trait
0xKitsune Jul 29, 2025
ca94194
wip: add todo to drain channel
0xKitsune Jul 29, 2025
4c2bdda
fix: drain channel when processing batch txs
0xKitsune Jul 29, 2025
53aaad0
docs: remove comment
0xKitsune Jul 29, 2025
69d6bcb
Update crates/transaction-pool/src/traits.rs
0xKitsune Jul 29, 2025
879a4da
Update crates/transaction-pool/src/traits.rs
0xKitsune Jul 29, 2025
1f9dfae
Update crates/transaction-pool/src/lib.rs
0xKitsune Jul 29, 2025
3fb23b5
feat: update batch processing as native fut
0xKitsune Jul 29, 2025
354c375
chore: add TxBatchError to EthApiError
0xKitsune Jul 29, 2025
06474c6
fix: error handling
0xKitsune Jul 29, 2025
4fac19c
chore: error handling, cleanup
0xKitsune Jul 29, 2025
26e2247
fmt: fix formatting
0xKitsune Jul 29, 2025
8cf82ab
chore: update naming
0xKitsune Jul 29, 2025
58bf2ee
chore: rename batch types
0xKitsune Jul 29, 2025
cbf078c
fmt: cargo fmt
0xKitsune Jul 29, 2025
852776c
bench: txpool insertion
0xKitsune Jul 30, 2025
eb7de71
chore: comments
0xKitsune Jul 30, 2025
aff4967
fix: remove buffer size, use unbounded channel
0xKitsune Jul 31, 2025
097312f
fix: update types
0xKitsune Jul 31, 2025
1302fbc
fix: update error handling
0xKitsune Jul 31, 2025
f826921
Merge branch 'main' into kit/batch-insert
0xKitsune Jul 31, 2025
cd239eb
chore: update benches
0xKitsune Jul 31, 2025
f3c6546
fmt: cargo fmt
0xKitsune Jul 31, 2025
b7a3f06
docs: txpool.max-batch-size args
0xKitsune Jul 31, 2025
18f0633
Merge branch 'main' into kit/batch-insert
0xKitsune Aug 3, 2025
a2539a5
Merge branch 'main' into kit/batch-insert
0xKitsune Aug 6, 2025
520bc66
chore: use static
0xKitsune Aug 6, 2025
ca14322
feat: use batch tx sender by default
0xKitsune Aug 7, 2025
0ca9a07
chore: update trait bounds
0xKitsune Aug 7, 2025
e128924
Merge branch 'kit/batch-insert' of github.com:0xKitsune/reth into kit…
0xKitsune Aug 7, 2025
2949733
docs: update max batch size arg
0xKitsune Aug 7, 2025
7751378
feat: OpEthApi batch insertion
0xKitsune Aug 7, 2025
9809fc5
docs: update vocs
0xKitsune Aug 7, 2025
010bd61
chore: spawn BatchTxProcessor in EthApiInner
0xKitsune Aug 7, 2025
7fdf6c1
docs: add comment back
0xKitsune Aug 7, 2025
bff2cc4
docs: add max-batch-size default value
0xKitsune Aug 7, 2025
80c88f2
docs: fix lint
0xKitsune Aug 7, 2025
35440fd
docs: fix ci
0xKitsune Aug 7, 2025
f5ef873
fix: cli docs
0xKitsune Aug 7, 2025
b40eeaa
docs: run cli/update.sh
0xKitsune Aug 7, 2025
92e19c5
docs: rerun update.sh
0xKitsune Aug 7, 2025
70f0003
Merge branch 'main' into kit/batch-insert
0xKitsune Aug 12, 2025
dfb75b3
Merge branch 'main' into kit/batch-insert
mattsse Aug 13, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion crates/node/builder/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use reth_node_api::{
NodeAddOns, NodeTypes, PayloadTypes, PayloadValidator, PrimitivesTy, TreeConfig,
};
use reth_node_core::{
cli::config::RethTransactionPoolConfig,
node_config::NodeConfig,
version::{version_metadata, CLIENT_CODE},
};
Expand Down Expand Up @@ -863,7 +864,8 @@ where
}),
);

let ctx = EthApiCtx { components: &node, config: config.rpc.eth_config(), cache };
let eth_config = config.rpc.eth_config().max_batch_size(config.txpool.max_batch_size());
let ctx = EthApiCtx { components: &node, config: eth_config, cache };
let eth_api = eth_api_builder.build_eth_api(ctx).await?;

let auth_config = config.rpc.auth_server_config(jwt_secret)?;
Expand Down Expand Up @@ -1040,6 +1042,7 @@ impl<'a, N: FullNodeComponents<Types: NodeTypes<ChainSpec: EthereumHardforks>>>
.fee_history_cache_config(self.config.fee_history_cache)
.proof_permits(self.config.proof_permits)
.gas_oracle_config(self.config.gas_oracle)
.max_batch_size(self.config.max_batch_size)
}
}

Expand Down
10 changes: 10 additions & 0 deletions crates/node/core/src/args/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ pub struct TxPoolArgs {
conflicts_with = "transactions_backup_path"
)]
pub disable_transactions_backup: bool,

/// Max batch size for transaction pool insertions
#[arg(long = "txpool.max-batch-size", default_value_t = 1)]
pub max_batch_size: usize,
}

impl Default for TxPoolArgs {
Expand Down Expand Up @@ -165,6 +169,7 @@ impl Default for TxPoolArgs {
max_queued_lifetime: MAX_QUEUED_TRANSACTION_LIFETIME,
transactions_backup_path: None,
disable_transactions_backup: false,
max_batch_size: 1,
}
}
}
Expand Down Expand Up @@ -209,6 +214,11 @@ impl RethTransactionPoolConfig for TxPoolArgs {
max_queued_lifetime: self.max_queued_lifetime,
}
}

/// Returns max batch size for transaction batch insertion.
fn max_batch_size(&self) -> usize {
self.max_batch_size
}
}

#[cfg(test)]
Expand Down
3 changes: 3 additions & 0 deletions crates/node/core/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,7 @@ impl<N: NetworkPrimitives> RethNetworkConfig for reth_network::NetworkManager<N>
pub trait RethTransactionPoolConfig {
/// Returns transaction pool configuration.
fn pool_config(&self) -> PoolConfig;

/// Returns max batch size for transaction batch insertion.
fn max_batch_size(&self) -> usize;
}
7 changes: 2 additions & 5 deletions crates/optimism/rpc/src/eth/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,8 @@ where
})?;

// Retain tx in local tx pool after forwarding, for local RPC usage.
let _ = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await.inspect_err(|err| {
tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
let _ = self.inner.eth_api.add_pool_transaction(pool_transaction).await.inspect_err(|err| {
tracing::warn!(target: "rpc::eth", %err, %hash, "successfully sent tx to sequencer, but failed to persist in local tx pool");
});

return Ok(hash)
Expand Down
9 changes: 9 additions & 0 deletions crates/rpc/rpc-eth-types/src/builder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct EthConfig {
pub fee_history_cache: FeeHistoryCacheConfig,
/// The maximum number of getproof calls that can be executed concurrently.
pub proof_permits: usize,
/// Maximum batch size for transaction pool insertions.
pub max_batch_size: usize,
}

impl EthConfig {
Expand Down Expand Up @@ -72,6 +74,7 @@ impl Default for EthConfig {
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
fee_history_cache: FeeHistoryCacheConfig::default(),
proof_permits: DEFAULT_PROOF_PERMITS,
max_batch_size: 1,
}
}
}
Expand Down Expand Up @@ -136,6 +139,12 @@ impl EthConfig {
self.proof_permits = permits;
self
}

/// Configures the maximum batch size for transaction pool insertions
pub const fn max_batch_size(mut self, max_batch_size: usize) -> Self {
self.max_batch_size = max_batch_size;
self
}
}

/// Config for the filter
Expand Down
11 changes: 11 additions & 0 deletions crates/rpc/rpc-eth-types/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use revm::context_interface::result::{
};
use revm_inspectors::tracing::MuxError;
use std::convert::Infallible;
use tokio::sync::oneshot::error::RecvError;
use tracing::error;

/// A trait to convert an error to an RPC error.
Expand Down Expand Up @@ -166,6 +167,12 @@ pub enum EthApiError {
/// Duration that was waited before timing out
duration: Duration,
},
/// Error thrown when batch tx response channel fails
#[error(transparent)]
BatchTxRecvError(#[from] RecvError),
/// Error thrown when batch tx send channel fails
#[error("Batch transaction sender channel closed")]
BatchTxSendError,
/// Any other error
#[error("{0}")]
Other(Box<dyn ToRpcError>),
Expand Down Expand Up @@ -279,6 +286,10 @@ impl From<EthApiError> for jsonrpsee_types::error::ErrorObject<'static> {
EthApiError::PrunedHistoryUnavailable => rpc_error_with_code(4444, error.to_string()),
EthApiError::Other(err) => err.to_rpc_error(),
EthApiError::MuxTracerError(msg) => internal_rpc_err(msg.to_string()),
EthApiError::BatchTxRecvError(err) => internal_rpc_err(err.to_string()),
EthApiError::BatchTxSendError => {
internal_rpc_err("Batch transaction sender channel closed".to_string())
}
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions crates/rpc/rpc/src/eth/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct EthApiBuilder<N: RpcNodeCore, Rpc, NextEnv = ()> {
blocking_task_pool: Option<BlockingTaskPool>,
task_spawner: Box<dyn TaskSpawner + 'static>,
next_env: NextEnv,
max_batch_size: usize,
}

impl<Provider, Pool, Network, EvmConfig, ChainSpec>
Expand Down Expand Up @@ -78,6 +79,7 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
blocking_task_pool,
task_spawner,
next_env,
max_batch_size,
} = self;
EthApiBuilder {
components,
Expand All @@ -94,6 +96,7 @@ impl<N: RpcNodeCore, Rpc, NextEnv> EthApiBuilder<N, Rpc, NextEnv> {
blocking_task_pool,
task_spawner,
next_env,
max_batch_size,
}
}
}
Expand Down Expand Up @@ -121,6 +124,7 @@ where
gas_oracle_config: Default::default(),
eth_state_cache_config: Default::default(),
next_env: Default::default(),
max_batch_size: 1,
}
}
}
Expand Down Expand Up @@ -155,6 +159,7 @@ where
task_spawner,
gas_oracle_config,
next_env,
max_batch_size,
} = self;
EthApiBuilder {
components,
Expand All @@ -171,6 +176,7 @@ where
task_spawner,
gas_oracle_config,
next_env,
max_batch_size,
}
}

Expand All @@ -194,6 +200,7 @@ where
task_spawner,
gas_oracle_config,
next_env: _,
max_batch_size,
} = self;
EthApiBuilder {
components,
Expand All @@ -210,6 +217,7 @@ where
task_spawner,
gas_oracle_config,
next_env,
max_batch_size,
}
}

Expand Down Expand Up @@ -281,6 +289,12 @@ where
self
}

/// Sets the max batch size for batching transaction insertions.
pub const fn max_batch_size(mut self, max_batch_size: usize) -> Self {
self.max_batch_size = max_batch_size;
self
}

/// Builds the [`EthApiInner`] instance.
///
/// If not configured, this will spawn the cache backend: [`EthStateCache::spawn`].
Expand Down Expand Up @@ -309,6 +323,7 @@ where
proof_permits,
task_spawner,
next_env,
max_batch_size,
} = self;

let provider = components.provider().clone();
Expand Down Expand Up @@ -345,6 +360,7 @@ where
proof_permits,
rpc_converter,
next_env,
max_batch_size,
)
}

Expand Down
44 changes: 42 additions & 2 deletions crates/rpc/rpc/src/eth/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor,
};
use reth_transaction_pool::noop::NoopTransactionPool;
use tokio::sync::{broadcast, Mutex};
use reth_transaction_pool::{
noop::NoopTransactionPool, AddedTransactionOutcome, BatchTxProcessor, BatchTxRequest,
TransactionPool,
};
use tokio::sync::{broadcast, mpsc, Mutex};

const DEFAULT_BROADCAST_CAPACITY: usize = 2000;

Expand Down Expand Up @@ -147,6 +150,7 @@ where
fee_history_cache: FeeHistoryCache<ProviderHeader<N::Provider>>,
proof_permits: usize,
rpc_converter: Rpc,
max_batch_size: usize,
) -> Self {
let inner = EthApiInner::new(
components,
Expand All @@ -161,6 +165,7 @@ where
proof_permits,
rpc_converter,
(),
max_batch_size,
);

Self { inner: Arc::new(inner) }
Expand Down Expand Up @@ -290,6 +295,10 @@ pub struct EthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {

/// Builder for pending block environment.
next_env_builder: Box<dyn PendingEnvBuilder<N::Evm>>,

/// Transaction batch sender for batching tx insertions
tx_batch_sender:
mpsc::UnboundedSender<BatchTxRequest<<N::Pool as TransactionPool>::Transaction>>,
}

impl<N, Rpc> EthApiInner<N, Rpc>
Expand All @@ -312,6 +321,7 @@ where
proof_permits: usize,
tx_resp_builder: Rpc,
next_env: impl PendingEnvBuilder<N::Evm>,
max_batch_size: usize,
) -> Self {
let signers = parking_lot::RwLock::new(Default::default());
// get the block number of the latest block
Expand All @@ -327,6 +337,11 @@ where

let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY);

// Create tx pool insertion batcher
let (processor, tx_batch_sender) =
BatchTxProcessor::new(components.pool().clone(), max_batch_size);
task_spawner.spawn_critical("tx-batcher", Box::pin(processor));

Self {
components,
signers,
Expand All @@ -344,6 +359,7 @@ where
raw_tx_sender,
tx_resp_builder,
next_env_builder: Box::new(next_env),
tx_batch_sender,
}
}
}
Expand Down Expand Up @@ -473,6 +489,30 @@ where
pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) {
let _ = self.raw_tx_sender.send(raw_tx);
}

/// Returns the transaction batch sender
#[inline]
const fn tx_batch_sender(
&self,
) -> &mpsc::UnboundedSender<BatchTxRequest<<N::Pool as TransactionPool>::Transaction>> {
&self.tx_batch_sender
}

/// Adds an _unvalidated_ transaction into the pool via the transaction batch sender.
#[inline]
pub async fn add_pool_transaction(
&self,
transaction: <N::Pool as TransactionPool>::Transaction,
) -> Result<AddedTransactionOutcome, EthApiError> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = reth_transaction_pool::BatchTxRequest::new(transaction, response_tx);

self.tx_batch_sender()
.send(request)
.map_err(|_| reth_rpc_eth_types::EthApiError::BatchTxSendError)?;

Ok(response_rx.await??)
}
}

#[cfg(test)]
Expand Down
7 changes: 2 additions & 5 deletions crates/rpc/rpc/src/eth/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use reth_rpc_eth_api::{
FromEvmError, RpcNodeCore,
};
use reth_rpc_eth_types::{utils::recover_raw_transaction, EthApiError};
use reth_transaction_pool::{
AddedTransactionOutcome, PoolTransaction, TransactionOrigin, TransactionPool,
};
use reth_transaction_pool::{AddedTransactionOutcome, PoolTransaction, TransactionPool};

impl<N, Rpc> EthTransactions for EthApi<N, Rpc>
where
Expand All @@ -34,9 +32,8 @@ where

let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);

// submit the transaction to the pool with a `Local` origin
let AddedTransactionOutcome { hash, .. } =
self.pool().add_transaction(TransactionOrigin::Local, pool_transaction).await?;
self.inner.add_pool_transaction(pool_transaction).await?;

Ok(hash)
}
Expand Down
7 changes: 7 additions & 0 deletions crates/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ alloy-consensus = { workspace = true, features = ["kzg"] }
# async/futures
futures-util.workspace = true
parking_lot.workspace = true
pin-project.workspace = true
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

Expand Down Expand Up @@ -72,6 +73,7 @@ assert_matches.workspace = true
tempfile.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
futures.workspace = true

[features]
serde = [
Expand Down Expand Up @@ -133,6 +135,11 @@ name = "priority"
required-features = ["arbitrary"]
harness = false

[[bench]]
name = "insertion"
required-features = ["test-utils", "arbitrary"]
harness = false

[[bench]]
name = "canonical_state_change"
required-features = ["test-utils", "arbitrary"]
Expand Down
Loading
Loading