Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/node/core/src/args/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,7 @@ mod tests {
max_receipts: 2000,
max_headers: 1000,
max_concurrent_db_requests: 512,
max_cached_tx_hashes: 30_000,
},
gas_price_oracle: GasPriceOracleArgs {
blocks: 20,
Expand Down
10 changes: 9 additions & 1 deletion crates/node/core/src/args/rpc_state_cache.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use clap::Args;
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};

/// Parameters to configure RPC state cache.
Expand Down Expand Up @@ -36,6 +36,13 @@ pub struct RpcStateCacheArgs {
default_value_t = DEFAULT_CONCURRENT_DB_REQUESTS,
)]
pub max_concurrent_db_requests: usize,

/// Maximum number of transaction hashes to cache for transaction lookups.
#[arg(
long = "rpc-cache.max-cached-tx-hashes",
default_value_t = DEFAULT_MAX_CACHED_TX_HASHES,
)]
pub max_cached_tx_hashes: u32,
}

impl RpcStateCacheArgs {
Expand All @@ -54,6 +61,7 @@ impl Default for RpcStateCacheArgs {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}
1 change: 1 addition & 0 deletions crates/rpc/rpc-builder/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ impl RethRpcServerConfig for RpcServerArgs {
max_receipts: self.rpc_state_cache.max_receipts,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
max_cached_tx_hashes: self.rpc_state_cache.max_cached_tx_hashes,
}
}

