diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index c708f43581aa..c79e0c838c7d 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -169,6 +169,8 @@ static ACCESS_MAP: Lazy> = Lazy::new(|| { access.insert(eth::ETH_GET_BALANCE, Access::Read); access.insert(eth::EthSyncing::NAME, Access::Read); access.insert(eth::ETH_GET_BLOCK_BY_NUMBER, Access::Read); + access.insert(eth::ETH_SUBSCRIBE, Access::Read); + access.insert(eth::ETH_UNSUBSCRIBE, Access::Read); access.insert(eth::WEB3_CLIENT_VERSION, Access::Read); // Pubsub API diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index ae8ef1ee8871..4928799d7f16 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -680,6 +680,25 @@ pub(crate) fn chain_notify( receiver } +pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subscriber { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.chain_store.publisher().subscribe(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let headers = match v { + HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), + }; + if sender.send(headers).is_err() { + break; + } + } + }); + + receiver +} + fn load_api_messages_from_tipset( store: &impl Blockstore, tipset: &Tipset, @@ -782,6 +801,9 @@ pub struct ApiHeadChange { pub headers: Vec, } +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec); + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum PathChange> { diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index c64f9f804833..6101044c0f76 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -54,6 +54,8 @@ pub const ETH_CHAIN_ID: &str = "Filecoin.EthChainId"; pub const ETH_GAS_PRICE: &str = "Filecoin.EthGasPrice"; pub const ETH_GET_BALANCE: &str = "Filecoin.EthGetBalance"; pub const ETH_GET_BLOCK_BY_NUMBER: &str = "Filecoin.EthGetBlockByNumber"; +pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; +pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; pub const WEB3_CLIENT_VERSION: &str = "Filecoin.Web3ClientVersion"; const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; @@ -627,6 +629,9 @@ impl HasLotusJson for EthSyncingResult { } } +/// An opaque identifier generated by the node to refer to an active subscription. +pub type SubscriptionID = Hash; + pub async fn eth_accounts() -> Result, ServerError> { // EthAccounts will always return [] since we don't expect Forest to manage private keys Ok(vec![]) diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index ab3018e6e5e5..9c9bc9efbbe2 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -7,6 +7,7 @@ mod client; pub use client::Client; pub use error::ServerError; +use methods::chain::new_heads; use reflect::Ctx; pub use reflect::{ApiVersion, RpcMethod, RpcMethodExt}; mod error; @@ -116,14 +117,18 @@ use crate::rpc::channel::RpcModule as FilRpcModule; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::rpc::state::*; +use ethereum_types::H256; use fvm_ipld_blockstore::Blockstore; use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::{ - core::RegisterMethodError, + core::{traits::IdProvider, RegisterMethodError}, server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder}, + types::SubscriptionId, Methods, }; +use rand::Rng; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::{mpsc, RwLock}; use tower::Service; use tracing::info; @@ -133,6 +138,8 @@ use self::reflect::openrpc_types::ParamStructure; const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024; const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE; +const ETH_SUBSCRIPTION: &str = "eth_subscription"; + /// This is where you store persistent data, or at least access to stateful /// data. pub struct RPCState { @@ -163,6 +170,25 @@ struct PerConnection { keystore: Arc>, } +#[derive(Debug, Copy, Clone)] +pub struct RandomHexStringIdProvider {} + +impl RandomHexStringIdProvider { + pub fn new() -> Self { + Self {} + } +} + +impl IdProvider for RandomHexStringIdProvider { + fn next_id(&self) -> SubscriptionId<'static> { + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + rng.fill(&mut bytes); + + SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into()) + } +} + pub async fn start_rpc(state: RPCState, rpc_endpoint: SocketAddr) -> anyhow::Result<()> where DB: Blockstore + Send + Sync + 'static, @@ -193,6 +219,7 @@ where // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` .max_request_body_size(MAX_REQUEST_BODY_SIZE) .max_response_body_size(MAX_RESPONSE_BODY_SIZE) + .set_id_provider(RandomHexStringIdProvider::new()) .to_service_builder(), keystore, }; @@ -295,6 +322,77 @@ where module.register_method(WEB3_CLIENT_VERSION, move |_, _| { crate::utils::version::FOREST_VERSION_STRING.clone() })?; + module.register_subscription( + ETH_SUBSCRIBE, + ETH_SUBSCRIPTION, + ETH_UNSUBSCRIBE, + |params, pending, ctx| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "pendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + + tracing::trace!("Subscribing to events: {:?}", event_types); + + let mut receiver = new_heads(&ctx); + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + loop { + tokio::select! { + action = receiver.recv() => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::from_json(&v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => { + break; + } + Err(RecvError::Lagged(_)) => { + } + } + } + _ = sink.closed() => { + break; + } + } + } + + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); + }); + + Ok(()) + }, + )?; Ok(()) } diff --git a/src/rpc_client/eth_ops.rs b/src/rpc_client/eth_ops.rs index 7343fb221d7d..01894c6d41e5 100644 --- a/src/rpc_client/eth_ops.rs +++ b/src/rpc_client/eth_ops.rs @@ -35,6 +35,10 @@ impl ApiInfo { RpcRequest::new_v1(ETH_GET_BALANCE, (address, block_param)) } + pub fn eth_subscribe_req(event: serde_json::Value) -> RpcRequest { + RpcRequest::new_v1(ETH_SUBSCRIBE, event) + } + pub fn web3_client_version_req() -> RpcRequest { RpcRequest::new_v1(WEB3_CLIENT_VERSION, ()) } diff --git a/src/tool/subcommands/api_cmd.rs b/src/tool/subcommands/api_cmd.rs index 73ef1b5fd8da..f1f1bee1e960 100644 --- a/src/tool/subcommands/api_cmd.rs +++ b/src/tool/subcommands/api_cmd.rs @@ -42,6 +42,7 @@ use fvm_shared2::piece::PaddedPieceSize; use itertools::Itertools as _; use jsonrpsee::types::ErrorCode; use serde::de::DeserializeOwned; +use serde_json::json; use similar::{ChangeTag, TextDiff}; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -911,8 +912,10 @@ fn snapshot_tests(store: Arc, n_tipsets: usize) -> anyhow::Result Vec { - let test = RpcTest::identity(ApiInfo::chain_notify_req()).ignore("Not implemented yet"); - vec![test] + vec![ + RpcTest::identity(ApiInfo::chain_notify_req()).ignore("Not implemented yet"), + RpcTest::basic(ApiInfo::eth_subscribe_req(json!(["newHeads"]))), + ] } fn sample_message_cids<'a>(