From e457a2973ef841cdeec2fff4f8fcdf206b13d0bb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 9 Jul 2018 17:33:54 +0100 Subject: [PATCH 01/20] skeleton of collators object --- polkadot/network/src/collators.rs | 157 ++++++++++++++++++++++++ polkadot/network/src/lib.rs | 30 ++--- substrate/network/src/protocol.rs | 5 + substrate/network/src/specialization.rs | 3 + 4 files changed, 180 insertions(+), 15 deletions(-) create mode 100644 polkadot/network/src/collators.rs diff --git a/polkadot/network/src/collators.rs b/polkadot/network/src/collators.rs new file mode 100644 index 0000000000000..0ce0f3c5270fd --- /dev/null +++ b/polkadot/network/src/collators.rs @@ -0,0 +1,157 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Bridge between the network and consensus service for getting collations to it. + +use polkadot_primitives::{AccountId, Hash}; +use polkadot_primitives::parachain::{Id as ParaId, Collation}; +use substrate_network::{PeerId, Context}; + +use futures::prelude::*; +use futures::sync::oneshot; + +use std::collections::hash_map::{HashMap, Entry}; +use std::sync::Arc; +use parking_lot::Mutex; + +/// The role of the collator. Whether they're the primary or backup for this parachain. +pub enum Role { + /// Primary collators should send collations whenever it's time. + Primary, + /// Backup collators should not. + Backup, +} + +/// A maintenance action for the collator set. +pub enum Action { + /// Disconnect the given collator. + Disconnect(AccountId), + /// Give the collator a new role. + NewRole(AccountId, Role), +} + +/// Manages connected collators and role assignments from the perspective of a validator. +#[derive(Clone)] +pub struct Collators { + inner: Arc>, +} + +impl Collators { + /// Create a new `Collators` object. + pub fn new() -> Self { + Collators { + inner: Arc::new(Mutex::new(Inner { + collators: HashMap::new(), + bad_collators: Vec::new(), + parachain_collators: HashMap::new(), + })) + } + } + + /// Call when a new collator is authenticated. Returns the role. + pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role { + let mut inner = self.inner.lock(); + + inner.collators.insert(account_id.clone(), para_id); + match inner.parachain_collators.entry(para_id) { + Entry::Vacant(mut vacant) => { + vacant.insert(ParachainCollators { + primary: account_id, + backup: Vec::new(), + collations: HashMap::new(), + }); + + Role::Primary + }, + Entry::Occupied(mut occupied) => { + occupied.get_mut().backup.push(account_id); + + Role::Backup + } + } + } + + /// Called when a collator disconnects. If it was the primary, returns a new primary for that + /// parachain. + pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + self.inner.lock().on_disconnect(account_id) + } + + /// Call periodically to perform collator set maintenance. + /// Returns a set of actions. + pub fn maintain_peers(&self) -> Vec { + // get rid of all bad peers. + let mut inner = self.inner.lock(); + let mut actions = Vec::new(); + let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new()); + for account in bad { + actions.push(Action::Disconnect(account)); + if let Some((new_primary, _)) = inner.on_disconnect(account) { + actions.push(Action::NewRole(new_primary, Role::Primary)); + } + } + + // TODO: put underperforming collators on the back-burner. + + actions + } +} + +struct Inner { + collators: HashMap, + bad_collators: Vec, + parachain_collators: HashMap, +} + +impl Inner { + fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { + Entry::Vacant(_) => None, + Entry::Occupied(mut occ) => { + if occ.get().primary == account_id { + if occ.get().backup.is_empty() { + occ.remove(); + None + } else { + let mut collators = occ.get_mut(); + collators.primary = collators.backup.pop().expect("backup non-empty; qed"); + Some((collators.primary, para_id)) + } + } else { + None + } + } + }) + } +} + +enum CollationSlot { + // not queried yet + Pending(Vec), + // waiting for next to arrive. + Awaiting(oneshot::Sender), +} + +struct ParachainCollators { + primary: AccountId, + backup: Vec, + collations: HashMap, +} + +#[cfg(test)] +mod tests { + +} diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 91c53338e6571..441af041fb601 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -43,6 +43,7 @@ extern crate rhododendron; #[macro_use] extern crate log; +mod collators; mod router; pub mod consensus; @@ -57,10 +58,12 @@ use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; +use self::collators::Collators; use std::collections::{HashMap, HashSet}; use std::sync::Arc; + #[cfg(test)] mod tests; @@ -75,16 +78,16 @@ pub type NetworkService = ::substrate_network::Service; /// Status of a Polkadot node. #[derive(Debug, PartialEq, Eq, Clone)] pub struct Status { - collating_for: Option, + collating_for: Option<(AccountId, ParaId)>, } impl Slicable for Status { fn encode(&self) -> Vec { let mut v = Vec::new(); match self.collating_for { - Some(ref id) => { + Some(ref details) => { v.push(1); - id.using_encoded(|s| v.extend(s)); + details.using_encoded(|s| v.extend(s)); } None => { v.push(0); @@ -96,7 +99,7 @@ impl Slicable for Status { fn decode(input: &mut I) -> Option { let collating_for = match input.read_byte()? { 0 => None, - 1 => Some(ParaId::decode(input)?), + 1 => Some(Slicable::decode(input)?), _ => return None, }; Some(Status { collating_for }) @@ -207,8 +210,7 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) pub struct PolkadotProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, - collators: HashMap>, - collating_for: Option, + collators: Collators, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -221,8 +223,7 @@ impl PolkadotProtocol { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), - collators: HashMap::new(), - collating_for: None, + collators: Collators::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -398,10 +399,8 @@ impl Specialization for PolkadotProtocol { } }; - if let Some(ref para_id) = local_status.collating_for { - self.collators.entry(para_id.clone()) - .or_insert_with(Vec::new) - .push(peer_id); + if let Some((ref acc_id, ref para_id)) = local_status.collating_for { + let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); @@ -426,9 +425,10 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { - if let Some(collators) = info.status.collating_for.and_then(|id| self.collators.get_mut(&id)) { - if let Some(pos) = collators.iter().position(|x| x == &peer_id) { - collators.swap_remove(pos); + if let Some((acc_id, _)) = info.status.collating_for { + if let Some((new_primary, _)) = self.collators.on_disconnect(acc_id) { + // TODO: send new primary a role-change message. + unimplemented!() } } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index e77ad6307e9be..3986a979f1914 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -556,6 +556,11 @@ impl> Protocol where B::Header: HeaderT: Send + Sync + 'static { /// Called periodically to maintain peers and handle timeouts. fn maintain_peers(&mut self, _ctx: &mut Context) { } + + /// Called when a block is _imported_ at the head of the chain (not during major sync). + fn on_block_imported(&mut self, _ctx: &mut Context, hash: B::Hash, header: &B::Header) { } } From b04aa4e33c287568024b3db8ed46dcde56c2a314 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 12:44:52 +0100 Subject: [PATCH 02/20] awaiting and handling collations. rename `collators` to CollationPool --- polkadot/collator/src/lib.rs | 5 +- .../src/{collators.rs => collator_pool.rs} | 168 +++++++++++------- polkadot/network/src/lib.rs | 39 +++- polkadot/primitives/src/parachain.rs | 13 +- substrate/network/src/specialization.rs | 2 +- 5 files changed, 152 insertions(+), 75 deletions(-) rename polkadot/network/src/{collators.rs => collator_pool.rs} (52%) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index f7557f353e195..a4629c57938ec 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -157,7 +157,8 @@ pub fn collate<'a, R, P>( ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ); - let signature = key.sign(&block_data.0[..]).into(); + let block_data_hash = block_data.hash(); + let signature = key.sign(&block_data_hash.0[..]).into(); let pubkey_bytes: [u8; 32] = key.public().into(); let receipt = parachain::CandidateReceipt { @@ -168,7 +169,7 @@ pub fn collate<'a, R, P>( balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), fees: 0, - block_data_hash: block_data.hash(), + block_data_hash, }; parachain::Collation { diff --git a/polkadot/network/src/collators.rs b/polkadot/network/src/collator_pool.rs similarity index 52% rename from polkadot/network/src/collators.rs rename to polkadot/network/src/collator_pool.rs index 0ce0f3c5270fd..10c3c0cf5721a 100644 --- a/polkadot/network/src/collators.rs +++ b/polkadot/network/src/collator_pool.rs @@ -18,14 +18,10 @@ use polkadot_primitives::{AccountId, Hash}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; -use substrate_network::{PeerId, Context}; -use futures::prelude::*; use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; -use std::sync::Arc; -use parking_lot::Mutex; /// The role of the collator. Whether they're the primary or backup for this parachain. pub enum Role { @@ -43,35 +39,85 @@ pub enum Action { NewRole(AccountId, Role), } +enum CollationSlot { + Blank, + // not queried yet + Pending(Vec), + // waiting for next to arrive. + Awaiting(Vec>), +} + +impl CollationSlot { + fn received_collation(&mut self, collation: Collation) { + *self = match ::std::mem::replace(self, CollationSlot::Blank) { + CollationSlot::Blank => CollationSlot::Pending(vec![collation]), + CollationSlot::Pending(mut cs) => { + cs.push(collation); + CollationSlot::Pending(cs) + } + CollationSlot::Awaiting(senders) => { + for sender in senders { + let _ = sender.send(collation.clone()); + } + + CollationSlot::Blank + } + }; + } + + fn await_with(&mut self, sender: oneshot::Sender) { + *self = match ::std::mem::replace(self, CollationSlot::Blank) { + CollationSlot::Blank => CollationSlot::Awaiting(vec![sender]), + CollationSlot::Awaiting(mut senders) => { + senders.push(sender); + CollationSlot::Awaiting(senders) + } + CollationSlot::Pending(mut cs) => { + let next_collation = cs.pop().expect("empty variant is always `Blank`; qed"); + let _ = sender.send(next_collation); + + if cs.is_empty() { + CollationSlot::Blank + } else { + CollationSlot::Pending(cs) + } + } + }; + } +} + +struct ParachainCollators { + primary: AccountId, + backup: Vec, +} + /// Manages connected collators and role assignments from the perspective of a validator. -#[derive(Clone)] -pub struct Collators { - inner: Arc>, +pub struct CollatorPool { + collators: HashMap, + bad_collators: Vec, + parachain_collators: HashMap, + collations: HashMap<(Hash, ParaId), CollationSlot>, } -impl Collators { - /// Create a new `Collators` object. +impl CollatorPool { + /// Create a new `CollatorPool` object. pub fn new() -> Self { - Collators { - inner: Arc::new(Mutex::new(Inner { - collators: HashMap::new(), - bad_collators: Vec::new(), - parachain_collators: HashMap::new(), - })) + CollatorPool { + collators: HashMap::new(), + bad_collators: Vec::new(), + parachain_collators: HashMap::new(), + collations: HashMap::new(), } } /// Call when a new collator is authenticated. Returns the role. - pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role { - let mut inner = self.inner.lock(); - - inner.collators.insert(account_id.clone(), para_id); - match inner.parachain_collators.entry(para_id) { + pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role { + self.collators.insert(account_id.clone(), para_id); + match self.parachain_collators.entry(para_id) { Entry::Vacant(mut vacant) => { vacant.insert(ParachainCollators { primary: account_id, backup: Vec::new(), - collations: HashMap::new(), }); Role::Primary @@ -86,38 +132,7 @@ impl Collators { /// Called when a collator disconnects. If it was the primary, returns a new primary for that /// parachain. - pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> { - self.inner.lock().on_disconnect(account_id) - } - - /// Call periodically to perform collator set maintenance. - /// Returns a set of actions. - pub fn maintain_peers(&self) -> Vec { - // get rid of all bad peers. - let mut inner = self.inner.lock(); - let mut actions = Vec::new(); - let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new()); - for account in bad { - actions.push(Action::Disconnect(account)); - if let Some((new_primary, _)) = inner.on_disconnect(account) { - actions.push(Action::NewRole(new_primary, Role::Primary)); - } - } - - // TODO: put underperforming collators on the back-burner. - - actions - } -} - -struct Inner { - collators: HashMap, - bad_collators: Vec, - parachain_collators: HashMap, -} - -impl Inner { - fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { Entry::Vacant(_) => None, Entry::Occupied(mut occ) => { @@ -136,19 +151,44 @@ impl Inner { } }) } -} -enum CollationSlot { - // not queried yet - Pending(Vec), - // waiting for next to arrive. - Awaiting(oneshot::Sender), -} + /// Called when a collation is received. + /// The collator should be registered for the parachain of the collation as a precondition of this function. + /// The collation should have been checked for integrity of signature before passing to this function. + pub fn on_collation(&mut self, account_id: AccountId, relay_parent: Hash, collation: Collation) { + if let Some(para_id) = self.collators.get(&account_id) { + debug_assert_eq!(para_id, &collation.receipt.parachain_index); -struct ParachainCollators { - primary: AccountId, - backup: Vec, - collations: HashMap, + self.collations.entry((relay_parent, para_id.clone())) + .or_insert_with(|| CollationSlot::Blank) + .received_collation(collation); + } + } + + /// Wait for a collation from a parachain. + pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { + self.collations.entry((relay_parent, para_id)) + .or_insert_with(|| CollationSlot::Blank) + .await_with(sender); + } + + /// Call periodically to perform collator set maintenance. + /// Returns a set of actions to perform on the network level. + pub fn maintain_peers(&mut self) -> Vec { + // get rid of all bad peers. + let mut actions = Vec::new(); + let bad = ::std::mem::replace(&mut self.bad_collators, Vec::new()); + for account in bad { + actions.push(Action::Disconnect(account)); + if let Some((new_primary, _)) = self.on_disconnect(account) { + actions.push(Action::NewRole(new_primary, Role::Primary)); + } + } + + // TODO: put underperforming collators on the back-burner. + + actions + } } #[cfg(test)] diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 441af041fb601..6c481c540ab94 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -43,7 +43,7 @@ extern crate rhododendron; #[macro_use] extern crate log; -mod collators; +mod collator_pool; mod router; pub mod consensus; @@ -51,14 +51,14 @@ use codec::Slicable; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; -use polkadot_primitives::{Block, SessionKey, Hash}; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::{AccountId, Block, SessionKey, Hash}; +use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; use substrate_network::{PeerId, RequestId, Context}; use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; -use self::collators::Collators; +use self::collator_pool::CollatorPool; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -199,6 +199,8 @@ pub enum Message { RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. BlockData(RequestId, Option), + /// A collation provided by a peer. Relay parent and collation. + Collation(Hash, Collation), } fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) { @@ -210,7 +212,7 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) pub struct PolkadotProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, - collators: Collators, + collators: CollatorPool, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -223,7 +225,7 @@ impl PolkadotProtocol { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), - collators: Collators::new(), + collators: CollatorPool::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -364,6 +366,7 @@ impl PolkadotProtocol { send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data)); } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), + Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), } } @@ -387,7 +390,7 @@ impl PolkadotProtocol { impl Specialization for PolkadotProtocol { fn status(&self) -> Vec { - Status { collating_for: self.collating_for.clone() }.encode() + Status { collating_for: None }.encode() } fn on_connect(&mut self, ctx: &mut Context, peer_id: PeerId, status: FullStatus) { @@ -487,3 +490,25 @@ impl Specialization for PolkadotProtocol { self.dispatch_pending_requests(ctx); } } + +impl PolkadotProtocol { + // we received a collation from a peer + fn on_collation(&mut self, ctx: &mut Context, from: PeerId, relay_parent: Hash, collation: Collation) { + let collation_para = collation.receipt.parachain_index; + let collated_acc = collation.receipt.collator; + + match self.peers.get(&from) { + None => ctx.disconnect_peer(from), + Some(peer_info) => match peer_info.status.collating_for { + None => ctx.disable_peer(from), + Some((ref acc_id, ref para_id)) + if para_id != &collation_para || acc_id != &collated_acc || collation.receipt.check_signature().is_err() => ctx.disable_peer(from), + Some((ref acc_id, _)) => self.collators.on_collation(acc_id.clone(), relay_parent, collation), + }, + } + } + + fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { + self.collators.await_collation(relay_parent, para_id, sender); + } +} diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index dc77669785d04..bb87f7d3261ac 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -144,7 +144,7 @@ pub struct CandidateReceipt { pub parachain_index: Id, /// The collator's relay-chain account ID pub collator: super::AccountId, - /// Signature on block data by collator. + /// Signature on blake2-256 of the block data by collator. pub signature: CandidateSignature, /// The head-data pub head_data: HeadData, @@ -195,6 +195,17 @@ impl CandidateReceipt { use runtime_primitives::traits::{BlakeTwo256, Hashing}; BlakeTwo256::hash_of(self) } + + /// Check integrity vs. provided block data. + pub fn check_signature(&self) -> Result<(), ()> { + use runtime_primitives::traits::Verify; + + if self.signature.verify(&self.signature.0[..], &self.collator) { + Ok(()) + } else { + Err(()) + } + } } impl PartialOrd for CandidateReceipt { diff --git a/substrate/network/src/specialization.rs b/substrate/network/src/specialization.rs index 57f027794b3d5..999c545291d88 100644 --- a/substrate/network/src/specialization.rs +++ b/substrate/network/src/specialization.rs @@ -44,5 +44,5 @@ pub trait Specialization: Send + Sync + 'static { fn maintain_peers(&mut self, _ctx: &mut Context) { } /// Called when a block is _imported_ at the head of the chain (not during major sync). - fn on_block_imported(&mut self, _ctx: &mut Context, hash: B::Hash, header: &B::Header) { } + fn on_block_imported(&mut self, _ctx: &mut Context, _hash: B::Hash, _header: &B::Header) { } } From e9a3ace74a4ef0ea2eaa9bd7815009b98cafd9cd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 15:38:33 +0100 Subject: [PATCH 03/20] add some tests --- polkadot/network/src/collator_pool.rs | 59 ++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 10c3c0cf5721a..4bdebb8543bee 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -24,6 +24,7 @@ use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; /// The role of the collator. Whether they're the primary or backup for this parachain. +#[derive(PartialEq, Debug)] pub enum Role { /// Primary collators should send collations whenever it's time. Primary, @@ -32,6 +33,7 @@ pub enum Role { } /// A maintenance action for the collator set. +#[derive(PartialEq, Debug)] pub enum Action { /// Disconnect the given collator. Disconnect(AccountId), @@ -114,7 +116,7 @@ impl CollatorPool { pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role { self.collators.insert(account_id.clone(), para_id); match self.parachain_collators.entry(para_id) { - Entry::Vacant(mut vacant) => { + Entry::Vacant(vacant) => { vacant.insert(ParachainCollators { primary: account_id, backup: Vec::new(), @@ -159,6 +161,8 @@ impl CollatorPool { if let Some(para_id) = self.collators.get(&account_id) { debug_assert_eq!(para_id, &collation.receipt.parachain_index); + // TODO: punish if not primary? + self.collations.entry((relay_parent, para_id.clone())) .or_insert_with(|| CollationSlot::Blank) .received_collation(collation); @@ -189,9 +193,62 @@ impl CollatorPool { actions } + + /// Note a bad collator. + pub fn note_bad(&mut self, collator: AccountId) { + self.bad_collators.push(collator); + } } #[cfg(test)] mod tests { + use super::*; + use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData}; + use substrate_primitives::H512; + use futures::Future; + + #[test] + fn note_bad_primary_gives_new_primary() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let bad_primary = [0; 32].into(); + let good_backup = [1; 32].into(); + + assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary); + assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup); + + pool.note_bad(bad_primary); + + assert_eq!(pool.maintain_peers(), vec![ + Action::Disconnect(bad_primary), + Action::NewRole(good_backup, Role::Primary), + ]); + } + #[test] + fn await_before_collation() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let primary = [0; 32].into(); + let relay_parent = [1; 32].into(); + + assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); + let (tx, rx) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx); + pool.on_collation(primary, relay_parent, Collation { + receipt: CandidateReceipt { + parachain_index: para_id, + collator: primary.into(), + signature: H512::from([2; 64]).into(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: vec![], + fees: 0, + block_data_hash: [3; 32].into(), + }, + block_data: BlockData(vec![4, 5, 6]), + }); + + rx.wait().unwrap(); + } } From c152a0c7fe4314ec986fafd7ba2403f9d1775fa7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 15:50:40 +0100 Subject: [PATCH 04/20] add tests --- polkadot/network/src/collator_pool.rs | 35 ++++++--------------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 4bdebb8543bee..9d0790f430dbd 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -134,7 +134,7 @@ impl CollatorPool { /// Called when a collator disconnects. If it was the primary, returns a new primary for that /// parachain. - pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + pub fn on_disconnect(&mut self, account_id: AccountId) -> Option { self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { Entry::Vacant(_) => None, Entry::Occupied(mut occ) => { @@ -145,7 +145,7 @@ impl CollatorPool { } else { let mut collators = occ.get_mut(); collators.primary = collators.backup.pop().expect("backup non-empty; qed"); - Some((collators.primary, para_id)) + Some(collators.primary) } } else { None @@ -179,24 +179,8 @@ impl CollatorPool { /// Call periodically to perform collator set maintenance. /// Returns a set of actions to perform on the network level. pub fn maintain_peers(&mut self) -> Vec { - // get rid of all bad peers. - let mut actions = Vec::new(); - let bad = ::std::mem::replace(&mut self.bad_collators, Vec::new()); - for account in bad { - actions.push(Action::Disconnect(account)); - if let Some((new_primary, _)) = self.on_disconnect(account) { - actions.push(Action::NewRole(new_primary, Role::Primary)); - } - } - - // TODO: put underperforming collators on the back-burner. - - actions - } - - /// Note a bad collator. - pub fn note_bad(&mut self, collator: AccountId) { - self.bad_collators.push(collator); + // TODO: rearrange periodically to new primary, evaluate based on latency etc. + Vec::new() } } @@ -208,7 +192,7 @@ mod tests { use futures::Future; #[test] - fn note_bad_primary_gives_new_primary() { + fn disconnect_primary_gives_new_primary() { let mut pool = CollatorPool::new(); let para_id: ParaId = 5.into(); let bad_primary = [0; 32].into(); @@ -216,13 +200,8 @@ mod tests { assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary); assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup); - - pool.note_bad(bad_primary); - - assert_eq!(pool.maintain_peers(), vec![ - Action::Disconnect(bad_primary), - Action::NewRole(good_backup, Role::Primary), - ]); + assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup)); + assert_eq!(pool.on_disconnect(good_backup), None); } #[test] From 352565cbd82a9253b6fcf5b4f2b7860dd13e36e3 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 16:03:54 +0100 Subject: [PATCH 05/20] implement Collators trait for ConsensusNetwork --- polkadot/network/src/consensus.rs | 37 ++++++++++++++++++++++++++----- polkadot/network/src/lib.rs | 20 ++++++++++++++--- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 0eb14d9381aaa..7cd79abfdab11 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -27,7 +27,7 @@ use polkadot_consensus::{Network, SharedTable, Collators}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; -use futures::{future, prelude::*}; +use futures::prelude::*; use futures::sync::mpsc; use std::sync::Arc; @@ -304,13 +304,38 @@ impl Network for ConsensusNetwork

