diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 40e5d12a78c..cef3ae19e2c 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -14,6 +14,7 @@ //! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors. //! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks). //! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria. + pub mod event; pub mod mempool; mod store; @@ -44,6 +45,7 @@ use crate::utils::misc::env::env_or_default; use ahash::AHashMap as HashMap; use anyhow::{Context, Error, anyhow, bail, ensure}; use cid::Cid; +use futures::{TryStreamExt as _, stream::FuturesOrdered}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::IPLD_RAW; use serde::*; @@ -261,6 +263,33 @@ impl EthEventHandler { ) } + pub async fn collect_events_for_tipsets( + ctx: &Ctx, + tipsets: impl Iterator, + spec: Option<&impl Matcher>, + skip_event: SkipEvent, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let mut tasks = FuturesOrdered::new(); + for tipset in tipsets { + tasks.push_back(async move { + let mut events = vec![]; + Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?; + anyhow::Ok(events) + }); + } + let max_filter_results = ctx.eth_event_handler.max_filter_results; + while let Some(events) = tasks.try_next().await? { + let remaining = max_filter_results.saturating_sub(collected_events.len()); + ensure!( + events.len() <= remaining, + "filter matches too many events (maximum {max_filter_results}), try a more restricted filter" + ); + collected_events.extend(events); + } + Ok(()) + } + pub async fn collect_events( ctx: &Ctx, tipset: &Tipset, @@ -268,6 +297,7 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { + let max_filter_results = ctx.eth_event_handler.max_filter_results; let height = tipset.epoch(); let tipset_key = tipset.key(); let ExecutedTipset { @@ -338,9 +368,10 @@ impl EthEventHandler { msg_idx: msg_idx as u64, msg_cid: message.cid(), }; - if collected_events.len() >= ctx.eth_event_handler.max_filter_results { - bail!("filter matches too many events, try a more restricted filter"); - } + ensure!( + collected_events.len() <= max_filter_results, + "filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter" + ); collected_events.push(ce); } } @@ -399,19 +430,22 @@ impl EthEventHandler { } else { *range.end() }; - let max_tipset = ctx.chain_index().tipset_by_height( max_height, ctx.chain_store().heaviest_tipset(), ResolveNullTipset::TakeOlder, )?; - for tipset in max_tipset - .chain(&ctx.store()) - .take_while(|ts| ts.epoch() >= *range.start()) - { - Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events) - .await?; - } + let tipsets = max_tipset + .chain(ctx.store()) + .take_while(|ts| ts.epoch() >= *range.start()); + Self::collect_events_for_tipsets( + ctx, + tipsets, + Some(pf), + skip_event, + &mut collected_events, + ) + .await?; } }