Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
0cb4a4e
Revive the previous eth_subscribe PR from the graveyard to the battle…
elmattic Jun 17, 2025
cba87c3
Subscription methods use RpcMethod as well
elmattic Jun 18, 2025
b6ceae6
Remove now redundant code
elmattic Jun 18, 2025
44b4cca
Add support for aliases
elmattic Jun 18, 2025
222bc9e
Move callback to eth module
elmattic Jun 18, 2025
86240dc
Better error handling
elmattic Jun 18, 2025
a543037
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jun 18, 2025
57aef9d
Make clippy happy
elmattic Jun 18, 2025
693a6d6
Refactor and add mod level documentation
elmattic Jun 19, 2025
65c61a4
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jun 19, 2025
fe0417b
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jun 19, 2025
a5e0982
Check for logs
elmattic Jun 20, 2025
55225b0
Add eth_logs
elmattic Jun 20, 2025
562a63c
Refactor
elmattic Jun 20, 2025
0cd8d86
Send event only if logs is non-empty
elmattic Jun 24, 2025
aa7a4f4
Rework params parser
elmattic Jun 25, 2025
09c09c0
Fix lint errors
elmattic Jun 25, 2025
a1caba8
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jun 30, 2025
f19f808
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 15, 2025
12e458a
Refactor
elmattic Jul 15, 2025
09b8871
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 15, 2025
5d0ed6c
Update CHANGELOG
elmattic Jul 16, 2025
e41cfa1
Update ignore list
elmattic Jul 16, 2025
28f7a94
Remove unwraps
elmattic Jul 16, 2025
1910206
Remove unwrap
elmattic Jul 16, 2025
7464d10
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 16, 2025
de8213c
Abort subscription associated task
elmattic Jul 16, 2025
cc3dbe1
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 17, 2025
3ad7f33
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 28, 2025
4d5f1cd
Add comments to explain why we need placeholders
elmattic Jul 28, 2025
0fe9ce5
Use a smaller capacity
elmattic Jul 28, 2025
b986e48
Remove doc comment
elmattic Jul 28, 2025
2e994b8
Add some doc comments
elmattic Jul 28, 2025
5377199
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 28, 2025
1e02435
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 29, 2025
d569eb4
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 29, 2025
065b89f
refactor: use macros to register the subscription APIs (#5782)
akaladarshi Jul 29, 2025
76e130d
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 29, 2025
dcb8576
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 30, 2025
a25d9a7
Add actual links in comments
elmattic Jul 30, 2025
0e68b0f
Add error logs
elmattic Jul 30, 2025
c1f4969
Address CR comments
elmattic Jul 31, 2025
6a7de37
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 31, 2025
16f06fe
Merge branch 'main' into elmattic/eth-subscribe
elmattic Jul 31, 2025
e3811e8
Address comment
elmattic Jul 31, 2025
290b5c4
Merge branch 'main' into elmattic/eth-subscribe
elmattic Aug 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

- [#5859](https://github.com/ChainSafe/forest/pull/5859) Added size metrics for zstd frame cache and made max size configurable via `FOREST_ZSTD_FRAME_CACHE_DEFAULT_MAX_SIZE` environment variable.

- [#4976](https://github.com/ChainSafe/forest/issues/4976) Add support for the `Filecoin.EthSubscribe` and `Filecoin.EthUnsubscribe` API methods to enable subscriptions to Ethereum event types: `heads` and `logs`.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should claim we added support for EthUnsubscribe. As the AI reviewer noted, it's a placeholder implementation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the EthSubscribe seems also empty?

Copy link
Copy Markdown
Contributor Author

@elmattic elmattic Jul 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some comments to explain why we need them.


### Changed

- [#5869](https://github.com/ChainSafe/forest/pull/5869) Updated `forest-cli snapshot export` to print average speed.
Expand Down
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ integer-encoding = "4.0"
ipld-core = { version = "0.4", features = ["serde", "arb"] }
is-terminal = "0.4"
itertools = "0.14"
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] }
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] }
jsonwebtoken = "9"
keccak-hash = "0.11"
kubert-prometheus-process = "0.2"
Expand Down
4 changes: 2 additions & 2 deletions src/rpc/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ impl RpcModule {
Ok(msg) => {
match create_notif_message(&sink, &msg) {
Ok(msg) => {
// This fails only if the connection is closed
if sink.send(msg).await.is_err() {
if let Err(e) = sink.send(msg).await {
tracing::error!("Failed to send message: {:?}", e);
break;
}
}
Expand Down
79 changes: 78 additions & 1 deletion src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self};
#[cfg(test)]
use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
use crate::message::{ChainMessage, SignedMessage};
use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec};
use crate::rpc::types::{ApiTipsetKey, Event};
use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError};
use crate::shim::clock::ChainEpoch;
Expand Down Expand Up @@ -45,6 +46,82 @@ use tokio::sync::{
Mutex,
broadcast::{self, Receiver as Subscriber},
};
use tokio::task::JoinHandle;

const HEAD_CHANNEL_CAPACITY: usize = 10;

/// Subscribes to head changes from the chain store and broadcasts new blocks.
///
/// # Notes
///
/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
/// allowing manual cleanup if needed.
pub(crate) fn new_heads<DB: Blockstore>(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some docs would be helpful.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

data: &crate::rpc::RPCState<DB>,
) -> (Subscriber<ApiHeaders>, JoinHandle<()>) {
let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

let mut subscriber = data.chain_store().publisher().subscribe();

let handle = tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let headers = match v {
HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()),
};
if let Err(e) = sender.send(headers) {
tracing::error!("Failed to send headers: {}", e);
break;
}
}
});

