Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/rpc/rpc-builder/src/eth.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use reth_rpc::{
eth::cache::{EthStateCache, EthStateCacheConfig},
EthApi, EthFilter, EthPubSub,
};
use serde::{Deserialize, Serialize};

/// All handlers for the `eth` namespace
#[derive(Debug, Clone)]
pub struct EthHandlers<Client, Pool, Network, Events> {
/// Main `eth_` request handler
pub api: EthApi<Client, Pool, Network>,
/// The async caching layer used by the eth handlers
pub eth_cache: EthStateCache,
/// Polling based filter handler available on all transports
pub filter: EthFilter<Client, Pool>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: Option<EthPubSub<Client, Pool, Events>>,
}

/// Additional config values for the eth namespace
#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize)]
pub struct EthConfig {
/// Settings for the caching layer
pub cache: EthStateCacheConfig,
}
144 changes: 107 additions & 37 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use jsonrpsee::{
use reth_ipc::server::IpcServer;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{AdminApi, DebugApi, EthApi, NetApi, TraceApi, Web3Api};
use reth_rpc::{AdminApi, DebugApi, EthApi, EthFilter, NetApi, TraceApi, Web3Api};
use reth_rpc_api::servers::*;
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize, Serializer};
Expand All @@ -84,8 +84,12 @@ pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
/// Auth server utilities.
pub mod auth;

/// Eth utils
mod eth;

/// Common RPC constants.
pub mod constants;
pub use crate::eth::{EthConfig, EthHandlers};
use constants::*;
use reth_rpc::eth::cache::EthStateCache;
use reth_tasks::TaskSpawner;
Expand Down Expand Up @@ -204,10 +208,17 @@ where

let Self { client, pool, network, executor } = self;

let mut registry = RethModuleRegistry::new(client, pool, network, executor);

if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc } = module_config;
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config;

let mut registry = RethModuleRegistry::new(
client,
pool,
network,
executor,
config.unwrap_or_default(),
);

modules.http = registry.maybe_module(http.as_ref());
modules.ws = registry.maybe_module(ws.as_ref());
modules.ipc = registry.maybe_module(ipc.as_ref());
Expand All @@ -223,6 +234,44 @@ impl Default for RpcModuleBuilder<(), (), (), ()> {
}
}

/// Bundles settings for modules
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct RpcModuleConfig {
/// `eth` namespace settings
eth: EthConfig,
}

// === impl RpcModuleConfig ===

impl RpcModuleConfig {
/// Convenience method to create a new [RpcModuleConfigBuilder]
pub fn builder() -> RpcModuleConfigBuilder {
RpcModuleConfigBuilder::default()
}
}

/// Configures [RpcModuleConfig]
#[derive(Default)]
pub struct RpcModuleConfigBuilder {
eth: Option<EthConfig>,
}

// === impl RpcModuleConfigBuilder ===

impl RpcModuleConfigBuilder {
/// Configures a custom eth namespace config
pub fn eth(mut self, eth: EthConfig) -> Self {
self.eth = Some(eth);
self
}

/// Consumes the type and creates the [RpcModuleConfig]
pub fn build(self) -> RpcModuleConfig {
let RpcModuleConfigBuilder { eth } = self;
RpcModuleConfig { eth: eth.unwrap_or_default() }
}
}

/// Describes the modules that should be installed.
///
/// # Example
Expand All @@ -244,7 +293,7 @@ pub enum RpcModuleSelection {
Selection(Vec<RethRpcModule>),
}

// === impl RpcModuleConfig ===
// === impl RpcModuleSelection ===

