diff --git a/Cargo.lock b/Cargo.lock index 9cc34439aa22..3ac381046682 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4661,6 +4661,20 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "polkadot-node-core-runtime-api" +version = "0.1.0" +dependencies = [ + "assert_matches", + "futures 0.3.5", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-primitives", + "sp-api", + "sp-blockchain", + "sp-core", +] + [[package]] name = "polkadot-node-primitives" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 23079972a030..595936f357c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,9 +43,10 @@ members = [ "service", "validation", + "node/core/av-store", "node/core/bitfield-signing", "node/core/proposer", - "node/core/av-store", + "node/core/runtime-api", "node/network/bridge", "node/network/pov-distribution", "node/network/statement-distribution", diff --git a/node/core/runtime-api/Cargo.toml b/node/core/runtime-api/Cargo.toml index 14a0ce5540d3..200aed3897da 100644 --- a/node/core/runtime-api/Cargo.toml +++ b/node/core/runtime-api/Cargo.toml @@ -6,17 +6,15 @@ edition = "2018" [dependencies] futures = "0.3.5" -log = "0.4.8" sp-api = { git = "https://github.com/paritytech/substrate", branch = "master" } -sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" } -primitives = { package = "sp-core", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem" } [dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } futures = { version = "0.3.5", features = ["thread-pool"] } -subsystem-test = { package = "polkadot-subsystem-test-helpers", path = "../../test-helpers/subsystem" } +polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../../subsystem", features = ["test-helpers"] } assert_matches = "1.3.0" diff --git a/node/core/runtime-api/src/lib.rs b/node/core/runtime-api/src/lib.rs index 0e1023b8c4b4..f2423817235a 100644 --- a/node/core/runtime-api/src/lib.rs +++ b/node/core/runtime-api/src/lib.rs @@ -18,5 +18,444 @@ //! //! This provides a clean, ownerless wrapper around the parachain-related runtime APIs. This crate //! can also be used to cache responses from heavy runtime APIs. -//! -//! TODO: https://github.com/paritytech/polkadot/issues/1419 implement this. + +use polkadot_subsystem::{ + Subsystem, SpawnedSubsystem, SubsystemResult, SubsystemContext, + FromOverseer, OverseerSignal, +}; +use polkadot_subsystem::messages::{ + RuntimeApiMessage, RuntimeApiRequest as Request, RuntimeApiError, +}; +use polkadot_primitives::v1::{Block, BlockId, Hash, ParachainHost}; + +use sp_api::{ProvideRuntimeApi}; + +use futures::prelude::*; + +/// The `RuntimeApiSubsystem`. See module docs for more details. +pub struct RuntimeApiSubsystem(Client); + +impl RuntimeApiSubsystem { + /// Create a new Runtime API subsystem wrapping the given client. + pub fn new(client: Client) -> Self { + RuntimeApiSubsystem(client) + } +} + +impl Subsystem for RuntimeApiSubsystem where + Client: ProvideRuntimeApi + Send + 'static, + Client::Api: ParachainHost, + Context: SubsystemContext +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem { + future: run(ctx, self.0).map(|_| ()).boxed(), + name: "RuntimeApiSubsystem", + } + } +} + +async fn run( + mut ctx: impl SubsystemContext, + client: Client, +) -> SubsystemResult<()> where + Client: ProvideRuntimeApi, + Client::Api: ParachainHost, +{ + loop { + match ctx.recv().await? { + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOverseer::Signal(OverseerSignal::ActiveLeaves(_)) => {}, + FromOverseer::Signal(OverseerSignal::BlockFinalized(_)) => {}, + FromOverseer::Communication { msg } => match msg { + RuntimeApiMessage::Request(relay_parent, request) => make_runtime_api_request( + &client, + relay_parent, + request, + ), + } + } + } +} + +fn make_runtime_api_request( + client: &Client, + relay_parent: Hash, + request: Request, +) where + Client: ProvideRuntimeApi, + Client::Api: ParachainHost, +{ + macro_rules! query { + ($api_name:ident ($($param:expr),*), $sender:expr) => {{ + let sender = $sender; + let api = client.runtime_api(); + let res = api.$api_name(&BlockId::Hash(relay_parent), $($param),*) + .map_err(|e| RuntimeApiError::from(format!("{:?}", e))); + + let _ = sender.send(res); + }} + } + + match request { + Request::Validators(sender) => query!(validators(), sender), + Request::ValidatorGroups(sender) => query!(validator_groups(), sender), + Request::AvailabilityCores(sender) => query!(availability_cores(), sender), + Request::GlobalValidationData(sender) => query!(global_validation_data(), sender), + Request::LocalValidationData(para, assumption, sender) => + query!(local_validation_data(para, assumption), sender), + Request::SessionIndexForChild(sender) => query!(session_index_for_child(), sender), + Request::ValidationCode(para, assumption, sender) => + query!(validation_code(para, assumption), sender), + Request::CandidatePendingAvailability(para, sender) => + query!(candidate_pending_availability(para), sender), + Request::CandidateEvents(sender) => query!(candidate_events(), sender), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use polkadot_primitives::v1::{ + ValidatorId, ValidatorIndex, GroupRotationInfo, CoreState, GlobalValidationData, + Id as ParaId, OccupiedCoreAssumption, LocalValidationData, SessionIndex, ValidationCode, + CommittedCandidateReceipt, CandidateEvent, + }; + use polkadot_subsystem::test_helpers; + use sp_core::testing::TaskExecutor; + + use std::collections::HashMap; + use futures::channel::oneshot; + + #[derive(Default, Clone)] + struct MockRuntimeApi { + validators: Vec, + validator_groups: Vec>, + availability_cores: Vec, + global_validation_data: GlobalValidationData, + local_validation_data: HashMap, + session_index_for_child: SessionIndex, + validation_code: HashMap, + candidate_pending_availability: HashMap, + candidate_events: Vec, + } + + impl ProvideRuntimeApi for MockRuntimeApi { + type Api = Self; + + fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> { + self.clone().into() + } + } + + sp_api::mock_impl_runtime_apis! { + impl ParachainHost for MockRuntimeApi { + type Error = String; + + fn validators(&self) -> Vec { + self.validators.clone() + } + + fn validator_groups(&self) -> (Vec>, GroupRotationInfo) { + ( + self.validator_groups.clone(), + GroupRotationInfo { + session_start_block: 1, + group_rotation_frequency: 100, + now: 10, + }, + ) + } + + fn availability_cores(&self) -> Vec { + self.availability_cores.clone() + } + + fn global_validation_data(&self) -> GlobalValidationData { + self.global_validation_data.clone() + } + + fn local_validation_data( + &self, + para: ParaId, + _assumption: OccupiedCoreAssumption, + ) -> Option { + self.local_validation_data.get(¶).map(|l| l.clone()) + } + + fn session_index_for_child(&self) -> SessionIndex { + self.session_index_for_child.clone() + } + + fn validation_code( + &self, + para: ParaId, + _assumption: OccupiedCoreAssumption, + ) -> Option { + self.validation_code.get(¶).map(|c| c.clone()) + } + + fn candidate_pending_availability( + &self, + para: ParaId, + ) -> Option { + self.candidate_pending_availability.get(¶).map(|c| c.clone()) + } + + fn candidate_events(&self) -> Vec { + self.candidate_events.clone() + } + } + } + + #[test] + fn requests_validators() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::Validators(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.validators); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_validator_groups() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::ValidatorGroups(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap().0, runtime_api.validator_groups); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_availability_cores() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::AvailabilityCores(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.availability_cores); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_global_validation_data() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::GlobalValidationData(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.global_validation_data); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_local_validation_data() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let mut runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + let para_a = 5.into(); + let para_b = 6.into(); + + runtime_api.local_validation_data.insert(para_a, Default::default()); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::LocalValidationData(para_a, OccupiedCoreAssumption::Included, tx) + ), + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default())); + + let (tx, rx) = oneshot::channel(); + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::LocalValidationData(para_b, OccupiedCoreAssumption::Included, tx) + ), + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), None); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_session_index_for_child() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::SessionIndexForChild(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.session_index_for_child); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_validation_code() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let mut runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + let para_a = 5.into(); + let para_b = 6.into(); + + runtime_api.validation_code.insert(para_a, Default::default()); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::ValidationCode(para_a, OccupiedCoreAssumption::Included, tx) + ), + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default())); + + let (tx, rx) = oneshot::channel(); + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::ValidationCode(para_b, OccupiedCoreAssumption::Included, tx) + ), + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), None); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_candidate_pending_availability() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let mut runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + let para_a = 5.into(); + let para_b = 6.into(); + + runtime_api.candidate_pending_availability.insert(para_a, Default::default()); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::CandidatePendingAvailability(para_a, tx), + ) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), Some(Default::default())); + + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request( + relay_parent, + Request::CandidatePendingAvailability(para_b, tx), + ) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), None); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } + + #[test] + fn requests_candidate_events() { + let (ctx, mut ctx_handle) = test_helpers::make_subsystem_context(TaskExecutor::new()); + let runtime_api = MockRuntimeApi::default(); + let relay_parent = [1; 32].into(); + + let subsystem_task = run(ctx, runtime_api.clone()).map(|x| x.unwrap()); + let test_task = async move { + let (tx, rx) = oneshot::channel(); + + ctx_handle.send(FromOverseer::Communication { + msg: RuntimeApiMessage::Request(relay_parent, Request::CandidateEvents(tx)) + }).await; + + assert_eq!(rx.await.unwrap().unwrap(), runtime_api.candidate_events); + + ctx_handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + }; + + futures::executor::block_on(future::join(subsystem_task, test_task)); + } +} diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 895e936d5b24..c049b9d199cf 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -300,6 +300,12 @@ pub struct SchedulerRoster { #[derive(Debug, Clone)] pub struct RuntimeApiError(String); +impl From for RuntimeApiError { + fn from(s: String) -> Self { + RuntimeApiError(s) + } +} + /// A sender for the result of a runtime API request. pub type RuntimeApiSender = oneshot::Sender>; @@ -324,6 +330,10 @@ pub enum RuntimeApiRequest { ), /// Get the session index that a child of the block will have. SessionIndexForChild(RuntimeApiSender), + /// Get the validation code for a para, taking the given `OccupiedCoreAssumption`, which + /// will inform on how the validation data should be computed if the para currently + /// occupies a core. + ValidationCode(ParaId, OccupiedCoreAssumption, RuntimeApiSender>), /// Get a the candidate pending availability for a particular parachain by parachain / core index CandidatePendingAvailability(ParaId, RuntimeApiSender>), /// Get all events concerning candidates (backing, inclusion, time-out) in the parent of diff --git a/primitives/src/v1.rs b/primitives/src/v1.rs index 5d523fd30074..8b4130fe9de0 100644 --- a/primitives/src/v1.rs +++ b/primitives/src/v1.rs @@ -646,7 +646,7 @@ pub enum CandidateEvent { sp_api::decl_runtime_apis! { /// The API for querying the state of parachains on-chain. - pub trait ParachainHost { + pub trait ParachainHost { /// Get the current validators. fn validators() -> Vec;