Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ where

/// Initializes the config and block env.
fn init_env(&mut self, header: &Header, total_difficulty: U256) {
fill_cfg_and_block_env(&mut self.evm.env, self.chain_spec, header, total_difficulty);
fill_cfg_and_block_env(
&mut self.evm.env.cfg,
&mut self.evm.env.block,
self.chain_spec,
header,
total_difficulty,
);
}

/// Commit change to database and return change diff that is used to update state and create
Expand Down
2 changes: 1 addition & 1 deletion crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ reth-eth-wire = { path = "../net/eth-wire" }

# codecs
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
futures = "0.3.25"
futures = "0.3"
tokio-stream = "0.1.11"
rand = "0.8.5"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
Expand Down
3 changes: 3 additions & 0 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,7 @@ pub enum ProviderError {
/// Thrown when required header related data was not found but was required.
#[error("requested data not found")]
HeaderNotFound,
/// Thrown when the cache service task dropped
#[error("cache service task stopped")]
CacheServiceUnavailable,
}
11 changes: 6 additions & 5 deletions crates/revm/revm-primitives/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ use reth_primitives::{
Address, ChainSpec, Head, Header, Transaction, TransactionKind, TransactionSigned, TxEip1559,
TxEip2930, TxLegacy, U256,
};
use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, Env, SpecId, TransactTo, TxEnv};
use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, SpecId, TransactTo, TxEnv};

/// Convenience function to call both [fill_cfg_env] and [fill_block_env]
pub fn fill_cfg_and_block_env(
env: &mut Env,
cfg: &mut CfgEnv,
block_env: &mut BlockEnv,
chain_spec: &ChainSpec,
header: &Header,
total_difficulty: U256,
) {
fill_cfg_env(&mut env.cfg, chain_spec, header, total_difficulty);
let after_merge = env.cfg.spec_id >= SpecId::MERGE;
fill_block_env(&mut env.block, header, after_merge);
fill_cfg_env(cfg, chain_spec, header, total_difficulty);
let after_merge = cfg.spec_id >= SpecId::MERGE;
fill_block_env(block_env, header, after_merge);
}

/// Fill [CfgEnv] fields according to the chain spec and given header
Expand Down
2 changes: 2 additions & 0 deletions crates/rpc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ thiserror = "1.0"
hex = "0.4"
rand = "0.8.5"
tracing = "0.1"
schnellru = "0.2"
futures = "0.3.26"

[dev-dependencies]
jsonrpsee = { version = "0.16", features = ["client"]}
Expand Down
285 changes: 285 additions & 0 deletions crates/rpc/rpc/src/eth/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
//! Async caching support for eth RPC

use futures::StreamExt;
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Block, H256};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use revm::primitives::{BlockEnv, CfgEnv};
use schnellru::{ByMemoryUsage, Limiter, LruMap};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
hash::Hash,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

/// The type that can send the response to a requested [Block]
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;

/// The type that can send the response to a requested env
type EnvResponseSender = oneshot::Sender<Result<(CfgEnv, BlockEnv)>>;

type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSender>;

type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;

/// Provides async access to cached eth data
///
/// This is the frontend to the [EthStateCacheService] which manages cached data on a different
/// task.
#[derive(Debug, Clone)]
pub(crate) struct EthStateCache {
to_service: UnboundedSender<CacheAction>,
}

impl EthStateCache {
/// Creates and returns both [EthStateCache] frontend and the memory bound service.
fn create<Client>(
client: Client,
action_task_spawner: Box<dyn TaskSpawner>,
max_block_bytes: usize,
max_env_bytes: usize,
) -> (Self, EthStateCacheService<Client>) {
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
client,
full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes),
evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes),
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
};
let cache = EthStateCache { to_service };
(cache, service)
}

/// Creates a new async LRU backed cache service task and spawns it to a new task via the given
/// spawner.
///
/// The cache is memory limited by the given max bytes values.
pub(crate) fn spawn<Client>(
client: Client,
spawner: Box<dyn TaskSpawner>,
max_block_bytes: usize,
max_env_bytes: usize,
) -> Self
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
{
let (this, service) = Self::create(client, spawner.clone(), max_block_bytes, max_env_bytes);
spawner.spawn(Box::pin(service));
this
}

/// Requests the [Block] for the block hash
///
/// Returns `None` if the block does not exist.
pub(crate) async fn get_block(&self, block_hash: H256) -> Result<Option<Block>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetBlock { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}

/// Requests the evm env config for the block hash.
///
/// Returns an error if the corresponding header (required for populating the envs) was not
/// found.
pub(crate) async fn get_evm_evn(&self, block_hash: H256) -> Result<(CfgEnv, BlockEnv)> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetEnv { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
}

