From 0cb4a4ef710daa2ca559e139782dc92d2f7d68c2 Mon Sep 17 00:00:00 2001 From: elmattic Date: Tue, 17 Jun 2025 17:03:55 +0200 Subject: [PATCH 01/29] Revive the previous eth_subscribe PR from the graveyard to the battlefield (a.k.a. main) --- src/rpc/auth_layer.rs | 3 ++ src/rpc/methods/chain.rs | 20 +++++++++ src/rpc/methods/eth/types.rs | 33 +++++++++++++++ src/rpc/mod.rs | 78 ++++++++++++++++++++++++++++++++++++ src/rpc/segregation_layer.rs | 3 ++ 5 files changed, 137 insertions(+) diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index d7f1ee2f65b8..26ccf87c0bf6 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -39,6 +39,9 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: access.insert(chain::CHAIN_NOTIFY, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); + access.insert("eth_subscribe", Permission::Read); + access.insert("eth_unsubscribe", Permission::Read); + access }); diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 9e2965613888..963597beda6c 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -16,6 +16,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; +use crate::rpc::eth::types::ApiHeaders; use crate::rpc::types::{ApiTipsetKey, Event}; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; @@ -43,6 +44,25 @@ use tokio::sync::{ broadcast::{self, Receiver as Subscriber}, }; +pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subscriber { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.chain_store().publisher().subscribe(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let headers = match v { + HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), + }; + if sender.send(headers).is_err() { + break; + } + } + }); + + receiver +} + pub enum ChainGetMessage {} impl RpcMethod<1> for ChainGetMessage { const NAME: &'static str = "Filecoin.ChainGetMessage"; diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index 9a988dad981f..1b4ac440134a 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -2,9 +2,13 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; +use crate::blocks::CachingBlockHeader; use anyhow::ensure; use ipld_core::serde::SerdeError; +use jsonrpsee::core::traits::IdProvider; +use jsonrpsee::types::SubscriptionId; use libsecp256k1::util::FULL_PUBLIC_KEY_SIZE; +use rand::Rng; use serde::de::{IntoDeserializer, value::StringDeserializer}; use std::{hash::Hash, ops::Deref}; @@ -385,6 +389,16 @@ pub struct FilterID(EthHash); lotus_json_with_self!(FilterID); +#[derive(Debug, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Hash, Clone)] +pub struct SubscriptionID(pub String); + +lotus_json_with_self!(SubscriptionID); + +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec); + +lotus_json_with_self!(ApiHeaders); + impl FilterID { pub fn new() -> Result { let raw_id = crate::utils::rand::new_uuid_v4(); @@ -394,6 +408,25 @@ impl FilterID { } } +#[derive(Debug, Copy, Clone)] +pub struct RandomHexStringIdProvider {} + +impl RandomHexStringIdProvider { + pub fn new() -> Self { + Self {} + } +} + +impl IdProvider for RandomHexStringIdProvider { + fn next_id(&self) -> SubscriptionId<'static> { + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + rng.try_fill(&mut bytes).unwrap(); + + SubscriptionId::Str(format!("{}", EthHash::from(bytes)).into()) + } +} + /// `EthHashList` represents a topic filter that can take one of two forms: /// - `List`: Matches if the hash is present in the vector. /// - `Single`: An optional hash, where: diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 9f41f9ebf37e..9f06ac560edc 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -12,6 +12,8 @@ mod request; mod segregation_layer; mod set_extension_layer; +use crate::rpc::chain::new_heads; +use crate::rpc::eth::types::RandomHexStringIdProvider; use crate::shim::clock::ChainEpoch; pub use client::Client; pub use error::ServerError; @@ -26,6 +28,7 @@ pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; use segregation_layer::SegregationLayer; use set_extension_layer::SetExtensionLayer; +use tokio::sync::broadcast::error::RecvError; mod actor_registry; mod error; mod reflect; @@ -120,6 +123,8 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::eth::EthNewPendingTransactionFilter); $callback!($crate::rpc::eth::EthNewBlockFilter); $callback!($crate::rpc::eth::EthUninstallFilter); + // $callback!($crate::rpc::eth::EthUnsubscribe); + // $callback!($crate::rpc::eth::EthSubscribe); $callback!($crate::rpc::eth::EthSyncing); $callback!($crate::rpc::eth::EthTraceBlock); $callback!($crate::rpc::eth::EthTraceFilter); @@ -489,6 +494,78 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); + module.register_subscription( + "eth_subscribe", + "eth_subscription", + "eth_unsubscribe", + |params, pending, ctx, _ext| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + tracing::trace!("Subscribing to events: {:?}", event_types); + + let mut receiver = new_heads(&ctx); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) + }, + )?; + let mut pubsub_module = FilRpcModule::default(); pubsub_module.register_channel("Filecoin.ChainNotify", { @@ -508,6 +585,7 @@ where // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` .max_request_body_size(MAX_REQUEST_BODY_SIZE) .max_response_body_size(MAX_RESPONSE_BODY_SIZE) + .set_id_provider(RandomHexStringIdProvider::new()) .build(), ) .set_http_middleware( diff --git a/src/rpc/segregation_layer.rs b/src/rpc/segregation_layer.rs index b4e2d15332da..506e8969999b 100644 --- a/src/rpc/segregation_layer.rs +++ b/src/rpc/segregation_layer.rs @@ -31,6 +31,9 @@ static VERSION_METHODS_MAPPINGS: Lazy>> for_each_rpc_method!(insert); + supported.insert("eth_subscribe"); + supported.insert("eth_unsubscribe"); + map.insert(version, supported); } From cba87c37f7fe5cf7cb63e0660ed738d59d53835e Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 11:34:00 +0200 Subject: [PATCH 02/29] Subscription methods use RpcMethod as well --- src/rpc/auth_layer.rs | 3 --- src/rpc/methods/eth.rs | 40 ++++++++++++++++++++++++++++++++++++++++ src/rpc/mod.rs | 14 +++++++++----- src/rpc/reflect/mod.rs | 2 ++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index 26ccf87c0bf6..d7f1ee2f65b8 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -39,9 +39,6 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: access.insert(chain::CHAIN_NOTIFY, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); - access.insert("eth_subscribe", Permission::Read); - access.insert("eth_unsubscribe", Permission::Read); - access }); diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 07c2c29e0561..d89536028a95 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2640,6 +2640,46 @@ impl RpcMethod<1> for EthUninstallFilter { } } +pub enum EthUnsubscribe {} +impl RpcMethod<0> for EthUnsubscribe { + const NAME: &'static str = "Filecoin.EthUnsubscribe"; + const NAME_ALIAS: Option<&'static str> = Some("eth_unsubscribe"); + const PARAM_NAMES: [&'static str; 0] = []; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + const SUBSCRIPTION: bool = true; + + type Params = (); + type Ok = (); + + async fn handle( + _: Ctx, + (): Self::Params, + ) -> Result { + Ok(()) + } +} + +pub enum EthSubscribe {} +impl RpcMethod<0> for EthSubscribe { + const NAME: &'static str = "Filecoin.EthSubscribe"; + const NAME_ALIAS: Option<&'static str> = Some("eth_subscribe"); + const PARAM_NAMES: [&'static str; 0] = []; + const API_PATHS: BitFlags = ApiPaths::all(); + const PERMISSION: Permission = Permission::Read; + const SUBSCRIPTION: bool = true; + + type Params = (); + type Ok = (); + + async fn handle( + _: Ctx, + (): Self::Params, + ) -> Result { + Ok(()) + } +} + pub enum EthAddressToFilecoinAddress {} impl RpcMethod<1> for EthAddressToFilecoinAddress { const NAME: &'static str = "Filecoin.EthAddressToFilecoinAddress"; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 9f06ac560edc..2c4d553e56d9 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -123,8 +123,8 @@ macro_rules! for_each_rpc_method { $callback!($crate::rpc::eth::EthNewPendingTransactionFilter); $callback!($crate::rpc::eth::EthNewBlockFilter); $callback!($crate::rpc::eth::EthUninstallFilter); - // $callback!($crate::rpc::eth::EthUnsubscribe); - // $callback!($crate::rpc::eth::EthSubscribe); + $callback!($crate::rpc::eth::EthUnsubscribe); + $callback!($crate::rpc::eth::EthSubscribe); $callback!($crate::rpc::eth::EthSyncing); $callback!($crate::rpc::eth::EthTraceBlock); $callback!($crate::rpc::eth::EthTraceFilter); @@ -702,9 +702,13 @@ where let mut module = RpcModule::from_arc(state); macro_rules! register { ($ty:ty) => { - <$ty>::register(&mut module, ParamStructure::ByPosition).unwrap(); - // Optionally register an alias for the method. - <$ty>::register_alias(&mut module).unwrap(); + // Register only non-subscription RPC methods. + // Subscription methods are registered separately in the RPC module. + if !<$ty>::SUBSCRIPTION { + <$ty>::register(&mut module, ParamStructure::ByPosition).unwrap(); + // Optionally register an alias for the method. + <$ty>::register_alias(&mut module).unwrap(); + } }; } for_each_rpc_method!(register); diff --git a/src/rpc/reflect/mod.rs b/src/rpc/reflect/mod.rs index 252362dedfd6..9a6212b76d6b 100644 --- a/src/rpc/reflect/mod.rs +++ b/src/rpc/reflect/mod.rs @@ -80,6 +80,8 @@ pub trait RpcMethod { ctx: Ctx, params: Self::Params, ) -> impl Future> + Send; + /// If it a subscription method. Defaults to false. + const SUBSCRIPTION: bool = false; } /// The permission required to call an RPC method. From b6ceae6967e128c7fd583884675680b76f3cd2f2 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 11:48:09 +0200 Subject: [PATCH 03/29] Remove now redundant code --- src/rpc/segregation_layer.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/rpc/segregation_layer.rs b/src/rpc/segregation_layer.rs index 506e8969999b..b4e2d15332da 100644 --- a/src/rpc/segregation_layer.rs +++ b/src/rpc/segregation_layer.rs @@ -31,9 +31,6 @@ static VERSION_METHODS_MAPPINGS: Lazy>> for_each_rpc_method!(insert); - supported.insert("eth_subscribe"); - supported.insert("eth_unsubscribe"); - map.insert(version, supported); } From 44b4cca9b4ea2232ccaa802a1c007dd217d14e49 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 12:24:32 +0200 Subject: [PATCH 04/29] Add support for aliases --- src/rpc/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 2c4d553e56d9..fff993754702 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -385,6 +385,7 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; +use crate::rpc::eth::{EthSubscribe, EthUnsubscribe}; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; @@ -495,9 +496,9 @@ where let mut module = create_module(state.clone()); module.register_subscription( - "eth_subscribe", + EthSubscribe::NAME, "eth_subscription", - "eth_unsubscribe", + EthUnsubscribe::NAME, |params, pending, ctx, _ext| async move { let event_types = match params.parse::>() { Ok(v) => v, @@ -566,6 +567,13 @@ where }, )?; + if let Some(alias) = EthSubscribe::NAME_ALIAS { + module.register_alias(alias, EthSubscribe::NAME)?; + } + if let Some(alias) = EthUnsubscribe::NAME_ALIAS { + module.register_alias(alias, EthUnsubscribe::NAME)?; + } + let mut pubsub_module = FilRpcModule::default(); pubsub_module.register_channel("Filecoin.ChainNotify", { From 222bc9e60cb69eac71cc73bef93b24f1d13ed0c1 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 14:06:19 +0200 Subject: [PATCH 05/29] Move callback to eth module --- src/rpc/methods/eth.rs | 75 ++++++++++++++++++++++++++++++++++++++++++ src/rpc/mod.rs | 74 +++-------------------------------------- 2 files changed, 79 insertions(+), 70 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d89536028a95..8e65adcd6a83 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -71,6 +71,7 @@ use serde::{Deserialize, Serialize}; use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::Arc; +use tokio::sync::broadcast::error::RecvError; use tracing::log; use utils::{decode_payload, lookup_eth_address}; @@ -2680,6 +2681,80 @@ impl RpcMethod<0> for EthSubscribe { } } +pub async fn eth_subscribe( + params: jsonrpsee::types::Params<'static>, + pending: jsonrpsee::core::server::PendingSubscriptionSink, + ctx: Ctx, + _ext: http::Extensions, +) -> impl jsonrpsee::IntoSubscriptionCloseResponse { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + tracing::trace!("Subscribing to events: {:?}", event_types); + + let mut receiver = crate::rpc::new_heads(&ctx); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) +} + +pub const ETH_SUBSCRIPTION: &'static str = "eth_subscription"; + pub enum EthAddressToFilecoinAddress {} impl RpcMethod<1> for EthAddressToFilecoinAddress { const NAME: &'static str = "Filecoin.EthAddressToFilecoinAddress"; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index fff993754702..c36d70d1b196 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -28,7 +28,6 @@ pub use reflect::{ApiPaths, Permission, RpcMethod, RpcMethodExt}; pub use request::Request; use segregation_layer::SegregationLayer; use set_extension_layer::SetExtensionLayer; -use tokio::sync::broadcast::error::RecvError; mod actor_registry; mod error; mod reflect; @@ -385,7 +384,7 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; -use crate::rpc::eth::{EthSubscribe, EthUnsubscribe}; +use crate::rpc::eth::{ETH_SUBSCRIPTION, EthSubscribe, EthUnsubscribe, eth_subscribe}; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; @@ -495,78 +494,13 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); + // Register `Filecoin.EthSubscribe` and related methods. module.register_subscription( EthSubscribe::NAME, - "eth_subscription", + ETH_SUBSCRIPTION, EthUnsubscribe::NAME, - |params, pending, ctx, _ext| async move { - let event_types = match params.parse::>() { - Ok(v) => v, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - // `event_types` is one OR more of: - // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool - // - "logs": notify new event logs that match a criteria - tracing::trace!("Subscribing to events: {:?}", event_types); - - let mut receiver = new_heads(&ctx); - - tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - - tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); - - loop { - tokio::select! { - action = receiver.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { - Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - } - } - } - _ = sink.closed() => { - break; - } - } - } - - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); - }); - - Ok(()) - }, + eth_subscribe, )?; - if let Some(alias) = EthSubscribe::NAME_ALIAS { module.register_alias(alias, EthSubscribe::NAME)?; } From 86240dc13c3a27336cd3066f592c550eb8d97383 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 15:10:13 +0200 Subject: [PATCH 06/29] Better error handling --- src/rpc/methods/eth.rs | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 8e65adcd6a83..7b4bd49fe9e7 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2688,7 +2688,29 @@ pub async fn eth_subscribe( _ext: http::Extensions, ) -> impl jsonrpsee::IntoSubscriptionCloseResponse { let event_types = match params.parse::>() { - Ok(v) => v, + Ok(v) => { + if let Some(event) = v.first() { + if event != "newHeads" { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + 1, + format!("unsupported event type: {}", event), + None::, + )) + .await; + return Ok(()); + } + } else { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + 1, + format!("decoding params: expected 1 param, got 0"), + None::, + )) + .await; + return Ok(()); + } + } Err(e) => { pending .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) @@ -2714,7 +2736,10 @@ pub async fn eth_subscribe( // connection is closed. let sink = pending.accept().await.unwrap(); - tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + tracing::trace!( + "Subscription task started (id: {:?})", + sink.subscription_id() + ); loop { tokio::select! { From 57aef9d81731d10f6c5176eadd17a101ac3b2c03 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 18 Jun 2025 15:30:40 +0200 Subject: [PATCH 07/29] Make clippy happy --- src/rpc/methods/eth.rs | 7 ++++--- src/rpc/methods/eth/types.rs | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 7b4bd49fe9e7..ef4a9a98f213 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2704,12 +2704,13 @@ pub async fn eth_subscribe( pending .reject(jsonrpsee::types::ErrorObjectOwned::owned( 1, - format!("decoding params: expected 1 param, got 0"), + "decoding params: expected 1 param, got 0".to_string(), None::, )) .await; return Ok(()); } + v } Err(e) => { pending @@ -2726,7 +2727,7 @@ pub async fn eth_subscribe( // - "newHeads": notify when new blocks arrive // - "pendingTransactions": notify when new messages arrive in the message pool // - "logs": notify new event logs that match a criteria - tracing::trace!("Subscribing to events: {:?}", event_types); + tracing::trace!("Subscribing to events: [{}]", event_types.iter().join(",")); let mut receiver = crate::rpc::new_heads(&ctx); @@ -2778,7 +2779,7 @@ pub async fn eth_subscribe( Ok(()) } -pub const ETH_SUBSCRIPTION: &'static str = "eth_subscription"; +pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; pub enum EthAddressToFilecoinAddress {} impl RpcMethod<1> for EthAddressToFilecoinAddress { diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index 1b4ac440134a..b11e18388f8a 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -420,8 +420,8 @@ impl RandomHexStringIdProvider { impl IdProvider for RandomHexStringIdProvider { fn next_id(&self) -> SubscriptionId<'static> { let mut bytes = [0u8; 32]; - let mut rng = rand::thread_rng(); - rng.try_fill(&mut bytes).unwrap(); + let mut rng = crate::utils::rand::forest_rng(); + rng.fill(&mut bytes); SubscriptionId::Str(format!("{}", EthHash::from(bytes)).into()) } From 693a6d67c910a55b7e569e750ade85e6374c17c0 Mon Sep 17 00:00:00 2001 From: elmattic Date: Thu, 19 Jun 2025 10:48:48 +0200 Subject: [PATCH 08/29] Refactor and add mod level documentation --- src/rpc/methods/eth.rs | 102 +-------------------- src/rpc/methods/eth/pubsub.rs | 165 ++++++++++++++++++++++++++++++++++ src/rpc/mod.rs | 5 +- 3 files changed, 170 insertions(+), 102 deletions(-) create mode 100644 src/rpc/methods/eth/pubsub.rs diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index ef4a9a98f213..4b8dccd6a64c 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -4,6 +4,7 @@ pub(crate) mod errors; mod eth_tx; pub mod filter; +pub mod pubsub; mod trace; pub mod types; mod utils; @@ -71,7 +72,6 @@ use serde::{Deserialize, Serialize}; use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::Arc; -use tokio::sync::broadcast::error::RecvError; use tracing::log; use utils::{decode_payload, lookup_eth_address}; @@ -2681,106 +2681,6 @@ impl RpcMethod<0> for EthSubscribe { } } -pub async fn eth_subscribe( - params: jsonrpsee::types::Params<'static>, - pending: jsonrpsee::core::server::PendingSubscriptionSink, - ctx: Ctx, - _ext: http::Extensions, -) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let event_types = match params.parse::>() { - Ok(v) => { - if let Some(event) = v.first() { - if event != "newHeads" { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - 1, - format!("unsupported event type: {}", event), - None::, - )) - .await; - return Ok(()); - } - } else { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - 1, - "decoding params: expected 1 param, got 0".to_string(), - None::, - )) - .await; - return Ok(()); - } - v - } - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - // `event_types` is one OR more of: - // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool - // - "logs": notify new event logs that match a criteria - tracing::trace!("Subscribing to events: [{}]", event_types.iter().join(",")); - - let mut receiver = crate::rpc::new_heads(&ctx); - - tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - - tracing::trace!( - "Subscription task started (id: {:?})", - sink.subscription_id() - ); - - loop { - tokio::select! { - action = receiver.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { - Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); - break; - } - } - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { - } - } - } - _ = sink.closed() => { - break; - } - } - } - - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); - }); - - Ok(()) -} - -pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; - pub enum EthAddressToFilecoinAddress {} impl RpcMethod<1> for EthAddressToFilecoinAddress { const NAME: &'static str = "Filecoin.EthAddressToFilecoinAddress"; diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs new file mode 100644 index 000000000000..f2f5cb66b025 --- /dev/null +++ b/src/rpc/methods/eth/pubsub.rs @@ -0,0 +1,165 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! Official documentation for the Ethereum pubsub protocol is available at: +//! https://geth.ethereum.org/docs/interacting-with-geth/rpc/pubsub +//! +//! Note that Filecoin uses this protocol without modifications. +//! +//! The sequence diagram for an event subscription is shown below: +//! ```text +//! ┌─────────────┐ ┌─────────────┐ +//! │ WS Client │ │ Node │ +//! └─────────────┘ └─────────────┘ +//! │ │ +//! │ ┌────────────────────────────────┐ │ +//! │──┤ Subscription message ├───────────────────────────────▶ │ +//! │ │ │ │ +//! │ │{ jsonrpc:'2.0', │ │ +//! │ │ id:, │ │ +//! │ │ method:'eth_subscribe', │ │ +//! │ │ params:[] } │ │ +//! │ └────────────────────────────────┘ │ +//! │ ┌────────────────────────────────┐ │ +//! │ ◀───────────────────────────────┤ Opened subscription message ├──│ +//! │ │ │ │ +//! │ │{ jsonrpc:'2.0', │ │ +//! │ │ id:, │ │ +//! │ │ result: } │ │ +//! │ └────────────────────────────────┘ │ +//! │ │ +//! │ │ +//! │ ┌────────────────────────────────┐ │ +//! │ ◀───────────────────────────────┤ Notification message ├──│ +//! │ │ │ │ +//! │ │{ jsonrpc:'2.0', │ │ +//! │ │ method:'eth_subscription', │ │ +//! │ │ params:{ subscription:,│ │ +//! │ │ result: } } │ │ +//! │ └────────────────────────────────┘ │ +//! │ │ +//! │ │ +//! │ │ +//! │ After a few notifications │ +//! │ ┌────────────────────────────────┐ │ +//! │──┤ Cancel subscription ├───────────────────────────────▶ │ +//! │ │ │ │ +//! │ │{ jsonrpc:'2.0', │ │ +//! │ │ id:, │ │ +//! │ │ method:'eth_unsubscribe', │ │ +//! │ │ params:[] } │ │ +//! │ └────────────────────────────────┘ │ +//! │ ┌────────────────────────────────┐ │ +//! │ ◀───────────────────────────────┤ Closed subscription message ├──│ +//! │ │ │ │ +//! │ │{ jsonrpc:'2.0', │ │ +//! │ │ id:, │ │ +//! │ │ result:true } │ │ +//! │ └────────────────────────────────┘ │ +//! ``` +//! + +use crate::rpc::Ctx; +use fvm_ipld_blockstore::Blockstore; +use itertools::Itertools; +use tokio::sync::broadcast::error::RecvError; + +pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; + +pub async fn eth_subscribe( + params: jsonrpsee::types::Params<'static>, + pending: jsonrpsee::core::server::PendingSubscriptionSink, + ctx: Ctx, + _ext: http::Extensions, +) -> impl jsonrpsee::IntoSubscriptionCloseResponse { + let event_types = match params.parse::>() { + Ok(v) => { + if let Some(event) = v.first() { + if event != "newHeads" { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + 1, + format!("unsupported event type: {}", event), + None::, + )) + .await; + return Ok(()); + } + } else { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + 1, + "decoding params: expected 1 param, got 0".to_string(), + None::, + )) + .await; + return Ok(()); + } + v + } + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + tracing::trace!("Subscribing to events: [{}]", event_types.iter().join(",")); + + let mut receiver = crate::rpc::new_heads(&ctx); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!( + "Subscription task started (id: {:?})", + sink.subscription_id() + ); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) +} diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index c76918832da4..793832677c8e 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -385,7 +385,10 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; -use crate::rpc::eth::{ETH_SUBSCRIPTION, EthSubscribe, EthUnsubscribe, eth_subscribe}; +use crate::rpc::eth::{ + EthSubscribe, EthUnsubscribe, + pubsub::{ETH_SUBSCRIPTION, eth_subscribe}, +}; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; From a5e0982a05614f5c64aee1c0b69a4de71b65fe8d Mon Sep 17 00:00:00 2001 From: elmattic Date: Fri, 20 Jun 2025 14:58:51 +0200 Subject: [PATCH 09/29] Check for logs --- src/rpc/methods/eth/pubsub.rs | 154 ++++++++++++++++++++++------------ 1 file changed, 101 insertions(+), 53 deletions(-) diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index f2f5cb66b025..5c13f1c32a58 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -62,40 +62,46 @@ use crate::rpc::Ctx; use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; -use tokio::sync::broadcast::error::RecvError; +use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; +const NEW_HEADS: &str = "newHeads"; +const PENDING_TXS: &str = "pendingTransactions"; +const LOGS: &str = "logs"; + pub async fn eth_subscribe( params: jsonrpsee::types::Params<'static>, pending: jsonrpsee::core::server::PendingSubscriptionSink, ctx: Ctx, _ext: http::Extensions, ) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let event_types = match params.parse::>() { + let (first, event_types) = match params.parse::>() { Ok(v) => { if let Some(event) = v.first() { - if event != "newHeads" { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - 1, - format!("unsupported event type: {}", event), - None::, - )) - .await; - return Ok(()); + match event.as_str() { + NEW_HEADS | PENDING_TXS | LOGS => (event.to_string(), v), + _ => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + 1, + format!("unsupported event type: {}", event), + None::, + )) + .await; + return Ok(()); + } } } else { pending .reject(jsonrpsee::types::ErrorObjectOwned::owned( 1, - "decoding params: expected 1 param, got 0".to_string(), + "decoding params: expected 1 or 2 params, got 0".to_string(), None::, )) .await; return Ok(()); } - v } Err(e) => { pending @@ -108,58 +114,100 @@ pub async fn eth_subscribe( return Ok(()); } }; - // `event_types` is one OR more of: + // `event_types` is one of: // - "newHeads": notify when new blocks arrive // - "pendingTransactions": notify when new messages arrive in the message pool // - "logs": notify new event logs that match a criteria tracing::trace!("Subscribing to events: [{}]", event_types.iter().join(",")); - let mut receiver = crate::rpc::new_heads(&ctx); - - tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - - tracing::trace!( - "Subscription task started (id: {:?})", - sink.subscription_id() - ); - - loop { - tokio::select! { - action = receiver.recv() => { - match action { - Ok(v) => { - match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { - Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { - break; - } - } - Err(e) => { - tracing::error!("Failed to serialize message: {:?}", e); + match (first.as_str(), event_types.get(1)) { + (NEW_HEADS, None) => { + // Spawn newHeads task + let new_heads = crate::rpc::new_heads(&ctx); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!( + "Subscription task started (id: {:?})", + sink.subscription_id() + ); + + handle_subscription(new_heads, sink).await; + }); + } + (LOGS, filter) => (), + _ => (), + } + + Ok(()) +} + +async fn handle_subscription(mut subscriber: Subscriber, sink: jsonrpsee::SubscriptionSink) +where + T: serde::Serialize + Clone, +{ + loop { + tokio::select! { + action = subscriber.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { break; } } - } - Err(RecvError::Closed) => { - break; - } - Err(RecvError::Lagged(_)) => { + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } } } - } - _ = sink.closed() => { - break; + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } } } + _ = sink.closed() => { + break; + } } + } - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); - }); - - Ok(()) + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); } + +// fn pending_txs( +// ctx: &Ctx, +// ) -> Subscriber> { +// let (sender, receiver) = broadcast::channel(100); + +// let mut subscriber = ctx.mpool.api.subscribe_head_changes(); + +// let task_mpool = ctx.mpool.clone(); + +// tokio::spawn(async move { +// while let Ok(v) = subscriber.recv().await { +// let messages = match v { +// HeadChange::Apply(_) => { +// let local_msgs = task_mpool.local_msgs.write(); +// let pending = local_msgs.iter().cloned().collect::>(); +// pending +// } +// }; + +// if sender.send(messages).is_err() { +// break; +// } +// } +// }); + +// receiver +// } From 55225b02f770165ebeb39337bd6c2c6465e260c9 Mon Sep 17 00:00:00 2001 From: elmattic Date: Fri, 20 Jun 2025 15:48:05 +0200 Subject: [PATCH 10/29] Add eth_logs --- src/rpc/methods/chain.rs | 46 +++++++++++++++++++++++++++++++++++ src/rpc/methods/eth.rs | 2 +- src/rpc/methods/eth/pubsub.rs | 21 ++++++++++++++-- 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 8bd1177fbc94..c16a34b20b13 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -63,6 +63,52 @@ pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subs receiver } +use crate::rpc::eth::eth_logs_for_block_and_transaction; +use crate::rpc::eth::types::EthHash; +use crate::rpc::eth::{EthLog, types::EthFilterSpec}; + +/// | Field | Supported in `eth_getLogs` | Supported in `eth_subscribe` | Notes | +/// |-------------|----------------------------|------------------------------|--------------------------------------------| +/// | `address` | Yes | Yes | Can be a single address or an array | +/// | `topics` | Yes | Yes | Same topic filtering rules apply | +/// | `fromBlock` | Yes | No | Not relevant for real-time subscriptions | +/// | `toBlock` | Yes | No | Same as above | +/// | `blockhash` | Yes | No | Specific to a single block, not for streams| +/// +pub(crate) fn logs( + ctx: &Ctx, + filter: Option, +) -> Subscriber> { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = ctx.chain_store().publisher().subscribe(); + + let ctx = ctx.clone(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let logs = match v { + HeadChange::Apply(ts) => { + let logs = eth_logs_for_block_and_transaction( + &ctx, + &ts, + &EthHash::default(), + &EthHash::default(), + ) + .await + .unwrap(); + logs + } + }; + if sender.send(logs).is_err() { + break; + } + } + }); + + receiver +} + pub enum ChainGetMessage {} impl RpcMethod<1> for ChainGetMessage { const NAME: &'static str = "Filecoin.ChainGetMessage"; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 4b8dccd6a64c..11cf14b113d8 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1327,7 +1327,7 @@ async fn new_eth_tx_receipt( Ok(tx_receipt) } -async fn eth_logs_for_block_and_transaction( +pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, ts: &Arc, block_hash: &EthHash, diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 5c13f1c32a58..39bcf037536f 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -70,7 +70,7 @@ const NEW_HEADS: &str = "newHeads"; const PENDING_TXS: &str = "pendingTransactions"; const LOGS: &str = "logs"; -pub async fn eth_subscribe( +pub async fn eth_subscribe( params: jsonrpsee::types::Params<'static>, pending: jsonrpsee::core::server::PendingSubscriptionSink, ctx: Ctx, @@ -139,7 +139,24 @@ pub async fn eth_subscribe( handle_subscription(new_heads, sink).await; }); } - (LOGS, filter) => (), + (LOGS, filter) => { + // Spawn logs task + let logs = crate::rpc::chain::logs(&ctx, None); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!( + "Subscription task started (id: {:?})", + sink.subscription_id() + ); + + handle_subscription(logs, sink).await; + }); + } _ => (), } From 562a63cff9e795358f36bb424404c29d34eb2546 Mon Sep 17 00:00:00 2001 From: elmattic Date: Fri, 20 Jun 2025 17:12:01 +0200 Subject: [PATCH 11/29] Refactor --- src/rpc/methods/chain.rs | 15 +++-------- src/rpc/methods/eth.rs | 51 +++++++++++++++++++++++++++++++---- src/rpc/methods/eth/pubsub.rs | 18 ++++++++++--- 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index c16a34b20b13..c888c8461708 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -63,9 +63,7 @@ pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subs receiver } -use crate::rpc::eth::eth_logs_for_block_and_transaction; -use crate::rpc::eth::types::EthHash; -use crate::rpc::eth::{EthLog, types::EthFilterSpec}; +use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; /// | Field | Supported in `eth_getLogs` | Supported in `eth_subscribe` | Notes | /// |-------------|----------------------------|------------------------------|--------------------------------------------| @@ -89,14 +87,9 @@ pub(crate) fn logs( while let Ok(v) = subscriber.recv().await { let logs = match v { HeadChange::Apply(ts) => { - let logs = eth_logs_for_block_and_transaction( - &ctx, - &ts, - &EthHash::default(), - &EthHash::default(), - ) - .await - .unwrap(); + let logs = eth_logs_with_filter(&ctx, &ts, filter.clone(), None) + .await + .unwrap(); logs } }; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 11cf14b113d8..845e248d960e 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1327,6 +1327,35 @@ async fn new_eth_tx_receipt( Ok(tx_receipt) } +// pub async fn eth_logs_for_block_and_transaction( +// ctx: &Ctx, +// ts: &Arc, +// block_hash: &EthHash, +// tx_hash: &EthHash, +// ) -> anyhow::Result> { +// let spec = EthFilterSpec { +// block_hash: Some(block_hash.clone()), +// ..Default::default() +// }; + +// let mut events = vec![]; +// EthEventHandler::collect_events( +// ctx, +// ts, +// Some(&spec), +// SkipEvent::OnUnresolvedAddress, +// &mut events, +// ) +// .await?; + +// let logs = eth_filter_logs_from_events(ctx, &events)?; +// let out: Vec<_> = logs +// .into_iter() +// .filter(|log| &log.transaction_hash == tx_hash) +// .collect(); +// Ok(out) +// } + pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, ts: &Arc, @@ -1338,21 +1367,33 @@ pub async fn eth_logs_for_block_and_transaction( + ctx: &Ctx, + ts: &Arc, + spec: Option, + tx_hash: Option<&EthHash>, +) -> anyhow::Result> { let mut events = vec![]; EthEventHandler::collect_events( ctx, ts, - Some(&spec), + spec.as_ref(), SkipEvent::OnUnresolvedAddress, &mut events, ) .await?; let logs = eth_filter_logs_from_events(ctx, &events)?; - let out: Vec<_> = logs - .into_iter() - .filter(|log| &log.transaction_hash == tx_hash) - .collect(); + let out: Vec<_> = match tx_hash { + Some(hash) => logs + .into_iter() + .filter(|log| &log.transaction_hash == hash) + .collect(), + None => logs, // no tx hash, keep all logs + }; Ok(out) } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 39bcf037536f..f2681d0bafdd 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,11 +59,16 @@ //! ``` //! -use crate::rpc::Ctx; +use std::str::FromStr; + use fvm_ipld_blockstore::Blockstore; use itertools::Itertools; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; +use crate::rpc::Ctx; +use crate::rpc::eth::EthFilterSpec; +use crate::rpc::eth::types::{EthAddress, EthAddressList}; + pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; const NEW_HEADS: &str = "newHeads"; @@ -139,9 +144,16 @@ pub async fn eth_subscribe( handle_subscription(new_heads, sink).await; }); } - (LOGS, filter) => { + (LOGS, _filter) => { + let spec = EthFilterSpec { + address: EthAddressList::Single( + EthAddress::from_str("0x6c3f61ba9b4abe943bb61bf1f28b79e3f8018b0e").unwrap(), + ), + ..Default::default() + }; + // Spawn logs task - let logs = crate::rpc::chain::logs(&ctx, None); + let logs = crate::rpc::chain::logs(&ctx, Some(spec)); tokio::spawn(async move { // Mark the subscription is accepted after the params has been parsed successful. From 0cd8d86593b932d7749308e1a80dfb6982291892 Mon Sep 17 00:00:00 2001 From: elmattic Date: Tue, 24 Jun 2025 12:47:37 +0200 Subject: [PATCH 12/29] Send event only if logs is non-empty --- src/rpc/methods/chain.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index c888c8461708..e9e280360aa2 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -93,8 +93,10 @@ pub(crate) fn logs( logs } }; - if sender.send(logs).is_err() { - break; + if !logs.is_empty() { + if sender.send(logs).is_err() { + break; + } } } }); From aa7a4f4a6e6230091f80dd41bb6716338734f0aa Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 25 Jun 2025 14:03:24 +0200 Subject: [PATCH 13/29] Rework params parser --- src/rpc/methods/eth/pubsub.rs | 130 +++++++++++++++++++++------------- src/rpc/methods/eth/types.rs | 13 ++++ 2 files changed, 93 insertions(+), 50 deletions(-) diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index f2681d0bafdd..f4f341b92a8e 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,55 +59,93 @@ //! ``` //! -use std::str::FromStr; +use std::fmt; use fvm_ipld_blockstore::Blockstore; -use itertools::Itertools; +use serde::de::{self, Deserializer, SeqAccess, Visitor}; +use serde::{Deserialize, Serialize}; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; use crate::rpc::Ctx; -use crate::rpc::eth::EthFilterSpec; -use crate::rpc::eth::types::{EthAddress, EthAddressList}; +use crate::rpc::eth::types::EthAddressList; +use crate::rpc::eth::{EthFilterSpec, EthTopicSpec}; pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; const NEW_HEADS: &str = "newHeads"; -const PENDING_TXS: &str = "pendingTransactions"; +const PENDING_TRANSACTIONS: &str = "pendingTransactions"; const LOGS: &str = "logs"; +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct LogFilter { + pub address: EthAddressList, + pub topics: Option, +} + +#[derive(Debug)] +enum Subscription { + NewHeads, + PendingTransactions, + Logs(Option), +} + +impl<'de> Deserialize<'de> for Subscription { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct SubscriptionVisitor; + + impl<'de> Visitor<'de> for SubscriptionVisitor { + type Value = Subscription; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#) + } + + fn visit_seq(self, mut seq: V) -> Result + where + V: SeqAccess<'de>, + { + let event_type: String = seq + .next_element()? + .ok_or_else(|| de::Error::invalid_length(0, &self))?; + + match event_type.as_str() { + NEW_HEADS => { + if seq.next_element::()?.is_some() { + return Err(de::Error::custom("unsupported event type")); + } + Ok(Subscription::NewHeads) + } + PENDING_TRANSACTIONS => { + if seq.next_element::()?.is_some() { + return Err(de::Error::custom("unsupported event type")); + } + Ok(Subscription::PendingTransactions) + } + LOGS => Ok(Subscription::Logs(seq.next_element()?)), + _ => Err(de::Error::unknown_variant( + &event_type, + &[NEW_HEADS, PENDING_TRANSACTIONS, LOGS], + )), + } + } + } + + deserializer.deserialize_seq(SubscriptionVisitor) + } +} + pub async fn eth_subscribe( params: jsonrpsee::types::Params<'static>, pending: jsonrpsee::core::server::PendingSubscriptionSink, ctx: Ctx, _ext: http::Extensions, ) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let (first, event_types) = match params.parse::>() { - Ok(v) => { - if let Some(event) = v.first() { - match event.as_str() { - NEW_HEADS | PENDING_TXS | LOGS => (event.to_string(), v), - _ => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - 1, - format!("unsupported event type: {}", event), - None::, - )) - .await; - return Ok(()); - } - } - } else { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - 1, - "decoding params: expected 1 or 2 params, got 0".to_string(), - None::, - )) - .await; - return Ok(()); - } - } + let subscription: Subscription = match params.parse() { + Ok(sub) => sub, Err(e) => { pending .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) @@ -119,14 +157,11 @@ pub async fn eth_subscribe( return Ok(()); } }; - // `event_types` is one of: - // - "newHeads": notify when new blocks arrive - // - "pendingTransactions": notify when new messages arrive in the message pool - // - "logs": notify new event logs that match a criteria - tracing::trace!("Subscribing to events: [{}]", event_types.iter().join(",")); - - match (first.as_str(), event_types.get(1)) { - (NEW_HEADS, None) => { + + tracing::trace!("Subscribing to event: {:?}", subscription); + + match subscription { + Subscription::NewHeads => { // Spawn newHeads task let new_heads = crate::rpc::new_heads(&ctx); @@ -144,16 +179,11 @@ pub async fn eth_subscribe( handle_subscription(new_heads, sink).await; }); } - (LOGS, _filter) => { - let spec = EthFilterSpec { - address: EthAddressList::Single( - EthAddress::from_str("0x6c3f61ba9b4abe943bb61bf1f28b79e3f8018b0e").unwrap(), - ), - ..Default::default() - }; + Subscription::Logs(filter) => { + let filter_spec: Option = filter.map(Into::into); // Spawn logs task - let logs = crate::rpc::chain::logs(&ctx, Some(spec)); + let logs = crate::rpc::chain::logs(&ctx, filter_spec); tokio::spawn(async move { // Mark the subscription is accepted after the params has been parsed successful. @@ -162,14 +192,14 @@ pub async fn eth_subscribe( let sink = pending.accept().await.unwrap(); tracing::trace!( - "Subscription task started (id: {:?})", + "Logs subscription task started (id: {:?})", sink.subscription_id() ); handle_subscription(logs, sink).await; }); } - _ => (), + Subscription::PendingTransactions => (), } Ok(()) diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index b11e18388f8a..cb6d94e466e7 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -3,6 +3,7 @@ use super::*; use crate::blocks::CachingBlockHeader; +use crate::rpc::eth::pubsub::LogFilter; use anyhow::ensure; use ipld_core::serde::SerdeError; use jsonrpsee::core::traits::IdProvider; @@ -526,6 +527,18 @@ pub struct EthFilterSpec { } lotus_json_with_self!(EthFilterSpec); +impl From for EthFilterSpec { + fn from(filter: LogFilter) -> Self { + EthFilterSpec { + from_block: None, + to_block: None, + block_hash: None, + address: filter.address, + topics: filter.topics, + } + } +} + /// `EthFilterResult` represents the response from executing a filter: /// - A list of block hashes /// - A list of transaction hashes From 09c09c037ccd3aca51888ccc5a1e6ea8af0c97f9 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 25 Jun 2025 14:21:27 +0200 Subject: [PATCH 14/29] Fix lint errors --- src/rpc/methods/chain.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index e9e280360aa2..b948eeb25af5 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -86,17 +86,12 @@ pub(crate) fn logs( tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { let logs = match v { - HeadChange::Apply(ts) => { - let logs = eth_logs_with_filter(&ctx, &ts, filter.clone(), None) - .await - .unwrap(); - logs - } + HeadChange::Apply(ts) => eth_logs_with_filter(&ctx, &ts, filter.clone(), None) + .await + .unwrap(), }; - if !logs.is_empty() { - if sender.send(logs).is_err() { - break; - } + if !logs.is_empty() && sender.send(logs).is_err() { + break; } } }); From 12e458ae4bd30a94b9a50a5ec1d5e2933364f8d2 Mon Sep 17 00:00:00 2001 From: elmattic Date: Tue, 15 Jul 2025 19:00:11 +0200 Subject: [PATCH 15/29] Refactor --- src/rpc/methods/eth.rs | 34 ++-------------------------------- src/rpc/methods/eth/pubsub.rs | 32 +++----------------------------- 2 files changed, 5 insertions(+), 61 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 96c93c3b3e8a..f3856ca68660 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -1326,35 +1326,6 @@ async fn new_eth_tx_receipt( Ok(tx_receipt) } -// pub async fn eth_logs_for_block_and_transaction( -// ctx: &Ctx, -// ts: &Arc, -// block_hash: &EthHash, -// tx_hash: &EthHash, -// ) -> anyhow::Result> { -// let spec = EthFilterSpec { -// block_hash: Some(block_hash.clone()), -// ..Default::default() -// }; - -// let mut events = vec![]; -// EthEventHandler::collect_events( -// ctx, -// ts, -// Some(&spec), -// SkipEvent::OnUnresolvedAddress, -// &mut events, -// ) -// .await?; - -// let logs = eth_filter_logs_from_events(ctx, &events)?; -// let out: Vec<_> = logs -// .into_iter() -// .filter(|log| &log.transaction_hash == tx_hash) -// .collect(); -// Ok(out) -// } - pub async fn eth_logs_for_block_and_transaction( ctx: &Ctx, ts: &Arc, @@ -1386,14 +1357,13 @@ pub async fn eth_logs_with_filter( .await?; let logs = eth_filter_logs_from_events(ctx, &events)?; - let out: Vec<_> = match tx_hash { + Ok(match tx_hash { Some(hash) => logs .into_iter() .filter(|log| &log.transaction_hash == hash) .collect(), None => logs, // no tx hash, keep all logs - }; - Ok(out) + }) } fn get_signed_message(ctx: &Ctx, message_cid: Cid) -> Result { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index f4f341b92a8e..5899278411aa 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -199,7 +199,9 @@ pub async fn eth_subscribe( handle_subscription(logs, sink).await; }); } - Subscription::PendingTransactions => (), + Subscription::PendingTransactions => { + // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 + } } Ok(()) @@ -242,31 +244,3 @@ where tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); } - -// fn pending_txs( -// ctx: &Ctx, -// ) -> Subscriber> { -// let (sender, receiver) = broadcast::channel(100); - -// let mut subscriber = ctx.mpool.api.subscribe_head_changes(); - -// let task_mpool = ctx.mpool.clone(); - -// tokio::spawn(async move { -// while let Ok(v) = subscriber.recv().await { -// let messages = match v { -// HeadChange::Apply(_) => { -// let local_msgs = task_mpool.local_msgs.write(); -// let pending = local_msgs.iter().cloned().collect::>(); -// pending -// } -// }; - -// if sender.send(messages).is_err() { -// break; -// } -// } -// }); - -// receiver -// } From 5d0ed6c414746c2679c82e827df456cb88a270a3 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 16 Jul 2025 11:14:58 +0200 Subject: [PATCH 16/29] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a25d2eec3449..766d45b7ed87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ ### Added +- [#4976](https://github.com/ChainSafe/forest/issues/4976) Add support for the `Filecoin.EthSubscribe` and `Filecoin.EthUnsubscribe` API methods to enable subscriptions to Ethereum event types: `heads` and `logs`. + - [#5739](https://github.com/ChainSafe/forest/issues/5739) Add `--export-mode` flag to the `forest-tool archive sync-bucket` subcommand. This allows exporting and uploading only the required files. - [#5778](https://github.com/ChainSafe/forest/pull/5778) Feat generate a detailed test report in `api compare` command through `--report-dir` and `--report-mode` From e41cfa1e218d9a9805c2357ec886364c669ccb3b Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 16 Jul 2025 11:58:45 +0200 Subject: [PATCH 17/29] Update ignore list --- src/tool/subcommands/api_cmd/test_snapshots_ignored.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt index d665bb42e907..a577cfa29abe 100644 --- a/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt +++ b/src/tool/subcommands/api_cmd/test_snapshots_ignored.txt @@ -18,6 +18,8 @@ Filecoin.EthGetFilterChanges Filecoin.EthGetFilterLogs Filecoin.EthSendRawTransaction Filecoin.EthSyncing +Filecoin.EthSubscribe +Filecoin.EthUnsubscribe Filecoin.F3GetCertificate Filecoin.F3GetECPowerTable Filecoin.F3GetF3PowerTable From 28f7a9424aeaf461386073789bb86657eb8f91fb Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 16 Jul 2025 12:20:38 +0200 Subject: [PATCH 18/29] Remove unwraps --- src/rpc/methods/eth/pubsub.rs | 40 ++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 5899278411aa..791c2ead78bb 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -162,15 +162,21 @@ pub async fn eth_subscribe( match subscription { Subscription::NewHeads => { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(e) => { + tracing::error!("Failed to accept subscription: {:?}", e); + return Ok(()); + } + }; + // Spawn newHeads task let new_heads = crate::rpc::new_heads(&ctx); tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - tracing::trace!( "Subscription task started (id: {:?})", sink.subscription_id() @@ -180,17 +186,23 @@ pub async fn eth_subscribe( }); } Subscription::Logs(filter) => { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = match pending.accept().await { + Ok(sink) => sink, + Err(e) => { + tracing::error!("Failed to accept subscription: {:?}", e); + return Ok(()); + } + }; + let filter_spec: Option = filter.map(Into::into); // Spawn logs task let logs = crate::rpc::chain::logs(&ctx, filter_spec); tokio::spawn(async move { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = pending.accept().await.unwrap(); - tracing::trace!( "Logs subscription task started (id: {:?})", sink.subscription_id() @@ -201,6 +213,14 @@ pub async fn eth_subscribe( } Subscription::PendingTransactions => { // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 + pending + .reject(jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, + "pendingTransactions subscription not yet implemented", + None::<()>, + )) + .await; + return Ok(()); } } From 191020685c33d3ad9d55d890fdfe096808f4f465 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 16 Jul 2025 14:19:45 +0200 Subject: [PATCH 19/29] Remove unwrap --- src/rpc/methods/chain.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index e53f2dc82610..ae8e6be27ded 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -88,13 +88,19 @@ pub(crate) fn logs( tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { - let logs = match v { - HeadChange::Apply(ts) => eth_logs_with_filter(&ctx, &ts, filter.clone(), None) - .await - .unwrap(), - }; - if !logs.is_empty() && sender.send(logs).is_err() { - break; + match v { + HeadChange::Apply(ts) => { + match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { + Ok(logs) => { + if !logs.is_empty() && sender.send(logs).is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to fetch logs for tipset {}: {}", ts.key(), e); + } + } + } } } }); From de8213c5eb72d8a0ea3ac8076b0ef87f1d635984 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 16 Jul 2025 15:18:52 +0200 Subject: [PATCH 20/29] Abort subscription associated task --- src/rpc/methods/chain.rs | 15 +++++++++------ src/rpc/methods/eth/pubsub.rs | 16 ++++++++++------ 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index ae8e6be27ded..9b63301134ae 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -46,13 +46,16 @@ use tokio::sync::{ Mutex, broadcast::{self, Receiver as Subscriber}, }; +use tokio::task::JoinHandle; -pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subscriber { +pub(crate) fn new_heads( + data: &crate::rpc::RPCState, +) -> (Subscriber, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(100); let mut subscriber = data.chain_store().publisher().subscribe(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { let headers = match v { HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), @@ -63,7 +66,7 @@ pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subs } }); - receiver + (receiver, handle) } use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; @@ -79,14 +82,14 @@ use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; pub(crate) fn logs( ctx: &Ctx, filter: Option, -) -> Subscriber> { +) -> (Subscriber>, JoinHandle<()>) { let (sender, receiver) = broadcast::channel(100); let mut subscriber = ctx.chain_store().publisher().subscribe(); let ctx = ctx.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { while let Ok(v) = subscriber.recv().await { match v { HeadChange::Apply(ts) => { @@ -105,7 +108,7 @@ pub(crate) fn logs( } }); - receiver + (receiver, handle) } pub enum ChainGetMessage {} diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 791c2ead78bb..eba6c267d83b 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -174,7 +174,7 @@ pub async fn eth_subscribe( }; // Spawn newHeads task - let new_heads = crate::rpc::new_heads(&ctx); + let (new_heads, handle) = crate::rpc::new_heads(&ctx); tokio::spawn(async move { tracing::trace!( @@ -182,7 +182,7 @@ pub async fn eth_subscribe( sink.subscription_id() ); - handle_subscription(new_heads, sink).await; + handle_subscription(new_heads, sink, handle).await; }); } Subscription::Logs(filter) => { @@ -200,7 +200,7 @@ pub async fn eth_subscribe( let filter_spec: Option = filter.map(Into::into); // Spawn logs task - let logs = crate::rpc::chain::logs(&ctx, filter_spec); + let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec); tokio::spawn(async move { tracing::trace!( @@ -208,7 +208,7 @@ pub async fn eth_subscribe( sink.subscription_id() ); - handle_subscription(logs, sink).await; + handle_subscription(logs, sink, handle).await; }); } Subscription::PendingTransactions => { @@ -227,8 +227,11 @@ pub async fn eth_subscribe( Ok(()) } -async fn handle_subscription(mut subscriber: Subscriber, sink: jsonrpsee::SubscriptionSink) -where +async fn handle_subscription( + mut subscriber: Subscriber, + sink: jsonrpsee::SubscriptionSink, + handle: tokio::task::JoinHandle<()>, +) where T: serde::Serialize + Clone, { loop { @@ -261,6 +264,7 @@ where } } } + handle.abort(); tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); } From 4d5f1cda2b63f6b6bbf66a709650970912aeeeb7 Mon Sep 17 00:00:00 2001 From: elmattic Date: Mon, 28 Jul 2025 10:00:44 +0200 Subject: [PATCH 21/29] Add comments to explain why we need placeholders --- src/rpc/methods/eth.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index f3856ca68660..6374b4d2fa7e 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2663,6 +2663,11 @@ impl RpcMethod<0> for EthUnsubscribe { type Params = (); type Ok = (); + // This method is a placeholder and is never actually called. + // Subscription handling is performed in `pubsub.rs`. + // + // We still need to implement the `RpcMethod` trait to expose method metadata + // like `NAME`, `NAME_ALIAS`, `PERMISSION`, etc.. async fn handle( _: Ctx, (): Self::Params, @@ -2683,6 +2688,11 @@ impl RpcMethod<0> for EthSubscribe { type Params = (); type Ok = (); + // This method is a placeholder and is never actually called. + // Subscription handling is performed in `pubsub.rs`. + // + // We still need to implement the `RpcMethod` trait to expose method metadata + // like `NAME`, `NAME_ALIAS`, `PERMISSION`, etc.. async fn handle( _: Ctx, (): Self::Params, From 0fe9ce55014c43a7b853911737b198780ef616d9 Mon Sep 17 00:00:00 2001 From: elmattic Date: Mon, 28 Jul 2025 10:29:13 +0200 Subject: [PATCH 22/29] Use a smaller capacity --- src/rpc/methods/chain.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index e5f501132540..861f506b635d 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -48,10 +48,12 @@ use tokio::sync::{ }; use tokio::task::JoinHandle; +const HEAD_CHANNEL_CAPACITY: usize = 10; + pub(crate) fn new_heads( data: &crate::rpc::RPCState, ) -> (Subscriber, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(100); + let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); let mut subscriber = data.chain_store().publisher().subscribe(); @@ -83,7 +85,7 @@ pub(crate) fn logs( ctx: &Ctx, filter: Option, ) -> (Subscriber>, JoinHandle<()>) { - let (sender, receiver) = broadcast::channel(100); + let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); let mut subscriber = ctx.chain_store().publisher().subscribe(); @@ -786,7 +788,7 @@ pub(crate) fn chain_notify( _params: Params<'_>, data: &crate::rpc::RPCState, ) -> Subscriber> { - let (sender, receiver) = broadcast::channel(100); + let (sender, receiver) = broadcast::channel(HEAD_CHANNEL_CAPACITY); // As soon as the channel is created, send the current tipset let current = data.chain_store().heaviest_tipset(); From b986e48d419d6823bf092aaf928049ae32035fe9 Mon Sep 17 00:00:00 2001 From: elmattic Date: Mon, 28 Jul 2025 10:31:04 +0200 Subject: [PATCH 23/29] Remove doc comment --- src/rpc/methods/chain.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 861f506b635d..94ab77d30489 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -73,14 +73,6 @@ pub(crate) fn new_heads( use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; -/// | Field | Supported in `eth_getLogs` | Supported in `eth_subscribe` | Notes | -/// |-------------|----------------------------|------------------------------|--------------------------------------------| -/// | `address` | Yes | Yes | Can be a single address or an array | -/// | `topics` | Yes | Yes | Same topic filtering rules apply | -/// | `fromBlock` | Yes | No | Not relevant for real-time subscriptions | -/// | `toBlock` | Yes | No | Same as above | -/// | `blockhash` | Yes | No | Specific to a single block, not for streams| -/// pub(crate) fn logs( ctx: &Ctx, filter: Option, From 2e994b80d29149079852bd71604acc979fd02e21 Mon Sep 17 00:00:00 2001 From: elmattic Date: Mon, 28 Jul 2025 10:47:07 +0200 Subject: [PATCH 24/29] Add some doc comments --- src/rpc/methods/chain.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index 94ab77d30489..cc197e6521a4 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -50,6 +50,12 @@ use tokio::task::JoinHandle; const HEAD_CHANNEL_CAPACITY: usize = 10; +/// Subscribes to head changes from the chain store and broadcasts new blocks. +/// +/// # Notes +/// +/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, +/// allowing manual cleanup if needed. pub(crate) fn new_heads( data: &crate::rpc::RPCState, ) -> (Subscriber, JoinHandle<()>) { @@ -73,6 +79,12 @@ pub(crate) fn new_heads( use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; +/// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs. +/// +/// # Notes +/// +/// Spawns an internal `tokio` task that can be aborted anytime via the returned `JoinHandle`, +/// allowing manual cleanup if needed. pub(crate) fn logs( ctx: &Ctx, filter: Option, From 065b89f9e6b2cbadcf53ec17464685fba2b8855f Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Tue, 29 Jul 2025 17:34:36 +0530 Subject: [PATCH 25/29] refactor: use macros to register the subscription APIs (#5782) --- Cargo.lock | 15 ++ Cargo.toml | 2 +- src/rpc/methods/eth.rs | 1 + src/rpc/methods/eth/pubsub.rs | 218 +++++++++------------------- src/rpc/methods/eth/pubsub_trait.rs | 44 ++++++ src/rpc/methods/eth/types.rs | 2 +- src/rpc/mod.rs | 23 +-- 7 files changed, 134 insertions(+), 171 deletions(-) create mode 100644 src/rpc/methods/eth/pubsub_trait.rs diff --git a/Cargo.lock b/Cargo.lock index 2ee5c3f94bf1..e9362c18ef61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4904,10 +4904,12 @@ checksum = "1fba77a59c4c644fd48732367624d1bcf6f409f9c9a286fbc71d2f1fc0b2ea16" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", + "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-ws-client", "tokio", + "tracing", ] [[package]] @@ -4983,6 +4985,19 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fa4f5daed39f982a1bb9d15449a28347490ad42b212f8eaa2a2a344a0dce9e9" +dependencies = [ + "heck 0.5.0", + "proc-macro-crate 3.3.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "jsonrpsee-server" version = "0.25.1" diff --git a/Cargo.toml b/Cargo.toml index eb2a11d968f9..3a02b53504cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ integer-encoding = "4.0" ipld-core = { version = "0.4", features = ["serde", "arb"] } is-terminal = "0.4" itertools = "0.14" -jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] } +jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] } jsonwebtoken = "9" keccak-hash = "0.11" kubert-prometheus-process = "0.2" diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 6374b4d2fa7e..d4df13714659 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -5,6 +5,7 @@ pub(crate) mod errors; mod eth_tx; pub mod filter; pub mod pubsub; +pub(crate) mod pubsub_trait; mod trace; pub mod types; mod utils; diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index eba6c267d83b..ad614d91a4b8 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,172 +59,88 @@ //! ``` //! -use std::fmt; - +use crate::rpc::eth::pubsub_trait::{ + EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams, +}; +use crate::rpc::{RPCState, chain}; use fvm_ipld_blockstore::Blockstore; -use serde::de::{self, Deserializer, SeqAccess, Visitor}; -use serde::{Deserialize, Serialize}; +use jsonrpsee::PendingSubscriptionSink; +use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use std::sync::Arc; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; -use crate::rpc::Ctx; -use crate::rpc::eth::types::EthAddressList; -use crate::rpc::eth::{EthFilterSpec, EthTopicSpec}; - -pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; - -const NEW_HEADS: &str = "newHeads"; -const PENDING_TRANSACTIONS: &str = "pendingTransactions"; -const LOGS: &str = "logs"; - -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct LogFilter { - pub address: EthAddressList, - pub topics: Option, +pub struct EthPubSub { + ctx: Arc>, } -#[derive(Debug)] -enum Subscription { - NewHeads, - PendingTransactions, - Logs(Option), +impl EthPubSub { + pub fn new(ctx: Arc>) -> Self { + Self { ctx } + } } -impl<'de> Deserialize<'de> for Subscription { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct SubscriptionVisitor; - - impl<'de> Visitor<'de> for SubscriptionVisitor { - type Value = Subscription; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#) +#[async_trait::async_trait] +impl EthPubSubApiServer for EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn subscribe( + &self, + pending: PendingSubscriptionSink, + kind: SubscriptionKind, + params: Option, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + let ctx = self.ctx.clone(); + + match kind { + SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, + SubscriptionKind::PendingTransactions => { + return Err(SubscriptionError::from( + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, + "pendingTransactions subscription not yet implemented", + None::<()>, + ), + )); } - - fn visit_seq(self, mut seq: V) -> Result - where - V: SeqAccess<'de>, - { - let event_type: String = seq - .next_element()? - .ok_or_else(|| de::Error::invalid_length(0, &self))?; - - match event_type.as_str() { - NEW_HEADS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::NewHeads) - } - PENDING_TRANSACTIONS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::PendingTransactions) - } - LOGS => Ok(Subscription::Logs(seq.next_element()?)), - _ => Err(de::Error::unknown_variant( - &event_type, - &[NEW_HEADS, PENDING_TRANSACTIONS, LOGS], - )), - } + SubscriptionKind::Logs => { + let filter = params.and_then(|p| p.filter); + self.handle_logs_subscription(sink, ctx, filter).await } } - deserializer.deserialize_seq(SubscriptionVisitor) + Ok(()) } } -pub async fn eth_subscribe( - params: jsonrpsee::types::Params<'static>, - pending: jsonrpsee::core::server::PendingSubscriptionSink, - ctx: Ctx, - _ext: http::Extensions, -) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let subscription: Subscription = match params.parse() { - Ok(sub) => sub, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - - tracing::trace!("Subscribing to event: {:?}", subscription); - - match subscription { - Subscription::NewHeads => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - // Spawn newHeads task - let (new_heads, handle) = crate::rpc::new_heads(&ctx); - - tokio::spawn(async move { - tracing::trace!( - "Subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(new_heads, sink, handle).await; - }); - } - Subscription::Logs(filter) => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - let filter_spec: Option = filter.map(Into::into); - - // Spawn logs task - let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec); - - tokio::spawn(async move { - tracing::trace!( - "Logs subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(logs, sink, handle).await; - }); - } - Subscription::PendingTransactions => { - // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - )) - .await; - return Ok(()); - } +impl EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn handle_new_heads_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + ) { + let (subscriber, handle) = chain::new_heads(&ctx); + tokio::spawn(async move { + handle_subscription(subscriber, accepted_sink, handle).await; + }); } - Ok(()) + async fn handle_logs_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + filter_spec: Option, + ) { + let filter_spec = filter_spec.map(Into::into); + let (logs, handle) = chain::logs(&ctx, filter_spec); + tokio::spawn(async move { + handle_subscription(logs, accepted_sink, handle).await; + }); + } } async fn handle_subscription( @@ -266,5 +182,5 @@ async fn handle_subscription( } handle.abort(); - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id()); } diff --git a/src/rpc/methods/eth/pubsub_trait.rs b/src/rpc/methods/eth/pubsub_trait.rs new file mode 100644 index 000000000000..39bb57f4f0b1 --- /dev/null +++ b/src/rpc/methods/eth/pubsub_trait.rs @@ -0,0 +1,44 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::rpc::eth::types::{EthAddressList, EthTopicSpec}; +use jsonrpsee::proc_macros::rpc; +use serde::{Deserialize, Serialize}; + +#[rpc(server, namespace = "eth")] +pub trait EthPubSubApi { + /// Subscribe to Ethereum events + #[subscription( + name = "subscribe" => "subscription", + aliases = ["Filecoin.EthSubscribe"], + unsubscribe = "unsubscribe", + unsubscribe_aliases = ["Filecoin.EthUnsubscribe"], + item = serde_json::Value + )] + async fn subscribe( + &self, + kind: SubscriptionKind, + params: Option, + ) -> jsonrpsee::core::SubscriptionResult; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum SubscriptionKind { + NewHeads, + PendingTransactions, + Logs, +} + +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct LogFilter { + pub address: EthAddressList, + pub topics: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubscriptionParams { + #[serde(flatten)] + pub filter: Option, +} diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index cb6d94e466e7..29087f9a67cb 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -3,7 +3,7 @@ use super::*; use crate::blocks::CachingBlockHeader; -use crate::rpc::eth::pubsub::LogFilter; +use crate::rpc::eth::pubsub_trait::LogFilter; use anyhow::ensure; use ipld_core::serde::SerdeError; use jsonrpsee::core::traits::IdProvider; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 3a09f47c956a..c14634d11850 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,6 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer; mod auth_layer; mod channel; mod client; @@ -12,7 +13,6 @@ mod request; mod segregation_layer; mod set_extension_layer; -use crate::rpc::chain::new_heads; use crate::rpc::eth::types::RandomHexStringIdProvider; use crate::shim::clock::ChainEpoch; pub use client::Client; @@ -404,10 +404,7 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; -use crate::rpc::eth::{ - EthSubscribe, EthUnsubscribe, - pubsub::{ETH_SUBSCRIPTION, eth_subscribe}, -}; +use crate::rpc::eth::pubsub::EthPubSub; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; @@ -515,19 +512,9 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); - // Register `Filecoin.EthSubscribe` and related methods. - module.register_subscription( - EthSubscribe::NAME, - ETH_SUBSCRIPTION, - EthUnsubscribe::NAME, - eth_subscribe, - )?; - if let Some(alias) = EthSubscribe::NAME_ALIAS { - module.register_alias(alias, EthSubscribe::NAME)?; - } - if let Some(alias) = EthUnsubscribe::NAME_ALIAS { - module.register_alias(alias, EthUnsubscribe::NAME)?; - } + // register eth subscription APIs + let eth_pubsub = EthPubSub::new(state.clone()); + module.merge(eth_pubsub.into_rpc())?; let mut pubsub_module = FilRpcModule::default(); From a25d9a73866a6815150a087e69d8672e4aaf163e Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 30 Jul 2025 11:49:59 +0200 Subject: [PATCH 26/29] Add actual links in comments --- src/rpc/methods/eth.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d4df13714659..83ee71bfbd19 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -2665,10 +2665,10 @@ impl RpcMethod<0> for EthUnsubscribe { type Ok = (); // This method is a placeholder and is never actually called. - // Subscription handling is performed in `pubsub.rs`. + // Subscription handling is performed in [`pubsub.rs`](pubsub). // - // We still need to implement the `RpcMethod` trait to expose method metadata - // like `NAME`, `NAME_ALIAS`, `PERMISSION`, etc.. + // We still need to implement the [`RpcMethod`] trait to expose method metadata + // like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc.. async fn handle( _: Ctx, (): Self::Params, @@ -2690,10 +2690,10 @@ impl RpcMethod<0> for EthSubscribe { type Ok = (); // This method is a placeholder and is never actually called. - // Subscription handling is performed in `pubsub.rs`. + // Subscription handling is performed in [`pubsub.rs`](pubsub). // - // We still need to implement the `RpcMethod` trait to expose method metadata - // like `NAME`, `NAME_ALIAS`, `PERMISSION`, etc.. + // We still need to implement the [`RpcMethod`] trait to expose method metadata + // like [`NAME`](Self::NAME), [`NAME_ALIAS`](Self::NAME_ALIAS), [`PERMISSION`](Self::PERMISSION), etc.. async fn handle( _: Ctx, (): Self::Params, From 0e68b0ff9add89d54e25bdc903776dfe792dca84 Mon Sep 17 00:00:00 2001 From: elmattic Date: Wed, 30 Jul 2025 12:06:05 +0200 Subject: [PATCH 27/29] Add error logs --- src/rpc/channel.rs | 4 ++-- src/rpc/methods/eth/pubsub.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rpc/channel.rs b/src/rpc/channel.rs index 25da40a0f422..22da36412db2 100644 --- a/src/rpc/channel.rs +++ b/src/rpc/channel.rs @@ -367,8 +367,8 @@ impl RpcModule { Ok(msg) => { match create_notif_message(&sink, &msg) { Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { + if let Err(e) = sink.send(msg).await { + tracing::error!("Failed to send message: {:?}", e); break; } } diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index ad614d91a4b8..30bcce1c1d43 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -157,8 +157,8 @@ async fn handle_subscription( Ok(v) => { match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { Ok(msg) => { - // This fails only if the connection is closed - if sink.send(msg).await.is_err() { + if let Err(e) = sink.send(msg).await { + tracing::error!("Failed to send message: {:?}", e); break; } } From c1f4969b1b0cf316a89c6a58bfa5c4735a927661 Mon Sep 17 00:00:00 2001 From: elmattic Date: Thu, 31 Jul 2025 11:20:01 +0200 Subject: [PATCH 28/29] Address CR comments --- src/rpc/methods/chain.rs | 15 ++++++++++----- src/rpc/methods/eth/pubsub.rs | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index cc197e6521a4..fbaacac6dfbc 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -16,7 +16,7 @@ use crate::lotus_json::{HasLotusJson, LotusJson, lotus_json_with_self}; #[cfg(test)] use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::message::{ChainMessage, SignedMessage}; -use crate::rpc::eth::types::ApiHeaders; +use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::ApiHeaders, types::EthFilterSpec}; use crate::rpc::types::{ApiTipsetKey, Event}; use crate::rpc::{ApiPaths, Ctx, EthEventHandler, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; @@ -77,8 +77,6 @@ pub(crate) fn new_heads( (receiver, handle) } -use crate::rpc::eth::{EthLog, eth_logs_with_filter, types::EthFilterSpec}; - /// Subscribes to head changes from the chain store and broadcasts new `Ethereum` logs. /// /// # Notes @@ -101,8 +99,15 @@ pub(crate) fn logs( HeadChange::Apply(ts) => { match eth_logs_with_filter(&ctx, &ts, filter.clone(), None).await { Ok(logs) => { - if !logs.is_empty() && sender.send(logs).is_err() { - break; + if !logs.is_empty() { + if let Err(e) = sender.send(logs) { + tracing::error!( + "Failed to send logs for tipset {}: {}", + ts.key(), + e + ); + break; + } } } Err(e) => { diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index 30bcce1c1d43..4b954eeb4a3a 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -155,7 +155,7 @@ async fn handle_subscription( action = subscriber.recv() => { match action { Ok(v) => { - match jsonrpsee::SubscriptionMessage::new("eth_subscription", sink.subscription_id(), &v) { + match jsonrpsee::SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &v) { Ok(msg) => { if let Err(e) = sink.send(msg).await { tracing::error!("Failed to send message: {:?}", e); From e3811e867de2da125c2beb248a4915f32a722a49 Mon Sep 17 00:00:00 2001 From: elmattic Date: Thu, 31 Jul 2025 13:44:06 +0200 Subject: [PATCH 29/29] Address comment --- src/rpc/methods/chain.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index fbaacac6dfbc..15cfee927ce3 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -68,7 +68,8 @@ pub(crate) fn new_heads( let headers = match v { HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), }; - if sender.send(headers).is_err() { + if let Err(e) = sender.send(headers) { + tracing::error!("Failed to send headers: {}", e); break; } }