Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 81 additions & 14 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! `eth_` `Filter` RPC handler implementation

use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag;
use alloy_primitives::{Sealable, TxHash};
use alloy_rpc_types_eth::{
BlockNumHash, Filter, FilterBlockOption, FilterChanges, FilterId, Log,
Expand All @@ -17,6 +18,7 @@ use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_errors::ProviderError;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
use reth_rpc_eth_api::{
helpers::{EthBlocks, LoadReceipt},
EngineEthFilter, EthApiTypes, EthFilterApiServer, FullEthApiTypes, QueryLimits, RpcConvert,
RpcNodeCoreExt, RpcTransaction,
};
Expand Down Expand Up @@ -48,7 +50,11 @@ use tracing::{debug, error, trace};

impl<Eth> EngineEthFilter for EthFilter<Eth>
where
Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
Eth: FullEthApiTypes
+ RpcNodeCoreExt<Provider: BlockIdReader>
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Returns logs matching given filter object, no query limits
fn logs(
Expand Down Expand Up @@ -193,7 +199,11 @@ where

impl<Eth> EthFilter<Eth>
where
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt + 'static,
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader>
+ RpcNodeCoreExt
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
Expand Down Expand Up @@ -315,7 +325,7 @@ where
#[async_trait]
impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
where
Eth: FullEthApiTypes + RpcNodeCoreExt + 'static,
Eth: FullEthApiTypes + RpcNodeCoreExt + LoadReceipt + EthBlocks + 'static,
{
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
Expand Down Expand Up @@ -356,8 +366,6 @@ where
}
};

//let filter = FilterKind::PendingTransaction(transaction_kind);

// Install the filter and propagate any errors
self.inner.install_filter(transaction_kind).await
}
Expand Down Expand Up @@ -434,6 +442,8 @@ impl<Eth> EthFilterInner<Eth>
where
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes<NetworkTypes: reth_rpc_eth_api::types::RpcTypes>
+ LoadReceipt
+ EthBlocks
+ 'static,
{
/// Access the underlying provider.
Expand Down Expand Up @@ -487,10 +497,43 @@ where
Ok(all_logs)
}
FilterBlockOption::Range { from_block, to_block } => {
// compute the range
let info = self.provider().chain_info()?;
// Handle special case where from block is pending
if from_block.is_some_and(|b| b.is_pending()) {
let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
if !(to_block.is_pending() || to_block.is_number()) {
// always empty range
return Ok(Vec::new());
}
// Try to get pending block and receipts
if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
if let BlockNumberOrTag::Number(to_block) = to_block &&
to_block < pending_block.block.number()
{
// this block range is empty based on the user input
return Ok(Vec::new());
}

let info = self.provider().chain_info()?;
if pending_block.block.number() > info.best_number {
// only consider the pending block if it is ahead of the chain
let mut all_logs = Vec::new();
let timestamp = pending_block.block.timestamp();
let block_num_hash = pending_block.block.num_hash();
append_matching_block_logs(
&mut all_logs,
ProviderOrBlock::<Eth::Provider>::Block(pending_block.block),
&filter,
block_num_hash,
&pending_block.receipts,
false, // removed = false for pending blocks
timestamp,
)?;
return Ok(all_logs);
}
}
}

// we start at the most recent block if unset in filter
let info = self.provider().chain_info()?;
let start_block = info.best_number;
let from = from_block
.map(|num| self.provider().convert_block_number(num))
Expand Down Expand Up @@ -912,7 +955,11 @@ where

/// Represents different modes for processing block ranges when filtering logs
enum RangeMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
/// Use cache-based processing for recent blocks
Cached(CachedMode<Eth>),
Expand All @@ -921,7 +968,11 @@ enum RangeMode<
}

impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> RangeMode<Eth>
{
/// Creates a new `RangeMode`.
Expand Down Expand Up @@ -993,14 +1044,22 @@ impl<

/// Mode for processing blocks using cache optimization for recent blocks
struct CachedMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
headers_iter: std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>,
}

impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> CachedMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
Expand All @@ -1027,7 +1086,11 @@ type ReceiptFetchFuture<P> =

/// Mode for processing blocks using range queries for older blocks
struct RangeBlockMode<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> {
filter_inner: Arc<EthFilterInner<Eth>>,
iter: Peekable<std::vec::IntoIter<SealedHeader<<Eth::Provider as HeaderProvider>::Header>>>,
Expand All @@ -1038,7 +1101,11 @@ struct RangeBlockMode<
}

impl<
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool>
+ EthApiTypes
+ LoadReceipt
+ EthBlocks
+ 'static,
> RangeBlockMode<Eth>
{
async fn next(&mut self) -> Result<Option<ReceiptBlockResult<Eth::Provider>>, EthFilterError> {
Expand Down