(receiver, handle)
}

/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs.
///
/// # Notes
///
/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`,
/// allowing manual cleanup if needed.
pub(crate) fn logs<DB: Blockstore + Sync + Send + 'static>(
ctx: &Ctx<DB>,
filter: Option<EthFilterSpec>,
) -> (Subscriber<Vec<EthLog>>, JoinHandle<()>) {
let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

let mut subscriber = ctx.chain_store().publisher().subscribe();

let ctx = ctx.clone();

let handle = tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
match v {
HeadChange::Apply(ts) => {
match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await {
Ok(logs) => {
if !logs.is_empty() {
if let Err(e) = sender.send(logs) {
tracing::error!(
"Failed to send logs for tipset {}: {}",
ts.key(),
e
);
break;
}
}
}
Err(e) => {
tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e);
}
}
}
}
}
});

(receiver, handle)
}

pub enum ChainGetMessage {}
impl RpcMethod<1> for ChainGetMessage {
Expand Down Expand Up @@ -721,7 +798,7 @@ pub(crate) fn chain_notify<DB: Blockstore>(
_params: Params<'_>,
data: &crate::rpc::RPCState<DB>,
) -> Subscriber<Vec<ApiHeadChange>> {
let (sender, receiver) = broadcast::channel(100);
let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY);

// As soon as the channel is created, send the current tipset
let current = data.chain_store().heaviest_tipset();
Expand Down
77 changes: 70 additions & 7 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
pub(crate) mod errors;
mod eth_tx;
pub mod filter;
pub mod pubsub;
pub(crate) mod pubsub_trait;
mod trace;
pub mod types;
mod utils;
Expand Down Expand Up @@ -1325,7 +1327,7 @@ async fn new_eth_tx_receipt<DB: Blockstore + Send + Sync + 'static>(
Ok(tx_receipt)
}

async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'static>(
pub async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
ts: &Arc<Tipset>,
block_hash: &EthHash,
Expand All @@ -1336,22 +1338,33 @@ async fn eth_logs_for_block_and_transaction<DB: Blockstore + Send + Sync + 'stat
..Default::default()
};

eth_logs_with_filter(ctx, ts, Some(spec), Some(tx_hash)).await
}

pub async fn eth_logs_with_filter<DB: Blockstore + Send + Sync + 'static>(
ctx: &Ctx<DB>,
ts: &Arc<Tipset>,
spec: Option<EthFilterSpec>,
tx_hash: Option<&EthHash>,
) -> anyhow::Result<Vec<EthLog>> {
let mut events = vec![];
EthEventHandler::collect_events(
ctx,
ts,
Some(&spec),
spec.as_ref(),
SkipEvent::OnUnresolvedAddress,
&mut events,
)
.await?;

let logs = eth_filter_logs_from_events(ctx, &events)?;
let out: Vec<_> = logs
.into_iter()
.filter(|log| &log.transaction_hash == tx_hash)
.collect();
Ok(out)
Ok(match tx_hash {
Some(hash) => logs
.into_iter()
.filter(|log| &log.transaction_hash == hash)
.collect(),
None => logs, // no tx hash, keep all logs
})
}

fn get_signed_message<DB: Blockstore>(ctx: &Ctx<DB>, message_cid: Cid) -> Result<SignedMessage> {
Expand Down Expand Up @@ -2639,6 +2652,56 @@ impl RpcMethod<1> for EthUninstallFilter {
}
}

pub enum EthUnsubscribe {}
impl RpcMethod<0> for EthUnsubscribe {
const NAME: &'static str = "Filecoin.EthUnsubscribe";
const NAME_ALIAS: Option<&'static str> = Some("eth_unsubscribe");
const PARAM_NAMES: [&'static str; 0] = [];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Read;
const SUBSCRIPTION: bool = true;

type Params = ();
type Ok = ();

// This method is a placeholder and is never actually called.
// Subscription handling is performed in [`pubsub.rs`](pubsub).
//
// We still need to implement the [`RpcMethod`] trait to expose method metadata
// like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc..
async fn handle(
_: Ctx<impl Blockstore + Send + Sync + 'static>,
(): Self::Params,
) -> Result<Self::Ok, ServerError> {
Ok(())
}
}

pub enum EthSubscribe {}
impl RpcMethod<0> for EthSubscribe {
const NAME: &'static str = "Filecoin.EthSubscribe";
const NAME_ALIAS: Option<&'static str> = Some("eth_subscribe");
const PARAM_NAMES: [&'static str; 0] = [];
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
const PERMISSION: Permission = Permission::Read;
const SUBSCRIPTION: bool = true;

type Params = ();
type Ok = ();

// This method is a placeholder and is never actually called.
// Subscription handling is performed in [`pubsub.rs`](pubsub).
//
// We still need to implement the [`RpcMethod`] trait to expose method metadata
// like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc..
async fn handle(
_: Ctx<impl Blockstore + Send + Sync + 'static>,
(): Self::Params,
) -> Result<Self::Ok, ServerError> {
Ok(())
}
}

pub enum EthAddressToFilecoinAddress {}
impl RpcMethod<1> for EthAddressToFilecoinAddress {
const NAME: &'static str = "Filecoin.EthAddressToFilecoinAddress";
Expand Down
Loading
Loading