From b28b92237224dbe1a3d26531904f9afe0294ab97 Mon Sep 17 00:00:00 2001 From: timorl Date: Mon, 21 Nov 2022 17:43:46 +0100 Subject: [PATCH 1/3] Replace AuthorityId with parameter --- finality-aleph/src/network/service.rs | 17 +-- finality-aleph/src/nodes/validator_node.rs | 4 +- finality-aleph/src/tcp_network.rs | 28 ++++- .../src/testing/mocks/validator_network.rs | 2 +- .../src/validator_network/crypto.rs | 26 +++++ .../src/validator_network/incoming.rs | 41 ++++--- .../validator_network/manager/direction.rs | 36 +++--- .../src/validator_network/manager/legacy.rs | 52 ++++----- .../src/validator_network/manager/mod.rs | 55 ++++----- finality-aleph/src/validator_network/mod.rs | 14 +-- .../src/validator_network/outgoing.rs | 40 +++---- .../validator_network/protocols/handshake.rs | 106 +++++++++--------- .../src/validator_network/protocols/mod.rs | 56 ++++----- .../src/validator_network/protocols/v0/mod.rs | 54 +++++---- .../src/validator_network/protocols/v1/mod.rs | 63 ++++++----- .../src/validator_network/service.rs | 82 ++++++++------ 16 files changed, 362 insertions(+), 314 deletions(-) create mode 100644 finality-aleph/src/validator_network/crypto.rs diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/service.rs index 83f4ea22f4..154ac69b12 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/service.rs @@ -4,7 +4,6 @@ use std::{ iter, }; -use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use log::{debug, error, info, trace, warn}; use sc_service::SpawnTaskHandle; @@ -16,7 +15,7 @@ use crate::{ AddressedData, ConnectionCommand, Data, Event, EventStream, Multiaddress, Network, NetworkSender, Protocol, }, - validator_network::Network as ValidatorNetwork, + validator_network::{Network as ValidatorNetwork, PublicKey}, STATUS_REPORT_INTERVAL, }; @@ -33,9 +32,11 @@ pub struct Service< N: Network, D: Data, VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, -> { + A: Data + Multiaddress, + VN: ValidatorNetwork, +> where + A::PeerId: PublicKey, +{ network: N, validator_network: VN, data_from_user: mpsc::UnboundedReceiver>, @@ -85,9 +86,11 @@ impl< N: Network, D: Data, VD: Data, - A: Data + Multiaddress, - VN: ValidatorNetwork, + A: Data + Multiaddress, + VN: ValidatorNetwork, > Service +where + A::PeerId: PublicKey, { pub fn new( network: N, diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index b6ed7b619b..907d433b23 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -21,8 +21,8 @@ use crate::{ ConsensusParty, ConsensusPartyParams, }, session_map::{AuthorityProviderImpl, FinalityNotificatorImpl, SessionMapUpdater}, - tcp_network::new_tcp_network, - validator_network::{Service, KEY_TYPE}, + tcp_network::{new_tcp_network, KEY_TYPE}, + validator_network::Service, AlephConfig, }; diff --git a/finality-aleph/src/tcp_network.rs b/finality-aleph/src/tcp_network.rs index dd2ee8ef5b..dfd30b3c3d 100644 --- a/finality-aleph/src/tcp_network.rs +++ b/finality-aleph/src/tcp_network.rs @@ -3,16 +3,20 @@ use std::{io::Result as IoResult, net::ToSocketAddrs as _}; use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use log::info; +use sp_core::crypto::KeyTypeId; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpListener, TcpStream, ToSocketAddrs, }; use crate::{ + crypto::{verify, AuthorityPen, Signature}, network::{Multiaddress, NetworkIdentity, PeerId}, - validator_network::{ConnectionInfo, Dialer, Listener, Splittable}, + validator_network::{ConnectionInfo, Dialer, Listener, PrivateKey, PublicKey, Splittable}, }; +pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); + impl ConnectionInfo for TcpStream { fn peer_address_info(&self) -> String { match self.peer_addr() { @@ -66,6 +70,28 @@ impl Listener for TcpListener { impl PeerId for AuthorityId {} +impl PublicKey for AuthorityId { + type Signature = Signature; + + fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool { + verify(self, message, signature) + } +} + +#[async_trait::async_trait] +impl PrivateKey for AuthorityPen { + type Signature = Signature; + type PublicKey = AuthorityId; + + async fn sign(&self, message: &[u8]) -> Self::Signature { + AuthorityPen::sign(self, message).await + } + + fn public_key(&self) -> Self::PublicKey { + self.authority_id() + } +} + /// A representation of a single TCP address with an associated peer ID. #[derive(Debug, Hash, Encode, Decode, Clone, PartialEq, Eq)] pub struct TcpMultiaddress { diff --git a/finality-aleph/src/testing/mocks/validator_network.rs b/finality-aleph/src/testing/mocks/validator_network.rs index b9efbbbf1a..2ea14775b3 100644 --- a/finality-aleph/src/testing/mocks/validator_network.rs +++ b/finality-aleph/src/testing/mocks/validator_network.rs @@ -59,7 +59,7 @@ pub struct MockNetwork { } #[async_trait::async_trait] -impl Network for MockNetwork { +impl Network for MockNetwork { fn add_connection(&mut self, peer: AuthorityId, addresses: Vec) { self.add_connection.send((peer, addresses)); } diff --git a/finality-aleph/src/validator_network/crypto.rs b/finality-aleph/src/validator_network/crypto.rs new file mode 100644 index 0000000000..35e16aed8c --- /dev/null +++ b/finality-aleph/src/validator_network/crypto.rs @@ -0,0 +1,26 @@ +use std::{fmt::Display, hash::Hash}; + +use codec::Codec; + +/// A public key for signature verification. +pub trait PublicKey: + Send + Sync + Eq + Clone + AsRef<[u8]> + Display + Hash + Codec + 'static +{ + type Signature: Send + Sync + Clone + Codec; + + /// Verify whether the message has been signed with the associated private key. + fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool; +} + +/// Private key for signing messages, with an associated public key. +#[async_trait::async_trait] +pub trait PrivateKey: Clone + Send + Sync + 'static { + type Signature: Send + Sync + Clone + Codec; + type PublicKey: PublicKey; + + /// Produce a signature for the provided message. + async fn sign(&self, message: &[u8]) -> Self::Signature; + + /// Return the associated public key. + fn public_key(&self) -> Self::PublicKey; +} diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index a8ec06bad4..2fe558ab3c 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -3,20 +3,17 @@ use std::fmt::{Display, Error as FmtError, Formatter}; use futures::channel::mpsc; use log::{debug, info}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, - Data, Splittable, - }, +use crate::validator_network::{ + protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, + Data, PrivateKey, PublicKey, Splittable, }; -enum IncomingError { +enum IncomingError { ProtocolNegotiationError(ProtocolNegotiationError), - ProtocolError(ProtocolError), + ProtocolError(ProtocolError), } -impl Display for IncomingError { +impl Display for IncomingError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use IncomingError::*; match self { @@ -26,45 +23,45 @@ impl Display for IncomingError { } } -impl From for IncomingError { +impl From for IncomingError { fn from(e: ProtocolNegotiationError) -> Self { IncomingError::ProtocolNegotiationError(e) } } -impl From for IncomingError { - fn from(e: ProtocolError) -> Self { +impl From> for IncomingError { + fn from(e: ProtocolError) -> Self { IncomingError::ProtocolError(e) } } -async fn manage_incoming( - authority_pen: AuthorityPen, +async fn manage_incoming( + private_key: PK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), IncomingError> { +) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_incoming(stream, authority_pen, result_for_parent, data_for_user) + .manage_incoming(stream, private_key, result_for_parent, data_for_user) .await?) } -/// Manage an incoming connection. After the handshake it will send the recognized AuthorityId to +/// Manage an incoming connection. After the handshake it will send the recognized PublicKey to /// the parent, together with an exit channel for this process. When this channel is dropped the /// process ends. Whenever data arrives on this connection it will be passed to the user. Any /// failures in receiving data result in the process stopping, we assume the other side will /// reestablish it if necessary. -pub async fn incoming( - authority_pen: AuthorityPen, +pub async fn incoming( + private_key: PK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { let addr = stream.peer_address_info(); - if let Err(e) = manage_incoming(authority_pen, stream, result_for_parent, data_for_user).await { + if let Err(e) = manage_incoming(private_key, stream, result_for_parent, data_for_user).await { info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e); } } diff --git a/finality-aleph/src/validator_network/manager/direction.rs b/finality-aleph/src/validator_network/manager/direction.rs index 042a57c607..7d32eab0f1 100644 --- a/finality-aleph/src/validator_network/manager/direction.rs +++ b/finality-aleph/src/validator_network/manager/direction.rs @@ -3,16 +3,14 @@ use std::{ ops::BitXor, }; -use aleph_primitives::AuthorityId; - -use crate::validator_network::Data; +use crate::validator_network::{Data, PublicKey}; /// Data about peers we know and whether we should connect to them or they to us. For the former /// case also keeps the peers' addresses. -pub struct DirectedPeers { - own_id: AuthorityId, - outgoing: HashMap>, - incoming: HashSet, +pub struct DirectedPeers { + own_id: PK, + outgoing: HashMap>, + incoming: HashSet, } /// Whether we should call the remote or the other way around. We xor the peer ids and based on the @@ -29,9 +27,9 @@ fn should_we_call(own_id: &[u8], remote_id: &[u8]) -> bool { } } -impl DirectedPeers { +impl DirectedPeers { /// Create a new set of peers directed using our own peer id. - pub fn new(own_id: AuthorityId) -> Self { + pub fn new(own_id: PK) -> Self { DirectedPeers { own_id, outgoing: HashMap::new(), @@ -44,7 +42,7 @@ impl DirectedPeers { /// Returns whether we should start attempts at connecting with the peer, which is the case /// exactly when the peer is one with which we should attempt connections AND it was added for /// the first time. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { match should_we_call(self.own_id.as_ref(), peer_id.as_ref()) { true => self.outgoing.insert(peer_id, addresses).is_none(), false => { @@ -57,28 +55,28 @@ impl DirectedPeers { } /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.outgoing.get(peer_id).cloned() } /// Whether we should be maintaining a connection with this peer. - pub fn interested(&self, peer_id: &AuthorityId) -> bool { + pub fn interested(&self, peer_id: &PK) -> bool { self.incoming.contains(peer_id) || self.outgoing.contains_key(peer_id) } /// Iterator over the peers we want connections from. - pub fn incoming_peers(&self) -> impl Iterator { + pub fn incoming_peers(&self) -> impl Iterator { self.incoming.iter() } /// Iterator over the peers we want to connect to. - pub fn outgoing_peers(&self) -> impl Iterator { + pub fn outgoing_peers(&self) -> impl Iterator { self.outgoing.keys() } /// Remove a peer from the list of peers that we want to stay connected with, whether the /// connection was supposed to be incoming or outgoing. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.incoming.remove(peer_id); self.outgoing.remove(peer_id); } @@ -93,7 +91,7 @@ mod tests { type Address = String; - async fn container_with_id() -> (DirectedPeers
, AuthorityId) { + async fn container_with_id() -> (DirectedPeers, AuthorityId) { let (id, _) = key().await; let container = DirectedPeers::new(id.clone()); (container, id) @@ -118,7 +116,8 @@ mod tests { ); } - async fn container_with_added_connecting_peer() -> (DirectedPeers
, AuthorityId) { + async fn container_with_added_connecting_peer( + ) -> (DirectedPeers, AuthorityId) { let (mut container0, id0) = container_with_id().await; let (mut container1, id1) = container_with_id().await; let addresses = some_addresses(); @@ -131,7 +130,8 @@ mod tests { } } - async fn container_with_added_nonconnecting_peer() -> (DirectedPeers
, AuthorityId) { + async fn container_with_added_nonconnecting_peer( + ) -> (DirectedPeers, AuthorityId) { let (mut container0, id0) = container_with_id().await; let (mut container1, id1) = container_with_id().await; let addresses = some_addresses(); diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index f305c716f8..977682aacc 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -3,35 +3,34 @@ use std::{ fmt::{Display, Error as FmtError, Formatter}, }; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; use crate::{ network::PeerId, validator_network::{ manager::{AddResult, SendError}, - Data, + Data, PublicKey, }, }; /// Network component responsible for holding the list of peers that we /// want to connect to, and managing the established connections. -pub struct Manager { - addresses: HashMap>, - outgoing: HashMap>, - incoming: HashMap>, +pub struct Manager { + addresses: HashMap>, + outgoing: HashMap>, + incoming: HashMap>, } -struct ManagerStatus { +struct ManagerStatus { wanted_peers: usize, - both_ways_peers: HashSet, - outgoing_peers: HashSet, - incoming_peers: HashSet, - missing_peers: HashSet, + both_ways_peers: HashSet, + outgoing_peers: HashSet, + incoming_peers: HashSet, + missing_peers: HashSet, } -impl ManagerStatus { - fn new(manager: &Manager) -> Self { +impl ManagerStatus { + fn new(manager: &Manager) -> Self { let incoming: HashSet<_> = manager .incoming .iter() @@ -65,7 +64,7 @@ impl ManagerStatus { } } -impl Display for ManagerStatus { +impl Display for ManagerStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { if self.wanted_peers == 0 { return write!(f, "not maintaining any connections; "); @@ -136,7 +135,7 @@ impl Display for ManagerStatus { } } -impl Manager { +impl Manager { /// Create a new Manager with empty list of peers. pub fn new() -> Self { Manager { @@ -149,13 +148,13 @@ impl Manager { /// Add a peer to the list of peers we want to stay connected to, or /// update the list of addresses if the peer was already added. /// Returns whether this peer is a new peer. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { self.addresses.insert(peer_id, addresses).is_none() } /// Return Option containing addresses of the given peer, or None if /// the peer is unknown. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.addresses.get(peer_id).cloned() } @@ -163,7 +162,7 @@ impl Manager { /// but only if the peer is on the list of peers that we want to stay connected with. pub fn add_outgoing( &mut self, - peer_id: AuthorityId, + peer_id: PK, data_for_network: mpsc::UnboundedSender, ) -> AddResult { use AddResult::*; @@ -178,11 +177,7 @@ impl Manager { /// Add an established incoming connection with a known peer, /// but only if the peer is on the list of peers that we want to stay connected with. - pub fn add_incoming( - &mut self, - peer_id: AuthorityId, - exit: mpsc::UnboundedSender, - ) -> AddResult { + pub fn add_incoming(&mut self, peer_id: PK, exit: mpsc::UnboundedSender) -> AddResult { use AddResult::*; if !self.addresses.contains_key(&peer_id) { return Uninterested; @@ -195,7 +190,7 @@ impl Manager { /// Remove a peer from the list of peers that we want to stay connected with. /// Close any incoming and outgoing connections that were established. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.addresses.remove(peer_id); self.incoming.remove(peer_id); self.outgoing.remove(peer_id); @@ -204,7 +199,7 @@ impl Manager { /// Send data to a peer. /// Returns error if there is no outgoing connection to the peer, /// or if the connection is dead. - pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> { self.outgoing .get(peer_id) .ok_or(SendError::PeerNotFound)? @@ -220,6 +215,7 @@ impl Manager { #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; @@ -230,7 +226,7 @@ mod tests { #[tokio::test] async fn add_remove() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let (peer_id, _) = key().await; let (peer_id_b, _) = key().await; let addresses = vec![ @@ -254,7 +250,7 @@ mod tests { #[tokio::test] async fn outgoing() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let data = String::from("DATA"); let (peer_id, _) = key().await; let addresses = vec![ @@ -285,7 +281,7 @@ mod tests { #[tokio::test] async fn incoming() { - let mut manager = Manager::::new(); + let mut manager = Manager::::new(); let (peer_id, _) = key().await; let addresses = vec![ String::from(""), diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index bbfe0641b5..421be90d29 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -3,10 +3,12 @@ use std::{ fmt::{Display, Error as FmtError, Formatter}, }; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; -use crate::{network::PeerId, validator_network::Data}; +use crate::{ + network::PeerId, + validator_network::{Data, PublicKey}, +}; mod direction; mod legacy; @@ -44,15 +46,15 @@ pub enum AddResult { Replaced, } -struct ManagerStatus { - outgoing_peers: HashSet, - missing_outgoing: HashSet, - incoming_peers: HashSet, - missing_incoming: HashSet, +struct ManagerStatus { + outgoing_peers: HashSet, + missing_outgoing: HashSet, + incoming_peers: HashSet, + missing_incoming: HashSet, } -impl ManagerStatus { - fn new(manager: &Manager) -> Self { +impl ManagerStatus { + fn new(manager: &Manager) -> Self { let mut incoming_peers = HashSet::new(); let mut missing_incoming = HashSet::new(); let mut outgoing_peers = HashSet::new(); @@ -87,14 +89,14 @@ impl ManagerStatus { } } -fn pretty_authority_id_set(set: &HashSet) -> String { +fn pretty_authority_id_set(set: &HashSet) -> String { set.iter() .map(|authority_id| authority_id.to_short_string()) .collect::>() .join(", ") } -impl Display for ManagerStatus { +impl Display for ManagerStatus { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { let wanted_incoming = self.wanted_incoming(); let wanted_outgoing = self.wanted_outgoing(); @@ -158,23 +160,23 @@ impl Display for ManagerStatus { /// Network component responsible for holding the list of peers that we /// want to connect to or let them connect to us, and managing the established /// connections. -pub struct Manager { +pub struct Manager { // Which peers we want to be connected with, and which way. - wanted: DirectedPeers, + wanted: DirectedPeers, // This peers we are connected with. We ensure that this is always a subset of what we want. - have: HashMap>, + have: HashMap>, } -impl Manager { +impl Manager { /// Create a new Manager with empty list of peers. - pub fn new(own_id: AuthorityId) -> Self { + pub fn new(own_id: PK) -> Self { Manager { wanted: DirectedPeers::new(own_id), have: HashMap::new(), } } - fn active_connection(&self, peer_id: &AuthorityId) -> bool { + fn active_connection(&self, peer_id: &PK) -> bool { self.have .get(peer_id) .map(|sender| !sender.is_closed()) @@ -186,19 +188,19 @@ impl Manager { /// Returns whether we should start attempts at connecting with the peer, which depends on the /// coorddinated pseudorandom decision on the direction of the connection and whether this was /// added for the first time. - pub fn add_peer(&mut self, peer_id: AuthorityId, addresses: Vec) -> bool { + pub fn add_peer(&mut self, peer_id: PK, addresses: Vec) -> bool { self.wanted.add_peer(peer_id, addresses) } /// Return the addresses of the given peer, or None if we shouldn't attempt connecting with the peer. - pub fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + pub fn peer_addresses(&self, peer_id: &PK) -> Option> { self.wanted.peer_addresses(peer_id) } /// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to. pub fn add_connection( &mut self, - peer_id: AuthorityId, + peer_id: PK, data_for_network: mpsc::UnboundedSender, ) -> AddResult { use AddResult::*; @@ -213,7 +215,7 @@ impl Manager { /// Remove a peer from the list of peers that we want to stay connected with. /// Close any incoming and outgoing connections that were established. - pub fn remove_peer(&mut self, peer_id: &AuthorityId) { + pub fn remove_peer(&mut self, peer_id: &PK) { self.wanted.remove_peer(peer_id); self.have.remove(peer_id); } @@ -221,7 +223,7 @@ impl Manager { /// Send data to a peer. /// Returns error if there is no outgoing connection to the peer, /// or if the connection is dead. - pub fn send_to(&mut self, peer_id: &AuthorityId, data: D) -> Result<(), SendError> { + pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> { self.have .get(peer_id) .ok_or(SendError::PeerNotFound)? @@ -237,6 +239,7 @@ impl Manager { #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use super::{AddResult::*, Manager, SendError}; @@ -248,7 +251,7 @@ mod tests { #[tokio::test] async fn add_remove() { let (own_id, _) = key().await; - let mut manager = Manager::::new(own_id); + let mut manager = Manager::::new(own_id); let (peer_id, _) = key().await; let (peer_id_b, _) = key().await; let addresses = vec![ @@ -280,9 +283,11 @@ mod tests { #[tokio::test] async fn send_receive() { let (mut connecting_id, _) = key().await; - let mut connecting_manager = Manager::::new(connecting_id.clone()); + let mut connecting_manager = + Manager::::new(connecting_id.clone()); let (mut listening_id, _) = key().await; - let mut listening_manager = Manager::::new(listening_id.clone()); + let mut listening_manager = + Manager::::new(listening_id.clone()); let data = String::from("DATA"); let addresses = vec![ String::from(""), diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index b9f3d2524b..e07e8ca3b4 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -1,10 +1,9 @@ use std::fmt::Display; -use aleph_primitives::AuthorityId; use codec::Codec; -use sp_core::crypto::KeyTypeId; use tokio::io::{AsyncRead, AsyncWrite}; +mod crypto; mod incoming; mod io; mod manager; @@ -14,10 +13,9 @@ mod outgoing; mod protocols; mod service; +pub use crypto::{PrivateKey, PublicKey}; pub use service::Service; -pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); - /// What the data sent using the network has to satisfy. pub trait Data: Clone + Codec + Send + Sync + 'static {} @@ -31,16 +29,16 @@ impl Data for D {} /// implementation might fail to deliver any specific message, so messages have to be resent while /// they still should be delivered. #[async_trait::async_trait] -pub trait Network: Send + 'static { +pub trait Network: Send + 'static { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: AuthorityId, addresses: Vec); + fn add_connection(&mut self, peer: PK, addresses: Vec); /// Remove the peer from the set of connected peers and close the connection. - fn remove_connection(&mut self, peer: AuthorityId); + fn remove_connection(&mut self, peer: PK); /// Send a message to a single peer. /// This function should be implemented in a non-blocking manner. - fn send(&self, data: D, recipient: AuthorityId); + fn send(&self, data: D, recipient: PK); /// Receive a message from the network. async fn next(&mut self) -> Option; diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index d22858497b..cdefe86b74 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -1,27 +1,23 @@ use std::fmt::{Debug, Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; use log::{debug, info}; use tokio::time::{sleep, Duration}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - protocols::{ - protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, - }, - ConnectionInfo, Data, Dialer, PeerAddressInfo, +use crate::validator_network::{ + protocols::{ + protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, }, + ConnectionInfo, Data, Dialer, PeerAddressInfo, PrivateKey, PublicKey, }; -enum OutgoingError> { +enum OutgoingError> { Dial(ND::Error), ProtocolNegotiation(PeerAddressInfo, ProtocolNegotiationError), - Protocol(PeerAddressInfo, ProtocolError), + Protocol(PeerAddressInfo, ProtocolError), } -impl> Display for OutgoingError { +impl> Display for OutgoingError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use OutgoingError::*; match self { @@ -40,14 +36,14 @@ impl> Display for OutgoingError { } } -async fn manage_outgoing>( - authority_pen: AuthorityPen, - peer_id: AuthorityId, +async fn manage_outgoing>( + private_key: PK, + peer_id: PK::PublicKey, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), OutgoingError> { +) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); let stream = dialer .connect(addresses) @@ -62,7 +58,7 @@ async fn manage_outgoing>( protocol .manage_outgoing( stream, - authority_pen, + private_key, peer_id, result_for_parent, data_for_user, @@ -76,16 +72,16 @@ const RETRY_DELAY: Duration = Duration::from_secs(10); /// Establish an outgoing connection to the provided peer using the dialer and then manage it. /// While this works it will send any data from the user to the peer. Any failures will be reported /// to the parent, so that connections can be reestablished if necessary. -pub async fn outgoing>( - authority_pen: AuthorityPen, - peer_id: AuthorityId, +pub async fn outgoing>( + private_key: PK, + peer_id: PK::PublicKey, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( - authority_pen, + private_key, peer_id.clone(), dialer, addresses.clone(), diff --git a/finality-aleph/src/validator_network/protocols/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs index 54f0bf8820..e4c021b0ff 100644 --- a/finality-aleph/src/validator_network/protocols/handshake.rs +++ b/finality-aleph/src/validator_network/protocols/handshake.rs @@ -1,23 +1,19 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use rand::Rng; use tokio::time::{timeout, Duration}; -use crate::{ - crypto::{verify, AuthorityPen, Signature}, - validator_network::{ - io::{receive_data, send_data, ReceiveError, SendError}, - Splittable, - }, +use crate::validator_network::{ + io::{receive_data, send_data, ReceiveError, SendError}, + PrivateKey, PublicKey, Splittable, }; pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); /// Handshake error. #[derive(Debug)] -pub enum HandshakeError { +pub enum HandshakeError { /// Send error. SendError(SendError), /// Receive error. @@ -25,12 +21,12 @@ pub enum HandshakeError { /// Signature error. SignatureError, /// Challenge contains invalid peer id. - ChallengeError(AuthorityId, AuthorityId), + ChallengeError(PK, PK), /// Timeout. TimedOut, } -impl Display for HandshakeError { +impl Display for HandshakeError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use HandshakeError::*; match self { @@ -47,13 +43,13 @@ impl Display for HandshakeError { } } -impl From for HandshakeError { +impl From for HandshakeError { fn from(e: SendError) -> Self { HandshakeError::SendError(e) } } -impl From for HandshakeError { +impl From for HandshakeError { fn from(e: ReceiveError) -> Self { HandshakeError::ReceiveError(e) } @@ -61,14 +57,14 @@ impl From for HandshakeError { /// Handshake challenge. Contains public key of the creator, and a random nonce. #[derive(Debug, Clone, Encode, Decode)] -struct Challenge { - id: AuthorityId, +struct Challenge { + id: PK, nonce: [u8; 32], } -impl Challenge { +impl Challenge { /// Prepare new challenge that contains ID of the creator. - fn new(id: AuthorityId) -> Self { + fn new(id: PK) -> Self { let nonce = rand::thread_rng().gen::<[u8; 32]>(); Self { id, nonce } } @@ -77,23 +73,28 @@ impl Challenge { /// Handshake response. Contains public key of the creator, and signature /// related to the received challenge. #[derive(Debug, Clone, Encode, Decode)] -struct Response { - id: AuthorityId, - signature: Signature, +struct Response { + id: PK, + signature: PK::Signature, } -impl Response { +impl Response { + // Amusingly the `Signature = PK::Signature` is necessary, the compiler cannot even do this + // simple reasoning. :/ /// Create a new response by signing the challenge. - async fn new(pen: &AuthorityPen, challenge: &Challenge) -> Self { + async fn new>( + private_key: &P, + challenge: &Challenge, + ) -> Self { Self { - id: pen.authority_id(), - signature: pen.sign(&challenge.encode()).await, + id: private_key.public_key(), + signature: private_key.sign(&challenge.encode()).await, } } /// Verify the Response sent by the peer. - fn verify(&self, challenge: &Challenge) -> bool { - verify(&self.id, &challenge.encode(), &self.signature) + fn verify(&self, challenge: &Challenge) -> bool { + self.id.verify(&challenge.encode(), &self.signature) } } @@ -105,15 +106,15 @@ impl Response { /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, the peer will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_incoming( +pub async fn execute_v0_handshake_incoming( stream: S, - authority_pen: AuthorityPen, -) -> Result<(S::Sender, S::Receiver, AuthorityId), HandshakeError> { + private_key: PK, +) -> Result<(S::Sender, S::Receiver, PK::PublicKey), HandshakeError> { // send challenge - let our_challenge = Challenge::new(authority_pen.authority_id()); + let our_challenge = Challenge::new(private_key.public_key()); let stream = send_data(stream, our_challenge.clone()).await?; // receive response - let (stream, peer_response) = receive_data::<_, Response>(stream).await?; + let (stream, peer_response) = receive_data::<_, Response>(stream).await?; // validate response if !peer_response.verify(&our_challenge) { return Err(HandshakeError::SignatureError); @@ -132,45 +133,45 @@ pub async fn execute_v0_handshake_incoming( /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, we will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_outgoing( +pub async fn execute_v0_handshake_outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + private_key: PK, + peer_id: PK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { // receive challenge - let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; + let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; if peer_id != peer_challenge.id { return Err(HandshakeError::ChallengeError(peer_id, peer_challenge.id)); } // send response - let our_response = Response::new(&authority_pen, &peer_challenge).await; + let our_response = Response::new(&private_key, &peer_challenge).await; let stream = send_data(stream, our_response).await?; let (sender, receiver) = stream.split(); Ok((sender, receiver)) } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_incoming( +pub async fn v0_handshake_incoming( stream: S, - authority_pen: AuthorityPen, -) -> Result<(S::Sender, S::Receiver, AuthorityId), HandshakeError> { + private_key: PK, +) -> Result<(S::Sender, S::Receiver, PK::PublicKey), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_incoming(stream, authority_pen), + execute_v0_handshake_incoming(stream, private_key), ) .await .map_err(|_| HandshakeError::TimedOut)? } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_outgoing( +pub async fn v0_handshake_outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + private_key: PK, + peer_id: PK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_outgoing(stream, authority_pen, peer_id), + execute_v0_handshake_outgoing(stream, private_key, peer_id), ) .await .map_err(|_| HandshakeError::TimedOut)? @@ -178,6 +179,7 @@ pub async fn v0_handshake_outgoing( #[cfg(test)] mod tests { + use aleph_primitives::AuthorityId; use futures::{join, try_join}; use super::{ @@ -193,7 +195,7 @@ mod tests { }, }; - fn assert_send_error(result: Result) { + fn assert_send_error(result: Result>) { match result { Err(HandshakeError::SendError(_)) => (), x => panic!( @@ -203,7 +205,7 @@ mod tests { }; } - fn assert_receive_error(result: Result) { + fn assert_receive_error(result: Result>) { match result { Err(HandshakeError::ReceiveError(_)) => (), x => panic!( @@ -213,7 +215,7 @@ mod tests { }; } - fn assert_signature_error(result: Result) { + fn assert_signature_error(result: Result>) { match result { Err(HandshakeError::SignatureError) => (), x => panic!( @@ -223,7 +225,7 @@ mod tests { }; } - fn assert_challenge_error(result: Result) { + fn assert_challenge_error(result: Result>) { match result { Err(HandshakeError::ChallengeError(_, _)) => (), x => panic!( @@ -276,7 +278,7 @@ mod tests { authority_pen: AuthorityPen, ) { // receive challenge - let (stream, _) = receive_data::<_, Challenge>(stream) + let (stream, _) = receive_data::<_, Challenge>(stream) .await .expect("should receive"); // prepare fake challenge @@ -304,7 +306,7 @@ mod tests { authority_pen: AuthorityPen, ) { // receive challenge - let (stream, challenge) = receive_data::<_, Challenge>(stream) + let (stream, challenge) = receive_data::<_, Challenge>(stream) .await .expect("should receive"); // prepare fake id @@ -341,7 +343,7 @@ mod tests { execute_v0_handshake_incoming(stream_a, pen_a), // mock outgoing handshake: receive the first message and terminate async { - receive_data::<_, Challenge>(stream_b) + receive_data::<_, Challenge>(stream_b) .await .expect("should receive"); }, diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index d16430d205..dbbdb3af2e 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -1,14 +1,10 @@ use std::fmt::{Display, Error as FmtError, Formatter}; -use aleph_primitives::AuthorityId; use futures::channel::mpsc; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{ReceiveError, SendError}, - Data, Splittable, - }, +use crate::validator_network::{ + io::{ReceiveError, SendError}, + Data, PrivateKey, PublicKey, Splittable, }; mod handshake; @@ -34,11 +30,7 @@ pub enum ConnectionType { /// of the remote node, followed by a channel for sending data to that node, with None if the /// connection was unsuccessful and should be reestablished. Finally a marker for legacy /// compatibility. -pub type ResultForService = ( - AuthorityId, - Option>, - ConnectionType, -); +pub type ResultForService = (PK, Option>, ConnectionType); /// Defines the protocol for communication. #[derive(Debug, PartialEq, Eq)] @@ -52,9 +44,9 @@ pub enum Protocol { /// Protocol error. #[derive(Debug)] -pub enum ProtocolError { +pub enum ProtocolError { /// Error during performing a handshake. - HandshakeError(HandshakeError), + HandshakeError(HandshakeError), /// Sending failed. SendError(SendError), /// Receiving failed. @@ -67,7 +59,7 @@ pub enum ProtocolError { NoUserConnection, } -impl Display for ProtocolError { +impl Display for ProtocolError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { use ProtocolError::*; match self { @@ -81,19 +73,19 @@ impl Display for ProtocolError { } } -impl From for ProtocolError { - fn from(e: HandshakeError) -> Self { +impl From> for ProtocolError { + fn from(e: HandshakeError) -> Self { ProtocolError::HandshakeError(e) } } -impl From for ProtocolError { +impl From for ProtocolError { fn from(e: SendError) -> Self { ProtocolError::SendError(e) } } -impl From for ProtocolError { +impl From for ProtocolError { fn from(e: ReceiveError) -> Self { ProtocolError::ReceiveError(e) } @@ -107,36 +99,36 @@ impl Protocol { const MAX_VERSION: Version = 1; /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( + pub async fn manage_incoming( &self, stream: S, - authority_pen: AuthorityPen, - result_for_service: mpsc::UnboundedSender>, + private_key: PK, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::incoming(stream, authority_pen, result_for_service, data_for_user).await, - V1 => v1::incoming(stream, authority_pen, result_for_service, data_for_user).await, + V0 => v0::incoming(stream, private_key, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, private_key, result_for_service, data_for_user).await, } } /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( + pub async fn manage_outgoing( &self, stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_service: mpsc::UnboundedSender>, + private_key: PK, + peer_id: PK::PublicKey, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::outgoing(stream, authority_pen, peer_id, result_for_service).await, + V0 => v0::outgoing(stream, private_key, peer_id, result_for_service).await, V1 => { v1::outgoing( stream, - authority_pen, + private_key, peer_id, result_for_service, data_for_user, diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 1f80de2baf..1cbbb895a5 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -1,18 +1,14 @@ -use aleph_primitives::AuthorityId; use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; use tokio::io::{AsyncRead, AsyncWrite}; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{receive_data, send_data}, - protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, - ConnectionType, ProtocolError, ResultForService, - }, - Data, Splittable, +use crate::validator_network::{ + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, + Data, PrivateKey, PublicKey, Splittable, }; mod heartbeat; @@ -21,10 +17,10 @@ use heartbeat::{heartbeat_receiver, heartbeat_sender}; /// Receives data from the parent service and sends it over the network. /// Exits when the parent channel is closed, or if the network connection is broken. -async fn sending( +async fn sending( mut sender: S, mut data_from_user: mpsc::UnboundedReceiver, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { loop { sender = match data_from_user.next().await { Some(data) => send_data(sender, data).await?, @@ -36,14 +32,14 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn outgoing( +pub async fn outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender>, -) -> Result<(), ProtocolError> { + private_key: PK, + peer_id: PK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, private_key, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent @@ -68,10 +64,10 @@ pub async fn outgoing( /// Receives data from the network and sends it to the parent service. /// Exits when the parent channel is closed, or if the network connection is broken. -async fn receiving( +async fn receiving( mut stream: S, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { loop { let (old_stream, data) = receive_data(stream).await?; stream = old_stream; @@ -83,14 +79,14 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender>, + private_key: PK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, private_key).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); let (tx_exit, mut exit) = mpsc::unbounded(); @@ -138,11 +134,11 @@ mod tests { AuthorityPen, AuthorityId, AuthorityPen, - impl futures::Future>, - impl futures::Future>, + impl futures::Future>>, + impl futures::Future>>, UnboundedReceiver, - UnboundedReceiver>, - UnboundedReceiver>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index b1599f243e..844e4a385c 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -1,4 +1,3 @@ -use aleph_primitives::AuthorityId; use codec::{Decode, Encode}; use futures::{channel::mpsc, StreamExt}; use log::{debug, info, trace}; @@ -7,16 +6,13 @@ use tokio::{ time::{timeout, Duration}, }; -use crate::{ - crypto::AuthorityPen, - validator_network::{ - io::{receive_data, send_data}, - protocols::{ - handshake::{v0_handshake_incoming, v0_handshake_outgoing}, - ConnectionType, ProtocolError, ResultForService, - }, - Data, Splittable, +use crate::validator_network::{ + io::{receive_data, send_data}, + protocols::{ + handshake::{v0_handshake_incoming, v0_handshake_outgoing}, + ConnectionType, ProtocolError, ResultForService, }, + Data, PrivateKey, PublicKey, Splittable, }; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); @@ -28,10 +24,10 @@ enum Message { Heartbeat, } -async fn sending( +async fn sending( mut sender: S, mut data_from_user: mpsc::UnboundedReceiver, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { use Message::*; loop { let to_send = match timeout(HEARTBEAT_TIMEOUT, data_from_user.next()).await { @@ -46,10 +42,10 @@ async fn sending( } } -async fn receiving( +async fn receiving( mut stream: S, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { use Message::*; loop { let (old_stream, message) = timeout( @@ -68,12 +64,17 @@ async fn receiving( } } -async fn manage_connection( +async fn manage_connection< + PK: PublicKey, + D: Data, + S: AsyncWrite + Unpin + Send, + R: AsyncRead + Unpin + Send, +>( sender: S, receiver: R, data_from_user: mpsc::UnboundedReceiver, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { let sending = sending(sender, data_from_user); let receiving = receiving(receiver, data_for_user); tokio::select! { @@ -84,15 +85,15 @@ async fn manage_connection( +pub async fn outgoing( stream: S, - authority_pen: AuthorityPen, - peer_id: AuthorityId, - result_for_parent: mpsc::UnboundedSender>, + private_key: PK, + peer_id: PK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, authority_pen, peer_id.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, private_key, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent @@ -105,14 +106,14 @@ pub async fn outgoing( /// Performs the incoming handshake, and then manages a connection sending and receiving data. /// Exits on parent request (when the data source is dropped), or in case of broken or dead /// network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - authority_pen: AuthorityPen, - result_for_parent: mpsc::UnboundedSender>, + private_key: PK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, authority_pen).await?; + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, private_key).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent @@ -145,12 +146,12 @@ mod tests { AuthorityPen, AuthorityId, AuthorityPen, - impl futures::Future>, - impl futures::Future>, + impl futures::Future>>, + impl futures::Future>>, UnboundedReceiver, UnboundedReceiver, - UnboundedReceiver>, - UnboundedReceiver>, + UnboundedReceiver>, + UnboundedReceiver>, ) { let (stream_incoming, stream_outgoing) = MockSplittable::new(4096); let (id_incoming, pen_incoming) = key().await; diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index 86a2aca44c..fa387384c6 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -1,6 +1,5 @@ use std::{collections::HashSet, fmt::Debug}; -use aleph_primitives::AuthorityId; use futures::{ channel::{mpsc, oneshot}, StreamExt, @@ -9,32 +8,32 @@ use log::{debug, info, trace, warn}; use tokio::time; use crate::{ - crypto::AuthorityPen, + network::PeerId, validator_network::{ incoming::incoming, manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, protocols::{ConnectionType, ResultForService}, - Data, Dialer, Listener, Network, + Data, Dialer, Listener, Network, PrivateKey, PublicKey, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, }; -enum ServiceCommand { - AddConnection(AuthorityId, Vec), - DelConnection(AuthorityId), - SendData(D, AuthorityId), +enum ServiceCommand { + AddConnection(PK, Vec), + DelConnection(PK), + SendData(D, PK), } -struct ServiceInterface { - commands_for_service: mpsc::UnboundedSender>, +struct ServiceInterface { + commands_for_service: mpsc::UnboundedSender>, next_from_service: mpsc::UnboundedReceiver, } #[async_trait::async_trait] -impl Network for ServiceInterface { +impl Network for ServiceInterface { /// Add the peer to the set of connected peers. - fn add_connection(&mut self, peer: AuthorityId, addresses: Vec) { + fn add_connection(&mut self, peer: PK, addresses: Vec) { if self .commands_for_service .unbounded_send(ServiceCommand::AddConnection(peer, addresses)) @@ -45,7 +44,7 @@ impl Network for ServiceInterface { } /// Remove the peer from the set of connected peers and close the connection. - fn remove_connection(&mut self, peer: AuthorityId) { + fn remove_connection(&mut self, peer: PK) { if self .commands_for_service .unbounded_send(ServiceCommand::DelConnection(peer)) @@ -57,7 +56,7 @@ impl Network for ServiceInterface { /// Send a message to a single peer. /// This function should be implemented in a non-blocking manner. - fn send(&self, data: D, recipient: AuthorityId) { + fn send(&self, data: D, recipient: PK) { if self .commands_for_service .unbounded_send(ServiceCommand::SendData(data, recipient)) @@ -74,27 +73,34 @@ impl Network for ServiceInterface { } /// A service that has to be run for the validator network to work. -pub struct Service, NL: Listener> { - commands_from_interface: mpsc::UnboundedReceiver>, +pub struct Service, NL: Listener> +where + PK::PublicKey: PeerId, +{ + commands_from_interface: mpsc::UnboundedReceiver>, next_to_interface: mpsc::UnboundedSender, - manager: Manager, + manager: Manager, dialer: ND, listener: NL, spawn_handle: SpawnTaskHandle, - authority_pen: AuthorityPen, + private_key: PK, // Backwards compatibility with the one-sided connections, remove when no longer needed. - legacy_connected: HashSet, - legacy_manager: LegacyManager, + legacy_connected: HashSet, + legacy_manager: LegacyManager, } -impl, NL: Listener> Service { +impl, NL: Listener> + Service +where + PK::PublicKey: PeerId, +{ /// Create a new validator network service plus an interface for interacting with it. pub fn new( dialer: ND, listener: NL, - authority_pen: AuthorityPen, + private_key: PK, spawn_handle: SpawnTaskHandle, - ) -> (Self, impl Network) { + ) -> (Self, impl Network) { // Channel for sending commands between the service and interface let (commands_for_service, commands_from_interface) = mpsc::unbounded(); // Channel for receiving data from the network @@ -103,11 +109,11 @@ impl, NL: Listener> Service, NL: Listener> Service, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, ) { - let authority_pen = self.authority_pen.clone(); + let private_key = self.private_key.clone(); let dialer = self.dialer.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { outgoing( - authority_pen, + private_key, peer_id, dialer, addresses, @@ -144,17 +150,17 @@ impl, NL: Listener> Service>, + result_for_parent: mpsc::UnboundedSender>, ) { - let authority_pen = self.authority_pen.clone(); + let private_key = self.private_key.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_incoming", None, async move { - incoming(authority_pen, stream, result_for_parent, next_to_interface).await; + incoming(private_key, stream, result_for_parent, next_to_interface).await; }); } - fn peer_addresses(&self, peer_id: &AuthorityId) -> Option> { + fn peer_addresses(&self, peer_id: &PK::PublicKey) -> Option> { match self.legacy_connected.contains(peer_id) { true => self.legacy_manager.peer_addresses(peer_id), false => self.manager.peer_addresses(peer_id), @@ -163,7 +169,7 @@ impl, NL: Listener> Service, connection_type: ConnectionType, ) -> AddResult { @@ -182,13 +188,13 @@ impl, NL: Listener> Service bool { + fn mark_legacy(&mut self, peer_id: &PK::PublicKey) -> bool { self.manager.remove_peer(peer_id); self.legacy_connected.insert(peer_id.clone()) } // Unmark a peer as legacy, putting it back in the normal set. - fn unmark_legacy(&mut self, peer_id: &AuthorityId) { + fn unmark_legacy(&mut self, peer_id: &PK::PublicKey) { self.legacy_connected.remove(peer_id); // Put it back if we still want to be connected. if let Some(addresses) = self.legacy_manager.peer_addresses(peer_id) { @@ -198,7 +204,11 @@ impl, NL: Listener> Service bool { + fn check_for_legacy( + &mut self, + peer_id: &PK::PublicKey, + connection_type: ConnectionType, + ) -> bool { use ConnectionType::*; match connection_type { LegacyIncoming => self.mark_legacy(peer_id), From bddce3508bf50ba8b66b0c51a9626bd173131915 Mon Sep 17 00:00:00 2001 From: timorl Date: Thu, 24 Nov 2022 10:39:49 +0100 Subject: [PATCH 2/3] SecretKey is a much better name than PrivateKey --- finality-aleph/src/tcp_network.rs | 4 +- .../src/validator_network/crypto.rs | 4 +- .../src/validator_network/incoming.rs | 20 +++---- finality-aleph/src/validator_network/mod.rs | 2 +- .../src/validator_network/outgoing.rs | 24 ++++----- .../validator_network/protocols/handshake.rs | 50 ++++++++--------- .../src/validator_network/protocols/mod.rs | 28 +++++----- .../src/validator_network/protocols/v0/mod.rs | 24 ++++----- .../src/validator_network/protocols/v1/mod.rs | 24 ++++----- .../src/validator_network/service.rs | 53 +++++++++---------- 10 files changed, 116 insertions(+), 117 deletions(-) diff --git a/finality-aleph/src/tcp_network.rs b/finality-aleph/src/tcp_network.rs index dfd30b3c3d..7fd805d6f5 100644 --- a/finality-aleph/src/tcp_network.rs +++ b/finality-aleph/src/tcp_network.rs @@ -12,7 +12,7 @@ use tokio::net::{ use crate::{ crypto::{verify, AuthorityPen, Signature}, network::{Multiaddress, NetworkIdentity, PeerId}, - validator_network::{ConnectionInfo, Dialer, Listener, PrivateKey, PublicKey, Splittable}, + validator_network::{ConnectionInfo, Dialer, Listener, PublicKey, SecretKey, Splittable}, }; pub const KEY_TYPE: KeyTypeId = KeyTypeId(*b"a0vn"); @@ -79,7 +79,7 @@ impl PublicKey for AuthorityId { } #[async_trait::async_trait] -impl PrivateKey for AuthorityPen { +impl SecretKey for AuthorityPen { type Signature = Signature; type PublicKey = AuthorityId; diff --git a/finality-aleph/src/validator_network/crypto.rs b/finality-aleph/src/validator_network/crypto.rs index 35e16aed8c..74891a6317 100644 --- a/finality-aleph/src/validator_network/crypto.rs +++ b/finality-aleph/src/validator_network/crypto.rs @@ -12,9 +12,9 @@ pub trait PublicKey: fn verify(&self, message: &[u8], signature: &Self::Signature) -> bool; } -/// Private key for signing messages, with an associated public key. +/// Secret key for signing messages, with an associated public key. #[async_trait::async_trait] -pub trait PrivateKey: Clone + Send + Sync + 'static { +pub trait SecretKey: Clone + Send + Sync + 'static { type Signature: Send + Sync + Clone + Codec; type PublicKey: PublicKey; diff --git a/finality-aleph/src/validator_network/incoming.rs b/finality-aleph/src/validator_network/incoming.rs index 2fe558ab3c..719092759e 100644 --- a/finality-aleph/src/validator_network/incoming.rs +++ b/finality-aleph/src/validator_network/incoming.rs @@ -5,7 +5,7 @@ use log::{debug, info}; use crate::validator_network::{ protocols::{protocol, ProtocolError, ProtocolNegotiationError, ResultForService}, - Data, PrivateKey, PublicKey, Splittable, + Data, PublicKey, SecretKey, Splittable, }; enum IncomingError { @@ -35,17 +35,17 @@ impl From> for IncomingError { } } -async fn manage_incoming( - private_key: PK, +async fn manage_incoming( + secret_key: SK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), IncomingError> { +) -> Result<(), IncomingError> { debug!(target: "validator-network", "Performing incoming protocol negotiation."); let (stream, protocol) = protocol(stream).await?; debug!(target: "validator-network", "Negotiated protocol, running."); Ok(protocol - .manage_incoming(stream, private_key, result_for_parent, data_for_user) + .manage_incoming(stream, secret_key, result_for_parent, data_for_user) .await?) } @@ -54,14 +54,14 @@ async fn manage_incoming( /// process ends. Whenever data arrives on this connection it will be passed to the user. Any /// failures in receiving data result in the process stopping, we assume the other side will /// reestablish it if necessary. -pub async fn incoming( - private_key: PK, +pub async fn incoming( + secret_key: SK, stream: S, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { let addr = stream.peer_address_info(); - if let Err(e) = manage_incoming(private_key, stream, result_for_parent, data_for_user).await { + if let Err(e) = manage_incoming(secret_key, stream, result_for_parent, data_for_user).await { info!(target: "validator-network", "Incoming connection from {} failed: {}.", addr, e); } } diff --git a/finality-aleph/src/validator_network/mod.rs b/finality-aleph/src/validator_network/mod.rs index e07e8ca3b4..5a35646ec0 100644 --- a/finality-aleph/src/validator_network/mod.rs +++ b/finality-aleph/src/validator_network/mod.rs @@ -13,7 +13,7 @@ mod outgoing; mod protocols; mod service; -pub use crypto::{PrivateKey, PublicKey}; +pub use crypto::{PublicKey, SecretKey}; pub use service::Service; /// What the data sent using the network has to satisfy. diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index cdefe86b74..e906cc558b 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -8,7 +8,7 @@ use crate::validator_network::{ protocols::{ protocol, ConnectionType, ProtocolError, ProtocolNegotiationError, ResultForService, }, - ConnectionInfo, Data, Dialer, PeerAddressInfo, PrivateKey, PublicKey, + ConnectionInfo, Data, Dialer, PeerAddressInfo, PublicKey, SecretKey, }; enum OutgoingError> { @@ -36,14 +36,14 @@ impl> Display for OutgoingError } } -async fn manage_outgoing>( - private_key: PK, - peer_id: PK::PublicKey, +async fn manage_outgoing>( + secret_key: SK, + peer_id: SK::PublicKey, mut dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), OutgoingError> { +) -> Result<(), OutgoingError> { debug!(target: "validator-network", "Trying to connect to {}.", peer_id); let stream = dialer .connect(addresses) @@ -58,7 +58,7 @@ async fn manage_outgoing>( protocol .manage_outgoing( stream, - private_key, + secret_key, peer_id, result_for_parent, data_for_user, @@ -72,16 +72,16 @@ const RETRY_DELAY: Duration = Duration::from_secs(10); /// Establish an outgoing connection to the provided peer using the dialer and then manage it. /// While this works it will send any data from the user to the peer. Any failures will be reported /// to the parent, so that connections can be reestablished if necessary. -pub async fn outgoing>( - private_key: PK, - peer_id: PK::PublicKey, +pub async fn outgoing>( + secret_key: SK, + peer_id: SK::PublicKey, dialer: ND, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) { if let Err(e) = manage_outgoing( - private_key, + secret_key, peer_id.clone(), dialer, addresses.clone(), diff --git a/finality-aleph/src/validator_network/protocols/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs index e4c021b0ff..8f3c18d470 100644 --- a/finality-aleph/src/validator_network/protocols/handshake.rs +++ b/finality-aleph/src/validator_network/protocols/handshake.rs @@ -6,7 +6,7 @@ use tokio::time::{timeout, Duration}; use crate::validator_network::{ io::{receive_data, send_data, ReceiveError, SendError}, - PrivateKey, PublicKey, Splittable, + PublicKey, SecretKey, Splittable, }; pub const HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(10); @@ -82,13 +82,13 @@ impl Response { // Amusingly the `Signature = PK::Signature` is necessary, the compiler cannot even do this // simple reasoning. :/ /// Create a new response by signing the challenge. - async fn new>( - private_key: &P, + async fn new>( + secret_key: &SK, challenge: &Challenge, ) -> Self { Self { - id: private_key.public_key(), - signature: private_key.sign(&challenge.encode()).await, + id: secret_key.public_key(), + signature: secret_key.sign(&challenge.encode()).await, } } @@ -106,15 +106,15 @@ impl Response { /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, the peer will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_incoming( +pub async fn execute_v0_handshake_incoming( stream: S, - private_key: PK, -) -> Result<(S::Sender, S::Receiver, PK::PublicKey), HandshakeError> { + secret_key: SK, +) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { // send challenge - let our_challenge = Challenge::new(private_key.public_key()); + let our_challenge = Challenge::new(secret_key.public_key()); let stream = send_data(stream, our_challenge.clone()).await?; // receive response - let (stream, peer_response) = receive_data::<_, Response>(stream).await?; + let (stream, peer_response) = receive_data::<_, Response>(stream).await?; // validate response if !peer_response.verify(&our_challenge) { return Err(HandshakeError::SignatureError); @@ -133,45 +133,45 @@ pub async fn execute_v0_handshake_incoming( /// will NOT be secured in any way. We assume that if the channel is /// compromised after the handshake, we will establish another connection, /// which will replace the current one. -pub async fn execute_v0_handshake_outgoing( +pub async fn execute_v0_handshake_outgoing( stream: S, - private_key: PK, - peer_id: PK::PublicKey, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + secret_key: SK, + peer_id: SK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { // receive challenge - let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; + let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; if peer_id != peer_challenge.id { return Err(HandshakeError::ChallengeError(peer_id, peer_challenge.id)); } // send response - let our_response = Response::new(&private_key, &peer_challenge).await; + let our_response = Response::new(&secret_key, &peer_challenge).await; let stream = send_data(stream, our_response).await?; let (sender, receiver) = stream.split(); Ok((sender, receiver)) } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_incoming( +pub async fn v0_handshake_incoming( stream: S, - private_key: PK, -) -> Result<(S::Sender, S::Receiver, PK::PublicKey), HandshakeError> { + secret_key: SK, +) -> Result<(S::Sender, S::Receiver, SK::PublicKey), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_incoming(stream, private_key), + execute_v0_handshake_incoming(stream, secret_key), ) .await .map_err(|_| HandshakeError::TimedOut)? } /// Wrapper that adds timeout to the function performing handshake. -pub async fn v0_handshake_outgoing( +pub async fn v0_handshake_outgoing( stream: S, - private_key: PK, - peer_id: PK::PublicKey, -) -> Result<(S::Sender, S::Receiver), HandshakeError> { + secret_key: SK, + peer_id: SK::PublicKey, +) -> Result<(S::Sender, S::Receiver), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_outgoing(stream, private_key, peer_id), + execute_v0_handshake_outgoing(stream, secret_key, peer_id), ) .await .map_err(|_| HandshakeError::TimedOut)? diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index dbbdb3af2e..151558406e 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -4,7 +4,7 @@ use futures::channel::mpsc; use crate::validator_network::{ io::{ReceiveError, SendError}, - Data, PrivateKey, PublicKey, Splittable, + Data, PublicKey, SecretKey, Splittable, }; mod handshake; @@ -99,36 +99,36 @@ impl Protocol { const MAX_VERSION: Version = 1; /// Launches the proper variant of the protocol (receiver half). - pub async fn manage_incoming( + pub async fn manage_incoming( &self, stream: S, - private_key: PK, - result_for_service: mpsc::UnboundedSender>, + secret_key: SK, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::incoming(stream, private_key, result_for_service, data_for_user).await, - V1 => v1::incoming(stream, private_key, result_for_service, data_for_user).await, + V0 => v0::incoming(stream, secret_key, result_for_service, data_for_user).await, + V1 => v1::incoming(stream, secret_key, result_for_service, data_for_user).await, } } /// Launches the proper variant of the protocol (sender half). - pub async fn manage_outgoing( + pub async fn manage_outgoing( &self, stream: S, - private_key: PK, - peer_id: PK::PublicKey, - result_for_service: mpsc::UnboundedSender>, + secret_key: SK, + peer_id: SK::PublicKey, + result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, - ) -> Result<(), ProtocolError> { + ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::outgoing(stream, private_key, peer_id, result_for_service).await, + V0 => v0::outgoing(stream, secret_key, peer_id, result_for_service).await, V1 => { v1::outgoing( stream, - private_key, + secret_key, peer_id, result_for_service, data_for_user, diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 1cbbb895a5..8abe666e24 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -8,7 +8,7 @@ use crate::validator_network::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, ConnectionType, ProtocolError, ResultForService, }, - Data, PrivateKey, PublicKey, Splittable, + Data, PublicKey, SecretKey, Splittable, }; mod heartbeat; @@ -32,14 +32,14 @@ async fn sending( /// Performs the handshake, and then keeps sending data received from the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn outgoing( +pub async fn outgoing( stream: S, - private_key: PK, - peer_id: PK::PublicKey, - result_for_parent: mpsc::UnboundedSender>, -) -> Result<(), ProtocolError> { + secret_key: SK, + peer_id: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, private_key, peer_id.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent @@ -79,14 +79,14 @@ async fn receiving( /// Performs the handshake, and then keeps sending data received from the network to the parent service. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - private_key: PK, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, private_key).await?; + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, secret_key).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); let (tx_exit, mut exit) = mpsc::unbounded(); diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index 844e4a385c..170f48e2eb 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -12,7 +12,7 @@ use crate::validator_network::{ handshake::{v0_handshake_incoming, v0_handshake_outgoing}, ConnectionType, ProtocolError, ResultForService, }, - Data, PrivateKey, PublicKey, Splittable, + Data, PublicKey, SecretKey, Splittable, }; const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); @@ -85,15 +85,15 @@ async fn manage_connection< /// Performs the outgoing handshake, and then manages a connection sending and receiving data. /// Exits on parent request, or in case of broken or dead network connection. -pub async fn outgoing( +pub async fn outgoing( stream: S, - private_key: PK, - peer_id: PK::PublicKey, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + peer_id: SK::PublicKey, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, private_key, peer_id.clone()).await?; + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, peer_id.clone()).await?; info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent @@ -106,14 +106,14 @@ pub async fn outgoing( /// Performs the incoming handshake, and then manages a connection sending and receiving data. /// Exits on parent request (when the data source is dropped), or in case of broken or dead /// network connection. -pub async fn incoming( +pub async fn incoming( stream: S, - private_key: PK, - result_for_parent: mpsc::UnboundedSender>, + secret_key: SK, + result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, -) -> Result<(), ProtocolError> { +) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, private_key).await?; + let (sender, receiver, peer_id) = v0_handshake_incoming(stream, secret_key).await?; info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index fa387384c6..b068098cfa 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -14,7 +14,7 @@ use crate::{ manager::{AddResult, LegacyManager, Manager}, outgoing::outgoing, protocols::{ConnectionType, ResultForService}, - Data, Dialer, Listener, Network, PrivateKey, PublicKey, + Data, Dialer, Listener, Network, PublicKey, SecretKey, }, SpawnTaskHandle, STATUS_REPORT_INTERVAL, }; @@ -73,34 +73,33 @@ impl Network for ServiceInterface, NL: Listener> +pub struct Service, NL: Listener> where - PK::PublicKey: PeerId, + SK::PublicKey: PeerId, { - commands_from_interface: mpsc::UnboundedReceiver>, + commands_from_interface: mpsc::UnboundedReceiver>, next_to_interface: mpsc::UnboundedSender, - manager: Manager, + manager: Manager, dialer: ND, listener: NL, spawn_handle: SpawnTaskHandle, - private_key: PK, + secret_key: SK, // Backwards compatibility with the one-sided connections, remove when no longer needed. - legacy_connected: HashSet, - legacy_manager: LegacyManager, + legacy_connected: HashSet, + legacy_manager: LegacyManager, } -impl, NL: Listener> - Service +impl, NL: Listener> Service where - PK::PublicKey: PeerId, + SK::PublicKey: PeerId, { /// Create a new validator network service plus an interface for interacting with it. pub fn new( dialer: ND, listener: NL, - private_key: PK, + secret_key: SK, spawn_handle: SpawnTaskHandle, - ) -> (Self, impl Network) { + ) -> (Self, impl Network) { // Channel for sending commands between the service and interface let (commands_for_service, commands_from_interface) = mpsc::unbounded(); // Channel for receiving data from the network @@ -109,11 +108,11 @@ where Self { commands_from_interface, next_to_interface, - manager: Manager::new(private_key.public_key()), + manager: Manager::new(secret_key.public_key()), dialer, listener, spawn_handle, - private_key, + secret_key, legacy_connected: HashSet::new(), legacy_manager: LegacyManager::new(), }, @@ -126,17 +125,17 @@ where fn spawn_new_outgoing( &self, - peer_id: PK::PublicKey, + peer_id: SK::PublicKey, addresses: Vec, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, ) { - let private_key = self.private_key.clone(); + let secret_key = self.secret_key.clone(); let dialer = self.dialer.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_outgoing", None, async move { outgoing( - private_key, + secret_key, peer_id, dialer, addresses, @@ -150,17 +149,17 @@ where fn spawn_new_incoming( &self, stream: NL::Connection, - result_for_parent: mpsc::UnboundedSender>, + result_for_parent: mpsc::UnboundedSender>, ) { - let private_key = self.private_key.clone(); + let secret_key = self.secret_key.clone(); let next_to_interface = self.next_to_interface.clone(); self.spawn_handle .spawn("aleph/validator_network_incoming", None, async move { - incoming(private_key, stream, result_for_parent, next_to_interface).await; + incoming(secret_key, stream, result_for_parent, next_to_interface).await; }); } - fn peer_addresses(&self, peer_id: &PK::PublicKey) -> Option> { + fn peer_addresses(&self, peer_id: &SK::PublicKey) -> Option> { match self.legacy_connected.contains(peer_id) { true => self.legacy_manager.peer_addresses(peer_id), false => self.manager.peer_addresses(peer_id), @@ -169,7 +168,7 @@ where fn add_connection( &mut self, - peer_id: PK::PublicKey, + peer_id: SK::PublicKey, data_for_network: mpsc::UnboundedSender, connection_type: ConnectionType, ) -> AddResult { @@ -188,13 +187,13 @@ where } // Mark a peer as legacy and return whether it is the first time we do so. - fn mark_legacy(&mut self, peer_id: &PK::PublicKey) -> bool { + fn mark_legacy(&mut self, peer_id: &SK::PublicKey) -> bool { self.manager.remove_peer(peer_id); self.legacy_connected.insert(peer_id.clone()) } // Unmark a peer as legacy, putting it back in the normal set. - fn unmark_legacy(&mut self, peer_id: &PK::PublicKey) { + fn unmark_legacy(&mut self, peer_id: &SK::PublicKey) { self.legacy_connected.remove(peer_id); // Put it back if we still want to be connected. if let Some(addresses) = self.legacy_manager.peer_addresses(peer_id) { @@ -206,7 +205,7 @@ where // accordingly. Returns whether we should spawn a new connection worker because of that. fn check_for_legacy( &mut self, - peer_id: &PK::PublicKey, + peer_id: &SK::PublicKey, connection_type: ConnectionType, ) -> bool { use ConnectionType::*; From f7eb9d876842801ca6921394e22c98a0e9a7cf32 Mon Sep 17 00:00:00 2001 From: timorl Date: Thu, 24 Nov 2022 10:56:42 +0100 Subject: [PATCH 3/3] Rename most peer ids to public keys --- .../src/validator_network/manager/legacy.rs | 8 +- .../src/validator_network/manager/mod.rs | 12 +-- .../src/validator_network/outgoing.rs | 14 +-- .../validator_network/protocols/handshake.rs | 31 +++--- .../src/validator_network/protocols/mod.rs | 10 +- .../src/validator_network/protocols/v0/mod.rs | 20 ++-- .../src/validator_network/protocols/v1/mod.rs | 28 +++-- .../src/validator_network/service.rs | 100 +++++++++--------- 8 files changed, 119 insertions(+), 104 deletions(-) diff --git a/finality-aleph/src/validator_network/manager/legacy.rs b/finality-aleph/src/validator_network/manager/legacy.rs index 977682aacc..6e7e13697c 100644 --- a/finality-aleph/src/validator_network/manager/legacy.rs +++ b/finality-aleph/src/validator_network/manager/legacy.rs @@ -80,7 +80,7 @@ impl Display for ManagerStatus { let peers = self .both_ways_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -95,7 +95,7 @@ impl Display for ManagerStatus { let peers = self .incoming_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -110,7 +110,7 @@ impl Display for ManagerStatus { let peers = self .outgoing_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!( @@ -125,7 +125,7 @@ impl Display for ManagerStatus { let peers = self .missing_peers .iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", "); write!(f, "missing - {:?} [{}];", self.missing_peers.len(), peers)?; diff --git a/finality-aleph/src/validator_network/manager/mod.rs b/finality-aleph/src/validator_network/manager/mod.rs index 421be90d29..dd9288e7d7 100644 --- a/finality-aleph/src/validator_network/manager/mod.rs +++ b/finality-aleph/src/validator_network/manager/mod.rs @@ -89,9 +89,9 @@ impl ManagerStatus { } } -fn pretty_authority_id_set(set: &HashSet) -> String { +fn pretty_peer_id_set(set: &HashSet) -> String { set.iter() - .map(|authority_id| authority_id.to_short_string()) + .map(|peer_id| peer_id.to_short_string()) .collect::>() .join(", ") } @@ -116,7 +116,7 @@ impl Display for ManagerStatus { f, "have - {:?} [{}]; ", self.incoming_peers.len(), - pretty_authority_id_set(&self.incoming_peers), + pretty_peer_id_set(&self.incoming_peers), )?, } if !self.missing_incoming.is_empty() { @@ -124,7 +124,7 @@ impl Display for ManagerStatus { f, "missing - {:?} [{}]; ", self.missing_incoming.len(), - pretty_authority_id_set(&self.missing_incoming), + pretty_peer_id_set(&self.missing_incoming), )?; } } @@ -139,7 +139,7 @@ impl Display for ManagerStatus { f, "have - {:?} [{}]; ", self.incoming_peers.len(), - pretty_authority_id_set(&self.outgoing_peers), + pretty_peer_id_set(&self.outgoing_peers), )?; } if !self.missing_outgoing.is_empty() { @@ -147,7 +147,7 @@ impl Display for ManagerStatus { f, "missing - {:?} [{}]; ", self.missing_incoming.len(), - pretty_authority_id_set(&self.missing_outgoing), + pretty_peer_id_set(&self.missing_outgoing), )?; } } diff --git a/finality-aleph/src/validator_network/outgoing.rs b/finality-aleph/src/validator_network/outgoing.rs index e906cc558b..679b9d0b9b 100644 --- a/finality-aleph/src/validator_network/outgoing.rs +++ b/finality-aleph/src/validator_network/outgoing.rs @@ -38,13 +38,13 @@ impl> Display for OutgoingError async fn manage_outgoing>( secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, mut dialer: ND, addresses: Vec, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), OutgoingError> { - debug!(target: "validator-network", "Trying to connect to {}.", peer_id); + debug!(target: "validator-network", "Trying to connect to {}.", public_key); let stream = dialer .connect(addresses) .await @@ -59,7 +59,7 @@ async fn manage_outgoing>( .manage_outgoing( stream, secret_key, - peer_id, + public_key, result_for_parent, data_for_user, ) @@ -74,7 +74,7 @@ const RETRY_DELAY: Duration = Duration::from_secs(10); /// to the parent, so that connections can be reestablished if necessary. pub async fn outgoing>( secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, dialer: ND, addresses: Vec, result_for_parent: mpsc::UnboundedSender>, @@ -82,7 +82,7 @@ pub async fn outgoing>( ) { if let Err(e) = manage_outgoing( secret_key, - peer_id.clone(), + public_key.clone(), dialer, addresses.clone(), result_for_parent.clone(), @@ -90,12 +90,12 @@ pub async fn outgoing>( ) .await { - info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", peer_id, addresses, e, RETRY_DELAY.as_secs()); + info!(target: "validator-network", "Outgoing connection to {} {:?} failed: {}, will retry after {}s.", public_key, addresses, e, RETRY_DELAY.as_secs()); sleep(RETRY_DELAY).await; // we send the "new" connection type, because we always assume it's new until proven // otherwise, and here we did not even get the chance to attempt negotiating a protocol if result_for_parent - .unbounded_send((peer_id, None, ConnectionType::New)) + .unbounded_send((public_key, None, ConnectionType::New)) .is_err() { debug!(target: "validator-network", "Could not send the closing message, we've probably been terminated by the parent service."); diff --git a/finality-aleph/src/validator_network/protocols/handshake.rs b/finality-aleph/src/validator_network/protocols/handshake.rs index 8f3c18d470..3286d5733e 100644 --- a/finality-aleph/src/validator_network/protocols/handshake.rs +++ b/finality-aleph/src/validator_network/protocols/handshake.rs @@ -58,15 +58,15 @@ impl From for HandshakeError { /// Handshake challenge. Contains public key of the creator, and a random nonce. #[derive(Debug, Clone, Encode, Decode)] struct Challenge { - id: PK, + public_key: PK, nonce: [u8; 32], } impl Challenge { /// Prepare new challenge that contains ID of the creator. - fn new(id: PK) -> Self { + fn new(public_key: PK) -> Self { let nonce = rand::thread_rng().gen::<[u8; 32]>(); - Self { id, nonce } + Self { public_key, nonce } } } @@ -74,7 +74,7 @@ impl Challenge { /// related to the received challenge. #[derive(Debug, Clone, Encode, Decode)] struct Response { - id: PK, + public_key: PK, signature: PK::Signature, } @@ -87,14 +87,14 @@ impl Response { challenge: &Challenge, ) -> Self { Self { - id: secret_key.public_key(), + public_key: secret_key.public_key(), signature: secret_key.sign(&challenge.encode()).await, } } /// Verify the Response sent by the peer. fn verify(&self, challenge: &Challenge) -> bool { - self.id.verify(&challenge.encode(), &self.signature) + self.public_key.verify(&challenge.encode(), &self.signature) } } @@ -120,8 +120,8 @@ pub async fn execute_v0_handshake_incoming( return Err(HandshakeError::SignatureError); } let (sender, receiver) = stream.split(); - let peer_id = peer_response.id; - Ok((sender, receiver, peer_id)) + let public_key = peer_response.public_key; + Ok((sender, receiver, public_key)) } /// Performs the handshake with a peer that we called. We assume that their @@ -136,12 +136,15 @@ pub async fn execute_v0_handshake_incoming( pub async fn execute_v0_handshake_outgoing( stream: S, secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, ) -> Result<(S::Sender, S::Receiver), HandshakeError> { // receive challenge let (stream, peer_challenge) = receive_data::<_, Challenge>(stream).await?; - if peer_id != peer_challenge.id { - return Err(HandshakeError::ChallengeError(peer_id, peer_challenge.id)); + if public_key != peer_challenge.public_key { + return Err(HandshakeError::ChallengeError( + public_key, + peer_challenge.public_key, + )); } // send response let our_response = Response::new(&secret_key, &peer_challenge).await; @@ -167,11 +170,11 @@ pub async fn v0_handshake_incoming( pub async fn v0_handshake_outgoing( stream: S, secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, ) -> Result<(S::Sender, S::Receiver), HandshakeError> { timeout( HANDSHAKE_TIMEOUT, - execute_v0_handshake_outgoing(stream, secret_key, peer_id), + execute_v0_handshake_outgoing(stream, secret_key, public_key), ) .await .map_err(|_| HandshakeError::TimedOut)? @@ -313,7 +316,7 @@ mod tests { let (fake_id, _) = key().await; // send response with substituted id let mut our_response = Response::new(&authority_pen, &challenge).await; - our_response.id = fake_id; + our_response.public_key = fake_id; send_data(stream, our_response).await.expect("should send"); futures::future::pending::<()>().await; } diff --git a/finality-aleph/src/validator_network/protocols/mod.rs b/finality-aleph/src/validator_network/protocols/mod.rs index 151558406e..c37e2d4253 100644 --- a/finality-aleph/src/validator_network/protocols/mod.rs +++ b/finality-aleph/src/validator_network/protocols/mod.rs @@ -26,8 +26,8 @@ pub enum ConnectionType { LegacyOutgoing, } -/// What connections send back to the service after they become established. Starts with a peer id -/// of the remote node, followed by a channel for sending data to that node, with None if the +/// What connections send back to the service after they become established. Starts with a public +/// key of the remote node, followed by a channel for sending data to that node, with None if the /// connection was unsuccessful and should be reestablished. Finally a marker for legacy /// compatibility. pub type ResultForService = (PK, Option>, ConnectionType); @@ -118,18 +118,18 @@ impl Protocol { &self, stream: S, secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, result_for_service: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { use Protocol::*; match self { - V0 => v0::outgoing(stream, secret_key, peer_id, result_for_service).await, + V0 => v0::outgoing(stream, secret_key, public_key, result_for_service).await, V1 => { v1::outgoing( stream, secret_key, - peer_id, + public_key, result_for_service, data_for_user, ) diff --git a/finality-aleph/src/validator_network/protocols/v0/mod.rs b/finality-aleph/src/validator_network/protocols/v0/mod.rs index 8abe666e24..c0c1af3ea0 100644 --- a/finality-aleph/src/validator_network/protocols/v0/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v0/mod.rs @@ -35,16 +35,16 @@ async fn sending( pub async fn outgoing( stream: S, secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, peer_id.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); + trace!(target: "validator-network", "Extending hand to {}.", public_key); + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent .unbounded_send(( - peer_id.clone(), + public_key.clone(), Some(data_for_network), ConnectionType::LegacyOutgoing, )) @@ -53,7 +53,7 @@ pub async fn outgoing( let sending = sending(sender, data_from_user); let heartbeat = heartbeat_receiver(receiver); - debug!(target: "validator-network", "Starting worker for sending to {}.", peer_id); + debug!(target: "validator-network", "Starting worker for sending to {}.", public_key); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), @@ -86,13 +86,13 @@ pub async fn incoming( data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, secret_key).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); let (tx_exit, mut exit) = mpsc::unbounded(); result_for_parent .unbounded_send(( - peer_id.clone(), + public_key.clone(), Some(tx_exit), ConnectionType::LegacyIncoming, )) @@ -101,7 +101,7 @@ pub async fn incoming( let receiving = receiving(receiver, data_for_user); let heartbeat = heartbeat_sender(sender); - debug!(target: "validator-network", "Starting worker for receiving from {}.", peer_id); + debug!(target: "validator-network", "Starting worker for receiving from {}.", public_key); loop { tokio::select! { _ = heartbeat => return Err(ProtocolError::CardiacArrest), diff --git a/finality-aleph/src/validator_network/protocols/v1/mod.rs b/finality-aleph/src/validator_network/protocols/v1/mod.rs index 170f48e2eb..372a00dfac 100644 --- a/finality-aleph/src/validator_network/protocols/v1/mod.rs +++ b/finality-aleph/src/validator_network/protocols/v1/mod.rs @@ -88,18 +88,22 @@ async fn manage_connection< pub async fn outgoing( stream: S, secret_key: SK, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, result_for_parent: mpsc::UnboundedSender>, data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { - trace!(target: "validator-network", "Extending hand to {}.", peer_id); - let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, peer_id.clone()).await?; - info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", peer_id); + trace!(target: "validator-network", "Extending hand to {}.", public_key); + let (sender, receiver) = v0_handshake_outgoing(stream, secret_key, public_key.clone()).await?; + info!(target: "validator-network", "Outgoing handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .unbounded_send(( + public_key.clone(), + Some(data_for_network), + ConnectionType::New, + )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); manage_connection(sender, receiver, data_from_user, data_for_user).await } @@ -113,13 +117,17 @@ pub async fn incoming( data_for_user: mpsc::UnboundedSender, ) -> Result<(), ProtocolError> { trace!(target: "validator-network", "Waiting for extended hand..."); - let (sender, receiver, peer_id) = v0_handshake_incoming(stream, secret_key).await?; - info!(target: "validator-network", "Incoming handshake with {} finished successfully.", peer_id); + let (sender, receiver, public_key) = v0_handshake_incoming(stream, secret_key).await?; + info!(target: "validator-network", "Incoming handshake with {} finished successfully.", public_key); let (data_for_network, data_from_user) = mpsc::unbounded(); result_for_parent - .unbounded_send((peer_id.clone(), Some(data_for_network), ConnectionType::New)) + .unbounded_send(( + public_key.clone(), + Some(data_for_network), + ConnectionType::New, + )) .map_err(|_| ProtocolError::NoParentConnection)?; - debug!(target: "validator-network", "Starting worker for communicating with {}.", peer_id); + debug!(target: "validator-network", "Starting worker for communicating with {}.", public_key); manage_connection(sender, receiver, data_from_user, data_for_user).await } diff --git a/finality-aleph/src/validator_network/service.rs b/finality-aleph/src/validator_network/service.rs index b068098cfa..c1c2f4a762 100644 --- a/finality-aleph/src/validator_network/service.rs +++ b/finality-aleph/src/validator_network/service.rs @@ -125,7 +125,7 @@ where fn spawn_new_outgoing( &self, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, addresses: Vec, result_for_parent: mpsc::UnboundedSender>, ) { @@ -136,7 +136,7 @@ where .spawn("aleph/validator_network_outgoing", None, async move { outgoing( secret_key, - peer_id, + public_key, dialer, addresses, result_for_parent, @@ -159,16 +159,16 @@ where }); } - fn peer_addresses(&self, peer_id: &SK::PublicKey) -> Option> { - match self.legacy_connected.contains(peer_id) { - true => self.legacy_manager.peer_addresses(peer_id), - false => self.manager.peer_addresses(peer_id), + fn peer_addresses(&self, public_key: &SK::PublicKey) -> Option> { + match self.legacy_connected.contains(public_key) { + true => self.legacy_manager.peer_addresses(public_key), + false => self.manager.peer_addresses(public_key), } } fn add_connection( &mut self, - peer_id: SK::PublicKey, + public_key: SK::PublicKey, data_for_network: mpsc::UnboundedSender, connection_type: ConnectionType, ) -> AddResult { @@ -178,26 +178,30 @@ where // If we are adding a non-legacy connection we want to ensure it's not marked as // such. This should only matter if a peer initially used the legacy protocol but // now upgraded, otherwise this is unnecessary busywork, but what can you do. - self.unmark_legacy(&peer_id); - self.manager.add_connection(peer_id, data_for_network) + self.unmark_legacy(&public_key); + self.manager.add_connection(public_key, data_for_network) } - LegacyIncoming => self.legacy_manager.add_incoming(peer_id, data_for_network), - LegacyOutgoing => self.legacy_manager.add_outgoing(peer_id, data_for_network), + LegacyIncoming => self + .legacy_manager + .add_incoming(public_key, data_for_network), + LegacyOutgoing => self + .legacy_manager + .add_outgoing(public_key, data_for_network), } } // Mark a peer as legacy and return whether it is the first time we do so. - fn mark_legacy(&mut self, peer_id: &SK::PublicKey) -> bool { - self.manager.remove_peer(peer_id); - self.legacy_connected.insert(peer_id.clone()) + fn mark_legacy(&mut self, public_key: &SK::PublicKey) -> bool { + self.manager.remove_peer(public_key); + self.legacy_connected.insert(public_key.clone()) } // Unmark a peer as legacy, putting it back in the normal set. - fn unmark_legacy(&mut self, peer_id: &SK::PublicKey) { - self.legacy_connected.remove(peer_id); + fn unmark_legacy(&mut self, public_key: &SK::PublicKey) { + self.legacy_connected.remove(public_key); // Put it back if we still want to be connected. - if let Some(addresses) = self.legacy_manager.peer_addresses(peer_id) { - self.manager.add_peer(peer_id.clone(), addresses); + if let Some(addresses) = self.legacy_manager.peer_addresses(public_key) { + self.manager.add_peer(public_key.clone(), addresses); } } @@ -205,14 +209,14 @@ where // accordingly. Returns whether we should spawn a new connection worker because of that. fn check_for_legacy( &mut self, - peer_id: &SK::PublicKey, + public_key: &SK::PublicKey, connection_type: ConnectionType, ) -> bool { use ConnectionType::*; match connection_type { - LegacyIncoming => self.mark_legacy(peer_id), + LegacyIncoming => self.mark_legacy(public_key), LegacyOutgoing => { - self.mark_legacy(peer_id); + self.mark_legacy(public_key); false } // We don't unmark here, because we always return New when a connection @@ -239,59 +243,59 @@ where Some(command) = self.commands_from_interface.next() => match command { // register new peer in manager or update its list of addresses if already there // spawn a worker managing outgoing connection if the peer was not known - AddConnection(peer_id, addresses) => { + AddConnection(public_key, addresses) => { // we add all the peers to the legacy manager so we don't lose the // addresses, but only care about its opinion when it turns out we have to // in particular the first time we add a peer we never know whether it // requires legacy connecting, so we only attempt to connect to it if the // new criterion is satisfied, otherwise we wait for it to connect to us - self.legacy_manager.add_peer(peer_id.clone(), addresses.clone()); - if self.manager.add_peer(peer_id.clone(), addresses.clone()) { - self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + self.legacy_manager.add_peer(public_key.clone(), addresses.clone()); + if self.manager.add_peer(public_key.clone(), addresses.clone()) { + self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); }; }, // remove the peer from the manager all workers will be killed automatically, due to closed channels - DelConnection(peer_id) => { - self.manager.remove_peer(&peer_id); - self.legacy_manager.remove_peer(&peer_id); - self.legacy_connected.remove(&peer_id); + DelConnection(public_key) => { + self.manager.remove_peer(&public_key); + self.legacy_manager.remove_peer(&public_key); + self.legacy_connected.remove(&public_key); }, // pass the data to the manager - SendData(data, peer_id) => { - match self.legacy_connected.contains(&peer_id) { - true => match self.legacy_manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", peer_id, e), + SendData(data, public_key) => { + match self.legacy_connected.contains(&public_key) { + true => match self.legacy_manager.send_to(&public_key, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {} through legacy.", public_key), + Err(e) => trace!(target: "validator-network", "Failed sending to {} through legacy: {}", public_key, e), }, - false => match self.manager.send_to(&peer_id, data) { - Ok(_) => trace!(target: "validator-network", "Sending data to {}.", peer_id), - Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", peer_id, e), + false => match self.manager.send_to(&public_key, data) { + Ok(_) => trace!(target: "validator-network", "Sending data to {}.", public_key), + Err(e) => trace!(target: "validator-network", "Failed sending to {}: {}", public_key, e), }, } }, }, // received information from a spawned worker managing a connection // check if we still want to be connected to the peer, and if so, spawn a new worker or actually add proper connection - Some((peer_id, maybe_data_for_network, connection_type)) = worker_results.next() => { - if self.check_for_legacy(&peer_id, connection_type) { - match self.legacy_manager.peer_addresses(&peer_id) { - Some(addresses) => self.spawn_new_outgoing(peer_id.clone(), addresses, result_for_parent.clone()), + Some((public_key, maybe_data_for_network, connection_type)) = worker_results.next() => { + if self.check_for_legacy(&public_key, connection_type) { + match self.legacy_manager.peer_addresses(&public_key) { + Some(addresses) => self.spawn_new_outgoing(public_key.clone(), addresses, result_for_parent.clone()), None => { // We received a result from a worker we are no longer interested // in. - self.legacy_connected.remove(&peer_id); + self.legacy_connected.remove(&public_key); }, } } use AddResult::*; match maybe_data_for_network { - Some(data_for_network) => match self.add_connection(peer_id.clone(), data_for_network, connection_type) { - Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", peer_id), - Added => info!(target: "validator-network", "New connection with peer {}.", peer_id), - Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", peer_id), + Some(data_for_network) => match self.add_connection(public_key.clone(), data_for_network, connection_type) { + Uninterested => warn!(target: "validator-network", "Established connection with peer {} for unknown reasons.", public_key), + Added => info!(target: "validator-network", "New connection with peer {}.", public_key), + Replaced => info!(target: "validator-network", "Replaced connection with peer {}.", public_key), }, - None => if let Some(addresses) = self.peer_addresses(&peer_id) { - self.spawn_new_outgoing(peer_id, addresses, result_for_parent.clone()); + None => if let Some(addresses) = self.peer_addresses(&public_key) { + self.spawn_new_outgoing(public_key, addresses, result_for_parent.clone()); } } },