Skip to content
2 changes: 1 addition & 1 deletion src/message_pool/msgpool/msg_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ pub struct MessagePool<T> {
/// Acts as a signal to republish messages from the republished set of
/// messages
pub repub_trigger: flume::Sender<()>,
local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
pub local_msgs: Arc<SyncRwLock<HashSet<SignedMessage>>>,
/// Configurable parameters of the message pool
pub config: MpoolConfig,
/// Chain configuration
Expand Down
5 changes: 4 additions & 1 deletion src/rpc/auth_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::auth::{verify_token, JWT_IDENTIFIER};
use crate::key_management::KeyStore;
use crate::rpc::{chain, Permission, RpcMethod as _, CANCEL_METHOD_NAME};
use crate::rpc::{chain, eth, Permission, RpcMethod as _, CANCEL_METHOD_NAME};
use ahash::{HashMap, HashMapExt as _};
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -31,6 +31,9 @@ static METHOD_NAME2REQUIRED_PERMISSION: Lazy<HashMap<&str, Permission>> = Lazy::
super::for_each_method!(insert);

access.insert(chain::CHAIN_NOTIFY, Permission::Read);
access.insert(eth::ETH_SUBSCRIBE, Permission::Read);
access.insert(eth::ETH_SUBSCRIPTION, Permission::Read);
access.insert(eth::ETH_UNSUBSCRIBE, Permission::Read);
access.insert(CANCEL_METHOD_NAME, Permission::Read);

access
Expand Down
49 changes: 49 additions & 0 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::ipld::DfsIter;
use crate::lotus_json::{assert_all_snapshots, assert_unchanged_via_json};
use crate::lotus_json::{lotus_json_with_self, HasLotusJson, LotusJson};
use crate::message::{ChainMessage, SignedMessage};
use crate::message_pool::Provider;
use crate::rpc::types::ApiTipsetKey;
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
use crate::shim::clock::ChainEpoch;
Expand Down Expand Up @@ -669,6 +670,51 @@ pub(crate) fn chain_notify<DB: Blockstore>(
receiver
}

pub(crate) fn new_heads<DB: Blockstore>(data: &crate::rpc::RPCState<DB>) -> Subscriber<ApiHeaders> {
let (sender, receiver) = broadcast::channel(100);

let mut subscriber = data.chain_store().publisher().subscribe();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let headers = match v {
HeadChange::Apply(ts) => ApiHeaders(ts.block_headers().clone().into()),
};
if sender.send(headers).is_err() {
break;
}
}
});

receiver
}

pub(crate) fn pending_txn<DB: Blockstore + Send + Sync + 'static>(
data: Arc<crate::rpc::RPCState<DB>>,
) -> Subscriber<Vec<SignedMessage>> {
let (sender, receiver) = broadcast::channel(100);

let mut subscriber = data.mpool.api.subscribe_head_changes();

tokio::spawn(async move {
while let Ok(v) = subscriber.recv().await {
let messages = match v {
HeadChange::Apply(_ts) => {
let local_msgs = data.mpool.local_msgs.write();
let pending = local_msgs.iter().cloned().collect::<Vec<SignedMessage>>();
pending
}
};

if sender.send(messages).is_err() {
break;
}
}
});

receiver
}

