diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 83f4ea22f4..154ac69b12 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -4,7 +4,6 @@ use std::{ iter, }; -use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use log::{debug, error, info, trace, warn}; use sc_service::SpawnTaskHandle; @@ -16,7 +15,7 @@ use crate::{ AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network, NetworkSender, Protocol, }, - validator_network::Network as ValidatorNetwork, + validator_network::{Network as ValidatorNetwork, PublicKey}, STATUS_REPORT_INTERVAL, }; @@ -33,9 +32,11 @@ pub struct Service< N: Network, D: Data, VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, -> { + A: Data + Multiaddress, + VN: ValidatorNetwork, +> where + A::PeerId: PublicKey, +{ network: N, validator_network: VN, data_from_user: mpsc::UnboundedReceiver>, @@ -85,9 +86,11 @@ impl< N: Network, D: Data, VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, + A: Data + Multiaddress, + VN: ValidatorNetwork, > Service +where + A::PeerId: PublicKey, { pub fn new( network: N, diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index d35c7e8c0c..9cb3dda2a3 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -22,8 +22,8 @@ use crate::{ ConsensusParty, ConsensusPartyParams, }, session_map::{AuthorityProviderImpl, FinalityNotificatorImpl, SessionMapUpdater}, - tcp_network::new_tcp_network, - validator_network::{Service, KEY_TYPE}, + tcp_network::{new_tcp_network, KEY_TYPE}, + validator_network::Service, AlephConfig, }; diff --git a/finality-aleph/src/tcp_network.rs b/finality-aleph/src/tcp_network.rs index e338344d18..9c0dd4bf4a 100644 --- a/finality-aleph/src/tcp_network.rs +++ b/finality-aleph/src/tcp_network.rs @@ -3,16 +3,20 @@ use std::{io::Result as IoResult, net::ToSocketAddrs as _}; use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use log::info; +use sp_core::crypto::KeyTypeId; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, ToSocketAddrs, }; use crate::{ + crypto::{verify, AuthorityPen, Signature}, network::{Multiaddress, NetworkIdentity, PeerId}, - validator_network::{ConnectionInfo, Dialer, Listener, Splittable}, + validator_network::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable}, }; +pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); + impl ConnectionInfo for TcpStream { fn peer_address_info(&self) -> String { match self.peer_addr() { @@ -66,6 +70,28 @@ impl Listener for TcpListener { impl PeerId for AuthorityId {} +impl PublicKey for AuthorityId { + type Signature = Signature; + + fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool { + verify(self, message, signature) + } +} + +#[async_trait::async_trait] +impl SecretKey for AuthorityPen { + type Signature = Signature; + type PublicKey = AuthorityId; + + async fn sign(&self, message: &[u8]) -> Self::Signature { + AuthorityPen::sign(self, message).await + } + + fn public_key(&self) -> Self::PublicKey { + self.authority_id() + } +} + /// A representation of a single TCP address with an associated peer ID. #[derive(Debug, Hash, Encode, Decode, Clone, PartialEq, Eq)] pub struct TcpMultiaddress { diff --git a/finality-aleph/src/testing/mocks/validator_network.rs b/finality-aleph/src/testing/mocks/validator_network.rs index b9efbbbf1a..2ea14775b3 100644 --- a/finality-aleph/src/testing/mocks/validator_network.rs +++ b/finality-aleph/src/testing/mocks/validator_network.rs @@ -59,7 +59,7 @@ pub struct MockNetwork { } #[async_trait::async_trait] -impl Network for MockNetwork { +impl Network for MockNetwork { fn add_connection(&mut self, peer: AuthorityId, addresses: Vec) { self.add_connection.send((peer, addresses)); } diff --git a/finality-aleph/src/validator_network/crypto.rs b/finality-aleph/src/validator_network/crypto.rs new file mode 100644 index 0000000000..74891a6317 --- /dev/null +++ b/finality-aleph/src/validator_network/crypto.rs @@ -0,0 +1,26 @@ +use std::{fmt::Display, hash::Hash}; + +use codec::Codec; + +/// A public key for signature verification. +pub trait PublicKey: + Send + Sync + Eq + Clone + AsRef<[u8]> + Display + Hash + Codec + 'static +{ + type Signature: Send + Sync + Clone + Codec; + + /// Verify whether the message has been signed with the associated private key. + fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool; +} + +/// Secret key for signing messages, with an associated public key. +#[async_trait::async_trait] +pub trait SecretKey: Clone + Send + Sync + 'static { + type Signature: Send + Sync + Clone + Codec; + type PublicKey: PublicKey; + + /// Produce a signature for the provided message. + async fn sign(&self, message: &[u8]) -> Self::Signature; + + /// Return the associated public key. + fn public_key(&self) -> Self::PublicKey; +} diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index a8ec06bad4..719092759e 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -3,20 +3,17 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use futures::channel::mpsc; use log::{debug, info}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, - Data, Splittable, - }, +use crate::validator_network::{ + protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, + Data, PublicKey, SecretKey, Splittable, }; -enum IncomingError { +enum IncomingError { ProtocolNegotiationError(ProtocolNegotiationError), - ProtocolError(ProtocolError), + ProtocolError(ProtocolError), } -impl Display for IncomingError { +impl Display for IncomingError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use IncomingError::*; match self { @@ -26,45 +23,45 @@ impl Display for IncomingError { } } -impl From for IncomingError { +impl From for IncomingError { fn from(e: ProtocolNegotiationError) -> Self { IncomingError::ProtocolNegotiationError(e) } } -impl From for IncomingError { - fn from(e: ProtocolError) -> Self { +impl From> for IncomingError { + fn from(e: ProtocolError) -> Self { IncomingError::ProtocolError(e) } } -async fn manage_incoming( - authority_pen: AuthorityPen, +async fn manage_incoming( + secret_key: SK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), IncomingError> { +) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_incoming(stream, authority_pen, result_for_parent, data_for_user) + .manage_incoming(stream, secret_key, result_for_parent, data_for_user) .await?) } -/// Manage an incoming connection. After the handshake it will send the recognized AuthorityId to +/// Manage an incoming connection. After the handshake it will send the recognized PublicKey to /// the parent, together with an exit channel for this process. When this channel is dropped the /// process ends. Whenever data arrives on this connection it will be passed to the user. Any /// failures in receiving data result in the process stopping, we assume the other side will /// reestablish it if necessary. -pub async fn incoming( - authority_pen: AuthorityPen, +pub async fn incoming( + secret_key: SK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { let addr = stream.peer_address_info(); - if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { + if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await { info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e); } } diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs index 042a57c607..7d32eab0f1 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -3,16 +3,14 @@ use std::{ ops::BitXor, }; -use aleph_primitives::AuthorityId; - -use crate::validator_network::Data; +use crate::validator_network::{Data, PublicKey}; /// Data about peers we know and whether we should connect to them or they to us. For the former /// case also keeps the peers' addresses. -pub struct DirectedPeers { - own_id: AuthorityId, - outgoing: HashMap>, - incoming: HashSet, +pub struct DirectedPeers { + own_id: PK, + outgoing: HashMap>, + incoming: HashSet, } /// Whether we should call the remote or the other way around. We xor the peer ids and based on the @@ -29,9 +27,9 @@ fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool { } } -impl DirectedPeers { +impl DirectedPeers { /// Create a new set of peers directed using our own peer id. - pub fn new(own_id: AuthorityId) -> Self { + pub fn new(own_id: PK) -> Self { DirectedPeers { own_id, outgoing: HashMap::new(), @@ -44,7 +42,7 @@ impl DirectedPeers { /// Returns whether we should start attempts at connecting with the peer, which is the case /// exactly when the peer is one with which we should attempt connections AND it was added for /// the first time. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) { true => self.outgoing.insert(peer_id, addresses).is_none(), false => { @@ -57,28 +55,28 @@ impl DirectedPeers { } /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.outgoing.get(peer_id).cloned() } /// Whether we should be maintaining a connection with this peer. - pub fn interested(&self, peer_id: &AuthorityId) -> bool { + pub fn interested(&self, peer_id: &PK) -> bool { self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id) } /// Iterator over the peers we want connections from. - pub fn incoming_peers(&self) -> impl Iterator { + pub fn incoming_peers(&self) -> impl Iterator { self.incoming.iter() } /// Iterator over the peers we want to connect to. - pub fn outgoing_peers(&self) -> impl Iterator { + pub fn outgoing_peers(&self) -> impl Iterator { self.outgoing.keys() } /// Remove a peer from the list of peers that we want to stay connected with, whether the /// connection was supposed to be incoming or outgoing. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.incoming.remove(peer_id); self.outgoing.remove(peer_id); } @@ -93,7 +91,7 @@ mod tests { type Address = String; - async fn container_with_id() -> (DirectedPeers
, AuthorityId) { + async fn container_with_id() -> (DirectedPeers, AuthorityId) { let (id, _) = key().await; let container = DirectedPeers::new(id.clone()); (container, id) @@ -118,7 +116,8 @@ mod tests { ); } - async fn container_with_added_connecting_peer() -> (DirectedPeers
, AuthorityId) { + async fn container_with_added_connecting_peer( + ) -> (DirectedPeers, AuthorityId) { let (mut container0, id0) = container_with_id().await; let (mut container1, id1) = container_with_id().await; let addresses = some_addresses(); @@ -131,7 +130,8 @@ mod tests { } } - async fn container_with_added_nonconnecting_peer() -> (DirectedPeers
, AuthorityId) { + async fn container_with_added_nonconnecting_peer( + ) -> (DirectedPeers, AuthorityId) { let (mut container0, id0) = container_with_id().await; let (mut container1, id1) = container_with_id().await; let addresses = some_addresses(); diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index f305c716f8..6e7e13697c 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -3,35 +3,34 @@ use std::{ fmt::{Display, Error as FmtError, Formatter}, }; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; use crate::{ network::PeerId, validator_network::{ manager::{AddResult, SendError}, - Data, + Data, PublicKey, }, }; /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. -pub struct Manager { - addresses: HashMap>, - outgoing: HashMap>, - incoming: HashMap>, +pub struct Manager { + addresses: HashMap>, + outgoing: HashMap>, + incoming: HashMap>, } -struct ManagerStatus { +struct ManagerStatus { wanted_peers: usize, - both_ways_peers: HashSet, - outgoing_peers: HashSet, - incoming_peers: HashSet, - missing_peers: HashSet, + both_ways_peers: HashSet, + outgoing_peers: HashSet, + incoming_peers: HashSet, + missing_peers: HashSet, } -impl ManagerStatus { - fn new(manager: &Manager) -> Self { +impl ManagerStatus { + fn new(manager: &Manager) -> Self { let incoming: HashSet<_> = manager .incoming .iter() @@ -65,7 +64,7 @@ impl ManagerStatus { } } -impl Display for ManagerStatus { +impl Display for ManagerStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { if self.wanted_peers == 0 { return write!(f, "not maintaining any connections; "); @@ -81,7 +80,7 @@ impl Display for ManagerStatus { let peers = self .both_ways_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -96,7 +95,7 @@ impl Display for ManagerStatus { let peers = self .incoming_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -111,7 +110,7 @@ impl Display for ManagerStatus { let peers = self .outgoing_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -126,7 +125,7 @@ impl Display for ManagerStatus { let peers = self .missing_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!(f, "missing - {:?} [{}];", self.missing_peers.len(), peers)?; @@ -136,7 +135,7 @@ impl Display for ManagerStatus { } } -impl Manager { +impl Manager { /// Create a new Manager with empty list of peers. pub fn new() -> Self { Manager { @@ -149,13 +148,13 @@ impl Manager { /// Add a peer to the list of peers we want to stay connected to, or /// update the list of addresses if the peer was already added. /// Returns whether this peer is a new peer. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { self.addresses.insert(peer_id, addresses).is_none() } /// Return Option containing addresses of the given peer, or None if /// the peer is unknown. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.addresses.get(peer_id).cloned() } @@ -163,7 +162,7 @@ impl Manager { /// but only if the peer is on the list of peers that we want to stay connected with. pub fn add_outgoing( &mut self, - peer_id: AuthorityId, + peer_id: PK, data_for_network: mpsc::UnboundedSender, ) -> AddResult { use AddResult::*; @@ -178,11 +177,7 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming( - &mut self, - peer_id: AuthorityId, - exit: mpsc::UnboundedSender, - ) -> AddResult { + pub fn add_incoming(&mut self, peer_id: PK, exit: mpsc::UnboundedSender) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -195,7 +190,7 @@ impl Manager { /// Remove a peer from the list of peers that we want to stay connected with. /// Close any incoming and outgoing connections that were established. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.addresses.remove(peer_id); self.incoming.remove(peer_id); self.outgoing.remove(peer_id); @@ -204,7 +199,7 @@ impl Manager { /// Send data to a peer. /// Returns error if there is no outgoing connection to the peer, /// or if the connection is dead. - pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> { self.outgoing .get(peer_id) .ok_or(SendError::PeerNotFound)? @@ -220,6 +215,7 @@ impl Manager { #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; @@ -230,7 +226,7 @@ mod tests { #[tokio::test] async fn add_remove() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let (peer_id, _) = key().await; let (peer_id_b, _) = key().await; let addresses = vec![ @@ -254,7 +250,7 @@ mod tests { #[tokio::test] async fn outgoing() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let data = String::from("DATA"); let (peer_id, _) = key().await; let addresses = vec![ @@ -285,7 +281,7 @@ mod tests { #[tokio::test] async fn incoming() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let (peer_id, _) = key().await; let addresses = vec![ String::from(""), diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index bbfe0641b5..dd9288e7d7 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -3,10 +3,12 @@ use std::{ fmt::{Display, Error as FmtError, Formatter}, }; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; -use crate::{network::PeerId, validator_network::Data}; +use crate::{ + network::PeerId, + validator_network::{Data, PublicKey}, +}; mod direction; mod legacy; @@ -44,15 +46,15 @@ pub enum AddResult { Replaced, } -struct ManagerStatus { - outgoing_peers: HashSet, - missing_outgoing: HashSet, - incoming_peers: HashSet, - missing_incoming: HashSet, +struct ManagerStatus { + outgoing_peers: HashSet, + missing_outgoing: HashSet, + incoming_peers: HashSet, + missing_incoming: HashSet, } -impl ManagerStatus { - fn new(manager: &Manager) -> Self { +impl ManagerStatus { + fn new(manager: &Manager) -> Self { let mut incoming_peers = HashSet::new(); let mut missing_incoming = HashSet::new(); let mut outgoing_peers = HashSet::new(); @@ -87,14 +89,14 @@ impl ManagerStatus { } } -fn pretty_authority_id_set(set: &HashSet) -> String { +fn pretty_peer_id_set(set: &HashSet) -> String { set.iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", ") } -impl Display for ManagerStatus { +impl Display for ManagerStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { let wanted_incoming = self.wanted_incoming(); let wanted_outgoing = self.wanted_outgoing(); @@ -114,7 +116,7 @@ impl Display for ManagerStatus { f, "have - {:?} [{}]; ", self.incoming_peers.len(), - pretty_authority_id_set(&self.incoming_peers), + pretty_peer_id_set(&self.incoming_peers), )?, } if !self.missing_incoming.is_empty() { @@ -122,7 +124,7 @@ impl Display for ManagerStatus { f, "missing - {:?} [{}]; ", self.missing_incoming.len(), - pretty_authority_id_set(&self.missing_incoming), + pretty_peer_id_set(&self.missing_incoming), )?; } } @@ -137,7 +139,7 @@ impl Display for ManagerStatus { f, "have - {:?} [{}]; ", self.incoming_peers.len(), - pretty_authority_id_set(&self.outgoing_peers), + pretty_peer_id_set(&self.outgoing_peers), )?; } if !self.missing_outgoing.is_empty() { @@ -145,7 +147,7 @@ impl Display for ManagerStatus { f, "missing - {:?} [{}]; ", self.missing_incoming.len(), - pretty_authority_id_set(&self.missing_outgoing), + pretty_peer_id_set(&self.missing_outgoing), )?; } } @@ -158,23 +160,23 @@ impl Display for ManagerStatus { /// Network component responsible for holding the list of peers that we /// want to connect to or let them connect to us, and managing the established /// connections. -pub struct Manager { +pub struct Manager { // Which peers we want to be connected with, and which way. - wanted: DirectedPeers, + wanted: DirectedPeers, // This peers we are connected with. We ensure that this is always a subset of what we want. - have: HashMap>, + have: HashMap>, } -impl Manager { +impl Manager { /// Create a new Manager with empty list of peers. - pub fn new(own_id: AuthorityId) -> Self { + pub fn new(own_id: PK) -> Self { Manager { wanted: DirectedPeers::new(own_id), have: HashMap::new(), } } - fn active_connection(&self, peer_id: &AuthorityId) -> bool { + fn active_connection(&self, peer_id: &PK) -> bool { self.have .get(peer_id) .map(|sender| !sender.is_closed()) @@ -186,19 +188,19 @@ impl Manager { /// Returns whether we should start attempts at connecting with the peer, which depends on the /// coorddinated pseudorandom decision on the direction of the connection and whether this was /// added for the first time. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { self.wanted.add_peer(peer_id, addresses) } /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.wanted.peer_addresses(peer_id) } /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to. pub fn add_connection( &mut self, - peer_id: AuthorityId, + peer_id: PK, data_for_network: mpsc::UnboundedSender, ) -> AddResult { use AddResult::*; @@ -213,7 +215,7 @@ impl Manager { /// Remove a peer from the list of peers that we want to stay connected with. /// Close any incoming and outgoing connections that were established. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.wanted.remove_peer(peer_id); self.have.remove(peer_id); } @@ -221,7 +223,7 @@ impl Manager { /// Send data to a peer. /// Returns error if there is no outgoing connection to the peer, /// or if the connection is dead. - pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> { self.have .get(peer_id) .ok_or(SendError::PeerNotFound)? @@ -237,6 +239,7 @@ impl Manager { #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; @@ -248,7 +251,7 @@ mod tests { #[tokio::test] async fn add_remove() { let (own_id, _) = key().await; - let mut manager = Manager::::new(own_id); + let mut manager = Manager::::new(own_id); let (peer_id, _) = key().await; let (peer_id_b, _) = key().await; let addresses = vec![ @@ -280,9 +283,11 @@ mod tests { #[tokio::test] async fn send_receive() { let (mut connecting_id, _) = key().await; - let mut connecting_manager = Manager::::new(connecting_id.clone()); + let mut connecting_manager = + Manager::::new(connecting_id.clone()); let (mut listening_id, _) = key().await; - let mut listening_manager = Manager::::new(listening_id.clone()); + let mut listening_manager = + Manager::::new(listening_id.clone()); let data = String::from("DATA"); let addresses = vec![ String::from(""), diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index b9f3d2524b..5a35646ec0 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -1,10 +1,9 @@ use std::fmt::Display; -use aleph_primitives::AuthorityId; use codec::Codec; -use sp_core::crypto::KeyTypeId; use tokio::io::{AsyncRead, AsyncWrite}; +mod crypto; mod incoming; mod io; mod manager; @@ -14,10 +13,9 @@ mod outgoing; mod protocols; mod service; +pub use crypto::{PublicKey, SecretKey}; pub use service::Service; -pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); - /// What the data sent using the network has to satisfy. pub trait Data: Clone + Codec + Send + Sync + 'static {} @@ -31,16 +29,16 @@ impl Data for D {} /// implementation might fail to deliver any specific message, so messages have to be resent while /// they still should be delivered. #[async_trait::async_trait] -pub trait Network: Send + 'static { +pub trait Network: Send + 'static { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: AuthorityId, addresses: Vec); + fn add_connection(&mut self, peer: PK, addresses: Vec); /// Remove the peer from the set of connected peers and close the connection. - fn remove_connection(&mut self, peer: AuthorityId); + fn remove_connection(&mut self, peer: PK); /// Send a message to a single peer. /// This function should be implemented in a non-blocking manner. - fn send(&self, data: D, recipient: AuthorityId); + fn send(&self, data: D, recipient: PK); /// Receive a message from the network. async fn next(&mut self) -> Option; diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index d22858497b..679b9d0b9b 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -1,27 +1,23 @@ use std::fmt::{Debug, Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; use log::{debug, info}; use tokio::time::{sleep, Duration}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - protocols::{ - protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, - }, - ConnectionInfo, Data, Dialer, PeerAddressInfo, +use crate::validator_network::{ + protocols::{ + protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, }, + ConnectionInfo, Data, Dialer, PeerAddressInfo, PublicKey, SecretKey, }; -enum OutgoingError> { +enum OutgoingError> { Dial(ND::Error), ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError), - Protocol(PeerAddressInfo, ProtocolError), + Protocol(PeerAddressInfo, ProtocolError), } -impl> Display for OutgoingError { +impl> Display for OutgoingError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use OutgoingError::*; match self { @@ -40,15 +36,15 @@ impl> Display for OutgoingError { } } -async fn manage_outgoing>( - authority_pen: AuthorityPen, - peer_id: AuthorityId, +async fn manage_outgoing>( + secret_key: SK, + public_key: SK::PublicKey, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), OutgoingError> { - debug!(target: "validator-network", "Trying to connect to {}.", peer_id); +) -> Result<(), OutgoingError> { + debug!(target: "validator-network", "Trying to connect to {}.", public_key); let stream = dialer .connect(addresses) .await @@ -62,8 +58,8 @@ async fn manage_outgoing>( protocol .manage_outgoing( stream, - authority_pen, - peer_id, + secret_key, + public_key, result_for_parent, data_for_user, ) @@ -76,17 +72,17 @@ const RETRY_DELAY: Duration = Duration::from_secs(10); /// Establish an outgoing connection to the provided peer using the dialer and then manage it. /// While this works it will send any data from the user to the peer. Any failures will be reported /// to the parent, so that connections can be reestablished if necessary. -pub async fn outgoing>( - authority_pen: AuthorityPen, - peer_id: AuthorityId, +pub async fn outgoing>( + secret_key: SK, + public_key: SK::PublicKey, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( - authority_pen, - peer_id.clone(), + secret_key, + public_key.clone(), dialer, addresses.clone(), result_for_parent.clone(), @@ -94,12 +90,12 @@ pub async fn outgoing>( ) .await { - info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs()); + info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", public_key, addresses, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; // we send the "new" connection type, because we always assume it's new until proven // otherwise, and here we did not even get the chance to attempt negotiating a protocol if result_for_parent - .unbounded_send((peer_id, None, ConnectionType::New)) + .unbounded_send((public_key, None, ConnectionType::New)) .is_err() { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); diff --git a/finality-aleph/src/validator_network/protocols/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs index 54f0bf8820..3286d5733e 100644 --- a/finality-aleph/src/validator_network/protocols/handshake.rs +++ b/finality-aleph/src/validator_network/protocols/handshake.rs @@ -1,23 +1,19 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use rand::Rng; use tokio::time::{timeout, Duration}; -use crate::{ - crypto::{verify, AuthorityPen, Signature}, - validator_network::{ - io::{receive_data, send_data, ReceiveError, SendError}, - Splittable, - }, +use crate::validator_network::{ + io::{receive_data, send_data, ReceiveError, SendError}, + PublicKey, SecretKey, Splittable, }; pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); /// Handshake error. #[derive(Debug)] -pub enum HandshakeError { +pub enum HandshakeError { /// Send error. SendError(SendError), /// Receive error. @@ -25,12 +21,12 @@ pub enum HandshakeError { /// Signature error. SignatureError, /// Challenge contains invalid peer id. - ChallengeError(AuthorityId, AuthorityId), + ChallengeError(PK, PK), /// Timeout. TimedOut, } -impl Display for HandshakeError { +impl Display for HandshakeError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use HandshakeError::*; match self { @@ -47,13 +43,13 @@ impl Display for HandshakeError { } } -impl From for HandshakeError { +impl From for HandshakeError { fn from(e: SendError) -> Self { HandshakeError::SendError(e) } } -impl From for HandshakeError { +impl From for HandshakeError { fn from(e: ReceiveError) -> Self { HandshakeError::ReceiveError(e) } @@ -61,39 +57,44 @@ impl From for HandshakeError { /// Handshake challenge. Contains public key of the creator, and a random nonce. #[derive(Debug, Clone, Encode, Decode)] -struct Challenge { - id: AuthorityId, +struct Challenge { + public_key: PK, nonce: [u8; 32], } -impl Challenge { +impl Challenge { /// Prepare new challenge that contains ID of the creator. - fn new(id: AuthorityId) -> Self { + fn new(public_key: PK) -> Self { let nonce = rand::thread_rng().gen::<[u8; 32]>(); - Self { id, nonce } + Self { public_key, nonce } } } /// Handshake response. Contains public key of the creator, and signature /// related to the received challenge. #[derive(Debug, Clone, Encode, Decode)] -struct Response { - id: AuthorityId, - signature: Signature, +struct Response { + public_key: PK, + signature: PK::Signature, } -impl Response { +impl Response { + // Amusingly the `Signature = PK::Signature` is necessary, the compiler cannot even do this + // simple reasoning. :/ /// Create a new response by signing the challenge. - async fn new(pen: &AuthorityPen, challenge: &Challenge) -> Self { + async fn new>( + secret_key: &SK, + challenge: &Challenge, + ) -> Self { Self { - id: pen.authority_id(), - signature: pen.sign(&challenge.encode()).await, + public_key: secret_key.public_key(), + signature: secret_key.sign(&challenge.encode()).await, } } /// Verify the Response sent by the peer. - fn verify(&self, challenge: &Challenge) -> bool { - verify(&self.id, &challenge.encode(), &self.signature) + fn verify(&self, challenge: &Challenge) -> bool { + self.public_key.verify(&challenge.encode(), &self.signature) } } @@ -105,22 +106,22 @@ impl Response { /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, the peer will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_incoming( +pub async fn execute_v0_handshake_incoming( stream: S, - authority_pen: AuthorityPen, -) -> Result<(S::Sender, S::Receiver, AuthorityId), HandshakeError> { + secret_key: SK, +) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { // send challenge - let our_challenge = Challenge::new(authority_pen.authority_id()); + let our_challenge = Challenge::new(secret_key.public_key()); let stream = send_data(stream, our_challenge.clone()).await?; // receive response - let (stream, peer_response) = receive_data::<_, Response>(stream).await?; + let (stream, peer_response) = receive_data::<_, Response>(stream).await?; // validate response if !peer_response.verify(&our_challenge) { return Err(HandshakeError::SignatureError); } let (sender, receiver) = stream.split(); - let peer_id = peer_response.id; - Ok((sender, receiver, peer_id)) + let public_key = peer_response.public_key; + Ok((sender, receiver, public_key)) } /// Performs the handshake with a peer that we called. We assume that their @@ -132,45 +133,48 @@ pub async fn execute_v0_handshake_incoming( /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, we will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_outgoing( +pub async fn execute_v0_handshake_outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + secret_key: SK, + public_key: SK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { // receive challenge - let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; - if peer_id != peer_challenge.id { - return Err(HandshakeError::ChallengeError(peer_id, peer_challenge.id)); + let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; + if public_key != peer_challenge.public_key { + return Err(HandshakeError::ChallengeError( + public_key, + peer_challenge.public_key, + )); } // send response - let our_response = Response::new(&authority_pen, &peer_challenge).await; + let our_response = Response::new(&secret_key, &peer_challenge).await; let stream = send_data(stream, our_response).await?; let (sender, receiver) = stream.split(); Ok((sender, receiver)) } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_incoming( +pub async fn v0_handshake_incoming( stream: S, - authority_pen: AuthorityPen, -) -> Result<(S::Sender, S::Receiver, AuthorityId), HandshakeError> { + secret_key: SK, +) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_incoming(stream, authority_pen), + execute_v0_handshake_incoming(stream, secret_key), ) .await .map_err(|_| HandshakeError::TimedOut)? } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_outgoing( +pub async fn v0_handshake_outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + secret_key: SK, + public_key: SK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_outgoing(stream, authority_pen, peer_id), + execute_v0_handshake_outgoing(stream, secret_key, public_key), ) .await .map_err(|_| HandshakeError::TimedOut)? @@ -178,6 +182,7 @@ pub async fn v0_handshake_outgoing( #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{join, try_join}; use super::{ @@ -193,7 +198,7 @@ mod tests { }, }; - fn assert_send_error(result: Result) { + fn assert_send_error(result: Result>) { match result { Err(HandshakeError::SendError(_)) => (), x => panic!( @@ -203,7 +208,7 @@ mod tests { }; } - fn assert_receive_error(result: Result) { + fn assert_receive_error(result: Result>) { match result { Err(HandshakeError::ReceiveError(_)) => (), x => panic!( @@ -213,7 +218,7 @@ mod tests { }; } - fn assert_signature_error(result: Result) { + fn assert_signature_error(result: Result>) { match result { Err(HandshakeError::SignatureError) => (), x => panic!( @@ -223,7 +228,7 @@ mod tests { }; } - fn assert_challenge_error(result: Result) { + fn assert_challenge_error(result: Result>) { match result { Err(HandshakeError::ChallengeError(_, _)) => (), x => panic!( @@ -276,7 +281,7 @@ mod tests { authority_pen: AuthorityPen, ) { // receive challenge - let (stream, _) = receive_data::<_, Challenge>(stream) + let (stream, _) = receive_data::<_, Challenge>(stream) .await .expect("should receive"); // prepare fake challenge @@ -304,14 +309,14 @@ mod tests { authority_pen: AuthorityPen, ) { // receive challenge - let (stream, challenge) = receive_data::<_, Challenge>(stream) + let (stream, challenge) = receive_data::<_, Challenge>(stream) .await .expect("should receive"); // prepare fake id let (fake_id, _) = key().await; // send response with substituted id let mut our_response = Response::new(&authority_pen, &challenge).await; - our_response.id = fake_id; + our_response.public_key = fake_id; send_data(stream, our_response).await.expect("should send"); futures::future::pending::<()>().await; } @@ -341,7 +346,7 @@ mod tests { execute_v0_handshake_incoming(stream_a, pen_a), // mock outgoing handshake: receive the first message and terminate async { - receive_data::<_, Challenge>(stream_b) + receive_data::<_, Challenge>(stream_b) .await .expect("should receive"); }, diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index d16430d205..c37e2d4253 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -1,14 +1,10 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{ReceiveError, SendError}, - Data, Splittable, - }, +use crate::validator_network::{ + io::{ReceiveError, SendError}, + Data, PublicKey, SecretKey, Splittable, }; mod handshake; @@ -30,15 +26,11 @@ pub enum ConnectionType { LegacyOutgoing, } -/// What connections send back to the service after they become established. Starts with a peer id -/// of the remote node, followed by a channel for sending data to that node, with None if the +/// What connections send back to the service after they become established. Starts with a public +/// key of the remote node, followed by a channel for sending data to that node, with None if the /// connection was unsuccessful and should be reestablished. Finally a marker for legacy /// compatibility. -pub type ResultForService = ( - AuthorityId, - Option>, - ConnectionType, -); +pub type ResultForService = (PK, Option>, ConnectionType); /// Defines the protocol for communication. #[derive(Debug, PartialEq, Eq)] @@ -52,9 +44,9 @@ pub enum Protocol { /// Protocol error. #[derive(Debug)] -pub enum ProtocolError { +pub enum ProtocolError { /// Error during performing a handshake. - HandshakeError(HandshakeError), + HandshakeError(HandshakeError), /// Sending failed. SendError(SendError), /// Receiving failed. @@ -67,7 +59,7 @@ pub enum ProtocolError { NoUserConnection, } -impl Display for ProtocolError { +impl Display for ProtocolError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use ProtocolError::*; match self { @@ -81,19 +73,19 @@ impl Display for ProtocolError { } } -impl From for ProtocolError { - fn from(e: HandshakeError) -> Self { +impl From> for ProtocolError { + fn from(e: HandshakeError) -> Self { ProtocolError::HandshakeError(e) } } -impl From for ProtocolError { +impl From for ProtocolError { fn from(e: SendError) -> Self { ProtocolError::SendError(e) } } -impl From for ProtocolError { +impl From for ProtocolError { fn from(e: ReceiveError) -> Self { ProtocolError::ReceiveError(e) } @@ -107,37 +99,37 @@ impl Protocol { const MAX_VERSION: Version = 1; /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( + pub async fn manage_incoming( &self, stream: S, - authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender>, + secret_key: SK, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, - V1 => v1::incoming(stream, authority_pen, result_for_service, data_for_user).await, + V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await, } } /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( + pub async fn manage_outgoing( &self, stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender>, + secret_key: SK, + public_key: SK::PublicKey, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + V0 => v0::outgoing(stream, secret_key, public_key, result_for_service).await, V1 => { v1::outgoing( stream, - authority_pen, - peer_id, + secret_key, + public_key, result_for_service, data_for_user, ) diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 1f80de2baf..c0c1af3ea0 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,18 +1,14 @@ -use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{receive_data, send_data}, - protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, - ConnectionType, ProtocolError, ResultForService, - }, - Data, Splittable, +use crate::validator_network::{ + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, + Data, PublicKey, SecretKey, Splittable, }; mod heartbeat; @@ -21,10 +17,10 @@ use heartbeat::{heartbeat_receiver, heartbeat_sender}; /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. -async fn sending( +async fn sending( mut sender: S, mut data_from_user: mpsc::UnboundedReceiver, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { loop { sender = match data_from_user.next().await { Some(data) => send_data(sender, data).await?, @@ -36,19 +32,19 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn outgoing( +pub async fn outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender>, -) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); + secret_key: SK, + public_key: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Extending hand to {}.", public_key); + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( - peer_id.clone(), + public_key.clone(), Some(data_for_network), ConnectionType::LegacyOutgoing, )) @@ -57,7 +53,7 @@ pub async fn outgoing( let sending = sending(sender, data_from_user); let heartbeat = heartbeat_receiver(receiver); - debug!(target: "validator-network", "Starting worker for sending to {}.", peer_id); + debug!(target: "validator-network", "Starting worker for sending to {}.", public_key); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), @@ -68,10 +64,10 @@ pub async fn outgoing( /// Receives data from the network and sends it to the parent service. /// Exits when the parent channel is closed, or if the network connection is broken. -async fn receiving( +async fn receiving( mut stream: S, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { loop { let (old_stream, data) = receive_data(stream).await?; stream = old_stream; @@ -83,20 +79,20 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent .unbounded_send(( - peer_id.clone(), + public_key.clone(), Some(tx_exit), ConnectionType::LegacyIncoming, )) @@ -105,7 +101,7 @@ pub async fn incoming( let receiving = receiving(receiver, data_for_user); let heartbeat = heartbeat_sender(sender); - debug!(target: "validator-network", "Starting worker for receiving from {}.", peer_id); + debug!(target: "validator-network", "Starting worker for receiving from {}.", public_key); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), @@ -138,11 +134,11 @@ mod tests { AuthorityPen, AuthorityId, AuthorityPen, - impl futures::Future>, - impl futures::Future>, + impl futures::Future>>, + impl futures::Future>>, UnboundedReceiver, - UnboundedReceiver>, - UnboundedReceiver>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index b1599f243e..372a00dfac 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -1,4 +1,3 @@ -use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; @@ -7,16 +6,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{receive_data, send_data}, - protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, - ConnectionType, ProtocolError, ResultForService, - }, - Data, Splittable, +use crate::validator_network::{ + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, + Data, PublicKey, SecretKey, Splittable, }; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); @@ -28,10 +24,10 @@ enum Message { Heartbeat, } -async fn sending( +async fn sending( mut sender: S, mut data_from_user: mpsc::UnboundedReceiver, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { use Message::*; loop { let to_send = match timeout(HEARTBEAT_TIMEOUT, data_from_user.next()).await { @@ -46,10 +42,10 @@ async fn sending( } } -async fn receiving( +async fn receiving( mut stream: S, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { use Message::*; loop { let (old_stream, message) = timeout( @@ -68,12 +64,17 @@ async fn receiving( } } -async fn manage_connection( +async fn manage_connection< + PK: PublicKey, + D: Data, + S: AsyncWrite + Unpin + Send, + R: AsyncRead + Unpin + Send, +>( sender: S, receiver: R, data_from_user: mpsc::UnboundedReceiver, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { let sending = sending(sender, data_from_user); let receiving = receiving(receiver, data_for_user); tokio::select! { @@ -84,41 +85,49 @@ async fn manage_connection( +pub async fn outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + public_key: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); +) -> Result<(), ProtocolError> { + trace!(target: "validator-network", "Extending hand to {}.", public_key); + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .unbounded_send(( + public_key.clone(), + Some(data_for_network), + ConnectionType::New, + )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); manage_connection(sender, receiver, data_from_user, data_for_user).await } /// Performs the incoming handshake, and then manages a connection sending and receiving data. /// Exits on parent request (when the data source is dropped), or in case of broken or dead /// network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .unbounded_send(( + public_key.clone(), + Some(data_for_network), + ConnectionType::New, + )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); manage_connection(sender, receiver, data_from_user, data_for_user).await } @@ -145,12 +154,12 @@ mod tests { AuthorityPen, AuthorityId, AuthorityPen, - impl futures::Future>, - impl futures::Future>, + impl futures::Future>>, + impl futures::Future>>, UnboundedReceiver, UnboundedReceiver, - UnboundedReceiver>, - UnboundedReceiver>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index 86a2aca44c..c1c2f4a762 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -1,6 +1,5 @@ use std::{collections::HashSet, fmt::Debug}; -use aleph_primitives::AuthorityId; use futures::{ channel::{mpsc, oneshot}, StreamExt, @@ -9,32 +8,32 @@ use log::{debug, info, trace, warn}; use tokio::time; use crate::{ - crypto::AuthorityPen, + network::PeerId, validator_network::{ incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, protocols::{ConnectionType, ResultForService}, - Data, Dialer, Listener, Network, + Data, Dialer, Listener, Network, PublicKey, SecretKey, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, }; -enum ServiceCommand { - AddConnection(AuthorityId, Vec), - DelConnection(AuthorityId), - SendData(D, AuthorityId), +enum ServiceCommand { + AddConnection(PK, Vec), + DelConnection(PK), + SendData(D, PK), } -struct ServiceInterface { - commands_for_service: mpsc::UnboundedSender>, +struct ServiceInterface { + commands_for_service: mpsc::UnboundedSender>, next_from_service: mpsc::UnboundedReceiver, } #[async_trait::async_trait] -impl Network for ServiceInterface { +impl Network for ServiceInterface { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: AuthorityId, addresses: Vec) { + fn add_connection(&mut self, peer: PK, addresses: Vec) { if self .commands_for_service .unbounded_send(ServiceCommand::AddConnection(peer, addresses)) @@ -45,7 +44,7 @@ impl Network for ServiceInterface { } /// Remove the peer from the set of connected peers and close the connection. - fn remove_connection(&mut self, peer: AuthorityId) { + fn remove_connection(&mut self, peer: PK) { if self .commands_for_service .unbounded_send(ServiceCommand::DelConnection(peer)) @@ -57,7 +56,7 @@ impl Network for ServiceInterface { /// Send a message to a single peer. /// This function should be implemented in a non-blocking manner. - fn send(&self, data: D, recipient: AuthorityId) { + fn send(&self, data: D, recipient: PK) { if self .commands_for_service .unbounded_send(ServiceCommand::SendData(data, recipient)) @@ -74,27 +73,33 @@ impl Network for ServiceInterface { } /// A service that has to be run for the validator network to work. -pub struct Service, NL: Listener> { - commands_from_interface: mpsc::UnboundedReceiver>, +pub struct Service, NL: Listener> +where + SK::PublicKey: PeerId, +{ + commands_from_interface: mpsc::UnboundedReceiver>, next_to_interface: mpsc::UnboundedSender, - manager: Manager, + manager: Manager, dialer: ND, listener: NL, spawn_handle: SpawnTaskHandle, - authority_pen: AuthorityPen, + secret_key: SK, // Backwards compatibility with the one-sided connections, remove when no longer needed. - legacy_connected: HashSet, - legacy_manager: LegacyManager, + legacy_connected: HashSet, + legacy_manager: LegacyManager, } -impl, NL: Listener> Service { +impl, NL: Listener> Service +where + SK::PublicKey: PeerId, +{ /// Create a new validator network service plus an interface for interacting with it. pub fn new( dialer: ND, listener: NL, - authority_pen: AuthorityPen, + secret_key: SK, spawn_handle: SpawnTaskHandle, - ) -> (Self, impl Network) { + ) -> (Self, impl Network) { // Channel for sending commands between the service and interface let (commands_for_service, commands_from_interface) = mpsc::unbounded(); // Channel for receiving data from the network @@ -103,11 +108,11 @@ impl, NL: Listener> Service, NL: Listener> Service, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, ) { - let authority_pen = self.authority_pen.clone(); + let secret_key = self.secret_key.clone(); let dialer = self.dialer.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { outgoing( - authority_pen, - peer_id, + secret_key, + public_key, dialer, addresses, result_for_parent, @@ -144,26 +149,26 @@ impl, NL: Listener> Service>, + result_for_parent: mpsc::UnboundedSender>, ) { - let authority_pen = self.authority_pen.clone(); + let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_incoming", None, async move { - incoming(authority_pen, stream, result_for_parent, next_to_interface).await; + incoming(secret_key, stream, result_for_parent, next_to_interface).await; }); } - fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { - match self.legacy_connected.contains(peer_id) { - true => self.legacy_manager.peer_addresses(peer_id), - false => self.manager.peer_addresses(peer_id), + fn peer_addresses(&self, public_key: &SK::PublicKey) -> Option> { + match self.legacy_connected.contains(public_key) { + true => self.legacy_manager.peer_addresses(public_key), + false => self.manager.peer_addresses(public_key), } } fn add_connection( &mut self, - peer_id: AuthorityId, + public_key: SK::PublicKey, data_for_network: mpsc::UnboundedSender, connection_type: ConnectionType, ) -> AddResult { @@ -173,37 +178,45 @@ impl, NL: Listener> Service self.legacy_manager.add_incoming(peer_id, data_for_network), - LegacyOutgoing => self.legacy_manager.add_outgoing(peer_id, data_for_network), + LegacyIncoming => self + .legacy_manager + .add_incoming(public_key, data_for_network), + LegacyOutgoing => self + .legacy_manager + .add_outgoing(public_key, data_for_network), } } // Mark a peer as legacy and return whether it is the first time we do so. - fn mark_legacy(&mut self, peer_id: &AuthorityId) -> bool { - self.manager.remove_peer(peer_id); - self.legacy_connected.insert(peer_id.clone()) + fn mark_legacy(&mut self, public_key: &SK::PublicKey) -> bool { + self.manager.remove_peer(public_key); + self.legacy_connected.insert(public_key.clone()) } // Unmark a peer as legacy, putting it back in the normal set. - fn unmark_legacy(&mut self, peer_id: &AuthorityId) { - self.legacy_connected.remove(peer_id); + fn unmark_legacy(&mut self, public_key: &SK::PublicKey) { + self.legacy_connected.remove(public_key); // Put it back if we still want to be connected. - if let Some(addresses) = self.legacy_manager.peer_addresses(peer_id) { - self.manager.add_peer(peer_id.clone(), addresses); + if let Some(addresses) = self.legacy_manager.peer_addresses(public_key) { + self.manager.add_peer(public_key.clone(), addresses); } } // Checks whether this peer should now be marked as one using the legacy protocol and handled // accordingly. Returns whether we should spawn a new connection worker because of that. - fn check_for_legacy(&mut self, peer_id: &AuthorityId, connection_type: ConnectionType) -> bool { + fn check_for_legacy( + &mut self, + public_key: &SK::PublicKey, + connection_type: ConnectionType, + ) -> bool { use ConnectionType::*; match connection_type { - LegacyIncoming => self.mark_legacy(peer_id), + LegacyIncoming => self.mark_legacy(public_key), LegacyOutgoing => { - self.mark_legacy(peer_id); + self.mark_legacy(public_key); false } // We don't unmark here, because we always return New when a connection @@ -230,59 +243,59 @@ impl, NL: Listener> Service match command { // register new peer in manager or update its list of addresses if already there // spawn a worker managing outgoing connection if the peer was not known - AddConnection(peer_id, addresses) => { + AddConnection(public_key, addresses) => { // we add all the peers to the legacy manager so we don't lose the // addresses, but only care about its opinion when it turns out we have to // in particular the first time we add a peer we never know whether it // requires legacy connecting, so we only attempt to connect to it if the // new criterion is satisfied, otherwise we wait for it to connect to us - self.legacy_manager.add_peer(peer_id.clone(), addresses.clone()); - if self.manager.add_peer(peer_id.clone(), addresses.clone()) { - self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + self.legacy_manager.add_peer(public_key.clone(), addresses.clone()); + if self.manager.add_peer(public_key.clone(), addresses.clone()) { + self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels - DelConnection(peer_id) => { - self.manager.remove_peer(&peer_id); - self.legacy_manager.remove_peer(&peer_id); - self.legacy_connected.remove(&peer_id); + DelConnection(public_key) => { + self.manager.remove_peer(&public_key); + self.legacy_manager.remove_peer(&public_key); + self.legacy_connected.remove(&public_key); }, // pass the data to the manager - SendData(data, peer_id) => { - match self.legacy_connected.contains(&peer_id) { - true => match self.legacy_manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", peer_id, e), + SendData(data, public_key) => { + match self.legacy_connected.contains(&public_key) { + true => match self.legacy_manager.send_to(&public_key, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", public_key), + Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", public_key, e), }, - false => match self.manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + false => match self.manager.send_to(&public_key, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {}.", public_key), + Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", public_key, e), }, } }, }, // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network, connection_type)) = worker_results.next() => { - if self.check_for_legacy(&peer_id, connection_type) { - match self.legacy_manager.peer_addresses(&peer_id) { - Some(addresses) => self.spawn_new_outgoing(peer_id.clone(), addresses, result_for_parent.clone()), + Some((public_key, maybe_data_for_network, connection_type)) = worker_results.next() => { + if self.check_for_legacy(&public_key, connection_type) { + match self.legacy_manager.peer_addresses(&public_key) { + Some(addresses) => self.spawn_new_outgoing(public_key.clone(), addresses, result_for_parent.clone()), None => { // We received a result from a worker we are no longer interested // in. - self.legacy_connected.remove(&peer_id); + self.legacy_connected.remove(&public_key); }, } } use AddResult::*; match maybe_data_for_network { - Some(data_for_network) => match self.add_connection(peer_id.clone(), data_for_network, connection_type) { - Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), - Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), + Some(data_for_network) => match self.add_connection(public_key.clone(), data_for_network, connection_type) { + Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", public_key), + Added => info!(target: "validator-network", "New connection with peer {}.", public_key), + Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", public_key), }, - None => if let Some(addresses) = self.peer_addresses(&peer_id) { - self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + None => if let Some(addresses) = self.peer_addresses(&public_key) { + self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); } } },