impl RpcModuleSelection {
/// The standard modules to instantiate by default `eth`, `net`, `web3`
Expand Down Expand Up @@ -291,6 +340,7 @@ impl RpcModuleSelection {
pool: Pool,
network: Network,
executor: Tasks,
config: RpcModuleConfig,
) -> RpcModule<()>
where
Client: BlockProvider
Expand All @@ -304,7 +354,7 @@ impl RpcModuleSelection {
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let mut registry = RethModuleRegistry::new(client, pool, network, executor);
let mut registry = RethModuleRegistry::new(client, pool, network, executor, config);
registry.module_for(self)
}

Expand Down Expand Up @@ -389,11 +439,10 @@ pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {
pool: Pool,
network: Network,
executor: Tasks,
/// Holds a clone of the async [EthStateCache] channel.
eth_cache: Option<EthStateCache>,
/// Holds a clone of the actual [EthApi] namespace impl since this can be required by other
/// namespaces
eth_api: Option<EthApi<Client, Pool, Network>>,
/// Additional settings for handlers.
config: RpcModuleConfig,
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Client, Pool, Network, ()>>,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
Expand All @@ -402,16 +451,14 @@ pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {

impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks> {
/// Creates a new, empty instance.
pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self {
Self {
client,
pool,
network,
eth_api: None,
executor,
modules: Default::default(),
eth_cache: None,
}
pub fn new(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
config: RpcModuleConfig,
) -> Self {
Self { client, pool, network, eth: None, executor, modules: Default::default(), config }
}

/// Returns all installed methods
Expand Down Expand Up @@ -538,25 +585,39 @@ where
/// This will spawn exactly one [EthStateCache] service if this is the first time the cache is
/// requested.
pub fn eth_cache(&mut self) -> EthStateCache {
self.eth_cache
.get_or_insert_with(|| {
EthStateCache::spawn_with(
self.client.clone(),
Default::default(),
self.executor.clone(),
)
})
.clone()
self.with_eth(|handlers| handlers.eth_cache.clone())
}

/// Creates the [EthHandlers] type the first time this is called.
fn with_eth<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&EthHandlers<Client, Pool, Network, ()>) -> R,
{
if self.eth.is_none() {
let eth_cache = EthStateCache::spawn_with(
self.client.clone(),
self.config.eth.cache.clone(),
self.executor.clone(),
);
let api = EthApi::new(
self.client.clone(),
self.pool.clone(),
self.network.clone(),
eth_cache.clone(),
);
let filter = EthFilter::new(self.client.clone(), self.pool.clone());

// TODO: install pubsub

let eth = EthHandlers { api, eth_cache, filter, pubsub: None };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))
}

/// Returns the configured [EthApi] or creates it if it does not exist yet
fn eth_api(&mut self) -> EthApi<Client, Pool, Network> {
let cache = self.eth_cache();
self.eth_api
.get_or_insert_with(|| {
EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone(), cache)
})
.clone()
self.with_eth(|handlers| handlers.api.clone())
}
}

Expand Down Expand Up @@ -731,7 +792,7 @@ impl RpcServerConfig {
///
/// # Example
///
/// Configure an http transport only
/// Configure a http transport only
///
/// ```
/// use reth_rpc_builder::{RethRpcModule, TransportRpcModuleConfig};
Expand All @@ -746,6 +807,8 @@ pub struct TransportRpcModuleConfig {
ws: Option<RpcModuleSelection>,
/// ipc module configuration
ipc: Option<RpcModuleSelection>,
/// Config for the modules
config: Option<RpcModuleConfig>,
}

// === impl TransportRpcModuleConfig ===
Expand Down Expand Up @@ -784,6 +847,12 @@ impl TransportRpcModuleConfig {
self
}

/// Sets a custom [RpcModuleConfig] for the configured modules.
pub fn with_config(mut self, config: RpcModuleConfig) -> Self {
self.config = Some(config);
self
}

/// Returns true if no transports are configured
pub fn is_empty(&self) -> bool {
self.http.is_none() && self.ws.is_none() && self.ipc.is_none()
Expand Down Expand Up @@ -1058,6 +1127,7 @@ mod tests {
])),
ws: None,
ipc: None,
config: None,
}
)
}
Expand Down
7 changes: 6 additions & 1 deletion crates/rpc/rpc/src/eth/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ pub trait EthApiSpec: Send + Sync {
/// the main impls. This way [`EthApi`] is not limited to [`jsonrpsee`] and can be used standalone
/// or in other network handlers (for example ipc).
#[derive(Clone)]
#[allow(missing_debug_implementations)]
pub struct EthApi<Client, Pool, Network> {
/// All nested fields bundled together.
inner: Arc<EthApiInner<Client, Pool, Network>>,
Expand Down Expand Up @@ -172,6 +171,12 @@ where
}
}

impl<Client, Pool, Events> std::fmt::Debug for EthApi<Client, Pool, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthApi").finish_non_exhaustive()
}
}

#[async_trait]
impl<Client, Pool, Network> EthApiSpec for EthApi<Client, Pool, Network>
where
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSende
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;

/// Settings for the [EthStateCache]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EthStateCacheConfig {
/// Max number of bytes for cached block data.
Expand Down
3 changes: 2 additions & 1 deletion crates/rpc/rpc/src/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use tokio_stream::{

/// `Eth` pubsub RPC implementation.
///
/// This handles
/// This handles `eth_subscribe` RPC calls.
#[derive(Clone)]
pub struct EthPubSub<Client, Pool, Events> {
/// All nested fields bundled together.
inner: EthPubSubInner<Client, Pool, Events>,
Expand Down