Skip to content

Commit

Permalink
rpc: stabilize chainhead backend (#1802)
Browse files Browse the repository at this point in the history
* rpc: stabilize ChainHeadBackend

* remove noise from example

* add missing features

* make tests compile

* make tests compile v2

* revert stop event

* feature-gate runtime

* Update subxt/Cargo.toml

* add docsrs feature stuff

* Update subxt/src/backend/chain_head/mod.rs

* Update subxt/src/backend/chain_head/mod.rs

* Update subxt/src/backend/chain_head/mod.rs
  • Loading branch information
niklasad1 authored Oct 3, 2024
1 parent 7f9a1a1 commit 3807b29
Show file tree
Hide file tree
Showing 22 changed files with 302 additions and 184 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ jobs:
uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5

unstable_backend_tests:
name: "Test (Unstable Backend)"
name: "Test chainhead backend"
runs-on: ubuntu-latest-16-cores
needs: [clippy, wasm_clippy, check, wasm_check, docs]
timeout-minutes: 30
Expand Down Expand Up @@ -329,7 +329,7 @@ jobs:
uses: actions-rs/[email protected]
with:
command: nextest
args: run --workspace --features unstable-backend-client
args: run --workspace --features chainhead-backend

- if: "failure()"
uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5
Expand Down
11 changes: 10 additions & 1 deletion subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,21 @@ web = [
"finito?/wasm-bindgen",
]

# Feature flag to enable the default future executor.
# Technically it's a hack enable to both but simplifies the conditional compilation
# and subxt is selecting executor based on the used platform.
#
# For instance `wasm-bindgen-futures` panics if the platform isn't wasm32 and
# similar for tokio that requires a tokio runtime to be initialized.
runtime = ["tokio/rt", "wasm-bindgen-futures"]

# Enable this to use the reconnecting rpc client
unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio", "jsonrpsee", "wasm-bindgen-futures"]
unstable-reconnecting-rpc-client = ["dep:finito", "jsonrpsee"]

# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = [
"dep:jsonrpsee",
"runtime",
]

# Enable this to pull in extra Substrate dependencies which make it possible to
Expand Down
13 changes: 3 additions & 10 deletions subxt/examples/setup_reconnecting_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.build("ws://localhost:9944".to_string())
.await?;

// If you want to use the unstable backend with the reconnecting RPC client, you can do so like this:
// If you want to use the chainhead backend with the reconnecting RPC client, you can do so like this:
//
// ```
// use subxt::backend::unstable::UnstableBackend;
// use subxt::backend::chain_head:ChainHeadBackend;
// use subxt::OnlineClient;
//
// let (backend, mut driver) = UnstableBackend::builder().build(RpcClient::new(rpc.clone()));
// tokio::spawn(async move {
// while let Some(val) = driver.next().await {
// if let Err(e) = val {
// eprintln!("Error driving unstable backend: {e}; terminating client");
// }
// }
// });
// let backend = ChainHeadBackend::builder().build_with_background_task(RpcClient::new(rpc.clone()));
// let api: OnlineClient<PolkadotConfig> = OnlineClient::from_backend(Arc::new(backend)).await?;
// ```

Expand Down
35 changes: 35 additions & 0 deletions subxt/examples/setup_rpc_chainhead_backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! Example to utilize the ChainHeadBackend rpc backend to subscribe to finalized blocks.

#![allow(missing_docs)]

use futures::StreamExt;
use subxt::backend::chain_head::{ChainHeadBackend, ChainHeadBackendBuilder};
use subxt::backend::rpc::RpcClient;
use subxt::{OnlineClient, PolkadotConfig};

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

let rpc = RpcClient::from_url("ws://localhost:9944".to_string()).await?;
let backend: ChainHeadBackend<PolkadotConfig> =
ChainHeadBackendBuilder::default().build_with_background_driver(rpc.clone());
let api = OnlineClient::from_backend(std::sync::Arc::new(backend)).await?;

let mut blocks_sub = api.blocks().subscribe_finalized().await?.take(100);

while let Some(block) = blocks_sub.next().await {
let block = block?;

let block_number = block.number();
let block_hash = block.hash();

println!("Block #{block_number} ({block_hash})");
}

Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::rpc_methods::{FollowEvent, UnstableRpcMethods};
use super::rpc_methods::{ChainHeadRpcMethods, FollowEvent};
use crate::config::Config;
use crate::error::Error;
use futures::{FutureExt, Stream, StreamExt};
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<Hash> FollowStream<Hash> {
}

