diff --git a/src/lib.rs b/src/lib.rs index 512a92d8..5883026f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -100,7 +100,7 @@ pub trait Chain { /// even if that block is `base` itself. /// /// If `base` is unknown, return `None`. - fn best_chain_containing(&self, base: H) -> Option<(H, usize)>; + fn best_chain_containing(&self, base: H) -> Option<(H, u32)>; } /// An equivocation (double-vote) in a given round. @@ -117,6 +117,7 @@ pub struct Equivocation { } /// A protocol message or vote. +#[derive(Clone)] pub enum Message { /// A prevote message. Prevote(Prevote), @@ -126,6 +127,7 @@ pub enum Message { } /// A signed message. +#[derive(Clone)] pub struct SignedMessage { pub message: Message, pub signature: S, diff --git a/src/round.rs b/src/round.rs index 44b706b0..b3880f3e 100644 --- a/src/round.rs +++ b/src/round.rs @@ -176,18 +176,18 @@ impl VoteTracker { /// The prevote-GHOST block. - pub prevote_ghost: Option<(H, usize)>, + pub prevote_ghost: Option<(H, u32)>, /// The finalized block. - pub finalized: Option<(H, usize)>, + pub finalized: Option<(H, u32)>, /// The new round-estimate. - pub estimate: Option<(H, usize)>, + pub estimate: Option<(H, u32)>, /// Whether the round is completable. pub completable: bool, } impl State { // Genesis state. - pub fn genesis(genesis: (H, usize)) -> Self { + pub fn genesis(genesis: (H, u32)) -> Self { State { prevote_ghost: Some(genesis.clone()), finalized: Some(genesis.clone()), @@ -204,7 +204,7 @@ pub struct RoundParams { /// Actors and weights in the round. pub voters: HashMap, /// The base block to build on. - pub base: (H, usize), + pub base: (H, u32), } /// Stores data for a round. @@ -217,9 +217,9 @@ pub struct Round { faulty_weight: usize, total_weight: usize, bitfield_context: BitfieldContext, - prevote_ghost: Option<(H, usize)>, // current memoized prevote-GHOST block - finalized: Option<(H, usize)>, // best finalized block in this round. - estimate: Option<(H, usize)>, // current memoized round-estimate + prevote_ghost: Option<(H, u32)>, // current memoized prevote-GHOST block + finalized: Option<(H, u32)>, // best finalized block in this round. + estimate: Option<(H, u32)>, // current memoized round-estimate completable: bool, // whether the round is completable } @@ -289,7 +289,7 @@ impl Round where graph.insert( vote.target_hash.clone(), - vote.target_number as usize, + vote.target_number, vote_weight, chain ) @@ -353,7 +353,7 @@ impl Round where graph.insert( vote.target_hash.clone(), - vote.target_number as usize, + vote.target_number, vote_weight, chain ) @@ -469,12 +469,12 @@ impl Round where /// /// Returns `None` when new new blocks could have been finalized in this round, /// according to our estimate. - pub fn estimate(&self) -> Option<&(H, usize)> { + pub fn estimate(&self) -> Option<&(H, u32)> { self.estimate.as_ref() } /// Fetch the most recently finalized block. - pub fn finalized(&self) -> Option<&(H, usize)> { + pub fn finalized(&self) -> Option<&(H, u32)> { self.finalized.as_ref() } @@ -493,7 +493,7 @@ impl Round where } /// Return the round base. - pub fn base(&self) -> (H, usize) { + pub fn base(&self) -> (H, u32) { self.graph.base() } } diff --git a/src/testing.rs b/src/testing.rs index 7587c0ac..3f59faa7 100644 --- a/src/testing.rs +++ b/src/testing.rs @@ -17,27 +17,28 @@ //! Helpers for testing use std::collections::HashMap; +use std::sync::Arc; use round::State as RoundState; use voter::RoundData; use tokio::timer::Delay; use parking_lot::Mutex; use futures::prelude::*; -use futures::sync::mpsc; +use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; use super::{Chain, Error, Equivocation, Message, Prevote, Precommit, SignedMessage}; pub const GENESIS_HASH: &str = "genesis"; const NULL_HASH: &str = "NULL"; struct BlockRecord { - number: usize, + number: u32, parent: &'static str, } pub struct DummyChain { inner: HashMap<&'static str, BlockRecord>, leaves: Vec<&'static str>, - finalized: (&'static str, usize), + finalized: (&'static str, u32), } impl DummyChain { @@ -65,7 +66,7 @@ impl DummyChain { for (i, descendent) in blocks.iter().enumerate() { self.inner.insert(descendent, BlockRecord { - number: base_number + i, + number: base_number + i as u32, parent, }); @@ -82,11 +83,11 @@ impl DummyChain { self.leaves.insert(insertion_index, new_leaf); } - pub fn number(&self, hash: &'static str) -> usize { + pub fn number(&self, hash: &'static str) -> u32 { self.inner.get(hash).unwrap().number } - pub fn last_finalized(&self) -> (&'static str, usize) { + pub fn last_finalized(&self) -> (&'static str, u32) { self.finalized.clone() } } @@ -110,7 +111,7 @@ impl Chain<&'static str> for DummyChain { Ok(ancestry) } - fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, usize)> { + fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, u32)> { let base_number = self.inner.get(base)?.number; for leaf in &self.leaves { @@ -132,24 +133,26 @@ impl Chain<&'static str> for DummyChain { } #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq)] -pub struct Id(pub usize); +pub struct Id(pub u32); #[derive(Debug, Clone, PartialEq, Eq)] -pub struct Signature(usize); +pub struct Signature(u32); pub struct Environment { chain: Mutex, voters: HashMap, local_id: Id, - listeners: Mutex>>, + network: Network, + listeners: Mutex>>, } impl Environment { - pub fn new(voters: HashMap, local_id: Id) -> Self { + pub fn new(voters: HashMap, network: Network, local_id: Id) -> Self { Environment { chain: Mutex::new(DummyChain::new()), voters, local_id, + network, listeners: Mutex::new(Vec::new()), } } @@ -160,7 +163,7 @@ impl Environment { } /// Stream of finalized blocks. - pub fn finalized_stream(&self) -> mpsc::UnboundedReceiver<(&'static str, usize)> { + pub fn finalized_stream(&self) -> UnboundedReceiver<(&'static str, u32)> { let (tx, rx) = mpsc::unbounded(); self.listeners.lock().push(tx); rx @@ -172,7 +175,7 @@ impl Chain<&'static str> for Environment { self.chain.lock().ancestry(base, block) } - fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, usize)> { + fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, u32)> { self.chain.lock().best_chain_containing(base) } } @@ -185,12 +188,12 @@ impl ::voter::Environment<&'static str> for Environment { type Out = Box,SinkError=Error> + Send + 'static>; type Error = Error; - fn round_data(&self, _round: u64) -> RoundData { + fn round_data(&self, round: u64) -> RoundData { use std::time::{Instant, Duration}; const GOSSIP_DURATION: Duration = Duration::from_millis(500); let now = Instant::now(); - let (incoming, outgoing) = make_comms(self.local_id); + let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id); RoundData { prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION) .map_err(|_| panic!("Timer failed"))), @@ -207,7 +210,7 @@ impl ::voter::Environment<&'static str> for Environment { fn finalize_block(&self, hash: &'static str, number: u32) { let mut chain = self.chain.lock(); - if number as usize <= chain.finalized.1 { panic!("Attempted to finalize backwards") } + if number as u32 <= chain.finalized.1 { panic!("Attempted to finalize backwards") } assert!(chain.ancestry(chain.finalized.0, hash).is_ok(), "Safety violation: reverting finalized block."); chain.finalized = (hash, number as _); self.listeners.lock().retain(|s| s.unbounded_send((hash, number as _)).is_ok()); @@ -222,22 +225,110 @@ impl ::voter::Environment<&'static str> for Environment { } } -// TODO: replace this with full-fledged dummy network. -fn make_comms(local_id: Id) -> ( - impl Stream,Error=Error>, - impl Sink,SinkError=Error> -) -{ - let (tx, rx) = mpsc::unbounded(); - let tx = tx - .sink_map_err(|e| panic!("Error sending messages: {:?}", e)) - .with(move |message| Ok(SignedMessage { - message, - signature: Signature(local_id.0), - id: local_id, - })); - - let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e)); - - (rx, tx) +// p2p network data for a round. +struct RoundNetwork { + receiver: UnboundedReceiver>, + raw_sender: UnboundedSender>, + senders: Vec>>, + history: Vec>, +} + +impl RoundNetwork { + fn new() -> Self { + let (tx, rx) = mpsc::unbounded(); + RoundNetwork { + receiver: rx, + raw_sender: tx, + senders: Vec::new(), + history: Vec::new(), + } + } + + // add a node to the network for a round. + fn add_node(&mut self, id: Id) -> ( + impl Stream,Error=Error>, + impl Sink,SinkError=Error> + ) { + let (tx, rx) = mpsc::unbounded(); + let messages_out = self.raw_sender.clone() + .sink_map_err(|e| panic!("Error sending messages: {:?}", e)) + .with(move |message| Ok(SignedMessage { + message, + signature: Signature(id.0), + id: id, + })); + + // get history to the node. + for prior_message in self.history.iter().cloned() { + let _ = tx.unbounded_send(prior_message); + } + + self.senders.push(tx); + let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e)); + + (rx, messages_out) + } + + // do routing work + fn route(&mut self) -> Poll<(), ()> { + loop { + match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(item)) => { + self.history.push(item.clone()); + for sender in &self.senders { + let _ = sender.unbounded_send(item.clone()); + } + } + } + } + } +} + +/// Make a test network. +/// Give the network future to node environments and spawn the routing task +/// to run. +pub fn make_network() -> (Network, NetworkRouting) { + let rounds = Arc::new(Mutex::new(HashMap::new())); + ( + Network { rounds: rounds.clone() }, + NetworkRouting { rounds } + ) +} + +/// A test network. Instantiate this with `make_network`, +#[derive(Clone)] +pub struct Network { + rounds: Arc>>, +} + +impl Network { + fn make_round_comms(&self, round_number: u64, node_id: Id) -> ( + impl Stream,Error=Error>, + impl Sink,SinkError=Error> + ) { + let mut rounds = self.rounds.lock(); + rounds.entry(round_number).or_insert_with(RoundNetwork::new).add_node(node_id) + } +} + +/// the network routing task. +pub struct NetworkRouting { + rounds: Arc>>, +} + +impl Future for NetworkRouting { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut rounds = self.rounds.lock(); + rounds.retain(|_, round| match round.route() { + Ok(Async::Ready(())) | Err(()) => false, + Ok(Async::NotReady) => true, + }); + + Ok(Async::NotReady) + } } diff --git a/src/vote_graph.rs b/src/vote_graph.rs index 051aeffe..f1e9ec8f 100644 --- a/src/vote_graph.rs +++ b/src/vote_graph.rs @@ -27,7 +27,7 @@ use super::{Chain, Error}; #[derive(Debug)] struct Entry { - number: usize, + number: u32, // ancestor hashes in reverse order, e.g. ancestors[0] is the parent // and the last entry is the hash of the parent vote-node. ancestors: Vec, @@ -38,17 +38,17 @@ struct Entry { impl Entry { // whether the given hash, number pair is a direct ancestor of this node. // `None` signifies that the graph must be traversed further back. - fn in_direct_ancestry(&self, hash: &H, number: usize) -> Option { + fn in_direct_ancestry(&self, hash: &H, number: u32) -> Option { self.ancestor_block(number).map(|h| h == hash) } // Get ancestor block by number. Returns `None` if there is no block // by that number in the direct ancestry. - fn ancestor_block(&self, number: usize) -> Option<&H> { + fn ancestor_block(&self, number: u32) -> Option<&H> { if number >= self.number { return None } let offset = self.number - number - 1; - self.ancestors.get(offset) + self.ancestors.get(offset as usize) } // get ancestor vote-node. @@ -60,16 +60,16 @@ impl Entry { // a subchain of blocks by hash. struct Subchain { hashes: Vec, // forward order. - best_number: usize, + best_number: u32, } impl Subchain { - fn blocks_reverse<'a>(&'a self) -> impl Iterator + 'a { + fn blocks_reverse<'a>(&'a self) -> impl Iterator + 'a { let best = self.best_number; - self.hashes.iter().rev().cloned().enumerate().map(move |(i, x)| (x, best - i)) + self.hashes.iter().rev().cloned().enumerate().map(move |(i, x)| (x, best - i as u32)) } - fn best(&self) -> Option<(H, usize)> { + fn best(&self) -> Option<(H, u32)> { self.hashes.last().map(|x| (x.clone(), self.best_number)) } } @@ -80,7 +80,7 @@ pub struct VoteGraph { entries: HashMap>, heads: HashSet, base: H, - base_number: usize, + base_number: u32, } impl VoteGraph where @@ -88,7 +88,7 @@ impl VoteGraph where V: AddAssign + Default + Clone + Debug, { /// Create a new `VoteGraph` with base node as given. - pub fn new(base_hash: H, base_number: usize) -> Self { + pub fn new(base_hash: H, base_number: u32) -> Self { let mut entries = HashMap::new(); entries.insert(base_hash.clone(), Entry { number: base_number, @@ -109,12 +109,12 @@ impl VoteGraph where } /// Get the base block. - pub fn base(&self) -> (H, usize) { + pub fn base(&self) -> (H, u32) { (self.base.clone(), self.base_number) } /// Insert a vote with given value into the graph at given hash and number. - pub fn insert>(&mut self, hash: H, number: usize, vote: V, chain: &C) -> Result<(), Error> { + pub fn insert>(&mut self, hash: H, number: u32, vote: V, chain: &C) -> Result<(), Error> { match self.find_containing_nodes(hash.clone(), number) { Some(containing) => if containing.is_empty() { self.append(hash.clone(), number, chain)?; @@ -144,7 +144,7 @@ impl VoteGraph where /// Find the highest block which is either an ancestor of or equal to the given, which fulfills a /// condition. - pub fn find_ancestor<'a, F>(&'a self, hash: H, number: usize, condition: F) -> Option<(H, usize)> + pub fn find_ancestor<'a, F>(&'a self, hash: H, number: u32, condition: F) -> Option<(H, u32)> where F: Fn(&V) -> bool { let entries = &self.entries; @@ -212,7 +212,7 @@ impl VoteGraph where /// enough to trigger the threshold. /// /// Returns `None` when the given `current_best` does not fulfill the condition. - pub fn find_ghost<'a, F>(&'a self, current_best: Option<(H, usize)>, condition: F) -> Option<(H, usize)> + pub fn find_ghost<'a, F>(&'a self, current_best: Option<(H, u32)>, condition: F) -> Option<(H, u32)> where F: Fn(&V) -> bool { let entries = &self.entries; @@ -286,7 +286,7 @@ impl VoteGraph where &'a self, node_key: H, active_node: &'a Entry, - force_constrain: Option<(H, usize)>, + force_constrain: Option<(H, u32)>, condition: F, ) -> Subchain where F: Fn(&V) -> bool @@ -306,7 +306,7 @@ impl VoteGraph where let mut hashes = vec![node_key]; // TODO: for long ranges of blocks this could get inefficient - for offset in 1usize.. { + for offset in 1u32.. { let mut new_best = None; for d_node in descendent_nodes.iter() { if let Some(d_block) = d_node.ancestor_block(base_number + offset) { @@ -352,7 +352,7 @@ impl VoteGraph where // returns `None` if there is a node by that key already, and a vector // (potentially empty) of nodes with the given block in its ancestor-edge // otherwise. - fn find_containing_nodes(&self, hash: H, number: usize) -> Option> { + fn find_containing_nodes(&self, hash: H, number: u32) -> Option> { if self.entries.contains_key(&hash) { return None } @@ -400,7 +400,7 @@ impl VoteGraph where // This function panics if any member of `descendents` is not a vote-node // or does not have ancestor with given hash and number OR if `ancestor_hash` // is already a known entry. - fn introduce_branch(&mut self, descendents: Vec, ancestor_hash: H, ancestor_number: usize) { + fn introduce_branch(&mut self, descendents: Vec, ancestor_hash: H, ancestor_number: u32) { let produced_entry = descendents.into_iter().fold(None, |mut maybe_entry, descendent| { let entry = self.entries.get_mut(&descendent) .expect("this function only invoked with keys of vote-nodes; qed"); @@ -416,7 +416,7 @@ impl VoteGraph where let offset = entry.number.checked_sub(ancestor_number) .expect("this function only invoked with direct ancestors; qed"); let prev_ancestor = entry.ancestor_node(); - let new_ancestors = entry.ancestors.drain(offset..); + let new_ancestors = entry.ancestors.drain((offset as usize)..); let &mut (ref mut new_entry, _) = maybe_entry.get_or_insert_with(move || { let new_entry = Entry { @@ -454,7 +454,7 @@ impl VoteGraph where // append a vote-node onto the chain-tree. This should only be called if // no node in the tree keeps the target anyway. - fn append>(&mut self, hash: H, number: usize, chain: &C) -> Result<(), Error> { + fn append>(&mut self, hash: H, number: u32, chain: &C) -> Result<(), Error> { let mut ancestry = chain.ancestry(self.base.clone(), hash.clone())?; let mut ancestor_index = None; @@ -500,7 +500,7 @@ mod tests { chain.push_blocks("C", &["D1", "E1", "F1"]); chain.push_blocks("C", &["D2", "E2", "F2"]); - tracker.insert("A", 2, 100usize, &chain).unwrap(); + tracker.insert("A", 2, 100u32, &chain).unwrap(); tracker.insert("E1", 6, 100, &chain).unwrap(); tracker.insert("F2", 7, 100, &chain).unwrap(); @@ -532,11 +532,11 @@ mod tests { chain.push_blocks("C", &["D1", "E1", "F1"]); chain.push_blocks("C", &["D2", "E2", "F2"]); - tracker1.insert("C", 4, 100usize, &chain).unwrap(); + tracker1.insert("C", 4, 100u32, &chain).unwrap(); tracker1.insert("E1", 6, 100, &chain).unwrap(); tracker1.insert("F2", 7, 100, &chain).unwrap(); - tracker2.insert("E1", 6, 100usize, &chain).unwrap(); + tracker2.insert("E1", 6, 100u32, &chain).unwrap(); tracker2.insert("F2", 7, 100, &chain).unwrap(); tracker2.insert("C", 4, 100, &chain).unwrap(); @@ -570,7 +570,7 @@ mod tests { chain.push_blocks("C", &["D1", "E1", "F1"]); chain.push_blocks("C", &["D2", "E2", "F2"]); - tracker.insert("B", 3, 0usize, &chain).unwrap(); + tracker.insert("B", 3, 0u32, &chain).unwrap(); tracker.insert("C", 4, 100, &chain).unwrap(); tracker.insert("E1", 6, 100, &chain).unwrap(); tracker.insert("F2", 7, 100, &chain).unwrap(); @@ -589,7 +589,7 @@ mod tests { chain.push_blocks("F", &["G1", "H1", "I1"]); chain.push_blocks("F", &["G2", "H2", "I2"]); - tracker.insert("B", 3, 0usize, &chain).unwrap(); + tracker.insert("B", 3, 0u32, &chain).unwrap(); tracker.insert("G1", 8, 100, &chain).unwrap(); tracker.insert("H2", 9, 150, &chain).unwrap(); @@ -608,7 +608,7 @@ mod tests { chain.push_blocks("E", &["EA", "EB", "EC", "ED"]); chain.push_blocks("F", &["FA", "FB", "FC"]); - tracker.insert("FC", 10, 5usize, &chain).unwrap(); + tracker.insert("FC", 10, 5u32, &chain).unwrap(); tracker.insert("ED", 10, 7, &chain).unwrap(); assert_eq!(tracker.find_ghost(None, |&x| x >= 10), Some(("E", 6))); @@ -638,9 +638,9 @@ mod tests { chain.push_blocks("C", &["D1", "E1", "F1", "G1", "H1", "I1"]); chain.push_blocks("C", &["D2", "E2", "F2", "G2", "H2", "I2"]); - tracker.insert("B", 3, 10usize, &chain).unwrap(); - tracker.insert("F1", 7, 5usize, &chain).unwrap(); - tracker.insert("G2", 8, 5usize, &chain).unwrap(); + tracker.insert("B", 3, 10u32, &chain).unwrap(); + tracker.insert("F1", 7, 5, &chain).unwrap(); + tracker.insert("G2", 8, 5, &chain).unwrap(); let test_cases = &[ "D1", @@ -667,9 +667,9 @@ mod tests { chain.push_blocks("D", &["E1", "F1", "G1", "H1", "I1"]); chain.push_blocks("D", &["E2", "F2", "G2", "H2", "I2"]); - tracker.insert("B", 3, 10usize, &chain).unwrap(); - tracker.insert("F1", 7, 5usize, &chain).unwrap(); - tracker.insert("G2", 8, 5usize, &chain).unwrap(); + tracker.insert("B", 3, 10u32, &chain).unwrap(); + tracker.insert("F1", 7, 5, &chain).unwrap(); + tracker.insert("G2", 8, 5, &chain).unwrap(); assert_eq!(tracker.find_ancestor("G2", 8, |&x| x > 5).unwrap(), ("D", 5)); let test_cases = &[ @@ -695,10 +695,10 @@ mod tests { chain.push_blocks("C", &["D1", "E1", "F1", "G1", "H1", "I1"]); chain.push_blocks("C", &["D2", "E2", "F2"]); - tracker.insert("C", 4, 10usize, &chain).unwrap(); - tracker.insert("F1", 7, 5usize, &chain).unwrap(); - tracker.insert("F2", 7, 5usize, &chain).unwrap(); - tracker.insert("I1", 10, 1usize, &chain).unwrap(); + tracker.insert("C", 4, 10u32, &chain).unwrap(); + tracker.insert("F1", 7, 5, &chain).unwrap(); + tracker.insert("F2", 7, 5, &chain).unwrap(); + tracker.insert("I1", 10, 1, &chain).unwrap(); let test_cases = &[ "C", diff --git a/src/voter.rs b/src/voter.rs index a6db9177..e9222803 100644 --- a/src/voter.rs +++ b/src/voter.rs @@ -156,8 +156,8 @@ pub struct VotingRound> where H: Hash + Clone + Eq + Ord + state: Option>, // state machine driving votes. bridged_round_state: Option<::bridge_state::PriorView>, // updates to later round last_round_state: ::bridge_state::LatterView, // updates from prior round - primary_block: Option<(H, usize)>, // a block posted by primary as a hint. TODO: implement - finalized_sender: UnboundedSender<(H, usize)>, + primary_block: Option<(H, u32)>, // a block posted by primary as a hint. TODO: implement + finalized_sender: UnboundedSender<(H, u32)>, } impl> VotingRound where H: Hash + Clone + Eq + Ord + ::std::fmt::Debug { @@ -294,7 +294,7 @@ impl> VotingRound where H: Hash + Clone + Eq + Ord + match self.env.ancestry(last_round_estimate.0.clone(), last_prevote_g.0) { Ok(ancestry) => { let offset = last_prevote_g.1.saturating_sub(p_num + 1); - if ancestry.get(offset).map_or(false, |b| b == p_hash) { + if ancestry.get(offset as usize).map_or(false, |b| b == p_hash) { p_hash.clone() } else { last_round_estimate.0 @@ -321,7 +321,7 @@ impl> VotingRound where H: Hash + Clone + Eq + Ord + Ok(Some(Prevote { target_hash: t.0, - target_number: t.1 as u32, + target_number: t.1, })) } @@ -334,7 +334,7 @@ impl> VotingRound where H: Hash + Clone + Eq + Ord + Precommit { target_hash: t.0, - target_number: t.1 as u32, + target_number: t.1, } } @@ -392,7 +392,7 @@ impl> BackgroundRound fn is_done(&self) -> bool { // no need to listen on a round anymore once the estimate is finalized. self.inner.votes.state().estimate - .map_or(false, |x| (x.1 as u32) <= self.finalized_number) + .map_or(false, |x| (x.1) <= self.finalized_number) } fn update_finalized(&mut self, new_finalized: u32) { @@ -433,8 +433,8 @@ pub struct Voter> env: Arc, best_round: VotingRound, past_rounds: FuturesUnordered>, - finalized_notifications: UnboundedReceiver<(H, usize)>, - last_finalized: (H, usize), + finalized_notifications: UnboundedReceiver<(H, u32)>, + last_finalized: (H, u32), } impl> Voter @@ -449,7 +449,7 @@ impl> Voter env: Arc, last_round: u64, last_round_state: RoundState, - last_finalized: (H, usize), + last_finalized: (H, u32), ) -> Self { let (finalized_sender, finalized_notifications) = mpsc::unbounded(); @@ -457,7 +457,7 @@ impl> Voter let round_data = env.round_data(next_number); let round_params = ::round::RoundParams { - round_number: next_number as _, + round_number: next_number, voters: round_data.voters, base: last_finalized.clone(), }; @@ -499,13 +499,13 @@ impl> Voter let (f_hash, f_num) = res.expect("one sender always kept alive in self.best_round; qed"); for bg in self.past_rounds.iter_mut() { - bg.update_finalized(f_num as u32); + bg.update_finalized(f_num); } if f_num > self.last_finalized.1 { // TODO: handle safety violations and check ancestry. self.last_finalized = (f_hash.clone(), f_num); - self.env.finalize_block(f_hash, f_num as u32); + self.env.finalize_block(f_hash, f_num); } } @@ -539,7 +539,7 @@ impl> Future for Voter let next_round_data = self.env.round_data(next_number); let round_params = ::round::RoundParams { - round_number: next_number as _, + round_number: next_number, voters: next_round_data.voters, base: self.last_finalized.clone(), }; @@ -579,7 +579,7 @@ impl> Future for Voter mod tests { use super::*; use tokio::runtime::current_thread; - use testing::{GENESIS_HASH, Environment, Id}; + use testing::{self, GENESIS_HASH, Environment, Id}; use std::collections::HashMap; #[test] @@ -591,7 +591,8 @@ mod tests { map }; - let env = Arc::new(Environment::new(voters, local_id)); + let (network, routing_task) = testing::make_network(); + let env = Arc::new(Environment::new(voters, network, local_id)); current_thread::block_on_all(::futures::future::lazy(move || { // initialize chain let last_finalized = env.with_chain(|chain| { @@ -605,7 +606,10 @@ mod tests { let finalized = env.finalized_stream(); let (_signal, exit) = ::exit_future::signal(); let voter = Voter::new(env.clone(), 0, last_round_state, last_finalized); - ::tokio::spawn(exit.until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + + ::tokio::spawn(exit.until(routing_task).map(|_| ())); // wait for the best block to finalize. finalized @@ -613,4 +617,48 @@ mod tests { .for_each(|_| Ok(())) })).unwrap(); } + + #[test] + fn finalizing_at_fault_threshold() { + // 10 voters + let voters = { + let mut map = HashMap::new(); + for i in 0..10 { + map.insert(Id(i), 1); + } + map + }; + + let (network, routing_task) = testing::make_network(); + current_thread::block_on_all(::futures::future::lazy(move || { + let (_signal, exit) = ::exit_future::signal(); + ::tokio::spawn(exit.clone().until(routing_task).map(|_| ())); + + // 3 voters offline. + let finalized_streams = (0..7).map(move |i| { + let local_id = Id(i); + // initialize chain + let env = Arc::new(Environment::new(voters.clone(), network.clone(), local_id)); + let last_finalized = env.with_chain(|chain| { + chain.push_blocks(GENESIS_HASH, &["A", "B", "C", "D", "E"]); + chain.last_finalized() + }); + + let last_round_state = RoundState::genesis((GENESIS_HASH, 1)); + + // run voter in background. scheduling it to shut down at the end. + let finalized = env.finalized_stream(); + let voter = Voter::new(env.clone(), 0, last_round_state, last_finalized); + ::tokio::spawn(exit.clone() + .until(voter.map_err(|_| panic!("Error voting"))).map(|_| ())); + + // wait for the best block to be finalized by all honest voters + finalized + .take_while(|&(_, n)| Ok(n < 6)) + .for_each(|_| Ok(())) + }); + + ::futures::future::join_all(finalized_streams) + })).unwrap(); + } }