From 24d381b65da9f860d571d945221875b0cae3111a Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 9 Apr 2026 09:22:48 +0800 Subject: [PATCH 1/4] refactor: parallelize `collect_event` for tipset range --- src/rpc/methods/eth/filter/mod.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 40e5d12a78c..25701776778 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -14,6 +14,7 @@ //! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors. //! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks). //! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria. + pub mod event; pub mod mempool; mod store; @@ -44,6 +45,7 @@ use crate::utils::misc::env::env_or_default; use ahash::AHashMap as HashMap; use anyhow::{Context, Error, anyhow, bail, ensure}; use cid::Cid; +use futures::{TryStreamExt as _, stream::FuturesOrdered}; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::IPLD_RAW; use serde::*; @@ -405,12 +407,26 @@ impl EthEventHandler { ctx.chain_store().heaviest_tipset(), ResolveNullTipset::TakeOlder, )?; + let mut tasks = FuturesOrdered::new(); for tipset in max_tipset .chain(&ctx.store()) .take_while(|ts| ts.epoch() >= *range.start()) { - Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events) + tasks.push_back(async move { + let mut collected_events = vec![]; + Self::collect_events( + ctx, + &tipset, + Some(pf), + skip_event, + &mut collected_events, + ) .await?; + anyhow::Ok(collected_events) + }); + } + while let Some(events) = tasks.try_next().await? { + collected_events.extend(events); } } } From db51085d5429bd87fa9f7b0763a3c49446c52296 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 9 Apr 2026 10:13:30 +0800 Subject: [PATCH 2/4] max_filter_results check --- src/rpc/methods/eth/filter/mod.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 25701776778..45d592b30ef 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -426,6 +426,14 @@ impl EthEventHandler { }); } while let Some(events) = tasks.try_next().await? { + let remaining = self + .max_filter_results + .saturating_sub(collected_events.len()); + ensure!( + events.len() <= remaining, + "filter matches too many events (maximum {}), try a more restricted filter", + self.max_filter_results + ); collected_events.extend(events); } } From 15dc5b34790a530f1e3006e1f5b0933146ab4ab9 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Thu, 9 Apr 2026 21:35:09 +0800 Subject: [PATCH 3/4] collect_events_for_tipsets --- src/rpc/methods/eth/filter/mod.rs | 75 ++++++++++++++++++------------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 45d592b30ef..7b200083033 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -263,6 +263,33 @@ impl EthEventHandler { ) } + pub async fn collect_events_for_tipsets( + ctx: &Ctx, + tipsets: impl Iterator, + spec: Option<&impl Matcher>, + skip_event: SkipEvent, + collected_events: &mut Vec, + ) -> anyhow::Result<()> { + let mut tasks = FuturesOrdered::new(); + for tipset in tipsets { + tasks.push_back(async move { + let mut events = vec![]; + Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?; + anyhow::Ok(events) + }); + } + let max_filter_results = ctx.eth_event_handler.max_filter_results; + while let Some(events) = tasks.try_next().await? { + let remaining = max_filter_results.saturating_sub(collected_events.len()); + ensure!( + events.len() <= remaining, + "filter matches too many events (maximum {max_filter_results}), try a more restricted filter" + ); + collected_events.extend(events); + } + Ok(()) + } + pub async fn collect_events( ctx: &Ctx, tipset: &Tipset, @@ -270,6 +297,7 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { + let max_filter_results = ctx.eth_event_handler.max_filter_results; let height = tipset.epoch(); let tipset_key = tipset.key(); let ExecutedTipset { @@ -340,8 +368,10 @@ impl EthEventHandler { msg_idx: msg_idx as u64, msg_cid: message.cid(), }; - if collected_events.len() >= ctx.eth_event_handler.max_filter_results { - bail!("filter matches too many events, try a more restricted filter"); + if collected_events.len() >= max_filter_results { + bail!( + "filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter" + ); } collected_events.push(ce); } @@ -401,41 +431,22 @@ impl EthEventHandler { } else { *range.end() }; - let max_tipset = ctx.chain_index().tipset_by_height( max_height, ctx.chain_store().heaviest_tipset(), ResolveNullTipset::TakeOlder, )?; - let mut tasks = FuturesOrdered::new(); - for tipset in max_tipset - .chain(&ctx.store()) - .take_while(|ts| ts.epoch() >= *range.start()) - { - tasks.push_back(async move { - let mut collected_events = vec![]; - Self::collect_events( - ctx, - &tipset, - Some(pf), - skip_event, - &mut collected_events, - ) - .await?; - anyhow::Ok(collected_events) - }); - } - while let Some(events) = tasks.try_next().await? { - let remaining = self - .max_filter_results - .saturating_sub(collected_events.len()); - ensure!( - events.len() <= remaining, - "filter matches too many events (maximum {}), try a more restricted filter", - self.max_filter_results - ); - collected_events.extend(events); - } + let tipsets = max_tipset + .chain(ctx.store()) + .take_while(|ts| ts.epoch() >= *range.start()); + Self::collect_events_for_tipsets( + ctx, + tipsets, + Some(pf), + skip_event, + &mut collected_events, + ) + .await?; } } From b5270699460581f0c7e251b6872dec6b2ba0aa92 Mon Sep 17 00:00:00 2001 From: hanabi1224 Date: Fri, 10 Apr 2026 14:19:57 +0800 Subject: [PATCH 4/4] fix --- src/rpc/methods/eth/filter/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index 7b200083033..cef3ae19e2c 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -368,11 +368,10 @@ impl EthEventHandler { msg_idx: msg_idx as u64, msg_cid: message.cid(), }; - if collected_events.len() >= max_filter_results { - bail!( - "filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter" - ); - } + ensure!( + collected_events.len() <= max_filter_results, + "filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter" + ); collected_events.push(ce); } }