diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index 2200c41b8b..56692564c8 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -1,14 +1,12 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; +use futures::channel::mpsc; use log::{debug, info}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, Data, Splittable, }, }; @@ -43,7 +41,7 @@ impl From for IncomingError { async fn manage_incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); @@ -62,7 +60,7 @@ async fn manage_incoming( pub async fn incoming( authority_pen: AuthorityPen, stream: S, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs new file mode 100644 index 0000000000..6b6db0715d --- /dev/null +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -0,0 +1,225 @@ +use std::collections::{HashMap, HashSet}; + +use aleph_primitives::AuthorityId; + +use crate::validator_network::Data; + +pub struct DirectedPeers { + own_id: AuthorityId, + outgoing: HashMap>, + incoming: HashSet, +} + +fn bit_xor_sum_parity((a, b): (u8, u8)) -> u8 { + let mut result = 0; + for i in 0..8 { + result += ((a >> i) ^ (b >> i)) % 2; + } + result % 2 +} + +// Whether we shold call the remote or the other way around. We xor the peer ids and based on the +// parity of the sum of bits of the result decide whether the caller should be the smaller or +// greated lexicographically. They are never equal, because cryptography. +fn should_call(own_id: &[u8], remote_id: &[u8]) -> bool { + let xor_sum_parity: u8 = own_id + .iter() + .cloned() + .zip(remote_id.iter().cloned()) + .map(bit_xor_sum_parity) + .fold(0u8, |a, b| (a + b) % 2); + match xor_sum_parity == 0 { + true => own_id < remote_id, + false => own_id > remote_id, + } +} + +impl DirectedPeers { + /// Create a new set of peers directed using our own peer id. + pub fn new(own_id: AuthorityId) -> Self { + DirectedPeers { + own_id, + outgoing: HashMap::new(), + incoming: HashSet::new(), + } + } + + /// Add a peer to the list of peers we want to stay connected to, or + /// update the list of addresses if the peer was already added. + /// Returns whether we should start attampts at connecting with the peer, which is the case + /// exactly when the peer is one with which we should attempt connections AND it was added for + /// the first time. + pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + match should_call(self.own_id.as_ref(), peer_id.as_ref()) { + true => self.outgoing.insert(peer_id, addresses).is_none(), + false => { + // We discard the addresses here, as we will never want to call this peer anyway, + // so we don't need them. + self.incoming.insert(peer_id); + false + } + } + } + + /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + self.outgoing.get(peer_id).cloned() + } + + /// Whether we should be maintaining a connection with this peer. + pub fn interested(&self, peer_id: &AuthorityId) -> bool { + self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id) + } + + /// Iterator over the peers we want connections from. + pub fn incoming_peers(&self) -> impl Iterator { + self.incoming.iter() + } + + /// Iterator over the peers we want to connect to. + pub fn outgoing_peers(&self) -> impl Iterator { + self.outgoing.keys() + } + + /// Remove a peer from the list of peers that we want to stay connected with, whether the + /// connection was supposed to be incoming or outgoing. + pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + self.incoming.remove(peer_id); + self.outgoing.remove(peer_id); + } +} + +#[cfg(test)] +mod tests { + use aleph_primitives::AuthorityId; + + use super::DirectedPeers; + use crate::validator_network::mock::key; + + type Address = String; + + async fn container_with_id() -> (DirectedPeers
, AuthorityId) { + let (own_id, _) = key().await; + let own_container = DirectedPeers::new(own_id.clone()); + (own_container, own_id) + } + + #[tokio::test] + async fn exactly_one_direction_attempts_connections() { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + assert!( + own_container.add_peer(remote_id, addresses.clone()) + != remote_container.add_peer(own_id, addresses.clone()) + ); + } + + async fn container_with_added_connecting_peer() -> (DirectedPeers
, AuthorityId) { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + match own_container.add_peer(remote_id.clone(), addresses.clone()) { + true => (own_container, remote_id), + false => { + remote_container.add_peer(own_id.clone(), addresses); + (remote_container, own_id) + } + } + } + + async fn container_with_added_nonconnecting_peer() -> (DirectedPeers
, AuthorityId) { + let (mut own_container, own_id) = container_with_id().await; + let (mut remote_container, remote_id) = container_with_id().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + match own_container.add_peer(remote_id.clone(), addresses.clone()) { + false => (own_container, remote_id), + true => { + remote_container.add_peer(own_id.clone(), addresses); + (remote_container, own_id) + } + } + } + + #[tokio::test] + async fn no_connecting_on_readd() { + let (mut own_container, remote_id) = container_with_added_connecting_peer().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + assert!(!own_container.add_peer(remote_id, addresses)); + } + + #[tokio::test] + async fn peer_addresses_when_connecting() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.peer_addresses(&remote_id).is_some()); + } + + #[tokio::test] + async fn no_peer_addresses_when_nonconnecting() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert!(own_container.peer_addresses(&remote_id).is_none()); + } + + #[tokio::test] + async fn interested_in_connecting() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn interested_in_nonconnecting() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert!(own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn uninterested_in_unknown() { + let (own_container, _) = container_with_id().await; + let (_, remote_id) = container_with_id().await; + assert!(!own_container.interested(&remote_id)); + } + + #[tokio::test] + async fn connecting_are_outgoing() { + let (own_container, remote_id) = container_with_added_connecting_peer().await; + assert_eq!( + own_container.outgoing_peers().collect::>(), + vec![&remote_id] + ); + assert_eq!(own_container.incoming_peers().next(), None); + } + + #[tokio::test] + async fn nonconnecting_are_incoming() { + let (own_container, remote_id) = container_with_added_nonconnecting_peer().await; + assert_eq!( + own_container.incoming_peers().collect::>(), + vec![&remote_id] + ); + assert_eq!(own_container.outgoing_peers().next(), None); + } + + #[tokio::test] + async fn uninterested_in_removed() { + let (mut own_container, remote_id) = container_with_added_connecting_peer().await; + assert!(own_container.interested(&remote_id)); + own_container.remove_peer(&remote_id); + assert!(!own_container.interested(&remote_id)); + } +} diff --git a/finality-aleph/src/validator_network/manager.rs b/finality-aleph/src/validator_network/manager/legacy.rs similarity index 87% rename from finality-aleph/src/validator_network/manager.rs rename to finality-aleph/src/validator_network/manager/legacy.rs index c765c33a78..76e3220e68 100644 --- a/finality-aleph/src/validator_network/manager.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -4,35 +4,22 @@ use std::{ }; use aleph_primitives::AuthorityId; -use futures::channel::{mpsc, oneshot}; - -use crate::{network::PeerId, validator_network::Data}; +use futures::channel::mpsc; + +use crate::{ + network::PeerId, + validator_network::{ + manager::{AddResult, SendError}, + Data, + }, +}; /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. pub struct Manager { addresses: HashMap>, outgoing: HashMap>, - incoming: HashMap>, -} - -/// Error during sending data through the Manager -#[derive(Debug, PartialEq, Eq)] -pub enum SendError { - /// Outgoing network connection closed - ConnectionClosed, - /// Peer not added to the manager - PeerNotFound, -} - -impl Display for SendError { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use SendError::*; - match self { - ConnectionClosed => write!(f, "worker dead"), - PeerNotFound => write!(f, "peer not found"), - } - } + incoming: HashMap>, } struct ManagerStatus { @@ -48,7 +35,7 @@ impl ManagerStatus { let incoming: HashSet<_> = manager .incoming .iter() - .filter(|(_, exit)| !exit.is_canceled()) + .filter(|(_, exit)| !exit.is_closed()) .map(|(k, _)| k.clone()) .collect(); let outgoing: HashSet<_> = manager @@ -149,17 +136,6 @@ impl Display for ManagerStatus { } } -/// Possible results of adding connections. -#[derive(Debug, PartialEq, Eq)] -pub enum AddResult { - /// We do not want to maintain a connection with this peer. - Uninterested, - /// Connection added. - Added, - /// Old connection replaced with new one. - Replaced, -} - impl Manager { /// Create a new Manager with empty list of peers. pub fn new() -> Self { @@ -202,7 +178,11 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming(&mut self, peer_id: AuthorityId, exit: oneshot::Sender<()>) -> AddResult { + pub fn add_incoming( + &mut self, + peer_id: AuthorityId, + exit: mpsc::UnboundedSender, + ) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -240,10 +220,7 @@ impl Manager { #[cfg(test)] mod tests { - use futures::{ - channel::{mpsc, oneshot}, - StreamExt, - }; + use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; use crate::validator_network::mock::key; @@ -319,27 +296,27 @@ mod tests { String::from("a/b/c"), String::from("43.43.43.43:43000"), ]; - let (tx, rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // try add unknown peer assert_eq!(manager.add_incoming(peer_id.clone(), tx), Uninterested); // rx should fail - assert!(rx.await.is_err()); + assert!(rx.try_next().expect("channel should be closed").is_none()); // add peer, this time for real assert!(manager.add_peer(peer_id.clone(), addresses.clone())); - let (tx, mut rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::unbounded(); // should just add assert_eq!(manager.add_incoming(peer_id.clone(), tx), Added); // the exit channel should be open - assert!(rx.try_recv().is_ok()); - let (tx, mut rx2) = oneshot::channel(); + assert!(rx.try_next().is_err()); + let (tx, mut rx2) = mpsc::unbounded(); // should replace now assert_eq!(manager.add_incoming(peer_id.clone(), tx), Replaced); // receiving should fail on old, but work on new channel - assert!(rx.try_recv().is_err()); - assert!(rx2.try_recv().is_ok()); + assert!(rx.try_next().expect("channel should be closed").is_none()); + assert!(rx2.try_next().is_err()); // remove peer manager.remove_peer(&peer_id); // receiving should fail - assert!(rx2.try_recv().is_err()); + assert!(rx2.try_next().expect("channel should be closed").is_none()); } } diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs new file mode 100644 index 0000000000..5e4ff1dc21 --- /dev/null +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -0,0 +1,338 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::{Display, Error as FmtError, Formatter}, +}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{network::PeerId, validator_network::Data}; + +mod direction; +mod legacy; + +use direction::DirectedPeers; +pub use legacy::Manager as LegacyManager; + +/// Network component responsible for holding the list of peers that we +/// want to connect to or let them connect to us, and managing the established +/// connections. +pub struct Manager { + wanted: DirectedPeers, + have: HashMap>, +} + +/// Error during sending data through the Manager +#[derive(Debug, PartialEq, Eq)] +pub enum SendError { + /// Outgoing network connection closed + ConnectionClosed, + /// Peer not added to the manager + PeerNotFound, +} + +impl Display for SendError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use SendError::*; + match self { + ConnectionClosed => write!(f, "worker dead"), + PeerNotFound => write!(f, "peer not found"), + } + } +} + +struct ManagerStatus { + outgoing_peers: HashSet, + missing_outgoing: HashSet, + incoming_peers: HashSet, + missing_incoming: HashSet, +} + +impl ManagerStatus { + fn new(manager: &Manager) -> Self { + let mut incoming_peers = HashSet::new(); + let mut missing_incoming = HashSet::new(); + let mut outgoing_peers = HashSet::new(); + let mut missing_outgoing = HashSet::new(); + + for peer in manager.wanted.incoming_peers() { + match manager.active_connection(peer) { + true => incoming_peers.insert(peer.clone()), + false => missing_incoming.insert(peer.clone()), + }; + } + for peer in manager.wanted.outgoing_peers() { + match manager.active_connection(peer) { + true => outgoing_peers.insert(peer.clone()), + false => missing_outgoing.insert(peer.clone()), + }; + } + ManagerStatus { + incoming_peers, + missing_incoming, + outgoing_peers, + missing_outgoing, + } + } + + fn wanted_incoming(&self) -> usize { + self.incoming_peers.len() + self.missing_incoming.len() + } + + fn wanted_outgoing(&self) -> usize { + self.outgoing_peers.len() + self.missing_outgoing.len() + } +} + +fn pretty_authority_id_set(set: &HashSet) -> String { + set.iter() + .map(|authority_id| authority_id.to_short_string()) + .collect::>() + .join(", ") +} + +impl Display for ManagerStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + let wanted_incoming = self.wanted_incoming(); + let wanted_outgoing = self.wanted_outgoing(); + if wanted_incoming + wanted_outgoing == 0 { + return write!(f, "not maintaining any connections; "); + } + + match wanted_incoming { + 0 => write!(f, "not expecting any incoming connections; ")?, + _ => { + write!(f, "expecting {:?} incoming connections; ", wanted_incoming)?; + match self.incoming_peers.is_empty() { + true => write!(f, "WARNING! No incoming peers even though we expected tham, maybe connecting to us is impossible; ")?, + false => write!( + f, + "have - {:?} [{}]; ", + self.incoming_peers.len(), + pretty_authority_id_set(&self.incoming_peers), + )?, + } + if !self.missing_incoming.is_empty() { + write!( + f, + "missing - {:?} [{}]; ", + self.missing_incoming.len(), + pretty_authority_id_set(&self.missing_incoming), + )?; + } + } + } + + match wanted_outgoing { + 0 => write!(f, "not attempting any outgoing connections; ")?, + _ => { + write!(f, "attempting {:?} outgoing connections; ", wanted_outgoing)?; + if !self.outgoing_peers.is_empty() { + write!( + f, + "have - {:?} [{}]; ", + self.incoming_peers.len(), + pretty_authority_id_set(&self.outgoing_peers), + )?; + } + if !self.missing_outgoing.is_empty() { + write!( + f, + "missing - {:?} [{}]; ", + self.missing_incoming.len(), + pretty_authority_id_set(&self.missing_outgoing), + )?; + } + } + } + + Ok(()) + } +} + +/// Possible results of adding connections. +#[derive(Debug, PartialEq, Eq)] +pub enum AddResult { + /// We do not want to maintain a connection with this peer. + Uninterested, + /// Connection added. + Added, + /// Old connection replaced with new one. + Replaced, +} + +impl Manager { + /// Create a new Manager with empty list of peers. + pub fn new(own_id: AuthorityId) -> Self { + Manager { + wanted: DirectedPeers::new(own_id), + have: HashMap::new(), + } + } + + fn active_connection(&self, peer_id: &AuthorityId) -> bool { + self.have + .get(peer_id) + .map(|sender| !sender.is_closed()) + .unwrap_or(false) + } + + /// Add a peer to the list of peers we want to stay connected to, or + /// update the list of addresses if the peer was already added. + /// Returns whether we should start attempts at connecting with the peer. + pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + self.wanted.add_peer(peer_id, addresses) + } + + /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. + pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + self.wanted.peer_addresses(peer_id) + } + + /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to. + pub fn add_connection( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + ) -> AddResult { + use AddResult::*; + if !self.wanted.interested(&peer_id) { + return Uninterested; + } + match self.have.insert(peer_id, data_for_network) { + Some(_) => Replaced, + None => Added, + } + } + + /// Remove a peer from the list of peers that we want to stay connected with. + /// Close any incoming and outgoing connections that were established. + pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + self.wanted.remove_peer(peer_id); + self.have.remove(peer_id); + } + + /// Send data to a peer. + /// Returns error if there is no outgoing connection to the peer, + /// or if the connection is dead. + pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + self.have + .get(peer_id) + .ok_or(SendError::PeerNotFound)? + .unbounded_send(data) + .map_err(|_| SendError::ConnectionClosed) + } + + /// A status of the manager, to be displayed somewhere. + pub fn status_report(&self) -> impl Display { + ManagerStatus::new(self) + } +} + +#[cfg(test)] +mod tests { + use futures::{channel::mpsc, StreamExt}; + + use super::{AddResult::*, Manager, SendError}; + use crate::validator_network::mock::key; + + type Data = String; + type Address = String; + + #[tokio::test] + async fn add_remove() { + let (own_id, _) = key().await; + let mut manager = Manager::::new(own_id); + let (peer_id, _) = key().await; + let (peer_id_b, _) = key().await; + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + // add new peer - might return either true or false, depending on the ids + let attempting_connections = manager.add_peer(peer_id.clone(), addresses.clone()); + // add known peer - always returns false + assert!(!manager.add_peer(peer_id.clone(), addresses.clone())); + // get address + match attempting_connections { + true => assert_eq!(manager.peer_addresses(&peer_id), Some(addresses)), + false => assert_eq!(manager.peer_addresses(&peer_id), None), + } + // try to get address of an unknown peer + assert_eq!(manager.peer_addresses(&peer_id_b), None); + // remove peer + manager.remove_peer(&peer_id); + // try to get address of removed peer + assert_eq!(manager.peer_addresses(&peer_id), None); + // remove again + manager.remove_peer(&peer_id); + // remove unknown peer + manager.remove_peer(&peer_id_b); + } + + #[tokio::test] + async fn send_receive() { + let (mut connecting_id, _) = key().await; + let mut connecting_manager = Manager::::new(connecting_id.clone()); + let (mut listening_id, _) = key().await; + let mut listening_manager = Manager::::new(listening_id.clone()); + let data = String::from("DATA"); + let addresses = vec![ + String::from(""), + String::from("a/b/c"), + String::from("43.43.43.43:43000"), + ]; + let (tx, _rx) = mpsc::unbounded(); + // try add unknown peer + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Uninterested + ); + // sending should fail + assert_eq!( + connecting_manager.send_to(&listening_id, data.clone()), + Err(SendError::PeerNotFound) + ); + // add peer, this time for real + if connecting_manager.add_peer(listening_id.clone(), addresses.clone()) { + assert!(!listening_manager.add_peer(connecting_id.clone(), addresses.clone())) + } else { + // We need to switch the names around, because the connection was randomly the + // other way around. + let temp_id = connecting_id; + connecting_id = listening_id; + listening_id = temp_id; + let temp_manager = connecting_manager; + connecting_manager = listening_manager; + listening_manager = temp_manager; + assert!(connecting_manager.add_peer(listening_id.clone(), addresses.clone())); + } + // add outgoing to connecting + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!( + connecting_manager.add_connection(listening_id.clone(), tx), + Added + ); + // send and receive connecting + assert!(connecting_manager + .send_to(&listening_id, data.clone()) + .is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); + // add incoming to listening + let (tx, mut rx) = mpsc::unbounded(); + assert_eq!( + listening_manager.add_connection(connecting_id.clone(), tx), + Added + ); + // send and receive listening + assert!(listening_manager + .send_to(&connecting_id, data.clone()) + .is_ok()); + assert_eq!(data, rx.next().await.expect("should receive")); + // remove peer + listening_manager.remove_peer(&connecting_id); + // receiving should fail + assert!(rx.next().await.is_none()); + } +} diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index a797b7e44a..d4f14aac4c 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -5,15 +5,12 @@ use codec::Codec; use sp_core::crypto::KeyTypeId; use tokio::io::{AsyncRead, AsyncWrite}; -mod handshake; -mod heartbeat; mod incoming; mod io; mod manager; #[cfg(test)] pub mod mock; mod outgoing; -mod protocol_negotiation; mod protocols; mod service; diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index 36115181ab..a72a605480 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,8 +8,9 @@ use tokio::time::{sleep, Duration}; use crate::{ crypto::AuthorityPen, validator_network::{ - protocol_negotiation::{protocol, ProtocolNegotiationError}, - protocols::ProtocolError, + protocols::{ + protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, + }, Data, Dialer, }, }; @@ -48,7 +49,8 @@ async fn manage_outgoing>( peer_id: AuthorityId, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); let stream = dialer @@ -59,7 +61,13 @@ async fn manage_outgoing>( let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_outgoing(stream, authority_pen, peer_id, result_for_parent) + .manage_outgoing( + stream, + authority_pen, + peer_id, + result_for_parent, + data_for_user, + ) .await?) } @@ -73,7 +81,8 @@ pub async fn outgoing>( peer_id: AuthorityId, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( authority_pen, @@ -81,12 +90,18 @@ pub async fn outgoing>( dialer, addresses, result_for_parent.clone(), + data_for_user, ) .await { info!(target: "validator-network", "Outgoing connection to {} failed: {}, will retry after {}s.", peer_id, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; - if result_for_parent.unbounded_send((peer_id, None)).is_err() { + // we send the "new" connection type, because we always assume it's new until proven + // otherwise, and here we did not even get the chance to attempt negotiating a protocol + if result_for_parent + .unbounded_send((peer_id, None, ConnectionType::New)) + .is_err() + { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); } } diff --git a/finality-aleph/src/validator_network/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs similarity index 100% rename from finality-aleph/src/validator_network/handshake.rs rename to finality-aleph/src/validator_network/protocols/handshake.rs diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs new file mode 100644 index 0000000000..d16430d205 --- /dev/null +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -0,0 +1,160 @@ +use std::fmt::{Display, Error as FmtError, Formatter}; + +use aleph_primitives::AuthorityId; +use futures::channel::mpsc; + +use crate::{ + crypto::AuthorityPen, + validator_network::{ + io::{ReceiveError, SendError}, + Data, Splittable, + }, +}; + +mod handshake; +mod negotiation; +mod v0; +mod v1; + +use handshake::HandshakeError; +pub use negotiation::{protocol, ProtocolNegotiationError}; + +pub type Version = u32; + +/// The types of connections needed for backwards compatibility with the legacy two connections +/// protocol. Remove after it's no longer needed. +#[derive(PartialEq, Debug, Eq, Clone, Copy)] +pub enum ConnectionType { + New, + LegacyIncoming, + LegacyOutgoing, +} + +/// What connections send back to the service after they become established. Starts with a peer id +/// of the remote node, followed by a channel for sending data to that node, with None if the +/// connection was unsuccessful and should be reestablished. Finally a marker for legacy +/// compatibility. +pub type ResultForService = ( + AuthorityId, + Option>, + ConnectionType, +); + +/// Defines the protocol for communication. +#[derive(Debug, PartialEq, Eq)] +pub enum Protocol { + /// The first version of the protocol, with unidirectional connections. + V0, + /// The current version of the protocol, with pseudorandom connection direction and + /// multiplexing. + V1, +} + +/// Protocol error. +#[derive(Debug)] +pub enum ProtocolError { + /// Error during performing a handshake. + HandshakeError(HandshakeError), + /// Sending failed. + SendError(SendError), + /// Receiving failed. + ReceiveError(ReceiveError), + /// Heartbeat stopped. + CardiacArrest, + /// Channel to the parent service closed. + NoParentConnection, + /// Data channel closed. + NoUserConnection, +} + +impl Display for ProtocolError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use ProtocolError::*; + match self { + HandshakeError(e) => write!(f, "handshake error: {}", e), + SendError(e) => write!(f, "send error: {}", e), + ReceiveError(e) => write!(f, "receive error: {}", e), + CardiacArrest => write!(f, "heartbeat stopped"), + NoParentConnection => write!(f, "cannot send result to service"), + NoUserConnection => write!(f, "cannot send data to user"), + } + } +} + +impl From for ProtocolError { + fn from(e: HandshakeError) -> Self { + ProtocolError::HandshakeError(e) + } +} + +impl From for ProtocolError { + fn from(e: SendError) -> Self { + ProtocolError::SendError(e) + } +} + +impl From for ProtocolError { + fn from(e: ReceiveError) -> Self { + ProtocolError::ReceiveError(e) + } +} + +impl Protocol { + /// Minimal supported protocol version. + const MIN_VERSION: Version = 0; + + /// Maximal supported protocol version. + const MAX_VERSION: Version = 1; + + /// Launches the proper variant of the protocol (receiver half). + pub async fn manage_incoming( + &self, + stream: S, + authority_pen: AuthorityPen, + result_for_service: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, authority_pen, result_for_service, data_for_user).await, + } + } + + /// Launches the proper variant of the protocol (sender half). + pub async fn manage_outgoing( + &self, + stream: S, + authority_pen: AuthorityPen, + peer_id: AuthorityId, + result_for_service: mpsc::UnboundedSender>, + data_for_user: mpsc::UnboundedSender, + ) -> Result<(), ProtocolError> { + use Protocol::*; + match self { + V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + V1 => { + v1::outgoing( + stream, + authority_pen, + peer_id, + result_for_service, + data_for_user, + ) + .await + } + } + } +} + +impl TryFrom for Protocol { + type Error = Version; + + fn try_from(version: Version) -> Result { + match version { + 0 => Ok(Protocol::V0), + 1 => Ok(Protocol::V1), + unknown_version => Err(unknown_version), + } + } +} diff --git a/finality-aleph/src/validator_network/protocol_negotiation.rs b/finality-aleph/src/validator_network/protocols/negotiation.rs similarity index 91% rename from finality-aleph/src/validator_network/protocol_negotiation.rs rename to finality-aleph/src/validator_network/protocols/negotiation.rs index 6413dfc63b..52f0f1203a 100644 --- a/finality-aleph/src/validator_network/protocol_negotiation.rs +++ b/finality-aleph/src/validator_network/protocols/negotiation.rs @@ -8,17 +8,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::validator_network::protocols::Protocol; +use crate::validator_network::protocols::{Protocol, Version}; -pub type ProtocolVersion = u32; - -const MIN_SUPPORTED_PROTOCOL: ProtocolVersion = 0; -const MAX_SUPPORTED_PROTOCOL: ProtocolVersion = 0; const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5); /// A range of supported protocols, will fail to decode if the range is empty. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct ProtocolsRange(ProtocolVersion, ProtocolVersion); +pub struct ProtocolsRange(Version, Version); impl Display for ProtocolsRange { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { @@ -27,7 +23,7 @@ impl Display for ProtocolsRange { } const fn supported_protocol_range() -> ProtocolsRange { - ProtocolsRange(MIN_SUPPORTED_PROTOCOL, MAX_SUPPORTED_PROTOCOL) + ProtocolsRange(Protocol::MIN_VERSION, Protocol::MAX_VERSION) } /// What went wrong when negotiating a protocol. @@ -36,7 +32,7 @@ pub enum ProtocolNegotiationError { ConnectionClosed, InvalidRange(ProtocolsRange), ProtocolMismatch(ProtocolsRange, ProtocolsRange), - BadChoice(ProtocolVersion), + BadChoice(Version), TimedOut, } @@ -74,12 +70,8 @@ impl ProtocolsRange { fn decode(encoded: &[u8; 8]) -> Result { let result = ProtocolsRange( - ProtocolVersion::from_le_bytes( - encoded[0..4].try_into().expect("this is literally 4 bytes"), - ), - ProtocolVersion::from_le_bytes( - encoded[4..8].try_into().expect("this is literally 4 bytes"), - ), + Version::from_le_bytes(encoded[0..4].try_into().expect("this is literally 4 bytes")), + Version::from_le_bytes(encoded[4..8].try_into().expect("this is literally 4 bytes")), ); match result.valid() { true => Ok(result), @@ -103,9 +95,11 @@ fn maximum_of_intersection( range1: ProtocolsRange, range2: ProtocolsRange, ) -> Result { - intersection(range1, range2).map(|intersection| match intersection.1 { - 0 => Ok(Protocol::V0), - unknown_version => Err(ProtocolNegotiationError::BadChoice(unknown_version)), + intersection(range1, range2).map(|intersection| { + intersection + .1 + .try_into() + .map_err(ProtocolNegotiationError::BadChoice) })? } @@ -151,7 +145,7 @@ mod tests { fn correct_negotiation(result: Result<(S, Protocol), ProtocolNegotiationError>) { match result { - Ok((_stream, protocol)) => assert_eq!(Protocol::V0, protocol), + Ok((_stream, protocol)) => assert_eq!(Protocol::V1, protocol), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/finality-aleph/src/validator_network/heartbeat.rs b/finality-aleph/src/validator_network/protocols/v0/heartbeat.rs similarity index 100% rename from finality-aleph/src/validator_network/heartbeat.rs rename to finality-aleph/src/validator_network/protocols/v0/heartbeat.rs diff --git a/finality-aleph/src/validator_network/protocols.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs similarity index 77% rename from finality-aleph/src/validator_network/protocols.rs rename to finality-aleph/src/validator_network/protocols/v0/mod.rs index 54581c20bb..1f80de2baf 100644 --- a/finality-aleph/src/validator_network/protocols.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,78 +1,23 @@ -use std::fmt::{Display, Error as FmtError, Formatter}; - use aleph_primitives::AuthorityId; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; +use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; use crate::{ crypto::AuthorityPen, validator_network::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing, HandshakeError}, - heartbeat::{heartbeat_receiver, heartbeat_sender}, - io::{receive_data, send_data, ReceiveError, SendError}, + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, + }, Data, Splittable, }, }; -/// Defines the protocol for communication. -#[derive(Debug, PartialEq, Eq)] -pub enum Protocol { - /// The current version of the protocol. - V0, -} +mod heartbeat; -/// Protocol error. -#[derive(Debug)] -pub enum ProtocolError { - /// Error during performing a handshake. - HandshakeError(HandshakeError), - /// Sending failed. - SendError(SendError), - /// Receiving failed. - ReceiveError(ReceiveError), - /// Heartbeat stopped. - CardiacArrest, - /// Channel to the parent service closed. - NoParentConnection, - /// Data channel closed. - NoUserConnection, -} - -impl Display for ProtocolError { - fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { - use ProtocolError::*; - match self { - HandshakeError(e) => write!(f, "handshake error: {}", e), - SendError(e) => write!(f, "send error: {}", e), - ReceiveError(e) => write!(f, "receive error: {}", e), - CardiacArrest => write!(f, "heartbeat stopped"), - NoParentConnection => write!(f, "cannot send result to service"), - NoUserConnection => write!(f, "cannot send data to user"), - } - } -} - -impl From for ProtocolError { - fn from(e: HandshakeError) -> Self { - ProtocolError::HandshakeError(e) - } -} - -impl From for ProtocolError { - fn from(e: SendError) -> Self { - ProtocolError::SendError(e) - } -} - -impl From for ProtocolError { - fn from(e: ReceiveError) -> Self { - ProtocolError::ReceiveError(e) - } -} +use heartbeat::{heartbeat_receiver, heartbeat_sender}; /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. @@ -91,18 +36,22 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_outgoing( +pub async fn outgoing( stream: S, authority_pen: AuthorityPen, peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); - let (data_for_network, data_from_user) = mpsc::unbounded::(); + let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network))) + .unbounded_send(( + peer_id.clone(), + Some(data_for_network), + ConnectionType::LegacyOutgoing, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let sending = sending(sender, data_from_user); @@ -134,19 +83,23 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -async fn v0_incoming( +pub async fn incoming( stream: S, authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); - let (tx_exit, exit) = oneshot::channel(); + let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), tx_exit)) + .unbounded_send(( + peer_id.clone(), + Some(tx_exit), + ConnectionType::LegacyIncoming, + )) .map_err(|_| ProtocolError::NoParentConnection)?; let receiving = receiving(receiver, data_for_user); @@ -157,37 +110,7 @@ async fn v0_incoming( tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), result = receiving => return result, - _ = exit => return Ok(()), - } - } -} - -impl Protocol { - /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( - &self, - stream: S, - authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, - data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_incoming(stream, authority_pen, result_for_service, data_for_user).await, - } - } - - /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( - &self, - stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender<(AuthorityId, Option>)>, - ) -> Result<(), ProtocolError> { - use Protocol::*; - match self { - V0 => v0_outgoing(stream, authority_pen, peer_id, result_for_service).await, + _ = exit.next() => return Ok(()), } } } @@ -196,15 +119,16 @@ impl Protocol { mod tests { use aleph_primitives::AuthorityId; use futures::{ - channel::{mpsc, mpsc::UnboundedReceiver, oneshot}, + channel::{mpsc, mpsc::UnboundedReceiver}, pin_mut, FutureExt, StreamExt, }; - use super::{Protocol, ProtocolError}; + use super::{incoming, outgoing, ProtocolError}; use crate::{ crypto::AuthorityPen, validator_network::{ mock::{key, MockSplittable}, + protocols::{ConnectionType, ResultForService}, Data, }, }; @@ -217,24 +141,23 @@ mod tests { impl futures::Future>, impl futures::Future>, UnboundedReceiver, - UnboundedReceiver<(AuthorityId, oneshot::Sender<()>)>, - UnboundedReceiver<(AuthorityId, Option>)>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; let (id_outgoing, pen_outgoing) = key().await; assert_ne!(id_incoming, id_outgoing); - let (incoming_result_for_service, result_from_incoming) = - mpsc::unbounded::<(AuthorityId, oneshot::Sender<()>)>(); + let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); let (data_for_user, data_from_incoming) = mpsc::unbounded::(); - let incoming_handle = Protocol::V0.manage_incoming( + let incoming_handle = incoming( stream_incoming, pen_incoming.clone(), incoming_result_for_service, data_for_user, ); - let outgoing_handle = Protocol::V0.manage_outgoing( + let outgoing_handle = outgoing( stream_outgoing, pen_outgoing.clone(), id_incoming.clone(), @@ -274,7 +197,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![4, 3, 43]) @@ -323,7 +247,8 @@ mod tests { _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), received = result_from_incoming.next() => { // we drop the exit oneshot channel, thus finishing incoming_handle - let (received_id, _) = received.expect("should receive"); + let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyIncoming); assert_eq!(received_id, id_outgoing); }, }; @@ -382,7 +307,8 @@ mod tests { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), result = result_from_outgoing.next() => { - let (_, maybe_data_for_outgoing) = result.expect("outgoing should have resturned Some"); + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::LegacyOutgoing); let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); data_for_outgoing .unbounded_send(vec![2, 1, 3, 7]) @@ -436,11 +362,12 @@ mod tests { ) = prepare::>().await; let incoming_handle = incoming_handle.fuse(); pin_mut!(incoming_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // outgoing_handle got consumed by tokio::select!, the sender is dead match incoming_handle.await { Err(ProtocolError::ReceiveError(_)) => (), @@ -485,11 +412,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); // incoming_handle got consumed by tokio::select!, the receiver is dead match outgoing_handle.await { // We never get the SendError variant here, because we did not send anything @@ -515,11 +443,12 @@ mod tests { ) = prepare::>().await; let outgoing_handle = outgoing_handle.fuse(); pin_mut!(outgoing_handle); - let (_, _exit) = tokio::select! { + let (_, _exit, connection_type) = tokio::select! { _ = incoming_handle => panic!("incoming process unexpectedly finished"), _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), out = result_from_incoming.next() => out.expect("should receive"), }; + assert_eq!(connection_type, ConnectionType::LegacyIncoming); match outgoing_handle.await { Err(ProtocolError::CardiacArrest) => (), Err(e) => panic!("unexpected error: {}", e), diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs new file mode 100644 index 0000000000..b0287bca4b --- /dev/null +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -0,0 +1,454 @@ +use aleph_primitives::AuthorityId; +use codec::{Decode, Encode}; +use futures::{channel::mpsc, StreamExt}; +use log::{debug, info, trace}; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + time::{timeout, Duration}, +}; + +use crate::{ + crypto::AuthorityPen, + validator_network::{ + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, + }, + Data, Splittable, + }, +}; + +const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); +const MAX_MISSED_HEARTBEATS: u32 = 4; + +#[derive(Debug, Clone, Encode, Decode)] +enum Message { + Data(D), + Heartbeat, +} + +async fn sending( + mut sender: S, + mut data_from_user: mpsc::UnboundedReceiver, +) -> Result<(), ProtocolError> { + use Message::*; + loop { + let to_send = match timeout(HEARTBEAT_TIMEOUT, data_from_user.next()).await { + Ok(maybe_data) => match maybe_data { + Some(data) => Data(data), + // We have been closed by the parent service, all good. + None => return Ok(()), + }, + _ => Heartbeat, + }; + sender = send_data(sender, to_send).await?; + } +} + +async fn receiving( + mut stream: S, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + use Message::*; + loop { + let (old_stream, message) = timeout( + MAX_MISSED_HEARTBEATS * HEARTBEAT_TIMEOUT, + receive_data(stream), + ) + .await + .map_err(|_| ProtocolError::CardiacArrest)??; + stream = old_stream; + match message { + Data(data) => data_for_user + .unbounded_send(data) + .map_err(|_| ProtocolError::NoUserConnection)?, + Heartbeat => (), + } + } +} + +async fn manage_connection( + sender: S, + receiver: R, + data_from_user: mpsc::UnboundedReceiver, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + let sending = sending(sender, data_from_user); + let receiving = receiving(receiver, data_for_user); + tokio::select! { + result = receiving => result, + result = sending => result, + } +} + +/// Performs the outgoing handshake, and then manages a connection sending and receiving data. +/// Exits on parent request, or in case of broken or dead network connection. +pub async fn outgoing( + stream: S, + authority_pen: AuthorityPen, + peer_id: AuthorityId, + result_for_parent: ResultForService, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Extending hand to {}.", peer_id); + let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); + let (data_for_network, data_from_user) = mpsc::unbounded(); + result_for_parent + .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .map_err(|_| ProtocolError::NoParentConnection)?; + debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + manage_connection(sender, receiver, data_from_user, data_for_user).await +} + +/// Performs the incoming handshake, and then manages a connection sending and receiving data. +/// Exits on parent request, or in case of broken or dead network connection. +pub async fn incoming( + stream: S, + authority_pen: AuthorityPen, + result_for_parent: ResultForService, + data_for_user: mpsc::UnboundedSender, +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Waiting for extended hand..."); + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (data_for_network, data_from_user) = mpsc::unbounded(); + result_for_parent + .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .map_err(|_| ProtocolError::NoParentConnection)?; + debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + manage_connection(sender, receiver, data_from_user, data_for_user).await +} + +#[cfg(test)] +mod tests { + use aleph_primitives::AuthorityId; + use futures::{ + channel::{mpsc, mpsc::UnboundedReceiver}, + pin_mut, FutureExt, StreamExt, + }; + + use super::{incoming, outgoing, ProtocolError}; + use crate::{ + crypto::AuthorityPen, + validator_network::{ + mock::{key, MockSplittable}, + protocols::ConnectionType, + Data, + }, + }; + + async fn prepare() -> ( + AuthorityId, + AuthorityPen, + AuthorityId, + AuthorityPen, + impl futures::Future>, + impl futures::Future>, + UnboundedReceiver, + UnboundedReceiver, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, + UnboundedReceiver<( + AuthorityId, + Option>, + ConnectionType, + )>, + ) { + let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); + let (id_incoming, pen_incoming) = key().await; + let (id_outgoing, pen_outgoing) = key().await; + assert_ne!(id_incoming, id_outgoing); + let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded(); + let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded(); + let (incoming_data_for_user, data_from_incoming) = mpsc::unbounded::(); + let (outgoing_data_for_user, data_from_outgoing) = mpsc::unbounded::(); + let incoming_handle = incoming( + stream_incoming, + pen_incoming.clone(), + incoming_result_for_service, + incoming_data_for_user, + ); + let outgoing_handle = outgoing( + stream_outgoing, + pen_outgoing.clone(), + id_incoming.clone(), + outgoing_result_for_service, + outgoing_data_for_user, + ); + ( + id_incoming, + pen_incoming, + id_outgoing, + pen_outgoing, + incoming_handle, + outgoing_handle, + data_from_incoming, + data_from_outgoing, + result_from_incoming, + result_from_outgoing, + ) + } + + #[tokio::test] + async fn send_data() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + mut data_from_incoming, + mut data_from_outgoing, + mut result_from_incoming, + mut result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + let _data_for_outgoing = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_outgoing.next() => { + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); + let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); + data_for_outgoing + .unbounded_send(vec![4, 3, 43]) + .expect("should send"); + data_for_outgoing + .unbounded_send(vec![2, 1, 3, 7]) + .expect("should send"); + data_for_outgoing + }, + }; + let _data_for_incoming = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_incoming.next() => { + let (_, maybe_data_for_incoming, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); + let data_for_incoming = maybe_data_for_incoming.expect("successfully connected"); + data_for_incoming + .unbounded_send(vec![5, 4, 44]) + .expect("should send"); + data_for_incoming + .unbounded_send(vec![3, 2, 4, 8]) + .expect("should send"); + data_for_incoming + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_incoming.next() => { + assert_eq!(v, Some(vec![4, 3, 43])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_incoming.next() => { + assert_eq!(v, Some(vec![2, 1, 3, 7])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_outgoing.next() => { + assert_eq!(v, Some(vec![5, 4, 44])); + }, + }; + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + v = data_from_outgoing.next() => { + assert_eq!(v, Some(vec![3, 2, 4, 8])); + }, + }; + } + + #[tokio::test] + async fn closed_by_parent_service() { + let ( + _id_incoming, + _pen_incoming, + id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + mut result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + received = result_from_incoming.next() => { + // we drop the data sending channel, thus finishing incoming_handle + let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); + assert_eq!(received_id, id_outgoing); + }, + }; + incoming_handle + .await + .expect("closed manually, should finish with no error"); + } + + #[tokio::test] + async fn parent_service_dead() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(result_from_incoming); + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + tokio::select! { + e = &mut incoming_handle => match e { + Err(ProtocolError::NoParentConnection) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when parent dead"), + }, + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + }; + } + + #[tokio::test] + async fn parent_user_dead() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + mut result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(data_from_incoming); + let incoming_handle = incoming_handle.fuse(); + let outgoing_handle = outgoing_handle.fuse(); + pin_mut!(incoming_handle); + pin_mut!(outgoing_handle); + let _data_for_outgoing = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + result = result_from_outgoing.next() => { + let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped"); + assert_eq!(connection_type, ConnectionType::New); + let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected"); + data_for_outgoing + .unbounded_send(vec![2, 1, 3, 7]) + .expect("should send"); + data_for_outgoing + }, + }; + tokio::select! { + e = &mut incoming_handle => match e { + Err(ProtocolError::NoUserConnection) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when user dead"), + }, + _ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"), + }; + } + + #[tokio::test] + async fn sender_dead_before_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(outgoing_handle); + match incoming_handle.await { + Err(ProtocolError::HandshakeError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } + + #[tokio::test] + async fn sender_dead_after_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + mut result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + let incoming_handle = incoming_handle.fuse(); + pin_mut!(incoming_handle); + let (_, _exit, connection_type) = tokio::select! { + _ = &mut incoming_handle => panic!("incoming process unexpectedly finished"), + _ = outgoing_handle => panic!("outgoing process unexpectedly finished"), + out = result_from_incoming.next() => out.expect("should receive"), + }; + assert_eq!(connection_type, ConnectionType::New); + // outgoing_handle got consumed by tokio::select!, the sender is dead + match incoming_handle.await { + Err(ProtocolError::ReceiveError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } + + #[tokio::test] + async fn receiver_dead_before_handshake() { + let ( + _id_incoming, + _pen_incoming, + _id_outgoing, + _pen_outgoing, + incoming_handle, + outgoing_handle, + _data_from_incoming, + _data_from_outgoing, + _result_from_incoming, + _result_from_outgoing, + ) = prepare::>().await; + std::mem::drop(incoming_handle); + match outgoing_handle.await { + Err(ProtocolError::HandshakeError(_)) => (), + Err(e) => panic!("unexpected error: {}", e), + Ok(_) => panic!("successfully finished when connection dead"), + }; + } +} diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index ead09e5256..c1c94c5ebe 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -1,17 +1,20 @@ +use std::collections::HashSet; + use aleph_primitives::AuthorityId; use futures::{ channel::{mpsc, oneshot}, StreamExt, }; -use log::{info, trace, warn}; +use log::{debug, info, trace, warn}; use tokio::time; use crate::{ crypto::AuthorityPen, validator_network::{ incoming::incoming, - manager::{AddResult, Manager}, + manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, + protocols::{ConnectionType, ResultForService}, Data, Dialer, Listener, Network, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, @@ -79,6 +82,9 @@ pub struct Service, NL: Listener> { listener: NL, spawn_handle: SpawnTaskHandle, authority_pen: AuthorityPen, + // Backwards compatibility with the one-sided connections, remove when no longer needed. + legacy_connected: HashSet, + legacy_manager: LegacyManager, } impl, NL: Listener> Service { @@ -97,11 +103,13 @@ impl, NL: Listener> Service { Self { commands_from_interface, next_to_interface, - manager: Manager::new(), + manager: Manager::new(authority_pen.authority_id()), dialer, listener, spawn_handle, authority_pen, + legacy_connected: HashSet::new(), + legacy_manager: LegacyManager::new(), }, ServiceInterface { commands_for_service, @@ -114,20 +122,29 @@ impl, NL: Listener> Service { &self, peer_id: AuthorityId, addresses: Vec, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, Option>)>, + result_for_parent: mpsc::UnboundedSender>, ) { let authority_pen = self.authority_pen.clone(); let dialer = self.dialer.clone(); + let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { - outgoing(authority_pen, peer_id, dialer, addresses, result_for_parent).await; + outgoing( + authority_pen, + peer_id, + dialer, + addresses, + result_for_parent, + next_to_interface, + ) + .await; }); } fn spawn_new_incoming( &self, stream: NL::Connection, - result_for_parent: mpsc::UnboundedSender<(AuthorityId, oneshot::Sender<()>)>, + result_for_parent: mpsc::UnboundedSender>, ) { let authority_pen = self.authority_pen.clone(); let next_to_interface = self.next_to_interface.clone(); @@ -137,23 +154,37 @@ impl, NL: Listener> Service { }); } + fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + match self.legacy_connected.contains(peer_id) { + true => self.legacy_manager.peer_addresses(peer_id), + false => self.manager.peer_addresses(peer_id), + } + } + + fn add_connection( + &mut self, + peer_id: AuthorityId, + data_for_network: mpsc::UnboundedSender, + connection_type: ConnectionType, + ) -> AddResult { + use ConnectionType::*; + match connection_type { + New => self.manager.add_connection(peer_id, data_for_network), + LegacyIncoming => self.legacy_manager.add_incoming(peer_id, data_for_network), + LegacyOutgoing => self.legacy_manager.add_outgoing(peer_id, data_for_network), + } + } + /// Run the service until a signal from exit. pub async fn run(mut self, mut exit: oneshot::Receiver<()>) { let mut status_ticker = time::interval(STATUS_REPORT_INTERVAL); - // channel used to receive tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // exit_handle may be used to kill the worker later - let (incoming_result_for_parent, mut incoming_workers) = mpsc::unbounded(); - // channel used to receive information about failure from a spawned worker - // that managed an outgoing connection - // the received peer_id can be used to spawn another worker - let (outgoing_result_for_parent, mut outgoing_workers) = mpsc::unbounded(); + let (result_for_parent, mut worker_results) = mpsc::unbounded(); use ServiceCommand::*; loop { tokio::select! { // got new incoming connection from the listener - spawn an incoming worker maybe_stream = self.listener.accept() => match maybe_stream { - Ok(stream) => self.spawn_new_incoming(stream, incoming_result_for_parent.clone()), + Ok(stream) => self.spawn_new_incoming(stream, result_for_parent.clone()), Err(e) => warn!(target: "validator-network", "Listener failed to accept connection: {}", e), }, // got a new command from the interface @@ -161,52 +192,84 @@ impl, NL: Listener> Service { // register new peer in manager or update its list of addresses if already there // spawn a worker managing outgoing connection if the peer was not known AddConnection(peer_id, addresses) => { + // we add all the peers to the legacy manager so we don't lose the + // addresses, but only care about its opinion when it turns out we have to + // in particular the first time we add a peer we never know whether it + // requires legacy connecting, so we only attempt to connect to it if the + // new criterion is satisfied, otherwise we wait for it to connect to us + self.legacy_manager.add_peer(peer_id.clone(), addresses.clone()); if self.manager.add_peer(peer_id.clone(), addresses.clone()) { - self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()); + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels DelConnection(peer_id) => { self.manager.remove_peer(&peer_id); + self.legacy_manager.remove_peer(&peer_id); + self.legacy_connected.remove(&peer_id); }, // pass the data to the manager SendData(data, peer_id) => { - match self.manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + match self.legacy_connected.contains(&peer_id) { + true => match self.legacy_manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", peer_id, e), + }, + false => match self.manager.send_to(&peer_id, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), + Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + }, } }, }, - // received tuple (peer_id, exit_handle) from a spawned worker - // that has just established an incoming connection - // pass the tuple to the manager to register the connection - // the manager will be responsible for killing the worker if necessary - Some((peer_id, exit)) = incoming_workers.next() => { - use AddResult::*; - match self.manager.add_incoming(peer_id.clone(), exit) { - Uninterested => info!(target: "validator-network", "Peer {} connected to us despite out lack of interest.", peer_id), - Added => info!(target: "validator-network", "New incoming connection for peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced incoming connection for peer {}.", peer_id), - } - }, - // received information from a spawned worker managing an outgoing connection + // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network)) = outgoing_workers.next() => { - use AddResult::*; - if let Some(addresses) = self.manager.peer_addresses(&peer_id) { - match maybe_data_for_network { - Some(data_for_network) => match self.manager.add_outgoing(peer_id.clone(), data_for_network) { - Uninterested => warn!(target: "validator-network", "We connected to peer {} for unknown reasons.", peer_id), - Added => info!(target: "validator-network", "New outgoing connection to peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced outgoing connection to peer {}.", peer_id), - }, - None => self.spawn_new_outgoing(peer_id, addresses, outgoing_result_for_parent.clone()), + Some((peer_id, maybe_data_for_network, connection_type)) = worker_results.next() => { + use ConnectionType::*; + let spawn_new_legacy_connection = match connection_type { + LegacyIncoming => { + self.manager.remove_peer(&peer_id); + self.legacy_connected.insert(peer_id.clone()) + }, + LegacyOutgoing => { + self.manager.remove_peer(&peer_id); + self.legacy_connected.insert(peer_id.clone()); + false } + New => { + // We always return New if connecting fails, so only New+Some means we + // actually negotiated the new protocol, otherwise we should stay + // with our previous guess. + if maybe_data_for_network.is_some() { + self.legacy_connected.remove(&peer_id); + } + false + }, }; + if spawn_new_legacy_connection { + match self.legacy_manager.peer_addresses(&peer_id) { + Some(addresses) => self.spawn_new_outgoing(peer_id.clone(), addresses, result_for_parent.clone()), + None => { + self.legacy_connected.remove(&peer_id); + }, + } + } + use AddResult::*; + match maybe_data_for_network { + Some(data_for_network) => match self.add_connection(peer_id.clone(), data_for_network, connection_type) { + Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), + Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), + Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), + }, + None => if let Some(addresses) = self.peer_addresses(&peer_id) { + self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + } + } }, // periodically reporting what we are trying to do _ = status_ticker.tick() => { - info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()) + info!(target: "validator-network", "Validator Network status: {}", self.manager.status_report()); + debug!(target: "validator-network", "Validator Network legacy status: {}", self.legacy_manager.status_report()); } // received exit signal, stop the network // all workers will be killed automatically after the manager gets dropped