diff --git a/Cargo.toml b/Cargo.toml index 7737fd8f81b90..51e92e511d65a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ path = "polkadot/src/main.rs" [package] name = "polkadot" -version = "0.2.2" +version = "0.2.3" authors = ["Parity Technologies "] build = "build.rs" diff --git a/polkadot/consensus/src/lib.rs b/polkadot/consensus/src/lib.rs index 4aee785def5b2..4bef0fceafd93 100644 --- a/polkadot/consensus/src/lib.rs +++ b/polkadot/consensus/src/lib.rs @@ -258,8 +258,6 @@ impl bft::Environment for ProposerFactory ) -> Result<(Self::Proposer, Self::Input, Self::Output), Error> { use runtime_primitives::traits::{Hash as HashT, BlakeTwo256}; - const DELAY_UNTIL: Duration = Duration::from_millis(5000); - let parent_hash = parent_header.hash().into(); let id = BlockId::hash(parent_hash); @@ -290,9 +288,6 @@ impl bft::Environment for ProposerFactory self.parachain_empty_duration.clone(), ); - debug!(target: "bft", "Initialising consensus proposer. Refusing to evaluate for {:?} from now.", - DELAY_UNTIL); - let validation_para = match local_duty.validation { Chain::Relay => None, Chain::Parachain(id) => Some(id), @@ -315,7 +310,6 @@ impl bft::Environment for ProposerFactory client: self.client.clone(), dynamic_inclusion, local_key: sign_with, - minimum_delay: now + DELAY_UNTIL, parent_hash, parent_id: id, parent_number: parent_header.number, @@ -370,7 +364,6 @@ pub struct Proposer { client: Arc, dynamic_inclusion: DynamicInclusion, local_key: Arc, - minimum_delay: Instant, parent_hash: Hash, parent_id: BlockId, parent_number: BlockNumber, @@ -401,17 +394,10 @@ impl bft::Proposer for Proposer initial_included, ).unwrap_or_else(|| now + Duration::from_millis(1)); - let minimum_delay = if self.minimum_delay > now + ATTEMPT_PROPOSE_EVERY { - Some(Delay::new(self.minimum_delay)) - } else { - None - }; - let timing = ProposalTiming { attempt_propose: Interval::new(now + ATTEMPT_PROPOSE_EVERY, ATTEMPT_PROPOSE_EVERY), enough_candidates: Delay::new(enough_candidates), dynamic_inclusion: self.dynamic_inclusion.clone(), - minimum_delay, last_included: initial_included, }; @@ -484,11 +470,7 @@ impl bft::Proposer for Proposer // delay casting vote until able according to minimum block time, // timestamp delay, and count delay. // construct a future from the maximum of the two durations. - let max_delay = [timestamp_delay, count_delay, Some(self.minimum_delay)] - .iter() - .cloned() - .max() - .expect("iterator not empty; thus max returns `Some`; qed"); + let max_delay = ::std::cmp::max(timestamp_delay, count_delay); let temporary_delay = match max_delay { Some(duration) => future::Either::A( @@ -610,7 +592,6 @@ struct ProposalTiming { attempt_propose: Interval, dynamic_inclusion: DynamicInclusion, enough_candidates: Delay, - minimum_delay: Option, last_included: usize, } @@ -627,12 +608,6 @@ impl ProposalTiming { x.expect("timer still alive; intervals never end; qed"); } - if let Some(ref mut min) = self.minimum_delay { - try_ready!(min.poll().map_err(ErrorKind::Timer)); - } - - self.minimum_delay = None; // after this point, the future must have completed. - if included == self.last_included { return self.enough_candidates.poll().map_err(ErrorKind::Timer); } diff --git a/polkadot/consensus/src/service.rs b/polkadot/consensus/src/service.rs index c577a98e96979..ca7561bd4d818 100644 --- a/polkadot/consensus/src/service.rs +++ b/polkadot/consensus/src/service.rs @@ -38,7 +38,7 @@ use transaction_pool::TransactionPool; use tokio::executor::current_thread::TaskExecutor as LocalThreadHandle; use tokio::runtime::TaskExecutor as ThreadPoolHandle; use tokio::runtime::current_thread::Runtime as LocalRuntime; -use tokio::timer::Interval; +use tokio::timer::{Delay, Interval}; use super::{Network, Collators, ProposerFactory}; use error; @@ -49,8 +49,8 @@ const TIMER_INTERVAL_MS: u64 = 500; // spin up an instance of BFT agreement on the current thread's executor. // panics if there is no current thread executor. fn start_bft( - header: &Header, - bft_service: &BftService, + header: Header, + bft_service: Arc>, ) where F: bft::Environment + 'static, C: bft::BlockImport + bft::Authorities + 'static, @@ -58,14 +58,35 @@ fn start_bft( >::Error: ::std::fmt::Display + Into, >::Error: ::std::fmt::Display { + const DELAY_UNTIL: Duration = Duration::from_millis(5000); + let mut handle = LocalThreadHandle::current(); - match bft_service.build_upon(&header) { - Ok(Some(bft)) => if let Err(e) = handle.spawn_local(Box::new(bft)) { - debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); - }, - Ok(None) => {}, - Err(e) => warn!(target: "bft", "BFT agreement error: {}", e), - } + let work = Delay::new(Instant::now() + DELAY_UNTIL) + .then(move |res| { + if let Err(e) = res { + warn!(target: "bft", "Failed to force delay of consensus: {:?}", e); + } + + match bft_service.build_upon(&header) { + Ok(maybe_bft_work) => { + if maybe_bft_work.is_some() { + debug!(target: "bft", "Starting agreement. After forced delay for {:?}", + DELAY_UNTIL); + } + + maybe_bft_work + } + Err(e) => { + warn!(target: "bft", "BFT agreement error: {}", e); + None + } + } + }) + .map(|_| ()); + + if let Err(e) = handle.spawn_local(Box::new(work)) { + debug!(target: "bft", "Couldn't initialize BFT agreement: {:?}", e); + } } /// Consensus service. Starts working when created. @@ -113,7 +134,7 @@ impl Service { client.import_notification_stream().for_each(move |notification| { if notification.is_new_best { - start_bft(¬ification.header, &*bft_service); + start_bft(notification.header, bft_service.clone()); } Ok(()) }) @@ -139,9 +160,9 @@ impl Service { interval.map_err(|e| debug!("Timer error: {:?}", e)).for_each(move |_| { if let Ok(best_block) = c.best_block_header() { let hash = best_block.hash(); - if hash == prev_best { + if hash == prev_best && s.live_agreement() != Some(hash) { debug!("Starting consensus round after a timeout"); - start_bft(&best_block, &*s); + start_bft(best_block, s.clone()); } prev_best = hash; } diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs index 28c26fe21ae98..c85002dbccfd1 100644 --- a/substrate/bft/src/lib.rs +++ b/substrate/bft/src/lib.rs @@ -248,6 +248,7 @@ impl> rhododendron::Context for BftInstance fn begin_round_timeout(&self, round: usize) -> Self::RoundTimeout { use std::time::{Instant, Duration}; + let round = round / 3; let round = ::std::cmp::min(63, round) as u32; let timeout = 1u64.checked_shl(round) .unwrap_or_else(u64::max_value)