Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ integer-encoding = "4.0"
ipld-core = { version = "0.4", features = ["serde", "arb"] }
is-terminal = "0.4"
itertools = "0.14"
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client"] }
jsonrpsee = { version = "0.25", features = ["server", "ws-client", "http-client", "macros"] }
jsonwebtoken = "9"
keccak-hash = "0.11"
kubert-prometheus-process = "0.2"
Expand Down
1 change: 1 addition & 0 deletions src/rpc/methods/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod errors;
mod eth_tx;
pub mod filter;
pub mod pubsub;
pub(crate) mod pubsub_trait;
mod trace;
pub mod types;
mod utils;
Expand Down
218 changes: 67 additions & 151 deletions src/rpc/methods/eth/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,172 +59,88 @@
//! ```
//!

use std::fmt;

use crate::rpc::eth::pubsub_trait::{
EthPubSubApiServer, LogFilter, SubscriptionKind, SubscriptionParams,
};
use crate::rpc::{RPCState, chain};
use fvm_ipld_blockstore::Blockstore;
use serde::de::{self, Deserializer, SeqAccess, Visitor};
use serde::{Deserialize, Serialize};
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee::core::{SubscriptionError, SubscriptionResult};
use std::sync::Arc;
use tokio::sync::broadcast::{Receiver as Subscriber, error::RecvError};

use crate::rpc::Ctx;
use crate::rpc::eth::types::EthAddressList;
use crate::rpc::eth::{EthFilterSpec, EthTopicSpec};

pub const ETH_SUBSCRIPTION: &str = "eth_subscription";

const NEW_HEADS: &str = "newHeads";
const PENDING_TRANSACTIONS: &str = "pendingTransactions";
const LOGS: &str = "logs";

#[derive(Default, Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogFilter {
pub address: EthAddressList,
pub topics: Option<EthTopicSpec>,
pub struct EthPubSub<DB> {
ctx: Arc<RPCState<DB>>,
}