/// A task than manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given [StateProvider](reth_provider::StateProvider)
/// and keeps data fetched via the provider in memory in an LRU cache. If the requested data is
/// missing in the cache it is fetched and inserted into the cache afterwards. While fetching data
/// from disk is sync, this service is async since requests and data is shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [EthStateCache] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the [EthStateCacheService] only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
/// [EthApi](crate::EthApi) which is typically invoked by the RPC server, which already uses permits
/// to limit concurrent requests.
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
Client,
LimitBlocks = ByMemoryUsage,
LimitEnvs = ByMemoryUsage,
> where
LimitBlocks: Limiter<H256, Block>,
LimitEnvs: Limiter<H256, (CfgEnv, BlockEnv)>,
{
/// The type used to lookup data from disk
client: Client,
/// The LRU cache for full blocks grouped by their hash.
full_block_cache: BlockLruCache<LimitBlocks>,
/// The LRU cache for revm environments
evm_env_cache: EnvLruCache<LimitEnvs>,
/// Sender half of the action channel.
action_tx: UnboundedSender<CacheAction>,
/// Receiver half of the action channel.
action_rx: UnboundedReceiverStream<CacheAction>,
/// The type that's used to spawn tasks that do the actual work
action_task_spawner: Box<dyn TaskSpawner>,
}

impl<Client> Future for EthStateCacheService<Client>
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
{
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();

loop {
match ready!(this.action_rx.poll_next_unpin(cx)) {
None => {
unreachable!("can't close")
}
Some(action) => {
match action {
CacheAction::GetBlock { block_hash, response_tx } => {
// check if block is cached
if let Some(block) =
this.full_block_cache.cache.get(&block_hash).cloned()
{
let _ = response_tx.send(Ok(Some(block)));
continue
}

// block is not in the cache, request it if this is the first consumer
if this.full_block_cache.queue(block_hash, response_tx) {
let client = this.client.clone();
let action_tx = this.action_tx.clone();
this.action_task_spawner.spawn(Box::pin(async move {
let res = client.block_by_hash(block_hash);
let _ = action_tx
.send(CacheAction::BlockResult { block_hash, res });
}));
}
}
CacheAction::GetEnv { block_hash, response_tx } => {
// check if env data is cached
if let Some(env) = this.evm_env_cache.cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(env));
continue
}

// env data is not in the cache, request it if this is the first
// consumer
if this.evm_env_cache.queue(block_hash, response_tx) {
let client = this.client.clone();
let action_tx = this.action_tx.clone();
this.action_task_spawner.spawn(Box::pin(async move {
let mut cfg = CfgEnv::default();
let mut block_env = BlockEnv::default();
let res = client
.fill_env_at(&mut cfg, &mut block_env, block_hash.into())
.map(|_| (cfg, block_env));
let _ = action_tx.send(CacheAction::EnvResult {
block_hash,
res: Box::new(res),
});
}));
}
}
CacheAction::BlockResult { block_hash, res } => {
if let Some(queued) = this.full_block_cache.queued.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}

// cache good block
if let Ok(Some(block)) = res {
this.full_block_cache.cache.insert(block_hash, block);
}
}
CacheAction::EnvResult { block_hash, res } => {
let res = *res;
if let Some(queued) = this.evm_env_cache.queued.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}

// cache good env data
if let Ok(data) = res {
this.evm_env_cache.cache.insert(block_hash, data);
}
}
}
}
}
}
}
}

struct MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// The LRU cache for the
cache: LruMap<K, V, L>,
/// All queued consumers
queued: HashMap<K, Vec<S>>,
}

impl<K, V, L, S> MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// Adds the sender to the queue for the given key.
///
/// Returns true if this is the first queued sender for the key
fn queue(&mut self, key: K, sender: S) -> bool {
match self.queued.entry(key) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(sender);
false
}
Entry::Vacant(entry) => {
entry.insert(vec![sender]);
true
}
}
}
}

impl<K, V, S> MultiConsumerLruCache<K, V, ByMemoryUsage, S>
where
K: Hash + Eq,
{
/// Creates a new empty map with a given `memory_budget`.
///
/// See also [LruMap::with_memory_budget]
fn with_memory_budget(memory_budget: usize) -> Self {
Self { cache: LruMap::with_memory_budget(memory_budget), queued: Default::default() }
}
}

/// All message variants sent through the channel
enum CacheAction {
GetBlock { block_hash: H256, response_tx: BlockResponseSender },
GetEnv { block_hash: H256, response_tx: EnvResponseSender },
BlockResult { block_hash: H256, res: Result<Option<Block>> },
EnvResult { block_hash: H256, res: Box<Result<(CfgEnv, BlockEnv)>> },
}
1 change: 1 addition & 0 deletions crates/rpc/rpc/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `eth` namespace handler implementation.

mod api;
mod cache;
pub(crate) mod error;
mod filter;
mod pubsub;
Expand Down
Loading