diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 22b8029ff8c3..cd61d8c8647c 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -188,7 +188,7 @@ pub struct MessagePool { /// Acts as a signal to republish messages from the republished set of /// messages pub repub_trigger: flume::Sender<()>, - local_msgs: Arc>>, + pub local_msgs: Arc>>, /// Configurable parameters of the message pool pub config: MpoolConfig, /// Chain configuration diff --git a/src/rpc/auth_layer.rs b/src/rpc/auth_layer.rs index d18b3218443c..f5ab416f1d05 100644 --- a/src/rpc/auth_layer.rs +++ b/src/rpc/auth_layer.rs @@ -3,7 +3,7 @@ use crate::auth::{verify_token, JWT_IDENTIFIER}; use crate::key_management::KeyStore; -use crate::rpc::{chain, Permission, RpcMethod as _, CANCEL_METHOD_NAME}; +use crate::rpc::{chain, eth, Permission, RpcMethod as _, CANCEL_METHOD_NAME}; use ahash::{HashMap, HashMapExt as _}; use futures::future::BoxFuture; use futures::FutureExt; @@ -31,6 +31,9 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy> = Lazy:: super::for_each_method!(insert); access.insert(chain::CHAIN_NOTIFY, Permission::Read); + access.insert(eth::ETH_SUBSCRIBE, Permission::Read); + access.insert(eth::ETH_SUBSCRIPTION, Permission::Read); + access.insert(eth::ETH_UNSUBSCRIBE, Permission::Read); access.insert(CANCEL_METHOD_NAME, Permission::Read); access diff --git a/src/rpc/methods/chain.rs b/src/rpc/methods/chain.rs index df3ec254be92..6156ca1ba7cc 100644 --- a/src/rpc/methods/chain.rs +++ b/src/rpc/methods/chain.rs @@ -15,6 +15,7 @@ use crate::ipld::DfsIter; use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json}; use crate::lotus_json::{lotus_json_with_self, HasLotusJson, LotusJson}; use crate::message::{ChainMessage, SignedMessage}; +use crate::message_pool::Provider; use crate::rpc::types::ApiTipsetKey; use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError}; use crate::shim::clock::ChainEpoch; @@ -669,6 +670,51 @@ pub(crate) fn chain_notify( receiver } +pub(crate) fn new_heads(data: &crate::rpc::RPCState) -> Subscriber { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.chain_store().publisher().subscribe(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let headers = match v { + HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()), + }; + if sender.send(headers).is_err() { + break; + } + } + }); + + receiver +} + +pub(crate) fn pending_txn( + data: Arc>, +) -> Subscriber> { + let (sender, receiver) = broadcast::channel(100); + + let mut subscriber = data.mpool.api.subscribe_head_changes(); + + tokio::spawn(async move { + while let Ok(v) = subscriber.recv().await { + let messages = match v { + HeadChange::Apply(_ts) => { + let local_msgs = data.mpool.local_msgs.write(); + let pending = local_msgs.iter().cloned().collect::>(); + pending + } + }; + + if sender.send(messages).is_err() { + break; + } + } + }); + + receiver +} + fn load_api_messages_from_tipset( store: &impl Blockstore, tipset: &Tipset, @@ -773,6 +819,9 @@ pub struct ApiHeadChange { } lotus_json_with_self!(ApiHeadChange); +#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)] +pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec); + #[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(tag = "Type", content = "Val", rename_all = "snake_case")] pub enum PathChange> { diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index d4a5892a55c7..7ebd19d7c7c5 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -46,6 +46,10 @@ use serde::{Deserialize, Serialize}; use std::str::FromStr; use std::{ops::Add, sync::Arc}; +pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe"; +pub const ETH_SUBSCRIPTION: &str = "Filecoin.EthSubscription"; +pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe"; + const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; /// Ethereum Bloom filter size in bits. diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index af1c4a5a12d5..a682f324e32d 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -9,6 +9,7 @@ mod request; pub use client::Client; pub use error::ServerError; use futures::FutureExt as _; +use methods::chain::{new_heads, pending_txn}; use reflect::Ctx; pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt}; pub use request::Request; @@ -17,6 +18,7 @@ mod reflect; pub mod types; pub use methods::*; use reflect::Permission; +use tokio::sync::broadcast::Receiver; /// Protocol or transport-specific error pub use jsonrpsee::core::ClientError; @@ -300,16 +302,21 @@ use crate::rpc::channel::RpcModule as FilRpcModule; pub use crate::rpc::channel::CANCEL_METHOD_NAME; use crate::blocks::Tipset; +use ethereum_types::H256; use fvm_ipld_blockstore::Blockstore; use jsonrpsee::{ + core::traits::IdProvider, server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder}, + types::SubscriptionId, Methods, }; use once_cell::sync::Lazy; +use rand::Rng; use std::env; use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; use tokio::sync::{mpsc, RwLock}; use tower::Service; @@ -377,6 +384,64 @@ struct PerConnection { keystore: Arc>, } +#[derive(Debug, Copy, Clone)] +pub struct RandomHexStringIdProvider {} + +impl RandomHexStringIdProvider { + pub fn new() -> Self { + Self {} + } +} + +impl IdProvider for RandomHexStringIdProvider { + fn next_id(&self) -> SubscriptionId<'static> { + let mut bytes = [0u8; 32]; + let mut rng = rand::thread_rng(); + rng.fill(&mut bytes); + + SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into()) + } +} + +enum ReceiverType { + Heads(Receiver), + Txn(Receiver>), +} + +async fn handle_subscription(mut rx: Receiver, sink: jsonrpsee::SubscriptionSink) +where + T: serde::Serialize + Clone, +{ + loop { + let action = rx.recv().await; + + tokio::select! { + action = async { action } => { + match action { + Ok(v) => { + match jsonrpsee::SubscriptionMessage::from_json(&v) { + Ok(msg) => { + // This fails only if the connection is closed + if sink.send(msg).await.is_err() { + break; + } + } + Err(e) => { + tracing::error!("Failed to serialize message: {:?}", e); + break; + } + } + } + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(_)) => {}, + } + } + _ = sink.closed() => break, + } + } + tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id()); +} + pub async fn start_rpc(state: RPCState, rpc_endpoint: SocketAddr) -> anyhow::Result<()> where DB: Blockstore + Send + Sync + 'static, @@ -394,6 +459,61 @@ where })?; module.merge(pubsub_module)?; + module + .register_subscription( + eth::ETH_SUBSCRIBE, + eth::ETH_SUBSCRIPTION, + eth::ETH_UNSUBSCRIBE, + |params, pending, ctx, _| async move { + let event_types = match params.parse::>() { + Ok(v) => v, + Err(e) => { + pending + .reject(jsonrpsee::types::ErrorObjectOwned::from(e)) + .await; + // If the subscription has not been "accepted" then + // the return value will be "ignored" as it's not + // allowed to send out any further notifications on + // on the subscription. + return Ok(()); + } + }; + // `event_types` is one OR more of: + // - "newHeads": notify when new blocks arrive + // - "newPendingTransactions": notify when new messages arrive in the message pool + // - "logs": notify new event logs that match a criteria + + tracing::trace!("Subscribing to events: {:?}", event_types); + + let receiver = event_types + .iter() + .find_map(|event| match event.as_str() { + "newHeads" => Some(ReceiverType::Heads(new_heads(&ctx))), + "newPendingTransactions" => { + Some(ReceiverType::Txn(pending_txn(ctx.clone()))) + } + _ => None, + }) + .expect("No valid event type found"); + + tokio::spawn(async move { + // Mark the subscription is accepted after the params has been parsed successful. + // This is actually responds the underlying RPC method call and may fail if the + // connection is closed. + let sink = pending.accept().await.unwrap(); + tracing::trace!("Subscription started (id: {:?})", sink.subscription_id()); + + match receiver { + ReceiverType::Heads(rx) => handle_subscription(rx, sink).await, + ReceiverType::Txn(rx) => handle_subscription(rx, sink).await, + } + }); + + Ok(()) + }, + ) + .unwrap(); + let (stop_handle, _server_handle) = stop_channel(); let per_conn = PerConnection { @@ -403,6 +523,7 @@ where // Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors` .max_request_body_size(MAX_REQUEST_BODY_SIZE) .max_response_body_size(MAX_RESPONSE_BODY_SIZE) + .set_id_provider(RandomHexStringIdProvider::new()) .to_service_builder(), keystore, }; @@ -504,6 +625,7 @@ where }; } for_each_method!(register); + module }