Skip to content
Closed
2 changes: 2 additions & 0 deletions src/rpc/auth_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ static ACCESS_MAP: Lazy<HashMap<&str, Access>> = Lazy::new(|| {
access.insert(eth::ETH_GET_BALANCE, Access::Read);
access.insert(eth::EthSyncing::NAME, Access::Read);
access.insert(eth::ETH_GET_BLOCK_BY_NUMBER, Access::Read);
access.insert(eth::ETH_SUBSCRIBE, Access::Read);
access.insert(eth::ETH_UNSUBSCRIBE, Access::Read);
access.insert(eth::WEB3_CLIENT_VERSION, Access::Read);

// Pubsub API
Expand Down
22 changes: 22 additions & 0 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,25 @@ pub(crate) fn chain_notify<DB: Blockstore>(
receiver
}

pub(crate) fn new_heads<DB: Blockstore>(data: &crate::rpc::RPCState<DB>) -> Subscriber<ApiHeaders> {
let (sender, receiver) = broadcast::channel(100);

let mut subscriber = data.chain_store.publisher().subscribe();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let headers = match v {
HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()),
};
if sender.send(headers).is_err() {
break;
}
}
});

receiver
}

fn load_api_messages_from_tipset(
store: &impl Blockstore,
tipset: &Tipset,
Expand Down Expand Up @@ -782,6 +801,9 @@ pub struct ApiHeadChange {
pub headers: Vec<CachingBlockHeader>,
}

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec<CachingBlockHeader>);

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum PathChange<T = Arc<Tipset>> {
Expand Down
5 changes: 5 additions & 0 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub const ETH_CHAIN_ID: &str = "Filecoin.EthChainId";
pub const ETH_GAS_PRICE: &str = "Filecoin.EthGasPrice";
pub const ETH_GET_BALANCE: &str = "Filecoin.EthGetBalance";
pub const ETH_GET_BLOCK_BY_NUMBER: &str = "Filecoin.EthGetBlockByNumber";
pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe";
pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe";
pub const WEB3_CLIENT_VERSION: &str = "Filecoin.Web3ClientVersion";

const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];
Expand Down Expand Up @@ -627,6 +629,9 @@ impl HasLotusJson for EthSyncingResult {
}
}

/// An opaque identifier generated by the node to refer to an active subscription.
pub type SubscriptionID = Hash;

pub async fn eth_accounts() -> Result<Vec<String>, ServerError> {
// EthAccounts will always return [] since we don't expect Forest to manage private keys
Ok(vec![])
Expand Down
100 changes: 99 additions & 1 deletion src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod client;

pub use client::Client;
pub use error::ServerError;
use methods::chain::new_heads;
use reflect::Ctx;
pub use reflect::{ApiVersion, RpcMethod, RpcMethodExt};
mod error;
Expand Down Expand Up @@ -116,14 +117,18 @@ use crate::rpc::channel::RpcModule as FilRpcModule;
pub use crate::rpc::channel::CANCEL_METHOD_NAME;
use crate::rpc::state::*;

use ethereum_types::H256;
use fvm_ipld_blockstore::Blockstore;
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use jsonrpsee::{
core::RegisterMethodError,
core::{traits::IdProvider, RegisterMethodError},
server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder},
types::SubscriptionId,
Methods,
};
use rand::Rng;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{mpsc, RwLock};
use tower::Service;
use tracing::info;
Expand All @@ -133,6 +138,8 @@ use self::reflect::openrpc_types::ParamStructure;
const MAX_REQUEST_BODY_SIZE: u32 = 64 * 1024 * 1024;
const MAX_RESPONSE_BODY_SIZE: u32 = MAX_REQUEST_BODY_SIZE;

const ETH_SUBSCRIPTION: &str = "eth_subscription";

/// This is where you store persistent data, or at least access to stateful
/// data.
pub struct RPCState<DB> {
Expand Down Expand Up @@ -163,6 +170,25 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
keystore: Arc<RwLock<KeyStore>>,
}

#[derive(Debug, Copy, Clone)]
pub struct RandomHexStringIdProvider {}