#[derive(Debug)]
enum Subscription {
NewHeads,
PendingTransactions,
Logs(Option<LogFilter>),
impl<DB> EthPubSub<DB> {
pub fn new(ctx: Arc<RPCState<DB>>) -> Self {
Self { ctx }
}
}

impl<'de> Deserialize<'de> for Subscription {
fn deserialize<D>(deserializer: D) -> Result<Subscription, D::Error>
where
D: Deserializer<'de>,
{
struct SubscriptionVisitor;

impl<'de> Visitor<'de> for SubscriptionVisitor {
type Value = Subscription;

fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str(r#"a JSON array like ["logs", {...}] or ["newHeads"]"#)
#[async_trait::async_trait]
impl<DB> EthPubSubApiServer for EthPubSub<DB>
where
DB: Blockstore + Send + Sync + 'static,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<SubscriptionParams>,
) -> SubscriptionResult {
let sink = pending.accept().await?;
let ctx = self.ctx.clone();

match kind {
SubscriptionKind::NewHeads => self.handle_new_heads_subscription(sink, ctx).await,
SubscriptionKind::PendingTransactions => {
return Err(SubscriptionError::from(
jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
"pendingTransactions subscription not yet implemented",
None::<()>,
),
));
}

fn visit_seq<V>(self, mut seq: V) -> Result<Subscription, V::Error>
where
V: SeqAccess<'de>,
{
let event_type: String = seq
.next_element()?
.ok_or_else(|| de::Error::invalid_length(0, &self))?;

match event_type.as_str() {
NEW_HEADS => {
if seq.next_element::<serde::de::IgnoredAny>()?.is_some() {
return Err(de::Error::custom("unsupported event type"));
}
Ok(Subscription::NewHeads)
}
PENDING_TRANSACTIONS => {
if seq.next_element::<serde::de::IgnoredAny>()?.is_some() {
return Err(de::Error::custom("unsupported event type"));
}
Ok(Subscription::PendingTransactions)
}
LOGS => Ok(Subscription::Logs(seq.next_element()?)),
_ => Err(de::Error::unknown_variant(
&event_type,
&[NEW_HEADS, PENDING_TRANSACTIONS, LOGS],
)),
}
SubscriptionKind::Logs => {
let filter = params.and_then(|p| p.filter);
self.handle_logs_subscription(sink, ctx, filter).await
}
}

deserializer.deserialize_seq(SubscriptionVisitor)
Ok(())
}
}

pub async fn eth_subscribe<DB: Blockstore + Sync + Send + 'static>(
params: jsonrpsee::types::Params<'static>,
pending: jsonrpsee::core::server::PendingSubscriptionSink,
ctx: Ctx<DB>,
_ext: http::Extensions,
) -> impl jsonrpsee::IntoSubscriptionCloseResponse {
let subscription: Subscription = match params.parse() {
Ok(sub) => sub,
Err(e) => {
pending
.reject(jsonrpsee::types::ErrorObjectOwned::from(e))
.await;
// If the subscription has not been "accepted" then
// the return value will be "ignored" as it's not
// allowed to send out any further notifications on
// on the subscription.
return Ok(());
}
};

tracing::trace!("Subscribing to event: {:?}", subscription);

match subscription {
Subscription::NewHeads => {
// Mark the subscription is accepted after the params has been parsed successful.
// This is actually responds the underlying RPC method call and may fail if the
// connection is closed.
let sink = match pending.accept().await {
Ok(sink) => sink,
Err(e) => {
tracing::error!("Failed to accept subscription: {:?}", e);
return Ok(());
}
};

// Spawn newHeads task
let (new_heads, handle) = crate::rpc::new_heads(&ctx);

tokio::spawn(async move {
tracing::trace!(
"Subscription task started (id: {:?})",
sink.subscription_id()
);

handle_subscription(new_heads, sink, handle).await;
});
}
Subscription::Logs(filter) => {
// Mark the subscription is accepted after the params has been parsed successful.
// This is actually responds the underlying RPC method call and may fail if the
// connection is closed.
let sink = match pending.accept().await {
Ok(sink) => sink,
Err(e) => {
tracing::error!("Failed to accept subscription: {:?}", e);
return Ok(());
}
};

let filter_spec: Option<EthFilterSpec> = filter.map(Into::into);

// Spawn logs task
let (logs, handle) = crate::rpc::chain::logs(&ctx, filter_spec);

tokio::spawn(async move {
tracing::trace!(
"Logs subscription task started (id: {:?})",
sink.subscription_id()
);

handle_subscription(logs, sink, handle).await;
});
}
Subscription::PendingTransactions => {
// TODO(akaladarshi): https://github.com/ChainSafe/forest/pull/5782
pending
.reject(jsonrpsee::types::ErrorObjectOwned::owned(
jsonrpsee::types::error::METHOD_NOT_FOUND_CODE,
"pendingTransactions subscription not yet implemented",
None::<()>,
))
.await;
return Ok(());
}
impl<DB> EthPubSub<DB>
where
DB: Blockstore + Send + Sync + 'static,
{
async fn handle_new_heads_subscription(
&self,
accepted_sink: jsonrpsee::SubscriptionSink,
ctx: Arc<RPCState<DB>>,
) {
let (subscriber, handle) = chain::new_heads(&ctx);
tokio::spawn(async move {
handle_subscription(subscriber, accepted_sink, handle).await;
});
}

Ok(())
async fn handle_logs_subscription(
&self,
accepted_sink: jsonrpsee::SubscriptionSink,
ctx: Arc<RPCState<DB>>,
filter_spec: Option<LogFilter>,
) {
let filter_spec = filter_spec.map(Into::into);
let (logs, handle) = chain::logs(&ctx, filter_spec);
tokio::spawn(async move {
handle_subscription(logs, accepted_sink, handle).await;
});
}
}

async fn handle_subscription<T>(
Expand Down Expand Up @@ -266,5 +182,5 @@ async fn handle_subscription<T>(
}
handle.abort();

tracing::trace!("Subscription task ended (id: {:?})", sink.subscription_id());
tracing::info!("Subscription task ended (id: {:?})", sink.subscription_id());
}
44 changes: 44 additions & 0 deletions src/rpc/methods/eth/pubsub_trait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::rpc::eth::types::{EthAddressList, EthTopicSpec};
use jsonrpsee::proc_macros::rpc;
use serde::{Deserialize, Serialize};

#[rpc(server, namespace = "eth")]
pub trait EthPubSubApi {
/// Subscribe to Ethereum events
#[subscription(
name = "subscribe" => "subscription",
aliases = ["Filecoin.EthSubscribe"],
unsubscribe = "unsubscribe",
unsubscribe_aliases = ["Filecoin.EthUnsubscribe"],
item = serde_json::Value
)]
async fn subscribe(
&self,
kind: SubscriptionKind,
params: Option<SubscriptionParams>,
) -> jsonrpsee::core::SubscriptionResult;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum SubscriptionKind {
NewHeads,
PendingTransactions,
Logs,
}

#[derive(Default, Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct LogFilter {
pub address: EthAddressList,
pub topics: Option<EthTopicSpec>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SubscriptionParams {
#[serde(flatten)]
pub filter: Option<LogFilter>,
}
2 changes: 1 addition & 1 deletion src/rpc/methods/eth/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use super::*;
use crate::blocks::CachingBlockHeader;
use crate::rpc::eth::pubsub::LogFilter;
use crate::rpc::eth::pubsub_trait::LogFilter;
use anyhow::ensure;
use ipld_core::serde::SerdeError;
use jsonrpsee::core::traits::IdProvider;
Expand Down
Loading
Loading