>); + +impl Future for AwaitingCollation { + type Item = Collation; + type Error = NetworkDown; + + fn poll(&mut self) -> Poll { + match self.0.poll().map_err(|_| NetworkDown)? { + Async::Ready(None) => Err(NetworkDown), + Async::Ready(Some(x)) => Ok(Async::Ready(x)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + + impl Collators for ConsensusNetwork

{ - type Error = (); - type Collation = future::Empty; + type Error = NetworkDown; + type Collation = AwaitingCollation; - fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation { - future::empty() + fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { + AwaitingCollation( + self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain)) + ) } - fn note_bad_collator(&self, _collator: AccountId) { } + fn note_bad_collator(&self, collator: AccountId) { + self.network.with_spec(|spec, ctx| spec.disconnect_collator(ctx, collator)); + } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 6c481c540ab94..acd014deddda6 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -429,7 +429,7 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { if let Some((acc_id, _)) = info.status.collating_for { - if let Some((new_primary, _)) = self.collators.on_disconnect(acc_id) { + if let Some(new_primary) = self.collators.on_disconnect(acc_id) { // TODO: send new primary a role-change message. unimplemented!() } @@ -508,7 +508,21 @@ impl PolkadotProtocol { } } - fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { - self.collators.await_collation(relay_parent, para_id, sender); + fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.collators.await_collation(relay_parent, para_id, tx); + rx + } + + // disconnect a collator by account-id. + fn disconnect_collator(&mut self, ctx: &mut Context, account_id: AccountId) { + let bad_peers = self.peers + .iter() + .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) + .map(|(peer_id, _)| *peer_id); + + for peer in bad_peers { + ctx.disable_peer(peer); + } } } From 11875ec74e5dc371ef8fc5a78439d8bd311a2dd9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 16:49:04 +0100 Subject: [PATCH 06/20] plug collators into main polkadot-network --- polkadot/network/src/collator_pool.rs | 5 +-- polkadot/network/src/consensus.rs | 2 +- polkadot/network/src/lib.rs | 63 ++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 9d0790f430dbd..12568941f1308 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -24,7 +24,7 @@ use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; /// The role of the collator. Whether they're the primary or backup for this parachain. -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Serialize, Deserialize)] pub enum Role { /// Primary collators should send collations whenever it's time. Primary, @@ -34,6 +34,7 @@ pub enum Role { /// A maintenance action for the collator set. #[derive(PartialEq, Debug)] +#[allow(dead_code)] pub enum Action { /// Disconnect the given collator. Disconnect(AccountId), @@ -96,7 +97,6 @@ struct ParachainCollators { /// Manages connected collators and role assignments from the perspective of a validator. pub struct CollatorPool { collators: HashMap, - bad_collators: Vec, parachain_collators: HashMap, collations: HashMap<(Hash, ParaId), CollationSlot>, } @@ -106,7 +106,6 @@ impl CollatorPool { pub fn new() -> Self { CollatorPool { collators: HashMap::new(), - bad_collators: Vec::new(), parachain_collators: HashMap::new(), collations: HashMap::new(), } diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 7cd79abfdab11..3fe22acd5aacf 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -336,6 +336,6 @@ impl Collators for ConsensusNetwork } fn note_bad_collator(&self, collator: AccountId) { - self.network.with_spec(|spec, ctx| spec.disconnect_collator(ctx, collator)); + self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator)); } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index acd014deddda6..e5341885b134a 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -58,7 +58,7 @@ use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; -use self::collator_pool::CollatorPool; +use self::collator_pool::{CollatorPool, Role, Action}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -199,6 +199,8 @@ pub enum Message { RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. BlockData(RequestId, Option), + /// Tell a collator their role. + CollatorRole(Role), /// A collation provided by a peer. Relay parent and collation. Collation(Hash, Collation), } @@ -263,7 +265,10 @@ impl PolkadotProtocol { let parent_hash = consensus.parent_hash; let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash); - for (id, info) in self.peers.iter_mut().filter(|&(_, ref info)| info.validator) { + // TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks. + for (id, info) in self.peers.iter_mut() + .filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some()) + { send_polkadot_message( ctx, *id, @@ -367,6 +372,7 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), + Message::CollatorRole(_) => unimplemented!(), } } @@ -398,12 +404,22 @@ impl Specialization for PolkadotProtocol { Some(status) => status, None => { ctx.disable_peer(peer_id); - return; + return } }; if let Some((ref acc_id, ref para_id)) = local_status.collating_for { + if self.collator_peer_id(acc_id.clone()).is_some() { + ctx.disable_peer(peer_id); + return + } + let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); + send_polkadot_message( + ctx, + peer_id, + Message::CollatorRole(collator_role), + ); } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); @@ -429,9 +445,15 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { if let Some((acc_id, _)) = info.status.collating_for { - if let Some(new_primary) = self.collators.on_disconnect(acc_id) { - // TODO: send new primary a role-change message. - unimplemented!() + let new_primary = self.collators.on_disconnect(acc_id) + .and_then(|new_primary| self.collator_peer_id(new_primary)); + + if let Some(new_primary) = new_primary { + send_polkadot_message( + ctx, + new_primary, + Message::CollatorRole(Role::Primary), + ) } } @@ -488,6 +510,19 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); self.dispatch_pending_requests(ctx); + + for collator_action in self.collators.maintain_peers() { + match collator_action { + Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), + Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) { + send_polkadot_message( + ctx, + collator, + Message::CollatorRole(role), + ) + }, + } + } } } @@ -514,15 +549,19 @@ impl PolkadotProtocol { rx } - // disconnect a collator by account-id. - fn disconnect_collator(&mut self, ctx: &mut Context, account_id: AccountId) { - let bad_peers = self.peers + // get connected peer with given account ID for collation. + fn collator_peer_id(&self, account_id: AccountId) -> Option { + self.peers .iter() .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) - .map(|(peer_id, _)| *peer_id); + .map(|(peer_id, _)| *peer_id) + .next() + } - for peer in bad_peers { - ctx.disable_peer(peer); + // disconnect a collator by account-id. + fn disconnect_bad_collator(&self, ctx: &mut Context, account_id: AccountId) { + if let Some(peer_id) = self.collator_peer_id(account_id) { + ctx.disable_peer(peer_id) } } } From d2366634b1313c4346510bc61f20c048cab5de50 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 10:42:23 +0100 Subject: [PATCH 07/20] ignore collator role message --- polkadot/network/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index b0d31499be528..3bf77a52123be 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -372,7 +372,7 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(_) => unimplemented!(), + Message::CollatorRole(_) => {}, } } From bd70053cbd5e21eddbb2cf32fa447e2e57e1630c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 10:57:18 +0100 Subject: [PATCH 08/20] add a couple more tests --- polkadot/network/src/collator_pool.rs | 35 +++++++++++++++++++++++++-- polkadot/network/src/lib.rs | 5 ++-- polkadot/network/src/tests.rs | 28 ++++++++++++++++++--- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 12568941f1308..2d05e02c8ce5a 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -211,8 +211,37 @@ mod tests { let relay_parent = [1; 32].into(); assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); - let (tx, rx) = oneshot::channel(); - pool.await_collation(relay_parent, para_id, tx); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx1); + pool.await_collation(relay_parent, para_id, tx2); + pool.on_collation(primary, relay_parent, Collation { + receipt: CandidateReceipt { + parachain_index: para_id, + collator: primary.into(), + signature: H512::from([2; 64]).into(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: vec![], + fees: 0, + block_data_hash: [3; 32].into(), + }, + block_data: BlockData(vec![4, 5, 6]), + }); + + rx1.wait().unwrap(); + rx2.wait().unwrap(); + } + + #[test] + fn collate_before_await() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let primary = [0; 32].into(); + let relay_parent = [1; 32].into(); + + assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); + pool.on_collation(primary, relay_parent, Collation { receipt: CandidateReceipt { parachain_index: para_id, @@ -227,6 +256,8 @@ mod tests { block_data: BlockData(vec![4, 5, 6]), }); + let (tx, rx) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx); rx.wait().unwrap(); } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 3bf77a52123be..d149a1ed320ee 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -422,6 +422,8 @@ impl Specialization for PolkadotProtocol { } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); + let send_key = validator || local_status.collating_for.is_some(); + self.peers.insert(peer_id, PeerInfo { status: local_status, session_keys: Default::default(), @@ -429,8 +431,7 @@ impl Specialization for PolkadotProtocol { }); self.consensus_gossip.new_peer(ctx, peer_id, &status.roles); - - if let (true, &Some(ref consensus)) = (validator, &self.live_consensus) { + if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { send_polkadot_message( ctx, peer_id, diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 19db9890ccb97..5e3eca4651646 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -110,11 +110,12 @@ fn sends_session_key() { let parent_hash = [0; 32].into(); let local_key = [1; 32].into(); - let status = Status { collating_for: None }; + let validator_status = Status { collating_for: None }; + let collator_status = Status { collating_for: Some(([2; 32].into(), 5.into())) }; { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority])); assert!(ctx.messages.is_empty()); } @@ -128,7 +129,7 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![])); assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key))); } } @@ -207,3 +208,24 @@ fn fetches_from_those_with_knowledge() { assert_eq!(recv.wait().unwrap(), block_data); } } + +#[test] +fn remove_bad_collator() { + let mut protocol = PolkadotProtocol::new(); + + let peer_id = 1; + let account_id = [2; 32].into(); + + let status = Status { collating_for: Some((account_id, 5.into())) }; + + { + let mut ctx = TestContext::default(); + protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![])); + } + + { + let mut ctx = TestContext::default(); + protocol.disconnect_bad_collator(&mut ctx, account_id); + assert!(ctx.disabled.contains(&peer_id)); + } +} From 2a03637388183a8a78406faa17fa407cf30f1d6b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 11:44:31 +0100 Subject: [PATCH 09/20] garbage collection for collations --- polkadot/consensus/src/collation.rs | 6 +++ polkadot/network/src/collator_pool.rs | 74 +++++++++++++++++++++------ polkadot/network/src/lib.rs | 7 ++- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/polkadot/consensus/src/collation.rs b/polkadot/consensus/src/collation.rs index db490a0eb17d8..f7db48db619bb 100644 --- a/polkadot/consensus/src/collation.rs +++ b/polkadot/consensus/src/collation.rs @@ -37,6 +37,12 @@ pub trait Collators: Clone { type Collation: IntoFuture; /// Collate on a specific parachain, building on a given relay chain parent hash. + /// + /// The returned collation should be checked for basic validity in the signature + /// and will be checked for state-transition validity by the consumer of this trait. + /// + /// This does not have to guarantee local availability, as a valid collation + /// will be passed to the `TableRouter` instance. fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation; /// Note a bad collator. TODO: take proof diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 2d05e02c8ce5a..76df8098219b3 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -22,6 +22,9 @@ use polkadot_primitives::parachain::{Id as ParaId, Collation}; use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; +use std::time::{Duration, Instant}; + +const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5); /// The role of the collator. Whether they're the primary or backup for this parachain. #[derive(PartialEq, Debug, Serialize, Deserialize)] @@ -42,7 +45,25 @@ pub enum Action { NewRole(AccountId, Role), } -enum CollationSlot { +struct CollationSlot { + live_at: Instant, + entries: SlotEntries, +} + +impl CollationSlot { + fn blank_now() -> Self { + CollationSlot { + live_at: Instant::now(), + entries: SlotEntries::Blank, + } + } + + fn stay_alive(&self, now: Instant) -> bool { + self.live_at + COLLATION_LIFETIME > now + } +} + +enum SlotEntries { Blank, // not queried yet Pending(Vec), @@ -50,39 +71,39 @@ enum CollationSlot { Awaiting(Vec>), } -impl CollationSlot { +impl SlotEntries { fn received_collation(&mut self, collation: Collation) { - *self = match ::std::mem::replace(self, CollationSlot::Blank) { - CollationSlot::Blank => CollationSlot::Pending(vec![collation]), - CollationSlot::Pending(mut cs) => { + *self = match ::std::mem::replace(self, SlotEntries::Blank) { + SlotEntries::Blank => SlotEntries::Pending(vec![collation]), + SlotEntries::Pending(mut cs) => { cs.push(collation); - CollationSlot::Pending(cs) + SlotEntries::Pending(cs) } - CollationSlot::Awaiting(senders) => { + SlotEntries::Awaiting(senders) => { for sender in senders { let _ = sender.send(collation.clone()); } - CollationSlot::Blank + SlotEntries::Blank } }; } fn await_with(&mut self, sender: oneshot::Sender) { - *self = match ::std::mem::replace(self, CollationSlot::Blank) { - CollationSlot::Blank => CollationSlot::Awaiting(vec![sender]), - CollationSlot::Awaiting(mut senders) => { + *self = match ::std::mem::replace(self, SlotEntries::Blank) { + SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]), + SlotEntries::Awaiting(mut senders) => { senders.push(sender); - CollationSlot::Awaiting(senders) + SlotEntries::Awaiting(senders) } - CollationSlot::Pending(mut cs) => { + SlotEntries::Pending(mut cs) => { let next_collation = cs.pop().expect("empty variant is always `Blank`; qed"); let _ = sender.send(next_collation); if cs.is_empty() { - CollationSlot::Blank + SlotEntries::Blank } else { - CollationSlot::Pending(cs) + SlotEntries::Pending(cs) } } }; @@ -163,7 +184,8 @@ impl CollatorPool { // TODO: punish if not primary? self.collations.entry((relay_parent, para_id.clone())) - .or_insert_with(|| CollationSlot::Blank) + .or_insert_with(CollationSlot::blank_now) + .entries .received_collation(collation); } } @@ -171,7 +193,8 @@ impl CollatorPool { /// Wait for a collation from a parachain. pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { self.collations.entry((relay_parent, para_id)) - .or_insert_with(|| CollationSlot::Blank) + .or_insert_with(CollationSlot::blank_now) + .entries .await_with(sender); } @@ -181,6 +204,12 @@ impl CollatorPool { // TODO: rearrange periodically to new primary, evaluate based on latency etc. Vec::new() } + + /// called when a block with given hash has been imported. + pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) { + let now = Instant::now(); + self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now)); + } } #[cfg(test)] @@ -260,4 +289,15 @@ mod tests { pool.await_collation(relay_parent, para_id, tx); rx.wait().unwrap(); } + + #[test] + fn slot_stay_alive() { + let slot = CollationSlot::blank_now(); + let now = slot.live_at; + + assert!(slot.stay_alive(now)); + assert!(slot.stay_alive(now + Duration::from_secs(10))); + assert!(!slot.stay_alive(now + COLLATION_LIFETIME)); + assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10))); + } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index d149a1ed320ee..9ed1b1a579df3 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -51,7 +51,7 @@ use codec::Slicable; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; -use polkadot_primitives::{AccountId, Block, SessionKey, Hash}; +use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; use substrate_network::{PeerId, RequestId, Context}; use substrate_network::consensus_gossip::ConsensusGossip; @@ -509,6 +509,7 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); + self.collators.collect_garbage(None); self.dispatch_pending_requests(ctx); for collator_action in self.collators.maintain_peers() { @@ -524,6 +525,10 @@ impl Specialization for PolkadotProtocol { } } } + + fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, _header: &Header) { + self.collators.collect_garbage(Some(&hash)); + } } impl PolkadotProtocol { From 3d1f9e8dbf1a8d12e5ad9d945b83674147ae9384 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Fri, 13 Jul 2018 14:55:53 +0100 Subject: [PATCH 10/20] extract session-key tracking from consensus --- polkadot/network/src/consensus.rs | 1 - polkadot/network/src/lib.rs | 95 +++++++++++++------------------ polkadot/network/src/tests.rs | 14 ++--- 3 files changed, 46 insertions(+), 64 deletions(-) diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 3fe22acd5aacf..a09ff1725841d 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -284,7 +284,6 @@ impl Network for ConsensusNetwork

{ - v.push(0); - } + None => v.push(0), } v } @@ -115,9 +113,9 @@ struct BlockDataRequest { } struct PeerInfo { - status: Status, - validator: bool, - session_keys: HashMap, + collating_for: Option<(AccountId, ParaId)>, + validator_key: Option, + claimed_validator: bool, } #[derive(Default)] @@ -169,7 +167,6 @@ impl Knowledge { struct CurrentConsensus { knowledge: Arc>, parent_hash: Hash, - session_keys: HashMap, local_session_key: SessionKey, } @@ -179,12 +176,6 @@ impl CurrentConsensus { self.knowledge.lock().candidates.get(hash) .and_then(|entry| entry.block_data.clone()) } - - fn peer_disconnected(&mut self, peer: &PeerInfo) { - if let Some(key) = peer.session_keys.get(&self.parent_hash) { - self.session_keys.remove(key); - } - } } /// Polkadot-specific messages. @@ -192,9 +183,9 @@ impl CurrentConsensus { pub enum Message { /// signed statement and localized parent hash. Statement(Hash, SignedStatement), - /// Tell the peer your session key for the current block. - // TODO: do this with a random challenge protocol - SessionKey(Hash, SessionKey), + /// As a validator, tell the peer your current session key. + // TODO: do this with a cryptographic proof of some kind + SessionKey(SessionKey), /// Requesting parachain block data by candidate hash. RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. @@ -215,6 +206,7 @@ pub struct PolkadotProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, collators: CollatorPool, + validators: HashMap, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -228,6 +220,7 @@ impl PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), collators: CollatorPool::new(), + validators: HashMap::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -261,31 +254,23 @@ impl PolkadotProtocol { } /// Note new consensus session. - fn new_consensus(&mut self, ctx: &mut Context, mut consensus: CurrentConsensus) { - let parent_hash = consensus.parent_hash; - let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash); - - // TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks. - for (id, info) in self.peers.iter_mut() - .filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some()) - { - send_polkadot_message( - ctx, - *id, - Message::SessionKey(parent_hash, consensus.local_session_key) - ); + fn new_consensus(&mut self, ctx: &mut Context, consensus: CurrentConsensus) { + let old_data = self.live_consensus.as_ref().map(|c| (c.parent_hash, c.local_session_key)); - if let Some(key) = info.session_keys.get(&parent_hash) { - consensus.session_keys.insert(*key, *id); - } - - if let Some(ref old_parent) = old_parent { - info.session_keys.remove(old_parent); + if Some(&consensus.local_session_key) != old_data.as_ref().map(|&(_, ref key)| key) { + for (id, _) in self.peers.iter() + .filter(|&(_, ref info)| info.claimed_validator || info.collating_for.is_some()) + { + send_polkadot_message( + ctx, + *id, + Message::SessionKey(consensus.local_session_key) + ); } } self.live_consensus = Some(consensus); - self.consensus_gossip.collect_garbage(old_parent.as_ref()); + self.consensus_gossip.collect_garbage(old_data.as_ref().map(|&(ref hash, _)| hash)); } fn dispatch_pending_requests(&mut self, ctx: &mut Context) { @@ -309,8 +294,9 @@ impl PolkadotProtocol { continue; } + let validator_keys = &mut self.validators; let next_peer = entry.knows_block_data.iter() - .filter_map(|x| consensus.session_keys.get(x).map(|id| (*x, *id))) + .filter_map(|x| validator_keys.get(x).map(|id| (*x, *id))) .find(|&(ref key, _)| pending.attempted_peers.insert(*key)) .map(|(_, id)| id); @@ -341,26 +327,23 @@ impl PolkadotProtocol { match msg { Message::Statement(parent_hash, _statement) => self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash), - Message::SessionKey(parent_hash, key) => { + Message::SessionKey(key) => { { let info = match self.peers.get_mut(&peer_id) { Some(peer) => peer, None => return, }; - if !info.validator { + if !info.claimed_validator { ctx.disable_peer(peer_id); - return; + return } - match self.live_consensus { - Some(ref mut consensus) if consensus.parent_hash == parent_hash => { - consensus.session_keys.insert(key, peer_id); - } - _ => {} + if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { + self.validators.remove(&old_key); } + self.validators.insert(key, peer_id); - info.session_keys.insert(parent_hash, key); } self.dispatch_pending_requests(ctx); } @@ -425,9 +408,9 @@ impl Specialization for PolkadotProtocol { let send_key = validator || local_status.collating_for.is_some(); self.peers.insert(peer_id, PeerInfo { - status: local_status, - session_keys: Default::default(), - validator, + collating_for: local_status.collating_for, + validator_key: None, + claimed_validator: validator, }); self.consensus_gossip.new_peer(ctx, peer_id, &status.roles); @@ -435,7 +418,7 @@ impl Specialization for PolkadotProtocol { send_polkadot_message( ctx, peer_id, - Message::SessionKey(consensus.parent_hash, consensus.local_session_key) + Message::SessionKey(consensus.local_session_key) ); } @@ -444,7 +427,7 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { - if let Some((acc_id, _)) = info.status.collating_for { + if let Some((acc_id, _)) = info.collating_for { let new_primary = self.collators.on_disconnect(acc_id) .and_then(|new_primary| self.collator_peer_id(new_primary)); @@ -457,8 +440,8 @@ impl Specialization for PolkadotProtocol { } } - if let (true, &mut Some(ref mut consensus)) = (info.validator, &mut self.live_consensus) { - consensus.peer_disconnected(&info); + if let Some(validator_key) = info.validator_key { + self.validators.remove(&validator_key); } { @@ -539,7 +522,7 @@ impl PolkadotProtocol { match self.peers.get(&from) { None => ctx.disconnect_peer(from), - Some(peer_info) => match peer_info.status.collating_for { + Some(peer_info) => match peer_info.collating_for { None => ctx.disable_peer(from), Some((ref acc_id, ref para_id)) if para_id != &collation_para || acc_id != &collated_acc || collation.receipt.check_signature().is_err() => ctx.disable_peer(from), @@ -558,7 +541,7 @@ impl PolkadotProtocol { fn collator_peer_id(&self, account_id: AccountId) -> Option { self.peers .iter() - .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) + .filter(|&(_, info)| info.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) .map(|(peer_id, _)| *peer_id) .next() } diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 5e3eca4651646..b17f3d9086241 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -89,7 +89,6 @@ fn make_consensus(parent_hash: Hash, local_key: SessionKey) -> (CurrentConsensus let c = CurrentConsensus { knowledge: knowledge.clone(), parent_hash, - session_keys: Default::default(), local_session_key: local_key, }; @@ -123,14 +122,13 @@ fn sends_session_key() { let mut ctx = TestContext::default(); let (consensus, _knowledge) = make_consensus(parent_hash, local_key); protocol.new_consensus(&mut ctx, consensus); - - assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key))); } { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![])); - assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_b, Message::SessionKey(local_key))); } } @@ -172,13 +170,14 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority])); - assert!(ctx.has_message(peer_a, Message::SessionKey(parent_hash, local_key))); + assert!(ctx.has_message(peer_a, Message::SessionKey(local_key))); } // peer A gives session key and gets asked for data. { let mut ctx = TestContext::default(); - on_message(&mut protocol, &mut ctx, peer_a, Message::SessionKey(parent_hash, a_key)); + on_message(&mut protocol, &mut ctx, peer_a, Message::SessionKey(a_key)); + assert!(protocol.validators.contains_key(&a_key)); assert!(ctx.has_message(peer_a, Message::RequestBlockData(1, candidate_hash))); } @@ -188,7 +187,7 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority])); - on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(parent_hash, b_key)); + on_message(&mut protocol, &mut ctx, peer_b, Message::SessionKey(b_key)); assert!(!ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash))); } @@ -197,6 +196,7 @@ fn fetches_from_those_with_knowledge() { { let mut ctx = TestContext::default(); protocol.on_disconnect(&mut ctx, peer_a); + assert!(!protocol.validators.contains_key(&a_key)); assert!(ctx.has_message(peer_b, Message::RequestBlockData(2, candidate_hash))); } From 531fc54db6985309a7d84ce05f926a56437b109c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 13:52:19 +0200 Subject: [PATCH 11/20] add local_collations.rs --- polkadot/network/src/lib.rs | 2 +- polkadot/network/src/local_collations.rs | 48 ++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 polkadot/network/src/local_collations.rs diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index cb9fb40df5ed0..d9fa52e572de9 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -44,7 +44,7 @@ extern crate rhododendron; extern crate log; mod collator_pool; -//mod local_collations; +mod local_collations; mod router; pub mod consensus; diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs new file mode 100644 index 0000000000000..9c6e4cbd46dd7 --- /dev/null +++ b/polkadot/network/src/local_collations.rs @@ -0,0 +1,48 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Local collations to be circulated to validators. +//! +//! Collations are attempted to be repropagated when a new validator connects, +//! a validator changes his session key, or when they are generated. + +use polkadot_primitives::{Hash, Collation}; + +use std::collations::{HashMap, HashSet}; +use std::time::Duration; + +const LIVE_FOR: Duration = Duration::from_secs(60 * 5); + +/// Actions to take. +pub enum Action { + SendTo(SessionKey), +} + +pub struct LocalCollation { + targets: HashSet, + collation: Collation, +} + +pub struct LocalCollations { + primary_for: HashSet, + local_collations: HashMap, +} + +impl LocalCollations { + pub fn new_validator(&mut self) { + unimplemented!() + } +} From c07c35a3531fbf57a51ee4ca45d315f1ed9c9d6f Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 17:09:23 +0200 Subject: [PATCH 12/20] finish polish of local_collations --- polkadot/network/src/local_collations.rs | 135 ++++++++++++++++++++--- 1 file changed, 120 insertions(+), 15 deletions(-) diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs index 9c6e4cbd46dd7..1a1b1ef9e0c72 100644 --- a/polkadot/network/src/local_collations.rs +++ b/polkadot/network/src/local_collations.rs @@ -19,30 +19,135 @@ //! Collations are attempted to be repropagated when a new validator connects, //! a validator changes his session key, or when they are generated. -use polkadot_primitives::{Hash, Collation}; +use polkadot_primitives::{Hash, SessionKey}; -use std::collations::{HashMap, HashSet}; -use std::time::Duration; +use collator_pool::Role; -const LIVE_FOR: Duration = Duration::from_secs(60 * 5); +use std::collections::{HashMap, HashSet}; +use std::time::{Duration, Instant}; -/// Actions to take. -pub enum Action { - SendTo(SessionKey), -} +const LIVE_FOR: Duration = Duration::from_secs(60 * 5); -pub struct LocalCollation { +struct LocalCollation { targets: HashSet, - collation: Collation, + collation: C, + live_since: Instant, } -pub struct LocalCollations { +/// Tracker for locally collated values and which validators to send them to. +pub struct LocalCollations { primary_for: HashSet, - local_collations: HashMap, + local_collations: HashMap>, } -impl LocalCollations { - pub fn new_validator(&mut self) { - unimplemented!() +impl LocalCollations { + /// Create a new `LocalCollations` tracker. + fn new() -> Self { + LocalCollations { + primary_for: HashSet::new(), + local_collations: HashMap::new(), + } + } + + /// Validator gave us a new role. If the new role is "primary", this function might return + /// a set of collations to send to that validator. + pub fn note_validator_role(&mut self, key: SessionKey, role: Role) -> Vec { + match role { + Role::Backup => { + self.primary_for.remove(&key); + Vec::new() + } + Role::Primary => { + let new_primary = self.primary_for.insert(key); + if new_primary { + self.collations_targeting(&key) + } else { + Vec::new() + } + } + } + } + + /// Fresh session key from a validator. Returns a vector of collations to send + /// to the validator. + pub fn fresh_key(&mut self, old_key: &SessionKey, new_key: &SessionKey) -> Vec { + if self.primary_for.remove(old_key) { + self.primary_for.insert(*new_key); + + self.collations_targeting(new_key) + } else { + Vec::new() + } + } + + /// Validator disconnected. + pub fn on_disconnect(&mut self, key: &SessionKey) { + self.primary_for.remove(key); + } + + /// Mark collations relevant to the given parent hash as obsolete. + pub fn mark_obsolete(&mut self, relay_parent: &Hash) { + self.local_collations.remove(relay_parent); + + let now = Instant::now(); + self.local_collations.retain(|_, v| v.live_since + LIVE_FOR > now); + } + + /// Add a collation. Returns a vector of validators to send the collation to. + pub fn add_collation<'a>(&'a mut self, relay_parent: Hash, targets: HashSet, collation: C) -> impl Iterator + 'a { + self.local_collations.insert(relay_parent, LocalCollation { + targets, + collation, + live_since: Instant::now(), + }); + + let local = self.local_collations.get(&relay_parent) + .expect("just inserted to this key; qed"); + + local.targets.intersection(&self.primary_for).cloned() + } + + fn collations_targeting(&self, key: &SessionKey) -> Vec { + self.local_collations.values() + .filter(|v| v.targets.contains(key)) + .map(|v| v.collation.clone()) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn add_validator_with_ready_collation() { + let key = [1; 32].into(); + let relay_parent = [2; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(key); + set + }; + + let mut tracker = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); + assert_eq!(tracker.note_validator_role(key, Role::Primary), vec![5]); + } + + #[test] + fn rename_with_ready() { + let orig_key = [1; 32].into(); + let new_key = [2; 32].into(); + let relay_parent = [255; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(new_key); + set + }; + + let mut tracker = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); + assert_eq!(tracker.note_validator_role(orig_key, Role::Primary), vec![]); + assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![5]); } } From 86b26780b09e689c92f4faa4d00dab33171da8fb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 17:09:35 +0200 Subject: [PATCH 13/20] integrate local_collations into network layer --- polkadot/network/src/lib.rs | 85 +++++++++++++++++------- polkadot/network/src/local_collations.rs | 46 +++++++++---- 2 files changed, 95 insertions(+), 36 deletions(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 4e54a9613772b..373150a986909 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -44,7 +44,7 @@ extern crate rhododendron; extern crate log; mod collator_pool; -//mod local_collations; +mod local_collations; mod router; pub mod consensus; @@ -60,6 +60,7 @@ use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; use self::collator_pool::{CollatorPool, Role, Action}; +use self::local_collations::LocalCollations; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -210,6 +211,7 @@ pub struct PolkadotProtocol { consensus_gossip: ConsensusGossip, collators: CollatorPool, validators: HashMap, + local_collations: LocalCollations, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -230,6 +232,7 @@ impl PolkadotProtocol { consensus_gossip: ConsensusGossip::new(), collators: CollatorPool::new(), validators: HashMap::new(), + local_collations: LocalCollations::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -336,26 +339,7 @@ impl PolkadotProtocol { match msg { Message::Statement(parent_hash, _statement) => self.consensus_gossip.on_chain_specific(ctx, peer_id, raw, parent_hash), - Message::SessionKey(key) => { - { - let info = match self.peers.get_mut(&peer_id) { - Some(peer) => peer, - None => return, - }; - - if !info.claimed_validator { - ctx.disable_peer(peer_id, "Session key broadcasted without setting authority role"); - return; - } - - if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { - self.validators.remove(&old_key); - } - self.validators.insert(key, peer_id); - - } - self.dispatch_pending_requests(ctx); - } + Message::SessionKey(key) => self.on_session_key(ctx, peer_id, key), Message::RequestBlockData(req_id, hash) => { let block_data = self.live_consensus.as_ref() .and_then(|c| c.block_data(&hash)); @@ -364,10 +348,40 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(_) => {}, + Message::CollatorRole(role) => self.on_new_role(ctx, peer_id, role), } } + fn on_session_key(&mut self, ctx: &mut Context, peer_id: PeerId, key: SessionKey) { + { + let info = match self.peers.get_mut(&peer_id) { + Some(peer) => peer, + None => return, + }; + + if !info.claimed_validator { + ctx.disable_peer(peer_id, "Session key broadcasted without setting authority role"); + return; + } + + if let Some(old_key) = ::std::mem::replace(&mut info.validator_key, Some(key)) { + self.validators.remove(&old_key); + + for (relay_parent, collation) in self.local_collations.fresh_key(&old_key, &key) { + send_polkadot_message( + ctx, + peer_id, + Message::Collation(relay_parent, collation), + ) + } + + } + self.validators.insert(key, peer_id); + } + + self.dispatch_pending_requests(ctx); + } + fn on_block_data(&mut self, ctx: &mut Context, peer_id: PeerId, req_id: RequestId, data: Option) { match self.in_flight.remove(&(req_id, peer_id)) { Some(req) => { @@ -384,6 +398,28 @@ impl PolkadotProtocol { None => ctx.disable_peer(peer_id, "Unexpected block data response"), } } + + // when a validator sends us (a collator) a new role. + fn on_new_role(&mut self, ctx: &mut Context, peer_id: PeerId, role: Role) { + let info = match self.peers.get(&peer_id) { + Some(peer) => peer, + None => return, + }; + + match info.validator_key { + None => { + ctx.disable_peer(peer_id, "Sent collator role without registering first as validator"); + return; + } + Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { + send_polkadot_message( + ctx, + peer_id, + Message::Collation(relay_parent, collation), + ) + }, + } + } } impl Specialization for PolkadotProtocol { @@ -451,6 +487,7 @@ impl Specialization for PolkadotProtocol { if let Some(validator_key) = info.validator_key { self.validators.remove(&validator_key); + self.local_collations.on_disconnect(&validator_key); } { @@ -502,6 +539,7 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); self.collators.collect_garbage(None); + self.local_collations.collect_garbage(None); self.dispatch_pending_requests(ctx); for collator_action in self.collators.maintain_peers() { @@ -518,8 +556,9 @@ impl Specialization for PolkadotProtocol { } } - fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, _header: &Header) { + fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, header: &Header) { self.collators.collect_garbage(Some(&hash)); + self.local_collations.collect_garbage(Some(&header.parent_hash)); } } diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs index 1a1b1ef9e0c72..4240b31a7d753 100644 --- a/polkadot/network/src/local_collations.rs +++ b/polkadot/network/src/local_collations.rs @@ -42,7 +42,7 @@ pub struct LocalCollations { impl LocalCollations { /// Create a new `LocalCollations` tracker. - fn new() -> Self { + pub fn new() -> Self { LocalCollations { primary_for: HashSet::new(), local_collations: HashMap::new(), @@ -51,7 +51,7 @@ impl LocalCollations { /// Validator gave us a new role. If the new role is "primary", this function might return /// a set of collations to send to that validator. - pub fn note_validator_role(&mut self, key: SessionKey, role: Role) -> Vec { + pub fn note_validator_role(&mut self, key: SessionKey, role: Role) -> Vec<(Hash, C)> { match role { Role::Backup => { self.primary_for.remove(&key); @@ -70,7 +70,7 @@ impl LocalCollations { /// Fresh session key from a validator. Returns a vector of collations to send /// to the validator. - pub fn fresh_key(&mut self, old_key: &SessionKey, new_key: &SessionKey) -> Vec { + pub fn fresh_key(&mut self, old_key: &SessionKey, new_key: &SessionKey) -> Vec<(Hash, C)> { if self.primary_for.remove(old_key) { self.primary_for.insert(*new_key); @@ -86,8 +86,10 @@ impl LocalCollations { } /// Mark collations relevant to the given parent hash as obsolete. - pub fn mark_obsolete(&mut self, relay_parent: &Hash) { - self.local_collations.remove(relay_parent); + pub fn collect_garbage(&mut self, relay_parent: Option<&Hash>) { + if let Some(relay_parent) = relay_parent { + self.local_collations.remove(relay_parent); + } let now = Instant::now(); self.local_collations.retain(|_, v| v.live_since + LIVE_FOR > now); @@ -107,10 +109,10 @@ impl LocalCollations { local.targets.intersection(&self.primary_for).cloned() } - fn collations_targeting(&self, key: &SessionKey) -> Vec { - self.local_collations.values() - .filter(|v| v.targets.contains(key)) - .map(|v| v.collation.clone()) + fn collations_targeting(&self, key: &SessionKey) -> Vec<(Hash, C)> { + self.local_collations.iter() + .filter(|&(_, ref v)| v.targets.contains(key)) + .map(|(h, v)| (*h, v.collation.clone())) .collect() } } @@ -131,7 +133,7 @@ mod tests { let mut tracker = LocalCollations::new(); assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); - assert_eq!(tracker.note_validator_role(key, Role::Primary), vec![5]); + assert_eq!(tracker.note_validator_role(key, Role::Primary), vec![(relay_parent, 5)]); } #[test] @@ -145,9 +147,27 @@ mod tests { set }; - let mut tracker = LocalCollations::new(); + let mut tracker: LocalCollations = LocalCollations::new(); assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); - assert_eq!(tracker.note_validator_role(orig_key, Role::Primary), vec![]); - assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![5]); + assert_eq!(tracker.note_validator_role(orig_key, Role::Primary), Vec::<(Hash, u8)>::new()); + assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![(relay_parent, 5u8)]); + } + + #[test] + fn collecting_garbage() { + let relay_parent_a = [255; 32].into(); + let relay_parent_b = [222; 32].into(); + + let mut tracker: LocalCollations = LocalCollations::new(); + assert!(tracker.add_collation(relay_parent_a, HashSet::new(), 5).next().is_none()); + assert!(tracker.add_collation(relay_parent_b, HashSet::new(), 69).next().is_none()); + + let live_since = Instant::now() - LIVE_FOR - Duration::from_secs(10); + tracker.local_collations.get_mut(&relay_parent_b).unwrap().live_since = live_since; + + tracker.collect_garbage(Some(&relay_parent_a)); + + // first one pruned because of relay parent, other because of time. + assert!(tracker.local_collations.is_empty()); } } From c6daf249812a78368605d4e16901eca63ff01dfc Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 17:19:28 +0200 Subject: [PATCH 14/20] introduce API for adding local collations --- polkadot/network/src/lib.rs | 23 ++++++++++++++++++++ polkadot/network/src/local_collations.rs | 27 ++++++++++++++++++++---- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 373150a986909..c9201062adb3e 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -611,3 +611,26 @@ impl PolkadotProtocol { } } } + +impl PolkadotProtocol { + /// Add a local collation and broadcast it to the necessary peers. + pub fn add_local_collation( + &mut self, + ctx: &mut Context, + relay_parent: Hash, + targets: HashSet, + collation: Collation, + ) { + for (primary, cloned_collation) in self.local_collations.add_collation(relay_parent, targets, collation.clone()) { + match self.validators.get(&primary) { + Some(peer_id) => send_polkadot_message( + ctx, + *peer_id, + Message::Collation(relay_parent, cloned_collation), + ), + None => + warn!(target: "polkadot_network", "Encountered tracked but disconnected validator {:?}", primary), + } + } + } +} diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs index 4240b31a7d753..68851ae50f438 100644 --- a/polkadot/network/src/local_collations.rs +++ b/polkadot/network/src/local_collations.rs @@ -95,8 +95,8 @@ impl LocalCollations { self.local_collations.retain(|_, v| v.live_since + LIVE_FOR > now); } - /// Add a collation. Returns a vector of validators to send the collation to. - pub fn add_collation<'a>(&'a mut self, relay_parent: Hash, targets: HashSet, collation: C) -> impl Iterator + 'a { + /// Add a collation. Returns an iterator of session keys to send to and lazy copies of the collation. + pub fn add_collation<'a>(&'a mut self, relay_parent: Hash, targets: HashSet, collation: C) -> impl Iterator + 'a { self.local_collations.insert(relay_parent, LocalCollation { targets, collation, @@ -106,7 +106,10 @@ impl LocalCollations { let local = self.local_collations.get(&relay_parent) .expect("just inserted to this key; qed"); - local.targets.intersection(&self.primary_for).cloned() + let borrowed_collation = &local.collation; + local.targets + .intersection(&self.primary_for) + .map(move |k| (*k, borrowed_collation.clone())) } fn collations_targeting(&self, key: &SessionKey) -> Vec<(Hash, C)> { @@ -149,7 +152,7 @@ mod tests { let mut tracker: LocalCollations = LocalCollations::new(); assert!(tracker.add_collation(relay_parent, targets, 5).next().is_none()); - assert_eq!(tracker.note_validator_role(orig_key, Role::Primary), Vec::<(Hash, u8)>::new()); + assert!(tracker.note_validator_role(orig_key, Role::Primary).is_empty()); assert_eq!(tracker.fresh_key(&orig_key, &new_key), vec![(relay_parent, 5u8)]); } @@ -170,4 +173,20 @@ mod tests { // first one pruned because of relay parent, other because of time. assert!(tracker.local_collations.is_empty()); } + + #[test] + fn add_collation_with_connected_target() { + let key = [1; 32].into(); + let relay_parent = [2; 32].into(); + let targets = { + let mut set = HashSet::new(); + set.insert(key); + set + }; + + let mut tracker = LocalCollations::new(); + assert!(tracker.note_validator_role(key, Role::Primary).is_empty()); + assert_eq!(tracker.add_collation(relay_parent, targets, 5).next(), Some((key, 5))); + + } } From 96a7093c354f68c1d75c6ca5d03f16f87866096c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 18:49:59 +0200 Subject: [PATCH 15/20] mostly finish collator implementation pending service fix --- polkadot/collator/Cargo.toml | 1 + polkadot/collator/src/lib.rs | 89 +++++++++++++++++++++++------------- 2 files changed, 59 insertions(+), 31 deletions(-) diff --git a/polkadot/collator/Cargo.toml b/polkadot/collator/Cargo.toml index c91b6a5d24166..122261e3f65b7 100644 --- a/polkadot/collator/Cargo.toml +++ b/polkadot/collator/Cargo.toml @@ -15,3 +15,4 @@ polkadot-primitives = { path = "../primitives", version = "0.1" } polkadot-cli = { path = "../cli" } log = "0.4" ed25519 = { path = "../../substrate/ed25519" } +tokio = "0.1.7" diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 7da24c51b691f..5624243927184 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -49,6 +49,7 @@ extern crate substrate_client as client; extern crate substrate_codec as codec; extern crate substrate_primitives as primitives; extern crate ed25519; +extern crate tokio; extern crate polkadot_api; extern crate polkadot_cli; @@ -58,16 +59,20 @@ extern crate polkadot_primitives; #[macro_use] extern crate log; -use std::collections::{BTreeSet, BTreeMap}; +use std::collections::{BTreeSet, BTreeMap, HashSet}; use std::sync::Arc; +use std::time::{Duration, Instant}; use futures::{future, stream, Stream, Future, IntoFuture}; use client::BlockchainEvents; use polkadot_api::PolkadotApi; -use polkadot_primitives::BlockId; -use polkadot_primitives::parachain::{self, BlockData, HeadData, ConsolidatedIngress, Collation, Message, Id as ParaId}; +use polkadot_primitives::{BlockId, SessionKey}; +use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId}; use polkadot_cli::{ServiceComponents, Service}; use polkadot_cli::Worker; +use tokio::timer::Deadline; + +const COLLATION_TIMEOUT: Duration = Duration::from_secs(30); /// Parachain context needed for collation. /// @@ -183,7 +188,7 @@ pub fn collate<'a, R, P>( struct ApiContext; impl RelayChainContext for ApiContext { - type Error = (); + type Error = ::polkadot_api::Error; type FutureEgress = Result>, Self::Error>; fn routing_parachains(&self) -> BTreeSet { @@ -217,35 +222,48 @@ impl Worker for CollationNode where let CollationNode { parachain_context, exit, para_id, key } = self; let client = service.client(); let api = service.api(); + let network = service.network(); let work = client.import_notification_stream() - .and_then(move |notification| { - let id = BlockId::hash(notification.hash); - - match api.parachain_head(&id, para_id) { - Ok(Some(last_head)) => { - let collation_work = collate( - para_id, - HeadData(last_head), - ApiContext, - parachain_context.clone(), - key.clone(), - ).map(Some); - - future::Either::A(collation_work) - } - Ok(None) => { - info!("Parachain {:?} appears to be inactive. Cannot collate.", id); - future::Either::B(future::ok(None)) - } - Err(e) => { - warn!("Could not collate for parachain {:?}: {:?}", id, e); - future::Either::B(future::ok(None)) // returning error would shut down the collation node - } - } - }) - .for_each(|_collation: Option| { - // TODO: import into network. + .for_each(move |notification| { + let relay_parent = notification.hash; + let id = BlockId::hash(relay_parent); + + let network = network.clone(); + let api = api.clone(); + let key = key.clone(); + let parachain_context = parachain_context.clone(); + + let work = future::lazy(move || { + let last_head = match api.parachain_head(&id, para_id)? { + Some(last_head) => last_head, + None => return Ok(()), + }; + + let targets = compute_targets( + para_id, + api.session_keys(&id)?.as_slice(), + api.duty_roster(&id)?, + ); + + collate( + para_id, + HeadData(last_head), + ApiContext, + parachain_context, + key, + ).map(|collation| { + network.with_spec(|spec, ctx| spec.add_local_collation( + ctx, + relay_parent, + targets, + collation, + )) + }) + }); + let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT); + + tokio::spawn(deadlined.map_err(|e| warn!("Collation failure: {}", e))); Ok(()) }); @@ -254,6 +272,15 @@ impl Worker for CollationNode where } } +fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRoster) -> HashSet { + use polkadot_primitives::parachain::Chain; + + roster.validator_duty.iter().enumerate() + .filter(|&(_, ref c)| c == &Chain::Parachain(para_id)) + .filter_map(|(i, _)| session_keys.get(i)) + .collect() +} + /// Run a collator node with the given `RelayChainContext` and `ParachainContext` and /// arguments to the underlying polkadot node. /// From 1eaa1bc4be55f2fff8a81a558d1770ba5ef00c6e Mon Sep 17 00:00:00 2001 From: arkpar Date: Tue, 17 Jul 2018 18:52:47 +0200 Subject: [PATCH 16/20] Specialized network() --- polkadot/service/src/lib.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index ed650ae680b17..45be076dc1c19 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -60,6 +60,7 @@ pub use client::ExecutionStrategy; pub type ChainSpec = service::ChainSpec; /// Polkadot client type for specialised `Components`. pub type ComponentClient = Client<::Backend, ::Executor, Block>; +pub type NetworkService = network::Service::NetworkProtocol>; /// A collection of type to generalise Polkadot specific components over full / light client. pub trait Components: service::Components { @@ -134,6 +135,7 @@ impl service::ServiceFactory for Factory { pub struct Service { inner: service::Service, client: Arc>, + network: Arc, api: Arc<::Api>, _consensus: Option, } @@ -143,6 +145,10 @@ impl Service { self.client.clone() } + pub fn network(&self) -> Arc { + self.network.clone() + } + pub fn api(&self) -> Arc<::Api> { self.api.clone() } @@ -156,6 +162,7 @@ pub fn new_light(config: Configuration, executor: TaskExecutor) let api = Arc::new(RemotePolkadotApiWrapper(service.client())); Ok(Service { client: service.client(), + network: service.network(), api: api, inner: service, _consensus: None, @@ -192,6 +199,7 @@ pub fn new_full(config: Configuration, executor: TaskExecutor) Ok(Service { client: service.client(), + network: service.network(), api: service.client(), inner: service, _consensus: consensus, From 398b29c9726f0e1d77f884d1388005578fab2b05 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 17 Jul 2018 19:24:49 +0200 Subject: [PATCH 17/20] push collations to the network --- Cargo.lock | 1 + polkadot/cli/src/lib.rs | 6 ++--- polkadot/collator/src/lib.rs | 45 +++++++++++++++++++++++++----------- polkadot/network/src/lib.rs | 5 ++-- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ef890ba26f8b..c8af746f7a30e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1883,6 +1883,7 @@ dependencies = [ "substrate-client 0.1.0", "substrate-codec 0.1.0", "substrate-primitives 0.1.0", + "tokio 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index e4c6aa8a704a5..6752e01c180e0 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -134,7 +134,7 @@ fn base_path(matches: &clap::ArgMatches) -> PathBuf { pub trait Worker { /// A future that resolves when the work is done or the node should exit. /// This will be run on a tokio runtime. - type Work: Future; + type Work: Future + Send + 'static; /// An exit scheduled for the future. type Exit: Future + Send + 'static; @@ -221,7 +221,7 @@ pub fn run(args: I, worker: W) -> error::Result<()> where info!("Starting collator"); // TODO [rob]: collation node implementation // This isn't a thing. Different parachains will have their own collator executables and - // maybe link to libpolkadot to get a light-client. + // maybe link to libpolkadot to get a light-client. service::Roles::LIGHT } else if matches.is_present("light") { info!("Starting (light)"); @@ -494,7 +494,7 @@ fn run_until_exit( ) }; - let _ = worker.work(&service).wait(); + let _ = runtime.block_on(worker.work(&service)); exit_send.fire(); Ok(()) } diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index 5624243927184..e9bd153a11c25 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -208,10 +208,10 @@ struct CollationNode { } impl Worker for CollationNode where - P: ParachainContext + 'static, + P: ParachainContext + Send + 'static, E: Future + Send + 'static { - type Work = Box>; + type Work = Box + Send>; type Exit = E; fn exit_only(self) -> Self::Exit { @@ -226,6 +226,15 @@ impl Worker for CollationNode where let work = client.import_notification_stream() .for_each(move |notification| { + macro_rules! try_fr { + ($e:expr) => { + match $e { + Ok(x) => x, + Err(e) => return future::Either::A(future::err(e)), + } + } + } + let relay_parent = notification.hash; let id = BlockId::hash(relay_parent); @@ -235,35 +244,44 @@ impl Worker for CollationNode where let parachain_context = parachain_context.clone(); let work = future::lazy(move || { - let last_head = match api.parachain_head(&id, para_id)? { + let last_head = match try_fr!(api.parachain_head(&id, para_id)) { Some(last_head) => last_head, - None => return Ok(()), + None => return future::Either::A(future::ok(())), }; let targets = compute_targets( para_id, - api.session_keys(&id)?.as_slice(), - api.duty_roster(&id)?, + try_fr!(api.session_keys(&id)).as_slice(), + try_fr!(api.duty_roster(&id)), ); - collate( + let collation_work = collate( para_id, HeadData(last_head), ApiContext, parachain_context, key, - ).map(|collation| { + ).map(move |collation| { network.with_spec(|spec, ctx| spec.add_local_collation( ctx, relay_parent, targets, collation, - )) - }) + )); + }); + + future::Either::B(collation_work) }); let deadlined = Deadline::new(work, Instant::now() + COLLATION_TIMEOUT); + let silenced = deadlined.then(|res| match res { + Ok(()) => Ok(()), + Err(e) => { + warn!("Collation failure: {}", e); + Ok(()) + } + }); - tokio::spawn(deadlined.map_err(|e| warn!("Collation failure: {}", e))); + tokio::spawn(silenced); Ok(()) }); @@ -276,8 +294,9 @@ fn compute_targets(para_id: ParaId, session_keys: &[SessionKey], roster: DutyRos use polkadot_primitives::parachain::Chain; roster.validator_duty.iter().enumerate() - .filter(|&(_, ref c)| c == &Chain::Parachain(para_id)) + .filter(|&(_, c)| c == &Chain::Parachain(para_id)) .filter_map(|(i, _)| session_keys.get(i)) + .cloned() .collect() } @@ -293,7 +312,7 @@ pub fn run_collator( key: Arc, args: Vec<::std::ffi::OsString> ) -> polkadot_cli::error::Result<()> where - P: ParachainContext + 'static, + P: ParachainContext + Send + 'static, E: IntoFuture, E::Future: Send + 'static, { diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index fe208dc2ea80d..00477dbf2a2fb 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -202,9 +202,8 @@ impl Encode for Message { dest.push(h); dest.push(s); } - Message::SessionKey(ref h, ref k) => { + Message::SessionKey(ref k) => { dest.push_byte(1); - dest.push(h); dest.push(k); } Message::RequestBlockData(ref id, ref d) => { @@ -234,7 +233,7 @@ impl Decode for Message { fn decode(input: &mut I) -> Option { match input.read_byte()? { 0 => Some(Message::Statement(Decode::decode(input)?, Decode::decode(input)?)), - 1 => Some(Message::SessionKey(Decode::decode(input)?, Decode::decode(input)?)), + 1 => Some(Message::SessionKey(Decode::decode(input)?)), 2 => Some(Message::RequestBlockData(Decode::decode(input)?, Decode::decode(input)?)), 3 => Some(Message::BlockData(Decode::decode(input)?, Decode::decode(input)?)), 4 => Some(Message::CollatorRole(Decode::decode(input)?)), From 7645d50d7509ebaed9e853b3dd542d5e02c969cb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 18 Jul 2018 13:06:26 +0200 Subject: [PATCH 18/20] grumbles --- polkadot/network/src/lib.rs | 18 ++++++++++++------ polkadot/network/src/local_collations.rs | 9 ++++++++- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 00477dbf2a2fb..832407f4b8367 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -397,7 +397,10 @@ impl PolkadotProtocol { { let info = match self.peers.get_mut(&peer_id) { Some(peer) => peer, - None => return, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + return + } }; if !info.claimed_validator { @@ -444,14 +447,17 @@ impl PolkadotProtocol { fn on_new_role(&mut self, ctx: &mut Context, peer_id: PeerId, role: Role) { let info = match self.peers.get(&peer_id) { Some(peer) => peer, - None => return, + None => { + trace!(target: "p_net", "Network inconsistency: message received from unconnected peer {}", peer_id); + return + } }; match info.validator_key { - None => { - ctx.disable_peer(peer_id, "Sent collator role without registering first as validator"); - return; - } + None => ctx.disable_peer( + peer_id, + "Sent collator role without registering first as validator", + ), Some(key) => for (relay_parent, collation) in self.local_collations.note_validator_role(key, role) { send_polkadot_message( ctx, diff --git a/polkadot/network/src/local_collations.rs b/polkadot/network/src/local_collations.rs index 68851ae50f438..2902ed5f0e710 100644 --- a/polkadot/network/src/local_collations.rs +++ b/polkadot/network/src/local_collations.rs @@ -96,7 +96,14 @@ impl LocalCollations { } /// Add a collation. Returns an iterator of session keys to send to and lazy copies of the collation. - pub fn add_collation<'a>(&'a mut self, relay_parent: Hash, targets: HashSet, collation: C) -> impl Iterator + 'a { + pub fn add_collation<'a>( + &'a mut self, + relay_parent: Hash, + targets: HashSet, + collation: C + ) + -> impl Iterator + 'a + { self.local_collations.insert(relay_parent, LocalCollation { targets, collation, From 7a1d194c5e6a062dfaee25aaa051ec5280cdc843 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 18 Jul 2018 13:34:41 +0200 Subject: [PATCH 19/20] substrate-service has custom configuration --- polkadot/cli/src/cli.yml | 9 +++++---- polkadot/cli/src/lib.rs | 10 +++------- substrate/service/src/components.rs | 16 +++++++++++++--- substrate/service/src/config.rs | 9 ++++++--- substrate/service/src/lib.rs | 12 +++++++----- 5 files changed, 34 insertions(+), 22 deletions(-) diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 1d4e876d80ad3..1f818cab2d11e 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -29,10 +29,11 @@ args: value_name: KEY help: Specify node secret key (64-character hex string) takes_value: true - - collator: - long: collator - help: Enable collator mode - takes_value: false + - collator-for: + long: collator-for + value_name: CHAIN + help: Enable collator mode on given chain ID (8-character hex string) + takes_value: true - validator: long: validator help: Enable validator mode diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 6752e01c180e0..4fa0e78d86d56 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -217,13 +217,7 @@ pub fn run(args: I, worker: W) -> error::Result<()> where }; let role = - if matches.is_present("collator") { - info!("Starting collator"); - // TODO [rob]: collation node implementation - // This isn't a thing. Different parachains will have their own collator executables and - // maybe link to libpolkadot to get a light-client. - service::Roles::LIGHT - } else if matches.is_present("light") { + if matches.is_present("light") { info!("Starting (light)"); config.execution_strategy = service::ExecutionStrategy::NativeWhenPossible; service::Roles::LIGHT @@ -265,6 +259,8 @@ pub fn run(args: I, worker: W) -> error::Result<()> where Some(port) => port.parse().expect("Invalid p2p port value specified."), None => 30333, }; + + let config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); config.network.public_address = None; config.network.client_version = format!("parity-polkadot/{}", crate_version!()); diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index dac2bd1da7e17..f6d6dd35642cc 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -80,6 +80,9 @@ pub type FactoryGenesis = ::Genesis; /// `Block` type for a factory. pub type FactoryBlock = ::Block; +/// Full `Configuration` type for a factory. +pub type FactoryFullConfiguration = Configuration<::Configuration, FactoryGenesis>; + /// Client type for `Components`. pub type ComponentClient = Client< ::Backend, @@ -111,6 +114,9 @@ pub trait ServiceFactory { type LightExtrinsicPool: ExtrinsicPool; /// Genesis configuration for the runtime. type Genesis: RuntimeGenesis; + /// Other configuration for service members. + type Configuration: Default; + /// Network protocol id. const NETWORK_PROTOCOL_ID: network::ProtocolId; @@ -121,6 +127,10 @@ pub trait ServiceFactory { /// Extrinsic pool constructor for the light client. fn build_light_extrinsic_pool(config: ExtrinsicPoolOptions, client: Arc>) -> Result; + + /// Build network protocol. + fn build_network_protocol(config: &FactoryFullConfiguration) + -> Result; } // TODO: move this to substrate-extrinsic-pool @@ -147,7 +157,7 @@ pub trait Components { /// Create client. fn build_client( - config: &Configuration>, + config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result<( @@ -172,7 +182,7 @@ impl Components for FullComponents { type ExtrinsicPool = ::FullExtrinsicPool; fn build_client( - config: &Configuration>, + config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result<( @@ -207,7 +217,7 @@ impl Components for LightComponents { type ExtrinsicPool = ::LightExtrinsicPool; fn build_client( - config: &Configuration>, + config: &FactoryFullConfiguration, executor: CodeExecutor, ) -> Result<( diff --git a/substrate/service/src/config.rs b/substrate/service/src/config.rs index c2600383aaef4..8a5f32410006b 100644 --- a/substrate/service/src/config.rs +++ b/substrate/service/src/config.rs @@ -26,7 +26,7 @@ use runtime_primitives::BuildStorage; use serde::{Serialize, de::DeserializeOwned}; /// Service configuration. -pub struct Configuration { +pub struct Configuration { /// Node roles. pub roles: Roles, /// Extrinsic pool configuration. @@ -43,6 +43,8 @@ pub struct Configuration { pub keys: Vec, /// Chain configuration. pub chain_spec: ChainSpec, + /// Custom configuration. + pub custom: C, /// Telemetry server URL, optional - only `Some` if telemetry reporting is enabled pub telemetry: Option, /// Node name. @@ -55,9 +57,9 @@ pub struct Configuration { pub max_heap_pages: usize, } -impl Configuration { +impl Configuration { /// Create default config for given chain spec. - pub fn default_with_spec(chain_spec: ChainSpec) -> Configuration { + pub fn default_with_spec(chain_spec: ChainSpec) -> Self { let mut configuration = Configuration { chain_spec, name: Default::default(), @@ -67,6 +69,7 @@ impl Configuration { keystore_path: Default::default(), database_path: Default::default(), keys: Default::default(), + custom: Default::default(), telemetry: Default::default(), pruning: PruningMode::ArchiveAll, execution_strategy: ExecutionStrategy::Both, diff --git a/substrate/service/src/lib.rs b/substrate/service/src/lib.rs index bbd8bd177830c..0ba8c3c7040ea 100644 --- a/substrate/service/src/lib.rs +++ b/substrate/service/src/lib.rs @@ -68,7 +68,7 @@ pub use extrinsic_pool::api::{ExtrinsicPool as ExtrinsicPoolApi}; pub use components::{ServiceFactory, FullBackend, FullExecutor, LightBackend, LightExecutor, ExtrinsicPool, Components, PoolApi, ComponentClient, ComponentBlock, FullClient, LightClient, FullComponents, LightComponents, - CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock, RuntimeGenesis, + CodeExecutor, NetworkService, FactoryChainSpec, FactoryBlock, FactoryFullConfiguration, RuntimeGenesis, }; /// Substrate service. @@ -81,7 +81,7 @@ pub struct Service { } /// Creates bare client without any networking. -pub fn new_client(config: Configuration) +pub fn new_client(config: FactoryFullConfiguration) -> Result>>, error::Error> { let executor = NativeExecutor::with_heap_pages(config.min_heap_pages, config.max_heap_pages); @@ -98,8 +98,8 @@ impl Service { /// Creates a new service. pub fn new( - config: Configuration<::Genesis>, - task_executor: TaskExecutor + config: FactoryFullConfiguration, + task_executor: TaskExecutor, ) -> Result { @@ -124,10 +124,12 @@ impl Service info!("Best block: #{}", best_header.number()); telemetry!("node.start"; "height" => best_header.number().as_(), "best" => ?best_header.hash()); + let network_protocol = ::build_network_protocol(&config)?; let extrinsic_pool = Arc::new( Components::build_extrinsic_pool(config.extrinsic_pool, client.clone())? ); let extrinsic_pool_adapter = extrinsic_pool.clone(); + let network_params = network::Params { config: network::ProtocolConfig { roles: config.roles, @@ -137,7 +139,7 @@ impl Service on_demand: on_demand.clone() .map(|d| d as Arc>>), transaction_pool: extrinsic_pool_adapter, - specialization: ::NetworkProtocol::default(), + specialization: network_protocol, }; let network = network::Service::new(network_params, Components::Factory::NETWORK_PROTOCOL_ID)?; From 2382cbb9ac5f4255607e47a547a0a581219870e2 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 18 Jul 2018 14:05:50 +0200 Subject: [PATCH 20/20] initialize network in collator mode as necessary --- polkadot/cli/src/cli.yml | 5 ----- polkadot/cli/src/lib.rs | 12 ++++++++--- polkadot/collator/src/lib.rs | 21 +++++++++++++++---- polkadot/network/src/lib.rs | 12 ++++------- polkadot/network/src/tests.rs | 6 +++--- polkadot/service/src/lib.rs | 32 ++++++++++++++++++++++++----- substrate/service/src/components.rs | 2 +- 7 files changed, 61 insertions(+), 29 deletions(-) diff --git a/polkadot/cli/src/cli.yml b/polkadot/cli/src/cli.yml index 1f818cab2d11e..be0d0e501bf09 100644 --- a/polkadot/cli/src/cli.yml +++ b/polkadot/cli/src/cli.yml @@ -29,11 +29,6 @@ args: value_name: KEY help: Specify node secret key (64-character hex string) takes_value: true - - collator-for: - long: collator-for - value_name: CHAIN - help: Enable collator mode on given chain ID (8-character hex string) - takes_value: true - validator: long: validator help: Enable validator mode diff --git a/polkadot/cli/src/lib.rs b/polkadot/cli/src/lib.rs index 4fa0e78d86d56..827b7518eb999 100644 --- a/polkadot/cli/src/lib.rs +++ b/polkadot/cli/src/lib.rs @@ -71,7 +71,7 @@ pub use client::error::Error as ClientError; pub use client::backend::Backend as ClientBackend; pub use state_machine::Backend as StateMachineBackend; pub use polkadot_primitives::Block as PolkadotBlock; -pub use service::{Components as ServiceComponents, Service}; +pub use service::{Components as ServiceComponents, Service, CustomConfiguration}; use std::io::{self, Write, Read, stdin, stdout}; use std::fs::File; @@ -139,6 +139,11 @@ pub trait Worker { /// An exit scheduled for the future. type Exit: Future + Send + 'static; + /// Return configuration for the polkadot node. + // TODO: make this the full configuration, so embedded nodes don't need + // string CLI args + fn configuration(&self) -> CustomConfiguration { Default::default() } + /// Don't work, but schedule an exit. fn exit_only(self) -> Self::Exit; @@ -256,11 +261,10 @@ pub fn run(args: I, worker: W) -> error::Result<()> where config.network.net_config_path = config.network.config_path.clone(); let port = match matches.value_of("port") { - Some(port) => port.parse().expect("Invalid p2p port value specified."), + Some(port) => port.parse().map_err(|_| "Invalid p2p port value specified.")?, None => 30333, }; - let config.network.listen_address = Some(SocketAddr::new("0.0.0.0".parse().unwrap(), port)); config.network.public_address = None; config.network.client_version = format!("parity-polkadot/{}", crate_version!()); @@ -271,6 +275,8 @@ pub fn run(args: I, worker: W) -> error::Result<()> where }; } + config.custom = worker.configuration(); + config.keys = matches.values_of("key").unwrap_or_default().map(str::to_owned).collect(); if matches.is_present("dev") { config.keys.push("Alice".into()); diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index e9bd153a11c25..7e4b36cc48e3f 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -66,9 +66,9 @@ use std::time::{Duration, Instant}; use futures::{future, stream, Stream, Future, IntoFuture}; use client::BlockchainEvents; use polkadot_api::PolkadotApi; -use polkadot_primitives::{BlockId, SessionKey}; +use polkadot_primitives::{AccountId, BlockId, SessionKey}; use polkadot_primitives::parachain::{self, BlockData, DutyRoster, HeadData, ConsolidatedIngress, Message, Id as ParaId}; -use polkadot_cli::{ServiceComponents, Service}; +use polkadot_cli::{ServiceComponents, Service, CustomConfiguration}; use polkadot_cli::Worker; use tokio::timer::Deadline; @@ -104,6 +104,11 @@ pub trait RelayChainContext { fn unrouted_egress(&self, id: ParaId) -> Self::FutureEgress; } +fn key_to_account_id(key: &ed25519::Pair) -> AccountId { + let pubkey_bytes: [u8; 32] = key.public().into(); + pubkey_bytes.into() +} + /// Collate the necessary ingress queue using the given context. pub fn collate_ingress<'a, R>(relay_context: R) -> impl Future + 'a @@ -164,11 +169,10 @@ pub fn collate<'a, R, P>( let block_data_hash = block_data.hash(); let signature = key.sign(&block_data_hash.0[..]).into(); - let pubkey_bytes: [u8; 32] = key.public().into(); let receipt = parachain::CandidateReceipt { parachain_index: local_id, - collator: pubkey_bytes.into(), + collator: key_to_account_id(&*key), signature, head_data, balance_uploads: Vec::new(), @@ -214,6 +218,15 @@ impl Worker for CollationNode where type Work = Box + Send>; type Exit = E; + fn configuration(&self) -> CustomConfiguration { + let mut config = CustomConfiguration::default(); + config.collating_for = Some(( + key_to_account_id(&*self.key), + self.para_id.clone(), + )); + config + } + fn exit_only(self) -> Self::Exit { self.exit } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 832407f4b8367..abbe7f99059ea 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -251,6 +251,7 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) /// Polkadot protocol attachment for substrate. pub struct PolkadotProtocol { peers: HashMap, + collating_for: Option<(AccountId, ParaId)>, consensus_gossip: ConsensusGossip, collators: CollatorPool, validators: HashMap, @@ -261,19 +262,14 @@ pub struct PolkadotProtocol { next_req_id: u64, } -impl Default for PolkadotProtocol { - fn default() -> Self { - Self::new() - } -} - impl PolkadotProtocol { /// Instantiate a polkadot protocol handler. - pub fn new() -> Self { + pub fn new(collating_for: Option<(AccountId, ParaId)>) -> Self { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), collators: CollatorPool::new(), + collating_for, validators: HashMap::new(), local_collations: LocalCollations::new(), live_consensus: None, @@ -471,7 +467,7 @@ impl PolkadotProtocol { impl Specialization for PolkadotProtocol { fn status(&self) -> Vec { - Status { collating_for: None }.encode() + Status { collating_for: self.collating_for.clone() }.encode() } fn on_connect(&mut self, ctx: &mut Context, peer_id: PeerId, status: FullStatus) { diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index ab95f7969d04b..021ea739b1b7f 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -99,7 +99,7 @@ fn on_message(protocol: &mut PolkadotProtocol, ctx: &mut TestContext, from: Peer #[test] fn sends_session_key() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_a = 1; let peer_b = 2; @@ -131,7 +131,7 @@ fn sends_session_key() { #[test] fn fetches_from_those_with_knowledge() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_a = 1; let peer_b = 2; @@ -208,7 +208,7 @@ fn fetches_from_those_with_knowledge() { #[test] fn remove_bad_collator() { - let mut protocol = PolkadotProtocol::new(); + let mut protocol = PolkadotProtocol::new(None); let peer_id = 1; let account_id = [2; 32].into(); diff --git a/polkadot/service/src/lib.rs b/polkadot/service/src/lib.rs index 45be076dc1c19..6d2c4aee157b8 100644 --- a/polkadot/service/src/lib.rs +++ b/polkadot/service/src/lib.rs @@ -46,13 +46,14 @@ use std::collections::HashMap; use codec::Encode; use transaction_pool::TransactionPool; use polkadot_api::{PolkadotApi, light::RemotePolkadotApiWrapper}; -use polkadot_primitives::{Block, BlockId, Hash}; +use polkadot_primitives::{parachain, AccountId, Block, BlockId, Hash}; use polkadot_runtime::GenesisConfig; use client::Client; use polkadot_network::{PolkadotProtocol, consensus::ConsensusNetwork}; use tokio::runtime::TaskExecutor; +use service::FactoryFullConfiguration; -pub use service::{Configuration, Roles, PruningMode, ExtrinsicPoolOptions, +pub use service::{Roles, PruningMode, ExtrinsicPoolOptions, ErrorKind, Error, ComponentBlock, LightComponents, FullComponents}; pub use client::ExecutionStrategy; @@ -87,6 +88,17 @@ impl Components for service::FullComponents { type Backend = service::FullBackend; } +/// All configuration for the polkadot node. +pub type Configuration = FactoryFullConfiguration; + +/// Polkadot-specific configuration. +#[derive(Default)] +pub struct CustomConfiguration { + /// Set to `Some` with a collator `AccountId` and desired parachain + /// if the network protocol should be started in collator mode. + pub collating_for: Option<(AccountId, parachain::Id)>, +} + /// Polkadot config for the substrate service. pub struct Factory; @@ -105,6 +117,7 @@ impl service::ServiceFactory for Factory { RemotePolkadotApiWrapper, service::LightExecutor> >; type Genesis = GenesisConfig; + type Configuration = CustomConfiguration; const NETWORK_PROTOCOL_ID: network::ProtocolId = ::polkadot_network::DOT_PROTOCOL_ID; @@ -129,6 +142,15 @@ impl service::ServiceFactory for Factory { imports_external_transactions: false, }) } + + fn build_network_protocol(config: &Configuration) + -> Result + { + if let Some((_, ref para_id)) = config.custom.collating_for { + info!("Starting network in Collator mode for parachain {:?}", para_id); + } + Ok(PolkadotProtocol::new(config.custom.collating_for)) + } } /// Polkadot service. @@ -155,7 +177,7 @@ impl Service { } /// Creates light client and register protocol with the network service -pub fn new_light(config: Configuration, executor: TaskExecutor) +pub fn new_light(config: Configuration, executor: TaskExecutor) -> Result>, Error> { let service = service::Service::>::new(config, executor)?; @@ -170,7 +192,7 @@ pub fn new_light(config: Configuration, executor: TaskExecutor) } /// Creates full client and register protocol with the network service -pub fn new_full(config: Configuration, executor: TaskExecutor) +pub fn new_full(config: Configuration, executor: TaskExecutor) -> Result>, Error> { let is_validator = (config.roles & Roles::AUTHORITY) == Roles::AUTHORITY; @@ -207,7 +229,7 @@ pub fn new_full(config: Configuration, executor: TaskExecutor) } /// Creates bare client without any networking. -pub fn new_client(config: Configuration) +pub fn new_client(config: Configuration) -> Result>>, Error> { service::new_client::(config) diff --git a/substrate/service/src/components.rs b/substrate/service/src/components.rs index f6d6dd35642cc..7c6086b3f9d11 100644 --- a/substrate/service/src/components.rs +++ b/substrate/service/src/components.rs @@ -105,7 +105,7 @@ pub trait ServiceFactory { /// Block type. type Block: BlockT; /// Network protocol extensions. - type NetworkProtocol: network::specialization::Specialization + Default; + type NetworkProtocol: network::specialization::Specialization; /// Chain runtime. type RuntimeDispatch: NativeExecutionDispatch + Send + Sync + 'static; /// Extrinsic pool type for the full client.