diff --git a/Cargo.lock b/Cargo.lock index 2ee5c3f94bf1..e9362c18ef61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4904,10 +4904,12 @@ checksum = "1fba77a59c4c644fd48732367624d1bcf6f409f9c9a286fbc71d2f1fc0b2ea16" dependencies = [ "jsonrpsee-core", "jsonrpsee-http-client", + "jsonrpsee-proc-macros", "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-ws-client", "tokio", + "tracing", ] [[package]] @@ -4983,6 +4985,19 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-proc-macros" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fa4f5daed39f982a1bb9d15449a28347490ad42b212f8eaa2a2a344a0dce9e9" +dependencies = [ + "heck 0.5.0", + "proc-macro-crate 3.3.0", + "proc-macro2", + "quote", + "syn 2.0.104", +] + [[package]] name = "jsonrpsee-server" version = "0.25.1" diff --git a/Cargo.toml b/Cargo.toml index eb2a11d968f9..3a02b53504cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ integer-encoding = "4.0" ipld-core = { version = "0.4", features = ["serde", "arb"] } is-terminal = "0.4" itertools = "0.14" -jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] } +jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] } jsonwebtoken = "9" keccak-hash = "0.11" kubert-prometheus-process = "0.2" diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 6374b4d2fa7e..d4df13714659 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -5,6 +5,7 @@ pub(crate) mod errors; mod eth_tx; pub mod filter; pub mod pubsub; +pub(crate) mod pubsub_trait; mod trace; pub mod types; mod utils; diff --git a/src/rpc/methods/eth/pubsub.rs b/src/rpc/methods/eth/pubsub.rs index eba6c267d83b..ad614d91a4b8 100644 --- a/src/rpc/methods/eth/pubsub.rs +++ b/src/rpc/methods/eth/pubsub.rs @@ -59,172 +59,88 @@ //! ``` //! -use std::fmt; - +use crate::rpc::eth::pubsub_trait::{ + EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams, +}; +use crate::rpc::{RPCState, chain}; use fvm_ipld_blockstore::Blockstore; -use serde::de::{self, Deserializer, SeqAccess, Visitor}; -use serde::{Deserialize, Serialize}; +use jsonrpsee::PendingSubscriptionSink; +use jsonrpsee::core::{SubscriptionError, SubscriptionResult}; +use std::sync::Arc; use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError}; -use crate::rpc::Ctx; -use crate::rpc::eth::types::EthAddressList; -use crate::rpc::eth::{EthFilterSpec, EthTopicSpec}; - -pub const ETH_SUBSCRIPTION: &str = "eth_subscription"; - -const NEW_HEADS: &str = "newHeads"; -const PENDING_TRANSACTIONS: &str = "pendingTransactions"; -const LOGS: &str = "logs"; - -#[derive(Default, Serialize, Deserialize, Debug, Clone)] -#[serde(rename_all = "camelCase")] -pub struct LogFilter { - pub address: EthAddressList, - pub topics: Option, +pub struct EthPubSub { + ctx: Arc>, } -#[derive(Debug)] -enum Subscription { - NewHeads, - PendingTransactions, - Logs(Option), +impl EthPubSub { + pub fn new(ctx: Arc>) -> Self { + Self { ctx } + } } -impl<'de> Deserialize<'de> for Subscription { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct SubscriptionVisitor; - - impl<'de> Visitor<'de> for SubscriptionVisitor { - type Value = Subscription; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#) +#[async_trait::async_trait] +impl EthPubSubApiServer for EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn subscribe( + &self, + pending: PendingSubscriptionSink, + kind: SubscriptionKind, + params: Option, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + let ctx = self.ctx.clone(); + + match kind { + SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await, + SubscriptionKind::PendingTransactions => { + return Err(SubscriptionError::from( + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, + "pendingTransactions subscription not yet implemented", + None::<()>, + ), + )); } - - fn visit_seq(self, mut seq: V) -> Result - where - V: SeqAccess<'de>, - { - let event_type: String = seq - .next_element()? - .ok_or_else(|| de::Error::invalid_length(0, &self))?; - - match event_type.as_str() { - NEW_HEADS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::NewHeads) - } - PENDING_TRANSACTIONS => { - if seq.next_element::()?.is_some() { - return Err(de::Error::custom("unsupported event type")); - } - Ok(Subscription::PendingTransactions) - } - LOGS => Ok(Subscription::Logs(seq.next_element()?)), - _ => Err(de::Error::unknown_variant( - &event_type, - &[NEW_HEADS, PENDING_TRANSACTIONS, LOGS], - )), - } + SubscriptionKind::Logs => { + let filter = params.and_then(|p| p.filter); + self.handle_logs_subscription(sink, ctx, filter).await } } - deserializer.deserialize_seq(SubscriptionVisitor) + Ok(()) } } -pub async fn eth_subscribe( - params: jsonrpsee::types::Params<'static>, - pending: jsonrpsee::core::server::PendingSubscriptionSink, - ctx: Ctx, - _ext: http::Extensions, -) -> impl jsonrpsee::IntoSubscriptionCloseResponse { - let subscription: Subscription = match params.parse() { - Ok(sub) => sub, - Err(e) => { - pending - .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) - .await; - // If the subscription has not been "accepted" then - // the return value will be "ignored" as it's not - // allowed to send out any further notifications on - // on the subscription. - return Ok(()); - } - }; - - tracing::trace!("Subscribing to event: {:?}", subscription); - - match subscription { - Subscription::NewHeads => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - // Spawn newHeads task - let (new_heads, handle) = crate::rpc::new_heads(&ctx); - - tokio::spawn(async move { - tracing::trace!( - "Subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(new_heads, sink, handle).await; - }); - } - Subscription::Logs(filter) => { - // Mark the subscription is accepted after the params has been parsed successful. - // This is actually responds the underlying RPC method call and may fail if the - // connection is closed. - let sink = match pending.accept().await { - Ok(sink) => sink, - Err(e) => { - tracing::error!("Failed to accept subscription: {:?}", e); - return Ok(()); - } - }; - - let filter_spec: Option = filter.map(Into::into); - - // Spawn logs task - let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec); - - tokio::spawn(async move { - tracing::trace!( - "Logs subscription task started (id: {:?})", - sink.subscription_id() - ); - - handle_subscription(logs, sink, handle).await; - }); - } - Subscription::PendingTransactions => { - // TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782 - pending - .reject(jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::error::METHOD_NOT_FOUND_CODE, - "pendingTransactions subscription not yet implemented", - None::<()>, - )) - .await; - return Ok(()); - } +impl EthPubSub +where + DB: Blockstore + Send + Sync + 'static, +{ + async fn handle_new_heads_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + ) { + let (subscriber, handle) = chain::new_heads(&ctx); + tokio::spawn(async move { + handle_subscription(subscriber, accepted_sink, handle).await; + }); } - Ok(()) + async fn handle_logs_subscription( + &self, + accepted_sink: jsonrpsee::SubscriptionSink, + ctx: Arc>, + filter_spec: Option, + ) { + let filter_spec = filter_spec.map(Into::into); + let (logs, handle) = chain::logs(&ctx, filter_spec); + tokio::spawn(async move { + handle_subscription(logs, accepted_sink, handle).await; + }); + } } async fn handle_subscription( @@ -266,5 +182,5 @@ async fn handle_subscription( } handle.abort(); - tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id()); } diff --git a/src/rpc/methods/eth/pubsub_trait.rs b/src/rpc/methods/eth/pubsub_trait.rs new file mode 100644 index 000000000000..39bb57f4f0b1 --- /dev/null +++ b/src/rpc/methods/eth/pubsub_trait.rs @@ -0,0 +1,44 @@ +// Copyright 2019-2025 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use crate::rpc::eth::types::{EthAddressList, EthTopicSpec}; +use jsonrpsee::proc_macros::rpc; +use serde::{Deserialize, Serialize}; + +#[rpc(server, namespace = "eth")] +pub trait EthPubSubApi { + /// Subscribe to Ethereum events + #[subscription( + name = "subscribe" => "subscription", + aliases = ["Filecoin.EthSubscribe"], + unsubscribe = "unsubscribe", + unsubscribe_aliases = ["Filecoin.EthUnsubscribe"], + item = serde_json::Value + )] + async fn subscribe( + &self, + kind: SubscriptionKind, + params: Option, + ) -> jsonrpsee::core::SubscriptionResult; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum SubscriptionKind { + NewHeads, + PendingTransactions, + Logs, +} + +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct LogFilter { + pub address: EthAddressList, + pub topics: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SubscriptionParams { + #[serde(flatten)] + pub filter: Option, +} diff --git a/src/rpc/methods/eth/types.rs b/src/rpc/methods/eth/types.rs index cb6d94e466e7..29087f9a67cb 100644 --- a/src/rpc/methods/eth/types.rs +++ b/src/rpc/methods/eth/types.rs @@ -3,7 +3,7 @@ use super::*; use crate::blocks::CachingBlockHeader; -use crate::rpc::eth::pubsub::LogFilter; +use crate::rpc::eth::pubsub_trait::LogFilter; use anyhow::ensure; use ipld_core::serde::SerdeError; use jsonrpsee::core::traits::IdProvider; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 3a09f47c956a..c14634d11850 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,6 +1,7 @@ // Copyright 2019-2025 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use crate::rpc::methods::eth::pubsub_trait::EthPubSubApiServer; mod auth_layer; mod channel; mod client; @@ -12,7 +13,6 @@ mod request; mod segregation_layer; mod set_extension_layer; -use crate::rpc::chain::new_heads; use crate::rpc::eth::types::RandomHexStringIdProvider; use crate::shim::clock::ChainEpoch; pub use client::Client; @@ -404,10 +404,7 @@ mod methods { use crate::rpc::auth_layer::AuthLayer; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::channel::RpcModule as FilRpcModule; -use crate::rpc::eth::{ - EthSubscribe, EthUnsubscribe, - pubsub::{ETH_SUBSCRIPTION, eth_subscribe}, -}; +use crate::rpc::eth::pubsub::EthPubSub; use crate::rpc::metrics_layer::MetricsLayer; use crate::{chain_sync::network_context::SyncNetworkContext, key_management::KeyStore}; @@ -515,19 +512,9 @@ where let keystore = state.keystore.clone(); let mut module = create_module(state.clone()); - // Register `Filecoin.EthSubscribe` and related methods. - module.register_subscription( - EthSubscribe::NAME, - ETH_SUBSCRIPTION, - EthUnsubscribe::NAME, - eth_subscribe, - )?; - if let Some(alias) = EthSubscribe::NAME_ALIAS { - module.register_alias(alias, EthSubscribe::NAME)?; - } - if let Some(alias) = EthUnsubscribe::NAME_ALIAS { - module.register_alias(alias, EthUnsubscribe::NAME)?; - } + // register eth subscription APIs + let eth_pubsub = EthPubSub::new(state.clone()); + module.merge(eth_pubsub.into_rpc())?; let mut pubsub_module = FilRpcModule::default();