Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors.
//! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks).
//! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria.

pub mod event;
pub mod mempool;
mod store;
Expand Down Expand Up @@ -44,6 +45,7 @@ use crate::utils::misc::env::env_or_default;
use ahash::AHashMap as HashMap;
use anyhow::{Context, Error, anyhow, bail, ensure};
use cid::Cid;
use futures::{TryStreamExt as _, stream::FuturesOrdered};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::IPLD_RAW;
use serde::*;
Expand Down Expand Up @@ -261,13 +263,41 @@ impl EthEventHandler {
)
}

pub async fn collect_events_for_tipsets<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
tipsets: impl Iterator<Item = Tipset>,
spec: Option<&impl Matcher>,
skip_event: SkipEvent,
collected_events: &mut Vec<CollectedEvent>,
) -> anyhow::Result<()> {
let mut tasks = FuturesOrdered::new();
Comment thread
hanabi1224 marked this conversation as resolved.
for tipset in tipsets {
tasks.push_back(async move {
let mut events = vec![];
Self::collect_events(ctx, &tipset, spec, skip_event, &mut events).await?;
anyhow::Ok(events)
});
}
let max_filter_results = ctx.eth_event_handler.max_filter_results;
while let Some(events) = tasks.try_next().await? {
let remaining = max_filter_results.saturating_sub(collected_events.len());
ensure!(
events.len() <= remaining,
"filter matches too many events (maximum {max_filter_results}), try a more restricted filter"
);
collected_events.extend(events);
}
Ok(())
}

pub async fn collect_events<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
tipset: &Tipset,
spec: Option<&impl Matcher>,
skip_event: SkipEvent,
collected_events: &mut Vec<CollectedEvent>,
) -> anyhow::Result<()> {
let max_filter_results = ctx.eth_event_handler.max_filter_results;
let height = tipset.epoch();
let tipset_key = tipset.key();
let ExecutedTipset {
Expand Down Expand Up @@ -338,9 +368,10 @@ impl EthEventHandler {
msg_idx: msg_idx as u64,
msg_cid: message.cid(),
};
if collected_events.len() >= ctx.eth_event_handler.max_filter_results {
bail!("filter matches too many events, try a more restricted filter");
}
ensure!(
collected_events.len() <= max_filter_results,
"filter matches too many events (maximum {max_filter_results} allowed), try a more restricted filter"
);
collected_events.push(ce);
}
}
Expand Down Expand Up @@ -399,19 +430,22 @@ impl EthEventHandler {
} else {
*range.end()
};

let max_tipset = ctx.chain_index().tipset_by_height(
max_height,
ctx.chain_store().heaviest_tipset(),
ResolveNullTipset::TakeOlder,
)?;
for tipset in max_tipset
.chain(&ctx.store())
.take_while(|ts| ts.epoch() >= *range.start())
{
Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events)
.await?;
}
let tipsets = max_tipset
.chain(ctx.store())
.take_while(|ts| ts.epoch() >= *range.start());
Self::collect_events_for_tipsets(
ctx,
tipsets,
Some(pf),
skip_event,
&mut collected_events,
)
.await?;
}
}

Expand Down
Loading