Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! - **Event Filter**: Captures blockchain events, such as smart contract log events, emitted by specific actors.
//! - **TipSet Filter**: Tracks changes in the blockchain's tipset (the latest set of blocks).
//! - **Mempool Filter**: Monitors the Ethereum mempool for new pending transactions that meet certain criteria.

pub mod event;
pub mod mempool;
mod store;
Expand Down Expand Up @@ -44,6 +45,7 @@ use crate::utils::misc::env::env_or_default;
use ahash::AHashMap as HashMap;
use anyhow::{Context, Error, anyhow, bail, ensure};
use cid::Cid;
use futures::{TryStreamExt as _, stream::FuturesOrdered};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::IPLD_RAW;
use serde::*;
Expand Down Expand Up @@ -405,12 +407,26 @@ impl EthEventHandler {
ctx.chain_store().heaviest_tipset(),
ResolveNullTipset::TakeOlder,
)?;
let mut tasks = FuturesOrdered::new();
for tipset in max_tipset
.chain(&ctx.store())
.take_while(|ts| ts.epoch() >= *range.start())
{
Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events)
tasks.push_back(async move {
Comment thread
hanabi1224 marked this conversation as resolved.
Outdated
let mut collected_events = vec![];
Self::collect_events(
ctx,
&tipset,
Some(pf),
skip_event,
&mut collected_events,
)
.await?;
anyhow::Ok(collected_events)
});
}
while let Some(events) = tasks.try_next().await? {
collected_events.extend(events);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
}
}
Expand Down
Loading