Expand Down
15 changes: 14 additions & 1 deletion crates/rpc/rpc-eth-api/src/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,20 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes + RpcNodeCoreExt {
Output = Result<Option<TransactionSource<ProviderTx<Self::Provider>>>, Self::Error>,
> + Send {
async move {
// Try to find the transaction on disk
// First, try the RPC cache
if let Some(cached) = self.cache().get_transaction_by_hash(hash).await &&
let Some(tx) = cached.recovered_transaction()
{
return Ok(Some(TransactionSource::Block {
transaction: tx.cloned(),
index: cached.tx_index as u64,
block_hash: cached.block.hash(),
block_number: cached.block.number(),
base_fee: cached.block.base_fee_per_gas(),
}));
}

// Cache miss - try to find the transaction on disk
if let Some((tx, meta)) = self
.spawn_blocking_io(move |this| {
this.provider()
Expand Down
57 changes: 55 additions & 2 deletions crates/rpc/rpc-eth-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,68 @@

use std::sync::Arc;

use alloy_consensus::TxReceipt;
use alloy_consensus::{transaction::TxHashRef, TxReceipt};
use alloy_primitives::TxHash;
use reth_primitives_traits::{
BlockTy, IndexedTx, NodePrimitives, ReceiptTy, RecoveredBlock, SealedBlock,
Block, BlockBody, BlockTy, IndexedTx, NodePrimitives, ReceiptTy, Recovered, RecoveredBlock,
SealedBlock,
};
use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcTypes};

use crate::utils::calculate_gas_used_and_next_log_index;

/// Cached data for a transaction lookup.
#[derive(Debug, Clone)]
pub struct CachedTransaction<B: Block, R> {
/// The block containing this transaction.
pub block: Arc<RecoveredBlock<B>>,
/// Index of the transaction within the block.
pub tx_index: usize,
/// Receipts for the block, if available.
pub receipts: Option<Arc<Vec<R>>>,
}

impl<B: Block, R> CachedTransaction<B, R> {
/// Creates a new cached transaction entry.
pub const fn new(
block: Arc<RecoveredBlock<B>>,
tx_index: usize,
receipts: Option<Arc<Vec<R>>>,
) -> Self {
Self { block, tx_index, receipts }
}

/// Returns the `Recovered<&T>` transaction at the cached index.
pub fn recovered_transaction(&self) -> Option<Recovered<&<B::Body as BlockBody>::Transaction>> {
self.block.recovered_transaction(self.tx_index)
}

/// Converts this cached transaction into an RPC receipt using the given converter.
///
/// Returns `None` if receipts are not available or the transaction index is out of bounds.
pub fn into_receipt<N, C>(
self,
converter: &C,
) -> Option<Result<<C::Network as RpcTypes>::Receipt, C::Error>>
where
N: NodePrimitives<Block = B, Receipt = R>,
R: TxReceipt + Clone,
C: RpcConvert<Primitives = N>,
{
let receipts = self.receipts?;
let receipt = receipts.get(self.tx_index)?;
let tx_hash = *self.block.body().transactions().get(self.tx_index)?.tx_hash();
let tx = self.block.find_indexed(tx_hash)?;
convert_transaction_receipt::<N, C>(
self.block.as_ref(),
receipts.as_ref(),
tx,
receipt,
converter,
)
}
}

/// A pair of an [`Arc`] wrapped [`RecoveredBlock`] and its corresponding receipts.
///
/// This type is used throughout the RPC layer to efficiently pass around
Expand Down
5 changes: 4 additions & 1 deletion crates/rpc/rpc-eth-types/src/cache/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};

use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
DEFAULT_MAX_CACHED_TX_HASHES, DEFAULT_RECEIPT_CACHE_MAX_LEN,
};

/// Settings for the [`EthStateCache`](super::EthStateCache).
Expand All @@ -27,6 +27,8 @@ pub struct EthStateCacheConfig {
///
/// Default is 512.
pub max_concurrent_db_requests: usize,
/// Maximum number of transaction hashes to cache for transaction lookups.
pub max_cached_tx_hashes: u32,
}

impl Default for EthStateCacheConfig {
Expand All @@ -36,6 +38,7 @@ impl Default for EthStateCacheConfig {
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
max_cached_tx_hashes: DEFAULT_MAX_CACHED_TX_HASHES,
}
}
}
73 changes: 69 additions & 4 deletions crates/rpc/rpc-eth-types/src/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
//! Async caching support for eth RPC

use super::{EthStateCacheConfig, MultiConsumerLruCache};
use alloy_consensus::BlockHeader;
use crate::block::CachedTransaction;
use alloy_consensus::{transaction::TxHashRef, BlockHeader};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use alloy_primitives::{TxHash, B256};
use futures::{stream::FuturesOrdered, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::Chain;
use reth_primitives_traits::{Block, NodePrimitives, RecoveredBlock};
use reth_primitives_traits::{Block, BlockBody, NodePrimitives, RecoveredBlock};
use reth_storage_api::{BlockReader, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use schnellru::{ByLength, Limiter};
use schnellru::{ByLength, Limiter, LruMap};
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -47,6 +48,9 @@ type HeaderResponseSender<H> = oneshot::Sender<ProviderResult<H>>;
/// The type that can send the response with a chain of cached blocks
type CachedParentBlocksResponseSender<B> = oneshot::Sender<Vec<Arc<RecoveredBlock<B>>>>;

/// The type that can send the response for a transaction hash lookup
type TransactionHashResponseSender<B, R> = oneshot::Sender<Option<CachedTransaction<B, R>>>;

type BlockLruCache<B, L> =
MultiConsumerLruCache<B256, Arc<RecoveredBlock<B>>, L, BlockWithSendersResponseSender<B>>;

Expand Down Expand Up @@ -79,11 +83,13 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
max_cached_tx_hashes: u32,
) -> (Self, EthStateCacheService<Provider, Tasks>)
where
Provider: BlockReader<Block = N::Block, Receipt = N::Receipt>,
{
let (to_service, rx) = unbounded_channel();

let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
Expand All @@ -93,6 +99,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
tx_hash_index: LruMap::new(ByLength::new(max_cached_tx_hashes)),
};
let cache = Self { to_service };
(cache, service)
Expand Down Expand Up @@ -127,6 +134,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
} = config;
let (this, service) = Self::create(
provider,
Expand All @@ -135,6 +143,7 @@ impl<N: NodePrimitives> EthStateCache<N> {
max_receipts,
max_headers,
max_concurrent_db_requests,
max_cached_tx_hashes,
);
executor.spawn_critical("eth state cache", Box::pin(service));
this
Expand Down Expand Up @@ -255,6 +264,19 @@ impl<N: NodePrimitives> EthStateCache<N> {
Some(blocks)
}
}

/// Looks up a transaction by its hash in the cache index.
///
/// Returns the cached block, transaction index, and optionally receipts if the transaction
/// is in a cached block.
pub async fn get_transaction_by_hash(
&self,
tx_hash: TxHash,
) -> Option<CachedTransaction<N::Block, N::Receipt>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetTransactionByHash { tx_hash, response_tx });
rx.await.ok()?
}
}
/// Thrown when the cache service task dropped.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -317,13 +339,38 @@ pub(crate) struct EthStateCacheService<
///
/// This restricts the max concurrent fetch tasks at the same time.
rate_limiter: Arc<Semaphore>,
/// LRU index mapping transaction hashes to their block hash and index within the block.
tx_hash_index: LruMap<TxHash, (B256, usize), ByLength>,
}

impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
where
Provider: BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Indexes all transactions in a block by transaction hash.
fn index_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for (tx_idx, tx) in block.body().transactions().iter().enumerate() {
self.tx_hash_index.insert(*tx.tx_hash(), (block_hash, tx_idx));
}
}

/// Removes transaction index entries for a reorged block.
///
/// Only removes entries that still point to this block, preserving mappings for transactions
/// that were re-mined in a new canonical block.
fn remove_block_transactions(&mut self, block: &RecoveredBlock<Provider::Block>) {
let block_hash = block.hash();
for tx in block.body().transactions() {
if let Some((mapped_hash, _)) = self.tx_hash_index.get(tx.tx_hash()) &&
*mapped_hash == block_hash
{
self.tx_hash_index.remove(tx.tx_hash());
}
}
}

fn on_new_block(
&mut self,
block_hash: B256,
Expand Down Expand Up @@ -550,6 +597,8 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
// Index transactions before caching the block
this.index_block_transactions(&block);
this.on_new_block(block.hash(), Ok(Some(Arc::new(block))));
}

Expand All @@ -562,6 +611,8 @@ where
}
CacheAction::RemoveReorgedChain { chain_change } => {
for block in chain_change.blocks {
// Remove transaction index entries for reorged blocks
this.remove_block_transactions(&block);
this.on_reorg_block(block.hash(), Ok(Some(block)));
}

Expand Down Expand Up @@ -596,6 +647,15 @@ where

let _ = response_tx.send(blocks);
}
CacheAction::GetTransactionByHash { tx_hash, response_tx } => {
let result =
this.tx_hash_index.get(&tx_hash).and_then(|(block_hash, idx)| {
let block = this.full_block_cache.get(block_hash).cloned()?;
let receipts = this.receipts_cache.get(block_hash).cloned();
Some(CachedTransaction::new(block, *idx, receipts))
});
let _ = response_tx.send(result);
}
};
this.update_cached_metrics();
}
Expand Down Expand Up @@ -649,6 +709,11 @@ enum CacheAction<B: Block, R> {
max_blocks: usize,
response_tx: CachedParentBlocksResponseSender<B>,
},
/// Look up a transaction's cached data by its hash
GetTransactionByHash {
tx_hash: TxHash,
response_tx: TransactionHashResponseSender<B, R>,
},
}

struct BlockReceipts<R> {
Expand Down
1 change: 1 addition & 0 deletions crates/rpc/rpc-eth-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod tx_forward;
pub mod utils;

pub use alloy_rpc_types_eth::FillTransaction;
pub use block::CachedTransaction;
pub use builder::config::{EthConfig, EthFilterConfig};
pub use cache::{
config::EthStateCacheConfig, db::StateCacheDb, multi_consumer::MultiConsumerLruCache,
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/rpc-eth-types/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ pub fn apply_precompile_overrides(
return Ok(());
}

for (source, _dest) in &moves {
for (source, _) in &moves {
if precompiles.get(source).is_none() {
return Err(EthSimulateError::NotAPrecompile(*source));
}
Expand All @@ -182,7 +182,7 @@ pub fn apply_precompile_overrides(
}
}

for (_source, dest, precompile) in extracted {
for (_, dest, precompile) in extracted {
precompiles.apply_precompile(&dest, |_| Some(precompile));
}

Expand Down
3 changes: 3 additions & 0 deletions crates/rpc/rpc-server-types/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,7 @@ pub mod cache {

/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;

/// Default maximum number of transaction hashes to cache for lookups.
pub const DEFAULT_MAX_CACHED_TX_HASHES: u32 = 30_000;
}
5 changes: 5 additions & 0 deletions docs/vocs/docs/pages/cli/op-reth/node.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ RPC State Cache:

[default: 512]

--rpc-cache.max-cached-tx-hashes <MAX_CACHED_TX_HASHES>
Maximum number of transaction hashes to cache for transaction lookups

[default: 30000]

Gas Price Oracle:
--gpo.blocks <BLOCKS>
Number of recent blocks to check for gas price
Expand Down
5 changes: 5 additions & 0 deletions docs/vocs/docs/pages/cli/reth/node.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,11 @@ RPC State Cache:

[default: 512]

--rpc-cache.max-cached-tx-hashes <MAX_CACHED_TX_HASHES>
Maximum number of transaction hashes to cache for transaction lookups

[default: 30000]

Gas Price Oracle:
--gpo.blocks <BLOCKS>
Number of recent blocks to check for gas price
Expand Down
Loading