diff --git a/Cargo.lock b/Cargo.lock index 2e0d7b5e933..c963d6c844b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,20 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.3", + "instant", + "pin-project-lite 0.2.7", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.63" @@ -1575,6 +1589,7 @@ dependencies = [ "clap 3.1.2", "sc-cli", "sc-service", + "url 2.2.2", ] [[package]] @@ -1687,8 +1702,8 @@ version = "0.1.0" dependencies = [ "async-trait", "cumulus-primitives-core", + "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", - "cumulus-relay-chain-local", "cumulus-test-service", "derive_more", "futures 0.3.21", @@ -1701,6 +1716,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-client", + "portpicker", "sc-cli", "sc-client-api", "sc-service", @@ -1715,6 +1731,7 @@ dependencies = [ "substrate-test-utils", "tokio", "tracing", + "url 2.2.2", ] [[package]] @@ -1749,6 +1766,7 @@ dependencies = [ name = "cumulus-client-service" version = "0.1.0" dependencies = [ + "cumulus-client-cli", "cumulus-client-collator", "cumulus-client-consensus-common", "cumulus-client-pov-recovery", @@ -2009,6 +2027,37 @@ dependencies = [ "xcm", ] +[[package]] +name = "cumulus-relay-chain-inprocess-interface" +version = "0.1.0" +dependencies = [ + "async-trait", + "cumulus-primitives-core", + "cumulus-relay-chain-interface", + "cumulus-test-service", + "futures 0.3.21", + "futures-timer", + "parking_lot 0.12.0", + "polkadot-client", + "polkadot-primitives", + "polkadot-service", + "polkadot-test-client", + "sc-client-api", + "sc-consensus-babe", + "sc-network", + "sc-service", + "sc-telemetry", + "sc-tracing", + "sp-api", + "sp-blockchain", + "sp-consensus", + "sp-core", + "sp-keyring", + "sp-runtime", + "sp-state-machine", + "tracing", +] + [[package]] name = "cumulus-relay-chain-interface" version = "0.1.0" @@ -2017,8 +2066,11 @@ dependencies = [ "cumulus-primitives-core", "derive_more", "futures 0.3.21", + "jsonrpsee-core", + "parity-scale-codec", "parking_lot 0.12.0", "polkadot-overseer", + "polkadot-service", "sc-client-api", "sc-service", "sp-api", @@ -2030,34 +2082,28 @@ dependencies = [ ] [[package]] -name = "cumulus-relay-chain-local" +name = "cumulus-relay-chain-rpc-interface" version = "0.1.0" dependencies = [ "async-trait", + "backoff", "cumulus-primitives-core", "cumulus-relay-chain-interface", - "cumulus-test-service", "futures 0.3.21", "futures-timer", - "parking_lot 0.12.0", - "polkadot-client", - "polkadot-primitives", + "jsonrpsee 0.8.0", + "parity-scale-codec", + "parking_lot 0.11.2", "polkadot-service", - "polkadot-test-client", "sc-client-api", - "sc-consensus-babe", - "sc-network", - "sc-service", - "sc-telemetry", - "sc-tracing", + "sc-rpc-api", "sp-api", - "sp-blockchain", - "sp-consensus", "sp-core", - "sp-keyring", "sp-runtime", "sp-state-machine", + "sp-storage", "tracing", + "url 2.2.2", ] [[package]] @@ -2148,13 +2194,16 @@ version = "0.1.0" dependencies = [ "async-trait", "criterion", + "cumulus-client-cli", "cumulus-client-consensus-common", "cumulus-client-consensus-relay-chain", "cumulus-client-network", "cumulus-client-service", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", - "cumulus-relay-chain-local", + "cumulus-relay-chain-inprocess-interface", + "cumulus-relay-chain-interface", + "cumulus-relay-chain-rpc-interface", "cumulus-test-relay-validation-worker-provider", "cumulus-test-runtime", "frame-system", @@ -2167,6 +2216,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-service", + "portpicker", "rand 0.8.5", "sc-basic-authorship", "sc-chain-spec", @@ -2194,6 +2244,7 @@ dependencies = [ "substrate-test-client", "substrate-test-utils", "tokio", + "url 2.2.2", ] [[package]] @@ -3458,6 +3509,22 @@ dependencies = [ "webpki 0.21.4", ] +[[package]] +name = "hyper-rustls" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +dependencies = [ + "http", + "hyper", + "log", + "rustls 0.20.2", + "rustls-native-certs 0.6.1", + "tokio", + "tokio-rustls 0.23.2", + "webpki-roots 0.22.2", +] + [[package]] name = "idna" version = "0.1.5" @@ -3812,6 +3879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05fd8cd6c6b1bbd06881d2cf88f1fc83cc36c98f2219090f839115fb4a956cb9" dependencies = [ "jsonrpsee-core", + "jsonrpsee-http-client", "jsonrpsee-proc-macros", "jsonrpsee-types 0.8.0", "jsonrpsee-ws-client 0.8.0", @@ -3861,6 +3929,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "jsonrpsee-http-client" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dce69e96aa236cc2e3a20467420b31cbc8464703aa95bc33a163d25b0f56023" +dependencies = [ + "async-trait", + "hyper", + "hyper-rustls 0.23.0", + "jsonrpsee-core", + "jsonrpsee-types 0.8.0", + "rustc-hash", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "jsonrpsee-proc-macros" version = "0.8.0" @@ -6463,8 +6550,9 @@ dependencies = [ "cumulus-client-service", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", + "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", - "cumulus-relay-chain-local", + "cumulus-relay-chain-rpc-interface", "derive_more", "frame-benchmarking", "frame-benchmarking-cli", @@ -7119,8 +7207,9 @@ dependencies = [ "cumulus-client-service", "cumulus-primitives-core", "cumulus-primitives-parachain-inherent", + "cumulus-relay-chain-inprocess-interface", "cumulus-relay-chain-interface", - "cumulus-relay-chain-local", + "cumulus-relay-chain-rpc-interface", "frame-benchmarking", "frame-benchmarking-cli", "futures 0.3.21", @@ -8352,6 +8441,15 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "portpicker" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be97d76faf1bfab666e1375477b23fde79eccf0276e9b63b92a39d676a889ba9" +dependencies = [ + "rand 0.8.5", +] + [[package]] name = "ppv-lite86" version = "0.2.15" @@ -9793,7 +9891,7 @@ dependencies = [ "futures-timer", "hex", "hyper", - "hyper-rustls", + "hyper-rustls 0.22.1", "num_cpus", "once_cell", "parity-scale-codec", diff --git a/Cargo.toml b/Cargo.toml index 4ad17909ba2..31e3db293ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,8 @@ members = [ "client/pov-recovery", "client/service", "client/relay-chain-interface", - "client/relay-chain-local", + "client/relay-chain-inprocess-interface", + "client/relay-chain-rpc-interface", "pallets/aura-ext", "pallets/collator-selection", "pallets/dmp-queue", diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index 4166eaa4fd2..358143e683e 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -10,3 +10,4 @@ clap = { version = "3.1", features = ["derive"] } # Substrate dependencies sc-cli = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } +url = "2.2.2" diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 51156d5e186..d7323e61a46 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -19,7 +19,6 @@ #![warn(missing_docs)] use clap::Parser; -use sc_cli; use sc_service::{ config::{PrometheusConfig, TelemetryEndpoints}, BasePath, TransactionPoolOptions, @@ -29,6 +28,7 @@ use std::{ io::{self, Write}, net::SocketAddr, }; +use url::Url; /// The `purge-chain` command used to remove the whole chain: the parachain and the relay chain. #[derive(Debug, Parser)] @@ -119,6 +119,19 @@ impl sc_cli::CliConfiguration for PurgeChainCmd { } } +fn validate_relay_chain_url(arg: &str) -> Result<(), String> { + let url = Url::parse(arg).map_err(|e| e.to_string())?; + + if url.scheme() == "ws" { + Ok(()) + } else { + Err(format!( + "'{}' URL scheme not supported. Only websocket RPC is currently supported", + url.scheme() + )) + } +} + /// The `run` command used to run a node. #[derive(Debug, Parser)] pub struct RunCmd { @@ -131,6 +144,23 @@ pub struct RunCmd { /// Note that this is the same as running with `--validator`. #[clap(long, conflicts_with = "validator")] pub collator: bool, + + /// EXPERIMENTAL: Specify an URL to a relay chain full node to communicate with. + #[clap( + long, + parse(try_from_str), + validator = validate_relay_chain_url, + conflicts_with = "collator", + conflicts_with = "validator" + )] + pub relay_chain_rpc_url: Option, +} + +/// Options only relevant for collator nodes +#[derive(Clone, Debug)] +pub struct CollatorOptions { + /// Location of relay chain full node + pub relay_chain_rpc_url: Option, } /// A non-redundant version of the `RunCmd` that sets the `validator` field when the @@ -150,6 +180,11 @@ impl RunCmd { NormalizedRunCmd { base: new_base } } + + /// Create [`CollatorOptions`] representing options only relevant to parachain collator nodes + pub fn collator_options(&self) -> CollatorOptions { + CollatorOptions { relay_chain_rpc_url: self.relay_chain_rpc_url.clone() } + } } impl sc_cli::CliConfiguration for NormalizedRunCmd { diff --git a/client/consensus/common/src/parachain_consensus.rs b/client/consensus/common/src/parachain_consensus.rs index ca330d92ac7..a4aa3d5beff 100644 --- a/client/consensus/common/src/parachain_consensus.rs +++ b/client/consensus/common/src/parachain_consensus.rs @@ -27,7 +27,7 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT}, }; -use polkadot_primitives::v1::{Block as PBlock, Id as ParaId, OccupiedCoreAssumption}; +use polkadot_primitives::v1::{Hash as PHash, Id as ParaId, OccupiedCoreAssumption}; use codec::Decode; use futures::{select, FutureExt, Stream, StreamExt}; @@ -54,7 +54,7 @@ pub trait RelaychainClient: Clone + 'static { /// Returns the parachain head for the given `para_id` at the given block id. async fn parachain_head_at( &self, - at: &BlockId, + at: PHash, para_id: ParaId, ) -> RelayChainResult>>; } @@ -402,13 +402,7 @@ where .await? .filter_map(move |n| { let relay_chain = relay_chain.clone(); - async move { - relay_chain - .parachain_head_at(&BlockId::hash(n.hash()), para_id) - .await - .ok() - .flatten() - } + async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } }) .boxed(); Ok(new_best_notification_stream) @@ -422,13 +416,7 @@ where .await? .filter_map(move |n| { let relay_chain = relay_chain.clone(); - async move { - relay_chain - .parachain_head_at(&BlockId::hash(n.hash()), para_id) - .await - .ok() - .flatten() - } + async move { relay_chain.parachain_head_at(n.hash(), para_id).await.ok().flatten() } }) .boxed(); Ok(finality_notification_stream) @@ -436,7 +424,7 @@ where async fn parachain_head_at( &self, - at: &BlockId, + at: PHash, para_id: ParaId, ) -> RelayChainResult>> { self.persisted_validation_data(at, para_id, OccupiedCoreAssumption::TimedOut) diff --git a/client/consensus/common/src/tests.rs b/client/consensus/common/src/tests.rs index ceb60aa501e..c9099c354ea 100644 --- a/client/consensus/common/src/tests.rs +++ b/client/consensus/common/src/tests.rs @@ -25,7 +25,7 @@ use cumulus_test_client::{ }; use futures::{channel::mpsc, executor::block_on, select, FutureExt, Stream, StreamExt}; use futures_timer::Delay; -use polkadot_primitives::v1::{Block as PBlock, Id as ParaId}; +use polkadot_primitives::v1::Id as ParaId; use sc_client_api::UsageProvider; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy}; use sp_blockchain::Error as ClientError; @@ -98,11 +98,7 @@ impl crate::parachain_consensus::RelaychainClient for Relaychain { Ok(Box::new(stream.map(|v| v.encode()))) } - async fn parachain_head_at( - &self, - _: &BlockId, - _: ParaId, - ) -> RelayChainResult>> { + async fn parachain_head_at(&self, _: PHash, _: ParaId) -> RelayChainResult>> { unimplemented!("Not required for tests") } } diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 96105b33050..ea2dda69cff 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -32,12 +32,14 @@ derive_more = "0.99.2" async-trait = "0.1.52" [dev-dependencies] +portpicker = "0.1.1" +url = "2.2.2" tokio = { version = "1.17.0", features = ["macros"] } # Cumulus deps cumulus-test-service = { path = "../../test/service" } cumulus-primitives-core = { path = "../../primitives/core" } -cumulus-relay-chain-local = { path = "../relay-chain-local" } +cumulus-relay-chain-inprocess-interface = { path = "../relay-chain-inprocess-interface" } # Polkadot deps polkadot-test-client = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 5f950ce08d1..61407e8ce0f 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -24,17 +24,14 @@ use sp_consensus::block_validation::{ BlockAnnounceValidator as BlockAnnounceValidatorT, Validation, }; use sp_core::traits::SpawnNamed; -use sp_runtime::{ - generic::BlockId, - traits::{Block as BlockT, Header as HeaderT}, -}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::{CollationSecondedSignal, Statement}; use polkadot_parachain::primitives::HeadData; use polkadot_primitives::v1::{ - Block as PBlock, CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId, - OccupiedCoreAssumption, SigningContext, UncheckedSigned, + CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId, OccupiedCoreAssumption, + SigningContext, UncheckedSigned, }; use codec::{Decode, DecodeAll, Encode}; @@ -133,9 +130,8 @@ impl BlockAnnounceData { { let validator_index = self.statement.unchecked_validator_index(); - let runtime_api_block_id = BlockId::Hash(self.relay_parent); let session_index = - match relay_chain_client.session_index_for_child(&runtime_api_block_id).await { + match relay_chain_client.session_index_for_child(self.relay_parent).await { Ok(r) => r, Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))), }; @@ -143,7 +139,7 @@ impl BlockAnnounceData { let signing_context = SigningContext { parent_hash: self.relay_parent, session_index }; // Check that the signer is a legit validator. - let authorities = match relay_chain_client.validators(&runtime_api_block_id).await { + let authorities = match relay_chain_client.validators(self.relay_parent).await { Ok(r) => r, Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))), }; @@ -160,7 +156,7 @@ impl BlockAnnounceData { }; // Check statement is correctly signed. - if self.statement.try_into_checked(&signing_context, &signer).is_err() { + if self.statement.try_into_checked(&signing_context, signer).is_err() { tracing::debug!( target: LOG_TARGET, "Block announcement justification signature is invalid.", @@ -231,11 +227,7 @@ where { /// Create a new [`BlockAnnounceValidator`]. pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self { - Self { - phantom: Default::default(), - relay_chain_interface: relay_chain_interface.clone(), - para_id, - } + Self { phantom: Default::default(), relay_chain_interface, para_id } } } @@ -246,11 +238,11 @@ where /// Get the included block of the given parachain in the relay chain. async fn included_block( relay_chain_interface: &RCInterface, - block_id: &BlockId, + hash: PHash, para_id: ParaId, ) -> Result { let validation_data = relay_chain_interface - .persisted_validation_data(block_id, para_id, OccupiedCoreAssumption::TimedOut) + .persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut) .await .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)? .ok_or_else(|| { @@ -269,11 +261,11 @@ where /// Get the backed block hash of the given parachain in the relay chain. async fn backed_block_hash( relay_chain_interface: &RCInterface, - block_id: &BlockId, + hash: PHash, para_id: ParaId, ) -> Result, BoxedError> { let candidate_receipt = relay_chain_interface - .candidate_pending_availability(block_id, para_id) + .candidate_pending_availability(hash, para_id) .await .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?; @@ -293,14 +285,13 @@ where .best_block_hash() .await .map_err(|e| Box::new(e) as Box<_>)?; - let runtime_api_block_id = BlockId::Hash(relay_chain_best_hash); let block_number = header.number(); let best_head = - Self::included_block(&relay_chain_interface, &runtime_api_block_id, para_id).await?; + Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?; let known_best_number = best_head.number(); let backed_block = || async { - Self::backed_block_hash(&relay_chain_interface, &runtime_api_block_id, para_id).await + Self::backed_block_hash(&relay_chain_interface, relay_chain_best_hash, para_id).await }; if best_head == header { diff --git a/client/network/src/tests.rs b/client/network/src/tests.rs index bd52fc0b93b..da6ad75f106 100644 --- a/client/network/src/tests.rs +++ b/client/network/src/tests.rs @@ -16,8 +16,8 @@ use super::*; use async_trait::async_trait; +use cumulus_relay_chain_inprocess_interface::{check_block_in_chain, BlockCheckStatus}; use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult}; -use cumulus_relay_chain_local::{check_block_in_chain, BlockCheckStatus}; use cumulus_test_service::runtime::{Block, Hash, Header}; use futures::{executor::block_on, poll, task::Poll, FutureExt, Stream, StreamExt}; use parking_lot::Mutex; @@ -77,23 +77,10 @@ impl DummyRelayChainInterface { #[async_trait] impl RelayChainInterface for DummyRelayChainInterface { - async fn validators( - &self, - _: &cumulus_primitives_core::relay_chain::BlockId, - ) -> RelayChainResult> { + async fn validators(&self, _: PHash) -> RelayChainResult> { Ok(self.data.lock().validators.clone()) } - async fn block_status( - &self, - block_id: cumulus_primitives_core::relay_chain::BlockId, - ) -> RelayChainResult { - self.relay_backend - .blockchain() - .status(block_id) - .map_err(RelayChainError::BlockchainError) - } - async fn best_block_hash(&self) -> RelayChainResult { Ok(self.relay_backend.blockchain().info().best_hash) } @@ -116,7 +103,7 @@ impl RelayChainInterface for DummyRelayChainInterface { async fn persisted_validation_data( &self, - _: &cumulus_primitives_core::relay_chain::BlockId, + _: PHash, _: ParaId, _: OccupiedCoreAssumption, ) -> RelayChainResult> { @@ -128,7 +115,7 @@ impl RelayChainInterface for DummyRelayChainInterface { async fn candidate_pending_availability( &self, - _: &cumulus_primitives_core::relay_chain::BlockId, + _: PHash, _: ParaId, ) -> RelayChainResult> { if self.data.lock().has_pending_availability { @@ -159,10 +146,7 @@ impl RelayChainInterface for DummyRelayChainInterface { } } - async fn session_index_for_child( - &self, - _: &cumulus_primitives_core::relay_chain::BlockId, - ) -> RelayChainResult { + async fn session_index_for_child(&self, _: PHash) -> RelayChainResult { Ok(0) } @@ -196,7 +180,7 @@ impl RelayChainInterface for DummyRelayChainInterface { async fn get_storage_by_key( &self, - _: &polkadot_service::BlockId, + _: PHash, _: &[u8], ) -> RelayChainResult> { unimplemented!("Not needed for test") @@ -204,7 +188,7 @@ impl RelayChainInterface for DummyRelayChainInterface { async fn prove_read( &self, - _: &polkadot_service::BlockId, + _: PHash, _: &Vec>, ) -> RelayChainResult { unimplemented!("Not needed for test") @@ -293,10 +277,7 @@ async fn make_gossip_message_and_header( Some(&Sr25519Keyring::Alice.to_seed()), ) .unwrap(); - let session_index = relay_chain_interface - .session_index_for_child(&BlockId::Hash(relay_parent)) - .await - .unwrap(); + let session_index = relay_chain_interface.session_index_for_child(relay_parent).await.unwrap(); let signing_context = SigningContext { parent_hash: relay_parent, session_index }; let header = default_header(); @@ -477,10 +458,7 @@ async fn check_statement_seconded() { Some(&Sr25519Keyring::Alice.to_seed()), ) .unwrap(); - let session_index = relay_chain_interface - .session_index_for_child(&BlockId::Hash(relay_parent)) - .await - .unwrap(); + let session_index = relay_chain_interface.session_index_for_child(relay_parent).await.unwrap(); let signing_context = SigningContext { parent_hash: relay_parent, session_index }; let statement = Statement::Valid(Default::default()); diff --git a/client/network/tests/sync.rs b/client/network/tests/sync_blocks_from_tip_without_connected_collator.rs similarity index 73% rename from client/network/tests/sync.rs rename to client/network/tests/sync_blocks_from_tip_without_connected_collator.rs index 1ab1d4399be..e0566788823 100644 --- a/client/network/tests/sync.rs +++ b/client/network/tests/sync_blocks_from_tip_without_connected_collator.rs @@ -16,6 +16,7 @@ use cumulus_primitives_core::ParaId; use cumulus_test_service::{initial_head_data, run_relay_chain_validator_node, Keyring::*}; +use futures::join; #[substrate_test_utils::test] #[ignore] @@ -27,12 +28,24 @@ async fn sync_blocks_from_tip_without_being_connected_to_a_collator() { let para_id = ParaId::from(100); let tokio_handle = tokio::runtime::Handle::current(); + let ws_port = portpicker::pick_unused_port().expect("No free ports"); // start alice - let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new()); + let alice = run_relay_chain_validator_node( + tokio_handle.clone(), + Alice, + || {}, + Vec::new(), + Some(ws_port), + ); // start bob - let bob = - run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]); + let bob = run_relay_chain_validator_node( + tokio_handle.clone(), + Bob, + || {}, + vec![alice.addr.clone()], + None, + ); // register parachain alice @@ -62,12 +75,21 @@ async fn sync_blocks_from_tip_without_being_connected_to_a_collator() { .await; // run eve as parachain full node that is only connected to dave - let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve) + let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Eve) .connect_to_parachain_node(&dave) .exclusively_connect_to_registered_parachain_nodes() .connect_to_relay_chain_nodes(vec![&alice, &bob]) .build() .await; - eve.wait_for_blocks(7).await; + // run eve as parachain full node that is only connected to dave + let ferdie = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Ferdie) + .connect_to_parachain_node(&dave) + .exclusively_connect_to_registered_parachain_nodes() + .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .use_external_relay_chain_node_at_port(ws_port) + .build() + .await; + + join!(ferdie.wait_for_blocks(7), eve.wait_for_blocks(7)); } diff --git a/client/pov-recovery/src/lib.rs b/client/pov-recovery/src/lib.rs index d5d1a19b1d9..6c96961e911 100644 --- a/client/pov-recovery/src/lib.rs +++ b/client/pov-recovery/src/lib.rs @@ -451,9 +451,9 @@ async fn pending_candidates( let filtered_stream = import_notification_stream.filter_map(move |n| { let client_for_closure = relay_chain_client.clone(); async move { - let block_id = BlockId::hash(n.hash()); + let hash = n.hash(); let pending_availability_result = client_for_closure - .candidate_pending_availability(&block_id, para_id) + .candidate_pending_availability(hash, para_id) .await .map_err(|e| { tracing::error!( @@ -463,7 +463,7 @@ async fn pending_candidates( ) }); let session_index_result = - client_for_closure.session_index_for_child(&block_id).await.map_err(|e| { + client_for_closure.session_index_for_child(hash).await.map_err(|e| { tracing::error!( target: LOG_TARGET, error = ?e, diff --git a/client/pov-recovery/tests/pov_recovery.rs b/client/pov-recovery/tests/pov_recovery.rs index 0b536bfd712..2994d40225c 100644 --- a/client/pov-recovery/tests/pov_recovery.rs +++ b/client/pov-recovery/tests/pov_recovery.rs @@ -39,6 +39,7 @@ async fn pov_recovery() { Alice, || {}, Vec::new(), + None, ); // Start bob @@ -47,6 +48,7 @@ async fn pov_recovery() { Bob, || {}, vec![alice.addr.clone()], + None, ); // Register parachain diff --git a/client/relay-chain-local/Cargo.toml b/client/relay-chain-inprocess-interface/Cargo.toml similarity index 95% rename from client/relay-chain-local/Cargo.toml rename to client/relay-chain-inprocess-interface/Cargo.toml index d99e6f99d86..ba1c5fa5302 100644 --- a/client/relay-chain-local/Cargo.toml +++ b/client/relay-chain-inprocess-interface/Cargo.toml @@ -1,6 +1,6 @@ [package] authors = ["Parity Technologies "] -name = "cumulus-relay-chain-local" +name = "cumulus-relay-chain-inprocess-interface" version = "0.1.0" edition = "2021" @@ -28,7 +28,7 @@ sc-tracing = { git = "https://github.com/paritytech/substrate", branch = "master parking_lot = "0.12.0" tracing = "0.1.31" async-trait = "0.1.52" -futures = { version = "0.3.1", features = ["compat"] } +futures = "0.3.21" futures-timer = "3.0.2" [dev-dependencies] diff --git a/client/relay-chain-local/src/lib.rs b/client/relay-chain-inprocess-interface/src/lib.rs similarity index 89% rename from client/relay-chain-local/src/lib.rs rename to client/relay-chain-inprocess-interface/src/lib.rs index 09f15a5a51e..e6df0ea0ad2 100644 --- a/client/relay-chain-local/src/lib.rs +++ b/client/relay-chain-inprocess-interface/src/lib.rs @@ -46,15 +46,15 @@ use sp_state_machine::{Backend as StateBackend, StorageValue}; const TIMEOUT_IN_SECONDS: u64 = 6; /// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node. -pub struct RelayChainLocal { +pub struct RelayChainInProcessInterface { full_client: Arc, backend: Arc, sync_oracle: Arc>>, overseer_handle: Option, } -impl RelayChainLocal { - /// Create a new instance of [`RelayChainLocal`] +impl RelayChainInProcessInterface { + /// Create a new instance of [`RelayChainInProcessInterface`] pub fn new( full_client: Arc, backend: Arc, @@ -65,7 +65,7 @@ impl RelayChainLocal { } } -impl Clone for RelayChainLocal { +impl Clone for RelayChainInProcessInterface { fn clone(&self) -> Self { Self { full_client: self.full_client.clone(), @@ -77,7 +77,7 @@ impl Clone for RelayChainLocal { } #[async_trait] -impl RelayChainInterface for RelayChainLocal +impl RelayChainInterface for RelayChainInProcessInterface where Client: ProvideRuntimeApi + BlockchainEvents @@ -113,12 +113,12 @@ where async fn persisted_validation_data( &self, - block_id: &BlockId, + hash: PHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> RelayChainResult> { Ok(self.full_client.runtime_api().persisted_validation_data( - block_id, + &BlockId::Hash(hash), para_id, occupied_core_assumption, )?) @@ -126,21 +126,21 @@ where async fn candidate_pending_availability( &self, - block_id: &BlockId, + hash: PHash, para_id: ParaId, ) -> RelayChainResult> { Ok(self .full_client .runtime_api() - .candidate_pending_availability(block_id, para_id)?) + .candidate_pending_availability(&BlockId::Hash(hash), para_id)?) } - async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult { - Ok(self.full_client.runtime_api().session_index_for_child(block_id)?) + async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult { + Ok(self.full_client.runtime_api().session_index_for_child(&BlockId::Hash(hash))?) } - async fn validators(&self, block_id: &BlockId) -> RelayChainResult> { - Ok(self.full_client.runtime_api().validators(block_id)?) + async fn validators(&self, hash: PHash) -> RelayChainResult> { + Ok(self.full_client.runtime_api().validators(&BlockId::Hash(hash))?) } async fn import_notification_stream( @@ -167,10 +167,6 @@ where Ok(self.backend.blockchain().info().best_hash) } - async fn block_status(&self, block_id: BlockId) -> RelayChainResult { - Ok(self.backend.blockchain().status(block_id)?) - } - async fn is_major_syncing(&self) -> RelayChainResult { let mut network = self.sync_oracle.lock(); Ok(network.is_major_syncing()) @@ -182,19 +178,21 @@ where async fn get_storage_by_key( &self, - block_id: &BlockId, + relay_parent: PHash, key: &[u8], ) -> RelayChainResult> { - let state = self.backend.state_at(*block_id)?; + let block_id = BlockId::Hash(relay_parent); + let state = self.backend.state_at(block_id)?; state.storage(key).map_err(RelayChainError::GenericError) } async fn prove_read( &self, - block_id: &BlockId, + relay_parent: PHash, relevant_keys: &Vec>, ) -> RelayChainResult { - let state_backend = self.backend.state_at(*block_id)?; + let block_id = BlockId::Hash(relay_parent); + let state_backend = self.backend.state_at(block_id)?; sp_state_machine::prove_read(state_backend, relevant_keys) .map_err(RelayChainError::StateMachineError) @@ -271,9 +269,9 @@ where let _lock = backend.get_import_lock().read(); let block_id = BlockId::Hash(hash); - match backend.blockchain().status(block_id)? { - BlockStatus::InChain => return Ok(BlockCheckStatus::InChain), - _ => {}, + + if backend.blockchain().status(block_id)? == BlockStatus::InChain { + return Ok(BlockCheckStatus::InChain) } let listener = client.import_notification_stream(); @@ -282,25 +280,25 @@ where } /// Builder for a concrete relay chain interface, created from a full node. Builds -/// a [`RelayChainLocal`] to access relay chain data necessary for parachain operation. +/// a [`RelayChainInProcessInterface`] to access relay chain data necessary for parachain operation. /// /// The builder takes a [`polkadot_client::Client`] /// that wraps a concrete instance. By using [`polkadot_client::ExecuteWithClient`] -/// the builder gets access to this concrete instance and instantiates a [`RelayChainLocal`] with it. -struct RelayChainLocalBuilder { +/// the builder gets access to this concrete instance and instantiates a [`RelayChainInProcessInterface`] with it. +struct RelayChainInProcessInterfaceBuilder { polkadot_client: polkadot_client::Client, backend: Arc, sync_oracle: Arc>>, overseer_handle: Option, } -impl RelayChainLocalBuilder { +impl RelayChainInProcessInterfaceBuilder { pub fn build(self) -> Arc { self.polkadot_client.clone().execute_with(self) } } -impl ExecuteWithClient for RelayChainLocalBuilder { +impl ExecuteWithClient for RelayChainInProcessInterfaceBuilder { type Output = Arc; fn execute_with_client(self, client: Arc) -> Self::Output @@ -314,7 +312,12 @@ impl ExecuteWithClient for RelayChainLocalBuilder { + Send, Client::Api: ParachainHost + BabeApi, { - Arc::new(RelayChainLocal::new(client, self.backend, self.sync_oracle, self.overseer_handle)) + Arc::new(RelayChainInProcessInterface::new( + client, + self.backend, + self.sync_oracle, + self.overseer_handle, + )) } } @@ -346,7 +349,7 @@ fn build_polkadot_full_node( } /// Builds a relay chain interface by constructing a full relay chain node -pub fn build_relay_chain_interface( +pub fn build_inprocess_relay_chain( polkadot_config: Configuration, telemetry_worker_handle: Option, task_manager: &mut TaskManager, @@ -361,7 +364,7 @@ pub fn build_relay_chain_interface( let sync_oracle: Box = Box::new(full_node.network.clone()); let sync_oracle = Arc::new(Mutex::new(sync_oracle)); - let relay_chain_interface_builder = RelayChainLocalBuilder { + let relay_chain_interface_builder = RelayChainInProcessInterfaceBuilder { polkadot_client: full_node.client.clone(), backend: full_node.backend.clone(), sync_oracle, @@ -402,7 +405,8 @@ mod tests { } } - fn build_client_backend_and_block() -> (Arc, PBlock, RelayChainLocal) { + fn build_client_backend_and_block( + ) -> (Arc, PBlock, RelayChainInProcessInterface) { let builder = TestClientBuilder::new().set_execution_strategy(ExecutionStrategy::NativeWhenPossible); let backend = builder.backend(); @@ -415,7 +419,7 @@ mod tests { ( client.clone(), block, - RelayChainLocal::new( + RelayChainInProcessInterface::new( client, backend.clone(), Arc::new(Mutex::new(dummy_network)), diff --git a/client/relay-chain-interface/Cargo.toml b/client/relay-chain-interface/Cargo.toml index 8c5b253693c..754b3bc0fe7 100644 --- a/client/relay-chain-interface/Cargo.toml +++ b/client/relay-chain-interface/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" [dependencies] polkadot-overseer = { git = "https://github.com/paritytech/polkadot", branch = "master" } +polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } cumulus-primitives-core = { path = "../../primitives/core" } @@ -17,8 +18,10 @@ sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = " sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sc-service = { git = "https://github.com/paritytech/substrate", branch = "master" } -futures = "0.3.1" +futures = "0.3.21" parking_lot = "0.12.0" derive_more = "0.99.2" async-trait = "0.1.52" thiserror = "1.0.30" +jsonrpsee-core = "0.8.0" +parity-scale-codec = "3.0.0" diff --git a/client/relay-chain-interface/src/lib.rs b/client/relay-chain-interface/src/lib.rs index 3c45b5a6e40..44885ec5f36 100644 --- a/client/relay-chain-interface/src/lib.rs +++ b/client/relay-chain-interface/src/lib.rs @@ -19,16 +19,18 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc}; use cumulus_primitives_core::{ relay_chain::{ v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, - BlockId, Hash as PHash, Header as PHeader, InboundHrmpMessage, + Hash as PHash, Header as PHeader, InboundHrmpMessage, }, InboundDownwardMessage, ParaId, PersistedValidationData, }; use polkadot_overseer::Handle as OverseerHandle; -use sc_client_api::{blockchain::BlockStatus, StorageProof}; +use sc_client_api::StorageProof; use futures::Stream; use async_trait::async_trait; +use jsonrpsee_core::Error as JsonRPSeeError; +use parity_scale_codec::Error as CodecError; use sp_api::ApiError; use sp_state_machine::StorageValue; @@ -36,37 +38,48 @@ pub type RelayChainResult = Result; #[derive(thiserror::Error, Debug)] pub enum RelayChainError { - #[error("Error occurred while calling relay chain runtime: {0:?}")] + #[error("Error occured while calling relay chain runtime: {0}")] ApiError(#[from] ApiError), #[error("Timeout while waiting for relay-chain block `{0}` to be imported.")] WaitTimeout(PHash), #[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")] ImportListenerClosed(PHash), - #[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1:?}")] + #[error("Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1}")] WaitBlockchainError(PHash, sp_blockchain::Error), - #[error("Blockchain returned an error: {0:?}")] + #[error("Blockchain returned an error: {0}")] BlockchainError(#[from] sp_blockchain::Error), - #[error("State machine error occurred: {0:?}")] + #[error("State machine error occured: {0}")] StateMachineError(Box), - #[error("Unspecified error occurred: {0:?}")] + #[error("Unable to call RPC method '{0}' due to error: {1}")] + RPCCallError(String, JsonRPSeeError), + #[error("RPC Error: '{0}'")] + JsonRPCError(#[from] JsonRPSeeError), + #[error("Scale codec deserialization error: {0}")] + DeserializationError(CodecError), + #[error("Scale codec deserialization error: {0}")] + ServiceError(#[from] polkadot_service::Error), + #[error("Unspecified error occured: {0}")] GenericError(String), } +impl From for RelayChainError { + fn from(e: CodecError) -> Self { + RelayChainError::DeserializationError(e) + } +} + /// Trait that provides all necessary methods for interaction between collator and relay chain. #[async_trait] pub trait RelayChainInterface: Send + Sync { /// Fetch a storage item by key. async fn get_storage_by_key( &self, - block_id: &BlockId, + relay_parent: PHash, key: &[u8], ) -> RelayChainResult>; /// Fetch a vector of current validators. - async fn validators(&self, block_id: &BlockId) -> RelayChainResult>; - - /// Get the status of a given block. - async fn block_status(&self, block_id: BlockId) -> RelayChainResult; + async fn validators(&self, block_id: PHash) -> RelayChainResult>; /// Get the hash of the current best block. async fn best_block_hash(&self) -> RelayChainResult; @@ -98,7 +111,7 @@ pub trait RelayChainInterface: Send + Sync { /// and the para already occupies a core. async fn persisted_validation_data( &self, - block_id: &BlockId, + block_id: PHash, para_id: ParaId, _: OccupiedCoreAssumption, ) -> RelayChainResult>; @@ -107,12 +120,12 @@ pub trait RelayChainInterface: Send + Sync { /// assigned to occupied cores in `availability_cores` and `None` otherwise. async fn candidate_pending_availability( &self, - block_id: &BlockId, + block_id: PHash, para_id: ParaId, ) -> RelayChainResult>; /// Returns the session index expected at a child of the block. - async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult; + async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult; /// Get a stream of import block notifications. async fn import_notification_stream( @@ -145,7 +158,7 @@ pub trait RelayChainInterface: Send + Sync { /// Generate a storage read proof. async fn prove_read( &self, - block_id: &BlockId, + relay_parent: PHash, relevant_keys: &Vec>, ) -> RelayChainResult; } @@ -173,7 +186,7 @@ where async fn persisted_validation_data( &self, - block_id: &BlockId, + block_id: PHash, para_id: ParaId, occupied_core_assumption: OccupiedCoreAssumption, ) -> RelayChainResult> { @@ -184,17 +197,17 @@ where async fn candidate_pending_availability( &self, - block_id: &BlockId, + block_id: PHash, para_id: ParaId, ) -> RelayChainResult> { (**self).candidate_pending_availability(block_id, para_id).await } - async fn session_index_for_child(&self, block_id: &BlockId) -> RelayChainResult { + async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult { (**self).session_index_for_child(block_id).await } - async fn validators(&self, block_id: &BlockId) -> RelayChainResult> { + async fn validators(&self, block_id: PHash) -> RelayChainResult> { (**self).validators(block_id).await } @@ -214,10 +227,6 @@ where (**self).best_block_hash().await } - async fn block_status(&self, block_id: BlockId) -> RelayChainResult { - (**self).block_status(block_id).await - } - async fn is_major_syncing(&self) -> RelayChainResult { (**self).is_major_syncing().await } @@ -228,18 +237,18 @@ where async fn get_storage_by_key( &self, - block_id: &BlockId, + relay_parent: PHash, key: &[u8], ) -> RelayChainResult> { - (**self).get_storage_by_key(block_id, key).await + (**self).get_storage_by_key(relay_parent, key).await } async fn prove_read( &self, - block_id: &BlockId, + relay_parent: PHash, relevant_keys: &Vec>, ) -> RelayChainResult { - (**self).prove_read(block_id, relevant_keys).await + (**self).prove_read(relay_parent, relevant_keys).await } async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> { diff --git a/client/relay-chain-rpc-interface/Cargo.toml b/client/relay-chain-rpc-interface/Cargo.toml new file mode 100644 index 00000000000..f17d7be0ceb --- /dev/null +++ b/client/relay-chain-rpc-interface/Cargo.toml @@ -0,0 +1,30 @@ +[package] +authors = ["Parity Technologies "] +name = "cumulus-relay-chain-rpc-interface" +version = "0.1.0" +edition = "2021" + + +[dependencies] +polkadot-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } + +cumulus-primitives-core = { path = "../../primitives/core" } +cumulus-relay-chain-interface = { path = "../relay-chain-interface" } + +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-state-machine = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-storage = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-rpc-api = { git = "https://github.com/paritytech/substrate", branch = "master" } + +futures = "0.3.21" +futures-timer = "3.0.2" +parity-scale-codec = "3.0.0" +parking_lot = "0.11.1" +jsonrpsee = { version = "0.8.0", features = ["client"] } +tracing = "0.1.25" +async-trait = "0.1.52" +url = "2.2.2" +backoff = { version = "0.4.0", features = ["tokio"] } diff --git a/client/relay-chain-rpc-interface/src/lib.rs b/client/relay-chain-rpc-interface/src/lib.rs new file mode 100644 index 00000000000..dbc3927201a --- /dev/null +++ b/client/relay-chain-rpc-interface/src/lib.rs @@ -0,0 +1,472 @@ +// Copyright 2022 Parity Technologies (UK) Ltd. +// This file is part of Cumulus. + +// Cumulus is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Cumulus is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Cumulus. If not, see . + +use async_trait::async_trait; +use backoff::{future::retry_notify, ExponentialBackoff}; +use core::time::Duration; +use cumulus_primitives_core::{ + relay_chain::{ + v1::{CommittedCandidateReceipt, OccupiedCoreAssumption, SessionIndex, ValidatorId}, + Hash as PHash, Header as PHeader, InboundHrmpMessage, + }, + InboundDownwardMessage, ParaId, PersistedValidationData, +}; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use futures::{FutureExt, Stream, StreamExt}; +use jsonrpsee::{ + core::{ + client::{Client as JsonRPCClient, ClientT, Subscription, SubscriptionClientT}, + Error as JsonRpseeError, + }, + rpc_params, + types::ParamsSer, + ws_client::WsClientBuilder, +}; +use parity_scale_codec::{Decode, Encode}; +use polkadot_service::Handle; +use sc_client_api::{StorageData, StorageProof}; +use sc_rpc_api::{state::ReadProof, system::Health}; +use sp_core::sp_std::collections::btree_map::BTreeMap; +use sp_runtime::DeserializeOwned; +use sp_state_machine::StorageValue; +use sp_storage::StorageKey; +use std::{pin::Pin, sync::Arc}; + +pub use url::Url; + +const LOG_TARGET: &str = "relay-chain-rpc-interface"; +const TIMEOUT_IN_SECONDS: u64 = 6; + +/// Client that maps RPC methods and deserializes results +#[derive(Clone)] +struct RelayChainRPCClient { + /// Websocket client to make calls + ws_client: Arc, + + /// Retry strategy that should be used for requests and subscriptions + retry_strategy: ExponentialBackoff, +} + +impl RelayChainRPCClient { + pub async fn new(url: Url) -> RelayChainResult { + tracing::info!(target: LOG_TARGET, url = %url.to_string(), "Initializing RPC Client"); + let ws_client = WsClientBuilder::default().build(url.as_str()).await?; + + Ok(RelayChainRPCClient { + ws_client: Arc::new(ws_client), + retry_strategy: ExponentialBackoff::default(), + }) + } + + /// Call a call to `state_call` rpc method. + async fn call_remote_runtime_function( + &self, + method_name: &str, + hash: PHash, + payload: Option, + ) -> RelayChainResult { + let payload_bytes = + payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode())); + let params = rpc_params! { + method_name, + payload_bytes, + hash + }; + let res = self + .request_tracing::("state_call", params, |err| { + tracing::trace!( + target: LOG_TARGET, + %method_name, + %hash, + error = %err, + "Error during call to 'state_call'.", + ); + }) + .await?; + Decode::decode(&mut &*res.0).map_err(Into::into) + } + + /// Subscribe to a notification stream via RPC + async fn subscribe<'a, R>( + &self, + sub_name: &'a str, + unsub_name: &'a str, + params: Option>, + ) -> RelayChainResult> + where + R: DeserializeOwned, + { + self.ws_client + .subscribe::(sub_name, params, unsub_name) + .await + .map_err(|err| RelayChainError::RPCCallError(sub_name.to_string(), err)) + } + + /// Perform RPC request + async fn request<'a, R>( + &self, + method: &'a str, + params: Option>, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + { + self.request_tracing( + method, + params, + |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"), + ) + .await + } + + /// Perform RPC request + async fn request_tracing<'a, R, OR>( + &self, + method: &'a str, + params: Option>, + trace_error: OR, + ) -> Result + where + R: DeserializeOwned + std::fmt::Debug, + OR: Fn(&jsonrpsee::core::Error), + { + retry_notify( + self.retry_strategy.clone(), + || async { + self.ws_client.request(method, params.clone()).await.map_err(|err| match err { + JsonRpseeError::Transport(_) => + backoff::Error::Transient { err, retry_after: None }, + _ => backoff::Error::Permanent(err), + }) + }, + |error, dur| tracing::trace!(target: LOG_TARGET, %error, ?dur, "Encountered transport error, retrying."), + ) + .await + .map_err(|err| { + trace_error(&err); + RelayChainError::RPCCallError(method.to_string(), err)}) + } + + async fn system_health(&self) -> Result { + self.request("system_health", None).await + } + + async fn state_get_read_proof( + &self, + storage_keys: Vec, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_keys, at); + self.request("state_getReadProof", params).await + } + + async fn state_get_storage( + &self, + storage_key: StorageKey, + at: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(storage_key, at); + self.request("state_getStorage", params).await + } + + async fn chain_get_head(&self) -> Result { + self.request("chain_getHead", None).await + } + + async fn chain_get_header( + &self, + hash: Option, + ) -> Result, RelayChainError> { + let params = rpc_params!(hash); + self.request("chain_getHeader", params).await + } + + async fn parachain_host_candidate_pending_availability( + &self, + at: PHash, + para_id: ParaId, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_candidate_pending_availability", + at, + Some(para_id), + ) + .await + } + + async fn parachain_host_session_index_for_child( + &self, + at: PHash, + ) -> Result { + self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>) + .await + } + + async fn parachain_host_validators( + &self, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>) + .await + } + + async fn parachain_host_persisted_validation_data( + &self, + at: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_persisted_validation_data", + at, + Some((para_id, occupied_core_assumption)), + ) + .await + } + + async fn parachain_host_inbound_hrmp_channels_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result>, RelayChainError> { + self.call_remote_runtime_function( + "ParachainHost_inbound_hrmp_channels_contents", + at, + Some(para_id), + ) + .await + } + + async fn parachain_host_dmq_contents( + &self, + para_id: ParaId, + at: PHash, + ) -> Result, RelayChainError> { + self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id)) + .await + } + + async fn subscribe_all_heads(&self) -> Result, RelayChainError> { + self.subscribe::("chain_subscribeAllHeads", "chain_unsubscribeAllHeads", None) + .await + } + + async fn subscribe_new_best_heads(&self) -> Result, RelayChainError> { + self.subscribe::("chain_subscribeNewHeads", "chain_unsubscribeNewHeads", None) + .await + } + + async fn subscribe_finalized_heads(&self) -> Result, RelayChainError> { + self.subscribe::( + "chain_subscribeFinalizedHeads", + "chain_unsubscribeFinalizedHeads", + None, + ) + .await + } +} + +/// RelayChainRPCInterface is used to interact with a full node that is running locally +/// in the same process. +#[derive(Clone)] +pub struct RelayChainRPCInterface { + rpc_client: RelayChainRPCClient, +} + +impl RelayChainRPCInterface { + pub async fn new(url: Url) -> RelayChainResult { + Ok(Self { rpc_client: RelayChainRPCClient::new(url).await? }) + } +} + +#[async_trait] +impl RelayChainInterface for RelayChainRPCInterface { + async fn retrieve_dmq_contents( + &self, + para_id: ParaId, + relay_parent: PHash, + ) -> RelayChainResult> { + self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await + } + + async fn retrieve_all_inbound_hrmp_channel_contents( + &self, + para_id: ParaId, + relay_parent: PHash, + ) -> RelayChainResult>> { + self.rpc_client + .parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent) + .await + } + + async fn persisted_validation_data( + &self, + hash: PHash, + para_id: ParaId, + occupied_core_assumption: OccupiedCoreAssumption, + ) -> RelayChainResult> { + self.rpc_client + .parachain_host_persisted_validation_data(hash, para_id, occupied_core_assumption) + .await + } + + async fn candidate_pending_availability( + &self, + hash: PHash, + para_id: ParaId, + ) -> RelayChainResult> { + self.rpc_client + .parachain_host_candidate_pending_availability(hash, para_id) + .await + } + + async fn session_index_for_child(&self, hash: PHash) -> RelayChainResult { + self.rpc_client.parachain_host_session_index_for_child(hash).await + } + + async fn validators(&self, block_id: PHash) -> RelayChainResult> { + self.rpc_client.parachain_host_validators(block_id).await + } + + async fn import_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let imported_headers_stream = + self.rpc_client.subscribe_all_heads().await?.filter_map(|item| async move { + item.map_err(|err| { + tracing::error!( + target: LOG_TARGET, + "Encountered error in import notification stream: {}", + err + ) + }) + .ok() + }); + + Ok(imported_headers_stream.boxed()) + } + + async fn finality_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let imported_headers_stream = self + .rpc_client + .subscribe_finalized_heads() + .await? + .filter_map(|item| async move { + item.map_err(|err| { + tracing::error!( + target: LOG_TARGET, + "Encountered error in finality notification stream: {}", + err + ) + }) + .ok() + }); + + Ok(imported_headers_stream.boxed()) + } + + async fn best_block_hash(&self) -> RelayChainResult { + self.rpc_client.chain_get_head().await + } + + async fn is_major_syncing(&self) -> RelayChainResult { + self.rpc_client.system_health().await.map(|h| h.is_syncing) + } + + fn overseer_handle(&self) -> RelayChainResult> { + unimplemented!("Overseer handle is not available on relay-chain-rpc-interface"); + } + + async fn get_storage_by_key( + &self, + relay_parent: PHash, + key: &[u8], + ) -> RelayChainResult> { + let storage_key = StorageKey(key.to_vec()); + self.rpc_client + .state_get_storage(storage_key, Some(relay_parent)) + .await + .map(|storage_data| storage_data.map(|sv| sv.0)) + } + + async fn prove_read( + &self, + relay_parent: PHash, + relevant_keys: &Vec>, + ) -> RelayChainResult { + let cloned = relevant_keys.clone(); + let storage_keys: Vec = cloned.into_iter().map(StorageKey).collect(); + + self.rpc_client + .state_get_read_proof(storage_keys, Some(relay_parent)) + .await + .map(|read_proof| { + let bytes = read_proof.proof.into_iter().map(|bytes| bytes.to_vec()).collect(); + StorageProof::new(bytes) + }) + } + + /// Wait for a given relay chain block + /// + /// The hash of the block to wait for is passed. We wait for the block to arrive or return after a timeout. + /// + /// Implementation: + /// 1. Register a listener to all new blocks. + /// 2. Check if the block is already in chain. If yes, succeed early. + /// 3. Wait for the block to be imported via subscription. + /// 4. If timeout is reached, we return an error. + async fn wait_for_block(&self, wait_for_hash: PHash) -> RelayChainResult<()> { + let mut head_stream = self.rpc_client.subscribe_all_heads().await?; + + if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() { + return Ok(()) + } + + let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse(); + + loop { + futures::select! { + _ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)), + evt = head_stream.next().fuse() => match evt { + Some(Ok(evt)) if evt.hash() == wait_for_hash => return Ok(()), + // Not the event we waited on. + Some(_) => continue, + None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)), + } + } + } + } + + async fn new_best_notification_stream( + &self, + ) -> RelayChainResult + Send>>> { + let imported_headers_stream = + self.rpc_client.subscribe_new_best_heads().await?.filter_map(|item| async move { + item.map_err(|err| { + tracing::error!( + target: LOG_TARGET, + "Error in best block notification stream: {}", + err + ) + }) + .ok() + }); + + Ok(imported_headers_stream.boxed()) + } +} diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index b5f02fc2b62..7978919b613 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" cumulus-client-consensus-common = { path = "../consensus/common" } cumulus-client-collator = { path = "../collator" } cumulus-client-pov-recovery = { path = "../pov-recovery" } +cumulus-client-cli = { path = "../cli" } cumulus-relay-chain-interface = { path = "../relay-chain-interface" } cumulus-primitives-core = { path = "../../primitives/core" } diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 08cd8584f22..c850775caab 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -18,6 +18,7 @@ //! //! Provides functions for starting a collator node or a normal full node. +use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::ParachainConsensus; use cumulus_primitives_core::{CollectCollationInfo, ParaId}; use cumulus_relay_chain_interface::RelayChainInterface; @@ -151,6 +152,7 @@ pub struct StartFullNodeParams<'a, Block: BlockT, Client, RCInterface, IQ> { pub announce_block: Arc>) + Send + Sync>, pub relay_chain_slot_duration: Duration, pub import_queue: IQ, + pub collator_options: CollatorOptions, } /// Start a full node for a parachain. @@ -166,6 +168,7 @@ pub fn start_full_node( para_id, relay_chain_slot_duration, import_queue, + collator_options, }: StartFullNodeParams, ) -> sc_service::error::Result<()> where @@ -193,6 +196,14 @@ where .spawn_essential_handle() .spawn("cumulus-consensus", None, consensus); + // PoV Recovery is currently not supported when we connect to the + // relay chain via RPC, so we return early. The node will work, but not be able to recover PoVs from the + // relay chain if blocks are not announced on parachain. This will be enabled again once + // https://github.com/paritytech/cumulus/issues/545 is finished. + if collator_options.relay_chain_rpc_url.is_some() { + return Ok(()) + } + let overseer_handle = relay_chain_interface .overseer_handle() .map_err(|e| sc_service::Error::Application(Box::new(e)))? diff --git a/parachain-template/node/Cargo.toml b/parachain-template/node/Cargo.toml index 321d1c9a2a0..afeb3ecbfcc 100644 --- a/parachain-template/node/Cargo.toml +++ b/parachain-template/node/Cargo.toml @@ -92,7 +92,8 @@ cumulus-client-service = { path = "../../client/service" } cumulus-primitives-core = { path = "../../primitives/core" } cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" } cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" } -cumulus-relay-chain-local = { path = "../../client/relay-chain-local" } +cumulus-relay-chain-inprocess-interface = { path = "../../client/relay-chain-inprocess-interface" } +cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-interface" } # Polkadot dependencies polkadot-cli = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/parachain-template/node/src/command.rs b/parachain-template/node/src/command.rs index 2d62cf4428b..9f3124a35fc 100644 --- a/parachain-template/node/src/command.rs +++ b/parachain-template/node/src/command.rs @@ -260,6 +260,7 @@ pub fn run() -> Result<()> { }, None => { let runner = cli.create_runner(&cli.run.normalize())?; + let collator_options = cli.run.collator_options(); runner.run_node_until_exit(|config| async move { let para_id = chain_spec::Extensions::try_get(&*config.chain_spec) @@ -292,7 +293,7 @@ pub fn run() -> Result<()> { info!("Parachain genesis state: {}", genesis_state); info!("Is collating: {}", if config.role.is_authority() { "yes" } else { "no" }); - crate::service::start_parachain_node(config, polkadot_config, id) + crate::service::start_parachain_node(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) diff --git a/parachain-template/node/src/service.rs b/parachain-template/node/src/service.rs index 317826054ac..f7093222af2 100644 --- a/parachain-template/node/src/service.rs +++ b/parachain-template/node/src/service.rs @@ -3,6 +3,7 @@ // std use std::{sync::Arc, time::Duration}; +use cumulus_client_cli::CollatorOptions; // Local Runtime Types use parachain_template_runtime::{ opaque::Block, AccountId, Balance, Hash, Index as Nonce, RuntimeApi, @@ -16,8 +17,9 @@ use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, }; use cumulus_primitives_core::ParaId; -use cumulus_relay_chain_interface::RelayChainInterface; -use cumulus_relay_chain_local::build_relay_chain_interface; +use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; // Substrate Imports use sc_client_api::ExecutorProvider; @@ -30,6 +32,8 @@ use sp_keystore::SyncCryptoStorePtr; use sp_runtime::traits::BlakeTwo256; use substrate_prometheus_endpoint::Registry; +use polkadot_service::CollatorPair; + /// Native executor instance. pub struct TemplateRuntimeExecutor; @@ -160,6 +164,26 @@ where Ok(params) } +async fn build_relay_chain_interface( + polkadot_config: Configuration, + telemetry_worker_handle: Option, + task_manager: &mut TaskManager, + collator_options: CollatorOptions, +) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { + match collator_options.relay_chain_rpc_url { + Some(relay_chain_url) => + Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + None => { + let relay_chain_local = build_inprocess_relay_chain( + polkadot_config, + telemetry_worker_handle, + task_manager, + )?; + Ok((relay_chain_local.0, Some(relay_chain_local.1))) + }, + } +} + /// Start a node with the given parachain `Configuration` and relay chain `Configuration`. /// /// This is the actual implementation that is abstract over the executor and the runtime api. @@ -167,6 +191,7 @@ where async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, _rpc_ext_builder: RB, build_import_queue: BIQ, @@ -240,12 +265,17 @@ where let backend = params.backend.clone(); let mut task_manager = params.task_manager; - let (relay_chain_interface, collator_key) = - build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager) - .map_err(|e| match e { - polkadot_service::Error::Sub(x) => x, - s => format!("{}", s).into(), - })?; + let (relay_chain_interface, collator_key) = build_relay_chain_interface( + polkadot_config, + telemetry_worker_handle, + &mut task_manager, + collator_options.clone(), + ) + .await + .map_err(|e| match e { + RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x, + s => s.to_string().into(), + })?; let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); @@ -327,7 +357,7 @@ where spawner, parachain_consensus, import_queue, - collator_key, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -341,6 +371,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, + collator_options, }; start_full_node(params)?; @@ -401,6 +432,7 @@ pub fn parachain_build_import_queue( pub async fn start_parachain_node( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, ) -> sc_service::error::Result<( TaskManager, @@ -409,6 +441,7 @@ pub async fn start_parachain_node( start_node_impl::( parachain_config, polkadot_config, + collator_options, id, |_| Ok(Default::default()), parachain_build_import_queue, diff --git a/polkadot-parachains/Cargo.toml b/polkadot-parachains/Cargo.toml index d93acc9c895..1bcf254e8d8 100644 --- a/polkadot-parachains/Cargo.toml +++ b/polkadot-parachains/Cargo.toml @@ -79,7 +79,9 @@ cumulus-client-network = { path = "../client/network" } cumulus-primitives-core = { path = "../primitives/core" } cumulus-primitives-parachain-inherent = { path = "../primitives/parachain-inherent" } cumulus-relay-chain-interface = { path = "../client/relay-chain-interface" } -cumulus-relay-chain-local = { path = "../client/relay-chain-local" } +cumulus-relay-chain-inprocess-interface = { path = "../client/relay-chain-inprocess-interface" } +cumulus-relay-chain-rpc-interface = { path = "../client/relay-chain-rpc-interface" } + # Polkadot dependencies polkadot-primitives = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/polkadot-parachains/src/cli.rs b/polkadot-parachains/src/cli.rs index 2226111ac41..cceb34d927b 100644 --- a/polkadot-parachains/src/cli.rs +++ b/polkadot-parachains/src/cli.rs @@ -109,7 +109,7 @@ pub struct Cli { pub run: cumulus_client_cli::RunCmd, /// Relay chain arguments - #[clap(raw = true)] + #[clap(raw = true, conflicts_with = "relay-chain-rpc-url")] pub relaychain_args: Vec, } diff --git a/polkadot-parachains/src/command.rs b/polkadot-parachains/src/command.rs index caa2531dbc3..382769190e4 100644 --- a/polkadot-parachains/src/command.rs +++ b/polkadot-parachains/src/command.rs @@ -493,6 +493,7 @@ pub fn run() -> Result<()> { Some(Subcommand::Key(cmd)) => Ok(cmd.run(&cli)?), None => { let runner = cli.create_runner(&cli.run.normalize())?; + let collator_options = cli.run.collator_options(); runner.run_node_until_exit(|config| async move { let para_id = chain_spec::Extensions::try_get(&*config.chain_spec) @@ -534,7 +535,7 @@ pub fn run() -> Result<()> { statemint_runtime::RuntimeApi, StatemintRuntimeExecutor, StatemintAuraId, - >(config, polkadot_config, id) + >(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) @@ -543,7 +544,7 @@ pub fn run() -> Result<()> { statemine_runtime::RuntimeApi, StatemineRuntimeExecutor, AuraId, - >(config, polkadot_config, id) + >(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) @@ -552,7 +553,7 @@ pub fn run() -> Result<()> { westmint_runtime::RuntimeApi, WestmintRuntimeExecutor, AuraId, - >(config, polkadot_config, id) + >(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) @@ -560,7 +561,7 @@ pub fn run() -> Result<()> { crate::service::start_shell_node::< shell_runtime::RuntimeApi, ShellRuntimeExecutor, - >(config, polkadot_config, id) + >(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) @@ -568,20 +569,30 @@ pub fn run() -> Result<()> { crate::service::start_shell_node::< seedling_runtime::RuntimeApi, SeedlingRuntimeExecutor, - >(config, polkadot_config, id) + >(config, polkadot_config, collator_options, id) .await .map(|r| r.0) .map_err(Into::into) } else if config.chain_spec.is_canvas_kusama() { - crate::service::start_canvas_kusama_node(config, polkadot_config, id) - .await - .map(|r| r.0) - .map_err(Into::into) + crate::service::start_canvas_kusama_node( + config, + polkadot_config, + collator_options, + id, + ) + .await + .map(|r| r.0) + .map_err(Into::into) } else { - crate::service::start_rococo_parachain_node(config, polkadot_config, id) - .await - .map(|r| r.0) - .map_err(Into::into) + crate::service::start_rococo_parachain_node( + config, + polkadot_config, + collator_options, + id, + ) + .await + .map(|r| r.0) + .map_err(Into::into) } }) }, diff --git a/polkadot-parachains/src/service.rs b/polkadot-parachains/src/service.rs index 2fe31635a67..9a47a7b0f0d 100644 --- a/polkadot-parachains/src/service.rs +++ b/polkadot-parachains/src/service.rs @@ -15,6 +15,7 @@ // along with Cumulus. If not, see . use codec::Codec; +use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_aura::{AuraConsensus, BuildAuraConsensusParams, SlotProportion}; use cumulus_client_consensus_common::{ ParachainBlockImport, ParachainCandidate, ParachainConsensus, @@ -27,9 +28,11 @@ use cumulus_primitives_core::{ relay_chain::v1::{Hash as PHash, PersistedValidationData}, ParaId, }; -use cumulus_relay_chain_interface::RelayChainInterface; -use cumulus_relay_chain_local::build_relay_chain_interface; -use polkadot_service::NativeExecutionDispatch; +use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; +use polkadot_service::{CollatorPair, NativeExecutionDispatch}; +use sp_core::Pair; use crate::rpc; pub use parachains_common::{AccountId, Balance, Block, BlockNumber, Hash, Header, Index as Nonce}; @@ -48,7 +51,6 @@ use sc_telemetry::{Telemetry, TelemetryHandle, TelemetryWorker, TelemetryWorkerH use sp_api::{ApiExt, ConstructRuntimeApi}; use sp_consensus::CacheKeyId; use sp_consensus_aura::AuraApi; -use sp_core::crypto::Pair; use sp_keystore::SyncCryptoStorePtr; use sp_runtime::{ app_crypto::AppKey, @@ -277,6 +279,26 @@ where Ok(params) } +async fn build_relay_chain_interface( + polkadot_config: Configuration, + telemetry_worker_handle: Option, + task_manager: &mut TaskManager, + collator_options: CollatorOptions, +) -> RelayChainResult<(Arc<(dyn RelayChainInterface + 'static)>, Option)> { + match collator_options.relay_chain_rpc_url { + Some(relay_chain_url) => + Ok((Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>, None)), + None => { + let relay_chain_local = build_inprocess_relay_chain( + polkadot_config, + telemetry_worker_handle, + task_manager, + )?; + Ok((relay_chain_local.0, Some(relay_chain_local.1))) + }, + } +} + /// Start a shell node with the given parachain `Configuration` and relay chain `Configuration`. /// /// This is the actual implementation that is abstract over the executor and the runtime api for shell nodes. @@ -284,6 +306,7 @@ where async fn start_shell_node_impl( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, rpc_ext_builder: RB, build_import_queue: BIQ, @@ -356,12 +379,17 @@ where let mut task_manager = params.task_manager; - let (relay_chain_interface, collator_key) = - build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager) - .map_err(|e| match e { - polkadot_service::Error::Sub(x) => x, - s => format!("{}", s).into(), - })?; + let (relay_chain_interface, collator_key) = build_relay_chain_interface( + polkadot_config, + telemetry_worker_handle, + &mut task_manager, + collator_options.clone(), + ) + .await + .map_err(|e| match e { + RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x, + s => s.to_string().into(), + })?; let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); @@ -431,7 +459,7 @@ where spawner, parachain_consensus, import_queue, - collator_key, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -445,6 +473,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, + collator_options, }; start_full_node(params)?; @@ -462,6 +491,7 @@ where async fn start_node_impl( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, _rpc_ext_builder: RB, build_import_queue: BIQ, @@ -535,12 +565,17 @@ where let backend = params.backend.clone(); let mut task_manager = params.task_manager; - let (relay_chain_interface, collator_key) = - build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager) - .map_err(|e| match e { - polkadot_service::Error::Sub(x) => x, - s => format!("{}", s).into(), - })?; + let (relay_chain_interface, collator_key) = build_relay_chain_interface( + polkadot_config, + telemetry_worker_handle, + &mut task_manager, + collator_options.clone(), + ) + .await + .map_err(|e| match e { + RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x, + s => s.to_string().into(), + })?; let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); @@ -622,7 +657,7 @@ where spawner, parachain_consensus, import_queue, - collator_key, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -636,6 +671,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, + collator_options, }; start_full_node(params)?; @@ -705,6 +741,7 @@ pub fn rococo_parachain_build_import_queue( pub async fn start_rococo_parachain_node( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, ) -> sc_service::error::Result<( TaskManager, @@ -719,6 +756,7 @@ pub async fn start_rococo_parachain_node( start_node_impl::( parachain_config, polkadot_config, + collator_options, id, |_| Ok(Default::default()), rococo_parachain_build_import_queue, @@ -842,6 +880,7 @@ where pub async fn start_shell_node( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, ) -> sc_service::error::Result<( TaskManager, @@ -867,6 +906,7 @@ where start_shell_node_impl::( parachain_config, polkadot_config, + collator_options, id, |_| Ok(Default::default()), shell_build_import_queue, @@ -1112,6 +1152,7 @@ where pub async fn start_statemint_node( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, ) -> sc_service::error::Result<( TaskManager, @@ -1142,6 +1183,7 @@ where start_node_impl::( parachain_config, polkadot_config, + collator_options, id, |_| Ok(Default::default()), statemint_build_import_queue::<_, _, AuraId>, @@ -1277,6 +1319,7 @@ where async fn start_canvas_kusama_node_impl( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, _rpc_ext_builder: RB, build_import_queue: BIQ, @@ -1351,12 +1394,17 @@ where let backend = params.backend.clone(); let mut task_manager = params.task_manager; - let (relay_chain_interface, collator_key) = - build_relay_chain_interface(polkadot_config, telemetry_worker_handle, &mut task_manager) - .map_err(|e| match e { - polkadot_service::Error::Sub(x) => x, - s => format!("{}", s).into(), - })?; + let (relay_chain_interface, collator_key) = build_relay_chain_interface( + polkadot_config, + telemetry_worker_handle, + &mut task_manager, + collator_options.clone(), + ) + .await + .map_err(|e| match e { + RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x, + s => s.to_string().into(), + })?; let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), id); @@ -1438,7 +1486,7 @@ where spawner, parachain_consensus, import_queue, - collator_key, + collator_key: collator_key.expect("Command line arguments do not allow this. qed"), relay_chain_slot_duration, }; @@ -1452,6 +1500,7 @@ where relay_chain_interface, relay_chain_slot_duration, import_queue, + collator_options, }; start_full_node(params)?; @@ -1521,6 +1570,7 @@ pub fn canvas_kusama_build_import_queue( pub async fn start_canvas_kusama_node( parachain_config: Configuration, polkadot_config: Configuration, + collator_options: CollatorOptions, id: ParaId, ) -> sc_service::error::Result<( TaskManager, @@ -1541,6 +1591,7 @@ pub async fn start_canvas_kusama_node( >( parachain_config, polkadot_config, + collator_options, id, |_| Ok(Default::default()), canvas_kusama_build_import_queue, diff --git a/primitives/parachain-inherent/src/client_side.rs b/primitives/parachain-inherent/src/client_side.rs index b14c2257654..d585a83fd1f 100644 --- a/primitives/parachain-inherent/src/client_side.rs +++ b/primitives/parachain-inherent/src/client_side.rs @@ -23,7 +23,6 @@ use cumulus_primitives_core::{ ParaId, PersistedValidationData, }; use cumulus_relay_chain_interface::RelayChainInterface; -use sp_runtime::generic::BlockId; const LOG_TARGET: &str = "parachain-inherent"; @@ -36,10 +35,9 @@ async fn collect_relay_storage_proof( ) -> Option { use relay_chain::well_known_keys as relay_well_known_keys; - let relay_parent_block_id = BlockId::Hash(relay_parent); let ingress_channels = relay_chain_interface .get_storage_by_key( - &relay_parent_block_id, + relay_parent, &relay_well_known_keys::hrmp_ingress_channel_index(para_id), ) .await @@ -68,7 +66,7 @@ async fn collect_relay_storage_proof( let egress_channels = relay_chain_interface .get_storage_by_key( - &relay_parent_block_id, + relay_parent, &relay_well_known_keys::hrmp_egress_channel_index(para_id), ) .await @@ -111,12 +109,12 @@ async fn collect_relay_storage_proof( })); relay_chain_interface - .prove_read(&relay_parent_block_id, &relevant_keys) + .prove_read(relay_parent, &relevant_keys) .await .map_err(|e| { tracing::error!( target: LOG_TARGET, - relay_parent = ?relay_parent_block_id, + relay_parent = ?relay_parent, error = ?e, "Cannot obtain read proof from relay chain.", ); diff --git a/test/service/Cargo.toml b/test/service/Cargo.toml index b2e8dfb31ac..a6dfd1ce4cf 100644 --- a/test/service/Cargo.toml +++ b/test/service/Cargo.toml @@ -47,12 +47,15 @@ polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch cumulus-client-consensus-relay-chain = { path = "../../client/consensus/relay-chain" } cumulus-client-network = { path = "../../client/network" } cumulus-client-service = { path = "../../client/service" } +cumulus-client-cli = { path = "../../client/cli" } cumulus-client-consensus-common = { path = "../../client/consensus/common" } cumulus-primitives-core = { path = "../../primitives/core" } cumulus-primitives-parachain-inherent = { path = "../../primitives/parachain-inherent" } cumulus-test-runtime = { path = "../runtime" } cumulus-test-relay-validation-worker-provider = { path = "../relay-validation-worker-provider" } -cumulus-relay-chain-local = { path = "../../client/relay-chain-local" } +cumulus-relay-chain-inprocess-interface = { path = "../../client/relay-chain-inprocess-interface" } +cumulus-relay-chain-interface = { path = "../../client/relay-chain-interface" } +cumulus-relay-chain-rpc-interface = { path = "../../client/relay-chain-rpc-interface" } criterion = { version = "0.3.5", features = [ "async_tokio" ] } @@ -60,9 +63,11 @@ parking_lot = "0.12.0" # RPC related dependencies jsonrpc-core = "18.0.0" +url = "2.2.2" [dev-dependencies] futures = "0.3.5" +portpicker = "0.1.1" # Polkadot dependencies polkadot-test-service = { git = "https://github.com/paritytech/polkadot", branch = "master" } diff --git a/test/service/benches/transaction_throughput.rs b/test/service/benches/transaction_throughput.rs index 17722e6bd60..29aba41022b 100644 --- a/test/service/benches/transaction_throughput.rs +++ b/test/service/benches/transaction_throughput.rs @@ -151,6 +151,7 @@ fn transaction_throughput_benchmarks(c: &mut Criterion) { Alice, || {}, vec![], + None, ); // Start bob @@ -159,6 +160,7 @@ fn transaction_throughput_benchmarks(c: &mut Criterion) { Bob, || {}, vec![alice.addr.clone()], + None, ); // Register parachain diff --git a/test/service/src/lib.rs b/test/service/src/lib.rs index 750cb7d881f..54d745c80c0 100644 --- a/test/service/src/lib.rs +++ b/test/service/src/lib.rs @@ -21,15 +21,23 @@ mod chain_spec; mod genesis; -use std::{future::Future, time::Duration}; +use std::{ + future::Future, + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; +use url::Url; +use cumulus_client_cli::CollatorOptions; use cumulus_client_consensus_common::{ParachainCandidate, ParachainConsensus}; use cumulus_client_network::BlockAnnounceValidator; use cumulus_client_service::{ prepare_node_config, start_collator, start_full_node, StartCollatorParams, StartFullNodeParams, }; use cumulus_primitives_core::ParaId; -use cumulus_relay_chain_local::RelayChainLocal; +use cumulus_relay_chain_inprocess_interface::RelayChainInProcessInterface; +use cumulus_relay_chain_interface::{RelayChainError, RelayChainInterface, RelayChainResult}; +use cumulus_relay_chain_rpc_interface::RelayChainRPCInterface; use cumulus_test_runtime::{Hash, Header, NodeBlock as Block, RuntimeApi}; use parking_lot::Mutex; @@ -167,6 +175,35 @@ pub fn new_partial( Ok(params) } +async fn build_relay_chain_interface( + relay_chain_config: Configuration, + collator_key: Option, + collator_options: CollatorOptions, + task_manager: &mut TaskManager, +) -> RelayChainResult> { + if let Some(relay_chain_url) = collator_options.relay_chain_rpc_url { + return Ok(Arc::new(RelayChainRPCInterface::new(relay_chain_url).await?) as Arc<_>) + } + + let relay_chain_full_node = polkadot_test_service::new_full( + relay_chain_config, + if let Some(ref key) = collator_key { + polkadot_service::IsCollator::Yes(key.clone()) + } else { + polkadot_service::IsCollator::Yes(CollatorPair::generate().0) + }, + None, + )?; + + task_manager.add_child(relay_chain_full_node.task_manager); + Ok(Arc::new(RelayChainInProcessInterface::new( + relay_chain_full_node.client.clone(), + relay_chain_full_node.backend.clone(), + Arc::new(Mutex::new(Box::new(relay_chain_full_node.network.clone()))), + relay_chain_full_node.overseer_handle.clone(), + )) as Arc<_>) +} + /// Start a node with the given parachain `Configuration` and relay chain `Configuration`. /// /// This is the actual implementation that is abstract over the executor and the runtime api. @@ -179,6 +216,7 @@ async fn start_node_impl( wrap_announce_block: Option AnnounceBlockFn>>, rpc_ext_builder: RB, consensus: Consensus, + collator_options: CollatorOptions, ) -> sc_service::error::Result<( TaskManager, Arc, @@ -202,31 +240,21 @@ where let transaction_pool = params.transaction_pool.clone(); let mut task_manager = params.task_manager; - let relay_chain_full_node = polkadot_test_service::new_full( + let client = params.client.clone(); + let backend = params.backend.clone(); + + let relay_chain_interface = build_relay_chain_interface( relay_chain_config, - if let Some(ref key) = collator_key { - polkadot_service::IsCollator::Yes(key.clone()) - } else { - polkadot_service::IsCollator::Yes(CollatorPair::generate().0) - }, - None, + collator_key.clone(), + collator_options.clone(), + &mut task_manager, ) + .await .map_err(|e| match e { - polkadot_service::Error::Sub(x) => x, + RelayChainError::ServiceError(polkadot_service::Error::Sub(x)) => x, s => s.to_string().into(), })?; - let client = params.client.clone(); - let backend = params.backend.clone(); - - let relay_chain_interface = Arc::new(RelayChainLocal::new( - relay_chain_full_node.client.clone(), - relay_chain_full_node.backend.clone(), - Arc::new(Mutex::new(Box::new(relay_chain_full_node.network.clone()))), - relay_chain_full_node.overseer_handle.clone(), - )); - task_manager.add_child(relay_chain_full_node.task_manager); - let block_announce_validator = BlockAnnounceValidator::new(relay_chain_interface.clone(), para_id); let block_announce_validator_builder = move |_| Box::new(block_announce_validator) as Box<_>; @@ -342,6 +370,7 @@ where // the recovery delay of pov-recovery. We don't want to wait for too // long on the full node to recover, so we reduce this time here. relay_chain_slot_duration: Duration::from_millis(6), + collator_options, }; start_full_node(params)?; @@ -389,6 +418,7 @@ pub struct TestNodeBuilder { storage_update_func_parachain: Option>, storage_update_func_relay_chain: Option>, consensus: Consensus, + relay_chain_full_node_url: Option, } impl TestNodeBuilder { @@ -410,6 +440,7 @@ impl TestNodeBuilder { storage_update_func_parachain: None, storage_update_func_relay_chain: None, consensus: Consensus::RelayChain, + relay_chain_full_node_url: None, } } @@ -501,6 +532,21 @@ impl TestNodeBuilder { self } + /// Connect to full node via RPC. + pub fn use_external_relay_chain_node_at_url(mut self, network_address: Url) -> Self { + self.relay_chain_full_node_url = Some(network_address); + self + } + + /// Connect to full node via RPC. + pub fn use_external_relay_chain_node_at_port(mut self, port: u16) -> Self { + let mut localhost_url = + Url::parse("ws://localhost").expect("Should be able to parse localhost Url"); + localhost_url.set_port(Some(port)).expect("Should be able to set port"); + self.relay_chain_full_node_url = Some(localhost_url); + self + } + /// Build the [`TestNode`]. pub async fn build(self) -> TestNode { let parachain_config = node_config( @@ -513,6 +559,7 @@ impl TestNodeBuilder { self.collator_key.is_some(), ) .expect("could not generate Configuration"); + let mut relay_chain_config = polkadot_test_service::node_config( self.storage_update_func_relay_chain.unwrap_or_else(|| Box::new(|| ())), self.tokio_handle, @@ -521,6 +568,9 @@ impl TestNodeBuilder { false, ); + let collator_options = + CollatorOptions { relay_chain_rpc_url: self.relay_chain_full_node_url }; + relay_chain_config.network.node_name = format!("{} (relay chain)", relay_chain_config.network.node_name); @@ -533,6 +583,7 @@ impl TestNodeBuilder { self.wrap_announce_block, |_| Ok(Default::default()), self.consensus, + collator_options, ) .await .expect("could not create Cumulus test service"); @@ -737,8 +788,9 @@ pub fn run_relay_chain_validator_node( key: Sr25519Keyring, storage_update_func: impl Fn(), boot_nodes: Vec, + websocket_port: Option, ) -> polkadot_test_service::PolkadotTestNode { - let config = polkadot_test_service::node_config( + let mut config = polkadot_test_service::node_config( storage_update_func, tokio_handle, key, @@ -746,6 +798,10 @@ pub fn run_relay_chain_validator_node( true, ); + if let Some(port) = websocket_port { + config.rpc_ws = Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port)); + } + polkadot_test_service::run_validator_node( config, Some(cumulus_test_relay_validation_worker_provider::VALIDATION_WORKER.into()), diff --git a/test/service/tests/full_node_catching_up.rs b/test/service/tests/full_node_catching_up.rs index ce3fd62c023..5e66de9e74d 100644 --- a/test/service/tests/full_node_catching_up.rs +++ b/test/service/tests/full_node_catching_up.rs @@ -16,6 +16,7 @@ use cumulus_primitives_core::ParaId; use cumulus_test_service::{initial_head_data, run_relay_chain_validator_node, Keyring::*}; +use futures::join; #[substrate_test_utils::test] #[ignore] @@ -28,12 +29,24 @@ async fn test_full_node_catching_up() { let tokio_handle = tokio::runtime::Handle::current(); + let ws_port = portpicker::pick_unused_port().expect("No free ports"); // start alice - let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new()); + let alice = run_relay_chain_validator_node( + tokio_handle.clone(), + Alice, + || {}, + Vec::new(), + Some(ws_port), + ); // start bob - let bob = - run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]); + let bob = run_relay_chain_validator_node( + tokio_handle.clone(), + Bob, + || {}, + vec![alice.addr.clone()], + None, + ); // register parachain alice @@ -57,10 +70,19 @@ async fn test_full_node_catching_up() { charlie.wait_for_blocks(5).await; // run cumulus dave (a parachain full node) and wait for it to sync some blocks - let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Dave) + let dave = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle.clone(), Dave) .connect_to_parachain_node(&charlie) .connect_to_relay_chain_nodes(vec![&alice, &bob]) .build() .await; - dave.wait_for_blocks(7).await; + + // run cumulus dave (a parachain full node) and wait for it to sync some blocks + let eve = cumulus_test_service::TestNodeBuilder::new(para_id, tokio_handle, Eve) + .connect_to_parachain_node(&charlie) + .connect_to_relay_chain_nodes(vec![&alice, &bob]) + .use_external_relay_chain_node_at_port(ws_port) + .build() + .await; + + join!(dave.wait_for_blocks(7), eve.wait_for_blocks(7)); } diff --git a/test/service/tests/migrate_solo_to_para.rs b/test/service/tests/migrate_solo_to_para.rs index 686db4d7e1f..f2b7e3d37f4 100644 --- a/test/service/tests/migrate_solo_to_para.rs +++ b/test/service/tests/migrate_solo_to_para.rs @@ -42,11 +42,17 @@ async fn test_migrate_solo_to_para() { let tokio_handle = tokio::runtime::Handle::current(); // start alice - let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new()); + let alice = + run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new(), None); // start bob - let bob = - run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]); + let bob = run_relay_chain_validator_node( + tokio_handle.clone(), + Bob, + || {}, + vec![alice.addr.clone()], + None, + ); // register parachain alice diff --git a/test/service/tests/runtime_upgrade.rs b/test/service/tests/runtime_upgrade.rs index 4731404fee8..236c31acb40 100644 --- a/test/service/tests/runtime_upgrade.rs +++ b/test/service/tests/runtime_upgrade.rs @@ -31,11 +31,17 @@ async fn test_runtime_upgrade() { let tokio_handle = tokio::runtime::Handle::current(); // start alice - let alice = run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new()); + let alice = + run_relay_chain_validator_node(tokio_handle.clone(), Alice, || {}, Vec::new(), None); // start bob - let bob = - run_relay_chain_validator_node(tokio_handle.clone(), Bob, || {}, vec![alice.addr.clone()]); + let bob = run_relay_chain_validator_node( + tokio_handle.clone(), + Bob, + || {}, + vec![alice.addr.clone()], + None, + ); // register parachain alice