impl RandomHexStringIdProvider {
pub fn new() -> Self {
Self {}
}
}

impl IdProvider for RandomHexStringIdProvider {
fn next_id(&self) -> SubscriptionId<'static> {
let mut bytes = [0u8; 32];
let mut rng = rand::thread_rng();
rng.fill(&mut bytes);

SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into())
}
}

pub async fn start_rpc<DB>(state: RPCState<DB>, rpc_endpoint: SocketAddr) -> anyhow::Result<()>
where
DB: Blockstore + Send + Sync + 'static,
Expand Down Expand Up @@ -193,6 +219,7 @@ where
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
.max_response_body_size(MAX_RESPONSE_BODY_SIZE)
.set_id_provider(RandomHexStringIdProvider::new())
.to_service_builder(),
keystore,
};
Expand Down Expand Up @@ -295,6 +322,77 @@ where
module.register_method(WEB3_CLIENT_VERSION, move |_, _| {
crate::utils::version::FOREST_VERSION_STRING.clone()
})?;
module.register_subscription(
ETH_SUBSCRIBE,
ETH_SUBSCRIPTION,
ETH_UNSUBSCRIBE,
|params, pending, ctx| async move {
let event_types = match params.parse::<Vec<String>>() {
Ok(v) => v,
Err(e) => {
pending
.reject(jsonrpsee::types::ErrorObjectOwned::from(e))
.await;
// If the subscription has not been "accepted" then
// the return value will be "ignored" as it's not
// allowed to send out any further notifications on
// on the subscription.
return Ok(());
}
};
// `event_types` is one OR more of:
// - "newHeads": notify when new blocks arrive
// - "pendingTransactions": notify when new messages arrive in the message pool
// - "logs": notify new event logs that match a criteria

tracing::trace!("Subscribing to events: {:?}", event_types);

let mut receiver = new_heads(&ctx);
tokio::spawn(async move {
// Mark the subscription is accepted after the params has been parsed successful.
// This is actually responds the underlying RPC method call and may fail if the
// connection is closed.
let sink = pending.accept().await.unwrap();

tracing::trace!("Subscription started (id: {:?})", sink.subscription_id());

loop {
tokio::select! {
action = receiver.recv() => {
match action {
Ok(v) => {
match jsonrpsee::SubscriptionMessage::from_json(&v) {
Ok(msg) => {
// This fails only if the connection is closed
if sink.send(msg).await.is_err() {
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize message: {:?}", e);
break;
}
}
}
Err(RecvError::Closed) => {
break;
}
Err(RecvError::Lagged(_)) => {
}
}
}
_ = sink.closed() => {
break;
}
}
}

tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id());
});

Ok(())
},
)?;

Ok(())
}
Expand Down
4 changes: 4 additions & 0 deletions src/rpc_client/eth_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ impl ApiInfo {
RpcRequest::new_v1(ETH_GET_BALANCE, (address, block_param))
}

pub fn eth_subscribe_req(event: serde_json::Value) -> RpcRequest<SubscriptionID> {
RpcRequest::new_v1(ETH_SUBSCRIBE, event)
}

pub fn web3_client_version_req() -> RpcRequest<String> {
RpcRequest::new_v1(WEB3_CLIENT_VERSION, ())
}
Expand Down
7 changes: 5 additions & 2 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use fvm_shared2::piece::PaddedPieceSize;
use itertools::Itertools as _;
use jsonrpsee::types::ErrorCode;
use serde::de::DeserializeOwned;
use serde_json::json;
use similar::{ChangeTag, TextDiff};
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -911,8 +912,10 @@ fn snapshot_tests(store: Arc<ManyCar>, n_tipsets: usize) -> anyhow::Result<Vec<R
}

fn websocket_tests() -> Vec<RpcTest> {
let test = RpcTest::identity(ApiInfo::chain_notify_req()).ignore("Not implemented yet");
vec![test]
vec![
RpcTest::identity(ApiInfo::chain_notify_req()).ignore("Not implemented yet"),
RpcTest::basic(ApiInfo::eth_subscribe_req(json!(["newHeads"]))),
]
}

fn sample_message_cids<'a>(
Expand Down