Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ pub trait Chain<H> {
/// even if that block is `base` itself.
///
/// If `base` is unknown, return `None`.
fn best_chain_containing(&self, base: H) -> Option<(H, usize)>;
fn best_chain_containing(&self, base: H) -> Option<(H, u32)>;
}

/// An equivocation (double-vote) in a given round.
Expand All @@ -117,6 +117,7 @@ pub struct Equivocation<Id, V, S> {
}

/// A protocol message or vote.
#[derive(Clone)]
pub enum Message<H> {
/// A prevote message.
Prevote(Prevote<H>),
Expand All @@ -126,6 +127,7 @@ pub enum Message<H> {
}

/// A signed message.
#[derive(Clone)]
pub struct SignedMessage<H, S, Id> {
pub message: Message<H>,
pub signature: S,
Expand Down
26 changes: 13 additions & 13 deletions src/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,18 @@ impl<Id: Hash + Eq + Clone, Vote: Clone + Eq, Signature: Clone> VoteTracker<Id,
#[derive(PartialEq, Clone)]
pub struct State<H> {
/// The prevote-GHOST block.
pub prevote_ghost: Option<(H, usize)>,
pub prevote_ghost: Option<(H, u32)>,
/// The finalized block.
pub finalized: Option<(H, usize)>,
pub finalized: Option<(H, u32)>,
/// The new round-estimate.
pub estimate: Option<(H, usize)>,
pub estimate: Option<(H, u32)>,
/// Whether the round is completable.
pub completable: bool,
}

impl<H: Clone> State<H> {
// Genesis state.
pub fn genesis(genesis: (H, usize)) -> Self {
pub fn genesis(genesis: (H, u32)) -> Self {
State {
prevote_ghost: Some(genesis.clone()),
finalized: Some(genesis.clone()),
Expand All @@ -204,7 +204,7 @@ pub struct RoundParams<Id: Hash + Eq, H> {
/// Actors and weights in the round.
pub voters: HashMap<Id, usize>,
/// The base block to build on.
pub base: (H, usize),
pub base: (H, u32),
}

/// Stores data for a round.
Expand All @@ -217,9 +217,9 @@ pub struct Round<Id: Hash + Eq, H: Hash + Eq, Signature> {
faulty_weight: usize,
total_weight: usize,
bitfield_context: BitfieldContext<Id>,
prevote_ghost: Option<(H, usize)>, // current memoized prevote-GHOST block
finalized: Option<(H, usize)>, // best finalized block in this round.
estimate: Option<(H, usize)>, // current memoized round-estimate
prevote_ghost: Option<(H, u32)>, // current memoized prevote-GHOST block
finalized: Option<(H, u32)>, // best finalized block in this round.
estimate: Option<(H, u32)>, // current memoized round-estimate
completable: bool, // whether the round is completable
}

Expand Down Expand Up @@ -289,7 +289,7 @@ impl<Id, H, Signature> Round<Id, H, Signature> where

graph.insert(
vote.target_hash.clone(),
vote.target_number as usize,
vote.target_number,
vote_weight,
chain
)
Expand Down Expand Up @@ -353,7 +353,7 @@ impl<Id, H, Signature> Round<Id, H, Signature> where

graph.insert(
vote.target_hash.clone(),
vote.target_number as usize,
vote.target_number,
vote_weight,
chain
)
Expand Down Expand Up @@ -469,12 +469,12 @@ impl<Id, H, Signature> Round<Id, H, Signature> where
///
/// Returns `None` when new new blocks could have been finalized in this round,
/// according to our estimate.
pub fn estimate(&self) -> Option<&(H, usize)> {
pub fn estimate(&self) -> Option<&(H, u32)> {
self.estimate.as_ref()
}

/// Fetch the most recently finalized block.
pub fn finalized(&self) -> Option<&(H, usize)> {
pub fn finalized(&self) -> Option<&(H, u32)> {
self.finalized.as_ref()
}

Expand All @@ -493,7 +493,7 @@ impl<Id, H, Signature> Round<Id, H, Signature> where
}

/// Return the round base.
pub fn base(&self) -> (H, usize) {
pub fn base(&self) -> (H, u32) {
self.graph.base()
}
}
Expand Down
159 changes: 125 additions & 34 deletions src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,28 @@
//! Helpers for testing

use std::collections::HashMap;
use std::sync::Arc;

use round::State as RoundState;
use voter::RoundData;
use tokio::timer::Delay;
use parking_lot::Mutex;
use futures::prelude::*;
use futures::sync::mpsc;
use futures::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use super::{Chain, Error, Equivocation, Message, Prevote, Precommit, SignedMessage};

pub const GENESIS_HASH: &str = "genesis";
const NULL_HASH: &str = "NULL";

struct BlockRecord {
number: usize,
number: u32,
parent: &'static str,
}

pub struct DummyChain {
inner: HashMap<&'static str, BlockRecord>,
leaves: Vec<&'static str>,
finalized: (&'static str, usize),
finalized: (&'static str, u32),
}

impl DummyChain {
Expand Down Expand Up @@ -65,7 +66,7 @@ impl DummyChain {

for (i, descendent) in blocks.iter().enumerate() {
self.inner.insert(descendent, BlockRecord {
number: base_number + i,
number: base_number + i as u32,
parent,
});

Expand All @@ -82,11 +83,11 @@ impl DummyChain {
self.leaves.insert(insertion_index, new_leaf);
}

pub fn number(&self, hash: &'static str) -> usize {
pub fn number(&self, hash: &'static str) -> u32 {
self.inner.get(hash).unwrap().number
}

pub fn last_finalized(&self) -> (&'static str, usize) {
pub fn last_finalized(&self) -> (&'static str, u32) {
self.finalized.clone()
}
}
Expand All @@ -110,7 +111,7 @@ impl Chain<&'static str> for DummyChain {
Ok(ancestry)
}

fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, usize)> {
fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, u32)> {
let base_number = self.inner.get(base)?.number;

for leaf in &self.leaves {
Expand All @@ -132,24 +133,26 @@ impl Chain<&'static str> for DummyChain {
}

#[derive(Hash, Debug, Clone, Copy, PartialEq, Eq)]
pub struct Id(pub usize);
pub struct Id(pub u32);

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Signature(usize);
pub struct Signature(u32);

pub struct Environment {
chain: Mutex<DummyChain>,
voters: HashMap<Id, usize>,
local_id: Id,
listeners: Mutex<Vec<mpsc::UnboundedSender<(&'static str, usize)>>>,
network: Network,
listeners: Mutex<Vec<UnboundedSender<(&'static str, u32)>>>,
}

impl Environment {
pub fn new(voters: HashMap<Id, usize>, local_id: Id) -> Self {
pub fn new(voters: HashMap<Id, usize>, network: Network, local_id: Id) -> Self {
Environment {
chain: Mutex::new(DummyChain::new()),
voters,
local_id,
network,
listeners: Mutex::new(Vec::new()),
}
}
Expand All @@ -160,7 +163,7 @@ impl Environment {
}

/// Stream of finalized blocks.
pub fn finalized_stream(&self) -> mpsc::UnboundedReceiver<(&'static str, usize)> {
pub fn finalized_stream(&self) -> UnboundedReceiver<(&'static str, u32)> {
let (tx, rx) = mpsc::unbounded();
self.listeners.lock().push(tx);
rx
Expand All @@ -172,7 +175,7 @@ impl Chain<&'static str> for Environment {
self.chain.lock().ancestry(base, block)
}

fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, usize)> {
fn best_chain_containing(&self, base: &'static str) -> Option<(&'static str, u32)> {
self.chain.lock().best_chain_containing(base)
}
}
Expand All @@ -185,12 +188,12 @@ impl ::voter::Environment<&'static str> for Environment {
type Out = Box<Sink<SinkItem=Message<&'static str>,SinkError=Error> + Send + 'static>;
type Error = Error;

fn round_data(&self, _round: u64) -> RoundData<Self::Timer, Self::Id, Self::In, Self::Out> {
fn round_data(&self, round: u64) -> RoundData<Self::Timer, Self::Id, Self::In, Self::Out> {
use std::time::{Instant, Duration};
const GOSSIP_DURATION: Duration = Duration::from_millis(500);

let now = Instant::now();
let (incoming, outgoing) = make_comms(self.local_id);
let (incoming, outgoing) = self.network.make_round_comms(round, self.local_id);
RoundData {
prevote_timer: Box::new(Delay::new(now + GOSSIP_DURATION)
.map_err(|_| panic!("Timer failed"))),
Expand All @@ -207,7 +210,7 @@ impl ::voter::Environment<&'static str> for Environment {
fn finalize_block(&self, hash: &'static str, number: u32) {
let mut chain = self.chain.lock();

if number as usize <= chain.finalized.1 { panic!("Attempted to finalize backwards") }
if number as u32 <= chain.finalized.1 { panic!("Attempted to finalize backwards") }
assert!(chain.ancestry(chain.finalized.0, hash).is_ok(), "Safety violation: reverting finalized block.");
chain.finalized = (hash, number as _);
self.listeners.lock().retain(|s| s.unbounded_send((hash, number as _)).is_ok());
Expand All @@ -222,22 +225,110 @@ impl ::voter::Environment<&'static str> for Environment {
}
}

// TODO: replace this with full-fledged dummy network.
fn make_comms(local_id: Id) -> (
impl Stream<Item=SignedMessage<&'static str, Signature, Id>,Error=Error>,
impl Sink<SinkItem=Message<&'static str>,SinkError=Error>
)
{
let (tx, rx) = mpsc::unbounded();
let tx = tx
.sink_map_err(|e| panic!("Error sending messages: {:?}", e))
.with(move |message| Ok(SignedMessage {
message,
signature: Signature(local_id.0),
id: local_id,
}));

let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e));

(rx, tx)
// p2p network data for a round.
struct RoundNetwork {
receiver: UnboundedReceiver<SignedMessage<&'static str, Signature, Id>>,
raw_sender: UnboundedSender<SignedMessage<&'static str, Signature, Id>>,
senders: Vec<UnboundedSender<SignedMessage<&'static str, Signature, Id>>>,
history: Vec<SignedMessage<&'static str, Signature, Id>>,
}

impl RoundNetwork {
fn new() -> Self {
let (tx, rx) = mpsc::unbounded();
RoundNetwork {
receiver: rx,
raw_sender: tx,
senders: Vec::new(),
history: Vec::new(),
}
}

// add a node to the network for a round.
fn add_node(&mut self, id: Id) -> (
impl Stream<Item=SignedMessage<&'static str, Signature, Id>,Error=Error>,
impl Sink<SinkItem=Message<&'static str>,SinkError=Error>
) {
let (tx, rx) = mpsc::unbounded();
let messages_out = self.raw_sender.clone()
.sink_map_err(|e| panic!("Error sending messages: {:?}", e))
.with(move |message| Ok(SignedMessage {
message,
signature: Signature(id.0),
id: id,
}));

// get history to the node.
for prior_message in self.history.iter().cloned() {
let _ = tx.unbounded_send(prior_message);
}

self.senders.push(tx);
let rx = rx.map_err(|e| panic!("Error receiving messages: {:?}", e));

(rx, messages_out)
}

// do routing work
fn route(&mut self) -> Poll<(), ()> {
loop {
match self.receiver.poll().map_err(|e| panic!("Error routing messages: {:?}", e))? {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(item)) => {
self.history.push(item.clone());
for sender in &self.senders {
let _ = sender.unbounded_send(item.clone());
}
}
}
}
}
}

/// Make a test network.
/// Give the network future to node environments and spawn the routing task
/// to run.
pub fn make_network() -> (Network, NetworkRouting) {
let rounds = Arc::new(Mutex::new(HashMap::new()));
(
Network { rounds: rounds.clone() },
NetworkRouting { rounds }
)
}

/// A test network. Instantiate this with `make_network`,
#[derive(Clone)]
pub struct Network {
rounds: Arc<Mutex<HashMap<u64, RoundNetwork>>>,
}

impl Network {
fn make_round_comms(&self, round_number: u64, node_id: Id) -> (
impl Stream<Item=SignedMessage<&'static str, Signature, Id>,Error=Error>,
impl Sink<SinkItem=Message<&'static str>,SinkError=Error>
) {
let mut rounds = self.rounds.lock();
rounds.entry(round_number).or_insert_with(RoundNetwork::new).add_node(node_id)
}
}

/// the network routing task.
pub struct NetworkRouting {
rounds: Arc<Mutex<HashMap<u64, RoundNetwork>>>,
}

impl Future for NetworkRouting {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
let mut rounds = self.rounds.lock();
rounds.retain(|_, round| match round.route() {
Ok(Async::Ready(())) | Err(()) => false,
Ok(Async::NotReady) => true,
});

Ok(Async::NotReady)
}
}
Loading