From cfa2d967535805ccb8661c880527653de3a1db1a Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Mon, 20 Aug 2018 20:04:22 +0200
Subject: [PATCH 1/5] more accurate consensus superseding logic

---
 Cargo.lock                     |  12 ++--
 substrate/bft/src/lib.rs       | 101 ++++++++++++++++++++-------------
 substrate/client/src/client.rs |   4 +-
 3 files changed, 71 insertions(+), 46 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6fafdb2b69d3c..8b04d38ccd992 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1918,7 +1918,7 @@ dependencies = [
  "lazy_static 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.41 (registry+https://github.com/rust-lang/crates.io-index)",
  "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
- "rand 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rand 0.3.22 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
@@ -1992,7 +1992,7 @@ dependencies = [
 
 [[package]]
 name = "rhododendron"
-version = "0.3.2"
+version = "0.3.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 dependencies = [
  "error-chain 0.12.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -2345,7 +2345,7 @@ dependencies = [
  "futures 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
- "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "substrate-codec 0.1.0",
  "substrate-executor 0.1.0",
  "substrate-keyring 0.1.0",
@@ -2529,7 +2529,7 @@
 name = "substrate-misbehavior-check"
 version = "0.1.0"
 dependencies = [
- "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "substrate-bft 0.1.0",
  "substrate-codec 0.1.0",
  "substrate-keyring 0.1.0",
@@ -3004,7 +3004,7 @@ name = "substrate-test-client"
 version = "0.1.0"
 dependencies = [
  "hashdb 0.2.1 (git+https://github.com/paritytech/parity-common)",
- "rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "substrate-bft 0.1.0",
  "substrate-client 0.1.0",
  "substrate-codec 0.1.0",
@@ -3899,7 +3899,7 @@ dependencies = [
 "checksum regex-syntax 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "747ba3b235651f6e2f67dfa8bcdcd073ddb7c243cb21c442fc12395dfcac212d"
 "checksum relay 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1576e382688d7e9deecea24417e350d3062d97e32e45d70b1cde65994ff1489a"
 "checksum remove_dir_all 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "3488ba1b9a2084d38645c4c08276a1752dcbf2c7130d74f1569681ad5d2799c5"
-"checksum rhododendron 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "289a6395497f70b8076bf5b9c223e1dc5c0a77619d0a75124f7d4c728d09d2d8"
+"checksum rhododendron 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e20523445e693f394c0e487113ae656071311c9ee4c1e914441bece8c929b21d"
 "checksum ring 0.12.1 (registry+https://github.com/rust-lang/crates.io-index)" = "6f7d28b30a72c01b458428e0ae988d4149c20d902346902be881e3edc4bb325c"
 "checksum rlp 0.2.1 (git+https://github.com/paritytech/parity-common)" = ""
 "checksum rlp 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "89db7f8dfdd5eb7ab3ac3ece7a07fd273a680b4b224cb231181280e8996f9f0b"
diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs
index 882ccdffee405..237e131361918 100644
--- a/substrate/bft/src/lib.rs
+++ b/substrate/bft/src/lib.rs
@@ -53,9 +53,8 @@ extern crate futures;
 #[macro_use]
 extern crate error_chain;
 
-use std::mem;
 use std::sync::Arc;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::time::{Instant, Duration};
 
 use codec::Encode;
@@ -73,6 +72,14 @@ use parking_lot::Mutex;
 pub use rhododendron::{InputStreamConcluded, AdvanceRoundReason};
 pub use error::{Error, ErrorKind};
 
+// statuses for an agreement
+mod status {
+	pub const LIVE: usize = 0;
+	pub const CANCELED: usize = 1;
+	pub const BAD: usize = 2;
+	pub const GOOD: usize = 3;
+}
+
 /// Messages over the proposal.
 /// Each message carries an associated round number.
 pub type Message = rhododendron::Message::Hash>;
@@ -193,7 +200,7 @@ pub trait Proposer {
 /// Block import trait.
 pub trait BlockImport {
 	/// Import a block alongside its corresponding justification.
-	fn import_block(&self, block: B, justification: Justification, authorities: &[AuthorityId]);
+	fn import_block(&self, block: B, justification: Justification, authorities: &[AuthorityId]) -> bool;
 }
 
 /// Trait for getting the authorities at a given block.
@@ -336,7 +343,7 @@ pub struct BftFuture where
 	OutSink: Sink, SinkError=P::Error>,
 {
 	inner: rhododendron::Agreement, InStream, OutSink>,
-	cancel: Arc,
+	status: Arc,
 	send_task: Option>,
 	import: Arc,
 }
@@ -360,7 +367,7 @@ impl Future for BftFuture Future for BftFuture Drop for BftFuture
 		// TODO: have a trait member to pass misbehavior reports into.
 		let misbehavior = self.inner.drain_misbehavior().collect::>();
 		self.inner.context().proposer.import_misbehavior(misbehavior);
-		self.cancel.store(true, Ordering::Release);
 	}
 }
 
 struct AgreementHandle {
-	cancel: Arc,
+	status: Arc,
 	task: Option>,
 }
 
 impl AgreementHandle {
-	fn is_live(&self) -> bool {
-		!self.cancel.load(Ordering::Acquire)
+	fn status(&self) -> usize {
+		self.status.load(Ordering::Acquire)
 	}
 }
 
@@ -421,7 +439,7 @@ impl Drop for AgreementHandle {
 
 		// if this fails, the task is definitely not live anyway.
 		if let Ok(task) = task.wait() {
-			self.cancel.store(true, Ordering::Release);
+			self.status.compare_and_swap(status::LIVE, status::CANCELED, Ordering::SeqCst);
 			task.notify();
 		}
 	}
@@ -486,7 +504,7 @@ impl BftService where
 	{
 		let hash = header.hash();
-		if self.last_agreement().map_or(false, |last| last.parent_hash == hash) {
+
+		let mut live_agreement = self.live_agreement.lock();
+		let can_build = live_agreement.as_ref()
+			.map_or(true, |x| self.can_build_on_inner(header, x));
+
+		if !can_build {
 			return Ok(None)
 		}
@@ -539,21 +562,18 @@ impl BftService
 			}
 		}
 
-		let cancel = Arc::new(AtomicBool::new(false));
+		let status = Arc::new(AtomicUsize::new(status::LIVE));
 		let (tx, rx) = oneshot::channel();
 
 		// cancel current agreement.
-		// defers drop of live to the end.
-		let _preempted_consensus = {
-			mem::replace(&mut *self.live_agreement.lock(), Some((hash, AgreementHandle {
-				task: Some(rx),
-				cancel: cancel.clone(),
-			})))
-		};
+		*live_agreement = Some((hash, AgreementHandle {
+			task: Some(rx),
+			status: status.clone(),
+		}));
 
 		Ok(Some(BftFuture {
 			inner: agreement,
-			cancel: cancel,
+			status: status,
 			send_task: Some(tx),
 			import: self.client.clone(),
 		}))
@@ -564,21 +584,25 @@ impl BftService
 		self.live_agreement.lock().take();
 	}
 
-	/// Get current agreement parent hash if any.
-	pub fn last_agreement(&self) -> Option> {
-		self.live_agreement.lock()
-			.as_ref()
-			.map(|&(ref h, ref handle)| LastAgreement { parent_hash: h.clone(), live: handle.is_live() })
+	/// Whether we can build using the given header.
+	pub fn can_build_on(&self, header: &B::Header) -> bool {
+		self.live_agreement.lock().as_ref()
+			.map_or(true, |x| self.can_build_on_inner(header, x))
 	}
-}
 
+	/// Get a reference to the underyling client.
+	pub fn client(&self) -> &I { &*self.client }
 
-/// Struct representing the last agreement the service has processed.
-pub struct LastAgreement {
-	/// The parent hash that agreement was building on.
-	pub parent_hash: H,
-	/// Whether that agreement was live.
-	pub live: bool,
+	fn can_build_on_inner(&self, header: &B::Header, live: &(B::Hash, AgreementHandle)) -> bool {
+		let hash = header.hash();
+		let &(ref live_hash, ref handle) = live;
+		match handle.status() {
+			status::BAD => true, // if the block was bad, can always go on.
+			status::LIVE => hash != *live_hash, // can supersede with better block.
+			status::GOOD => *header.parent_hash() == *live_hash, // can follow with next block.
+			_ => false, // canceled won't appear since we overwrite the handle before returning.
+		}
+	}
 }
 
 /// Given a total number of authorities, yield the maximum faulty that would be allowed.
@@ -758,8 +782,9 @@ mod tests {
 	}
 
 	impl BlockImport for FakeClient {
-		fn import_block(&self, block: TestBlock, _justification: Justification, _authorities: &[AuthorityId]) {
-			assert!(self.imported_heights.lock().insert(block.header.number))
+		fn import_block(&self, block: TestBlock, _justification: Justification, _authorities: &[AuthorityId]) -> bool {
+			assert!(self.imported_heights.lock().insert(block.header.number));
+			true
 		}
 	}
 
diff --git a/substrate/client/src/client.rs b/substrate/client/src/client.rs
index 2bd8e9f8a6863..2bd53d1a881ff 100644
--- a/substrate/client/src/client.rs
+++ b/substrate/client/src/client.rs
@@ -518,7 +518,7 @@ impl bft::BlockImport for Client
 		block: Block,
 		justification: ::bft::Justification,
 		authorities: &[AuthorityId]
-	) {
+	) -> bool {
 		let (header, extrinsics) = block.deconstruct();
 		let justified_header = JustifiedHeader {
 			header: header,
@@ -526,7 +526,7 @@ impl bft::BlockImport for Client
 			authorities: authorities.to_vec(),
 		};
 
-		let _ = self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics));
+		self.import_block(BlockOrigin::ConsensusBroadcast, justified_header, Some(extrinsics)).is_ok()
 	}
 }

From e2e5cb498fdcb6a2a355e29dd23c125982b041e7 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 22 Aug 2018 11:22:12 +0200
Subject: [PATCH 2/5] mild revision to `can_build_on` logic

---
 substrate/bft/src/lib.rs | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs
index 237e131361918..7c27dab8fd97b 100644
--- a/substrate/bft/src/lib.rs
+++ b/substrate/bft/src/lib.rs
@@ -597,9 +597,8 @@ impl BftService
 		let hash = header.hash();
 		let &(ref live_hash, ref handle) = live;
 		match handle.status() {
-			status::BAD => true, // if the block was bad, can always go on.
-			status::LIVE => hash != *live_hash, // can supersede with better block.
-			status::GOOD => *header.parent_hash() == *live_hash, // can follow with next block.
+			_ if *header.parent_hash() == *live_hash => true, // can always follow with next block.
+			status::BAD => hash == *live_hash, // bad block can be re-agreed on.
 			_ => false, // canceled won't appear since we overwrite the handle before returning.
 		}
 	}

From 298163a573bb17a2cd2f8e610c74394590e15984 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 22 Aug 2018 10:57:12 +0200
Subject: [PATCH 3/5] fix a deadlock when spawning agreement as non-authority

---
 substrate/bft/src/lib.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs
index 7c27dab8fd97b..e1f569a378e16 100644
--- a/substrate/bft/src/lib.rs
+++ b/substrate/bft/src/lib.rs
@@ -517,14 +517,15 @@ impl BftService
 		let n = authorities.len();
 		let max_faulty = max_faulty_of(n);
 
+		trace!(target: "bft", "Initiating agreement on top of #{}, {:?}", header.number(), hash);
 		trace!(target: "bft", "max_faulty_of({})={}", n, max_faulty);
 
 		let local_id = self.local_id();
 
 		if !authorities.contains(&local_id) {
 			// cancel current agreement
-			self.live_agreement.lock().take();
-			Err(From::from(ErrorKind::InvalidAuthority(local_id)))?;
+			live_agreement.take();
+			Err(ErrorKind::InvalidAuthority(local_id).into())?;
 		}
 
 		let (proposer, input, output) = self.factory.init(header, &authorities, self.key.clone())?;

From 623aab4f322584eb475610ed037ba7bbbc2157e3 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Wed, 22 Aug 2018 17:56:29 +0200
Subject: [PATCH 4/5] dropping BFT future before poll doesn't lead to service deadlock

---
 substrate/bft/src/lib.rs | 38 ++++++++++++++++++++++++++++++++++----
 1 file changed, 34 insertions(+), 4 deletions(-)

diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs
index e1f569a378e16..3ab8118c2441b 100644
--- a/substrate/bft/src/lib.rs
+++ b/substrate/bft/src/lib.rs
@@ -344,7 +344,7 @@ pub struct BftFuture where
 {
 	inner: rhododendron::Agreement, InStream, OutSink>,
 	status: Arc,
-	send_task: Option>,
+	send_task: Option>>,
 	import: Arc,
 }
 
@@ -363,7 +363,7 @@ impl Future for BftFuture ::futures::Poll<(), ()> {
 		// send the task to the bft service so this can be cancelled.
 		if let Some(sender) = self.send_task.take() {
-			let _ = sender.send(task::current());
+			let _ = sender.send(Some(task::current()));
 		}
 
 		// service has canceled the future. bail
@@ -413,6 +413,10 @@ impl Drop for BftFuture
 	OutSink: Sink, SinkError=P::Error>,
 {
 	fn drop(&mut self) {
+		if let Some(sender) = self.send_task.take() {
+			let _ = sender.send(None);
+		}
+
 		// TODO: have a trait member to pass misbehavior reports into.
 		let misbehavior = self.inner.drain_misbehavior().collect::>();
 		self.inner.context().proposer.import_misbehavior(misbehavior);
@@ -421,7 +425,7 @@ impl Drop for BftFuture
 
 struct AgreementHandle {
 	status: Arc,
-	task: Option>,
+	task: Option>>,
 }
 
 impl AgreementHandle {
@@ -438,7 +442,7 @@ impl Drop for AgreementHandle {
 		};
 
 		// if this fails, the task is definitely not live anyway.
-		if let Ok(task) = task.wait() {
+		if let Ok(Some(task)) = task.wait() {
 			self.status.compare_and_swap(status::LIVE, status::CANCELED, Ordering::SeqCst);
 			task.notify();
 		}
 	}
 
@@ -1066,4 +1070,30 @@ mod tests {
 			assert!(false);
 		}
 	}
+
+	#[test]
+	fn drop_bft_future_does_not_deadlock() {
+		let client = FakeClient {
+			authorities: vec![
+				Keyring::One.to_raw_public().into(),
+				Keyring::Two.to_raw_public().into(),
+				Keyring::Alice.to_raw_public().into(),
+				Keyring::Eve.to_raw_public().into(),
+			],
+			imported_heights: Mutex::new(HashSet::new()),
+		};
+
+		let service = make_service(client);
+
+		let first = from_block_number(2);
+		let first_hash = first.hash();
+
+		let mut second = from_block_number(3);
+		second.parent_hash = first_hash;
+		let second_hash = second.hash();
+
+		let _ = service.build_upon(&first).unwrap();
+		assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
+		service.live_agreement.lock().take();
+	}
 }

From 6c6cae1647026f9a22a914014bbfc7715be31799 Mon Sep 17 00:00:00 2001
From: Robert Habermeier
Date: Tue, 28 Aug 2018 12:37:09 +0200
Subject: [PATCH 5/5] push cancel to BFTfuture rather than waiting for task

---
 substrate/bft/src/lib.rs | 77 +++++++++++++++++-----------------
 1 file changed, 33 insertions(+), 44 deletions(-)

diff --git a/substrate/bft/src/lib.rs b/substrate/bft/src/lib.rs
index 3ab8118c2441b..5103cedc3dd3d 100644
--- a/substrate/bft/src/lib.rs
+++ b/substrate/bft/src/lib.rs
@@ -47,7 +47,6 @@ extern crate rhododendron;
 #[macro_use]
 extern crate log;
 
-#[macro_use]
 extern crate futures;
 
 #[macro_use]
@@ -64,7 +63,7 @@ use runtime_primitives::traits::{Block, Header};
 use runtime_primitives::bft::{Message as PrimitiveMessage, Action as PrimitiveAction, Justification as PrimitiveJustification};
 use primitives::AuthorityId;
 
-use futures::{task, Async, Stream, Sink, Future, IntoFuture};
+use futures::{Async, Stream, Sink, Future, IntoFuture};
 use futures::sync::oneshot;
 use tokio::timer::Delay;
 use parking_lot::Mutex;
@@ -75,9 +74,8 @@ pub use error::{Error, ErrorKind};
 // statuses for an agreement
 mod status {
 	pub const LIVE: usize = 0;
-	pub const CANCELED: usize = 1;
-	pub const BAD: usize = 2;
-	pub const GOOD: usize = 3;
+	pub const BAD: usize = 1;
+	pub const GOOD: usize = 2;
 }
 
 /// Messages over the proposal.
@@ -344,7 +342,7 @@ pub struct BftFuture where
 {
 	inner: rhododendron::Agreement, InStream, OutSink>,
 	status: Arc,
-	send_task: Option>>,
+	cancel: oneshot::Receiver<()>,
 	import: Arc,
 }
 
@@ -361,18 +359,19 @@ impl Future for BftFuture ::futures::Poll<(), ()> {
-		// send the task to the bft service so this can be cancelled.
-		if let Some(sender) = self.send_task.take() {
-			let _ = sender.send(Some(task::current()));
-		}
-		// service has canceled the future. bail
-		if self.status.load(Ordering::Acquire) == status::CANCELED {
-			return Ok(Async::Ready(()))
-		}
+		let cancel = match self.cancel.poll() {
+			Ok(Async::Ready(())) | Err(_) => true,
+			Ok(Async::NotReady) => false,
+		};
 
 		// TODO: handle and log this error in a way which isn't noisy on exit.
-		let committed = try_ready!(self.inner.poll().map_err(|_| ()));
+		let committed = match self.inner.poll().map_err(|_| ()) {
+			Ok(Async::Ready(x)) => x,
+			Ok(Async::NotReady) =>
+				return Ok(if cancel { Async::Ready(()) } else { Async::NotReady }),
+			Err(()) => return Err(()),
+		};
 
 		// if something was committed, the round leader must have proposed.
 		self.inner.context().proposer.on_round_end(committed.round_number, true);
 
@@ -397,6 +396,9 @@ impl Future for BftFuture Drop for BftFuture
 	OutSink: Sink, SinkError=P::Error>,
 {
 	fn drop(&mut self) {
-		if let Some(sender) = self.send_task.take() {
-			let _ = sender.send(None);
-		}
-
 		// TODO: have a trait member to pass misbehavior reports into.
 		let misbehavior = self.inner.drain_misbehavior().collect::>();
 		self.inner.context().proposer.import_misbehavior(misbehavior);
@@ -425,7 +423,7 @@ impl Drop for BftFuture
 
 struct AgreementHandle {
 	status: Arc,
-	task: Option>>,
+	send_cancel: Option>,
 }
 
 impl AgreementHandle {
@@ -436,15 +434,8 @@ impl AgreementHandle {
 
 impl Drop for AgreementHandle {
 	fn drop(&mut self) {
-		let task = match self.task.take() {
-			Some(t) => t,
-			None => return,
-		};
-
-		// if this fails, the task is definitely not live anyway.
-		if let Ok(Some(task)) = task.wait() {
-			self.status.compare_and_swap(status::LIVE, status::CANCELED, Ordering::SeqCst);
-			task.notify();
+		if let Some(sender) = self.send_cancel.take() {
+			let _ = sender.send(());
 		}
 	}
 }
 
@@ -557,8 +548,11 @@ impl BftService
 		trace!(target: "bft", "Round cache: {:?}", &*cache);
 		if cache.hash.as_ref() == Some(&hash) {
 			trace!(target: "bft", "Fast-forwarding to round {}", cache.start_round);
-			agreement.fast_forward(cache.start_round);
+			let start_round = cache.start_round;
 			cache.start_round += 1;
+
+			drop(cache);
+			agreement.fast_forward(start_round);
 		} else {
 			*cache = RoundCache {
 				hash: Some(hash.clone()),
@@ -572,14 +566,14 @@ impl BftService
 
 		// cancel current agreement.
 		*live_agreement = Some((hash, AgreementHandle {
-			task: Some(rx),
+			send_cancel: Some(tx),
 			status: status.clone(),
 		}));
 
 		Ok(Some(BftFuture {
 			inner: agreement,
 			status: status,
-			send_task: Some(tx),
+			cancel: rx,
 			import: self.client.clone(),
 		}))
 	}
@@ -774,7 +768,6 @@ mod tests {
 	use runtime_primitives::testing::{Block as GenericTestBlock, Header as TestHeader};
 	use primitives::H256;
 	use self::keyring::Keyring;
-	use tokio::executor::current_thread;
 
 	extern crate substrate_keyring as keyring;
 
@@ -917,21 +910,18 @@ mod tests {
 		second.parent_hash = first_hash;
 		let second_hash = second.hash();
 
-		let bft = service.build_upon(&first).unwrap();
+		let mut first_bft = service.build_upon(&first).unwrap().unwrap();
 		assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
 
-		let mut core = current_thread::CurrentThread::new();
-
-		// turn the core so the future gets polled and sends its task to the
-		// service. otherwise it deadlocks.
-		core.spawn(bft.unwrap());
-		core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
-		let bft = service.build_upon(&second).unwrap();
+		let _second_bft = service.build_upon(&second).unwrap();
 		assert!(service.live_agreement.lock().as_ref().unwrap().0 != first_hash);
 		assert!(service.live_agreement.lock().as_ref().unwrap().0 == second_hash);
 
-		core.spawn(bft.unwrap());
-		core.run_timeout(::std::time::Duration::from_millis(100)).unwrap();
+		// first_bft has been cancelled. need to swap out so we can check it.
+		let (_tx, mut rx) = oneshot::channel();
+		::std::mem::swap(&mut rx, &mut first_bft.cancel);
+
+		assert!(rx.wait().is_ok());
 	}
 
 	#[test]
@@ -1090,7 +1080,6 @@ mod tests {
 
 		let mut second = from_block_number(3);
 		second.parent_hash = first_hash;
-		let second_hash = second.hash();
 
 		let _ = service.build_upon(&first).unwrap();
 		assert!(service.live_agreement.lock().as_ref().unwrap().0 == first_hash);
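A standalone sketch of the two mechanisms this series converges on may help when reading the final diff: an agreement handle that shares an atomic status word (LIVE/BAD/GOOD) with its future so the service can decide whether a new agreement may supersede the live one, and cancellation that is pushed into the future over a oneshot channel instead of the future handing its task back to the service. This is not the substrate-bft code itself: the names AgreementFuture, AgreementHandle and can_build_on, the plain u64 stand-in hashes, and the futures = "0.1" dependency are all assumptions made for the sake of a minimal, runnable example.

// Illustrative sketch only -- assumed names, not the real substrate-bft items.
// Assumes Cargo.toml has: futures = "0.1"

extern crate futures;

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

use futures::{Async, Future, Poll};
use futures::sync::oneshot;

// Terminal statuses an agreement can reach (mirrors the `status` module left
// after PATCH 5: no CANCELED state any more).
mod status {
	pub const LIVE: usize = 0;
	pub const BAD: usize = 1;
	pub const GOOD: usize = 2;
}

// Handle kept by the service; dropping it pushes a cancel signal to the future.
struct AgreementHandle {
	status: Arc<AtomicUsize>,
	send_cancel: Option<oneshot::Sender<()>>,
}

impl Drop for AgreementHandle {
	fn drop(&mut self) {
		if let Some(tx) = self.send_cancel.take() {
			let _ = tx.send(()); // the receiver may already be gone; that's fine
		}
	}
}

// Future driving one agreement; finishes early once a cancel signal arrives.
struct AgreementFuture<F> {
	inner: F, // stands in for the rhododendron agreement + block import
	status: Arc<AtomicUsize>,
	cancel: oneshot::Receiver<()>,
}

impl<F: Future<Item = bool, Error = ()>> Future for AgreementFuture<F> {
	type Item = ();
	type Error = ();

	fn poll(&mut self) -> Poll<(), ()> {
		// A sent signal and a dropped sender both count as "cancelled".
		let cancelled = match self.cancel.poll() {
			Ok(Async::Ready(())) | Err(_) => true,
			Ok(Async::NotReady) => false,
		};

		match self.inner.poll()? {
			Async::Ready(imported_ok) => {
				// Record how the agreement ended so the service can decide
				// what may be built next.
				let end = if imported_ok { status::GOOD } else { status::BAD };
				self.status.store(end, Ordering::Release);
				Ok(Async::Ready(()))
			}
			Async::NotReady if cancelled => Ok(Async::Ready(())),
			Async::NotReady => Ok(Async::NotReady),
		}
	}
}

// The superseding rule from PATCH 2, with u64 hashes standing in for block
// hashes: always allow the block that follows the live agreement's base;
// re-agree on the same base only if the last attempt imported a bad block.
fn can_build_on(parent_hash: u64, header_hash: u64, live_hash: u64, live_status: usize) -> bool {
	match live_status {
		_ if parent_hash == live_hash => true,
		status::BAD => header_hash == live_hash,
		_ => false,
	}
}

fn main() {
	let status = Arc::new(AtomicUsize::new(status::LIVE));
	let (tx, rx) = oneshot::channel();

	let fut = AgreementFuture {
		inner: futures::future::empty::<bool, ()>(), // an agreement that never concludes
		status: status.clone(),
		cancel: rx,
	};
	let handle = AgreementHandle { status: status.clone(), send_cancel: Some(tx) };
	assert_eq!(handle.status.load(Ordering::Acquire), status::LIVE);

	drop(handle);        // service superseded the agreement: cancel is pushed
	fut.wait().unwrap(); // resolves promptly instead of waiting forever

	// A header whose parent is the live agreement's base is always allowed.
	assert!(can_build_on(0xA, 0xB, 0xA, status::LIVE));
	// Re-agreeing on the live base itself is only allowed after a bad block.
	assert!(!can_build_on(0x9, 0xA, 0xA, status::LIVE));
	assert!(can_build_on(0x9, 0xA, 0xA, status::BAD));
}

Pushing the signal from the handle's Drop means the service never has to wait for the future to be polled and hand over its task first, which is the deadlock PATCH 3 and PATCH 4 were working around before PATCH 5 removed the task channel altogether.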