From af77a9aa9e0d09ad152272d0e1203685da378537 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 27 May 2020 22:26:54 -0400 Subject: [PATCH 01/38] scheduler module skeleton --- runtime/parachains/src/scheduler.rs | 123 ++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 1f45de2df705..51490f02e8e6 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -13,3 +13,126 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . + +//! The scheduler module for parachains and parathreads. +//! +//! This module is responsible for two main tasks: +//! - Paritioning validators into groups and assigning groups to parachains and parathreads +//! - Scheduling parachains and parathreads +//! +//! It aims to achieve these tasks with these goals in mind: +//! - It should be possible to know at least a block ahead-of-time, ideally more, +//! which validators are going to be assigned to which parachains. +//! - Parachains that have a candidate pending availability in this fork of the chain +//! should not be assigned. +//! - Validator assignments should not be gameable. Malicious cartels should not be able to +//! manipulate the scheduler to assign themselves as desired. +//! - High or close to optimal throughput of parachains and parathreads. Work among validator groups should be balanced. +//! +//! The Scheduler manages resource allocation using the concept of "Execution Cores". +//! There will be one execution core for each parachain, and a fixed number of cores +//! used for multiplexing parathreads. Validators will be partitioned into groups, with the same +//! number of groups as execution cores. Validator groups will be assigned to different execution cores +//! over time. + +use sp_std::prelude::*; +use primitives::{ + parachain::{ValidatorId, Id as ParaId, CollatorId, ValidatorIndex}, +}; +use frame_support::{ + decl_storage, decl_module, decl_error, + dispatch::DispatchResult, + weights::{DispatchClass, Weight}, +}; +use codec::{Encode, Decode}; +use system::ensure_root; + +use crate::{configuration, paras}; + +// A claim on authorship for a specific parathread. +#[derive(Encode, Decode)] +struct ParathreadClaim(ParaId, CollatorId); + +// A parathread that is scheduled onto a specific core. +#[derive(Encode, Decode)] +struct ParathreadEntry { + claim: ParathreadClaim, + core: CoreIndex, +} + +// what a core is occupied by +#[derive(Encode, Decode)] +enum CoreOccupied { + Parathread(ParathreadClaim, u32), // claim & retries + Parachain, +} + +/// The unique (during session) index of a core. +#[derive(Encode, Decode)] +pub(crate) struct CoreIndex(u32); + +pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } + +decl_storage! { + trait Store for Module as Scheduler { + /// All of the validator groups, one for each core. + ValidatorGroups: Vec>; + /// A queue of upcoming claims and which core they should be mapped onto. + ParathreadQueue: Vec; + /// One entry for each execution core. Entries are `None` if the core is not currently occupied. + /// Can be temporarily `Some` if scheduled but not occupied. + /// The i'th parachain belongs to the i'th core, with the remaining cores all being + /// parathread-multiplexers. + ExecutionCores: Vec>; + /// An index used to ensure that only one claim on a parathread exists in the queue or retry queue + /// or is currently being handled by an occupied core. + ParathreadClaimIndex: Vec<(ParaId, CollatorId)>; + /// The block number where the session start occurred. Used to track how many group rotation have + /// occurred. + SessionStartBlock: BlockNumber, + /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on + /// finalization. + Scheduled get(fn scheduled): Vec, // sorted by ParaId + } +} + +impl Module { + /// Called by the initializer to initialize the scheduler module. + pub(crate) fn initializer_initialize(_now: T::BlockNumber) -> Weight { + Self::schedule(Vec::new()); + + 0 + } + + /// Called by the initializer to finalize the scheduler module. + pub(crate) fn initializer_finalize() { + // TODO [now]: free all scheduled cores and return parathread claims to queue, with retries incremented. + } + + /// Called by the initializer to note that a new session has started. + pub(crate) fn initializer_on_new_session(_validators: &[ValidatorId], _queued: &[ValidatorId]) { + let config = >::config(); + + SessionStartBlock::set(>::block_number()); + ExecutionCores::mutate(|cores| { + // clear all occupied cores. + for maybe_occupied in cores.iter_mut() { + if let Some(CoreOccupied::Parathread(claim, retries)) = maybe_occupied.take() { + // TODO [now]: return to parathread queue, do not increment retries + } + } + + let n_parachains = >::parachains().len(); + + cores.resize(n_parachains + config.parathread_cores, None); + }); + + // TODO [now]: shuffle validators into groups + + // TODO [now]: prune out all parathread claims with too many retries. + } + + pub(crate) fn schedule(just_freed_cores: Vec) { + // TODO [now]: schedule new core assignments. + } +} From d2eb2479d50624bf2c4eece24dc79e2a2bc8149e Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 18:57:38 -0400 Subject: [PATCH 02/38] update scheduler skeleton to match latest version of guide --- runtime/parachains/src/scheduler.rs | 130 +++++++++++++++++++--------- 1 file changed, 90 insertions(+), 40 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 51490f02e8e6..b9ddad4ec0f8 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -29,10 +29,10 @@ //! manipulate the scheduler to assign themselves as desired. //! - High or close to optimal throughput of parachains and parathreads. Work among validator groups should be balanced. //! -//! The Scheduler manages resource allocation using the concept of "Execution Cores". -//! There will be one execution core for each parachain, and a fixed number of cores +//! The Scheduler manages resource allocation using the concept of "Availability Cores". +//! There will be one availability core for each parachain, and a fixed number of cores //! used for multiplexing parathreads. Validators will be partitioned into groups, with the same -//! number of groups as execution cores. Validator groups will be assigned to different execution cores +//! number of groups as availability cores. Validator groups will be assigned to different availability cores //! over time. use sp_std::prelude::*; @@ -49,50 +49,87 @@ use system::ensure_root; use crate::{configuration, paras}; -// A claim on authorship for a specific parathread. -#[derive(Encode, Decode)] -struct ParathreadClaim(ParaId, CollatorId); +/// The unique (during session) index of a core. +#[derive(Encode, Decode, Default)] +pub(crate) struct CoreIndex(u32); + +/// The unique (during session) index of a validator group. +#[derive(Encode, Decode, Default)] +pub(crate) struct GroupIndex(u32); + +/// A claim on authoring the next block for a given parathread. +#[derive(Clone, Encode, Decode, Default)] +pub(crate) struct ParathreadClaim(ParaId, CollatorId); + +/// An entry tracking a claim to ensure it does not pass the maximum number of retries. +#[derive(Clone, Encode, Decode, Default)] +pub(crate) struct ParathreadEntry { + claim: ParathreadClaim, + retries: u32, +} -// A parathread that is scheduled onto a specific core. -#[derive(Encode, Decode)] -struct ParathreadEntry { - claim: ParathreadClaim, +/// A queued parathread entry, pre-assigned to a core. +#[derive(Encode, Decode, Default)] +pub(crate) struct QueuedParathread { + claim: ParathreadEntry, core: CoreIndex, } -// what a core is occupied by -#[derive(Encode, Decode)] -enum CoreOccupied { - Parathread(ParathreadClaim, u32), // claim & retries - Parachain, +/// The queue of all parathread claims. +#[derive(Encode, Decode, Default)] +pub(crate) struct ParathreadClaimQueue { + queue: Vec, + // this value is between 0 and config.parathread_cores + next_core: CoreIndex, } -/// The unique (during session) index of a core. -#[derive(Encode, Decode)] -pub(crate) struct CoreIndex(u32); +/// What is occupying a specific availability core. +#[derive(Clone, Encode, Decode)] +pub(crate) enum CoreOccupied { + Parathread(ParathreadEntry), + Parachain, +} + +/// How a free core is scheduled to be assigned. +#[derive(Encode, Decode, Default)] +pub(crate) struct CoreAssignment { + core: CoreIndex, + para_id: ParaId, + collator: Option, + group_idx: GroupIndex, +} pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } decl_storage! { trait Store for Module as Scheduler { - /// All of the validator groups, one for each core. + /// All the validator groups. One for each core. ValidatorGroups: Vec>; /// A queue of upcoming claims and which core they should be mapped onto. - ParathreadQueue: Vec; - /// One entry for each execution core. Entries are `None` if the core is not currently occupied. - /// Can be temporarily `Some` if scheduled but not occupied. + ParathreadQueue: ParathreadClaimQueue; + /// One entry for each availability core. Entries are `None` if the core is not currently occupied. Can be + /// temporarily `Some` if scheduled but not occupied. /// The i'th parachain belongs to the i'th core, with the remaining cores all being /// parathread-multiplexers. - ExecutionCores: Vec>; - /// An index used to ensure that only one claim on a parathread exists in the queue or retry queue - /// or is currently being handled by an occupied core. - ParathreadClaimIndex: Vec<(ParaId, CollatorId)>; - /// The block number where the session start occurred. Used to track how many group rotation have - /// occurred. - SessionStartBlock: BlockNumber, - /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on - /// finalization. - Scheduled get(fn scheduled): Vec, // sorted by ParaId + AvailabilityCores: Vec>; + /// An index used to ensure that only one claim on a parathread exists in the queue or is + /// currently being handled by an occupied core. + ParathreadClaimIndex: Vec; + /// The block number where the session start occurred. Used to track how many group rotations have occurred. + SessionStartBlock: T::BlockNumber; + /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on finalization. + Scheduled: Vec; // sorted ascending by CoreIndex. + } +} + +decl_error! { + pub enum Error for Module { } +} + +decl_module! { + /// The scheduler module. + pub struct Module for enum Call where origin: ::Origin { + type Error = Error; } } @@ -110,26 +147,39 @@ impl Module { } /// Called by the initializer to note that a new session has started. - pub(crate) fn initializer_on_new_session(_validators: &[ValidatorId], _queued: &[ValidatorId]) { + pub(crate) fn initializer_on_new_session(validators: &[ValidatorId], _queued: &[ValidatorId]) { let config = >::config(); - SessionStartBlock::set(>::block_number()); - ExecutionCores::mutate(|cores| { + let mut thread_queue = ParathreadQueue::get(); + let n_parachains = >::parachains().len() as u32; + let n_cores = n_parachains + config.parathread_cores; + + >::set(>::block_number()); + AvailabilityCores::mutate(|cores| { // clear all occupied cores. for maybe_occupied in cores.iter_mut() { - if let Some(CoreOccupied::Parathread(claim, retries)) = maybe_occupied.take() { - // TODO [now]: return to parathread queue, do not increment retries + if let Some(CoreOccupied::Parathread(claim)) = maybe_occupied.take() { + let queued = QueuedParathread { + claim, + core: CoreIndex(0), // this gets set later in the re-balancing. + }; + + thread_queue.queue.push(queued); } } - let n_parachains = >::parachains().len(); - - cores.resize(n_parachains + config.parathread_cores, None); + cores.resize(n_cores as _, None); }); // TODO [now]: shuffle validators into groups + if n_cores == 0 || validators.is_empty() { + ValidatorGroups::set(Vec::new()); + } else { + let group_base_size = validators.len() as u32 / n_cores; + } // TODO [now]: prune out all parathread claims with too many retries. + ParathreadQueue::set(thread_queue); } pub(crate) fn schedule(just_freed_cores: Vec) { From 631a6bad78ac12e8aaddc93808f516b29f8f34fc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 19:17:57 -0400 Subject: [PATCH 03/38] better session change notification --- runtime/parachains/src/configuration.rs | 2 +- runtime/parachains/src/initializer.rs | 48 ++++++++++++++++++++++--- runtime/parachains/src/paras.rs | 4 +-- runtime/parachains/src/scheduler.rs | 12 +++++-- 4 files changed, 56 insertions(+), 10 deletions(-) diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index b747dbf85cd2..657fb882bc30 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -33,7 +33,7 @@ use system::ensure_root; /// All configuration of the runtime with respect to parachains and parathreads. #[derive(Clone, Encode, Decode, PartialEq, Default)] #[cfg_attr(test, derive(Debug))] -pub struct HostConfiguration { +pub struct HostConfiguration { /// The minimum frequency at which parachains can update their validation code. pub validation_upgrade_frequency: BlockNumber, /// The delay, in blocks, before a validation upgrade is applied. diff --git a/runtime/parachains/src/initializer.rs b/runtime/parachains/src/initializer.rs index c942de4756c1..4c5aecca2916 100644 --- a/runtime/parachains/src/initializer.rs +++ b/runtime/parachains/src/initializer.rs @@ -25,11 +25,28 @@ use primitives::{ parachain::{ValidatorId}, }; use frame_support::{ - decl_storage, decl_module, decl_error, + decl_storage, decl_module, decl_error, traits::Randomness, }; -use crate::{configuration, paras}; +use crate::{configuration::{self, HostConfiguration}, paras}; + +/// Information about a session change that has just occurred. +pub struct SessionChangeNotification { + /// The new validators in the session. + pub validators: Vec, + /// The qeueud validators for the following session. + pub queued: Vec, + /// The configuration before handling the session change + pub prev_config: HostConfiguration, + /// The configuration after handling the session change. + pub new_config: HostConfiguration, + /// A secure random seed for the session, gathered from BABE. + pub random_seed: [u8; 32], +} -pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } +pub trait Trait: system::Trait + configuration::Trait + paras::Trait { + /// A randomness beacon. + type Randomness: Randomness; +} decl_storage! { trait Store for Module as Initializer { @@ -90,8 +107,31 @@ impl Module { let validators: Vec<_> = validators.map(|(_, v)| v).collect(); let queued: Vec<_> = queued.map(|(_, v)| v).collect(); + let prev_config = >::config(); + + let random_seed = { + let mut buf = [0u8; 32]; + let random_hash = T::Randomness::random(&b"paras"[..]); + let len = sp_std::cmp::min(32, random_hash.as_ref().len()); + buf[..len].copy_from_slice(&random_hash.as_ref()[..len]); + buf + }; + + // We can't pass the new config into the thing that determines the new config, + // so we don't pass the `SessionChangeNotification` into this module. configuration::Module::::initializer_on_new_session(&validators, &queued); - paras::Module::::initializer_on_new_session(&validators, &queued); + + let new_config = >::config(); + + let notification = SessionChangeNotification { + validators, + queued, + prev_config, + new_config, + random_seed, + }; + + paras::Module::::initializer_on_new_session(¬ification); } } diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index 51f76d6df774..ac547c0717df 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -35,7 +35,7 @@ use frame_support::{ weights::Weight, }; use codec::{Encode, Decode}; -use crate::configuration; +use crate::{configuration, initializer::SessionChangeNotification}; #[cfg(feature = "std")] use serde::{Serialize, Deserialize}; @@ -253,7 +253,7 @@ impl Module { pub(crate) fn initializer_finalize() { } /// Called by the initializer to note that a new session has started. - pub(crate) fn initializer_on_new_session(_validators: &[ValidatorId], _queued: &[ValidatorId]) { + pub(crate) fn initializer_on_new_session(_notification: &SessionChangeNotification) { let now = >::block_number(); let mut parachains = Self::clean_up_outgoing(now); Self::apply_incoming(&mut parachains); diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index b9ddad4ec0f8..12639b72925a 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -47,7 +47,7 @@ use frame_support::{ use codec::{Encode, Decode}; use system::ensure_root; -use crate::{configuration, paras}; +use crate::{configuration, paras, initializer::SessionChangeNotification}; /// The unique (during session) index of a core. #[derive(Encode, Decode, Default)] @@ -147,8 +147,14 @@ impl Module { } /// Called by the initializer to note that a new session has started. - pub(crate) fn initializer_on_new_session(validators: &[ValidatorId], _queued: &[ValidatorId]) { - let config = >::config(); + pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification) { + let &SessionChangeNotification { + ref validators, + ref random_seed, + ref new_config, + .. + } = notification; + let config = new_config; let mut thread_queue = ParathreadQueue::get(); let n_parachains = >::parachains().len() as u32; From cdf114b206f22cb88b588b3ad2870ff8bd5a121e Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 19:22:55 -0400 Subject: [PATCH 04/38] add mock randomness and fix test compilation --- runtime/parachains/src/initializer.rs | 1 + runtime/parachains/src/mock.rs | 14 ++++++++++++-- runtime/parachains/src/paras.rs | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/runtime/parachains/src/initializer.rs b/runtime/parachains/src/initializer.rs index 4c5aecca2916..b467a3bbba84 100644 --- a/runtime/parachains/src/initializer.rs +++ b/runtime/parachains/src/initializer.rs @@ -30,6 +30,7 @@ use frame_support::{ use crate::{configuration::{self, HostConfiguration}, paras}; /// Information about a session change that has just occurred. +#[derive(Default, Clone)] pub struct SessionChangeNotification { /// The new validators in the session. pub validators: Vec, diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 6f08545008a9..949502c18374 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -30,7 +30,7 @@ use primitives::{ }; use frame_support::{ impl_outer_origin, impl_outer_dispatch, parameter_types, - weights::Weight, + weights::Weight, traits::Randomness as RandomnessT, }; /// A test runtime struct. @@ -47,6 +47,14 @@ impl_outer_dispatch! { } } +pub struct TestRandomness; + +impl RandomnessT for TestRandomness { + fn random(_subject: &[u8]) -> H256 { + Default::default() + } +} + parameter_types! { pub const BlockHashCount: u32 = 250; pub const MaximumBlockWeight: Weight = 4 * 1024 * 1024; @@ -80,7 +88,9 @@ impl system::Trait for Test { type OnKilledAccount = (); } -impl crate::initializer::Trait for Test { } +impl crate::initializer::Trait for Test { + type Randomness = TestRandomness; +} impl crate::configuration::Trait for Test { } diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index ac547c0717df..a33d3c8f54a0 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -536,7 +536,7 @@ mod tests { System::set_block_number(b + 1); if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) { - Paras::initializer_on_new_session(&[], &[]); + Paras::initializer_on_new_session(&Default::default()); } Paras::initializer_initialize(b + 1); } From b2a8c4229d0a4c11ee71f66210a448b60085a760 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 19:40:51 -0400 Subject: [PATCH 05/38] shuffle validators into groups --- Cargo.lock | 2 ++ runtime/parachains/Cargo.toml | 3 +++ runtime/parachains/src/scheduler.rs | 27 +++++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44dc59e5efcb..96c98f857170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4481,6 +4481,8 @@ dependencies = [ "pallet-vesting", "parity-scale-codec", "polkadot-primitives", + "rand 0.7.3", + "rand_chacha 0.2.2", "rustc-hex", "serde", "serde_derive", diff --git a/runtime/parachains/Cargo.toml b/runtime/parachains/Cargo.toml index adaf400a2e2a..5288e267b67e 100644 --- a/runtime/parachains/Cargo.toml +++ b/runtime/parachains/Cargo.toml @@ -35,6 +35,9 @@ frame-benchmarking = { git = "https://github.com/paritytech/substrate", branch = primitives = { package = "polkadot-primitives", path = "../../primitives", default-features = false } libsecp256k1 = { version = "0.3.2", default-features = false, optional = true } +rand = { version = "0.7", default-features = false } +rand_chacha = { version = "0.2.2", default-features = false } + [dev-dependencies] hex-literal = "0.2.1" keyring = { package = "sp-keyring", git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 12639b72925a..ea2beabdd632 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -47,6 +47,9 @@ use frame_support::{ use codec::{Encode, Decode}; use system::ensure_root; +use rand::{SeedableRng, seq::SliceRandom}; +use rand_chacha::ChaCha20Rng; + use crate::{configuration, paras, initializer::SessionChangeNotification}; /// The unique (during session) index of a core. @@ -177,11 +180,31 @@ impl Module { cores.resize(n_cores as _, None); }); - // TODO [now]: shuffle validators into groups if n_cores == 0 || validators.is_empty() { ValidatorGroups::set(Vec::new()); } else { - let group_base_size = validators.len() as u32 / n_cores; + let mut rng: ChaCha20Rng = SeedableRng::from_seed(notification.random_seed); + + let mut shuffled_indices: Vec<_> = (0..validators.len()) + .enumerate() + .map(|(i, _)| i as ValidatorIndex) + .collect(); + + shuffled_indices.shuffle(&mut rng); + + let group_base_size = validators.len() / n_cores as usize; + let larger_groups = validators.len() % n_cores as usize; + let groups: Vec> = (0..n_cores).map(|core_id| { + let n_members = if (core_id as usize) < larger_groups { + group_base_size + 1 + } else { + group_base_size + }; + + shuffled_indices.drain(shuffled_indices.len() - n_members ..).rev().collect() + }).collect(); + + ValidatorGroups::set(groups); } // TODO [now]: prune out all parathread claims with too many retries. From d6d751d6fce143642f1859d9a83b493f33d1d084 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 19:57:33 -0400 Subject: [PATCH 06/38] finish implementing session change logic for scheduler --- runtime/parachains/src/scheduler.rs | 44 +++++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index ea2beabdd632..7aad1332a99e 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -83,7 +83,7 @@ pub(crate) struct QueuedParathread { pub(crate) struct ParathreadClaimQueue { queue: Vec, // this value is between 0 and config.parathread_cores - next_core: CoreIndex, + next_core_offset: CoreIndex, } /// What is occupying a specific availability core. @@ -180,6 +180,7 @@ impl Module { cores.resize(n_cores as _, None); }); + // shuffle validators into groups. if n_cores == 0 || validators.is_empty() { ValidatorGroups::set(Vec::new()); } else { @@ -207,7 +208,46 @@ impl Module { ValidatorGroups::set(groups); } - // TODO [now]: prune out all parathread claims with too many retries. + // prune out all parathread claims with too many retries. + // assign all non-pruned claims to new cores, if they've changed. + ParathreadClaimIndex::mutate(|claim_index| { + // wipe all parathread metadata if no parathread cores are configured. + if config.parathread_cores == 0 { + thread_queue = ParathreadClaimQueue { + queue: Vec::new(), + next_core_offset: CoreIndex(0), + }; + claim_index.clear(); + return; + } + + // prune out all entries beyond retry. + thread_queue.queue.retain(|queued| { + let will_keep = queued.claim.retries <= config.parathread_retries; + + if !will_keep { + let claim_para = queued.claim.claim.0; + + // clean up the pruned entry from the index. + if let Ok(i) = claim_index.binary_search(&claim_para) { + claim_index.remove(i); + } + } + + will_keep + }); + + // do re-balancing of claims. + { + for (i, queued) in thread_queue.queue.iter_mut().enumerate() { + // offset by the number of parachains. + queued.core = CoreIndex((i as u32) % config.parathread_cores + n_parachains); + } + + thread_queue.next_core_offset = + CoreIndex(((thread_queue.queue.len() + 1) as u32) % config.parathread_cores); + } + }); ParathreadQueue::set(thread_queue); } From ba8dc9f85df71b8941d5b72ef0322eb57da66406 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:04:54 -0400 Subject: [PATCH 07/38] tweak core assignment type to track retries of parathread --- roadmap/implementors-guide/guide.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index 0da67bf35dba..6ddd83e18524 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -619,11 +619,16 @@ enum CoreOccupied { Parachain, } +enum AssignmentKind { + Parachain, + Parathread(CollatorId, u32), +} + struct CoreAssignment { - core: CoreIndex, - para_id: ParaId, - collator: Option, - group_idx: GroupIndex, + core: CoreIndex, + para_id: ParaId, + kind: AssignmentKind, + group_idx: GroupIndex, } ``` From be800ddb38b42423f0340dfd8144bd1d746ee675 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:18:59 -0400 Subject: [PATCH 08/38] reframe queued parathread core as offset --- roadmap/implementors-guide/guide.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index 6ddd83e18524..b27a3f659508 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -605,13 +605,14 @@ struct ParathreadEntry { // A queued parathread entry, pre-assigned to a core. struct QueuedParathread { claim: ParathreadEntry, - core: CoreIndex, + // this value is between 0 and config.parathread_cores. + core_offset: u32, } struct ParathreadQueue { queue: Vec, // this value is between 0 and config.parathread_cores - next_core: CoreIndex, + next_core_offset: u32, } enum CoreOccupied { From cad53e66d3e25ce0be9d57f243d56fde0eb03cbf Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:19:08 -0400 Subject: [PATCH 09/38] implement initialzation and finalization routines --- runtime/parachains/src/scheduler.rs | 84 +++++++++++++++++++++++------ 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 7aad1332a99e..a3c903f4d13f 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -67,15 +67,15 @@ pub(crate) struct ParathreadClaim(ParaId, CollatorId); /// An entry tracking a claim to ensure it does not pass the maximum number of retries. #[derive(Clone, Encode, Decode, Default)] pub(crate) struct ParathreadEntry { - claim: ParathreadClaim, - retries: u32, + claim: ParathreadClaim, + retries: u32, } /// A queued parathread entry, pre-assigned to a core. #[derive(Encode, Decode, Default)] pub(crate) struct QueuedParathread { claim: ParathreadEntry, - core: CoreIndex, + core_offset: u32, } /// The queue of all parathread claims. @@ -83,23 +83,59 @@ pub(crate) struct QueuedParathread { pub(crate) struct ParathreadClaimQueue { queue: Vec, // this value is between 0 and config.parathread_cores - next_core_offset: CoreIndex, + next_core_offset: u32, +} + +impl ParathreadClaimQueue { + // Queue a parathread entry to be processed. + // + // Provide the entry and the number of parathread cores, which must be greater than 0. + fn queue_entry(&mut self, entry: ParathreadEntry, n_parathread_cores: u32) { + let core_offset = self.next_core_offset; + self.next_core_offset = (self.next_core_offset + 1) % n_parathread_cores; + + self.queue.push(QueuedParathread { + claim: entry, + core_offset, + }) + } } /// What is occupying a specific availability core. #[derive(Clone, Encode, Decode)] pub(crate) enum CoreOccupied { - Parathread(ParathreadEntry), - Parachain, + Parathread(ParathreadEntry), + Parachain, +} + +/// The assignment type. +#[derive(Clone, Encode, Decode)] +pub(crate) enum AssignmentKind { + Parachain, + Parathread(CollatorId, u32), } /// How a free core is scheduled to be assigned. -#[derive(Encode, Decode, Default)] +#[derive(Encode, Decode)] pub(crate) struct CoreAssignment { - core: CoreIndex, - para_id: ParaId, - collator: Option, - group_idx: GroupIndex, + /// The core that is assigned. + pub core: CoreIndex, + /// The unique ID of the para that is assigned to the core. + pub para_id: ParaId, + /// The kind of the assigment. + pub kind: AssignmentKind, + /// The index of the validator group assigned to the core. + pub group_idx: GroupIndex, +} + +impl CoreAssignment { + /// Get the ID of a collator who is required to collate this block. + pub(crate) fn required_collator(&self) -> Option<&CollatorId> { + match self.kind { + AssignmentKind::Parachain => None, + AssignmentKind::Parathread(ref id, _) => Some(id), + } + } } pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } @@ -146,7 +182,22 @@ impl Module { /// Called by the initializer to finalize the scheduler module. pub(crate) fn initializer_finalize() { - // TODO [now]: free all scheduled cores and return parathread claims to queue, with retries incremented. + // Free all scheduled cores and return parathread claims to queue, with retries incremented. + let config = >::config(); + ParathreadQueue::mutate(|queue| { + for core_assignment in Scheduled::take() { + if let AssignmentKind::Parathread(collator, retries) = core_assignment.kind { + let entry = ParathreadEntry { + claim: ParathreadClaim(core_assignment.para_id, collator), + retries: retries + 1, + }; + + if entry.retries > config.parathread_retries { continue } + queue.queue_entry(entry, config.parathread_cores); + } + } + }) + } /// Called by the initializer to note that a new session has started. @@ -170,7 +221,7 @@ impl Module { if let Some(CoreOccupied::Parathread(claim)) = maybe_occupied.take() { let queued = QueuedParathread { claim, - core: CoreIndex(0), // this gets set later in the re-balancing. + core_offset: 0, // this gets set later in the re-balancing. }; thread_queue.queue.push(queued); @@ -215,7 +266,7 @@ impl Module { if config.parathread_cores == 0 { thread_queue = ParathreadClaimQueue { queue: Vec::new(), - next_core_offset: CoreIndex(0), + next_core_offset: 0, }; claim_index.clear(); return; @@ -240,12 +291,11 @@ impl Module { // do re-balancing of claims. { for (i, queued) in thread_queue.queue.iter_mut().enumerate() { - // offset by the number of parachains. - queued.core = CoreIndex((i as u32) % config.parathread_cores + n_parachains); + queued.core_offset = (i as u32) % config.parathread_cores; } thread_queue.next_core_offset = - CoreIndex(((thread_queue.queue.len() + 1) as u32) % config.parathread_cores); + ((thread_queue.queue.len() + 1) as u32) % config.parathread_cores; } }); ParathreadQueue::set(thread_queue); From eb6404467a64aba66beec6fbbc648e43b205ccba Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:29:04 -0400 Subject: [PATCH 10/38] implement parathread claim queuing --- runtime/parachains/src/scheduler.rs | 30 ++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a3c903f4d13f..7e18e6a062cc 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -62,7 +62,7 @@ pub(crate) struct GroupIndex(u32); /// A claim on authoring the next block for a given parathread. #[derive(Clone, Encode, Decode, Default)] -pub(crate) struct ParathreadClaim(ParaId, CollatorId); +pub struct ParathreadClaim(pub ParaId, pub CollatorId); /// An entry tracking a claim to ensure it does not pass the maximum number of retries. #[derive(Clone, Encode, Decode, Default)] @@ -301,6 +301,34 @@ impl Module { ParathreadQueue::set(thread_queue); } + /// Add a parathread claim to the queue. If there is a competing claim in the queue or currently + /// assigned to a core, this call will fail. This call will also fail if the queue is full. + pub fn add_parathread_claim(claim: ParathreadClaim) { + let config = >::config(); + let queue_max_size = config.parathread_cores * config.scheduling_lookahead; + + ParathreadQueue::mutate(|queue| { + if queue.queue.len() >= queue_max_size as usize { return } + + let para_id = claim.0; + + let competes_with_another = ParathreadClaimIndex::mutate(|index| { + match index.binary_search(¶_id) { + Ok(_) => true, + Err(i) => { + index.insert(i, para_id); + false + } + } + }); + + if competes_with_another { return } + + let entry = ParathreadEntry { claim, retries: 0 }; + queue.queue_entry(entry, config.parathread_cores); + }) + } + pub(crate) fn schedule(just_freed_cores: Vec) { // TODO [now]: schedule new core assignments. } From 8b4cfd4bdf38a2d966d3f6a08cd09a5a19869f74 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:34:06 -0400 Subject: [PATCH 11/38] implement core_para --- runtime/parachains/src/scheduler.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 7e18e6a062cc..a553d56d2daf 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -157,7 +157,7 @@ decl_storage! { /// The block number where the session start occurred. Used to track how many group rotations have occurred. SessionStartBlock: T::BlockNumber; /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on finalization. - Scheduled: Vec; // sorted ascending by CoreIndex. + Scheduled get(fn scheduled): Vec; // sorted ascending by CoreIndex. } } @@ -332,4 +332,17 @@ impl Module { pub(crate) fn schedule(just_freed_cores: Vec) { // TODO [now]: schedule new core assignments. } + + /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices + /// out of bounds will return `None`, as will indices of unassigned cores. + pub(crate) fn core_para(core_index: CoreIndex) -> Option { + match AvailabilityCores::get().get(core_index.0 as usize).and_then(|c| c) { + None => None, + Some(CoreOccupied::Parachain) => { + let parachains = >::parachains(); + Some(parachains[core_index.0 as usize]) + } + Some(CoreOccupied::Parathread(entry)) => Some(entry.claim.0), + } + } } From ff2e4d773057386ef2644fadc075cd421bd72a0c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:39:41 -0400 Subject: [PATCH 12/38] implement the group_validators routine and fix errors --- runtime/parachains/src/scheduler.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a553d56d2daf..b9be916b9673 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -54,11 +54,11 @@ use crate::{configuration, paras, initializer::SessionChangeNotification}; /// The unique (during session) index of a core. #[derive(Encode, Decode, Default)] -pub(crate) struct CoreIndex(u32); +pub struct CoreIndex(u32); /// The unique (during session) index of a validator group. #[derive(Encode, Decode, Default)] -pub(crate) struct GroupIndex(u32); +pub struct GroupIndex(u32); /// A claim on authoring the next block for a given parathread. #[derive(Clone, Encode, Decode, Default)] @@ -66,21 +66,21 @@ pub struct ParathreadClaim(pub ParaId, pub CollatorId); /// An entry tracking a claim to ensure it does not pass the maximum number of retries. #[derive(Clone, Encode, Decode, Default)] -pub(crate) struct ParathreadEntry { +pub struct ParathreadEntry { claim: ParathreadClaim, retries: u32, } /// A queued parathread entry, pre-assigned to a core. #[derive(Encode, Decode, Default)] -pub(crate) struct QueuedParathread { +pub struct QueuedParathread { claim: ParathreadEntry, core_offset: u32, } /// The queue of all parathread claims. #[derive(Encode, Decode, Default)] -pub(crate) struct ParathreadClaimQueue { +pub struct ParathreadClaimQueue { queue: Vec, // this value is between 0 and config.parathread_cores next_core_offset: u32, @@ -110,14 +110,14 @@ pub(crate) enum CoreOccupied { /// The assignment type. #[derive(Clone, Encode, Decode)] -pub(crate) enum AssignmentKind { +pub enum AssignmentKind { Parachain, Parathread(CollatorId, u32), } /// How a free core is scheduled to be assigned. #[derive(Encode, Decode)] -pub(crate) struct CoreAssignment { +pub struct CoreAssignment { /// The core that is assigned. pub core: CoreIndex, /// The unique ID of the para that is assigned to the core. @@ -336,13 +336,19 @@ impl Module { /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices /// out of bounds will return `None`, as will indices of unassigned cores. pub(crate) fn core_para(core_index: CoreIndex) -> Option { - match AvailabilityCores::get().get(core_index.0 as usize).and_then(|c| c) { + let cores = AvailabilityCores::get(); + match cores.get(core_index.0 as usize).and_then(|c| c.as_ref()) { None => None, Some(CoreOccupied::Parachain) => { let parachains = >::parachains(); Some(parachains[core_index.0 as usize]) } - Some(CoreOccupied::Parathread(entry)) => Some(entry.claim.0), + Some(CoreOccupied::Parathread(ref entry)) => Some(entry.claim.0), } } + + /// Get the validators in the given group, if the group index is valid for this session. + pub(crate) fn group_validators(group_index: GroupIndex) -> Option> { + ValidatorGroups::get().get(group_index.0 as usize).map(|g| g.clone()) + } } From 35a369256ec848dbed15c80494f57b124c21f9d8 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 20:49:47 -0400 Subject: [PATCH 13/38] add a reason for freeing cores --- roadmap/implementors-guide/guide.md | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index b27a3f659508..849ea5683c3b 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -631,6 +631,12 @@ struct CoreAssignment { kind: AssignmentKind, group_idx: GroupIndex, } + +// reasons a core might be freed. +enum FreedReason { + Concluded, + TimedOut, +} ``` Storage layout: @@ -690,9 +696,10 @@ Actions: - `next_core` is then updated by adding 1 and taking it modulo `config.parathread_cores`. - The claim is then added to the claim index. -* `schedule(Vec)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned. +* `schedule(Vec<(CoreIndex, FreedReason)>)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned. - All freed parachain cores should be assigned to their respective parachain - - All freed parathread cores should have the claim removed from the claim index. + - All freed parathread cores should have the claim removed from the claim index, if the reason for freeing was `FreedReason::Concluded` + - All freed parathread cores should have the claim added to the parathread queue again without retries incremented, if the reason for freeing was `FreedReason::TimedOut`. - All freed parathread cores should take the next parathread entry from the queue. - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. * `scheduled() -> Vec`: Get currently scheduled core assignments. @@ -798,8 +805,8 @@ Included: Option<()>, #### Entry Points * `inclusion`: This entry-point accepts two parameters: [`Bitfields`](#Signed-Availability-Bitfield) and [`BackedCandidates`](#Backed-Candidate). - 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. - 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores. + 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. + 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores, annotated with `FreedReason::TimedOut`. 1. Invoke `Scheduler::schedule(freed)` 1. Pass the `BackedCandidates` along with the output of `Scheduler::scheduled` to the `Inclusion::process_candidates` routine, getting a list of all newly-occupied cores. 1. Call `Scheduler::occupied` for all scheduled cores where a backed candidate was submitted. From adbaa1db8e7324c3ee99163de437bae13b6399f6 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 2 Jun 2020 22:01:54 -0400 Subject: [PATCH 14/38] implement `schedule` function --- runtime/parachains/src/scheduler.rs | 180 +++++++++++++++++++++++++++- 1 file changed, 176 insertions(+), 4 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index b9be916b9673..c339a9db9705 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -36,6 +36,7 @@ //! over time. use sp_std::prelude::*; +use sp_std::convert::{TryFrom, TryInto}; use primitives::{ parachain::{ValidatorId, Id as ParaId, CollatorId, ValidatorIndex}, }; @@ -46,6 +47,7 @@ use frame_support::{ }; use codec::{Encode, Decode}; use system::ensure_root; +use sp_runtime::traits::Zero; use rand::{SeedableRng, seq::SliceRandom}; use rand_chacha::ChaCha20Rng; @@ -53,11 +55,11 @@ use rand_chacha::ChaCha20Rng; use crate::{configuration, paras, initializer::SessionChangeNotification}; /// The unique (during session) index of a core. -#[derive(Encode, Decode, Default)] +#[derive(Encode, Decode, Default, PartialOrd, Ord, Eq, PartialEq, Clone, Copy)] pub struct CoreIndex(u32); /// The unique (during session) index of a validator group. -#[derive(Encode, Decode, Default)] +#[derive(Encode, Decode, Default, Clone, Copy)] pub struct GroupIndex(u32); /// A claim on authoring the next block for a given parathread. @@ -99,6 +101,12 @@ impl ParathreadClaimQueue { core_offset, }) } + + // Take next queued entry with given core offset, if any. + fn take_next_on_core(&mut self, core_offset: u32) -> Option { + let pos = self.queue.iter().position(|queued| queued.core_offset == core_offset); + pos.map(|i| self.queue.remove(i).claim) + } } /// What is occupying a specific availability core. @@ -138,6 +146,14 @@ impl CoreAssignment { } } +/// Reasons a core might be freed +pub enum FreedReason { + /// The core's work concluded and the parablock assigned to it is considered available. + Concluded, + /// The core's work timed out. + TimedOut, +} + pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } decl_storage! { @@ -329,8 +345,132 @@ impl Module { }) } - pub(crate) fn schedule(just_freed_cores: Vec) { - // TODO [now]: schedule new core assignments. + pub(crate) fn schedule(just_freed_cores: Vec<(CoreIndex, FreedReason)>) { + let mut cores = AvailabilityCores::get(); + let config = >::config(); + + for (freed_index, freed_reason) in just_freed_cores { + if (freed_index.0 as usize) < cores.len() { + match cores[freed_index.0 as usize].take() { + None => continue, + Some(CoreOccupied::Parachain) => {}, + Some(CoreOccupied::Parathread(entry)) => { + match freed_reason { + FreedReason::Concluded => { + // After a parathread candidate has successfully been included, + // open it up for further claims! + ParathreadClaimIndex::mutate(|index| { + if let Ok(i) = index.binary_search(&entry.claim.0) { + index.remove(i); + } + }) + } + FreedReason::TimedOut => { + // If a parathread candidate times out, it's not the collator's fault, + // so we don't increment retries. + ParathreadQueue::mutate(|queue| { + queue.queue_entry(entry, config.parathread_cores); + }) + } + } + } + } + } + } + + let parachains = >::parachains(); + let mut scheduled = Scheduled::get(); + let mut parathread_queue = ParathreadQueue::get(); + let now = >::block_number(); + + { + let mut prev_scheduled_in_order = scheduled.iter().enumerate().peekable(); + + // Updates to the previous list of scheduled updates and the position of where to insert + // them, without accounting for prior updates. + let mut scheduled_updates: Vec<(usize, CoreAssignment)> = Vec::new(); + + // single-sweep O(n) in the number of cores. + for (core_index, _core) in cores.iter().enumerate().filter(|(_, ref c)| c.is_none()) { + let schedule_and_insert_at = { + // advance the iterator until just before the core index we are looking at now. + while prev_scheduled_in_order.peek().map_or( + false, + |(_, assign)| (assign.core.0 as usize) < core_index, + ) { + let _ = prev_scheduled_in_order.next(); + } + + // check the first entry already scheduled with core index >= than the one we + // are looking at. 3 cases: + // 1. No such entry, clearly this core is not scheduled, so we need to schedule and put at the end. + // 2. Entry exists and has same index as the core we are inspecting. do not schedule again. + // 3. Entry exists and has higher index than the core we are inspecting. schedule and note + // insertion position. + prev_scheduled_in_order.peek().map_or( + Some(scheduled.len()), + |(idx_in_scheduled, assign)| if (assign.core.0 as usize) == core_index { + None + } else { + Some(*idx_in_scheduled) + }, + ) + }; + + let schedule_and_insert_at = match schedule_and_insert_at { + None => continue, + Some(at) => at, + }; + + let core = CoreIndex(core_index as u32); + + let core_assignment = if core_index < parachains.len() { + // parachain core. + Some(CoreAssignment { + kind: AssignmentKind::Parachain, + para_id: parachains[core_index], + core: core.clone(), + group_idx: Self::group_assigned_to_core(core, now) + .expect("core is not out of bounds and we are guaranteed \ + to be after the most recent session start; qed"), + }) + } else { + // parathread core offset, rel. to beginning. + let core_offset = (core_index - parachains.len()) as u32; + + parathread_queue.take_next_on_core(core_offset).map(|entry| CoreAssignment { + kind: AssignmentKind::Parathread(entry.claim.1, entry.retries), + para_id: entry.claim.0, + core: core.clone(), + group_idx: Self::group_assigned_to_core(core, now) + .expect("core is not out of bounds and we are guaranteed \ + to be after the most recent session start; qed"), + }) + }; + + if let Some(assignment) = core_assignment { + scheduled_updates.push((schedule_and_insert_at, assignment)) + } + } + + // at this point, because `Scheduled` is guaranteed to be sorted and we navigated unassigned + // core indices in ascending order, we can enact the updates prepared by the previous actions. + // + // while inserting, we have to account for the amount of insertions already done. + // + // This is O(n) as well, capped at n operations, where n is the number of cores. + for (num_insertions_before, (insert_at, to_insert)) in scheduled_updates.into_iter().enumerate() { + let insert_at = num_insertions_before + insert_at; + scheduled.insert(insert_at, to_insert); + } + + // scheduled is guaranteed to be sorted after this point because it was sorted before, and we + // applied sorted updates at their correct positions, accounting for the offsets of previous + // insertions. + } + + Scheduled::set(scheduled); + ParathreadQueue::set(parathread_queue); } /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices @@ -351,4 +491,36 @@ impl Module { pub(crate) fn group_validators(group_index: GroupIndex) -> Option> { ValidatorGroups::get().get(group_index.0 as usize).map(|g| g.clone()) } + + /// Get the group assigned to a specific core by index at the current block number. Result undefined if the core index is unknown + /// or the block number is less than the session start index. + pub(crate) fn group_assigned_to_core(core: CoreIndex, at: T::BlockNumber) -> Option { + let config = >::config(); + let session_start_block = >::get(); + + if at < session_start_block { return None } + + if config.parachain_rotation_frequency.is_zero() { + // interpret this as "no rotations" + return Some(GroupIndex(core.0)); + } + + let validator_groups = ValidatorGroups::get(); + + if core.0 as usize >= validator_groups.len() { return None } + + let rotations_since_session_start: T::BlockNumber = + (at - session_start_block) / config.parachain_rotation_frequency.into(); + + let rotations_since_session_start + = match >::try_into(rotations_since_session_start) + { + Ok(i) => i, + Err(_) => 0, // can only happen if rotations occur only once every u32::max(), + // so functionally no difference in behavior. + }; + + let group_idx = (core.0 as usize + rotations_since_session_start as usize) % validator_groups.len(); + Some(GroupIndex(group_idx as u32)) + } } From 6a1133792b15ff95a315c4f02a4ceb1bfcf83ca2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 12:44:58 -0400 Subject: [PATCH 15/38] add some docs to the scheduled function --- runtime/parachains/src/scheduler.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index c339a9db9705..3b2fa48c3c92 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -345,6 +345,9 @@ impl Module { }) } + /// Schedule all unassigned cores, where possible. Provide a list of cores that should be considered + /// newly-freed along with the reason for them being freed. The list is assumed to be sorted in + /// ascending order by core index. pub(crate) fn schedule(just_freed_cores: Vec<(CoreIndex, FreedReason)>) { let mut cores = AvailabilityCores::get(); let config = >::config(); From c63a94b58c2850c5e4be4fbf6b172ba607f60876 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:01:36 -0400 Subject: [PATCH 16/38] implement `occupied` helper --- roadmap/implementors-guide/guide.md | 4 +-- runtime/parachains/src/scheduler.rs | 49 ++++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index 6213b264ee79..abfdc160a85e 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -704,8 +704,8 @@ Actions: - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. * `scheduled() -> Vec`: Get currently scheduled core assignments. * `occupied(Vec). Note that the given cores have become occupied. - - Fails if any given cores were not scheduled. - - Fails if the given cores are not sorted ascending by core index + - Behavior undefined if any given cores were not scheduled. + - Behavior undefined if the given cores are not sorted ascending by core index - This clears them from `Scheduled` and marks each corresponding `core` in the `AvailabilityCores` as occupied. - Since both the availability cores and the newly-occupied cores lists are sorted ascending, this method can be implemented efficiently. * `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core. diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 3b2fa48c3c92..1ad26f213813 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -130,7 +130,7 @@ pub struct CoreAssignment { pub core: CoreIndex, /// The unique ID of the para that is assigned to the core. pub para_id: ParaId, - /// The kind of the assigment. + /// The kind of the assignment. pub kind: AssignmentKind, /// The index of the validator group assigned to the core. pub group_idx: GroupIndex, @@ -144,6 +144,18 @@ impl CoreAssignment { AssignmentKind::Parathread(ref id, _) => Some(id), } } + + fn to_core_occupied(&self) -> CoreOccupied { + match self.kind { + AssignmentKind::Parachain => CoreOccupied::Parachain, + AssignmentKind::Parathread(collator, retries) => CoreOccupied::Parathread( + ParathreadEntry { + claim: ParathreadClaim(self.para_id, collator), + retries, + } + ), + } + } } /// Reasons a core might be freed @@ -476,6 +488,41 @@ impl Module { ParathreadQueue::set(parathread_queue); } + /// Note that the given cores have become occupied. Behavior undefined if any of the given cores were not scheduled + /// or the slice is not sorted ascending by core index. + /// + /// Complexity: O(n) in the number of scheduled cores, which is capped at the number of total cores. + /// This is efficient in the case that most scheduled cores are occupied. + pub(crate) fn occupied(now_occupied: &[CoreIndex]) { + if now_occupied.is_empty() { return } + + let mut availability_cores = AvailabilityCores::get(); + Scheduled::mutate(|scheduled| { + // The constraints on the function require that now_occupied is a sorted subset of the + // `scheduled` cores, which are also sorted. + + let mut occupied_iter = now_occupied.iter().cloned().peekable(); + scheduled.retain(|assignment| { + occupied_iter.peek().map_or(true, |occupied_idx| { + if occupied_idx == assignment.core { + // remove this entry - it's now occupied. and begin inspecting the next extry + // of the occupied iterator. + let _ = occupied_iter.next(); + + availability_cores[occupied_idx.0 as usize] = Some(assignment.to_core_occupied()); + + false + } else { + // retain all members of `scheduled` which do not match the next entry of occupied. + true + } + }) + }) + }); + + AvailabilityCores::set(availability_cores); + } + /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices /// out of bounds will return `None`, as will indices of unassigned cores. pub(crate) fn core_para(core_index: CoreIndex) -> Option { From 1c6d4abd859364e2a45825972e4e979bb38dbeae Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:21:22 -0400 Subject: [PATCH 17/38] implement availability predicate --- runtime/parachains/src/configuration.rs | 2 +- runtime/parachains/src/scheduler.rs | 76 ++++++++++++++++++++----- 2 files changed, 62 insertions(+), 16 deletions(-) diff --git a/runtime/parachains/src/configuration.rs b/runtime/parachains/src/configuration.rs index 657fb882bc30..8849abe25132 100644 --- a/runtime/parachains/src/configuration.rs +++ b/runtime/parachains/src/configuration.rs @@ -49,7 +49,7 @@ pub struct HostConfiguration { pub parathread_cores: u32, /// The number of retries that a parathread author has to submit their block. pub parathread_retries: u32, - /// How often parachain groups should be rotated across parachains. + /// How often parachain groups should be rotated across parachains. Must be non-zero. pub parachain_rotation_frequency: BlockNumber, /// The availability period, in blocks, for parachains. This is the amount of blocks /// after inclusion that validators have to make the block available and signal its availability to diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 1ad26f213813..a1ed7677565a 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -47,7 +47,7 @@ use frame_support::{ }; use codec::{Encode, Decode}; use system::ensure_root; -use sp_runtime::traits::Zero; +use sp_runtime::traits::{Saturating, Zero}; use rand::{SeedableRng, seq::SliceRandom}; use rand_chacha::ChaCha20Rng; @@ -148,9 +148,9 @@ impl CoreAssignment { fn to_core_occupied(&self) -> CoreOccupied { match self.kind { AssignmentKind::Parachain => CoreOccupied::Parachain, - AssignmentKind::Parathread(collator, retries) => CoreOccupied::Parathread( + AssignmentKind::Parathread(ref collator, retries) => CoreOccupied::Parathread( ParathreadEntry { - claim: ParathreadClaim(self.para_id, collator), + claim: ParathreadClaim(self.para_id, collator.clone()), retries, } ), @@ -503,20 +503,19 @@ impl Module { let mut occupied_iter = now_occupied.iter().cloned().peekable(); scheduled.retain(|assignment| { - occupied_iter.peek().map_or(true, |occupied_idx| { - if occupied_idx == assignment.core { - // remove this entry - it's now occupied. and begin inspecting the next extry - // of the occupied iterator. - let _ = occupied_iter.next(); + let retain = occupied_iter.peek().map_or(true, |occupied_idx| { + occupied_idx != &assignment.core + }); - availability_cores[occupied_idx.0 as usize] = Some(assignment.to_core_occupied()); + if !retain { + // remove this entry - it's now occupied. and begin inspecting the next extry + // of the occupied iterator. + let _ = occupied_iter.next(); - false - } else { - // retain all members of `scheduled` which do not match the next entry of occupied. - true - } - }) + availability_cores[assignment.core.0 as usize] = Some(assignment.to_core_occupied()); + } + + retain }) }); @@ -573,4 +572,51 @@ impl Module { let group_idx = (core.0 as usize + rotations_since_session_start as usize) % validator_groups.len(); Some(GroupIndex(group_idx as u32)) } + + /// Returns an optional predicate that should be used for timing out occupied cores. + /// + /// If `None`, no timing-out should be done. The predicate accepts the index of the core, and the + /// block number since the last validator group rotation, and the respective parachain and parathread + /// timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` + /// of the last rotation would this return `Some`. + pub(crate) fn availability_timeout_predicate() -> Option bool> { + let now = >::block_number(); + let config = >::config(); + + let session_start = >::get(); + let blocks_since_session_start = now.saturating_sub(session_start); + let blocks_since_last_rotation = blocks_since_session_start % config.parachain_rotation_frequency; + + let absolute_cutoff = sp_std::cmp::max( + config.chain_availability_period, + config.thread_availability_period, + ); + + let availability_cores = AvailabilityCores::get(); + + if blocks_since_last_rotation >= absolute_cutoff { + None + } else { + Some(move |core_index: CoreIndex, pending_since| { + match availability_cores.get(core_index.0 as usize) { + None => true, // out-of-bounds, doesn't really matter what is returned. + Some(None) => true, // core not occupied, still doesn't really matter. + Some(Some(CoreOccupied::Parachain)) => { + if blocks_since_last_rotation >= config.chain_availability_period { + false // no pruning except recently after rotation. + } else { + now.saturating_sub(pending_since) >= config.chain_availability_period + } + } + Some(Some(CoreOccupied::Parathread(_))) => { + if blocks_since_last_rotation >= config.thread_availability_period { + false // no pruning except recently after rotation. + } else { + now.saturating_sub(pending_since) >= config.thread_availability_period + } + } + } + }) + } + } } From 83ba6785f6b95d02036c9749c2e789a6581f48d5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:22:20 -0400 Subject: [PATCH 18/38] fix some warnings --- runtime/parachains/src/scheduler.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a1ed7677565a..2194a84cd855 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -36,17 +36,15 @@ //! over time. use sp_std::prelude::*; -use sp_std::convert::{TryFrom, TryInto}; +use sp_std::convert::TryInto; use primitives::{ - parachain::{ValidatorId, Id as ParaId, CollatorId, ValidatorIndex}, + parachain::{Id as ParaId, CollatorId, ValidatorIndex}, }; use frame_support::{ decl_storage, decl_module, decl_error, - dispatch::DispatchResult, - weights::{DispatchClass, Weight}, + weights::Weight, }; use codec::{Encode, Decode}; -use system::ensure_root; use sp_runtime::traits::{Saturating, Zero}; use rand::{SeedableRng, seq::SliceRandom}; @@ -263,7 +261,7 @@ impl Module { if n_cores == 0 || validators.is_empty() { ValidatorGroups::set(Vec::new()); } else { - let mut rng: ChaCha20Rng = SeedableRng::from_seed(notification.random_seed); + let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed); let mut shuffled_indices: Vec<_> = (0..validators.len()) .enumerate() From d9df37809415b19a1bbf1976e3b2020c94fa89d6 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:25:09 -0400 Subject: [PATCH 19/38] integrate scheduler into initializer --- runtime/parachains/src/initializer.rs | 11 ++++++++--- runtime/parachains/src/scheduler.rs | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/runtime/parachains/src/initializer.rs b/runtime/parachains/src/initializer.rs index b467a3bbba84..2b86d19654cd 100644 --- a/runtime/parachains/src/initializer.rs +++ b/runtime/parachains/src/initializer.rs @@ -27,7 +27,7 @@ use primitives::{ use frame_support::{ decl_storage, decl_module, decl_error, traits::Randomness, }; -use crate::{configuration::{self, HostConfiguration}, paras}; +use crate::{configuration::{self, HostConfiguration}, paras, scheduler}; /// Information about a session change that has just occurred. #[derive(Default, Clone)] @@ -44,7 +44,7 @@ pub struct SessionChangeNotification { pub random_seed: [u8; 32], } -pub trait Trait: system::Trait + configuration::Trait + paras::Trait { +pub trait Trait: system::Trait + configuration::Trait + paras::Trait + scheduler::Trait { /// A randomness beacon. type Randomness: Randomness; } @@ -80,7 +80,8 @@ decl_module! { // - Inclusion // - Validity let total_weight = configuration::Module::::initializer_initialize(now) + - paras::Module::::initializer_initialize(now); + paras::Module::::initializer_initialize(now) + + scheduler::Module::::initializer_initialize(now); HasInitialized::set(Some(())); @@ -88,6 +89,9 @@ decl_module! { } fn on_finalize() { + // reverse initialization order. + + scheduler::Module::::initializer_finalize(); paras::Module::::initializer_finalize(); configuration::Module::::initializer_finalize(); HasInitialized::take(); @@ -133,6 +137,7 @@ impl Module { }; paras::Module::::initializer_on_new_session(¬ification); + scheduler::Module::::initializer_on_new_session(¬ification); } } diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 2194a84cd855..a2570888d809 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -261,7 +261,7 @@ impl Module { if n_cores == 0 || validators.is_empty() { ValidatorGroups::set(Vec::new()); } else { - let mut rng: ChaCha20Rng = SeedableRng::from_seed(random_seed); + let mut rng: ChaCha20Rng = SeedableRng::from_seed(*random_seed); let mut shuffled_indices: Vec<_> = (0..validators.len()) .enumerate() From 3e7ac7ba89b85b1f25dcda5e2f823caacd63eae4 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:27:49 -0400 Subject: [PATCH 20/38] integrate scheduler into mock module --- runtime/parachains/src/mock.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 949502c18374..552b45fa2350 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -96,6 +96,8 @@ impl crate::configuration::Trait for Test { } impl crate::paras::Trait for Test { } +impl crate::scheduler::Trait for Test { } + pub type System = system::Module; /// Mocked initializer. From 1bee2559db4911550e500f961c752d3c5e1c858d Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:31:05 -0400 Subject: [PATCH 21/38] avoid conflict with Substrate's scheduler storage --- runtime/parachains/src/scheduler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a2570888d809..94aee2441242 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -167,7 +167,7 @@ pub enum FreedReason { pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } decl_storage! { - trait Store for Module as Scheduler { + trait Store for Module as ParaScheduler { /// All the validator groups. One for each core. ValidatorGroups: Vec>; /// A queue of upcoming claims and which core they should be mapped onto. From 9096c14d1024d6d5bb3ed9520fe1180401a3378c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:49:34 -0400 Subject: [PATCH 22/38] add parathreads index to paras module --- roadmap/implementors-guide/guide.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index abfdc160a85e..4e1d6fa81513 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -459,6 +459,8 @@ Storage layout: ```rust /// All parachains. Ordered ascending by ParaId. Parathreads are not included. Parachains: Vec, +/// All parathreads. +Parathreads: map ParaId => Option<()>, /// The head-data of every registered para. Heads: map ParaId => Option; /// The validation code of every live para. @@ -496,6 +498,7 @@ OutgoingParas: Vec; 1. Clean up outgoing paras. This means removing the entries under `Heads`, `ValidationCode`, `FutureCodeUpgrades`, and `FutureCode`. An according entry should be added to `PastCode`, `PastCodeMeta`, and `PastCodePruning` using the outgoing `ParaId` and removed `ValidationCode` value. This is because any outdated validation code must remain available on-chain for a determined amount of blocks, and validation code outdated by de-registering the para is still subject to that invariant. 1. Apply all incoming paras by initializing the `Heads` and `ValidationCode` using the genesis parameters. 1. Amend the `Parachains` list to reflect changes in registered parachains. +1. Amend the `Parathreads` map to reflect changes in registered parathreads. #### Initialization @@ -508,6 +511,7 @@ OutgoingParas: Vec; * `schedule_code_upgrade(ParaId, ValidationCode, expected_at: BlockNumber)`: Schedule a future code upgrade of the given parachain, to be applied after inclusion of a block of the same parachain executed in the context of a relay-chain block with number >= `expected_at`. * `note_new_head(ParaId, HeadData, BlockNumber)`: note that a para has progressed to a new head, where the new head was executed in the context of a relay-chain block with given number. This will apply pending code upgrades based on the block number provided. * `validation_code_at(ParaId, at: BlockNumber, assume_intermediate: Option)`: Fetches the validation code to be used when validating a block in the context of the given relay-chain height. A second block number parameter may be used to tell the lookup to proceed as if an intermediate parablock has been included at the given relay-chain height. This may return past, current, or (with certain choices of `assume_intermediate`) future code. `assume_intermediate`, if provided, must be before `at`. If the validation code has been pruned, this will return `None`. +* `is_parathread(ParaId) -> bool`: Returns true if the para ID references any live parathread. #### Finalization @@ -674,6 +678,7 @@ Actions: - First, we obtain "shuffled validators" `SV` by shuffling the validators using the `SessionChangeNotification`'s random seed. - The groups are selected by partitioning `SV`. The first V % N groups will have (V / N) + 1 members, while the remaining groups will have (V / N) members each. 1. Prune the parathread queue to remove all retries beyond `configuration.parathread_retries`. + - Also prune all parathread claims corresponding to de-registered parathreads. - all pruned claims should have their entry removed from the parathread index. - assign all non-pruned claims to new cores if the number of parathread cores has changed between the `new_config` and `old_config` of the `SessionChangeNotification`. - Assign claims in equal balance across all cores if rebalancing, and set the `next_core` of the `ParathreadQueue` by incrementing the relative index of the last assigned core and taking it modulo the number of parathread cores. From 242b1298c7076391cffaadbae7b80d11de793233 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:55:51 -0400 Subject: [PATCH 23/38] implement parathreads map in paras module --- runtime/parachains/src/paras.rs | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index a33d3c8f54a0..7e40d41904f7 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -27,7 +27,7 @@ use sp_std::prelude::*; use sp_std::marker::PhantomData; use sp_runtime::traits::One; use primitives::{ - parachain::{ValidatorId, Id as ParaId, ValidationCode, HeadData}, + parachain::{Id as ParaId, ValidationCode, HeadData}, }; use frame_support::{ decl_storage, decl_module, decl_error, @@ -171,6 +171,8 @@ decl_storage! { trait Store for Module as Paras { /// All parachains. Ordered ascending by ParaId. Parathreads are not included. Parachains get(fn parachains): Vec; + /// All parathreads. + Parathreads: map hasher(twox_64_concat) ParaId => Option<()>; /// The head-data of every registered para. Heads get(fn parachain_head): map hasher(twox_64_concat) ParaId => Option; /// The validation code of every live para. @@ -268,6 +270,8 @@ impl Module { for outgoing_para in outgoing { if let Ok(i) = parachains.binary_search(&outgoing_para) { parachains.remove(i); + } else { + ::Parathreads::remove(&outgoing_para); } ::Heads::remove(&outgoing_para); @@ -296,6 +300,8 @@ impl Module { if let Err(i) = parachains.binary_search(&upcoming_para) { parachains.insert(i, upcoming_para); } + } else { + ::Parathreads::insert(&upcoming_para, ()); } ::Heads::insert(&upcoming_para, genesis_data.genesis_head); @@ -1040,18 +1046,24 @@ mod tests { ); assert_eq!(::UpcomingParas::get(), vec![c, b, a]); + assert!(::Parathreads::get(&a).is_none()); + // run to block without session change. run_to_block(2, None); assert_eq!(Paras::parachains(), Vec::new()); assert_eq!(::UpcomingParas::get(), vec![c, b, a]); + assert!(::Parathreads::get(&a).is_none()); + run_to_block(3, Some(vec![3])); assert_eq!(Paras::parachains(), vec![c, b]); assert_eq!(::UpcomingParas::get(), Vec::new()); + assert!(::Parathreads::get(&a).is_some()); + assert_eq!(Paras::current_code(&a), Some(vec![2].into())); assert_eq!(Paras::current_code(&b), Some(vec![1].into())); assert_eq!(Paras::current_code(&c), Some(vec![3].into())); From 94cb16934e1f47c35e1f1aa9b1d5562929c02941 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 13:57:54 -0400 Subject: [PATCH 24/38] add is_parathread to paras --- runtime/parachains/src/paras.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index 7e40d41904f7..cd4bd3a21354 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -521,6 +521,11 @@ impl Module { } } } + + /// Whether a para ID corresponds to any live parathread. + pub(crate) fn is_parathread(id: &ParaId) -> bool { + Parathreads::get(&id).is_some() + } } #[cfg(test)] From 55ad79d569fdbadeb7be43ea1b33544b57826411 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 14:15:43 -0400 Subject: [PATCH 25/38] test adding parathread claim --- runtime/parachains/src/mock.rs | 3 + runtime/parachains/src/paras.rs | 6 +- runtime/parachains/src/scheduler.rs | 142 +++++++++++++++++++++++++++- 3 files changed, 146 insertions(+), 5 deletions(-) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 552b45fa2350..79a6b5bcc442 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -109,6 +109,9 @@ pub type Configuration = crate::configuration::Module; /// Mocked paras. pub type Paras = crate::paras::Module; +/// Mocked scheduler. +pub type Scheduler = crate::scheduler::Module; + /// Create a new set of test externalities. pub fn new_test_ext(state: GenesisConfig) -> TestExternalities { let mut t = state.system.build_storage::().unwrap(); diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index cd4bd3a21354..bd9526dd76f1 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -159,11 +159,11 @@ impl ParaPastCodeMeta { #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] pub struct ParaGenesisArgs { /// The initial head data to use. - genesis_head: HeadData, + pub genesis_head: HeadData, /// The initial validation code to use. - validation_code: ValidationCode, + pub validation_code: ValidationCode, /// True if parachain, false if parathread. - parachain: bool, + pub parachain: bool, } diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 94aee2441242..ea3d0695e6be 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -58,14 +58,17 @@ pub struct CoreIndex(u32); /// The unique (during session) index of a validator group. #[derive(Encode, Decode, Default, Clone, Copy)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct GroupIndex(u32); /// A claim on authoring the next block for a given parathread. #[derive(Clone, Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct ParathreadClaim(pub ParaId, pub CollatorId); /// An entry tracking a claim to ensure it does not pass the maximum number of retries. #[derive(Clone, Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct ParathreadEntry { claim: ParathreadClaim, retries: u32, @@ -73,6 +76,7 @@ pub struct ParathreadEntry { /// A queued parathread entry, pre-assigned to a core. #[derive(Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct QueuedParathread { claim: ParathreadEntry, core_offset: u32, @@ -80,6 +84,7 @@ pub struct QueuedParathread { /// The queue of all parathread claims. #[derive(Encode, Decode, Default)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct ParathreadClaimQueue { queue: Vec, // this value is between 0 and config.parathread_cores @@ -298,9 +303,11 @@ impl Module { return; } - // prune out all entries beyond retry. + // prune out all entries beyond retry or that no longer correspond to live parathread. thread_queue.queue.retain(|queued| { - let will_keep = queued.claim.retries <= config.parathread_retries; + let will_keep = queued.claim.retries <= config.parathread_retries + && >::is_parathread(&queued.claim.claim.0); + if !will_keep { let claim_para = queued.claim.claim.0; @@ -329,7 +336,11 @@ impl Module { /// Add a parathread claim to the queue. If there is a competing claim in the queue or currently /// assigned to a core, this call will fail. This call will also fail if the queue is full. + /// + /// Fails if the claim does not correspond to any live parathread. pub fn add_parathread_claim(claim: ParathreadClaim) { + if !>::is_parathread(&claim.0) { return } + let config = >::config(); let queue_max_size = config.parathread_cores * config.scheduling_lookahead; @@ -618,3 +629,130 @@ impl Module { } } } + +#[cfg(test)] +mod tests { + use super::*; + + use primitives::BlockNumber; + use frame_support::traits::{OnFinalize, OnInitialize}; + use keyring::Sr25519Keyring; + + use crate::mock::{new_test_ext, Paras, System, Scheduler, GenesisConfig as MockGenesisConfig}; + use crate::configuration::HostConfiguration; + use crate::paras::ParaGenesisArgs; + + fn run_to_block(to: BlockNumber, new_session: Option>) { + while System::block_number() < to { + let b = System::block_number(); + + Scheduler::initializer_finalize(); + Paras::initializer_finalize(); + + System::on_finalize(b); + + System::on_initialize(b + 1); + System::set_block_number(b + 1); + + if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) { + Paras::initializer_on_new_session(&Default::default()); + Scheduler::initializer_on_new_session(&Default::default()); + } + + Paras::initializer_initialize(b + 1); + Scheduler::initializer_initialize(b + 1); + } + } + + fn default_config() -> HostConfiguration { + HostConfiguration { + parathread_cores: 3, + parachain_rotation_frequency: 10, + chain_availability_period: 3, + thread_availability_period: 5, + scheduling_lookahead: 2, + ..Default::default() + } + } + + #[test] + fn add_parathread_claim_works() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_id = ParaId::from(10); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| { + Paras::schedule_para_initialize(thread_id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + assert!(!Paras::is_parathread(&thread_id)); + + run_to_block(10, Some(vec![10])); + + assert!(Paras::is_parathread(&thread_id)); + + { + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + + // due to the index, completing claims are not allowed. + { + let collator2 = CollatorId::from(Sr25519Keyring::Bob.public()); + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator2.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + + // claims on non-live parathreads have no effect. + { + let thread_id2 = ParaId::from(11); + Scheduler::add_parathread_claim(ParathreadClaim(thread_id2, collator.clone())); + let queue = ParathreadQueue::get(); + assert_eq!(queue.next_core_offset, 1); + assert_eq!(queue.queue.len(), 1); + assert_eq!(queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_id, collator.clone()), + retries: 0, + }, + core_offset: 0, + }); + } + }) + } + + // TODO [now]: schedule schedules all previously unassigned cores. + // TODO [now]: occupied successfully marks all cores as occupied. + // TODO [now]: availability predicate functions correctly. + // TODO [now]: session change shuffles validators correctly. + // TODO [now]: session change prunes queue members with too many retries + // TODO [now]: session change reassigns claims to cores. +} From 2703687f192504012ee36c3384d3028b952953ac Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 14:17:56 -0400 Subject: [PATCH 26/38] test that you cannot add claims when no parathread cores exist --- runtime/parachains/src/scheduler.rs | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index ea3d0695e6be..454acc153962 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -749,6 +749,42 @@ mod tests { }) } + #[test] + fn cannot_add_claim_when_no_parathread_cores() { + let config = { + let mut config = default_config(); + config.parathread_cores = 0; + config + }; + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config, + ..Default::default() + }, + ..Default::default() + }; + + let thread_id = ParaId::from(10); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| { + Paras::schedule_para_initialize(thread_id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + assert!(!Paras::is_parathread(&thread_id)); + + run_to_block(10, Some(vec![10])); + + assert!(Paras::is_parathread(&thread_id)); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); + assert_eq!(ParathreadQueue::get(), Default::default()); + }); + } + // TODO [now]: schedule schedules all previously unassigned cores. // TODO [now]: occupied successfully marks all cores as occupied. // TODO [now]: availability predicate functions correctly. From f7073710059cc1e217d7833ca1067134b799f88a Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 15:01:58 -0400 Subject: [PATCH 27/38] check session change parathread queue pruning --- runtime/parachains/src/scheduler.rs | 140 ++++++++++++++++++++++++++-- 1 file changed, 130 insertions(+), 10 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 454acc153962..a8025e04a6ed 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -229,7 +229,7 @@ impl Module { } }) - } + } /// Called by the initializer to note that a new session has started. pub(crate) fn initializer_on_new_session(notification: &SessionChangeNotification) { @@ -308,7 +308,6 @@ impl Module { let will_keep = queued.claim.retries <= config.parathread_retries && >::is_parathread(&queued.claim.claim.0); - if !will_keep { let claim_para = queued.claim.claim.0; @@ -328,7 +327,7 @@ impl Module { } thread_queue.next_core_offset = - ((thread_queue.queue.len() + 1) as u32) % config.parathread_cores; + ((thread_queue.queue.len()) as u32) % config.parathread_cores; } }); ParathreadQueue::set(thread_queue); @@ -407,6 +406,8 @@ impl Module { let mut parathread_queue = ParathreadQueue::get(); let now = >::block_number(); + if ValidatorGroups::get().is_empty() { return } + { let mut prev_scheduled_in_order = scheduled.iter().enumerate().peekable(); @@ -638,11 +639,15 @@ mod tests { use frame_support::traits::{OnFinalize, OnInitialize}; use keyring::Sr25519Keyring; - use crate::mock::{new_test_ext, Paras, System, Scheduler, GenesisConfig as MockGenesisConfig}; + use crate::mock::{new_test_ext, Configuration, Paras, System, Scheduler, GenesisConfig as MockGenesisConfig}; + use crate::initializer::SessionChangeNotification; use crate::configuration::HostConfiguration; use crate::paras::ParaGenesisArgs; - fn run_to_block(to: BlockNumber, new_session: Option>) { + fn run_to_block( + to: BlockNumber, + new_session: impl Fn(BlockNumber) -> Option>, + ) { while System::block_number() < to { let b = System::block_number(); @@ -654,9 +659,9 @@ mod tests { System::on_initialize(b + 1); System::set_block_number(b + 1); - if new_session.as_ref().map_or(false, |v| v.contains(&(b + 1))) { - Paras::initializer_on_new_session(&Default::default()); - Scheduler::initializer_on_new_session(&Default::default()); + if let Some(notification) = new_session(b + 1) { + Paras::initializer_on_new_session(¬ification); + Scheduler::initializer_on_new_session(¬ification); } Paras::initializer_initialize(b + 1); @@ -671,6 +676,7 @@ mod tests { chain_availability_period: 3, thread_availability_period: 5, scheduling_lookahead: 2, + parathread_retries: 1, ..Default::default() } } @@ -697,7 +703,7 @@ mod tests { assert!(!Paras::is_parathread(&thread_id)); - run_to_block(10, Some(vec![10])); + run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); assert!(Paras::is_parathread(&thread_id)); @@ -776,7 +782,7 @@ mod tests { assert!(!Paras::is_parathread(&thread_id)); - run_to_block(10, Some(vec![10])); + run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); assert!(Paras::is_parathread(&thread_id)); @@ -785,6 +791,120 @@ mod tests { }); } + #[test] + fn session_change_prunes_cores_beyond_retries_and_those_from_non_live_parathreads() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + let max_parathread_retries = default_config().parathread_retries; + let n_cores = default_config().parathread_cores; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + let thread_c = ParaId::from(3); + let thread_d = ParaId::from(4); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(Configuration::config(), default_config()); + + // threads a, b, and c will be live in next session, but not d. + { + Paras::schedule_para_initialize(thread_a, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + Paras::schedule_para_initialize(thread_b, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + + Paras::schedule_para_initialize(thread_c, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: false, + }); + } + + // set up a queue as if n_cores was 4 and with some with many retries. + ParathreadQueue::put({ + let mut queue = ParathreadClaimQueue::default(); + + // Will be pruned: too many retries. + queue.queue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_a, collator.clone()), + retries: max_parathread_retries + 1, + }, 4); + + // Will not be pruned. + queue.queue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_b, collator.clone()), + retries: max_parathread_retries, + }, 4); + + // Will not be pruned. + queue.queue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, + }, 4); + + // Will be pruned: not a live parathread. + queue.queue_entry(ParathreadEntry { + claim: ParathreadClaim(thread_d, collator.clone()), + retries: 0, + }, 4); + + queue + }); + + ParathreadClaimIndex::put(vec![thread_a, thread_b, thread_c, thread_d]); + + run_to_block( + 10, + |b| match b { + 10 => Some(SessionChangeNotification { + new_config: Configuration::config(), + ..Default::default() + }), + _ => None, + } + ); + assert_eq!(Configuration::config(), default_config()); + + let queue = ParathreadQueue::get(); + assert_eq!( + queue.queue, + vec![ + QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_b, collator.clone()), + retries: max_parathread_retries, + }, + core_offset: 0, + }, + QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, + }, + core_offset: 1, + }, + ] + ); + assert_eq!(queue.next_core_offset, 2); + + assert_eq!(ParathreadClaimIndex::get(), vec![thread_b, thread_c]); + }) + } + // TODO [now]: schedule schedules all previously unassigned cores. // TODO [now]: occupied successfully marks all cores as occupied. // TODO [now]: availability predicate functions correctly. From 40d39ebc7e6a6610284a61d4026d9121f0dac248 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sat, 6 Jun 2020 15:32:03 -0400 Subject: [PATCH 28/38] test validator shuffling --- runtime/parachains/src/scheduler.rs | 64 +++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 4 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a8025e04a6ed..a743a296247e 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -635,7 +635,7 @@ impl Module { mod tests { use super::*; - use primitives::BlockNumber; + use primitives::{BlockNumber, parachain::ValidatorId}; use frame_support::traits::{OnFinalize, OnInitialize}; use keyring::Sr25519Keyring; @@ -905,10 +905,66 @@ mod tests { }) } + #[test] + fn session_change_shuffles_validators() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + assert_eq!(default_config().parathread_cores, 3); + new_test_ext(genesis_config).execute_with(|| { + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + // ensure that we have 5 groups by registering 2 parachains. + Paras::schedule_para_initialize(chain_a, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: true, + }); + Paras::schedule_para_initialize(chain_b, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: true, + }); + + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ValidatorId::from(Sr25519Keyring::Ferdie.public()), + ValidatorId::from(Sr25519Keyring::One.public()), + ], + random_seed: [99; 32], + ..Default::default() + }), + _ => None, + }); + + let groups = ValidatorGroups::get(); + assert_eq!(groups.len(), 5); + + // first two groups have the overflow. + for i in 0..2 { + assert_eq!(groups[i].len(), 2); + } + + for i in 2..5 { + assert_eq!(groups[i].len(), 1); + } + }); + } + // TODO [now]: schedule schedules all previously unassigned cores. // TODO [now]: occupied successfully marks all cores as occupied. // TODO [now]: availability predicate functions correctly. - // TODO [now]: session change shuffles validators correctly. - // TODO [now]: session change prunes queue members with too many retries - // TODO [now]: session change reassigns claims to cores. } From 19f6d5303f60e4ea7ac59006d327d59918e2b5c1 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 10:47:05 -0400 Subject: [PATCH 29/38] add allow_unused to scheduler items --- runtime/parachains/src/scheduler.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index a743a296247e..75748473f328 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -141,6 +141,7 @@ pub struct CoreAssignment { impl CoreAssignment { /// Get the ID of a collator who is required to collate this block. + #[allow(unused)] pub(crate) fn required_collator(&self) -> Option<&CollatorId> { match self.kind { AssignmentKind::Parachain => None, @@ -148,6 +149,7 @@ impl CoreAssignment { } } + #[allow(unused)] fn to_core_occupied(&self) -> CoreOccupied { match self.kind { AssignmentKind::Parachain => CoreOccupied::Parachain, @@ -162,6 +164,7 @@ impl CoreAssignment { } /// Reasons a core might be freed +#[allow(unused)] pub enum FreedReason { /// The core's work concluded and the parablock assigned to it is considered available. Concluded, @@ -337,6 +340,7 @@ impl Module { /// assigned to a core, this call will fail. This call will also fail if the queue is full. /// /// Fails if the claim does not correspond to any live parathread. + #[allow(unused)] pub fn add_parathread_claim(claim: ParathreadClaim) { if !>::is_parathread(&claim.0) { return } @@ -503,6 +507,7 @@ impl Module { /// /// Complexity: O(n) in the number of scheduled cores, which is capped at the number of total cores. /// This is efficient in the case that most scheduled cores are occupied. + #[allow(unused)] pub(crate) fn occupied(now_occupied: &[CoreIndex]) { if now_occupied.is_empty() { return } @@ -534,6 +539,7 @@ impl Module { /// Get the para (chain or thread) ID assigned to a particular core or index, if any. Core indices /// out of bounds will return `None`, as will indices of unassigned cores. + #[allow(unused)] pub(crate) fn core_para(core_index: CoreIndex) -> Option { let cores = AvailabilityCores::get(); match cores.get(core_index.0 as usize).and_then(|c| c.as_ref()) { @@ -547,6 +553,7 @@ impl Module { } /// Get the validators in the given group, if the group index is valid for this session. + #[allow(unused)] pub(crate) fn group_validators(group_index: GroupIndex) -> Option> { ValidatorGroups::get().get(group_index.0 as usize).map(|g| g.clone()) } @@ -589,6 +596,7 @@ impl Module { /// block number since the last validator group rotation, and the respective parachain and parathread /// timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` /// of the last rotation would this return `Some`. + #[allow(unused)] pub(crate) fn availability_timeout_predicate() -> Option bool> { let now = >::block_number(); let config = >::config(); @@ -801,7 +809,6 @@ mod tests { ..Default::default() }; let max_parathread_retries = default_config().parathread_retries; - let n_cores = default_config().parathread_cores; let thread_a = ParaId::from(1); let thread_b = ParaId::from(2); From f5ef2fa448a6128b819e0cb1b7f0aa794d1578ae Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 11:16:44 -0400 Subject: [PATCH 30/38] add test for scheduling --- runtime/parachains/src/scheduler.rs | 151 +++++++++++++++++++++++++++- 1 file changed, 148 insertions(+), 3 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 75748473f328..84c715e24c67 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -54,6 +54,7 @@ use crate::{configuration, paras, initializer::SessionChangeNotification}; /// The unique (during session) index of a core. #[derive(Encode, Decode, Default, PartialOrd, Ord, Eq, PartialEq, Clone, Copy)] +#[cfg_attr(test, derive(Debug))] pub struct CoreIndex(u32); /// The unique (during session) index of a validator group. @@ -114,6 +115,7 @@ impl ParathreadClaimQueue { /// What is occupying a specific availability core. #[derive(Clone, Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub(crate) enum CoreOccupied { Parathread(ParathreadEntry), Parachain, @@ -121,6 +123,7 @@ pub(crate) enum CoreOccupied { /// The assignment type. #[derive(Clone, Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub enum AssignmentKind { Parachain, Parathread(CollatorId, u32), @@ -128,6 +131,7 @@ pub enum AssignmentKind { /// How a free core is scheduled to be assigned. #[derive(Encode, Decode)] +#[cfg_attr(test, derive(PartialEq, Debug))] pub struct CoreAssignment { /// The core that is assigned. pub core: CoreIndex, @@ -971,7 +975,148 @@ mod tests { }); } - // TODO [now]: schedule schedules all previously unassigned cores. - // TODO [now]: occupied successfully marks all cores as occupied. - // TODO [now]: availability predicate functions correctly. + #[test] + fn schedule_schedules() { + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); + + let thread_a = ParaId::from(3); + let thread_b = ParaId::from(4); + let thread_c = ParaId::from(5); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + let validators = + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + // register 2 parachains + schedule_blank_para(chain_a, true); + schedule_blank_para(chain_b, true); + + // and 3 parathreads + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + schedule_blank_para(thread_c, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 2); + + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(1), + para_id: chain_b, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(1), + }); + } + + // add a couple of parathread claims. + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_c, collator.clone())); + + run_to_block(2, |_| None); + + { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 4); + + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(1), + para_id: chain_b, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(1), + }); + + assert_eq!(scheduled[2], CoreAssignment{ + core: CoreIndex(2), + para_id: thread_a, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(2), + }); + + assert_eq!(scheduled[3], CoreAssignment{ + core: CoreIndex(3), + para_id: thread_c, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(3), + }); + } + }); + } + + #[test] + fn schedule_schedules_including_just_freed() { + // TODO [now] + } + + #[test] + fn schedule_rotates_groups() { + // TODO [now] + } + + #[test] + fn concluded_parathreads_are_removed_from_index() { + // TODO [now] + } + + #[test] + fn timed_out_parathreads_do_not_have_retries_incremented() { + // TODO [now] + } + + #[test] + fn occupied_marks_cores_occupied() { + // TODO [now] + } + + #[test] + fn availability_predicate_works() { + // TODO [now] + } } From 2d52e0596095e4ca652958c678704fdb1fec91a5 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 11:43:46 -0400 Subject: [PATCH 31/38] add some more tests for scheduling logic --- runtime/parachains/src/scheduler.rs | 172 +++++++++++++++++++++++++--- 1 file changed, 159 insertions(+), 13 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 84c715e24c67..d9a59159b447 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -1000,8 +1000,6 @@ mod tests { parachain: is_chain, }); - let validators = - new_test_ext(genesis_config).execute_with(|| { assert_eq!(default_config().parathread_cores, 3); @@ -1092,26 +1090,174 @@ mod tests { #[test] fn schedule_schedules_including_just_freed() { - // TODO [now] - } + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; - #[test] - fn schedule_rotates_groups() { - // TODO [now] - } + let chain_a = ParaId::from(1); + let chain_b = ParaId::from(2); - #[test] - fn concluded_parathreads_are_removed_from_index() { - // TODO [now] + let thread_a = ParaId::from(3); + let thread_b = ParaId::from(4); + let thread_c = ParaId::from(5); + let thread_d = ParaId::from(6); + let thread_e = ParaId::from(7); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + // register 2 parachains + schedule_blank_para(chain_a, true); + schedule_blank_para(chain_b, true); + + // and 5 parathreads + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + schedule_blank_para(thread_c, false); + schedule_blank_para(thread_d, false); + schedule_blank_para(thread_e, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + // add a couple of parathread claims now that the parathreads are live. + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_c, collator.clone())); + + run_to_block(2, |_| None); + + assert_eq!(Scheduler::scheduled().len(), 4); + + // cores 0, 1, 2, and 3 should be occupied. mark them as such. + Scheduler::occupied(&[CoreIndex(0), CoreIndex(1), CoreIndex(2), CoreIndex(3)]); + + { + let cores = AvailabilityCores::get(); + + assert!(cores[0].is_some()); + assert!(cores[1].is_some()); + assert!(cores[2].is_some()); + assert!(cores[3].is_some()); + assert!(cores[4].is_none()); + + assert!(Scheduler::scheduled().is_empty()); + } + + // add a couple more parathread claims - the claim on `b` will go to the 3rd parathread core (4) + // and the claim on `d` will go back to the 1st parathread core (2). The claim on `e` then + // will go for core `3`. + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_d, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_e, collator.clone())); + + run_to_block(3, |_| None); + + { + let scheduled = Scheduler::scheduled(); + + // cores 0 and 1 are occupied by parachains. cores 2 and 3 are occupied by parathread + // claims. core 4 was free. + assert_eq!(scheduled.len(), 1); + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(4), + para_id: thread_b, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(4), + }); + } + + // now note that cores 0, 2, and 3 were freed. + Scheduler::schedule(vec![ + (CoreIndex(0), FreedReason::Concluded), + (CoreIndex(2), FreedReason::Concluded), + (CoreIndex(3), FreedReason::TimedOut), // should go back on queue. + ]); + + { + let scheduled = Scheduler::scheduled(); + + println!("{:?}", scheduled); + + // 1 thing scheduled before, + 3 cores freed. + assert_eq!(scheduled.len(), 4); + assert_eq!(scheduled[0], CoreAssignment { + core: CoreIndex(0), + para_id: chain_a, + kind: AssignmentKind::Parachain, + group_idx: GroupIndex(0), + }); + assert_eq!(scheduled[1], CoreAssignment { + core: CoreIndex(2), + para_id: thread_d, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(2), + }); + assert_eq!(scheduled[2], CoreAssignment { + core: CoreIndex(3), + para_id: thread_e, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(3), + }); + assert_eq!(scheduled[3], CoreAssignment { + core: CoreIndex(4), + para_id: thread_b, + kind: AssignmentKind::Parathread(collator.clone(), 0), + group_idx: GroupIndex(4), + }); + + // the prior claim on thread A concluded, but the claim on thread C was marked as + // timed out. + let index = ParathreadClaimIndex::get(); + let parathread_queue = ParathreadQueue::get(); + + // thread A claim should have been wiped, but thread C claim should remain. + assert_eq!(index, vec![thread_b, thread_c, thread_d, thread_e]); + + // Although C was descheduled, the core `4` was occupied so C goes back on the queue. + assert_eq!(parathread_queue.queue.len(), 1); + assert_eq!(parathread_queue.queue[0], QueuedParathread { + claim: ParathreadEntry { + claim: ParathreadClaim(thread_c, collator.clone()), + retries: 0, // retries not incremented by timeout - validators' fault. + }, + core_offset: 2, // reassigned to next core. thread_e claim was on offset 1. + }); + } + }); } #[test] - fn timed_out_parathreads_do_not_have_retries_incremented() { + fn schedule_rotates_groups() { // TODO [now] } #[test] - fn occupied_marks_cores_occupied() { + fn unoccupied_core_means_retries_incremented() { // TODO [now] } From 56e86699224229c8478b56e4e41b8011d913e959 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 17:25:33 -0400 Subject: [PATCH 32/38] test core rotation --- runtime/parachains/src/scheduler.rs | 89 ++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 3 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index d9a59159b447..9059e267e9c7 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -1201,8 +1201,6 @@ mod tests { { let scheduled = Scheduler::scheduled(); - println!("{:?}", scheduled); - // 1 thing scheduled before, + 3 cores freed. assert_eq!(scheduled.len(), 4); assert_eq!(scheduled[0], CoreAssignment { @@ -1253,7 +1251,92 @@ mod tests { #[test] fn schedule_rotates_groups() { - // TODO [now] + let config = { + let mut config = default_config(); + + // make sure parathread requests don't retry-out + config.parathread_retries = config.parachain_rotation_frequency * 3; + config.parathread_cores = 2; + config + }; + + let rotation_frequency = config.parachain_rotation_frequency; + let parathread_cores = config.parathread_cores; + + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: config.clone(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: config.clone(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + let session_start_block = ::SessionStartBlock::get(); + assert_eq!(session_start_block, 1); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + + run_to_block(2, |_| None); + + let assert_groups_rotated = |rotations: u32| { + let scheduled = Scheduler::scheduled(); + assert_eq!(scheduled.len(), 2); + assert_eq!(scheduled[0].group_idx, GroupIndex((0u32 + rotations) % parathread_cores)); + assert_eq!(scheduled[1].group_idx, GroupIndex((1u32 + rotations) % parathread_cores)); + }; + + assert_groups_rotated(0); + + // one block before first rotation. + run_to_block(rotation_frequency, |_| None); + + let rotations_since_session_start = (rotation_frequency - session_start_block) / rotation_frequency; + assert_eq!(rotations_since_session_start, 0); + assert_groups_rotated(0); + + // first rotation. + run_to_block(rotation_frequency + 1, |_| None); + assert_groups_rotated(1); + + // one block before second rotation. + run_to_block(rotation_frequency * 2, |_| None); + assert_groups_rotated(1); + + // second rotation. + run_to_block(rotation_frequency * 2 + 1, |_| None); + assert_groups_rotated(2); + }); } #[test] From a909ad8b257b75e8b90303db20fb6af0ec5cfedd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 17:35:54 -0400 Subject: [PATCH 33/38] check parathread claim pruning after retries --- runtime/parachains/src/scheduler.rs | 55 +++++++++++++++++++++++++++-- 1 file changed, 53 insertions(+), 2 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 9059e267e9c7..c57b4c8f8f71 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -1340,8 +1340,59 @@ mod tests { } #[test] - fn unoccupied_core_means_retries_incremented() { - // TODO [now] + fn parathread_claims_are_pruned_after_retries() { + let max_retries = default_config().parathread_retries; + + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let thread_a = ParaId::from(1); + let thread_b = ParaId::from(2); + + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + assert_eq!(default_config().parathread_cores, 3); + + schedule_blank_para(thread_a, false); + schedule_blank_para(thread_b, false); + + // start a new session to activate, 5 validators for 5 cores. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + Scheduler::add_parathread_claim(ParathreadClaim(thread_a, collator.clone())); + Scheduler::add_parathread_claim(ParathreadClaim(thread_b, collator.clone())); + + run_to_block(2, |_| None); + assert_eq!(Scheduler::scheduled().len(), 2); + + run_to_block(2 + max_retries, |_| None); + assert_eq!(Scheduler::scheduled().len(), 2); + + run_to_block(2 + max_retries + 1, |_| None); + assert_eq!(Scheduler::scheduled().len(), 0); + }); } #[test] From 89ad0f8b1e34eb1bf5814aa8ed23538e688e461d Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Sun, 7 Jun 2020 18:01:12 -0400 Subject: [PATCH 34/38] add bound notes --- runtime/parachains/src/scheduler.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index c57b4c8f8f71..5f7ca7314f85 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -181,20 +181,33 @@ pub trait Trait: system::Trait + configuration::Trait + paras::Trait { } decl_storage! { trait Store for Module as ParaScheduler { /// All the validator groups. One for each core. + /// + /// Bound: The number of cores is the sum of the numbers of parachains and parathread multiplexers. + /// Reasonably, 100-1000. The dominant factor is the number of validators: safe upper bound at 10k. ValidatorGroups: Vec>; + /// A queue of upcoming claims and which core they should be mapped onto. + /// + /// The number of queued claims is bounded at the `scheduling_lookahead` + /// multiplied by the number of parathread multiplexer cores. Reasonably, 10 * 50 = 500. ParathreadQueue: ParathreadClaimQueue; /// One entry for each availability core. Entries are `None` if the core is not currently occupied. Can be /// temporarily `Some` if scheduled but not occupied. /// The i'th parachain belongs to the i'th core, with the remaining cores all being /// parathread-multiplexers. + /// + /// Bounded by the number of cores: one for each parachain and parathread multiplexer. AvailabilityCores: Vec>; /// An index used to ensure that only one claim on a parathread exists in the queue or is /// currently being handled by an occupied core. + /// + /// Bounded by the number of parathread cores and scheduling lookahead. Reasonably, 10 * 50 = 500. ParathreadClaimIndex: Vec; /// The block number where the session start occurred. Used to track how many group rotations have occurred. SessionStartBlock: T::BlockNumber; /// Currently scheduled cores - free but up to be occupied. Ephemeral storage item that's wiped on finalization. + /// + /// Bounded by the number of cores: one for each parachain and parathread multiplexer. Scheduled get(fn scheduled): Vec; // sorted ascending by CoreIndex. } } From f85ff4eb07e31ca0fcbacb3c899dc53fc925217c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 10 Jun 2020 14:13:05 -0400 Subject: [PATCH 35/38] Apply suggestions from code review Co-authored-by: Peter Goodspeed-Niklaus Co-authored-by: Bernhard Schuster --- roadmap/implementors-guide/guide.md | 10 +++++----- runtime/parachains/src/paras.rs | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/roadmap/implementors-guide/guide.md b/roadmap/implementors-guide/guide.md index 4e1d6fa81513..eb926ccbb762 100644 --- a/roadmap/implementors-guide/guide.md +++ b/roadmap/implementors-guide/guide.md @@ -498,7 +498,7 @@ OutgoingParas: Vec; 1. Clean up outgoing paras. This means removing the entries under `Heads`, `ValidationCode`, `FutureCodeUpgrades`, and `FutureCode`. An according entry should be added to `PastCode`, `PastCodeMeta`, and `PastCodePruning` using the outgoing `ParaId` and removed `ValidationCode` value. This is because any outdated validation code must remain available on-chain for a determined amount of blocks, and validation code outdated by de-registering the para is still subject to that invariant. 1. Apply all incoming paras by initializing the `Heads` and `ValidationCode` using the genesis parameters. 1. Amend the `Parachains` list to reflect changes in registered parachains. -1. Amend the `Parathreads` map to reflect changes in registered parathreads. +1. Amend the `Parathreads` set to reflect changes in registered parathreads. #### Initialization @@ -609,13 +609,13 @@ struct ParathreadEntry { // A queued parathread entry, pre-assigned to a core. struct QueuedParathread { claim: ParathreadEntry, - // this value is between 0 and config.parathread_cores. + /// offset within the set of para-threads ranged `0..config.parathread_cores`. core_offset: u32, } struct ParathreadQueue { queue: Vec, - // this value is between 0 and config.parathread_cores + /// offset within the set of para-threads ranged `0..config.parathread_cores`. next_core_offset: u32, } @@ -703,8 +703,8 @@ Actions: * `schedule(Vec<(CoreIndex, FreedReason)>)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned. - All freed parachain cores should be assigned to their respective parachain - - All freed parathread cores should have the claim removed from the claim index, if the reason for freeing was `FreedReason::Concluded` - - All freed parathread cores should have the claim added to the parathread queue again without retries incremented, if the reason for freeing was `FreedReason::TimedOut`. + - All freed parathread cores whose reason for freeing was `FreedReason::Concluded` should have the claim removed from the claim index. + - All freed parathread cores whose reason for freeing was `FreedReason::TimedOut` should have the claim added to the parathread queue again without retries incremented. - All freed parathread cores should take the next parathread entry from the queue. - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. * `scheduled() -> Vec`: Get currently scheduled core assignments. diff --git a/runtime/parachains/src/paras.rs b/runtime/parachains/src/paras.rs index bd9526dd76f1..9d8c74ca0932 100644 --- a/runtime/parachains/src/paras.rs +++ b/runtime/parachains/src/paras.rs @@ -523,7 +523,7 @@ impl Module { } /// Whether a para ID corresponds to any live parathread. - pub(crate) fn is_parathread(id: &ParaId) -> bool { + pub(crate) fn is_parathread(id: ParaId) -> bool { Parathreads::get(&id).is_some() } } From 50965e8e256b45812a72fc34fbd3ed08927d786c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 10 Jun 2020 14:22:53 -0400 Subject: [PATCH 36/38] more suggestions from review --- runtime/parachains/src/scheduler.rs | 41 +++++++++++++++-------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 5f7ca7314f85..8e93300df4eb 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -93,10 +93,10 @@ pub struct ParathreadClaimQueue { } impl ParathreadClaimQueue { - // Queue a parathread entry to be processed. - // - // Provide the entry and the number of parathread cores, which must be greater than 0. - fn queue_entry(&mut self, entry: ParathreadEntry, n_parathread_cores: u32) { + /// Queue a parathread entry to be processed. + /// + /// Provide the entry and the number of parathread cores, which must be greater than 0. + fn enqueue_entry(&mut self, entry: ParathreadEntry, n_parathread_cores: u32) { let core_offset = self.next_core_offset; self.next_core_offset = (self.next_core_offset + 1) % n_parathread_cores; @@ -243,8 +243,9 @@ impl Module { retries: retries + 1, }; - if entry.retries > config.parathread_retries { continue } - queue.queue_entry(entry, config.parathread_cores); + if entry.retries <= config.parathread_retries { + queue.enqueue_entry(entry, config.parathread_cores); + } } } }) @@ -296,9 +297,9 @@ impl Module { shuffled_indices.shuffle(&mut rng); let group_base_size = validators.len() / n_cores as usize; - let larger_groups = validators.len() % n_cores as usize; + let n_larger_groups = validators.len() % n_cores as usize; let groups: Vec> = (0..n_cores).map(|core_id| { - let n_members = if (core_id as usize) < larger_groups { + let n_members = if (core_id as usize) < n_larger_groups { group_base_size + 1 } else { group_base_size @@ -326,7 +327,7 @@ impl Module { // prune out all entries beyond retry or that no longer correspond to live parathread. thread_queue.queue.retain(|queued| { let will_keep = queued.claim.retries <= config.parathread_retries - && >::is_parathread(&queued.claim.claim.0); + && >::is_parathread(queued.claim.claim.0); if !will_keep { let claim_para = queued.claim.claim.0; @@ -359,7 +360,7 @@ impl Module { /// Fails if the claim does not correspond to any live parathread. #[allow(unused)] pub fn add_parathread_claim(claim: ParathreadClaim) { - if !>::is_parathread(&claim.0) { return } + if !>::is_parathread(claim.0) { return } let config = >::config(); let queue_max_size = config.parathread_cores * config.scheduling_lookahead; @@ -382,7 +383,7 @@ impl Module { if competes_with_another { return } let entry = ParathreadEntry { claim, retries: 0 }; - queue.queue_entry(entry, config.parathread_cores); + queue.enqueue_entry(entry, config.parathread_cores); }) } @@ -413,7 +414,7 @@ impl Module { // If a parathread candidate times out, it's not the collator's fault, // so we don't increment retries. ParathreadQueue::mutate(|queue| { - queue.queue_entry(entry, config.parathread_cores); + queue.enqueue_entry(entry, config.parathread_cores); }) } } @@ -726,11 +727,11 @@ mod tests { parachain: false, }); - assert!(!Paras::is_parathread(&thread_id)); + assert!(!Paras::is_parathread(thread_id)); run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); - assert!(Paras::is_parathread(&thread_id)); + assert!(Paras::is_parathread(thread_id)); { Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); @@ -805,11 +806,11 @@ mod tests { parachain: false, }); - assert!(!Paras::is_parathread(&thread_id)); + assert!(!Paras::is_parathread(thread_id)); run_to_block(10, |n| if n == 10 { Some(Default::default()) } else { None }); - assert!(Paras::is_parathread(&thread_id)); + assert!(Paras::is_parathread(thread_id)); Scheduler::add_parathread_claim(ParathreadClaim(thread_id, collator.clone())); assert_eq!(ParathreadQueue::get(), Default::default()); @@ -863,25 +864,25 @@ mod tests { let mut queue = ParathreadClaimQueue::default(); // Will be pruned: too many retries. - queue.queue_entry(ParathreadEntry { + queue.enqueue_entry(ParathreadEntry { claim: ParathreadClaim(thread_a, collator.clone()), retries: max_parathread_retries + 1, }, 4); // Will not be pruned. - queue.queue_entry(ParathreadEntry { + queue.enqueue_entry(ParathreadEntry { claim: ParathreadClaim(thread_b, collator.clone()), retries: max_parathread_retries, }, 4); // Will not be pruned. - queue.queue_entry(ParathreadEntry { + queue.enqueue_entry(ParathreadEntry { claim: ParathreadClaim(thread_c, collator.clone()), retries: 0, }, 4); // Will be pruned: not a live parathread. - queue.queue_entry(ParathreadEntry { + queue.enqueue_entry(ParathreadEntry { claim: ParathreadClaim(thread_d, collator.clone()), retries: 0, }, 4); From 39187134d64cc0b881d82b4ee1fe13e8b04effba Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 10 Jun 2020 20:23:18 -0400 Subject: [PATCH 37/38] test availability predicate, add box to please compiler --- runtime/parachains/src/scheduler.rs | 118 ++++++++++++++++++++++++++-- 1 file changed, 113 insertions(+), 5 deletions(-) diff --git a/runtime/parachains/src/scheduler.rs b/runtime/parachains/src/scheduler.rs index 8e93300df4eb..4f0554407774 100644 --- a/runtime/parachains/src/scheduler.rs +++ b/runtime/parachains/src/scheduler.rs @@ -611,11 +611,15 @@ impl Module { /// Returns an optional predicate that should be used for timing out occupied cores. /// /// If `None`, no timing-out should be done. The predicate accepts the index of the core, and the - /// block number since the last validator group rotation, and the respective parachain and parathread + /// block number since which it has been occupied, and the respective parachain and parathread /// timeouts, i.e. only within `max(config.chain_availability_period, config.thread_availability_period)` /// of the last rotation would this return `Some`. + /// + /// This really should not be a box, but is working around a compiler limitation described here: + /// https://users.rust-lang.org/t/cannot-unify-associated-type-in-impl-fn-with-concrete-type/44129 + /// which prevents us from testing the code if using `impl Trait`. #[allow(unused)] - pub(crate) fn availability_timeout_predicate() -> Option bool> { + pub(crate) fn availability_timeout_predicate() -> Option bool>> { let now = >::block_number(); let config = >::config(); @@ -633,7 +637,7 @@ impl Module { if blocks_since_last_rotation >= absolute_cutoff { None } else { - Some(move |core_index: CoreIndex, pending_since| { + Some(Box::new(move |core_index: CoreIndex, pending_since| { match availability_cores.get(core_index.0 as usize) { None => true, // out-of-bounds, doesn't really matter what is returned. Some(None) => true, // core not occupied, still doesn't really matter. @@ -652,7 +656,7 @@ impl Module { } } } - }) + })) } } } @@ -1411,6 +1415,110 @@ mod tests { #[test] fn availability_predicate_works() { - // TODO [now] + let genesis_config = MockGenesisConfig { + configuration: crate::configuration::GenesisConfig { + config: default_config(), + ..Default::default() + }, + ..Default::default() + }; + + let HostConfiguration { + parachain_rotation_frequency, + chain_availability_period, + thread_availability_period, + .. + } = default_config(); + let collator = CollatorId::from(Sr25519Keyring::Alice.public()); + + assert!(chain_availability_period < thread_availability_period && + thread_availability_period < parachain_rotation_frequency); + + let chain_a = ParaId::from(1); + let thread_a = ParaId::from(2); + + let schedule_blank_para = |id, is_chain| Paras::schedule_para_initialize(id, ParaGenesisArgs { + genesis_head: Vec::new().into(), + validation_code: Vec::new().into(), + parachain: is_chain, + }); + + new_test_ext(genesis_config).execute_with(|| { + schedule_blank_para(chain_a, true); + schedule_blank_para(thread_a, false); + + // start a new session with our chain & thread registered. + run_to_block(1, |number| match number { + 1 => Some(SessionChangeNotification { + new_config: default_config(), + validators: vec![ + ValidatorId::from(Sr25519Keyring::Alice.public()), + ValidatorId::from(Sr25519Keyring::Bob.public()), + ValidatorId::from(Sr25519Keyring::Charlie.public()), + ValidatorId::from(Sr25519Keyring::Dave.public()), + ValidatorId::from(Sr25519Keyring::Eve.public()), + ], + ..Default::default() + }), + _ => None, + }); + + // assign some availability cores. + { + AvailabilityCores::mutate(|cores| { + cores[0] = Some(CoreOccupied::Parachain); + cores[1] = Some(CoreOccupied::Parathread(ParathreadEntry { + claim: ParathreadClaim(thread_a, collator), + retries: 0, + })) + }); + } + + run_to_block(1 + thread_availability_period, |_| None); + assert!(Scheduler::availability_timeout_predicate().is_none()); + + run_to_block(1 + parachain_rotation_frequency, |_| None); + + { + let pred = Scheduler::availability_timeout_predicate() + .expect("predicate exists recently after rotation"); + + let now = System::block_number(); + let would_be_timed_out = now - thread_availability_period; + for i in 0..AvailabilityCores::get().len() { + // returns true for unoccupied cores. + // And can time out both threads and chains at this stage. + assert!(pred(CoreIndex(i as u32), would_be_timed_out)); + } + + assert!(!pred(CoreIndex(0), now)); // assigned: chain + assert!(!pred(CoreIndex(1), now)); // assigned: thread + assert!(pred(CoreIndex(2), now)); + + // check the tighter bound on chains vs threads. + assert!(pred(CoreIndex(0), now - chain_availability_period)); + assert!(!pred(CoreIndex(1), now - chain_availability_period)); + + // check the threshold is exact. + assert!(!pred(CoreIndex(0), now - chain_availability_period + 1)); + assert!(!pred(CoreIndex(1), now - thread_availability_period + 1)); + } + + run_to_block(1 + parachain_rotation_frequency + chain_availability_period, |_| None); + + { + let pred = Scheduler::availability_timeout_predicate() + .expect("predicate exists recently after rotation"); + + let would_be_timed_out = System::block_number() - thread_availability_period; + + assert!(!pred(CoreIndex(0), would_be_timed_out)); // chains can't be timed out now. + assert!(pred(CoreIndex(1), would_be_timed_out)); // but threads can. + } + + run_to_block(1 + parachain_rotation_frequency + thread_availability_period, |_| None); + + assert!(Scheduler::availability_timeout_predicate().is_none()); + }); } } From 63dfb9c70ca138bdb112874df530b6b567135ff7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Thu, 11 Jun 2020 12:55:14 -0400 Subject: [PATCH 38/38] add changes to guide --- .../src/runtime/inclusioninherent.md | 5 ++- .../implementors-guide/src/runtime/paras.md | 4 +++ .../src/runtime/scheduler.md | 33 +++++++++++++------ 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/roadmap/implementors-guide/src/runtime/inclusioninherent.md b/roadmap/implementors-guide/src/runtime/inclusioninherent.md index 0fceb78f7f95..bd5ecc375a93 100644 --- a/roadmap/implementors-guide/src/runtime/inclusioninherent.md +++ b/roadmap/implementors-guide/src/runtime/inclusioninherent.md @@ -17,9 +17,8 @@ Included: Option<()>, ## Entry Points * `inclusion`: This entry-point accepts two parameters: [`Bitfields`](/type-definitions.html#signed-availability-bitfield) and [`BackedCandidates`](/type-definitions.html#backed-candidate). - 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. - 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores. + 1. The `Bitfields` are first forwarded to the `process_bitfields` routine, returning a set of freed cores. Provide a `Scheduler::core_para` as a core-lookup to the `process_bitfields` routine. Annotate each of these freed cores with `FreedReason::Concluded`. + 1. If `Scheduler::availability_timeout_predicate` is `Some`, invoke `Inclusion::collect_pending` using it, and add timed-out cores to the free cores, annotated with `FreedReason::TimedOut`. 1. Invoke `Scheduler::schedule(freed)` - 1. Pass the `BackedCandidates` along with the output of `Scheduler::scheduled` to the `Inclusion::process_candidates` routine, getting a list of all newly-occupied cores. 1. Call `Scheduler::occupied` for all scheduled cores where a backed candidate was submitted. 1. If all of the above succeeds, set `Included` to `Some(())`. diff --git a/roadmap/implementors-guide/src/runtime/paras.md b/roadmap/implementors-guide/src/runtime/paras.md index b523c1fe20c3..d22021d68116 100644 --- a/roadmap/implementors-guide/src/runtime/paras.md +++ b/roadmap/implementors-guide/src/runtime/paras.md @@ -56,6 +56,8 @@ Storage layout: ```rust /// All parachains. Ordered ascending by ParaId. Parathreads are not included. Parachains: Vec, +/// All parathreads. +Parathreads: map ParaId => Option<()>, /// The head-data of every registered para. Heads: map ParaId => Option; /// The validation code of every live para. @@ -94,6 +96,7 @@ OutgoingParas: Vec; 1. Clean up outgoing paras. This means removing the entries under `Heads`, `ValidationCode`, `FutureCodeUpgrades`, and `FutureCode`. An according entry should be added to `PastCode`, `PastCodeMeta`, and `PastCodePruning` using the outgoing `ParaId` and removed `ValidationCode` value. This is because any outdated validation code must remain available on-chain for a determined amount of blocks, and validation code outdated by de-registering the para is still subject to that invariant. 1. Apply all incoming paras by initializing the `Heads` and `ValidationCode` using the genesis parameters. 1. Amend the `Parachains` list to reflect changes in registered parachains. +1. Amend the `Parathreads` set to reflect changes in registered parathreads. ## Initialization @@ -106,6 +109,7 @@ OutgoingParas: Vec; * `schedule_code_upgrade(ParaId, ValidationCode, expected_at: BlockNumber)`: Schedule a future code upgrade of the given parachain, to be applied after inclusion of a block of the same parachain executed in the context of a relay-chain block with number >= `expected_at`. * `note_new_head(ParaId, HeadData, BlockNumber)`: note that a para has progressed to a new head, where the new head was executed in the context of a relay-chain block with given number. This will apply pending code upgrades based on the block number provided. * `validation_code_at(ParaId, at: BlockNumber, assume_intermediate: Option)`: Fetches the validation code to be used when validating a block in the context of the given relay-chain height. A second block number parameter may be used to tell the lookup to proceed as if an intermediate parablock has been included at the given relay-chain height. This may return past, current, or (with certain choices of `assume_intermediate`) future code. `assume_intermediate`, if provided, must be before `at`. If the validation code has been pruned, this will return `None`. +* `is_parathread(ParaId) -> bool`: Returns true if the para ID references any live parathread. ## Finalization diff --git a/roadmap/implementors-guide/src/runtime/scheduler.md b/roadmap/implementors-guide/src/runtime/scheduler.md index e7d80d8d7d0a..b365308743b1 100644 --- a/roadmap/implementors-guide/src/runtime/scheduler.md +++ b/roadmap/implementors-guide/src/runtime/scheduler.md @@ -90,14 +90,15 @@ struct ParathreadEntry { // A queued parathread entry, pre-assigned to a core. struct QueuedParathread { - claim: ParathreadEntry, - core: CoreIndex, + claim: ParathreadEntry, + /// offset within the set of para-threads ranged `0..config.parathread_cores`. + core_offset: u32, } struct ParathreadQueue { - queue: Vec, - // this value is between 0 and config.parathread_cores - next_core: CoreIndex, + queue: Vec, + /// offset within the set of para-threads ranged `0..config.parathread_cores`. + next_core_offset: u32, } enum CoreOccupied { @@ -105,12 +106,22 @@ enum CoreOccupied { Parachain, } +enum AssignmentKind { + Parachain, + Parathread(CollatorId, u32), +} + struct CoreAssignment { core: CoreIndex, para_id: ParaId, - collator: Option, + kind: AssignmentKind, group_idx: GroupIndex, } +// reasons a core might be freed. +enum FreedReason { + Concluded, + TimedOut, +} ``` Storage layout: @@ -150,6 +161,7 @@ Actions: - First, we obtain "shuffled validators" `SV` by shuffling the validators using the `SessionChangeNotification`'s random seed. - The groups are selected by partitioning `SV`. The first V % N groups will have (V / N) + 1 members, while the remaining groups will have (V / N) members each. 1. Prune the parathread queue to remove all retries beyond `configuration.parathread_retries`. + - Also prune all parathread claims corresponding to de-registered parathreads. - all pruned claims should have their entry removed from the parathread index. - assign all non-pruned claims to new cores if the number of parathread cores has changed between the `new_config` and `old_config` of the `SessionChangeNotification`. - Assign claims in equal balance across all cores if rebalancing, and set the `next_core` of the `ParathreadQueue` by incrementing the relative index of the last assigned core and taking it modulo the number of parathread cores. @@ -172,15 +184,16 @@ Actions: - The core used for the parathread claim is the `next_core` field of the `ParathreadQueue` and adding `Paras::parachains().len()` to it. - `next_core` is then updated by adding 1 and taking it modulo `config.parathread_cores`. - The claim is then added to the claim index. -- `schedule(Vec)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned. +- `schedule(Vec<(CoreIndex, FreedReason)>)`: schedule new core assignments, with a parameter indicating previously-occupied cores which are to be considered returned and why they are being returned. - All freed parachain cores should be assigned to their respective parachain - - All freed parathread cores should have the claim removed from the claim index. + - All freed parathread cores whose reason for freeing was `FreedReason::Concluded` should have the claim removed from the claim index. + - All freed parathread cores whose reason for freeing was `FreedReason::TimedOut` should have the claim added to the parathread queue again without retries incremented - All freed parathread cores should take the next parathread entry from the queue. - The i'th validator group will be assigned to the `(i+k)%n`'th core at any point in time, where `k` is the number of rotations that have occurred in the session, and `n` is the total number of cores. This makes upcoming rotations within the same session predictable. - `scheduled() -> Vec`: Get currently scheduled core assignments. - `occupied(Vec)`. Note that the given cores have become occupied. - - Fails if any given cores were not scheduled. - - Fails if the given cores are not sorted ascending by core index + - Behavior undefined if any given cores were not scheduled. + - Behavior undefined if the given cores are not sorted ascending by core index - This clears them from `Scheduled` and marks each corresponding `core` in the `AvailabilityCores` as occupied. - Since both the availability cores and the newly-occupied cores lists are sorted ascending, this method can be implemented efficiently. - `core_para(CoreIndex) -> ParaId`: return the currently-scheduled or occupied ParaId for the given core.