Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/warm-geese-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
reth-rpc-api: minor
reth-rpc-builder: patch
reth-rpc: minor
---

Added `subscribeFinalizedChainNotifications` RPC endpoint that buffers committed chain notifications and emits them once a new finalized block is received.
15 changes: 14 additions & 1 deletion crates/rpc/rpc-api/src/reth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alloy_eips::BlockId;
use alloy_primitives::{map::AddressMap, U256};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};

// Required for the subscription attribute below
// Required for the subscription attributes below
use reth_chain_state as _;

/// Reth API namespace for reth-specific methods
Expand Down Expand Up @@ -33,4 +33,17 @@ pub trait RethApi {
item = alloy_eips::BlockNumHash
)]
async fn reth_subscribe_persisted_block(&self) -> jsonrpsee::core::SubscriptionResult;

/// Subscribe to finalized chain notifications.
///
/// Buffers committed chain notifications and emits them once a new finalized block is received.
/// Each notification contains all committed chain segments up to the finalized block.
#[subscription(
name = "subscribeFinalizedChainNotifications",
unsubscribe = "unsubscribeFinalizedChainNotifications",
item = Vec<reth_chain_state::CanonStateNotification>
)]
async fn reth_subscribe_finalized_chain_notifications(
&self,
) -> jsonrpsee::core::SubscriptionResult;
}
9 changes: 7 additions & 2 deletions crates/rpc/rpc-builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ pub use eth::EthHandlers;
mod metrics;
use crate::middleware::RethRpcMiddleware;
pub use metrics::{MeteredBatchRequestsFuture, MeteredRequestFuture, RpcRequestMetricsService};
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_chain_state::{
CanonStateSubscriptions, ForkChoiceSubscriptions, PersistedBlockSubscriptions,
};
use reth_rpc::eth::sim_bundle::EthSimBundle;

// Rpc rate limiter
Expand Down Expand Up @@ -311,6 +313,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block, Receipt = N::Receipt, Header = N::BlockHeader>
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,
Expand Down Expand Up @@ -656,7 +659,8 @@ where
Transaction = N::SignedTx,
> + AccountReader
+ ChangeSetReader
+ CanonStateSubscriptions
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions,
Network: NetworkInfo + Peers + Clone + 'static,
EthApi: EthApiServer<
Expand Down Expand Up @@ -845,6 +849,7 @@ where
N: NodePrimitives,
Provider: FullRpcProvider<Block = N::Block>
+ CanonStateSubscriptions<Primitives = N>
+ ForkChoiceSubscriptions<Header = N::BlockHeader>
+ PersistedBlockSubscriptions
+ AccountReader
+ ChangeSetReader,
Expand Down
90 changes: 89 additions & 1 deletion crates/rpc/rpc/src/reth.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
use std::{future::Future, sync::Arc};

use alloy_consensus::BlockHeader;
use alloy_eips::BlockId;
use alloy_primitives::{map::AddressMap, U256};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::{core::RpcResult, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
use reth_chain_state::{CanonStateSubscriptions, PersistedBlockSubscriptions};
use reth_chain_state::{
CanonStateNotification, CanonStateSubscriptions, ForkChoiceSubscriptions,
PersistedBlockSubscriptions,
};
use reth_errors::RethResult;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
use reth_rpc_api::RethApiServer;
use reth_rpc_eth_types::{EthApiError, EthResult};
use reth_storage_api::{BlockReaderIdExt, ChangeSetReader, StateProviderFactory};
Expand Down Expand Up @@ -92,6 +97,7 @@ where
+ ChangeSetReader
+ StateProviderFactory
+ CanonStateSubscriptions
+ ForkChoiceSubscriptions<Header = <Provider::Primitives as NodePrimitives>::BlockHeader>
+ PersistedBlockSubscriptions
+ 'static,
{
Expand Down Expand Up @@ -126,6 +132,23 @@ where

Ok(())
}

/// Handler for `reth_subscribeFinalizedChainNotifications`
async fn reth_subscribe_finalized_chain_notifications(
&self,
pending: PendingSubscriptionSink,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let canon_stream = self.provider().canonical_state_stream();
let finalized_stream = self.provider().finalized_block_stream();
self.inner.task_spawner.spawn(Box::pin(finalized_chain_notifications(
sink,
canon_stream,
finalized_stream,
)));

Ok(())
}
}

/// Pipes all stream items to the subscription sink.
Expand Down Expand Up @@ -158,6 +181,71 @@ where
}
}

/// Buffers committed chain notifications and emits them when a new finalized block is received.
async fn finalized_chain_notifications<N>(
sink: SubscriptionSink,
mut canon_stream: reth_chain_state::CanonStateNotificationStream<N>,
mut finalized_stream: reth_chain_state::ForkChoiceStream<SealedHeader<N::BlockHeader>>,
) where
N: NodePrimitives,
{
let mut buffered: Vec<CanonStateNotification<N>> = Vec::new();

loop {
tokio::select! {
_ = sink.closed() => {
break
}
maybe_canon = canon_stream.next() => {
let Some(notification) = maybe_canon else { break };
match &notification {
CanonStateNotification::Commit { .. } => {
buffered.push(notification);
}
CanonStateNotification::Reorg { .. } => {
buffered.clear();
}
}
}
maybe_finalized = finalized_stream.next() => {
let Some(finalized_header) = maybe_finalized else { break };
let finalized_num = finalized_header.number();

let mut committed = Vec::new();
buffered.retain(|n| {
if *n.committed().range().end() <= finalized_num {
committed.push(n.clone());
false
} else {
true
}
});

if committed.is_empty() {
continue;
}

committed.sort_by_key(|n| *n.committed().range().start());

let msg = match SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&committed,
) {
Ok(msg) => msg,
Err(err) => {
tracing::error!(target: "rpc::reth", %err, "Failed to serialize finalized chain notification");
break
}
};
if sink.send(msg).await.is_err() {
break;
}
}
}
}
}

impl<Provider> std::fmt::Debug for RethApi<Provider> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RethApi").finish_non_exhaustive()
Expand Down
Loading