/// Create a new [`FollowStream`] given the RPC methods.
pub fn from_methods<T: Config>(methods: UnstableRpcMethods<T>) -> FollowStream<T::Hash> {
pub fn from_methods<T: Config>(methods: ChainHeadRpcMethods<T>) -> FollowStream<T::Hash> {
FollowStream {
stream_getter: Box::new(move || {
let methods = methods.clone();
Expand Down Expand Up @@ -215,7 +215,7 @@ impl<Hash> Stream for FollowStream<Hash> {
#[cfg(test)]
pub(super) mod test_utils {
use super::*;
use crate::backend::unstable::rpc_methods::{
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, Initialized, NewBlock,
};
use crate::config::substrate::H256;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// see LICENSE for license details.

use super::follow_stream_unpin::{BlockRef, FollowStreamMsg, FollowStreamUnpin};
use crate::backend::unstable::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::backend::chain_head::rpc_methods::{FollowEvent, Initialized, RuntimeEvent};
use crate::config::BlockHash;
use crate::error::{Error, RpcError};
use futures::stream::{Stream, StreamExt};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// see LICENSE for license details.

use super::follow_stream::FollowStream;
use super::UnstableRpcMethods;
use crate::backend::unstable::rpc_methods::{
use super::ChainHeadRpcMethods;
use crate::backend::chain_head::rpc_methods::{
BestBlockChanged, Finalized, FollowEvent, Initialized, NewBlock,
};
use crate::config::{BlockHash, Config};
Expand Down Expand Up @@ -275,7 +275,7 @@ impl<Hash: BlockHash> FollowStreamUnpin<Hash> {
/// Create a new [`FollowStreamUnpin`] given the RPC methods.
pub fn from_methods<T: Config>(
follow_stream: FollowStream<T::Hash>,
methods: UnstableRpcMethods<T>,
methods: ChainHeadRpcMethods<T>,
max_block_life: usize,
) -> FollowStreamUnpin<T::Hash> {
let unpin_method = Box::new(move |hash: T::Hash, sub_id: Arc<str>| {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ use std::task::Poll;
use storage_items::StorageItems;

// Expose the RPC methods.
pub use rpc_methods::UnstableRpcMethods;
pub use rpc_methods::ChainHeadRpcMethods;

/// Configure and build an [`UnstableBackend`].
pub struct UnstableBackendBuilder<T> {
/// Configure and build an [`ChainHeadBackend`].
pub struct ChainHeadBackendBuilder<T> {
max_block_life: usize,
_marker: std::marker::PhantomData<T>,
}

impl<T: Config> Default for UnstableBackendBuilder<T> {
impl<T: Config> Default for ChainHeadBackendBuilder<T> {
fn default() -> Self {
Self::new()
}
}

impl<T: Config> UnstableBackendBuilder<T> {
/// Create a new [`UnstableBackendBuilder`].
impl<T: Config> ChainHeadBackendBuilder<T> {
/// Create a new [`ChainHeadBackendBuilder`].
pub fn new() -> Self {
Self {
max_block_life: usize::MAX,
Expand All @@ -73,15 +73,20 @@ impl<T: Config> UnstableBackendBuilder<T> {
self
}

/// Given an [`RpcClient`] to use to make requests, this returns a tuple of an [`UnstableBackend`],
/// which implements the [`Backend`] trait, and an [`UnstableBackendDriver`] which must be polled in
/// order for the backend to make progress.
/// A low-level API to build the backend and driver which requires polling the driver for the backend
/// to make progress.
///
/// This is useful if you want to manage the driver yourself, for example if you want to run it in on
/// a specific runtime.
///
/// If you just want to run the driver in the background until completion in on the default runtime,
/// use [`ChainHeadBackendBuilder::build_with_background_driver`] instead.
pub fn build(
self,
client: impl Into<RpcClient>,
) -> (UnstableBackend<T>, UnstableBackendDriver<T>) {
) -> (ChainHeadBackend<T>, ChainHeadBackendDriver<T>) {
// Construct the underlying follow_stream layers:
let rpc_methods = UnstableRpcMethods::new(client.into());
let rpc_methods = ChainHeadRpcMethods::new(client.into());
let follow_stream =
follow_stream::FollowStream::<T::Hash>::from_methods(rpc_methods.clone());
let follow_stream_unpin = follow_stream_unpin::FollowStreamUnpin::<T::Hash>::from_methods(
Expand All @@ -92,26 +97,61 @@ impl<T: Config> UnstableBackendBuilder<T> {
let follow_stream_driver = FollowStreamDriver::new(follow_stream_unpin);

// Wrap these into the backend and driver that we'll expose.
let backend = UnstableBackend {
let backend = ChainHeadBackend {
methods: rpc_methods,
follow_handle: follow_stream_driver.handle(),
};
let driver = UnstableBackendDriver {
let driver = ChainHeadBackendDriver {
driver: follow_stream_driver,
};

(backend, driver)
}

/// An API to build the backend and driver which will run in the background until completion
/// on the default runtime.
///
/// - On non-wasm targets, this will spawn the driver on `tokio`.
/// - On wasm targets, this will spawn the driver on `wasm-bindgen-futures`.
#[cfg(feature = "runtime")]
#[cfg_attr(docsrs, doc(cfg(feature = "runtime")))]
pub fn build_with_background_driver(self, client: impl Into<RpcClient>) -> ChainHeadBackend<T> {
fn spawn<F: std::future::Future + Send + 'static>(future: F) {
#[cfg(not(target_family = "wasm"))]
tokio::spawn(async move {
future.await;
});
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
wasm_bindgen_futures::spawn_local(async move {
future.await;
});
}

let (backend, mut driver) = self.build(client);

spawn(async move {
while let Some(res) = driver.next().await {
if let Err(e) = res {
if !e.is_disconnected_will_reconnect() {
tracing::debug!(target: "subxt", "chainHead driver was closed: {e}");
break;
}
}
}
});

backend
}
}

/// Driver for the [`UnstableBackend`]. This must be polled in order for the
/// Driver for the [`ChainHeadBackend`]. This must be polled in order for the
/// backend to make progress.
#[derive(Debug)]
pub struct UnstableBackendDriver<T: Config> {
pub struct ChainHeadBackendDriver<T: Config> {
driver: FollowStreamDriver<T::Hash>,
}

impl<T: Config> Stream for UnstableBackendDriver<T> {
impl<T: Config> Stream for ChainHeadBackendDriver<T> {
type Item = <FollowStreamDriver<T::Hash> as Stream>::Item;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand All @@ -121,19 +161,19 @@ impl<T: Config> Stream for UnstableBackendDriver<T> {
}
}

/// The unstable backend.
/// The chainHead backend.
#[derive(Debug, Clone)]
pub struct UnstableBackend<T: Config> {
pub struct ChainHeadBackend<T: Config> {
// RPC methods we'll want to call:
methods: UnstableRpcMethods<T>,
methods: ChainHeadRpcMethods<T>,
// A handle to the chainHead_follow subscription:
follow_handle: FollowStreamDriverHandle<T::Hash>,
}

impl<T: Config> UnstableBackend<T> {
/// Configure and construct an [`UnstableBackend`] and the associated [`UnstableBackendDriver`].
pub fn builder() -> UnstableBackendBuilder<T> {
UnstableBackendBuilder::new()
impl<T: Config> ChainHeadBackend<T> {
/// Configure and construct an [`ChainHeadBackend`] and the associated [`ChainHeadBackendDriver`].
pub fn builder() -> ChainHeadBackendBuilder<T> {
ChainHeadBackendBuilder::new()
}

/// Stream block headers based on the provided filter fn
Expand Down Expand Up @@ -193,10 +233,10 @@ impl<Hash: BlockHash + 'static> From<follow_stream_unpin::BlockRef<Hash>> for Bl
}
}

impl<T: Config> super::sealed::Sealed for UnstableBackend<T> {}
impl<T: Config> super::sealed::Sealed for ChainHeadBackend<T> {}

#[async_trait]
impl<T: Config + Send + Sync + 'static> Backend<T> for UnstableBackend<T> {
impl<T: Config + Send + Sync + 'static> Backend<T> for ChainHeadBackend<T> {
async fn storage_fetch_values(
&self,
keys: Vec<Vec<u8>>,
Expand Down
Loading

0 comments on commit 3807b29

Please sign in to comment.