fn load_api_messages_from_tipset(
store: &impl Blockstore,
tipset: &Tipset,
Expand Down Expand Up @@ -773,6 +819,9 @@ pub struct ApiHeadChange {
}
lotus_json_with_self!(ApiHeadChange);

#[derive(Clone, Serialize, Deserialize, PartialEq, Debug)]
pub struct ApiHeaders(#[serde(with = "crate::lotus_json")] pub Vec<CachingBlockHeader>);

#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(tag = "Type", content = "Val", rename_all = "snake_case")]
pub enum PathChange<T = Arc<Tipset>> {
Expand Down
4 changes: 4 additions & 0 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ use serde::{Deserialize, Serialize};
use std::str::FromStr;
use std::{ops::Add, sync::Arc};

pub const ETH_SUBSCRIBE: &str = "Filecoin.EthSubscribe";
pub const ETH_SUBSCRIPTION: &str = "Filecoin.EthSubscription";
pub const ETH_UNSUBSCRIBE: &str = "Filecoin.EthUnsubscribe";

const MASKED_ID_PREFIX: [u8; 12] = [0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0];

/// Ethereum Bloom filter size in bits.
Expand Down
122 changes: 122 additions & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod request;
pub use client::Client;
pub use error::ServerError;
use futures::FutureExt as _;
use methods::chain::{new_heads, pending_txn};
use reflect::Ctx;
pub use reflect::{ApiPath, ApiPaths, RpcMethod, RpcMethodExt};
pub use request::Request;
Expand All @@ -17,6 +18,7 @@ mod reflect;
pub mod types;
pub use methods::*;
use reflect::Permission;
use tokio::sync::broadcast::Receiver;

/// Protocol or transport-specific error
pub use jsonrpsee::core::ClientError;
Expand Down Expand Up @@ -300,16 +302,21 @@ use crate::rpc::channel::RpcModule as FilRpcModule;
pub use crate::rpc::channel::CANCEL_METHOD_NAME;

use crate::blocks::Tipset;
use ethereum_types::H256;
use fvm_ipld_blockstore::Blockstore;
use jsonrpsee::{
core::traits::IdProvider,
server::{stop_channel, RpcModule, RpcServiceBuilder, Server, StopHandle, TowerServiceBuilder},
types::SubscriptionId,
Methods,
};
use once_cell::sync::Lazy;
use rand::Rng;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{mpsc, RwLock};
use tower::Service;

Expand Down Expand Up @@ -377,6 +384,64 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
keystore: Arc<RwLock<KeyStore>>,
}

#[derive(Debug, Copy, Clone)]
pub struct RandomHexStringIdProvider {}

impl RandomHexStringIdProvider {
pub fn new() -> Self {
Self {}
}
}

impl IdProvider for RandomHexStringIdProvider {
fn next_id(&self) -> SubscriptionId<'static> {
let mut bytes = [0u8; 32];
let mut rng = rand::thread_rng();
rng.fill(&mut bytes);

SubscriptionId::Str(format!("{:#x}", H256::from(bytes)).into())
}
}

enum ReceiverType {
Heads(Receiver<chain::ApiHeaders>),
Txn(Receiver<Vec<crate::message::SignedMessage>>),
}

async fn handle_subscription<T>(mut rx: Receiver<T>, sink: jsonrpsee::SubscriptionSink)
where
T: serde::Serialize + Clone,
{
loop {
let action = rx.recv().await;

tokio::select! {
action = async { action } => {
match action {
Ok(v) => {
match jsonrpsee::SubscriptionMessage::from_json(&v) {
Ok(msg) => {
// This fails only if the connection is closed
if sink.send(msg).await.is_err() {
break;
}
}
Err(e) => {
tracing::error!("Failed to serialize message: {:?}", e);
break;
}
}
}
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => {},
}
}
_ = sink.closed() => break,
}
}
tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id());
}

pub async fn start_rpc<DB>(state: RPCState<DB>, rpc_endpoint: SocketAddr) -> anyhow::Result<()>
where
DB: Blockstore + Send + Sync + 'static,
Expand All @@ -394,6 +459,61 @@ where
})?;
module.merge(pubsub_module)?;

module
.register_subscription(
eth::ETH_SUBSCRIBE,
eth::ETH_SUBSCRIPTION,
eth::ETH_UNSUBSCRIBE,
|params, pending, ctx, _| async move {
let event_types = match params.parse::<Vec<String>>() {
Ok(v) => v,
Err(e) => {
pending
.reject(jsonrpsee::types::ErrorObjectOwned::from(e))
.await;
// If the subscription has not been "accepted" then
// the return value will be "ignored" as it's not
// allowed to send out any further notifications on
// on the subscription.
return Ok(());
}
};
// `event_types` is one OR more of:
// - "newHeads": notify when new blocks arrive
// - "newPendingTransactions": notify when new messages arrive in the message pool
// - "logs": notify new event logs that match a criteria

tracing::trace!("Subscribing to events: {:?}", event_types);

let receiver = event_types
.iter()
.find_map(|event| match event.as_str() {
"newHeads" => Some(ReceiverType::Heads(new_heads(&ctx))),
"newPendingTransactions" => {
Some(ReceiverType::Txn(pending_txn(ctx.clone())))
}
_ => None,
})
.expect("No valid event type found");

tokio::spawn(async move {
// Mark the subscription is accepted after the params has been parsed successful.
// This is actually responds the underlying RPC method call and may fail if the
// connection is closed.
let sink = pending.accept().await.unwrap();
tracing::trace!("Subscription started (id: {:?})", sink.subscription_id());

match receiver {
ReceiverType::Heads(rx) => handle_subscription(rx, sink).await,
ReceiverType::Txn(rx) => handle_subscription(rx, sink).await,
}
});

Ok(())
},
)
.unwrap();

let (stop_handle, _server_handle) = stop_channel();

let per_conn = PerConnection {
Expand All @@ -403,6 +523,7 @@ where
// Default size (10 MiB) is not enough for methods like `Filecoin.StateMinerActiveSectors`
.max_request_body_size(MAX_REQUEST_BODY_SIZE)
.max_response_body_size(MAX_RESPONSE_BODY_SIZE)
.set_id_provider(RandomHexStringIdProvider::new())
.to_service_builder(),
keystore,
};
Expand Down Expand Up @@ -504,6 +625,7 @@ where
};
}
for_each_method!(register);

module
}

Expand Down