diff --git a/finality-aleph/src/network/gossip/mod.rs b/finality-aleph/src/network/gossip/mod.rs index c6ba22d633..4b8143d9ea 100644 --- a/finality-aleph/src/network/gossip/mod.rs +++ b/finality-aleph/src/network/gossip/mod.rs @@ -1,5 +1,6 @@ //! A P2P-based gossip network, for now only for sending broadcasts. use std::{ + collections::HashSet, fmt::{Debug, Display}, hash::Hash, }; @@ -15,9 +16,25 @@ mod service; pub use service::Service; #[async_trait::async_trait] -/// Interface for the gossip network, currently only supports broadcasting and receiving data. +/// Interface for the gossip network. This represents a P2P network and a lot of the properties of +/// this interface result from that. In particular we might know the ID of a given peer, but not be +/// connected to them directly. pub trait Network: Send + 'static { type Error: Display + Send; + type PeerId: Clone + Debug + Eq + Hash + Send + 'static; + + /// Attempt to send data to a peer. Might silently fail if we are not connected to them. + fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error>; + + /// Send data to a random peer, preferably from a list. It should send the data to a randomly + /// chosen peer from the provided list, but if it cannot (e.g. because it's not connected) it + /// will send to a random available peer. No guarantees any peer gets it even if no errors are + /// returned, retry appropriately. + fn send_to_random( + &mut self, + data: D, + peer_ids: HashSet, + ) -> Result<(), Self::Error>; /// Broadcast data to all directly connected peers. Network-wide broadcasts have to be /// implemented on top of this abstraction. Note that there might be no currently connected @@ -25,8 +42,8 @@ pub trait Network: Send + 'static { /// returned, retry appropriately. fn broadcast(&mut self, data: D) -> Result<(), Self::Error>; - /// Receive some data from the network. - async fn next(&mut self) -> Result; + /// Receive some data from the network, including information about who sent it. + async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error>; } /// The Authentication protocol is used for validator discovery. @@ -51,7 +68,7 @@ pub trait NetworkSender: Send + Sync + 'static { pub enum Event

{ StreamOpened(P, Protocol), StreamClosed(P, Protocol), - Messages(Vec<(Protocol, Bytes)>), + Messages(P, Vec<(Protocol, Bytes)>), } #[async_trait::async_trait] @@ -63,7 +80,7 @@ pub trait EventStream

{ pub trait RawNetwork: Clone + Send + Sync + 'static { type SenderError: std::error::Error; type NetworkSender: NetworkSender; - type PeerId: Clone + Debug + Eq + Hash + Send; + type PeerId: Clone + Debug + Eq + Hash + Send + 'static; type EventStream: EventStream; /// Returns a stream of events representing what happens on the network. diff --git a/finality-aleph/src/network/gossip/service.rs b/finality-aleph/src/network/gossip/service.rs index 7f94c31d6c..876349d2b7 100644 --- a/finality-aleph/src/network/gossip/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -1,11 +1,13 @@ use std::{ collections::{HashMap, HashSet}, - fmt::{Display, Error as FmtError, Formatter}, + fmt::{Debug, Display, Error as FmtError, Formatter}, future::Future, + hash::Hash, }; use futures::{channel::mpsc, StreamExt}; use log::{debug, error, info, trace, warn}; +use rand::{seq::IteratorRandom, thread_rng}; use sc_service::SpawnTaskHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use tokio::time; @@ -18,6 +20,12 @@ use crate::{ STATUS_REPORT_INTERVAL, }; +enum Command { + Send(D, P, Protocol), + SendToRandom(D, HashSet

, Protocol), + Broadcast(D, Protocol), +} + /// A service managing all the direct interaction with the underlying network implementation. It /// handles: /// 1. Incoming network events @@ -26,16 +34,16 @@ use crate::{ /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. pub struct Service { network: N, - messages_from_user: mpsc::UnboundedReceiver, - messages_for_user: mpsc::UnboundedSender, + messages_from_user: mpsc::UnboundedReceiver>, + messages_for_user: mpsc::UnboundedSender<(D, N::PeerId)>, authentication_connected_peers: HashSet, authentication_peer_senders: HashMap>, spawn_handle: SpawnTaskHandle, } -struct ServiceInterface { - messages_from_service: mpsc::UnboundedReceiver, - messages_for_service: mpsc::UnboundedSender, +struct ServiceInterface { + messages_from_service: mpsc::UnboundedReceiver<(D, P)>, + messages_for_service: mpsc::UnboundedSender>, } /// What can go wrong when receiving or sending data. @@ -56,16 +64,37 @@ impl Display for Error { } #[async_trait::async_trait] -impl Network for ServiceInterface { +impl Network for ServiceInterface { type Error = Error; + type PeerId = P; + + fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> { + self.messages_for_service + .unbounded_send(Command::Send(data, peer_id, Protocol::Authentication)) + .map_err(|_| Error::ServiceStopped) + } + + fn send_to_random( + &mut self, + data: D, + peer_ids: HashSet, + ) -> Result<(), Self::Error> { + self.messages_for_service + .unbounded_send(Command::SendToRandom( + data, + peer_ids, + Protocol::Authentication, + )) + .map_err(|_| Error::ServiceStopped) + } fn broadcast(&mut self, data: D) -> Result<(), Self::Error> { self.messages_for_service - .unbounded_send(data) + .unbounded_send(Command::Broadcast(data, Protocol::Authentication)) .map_err(|_| Error::ServiceStopped) } - async fn next(&mut self) -> Result { + async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error> { self.messages_from_service .next() .await @@ -83,7 +112,10 @@ impl Service { pub fn new( network: N, spawn_handle: SpawnTaskHandle, - ) -> (Service, impl Network) { + ) -> ( + Service, + impl Network, + ) { let (messages_for_user, messages_from_service) = mpsc::unbounded(); let (messages_for_service, messages_from_user) = mpsc::unbounded(); ( @@ -170,21 +202,55 @@ impl Service { } } - fn broadcast(&mut self, data: D, protocol: Protocol) { - let peers = match protocol { - Protocol::Authentication => self.authentication_connected_peers.clone(), + fn send(&mut self, data: D, peer_id: N::PeerId, protocol: Protocol) { + if let Err(e) = self.send_to_peer(data, peer_id.clone(), protocol) { + trace!(target: "aleph-network", "Failed to send to peer{:?}, {:?}", peer_id, e); + } + } + + fn protocol_peers(&self, protocol: Protocol) -> &HashSet { + match protocol { + Protocol::Authentication => &self.authentication_connected_peers, + } + } + + fn random_peer<'a>( + &'a self, + peer_ids: &'a HashSet, + protocol: Protocol, + ) -> Option<&'a N::PeerId> { + peer_ids + .intersection(self.protocol_peers(protocol)) + .into_iter() + .choose(&mut thread_rng()) + .or(self + .protocol_peers(protocol) + .iter() + .choose(&mut thread_rng())) + } + + fn send_to_random(&mut self, data: D, peer_ids: HashSet, protocol: Protocol) { + let peer_id = match self.random_peer(&peer_ids, protocol) { + Some(peer_id) => peer_id.clone(), + None => { + trace!(target: "aleph-network", "Failed to send to random peer, no peers are available."); + return; + } }; + self.send(data, peer_id, protocol); + } + + fn broadcast(&mut self, data: D, protocol: Protocol) { + let peers = self.protocol_peers(protocol).clone(); for peer in peers { - if let Err(e) = self.send_to_peer(data.clone(), peer.clone(), protocol) { - trace!(target: "aleph-network", "Failed to send broadcast to peer{:?}, {:?}", peer, e); - } + self.send(data.clone(), peer, protocol); } } fn handle_network_event( &mut self, event: Event, - ) -> Result<(), mpsc::TrySendError> { + ) -> Result<(), mpsc::TrySendError<(D, N::PeerId)>> { use Event::*; match event { StreamOpened(peer, protocol) => { @@ -212,11 +278,13 @@ impl Service { } } } - Messages(messages) => { + Messages(peer_id, messages) => { for (protocol, data) in messages.into_iter() { match protocol { Protocol::Authentication => match D::decode(&mut &data[..]) { - Ok(data) => self.messages_for_user.unbounded_send(data)?, + Ok(data) => self + .messages_for_user + .unbounded_send((data, peer_id.clone()))?, Err(e) => { warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e) } @@ -256,7 +324,9 @@ impl Service { } }, maybe_message = self.messages_from_user.next() => match maybe_message { - Some(message) => self.broadcast(message, Protocol::Authentication), + Some(Command::Broadcast(message, protocol)) => self.broadcast(message, protocol), + Some(Command::SendToRandom(message, peer_ids, protocol)) => self.send_to_random(message, peer_ids, protocol), + Some(Command::Send(message, peer_id, protocol)) => self.send(message, peer_id, protocol), None => { error!(target: "aleph-network", "User message stream ended."); return; @@ -272,7 +342,7 @@ impl Service { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::{collections::HashSet, iter}; use codec::Encode; use futures::channel::oneshot; @@ -281,7 +351,7 @@ mod tests { use super::{Error, Service}; use crate::network::{ - clique::mock::random_peer_id, + clique::mock::{random_peer_id, MockPublicKey}, gossip::{ mock::{MockEvent, MockRawNetwork, MockSenderError}, Network, @@ -294,7 +364,7 @@ mod tests { pub struct TestData { pub network: MockRawNetwork, - gossip_network: Box>, + gossip_network: Box>, pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, @@ -330,12 +400,25 @@ mod tests { #[async_trait::async_trait] impl Network for TestData { type Error = Error; + type PeerId = MockPublicKey; + + fn send_to(&mut self, data: MockData, peer_id: Self::PeerId) -> Result<(), Self::Error> { + self.gossip_network.send_to(data, peer_id) + } + + fn send_to_random( + &mut self, + data: MockData, + peer_ids: HashSet, + ) -> Result<(), Self::Error> { + self.gossip_network.send_to_random(data, peer_ids) + } fn broadcast(&mut self, data: MockData) -> Result<(), Self::Error> { self.gossip_network.broadcast(data) } - async fn next(&mut self) -> Result { + async fn next(&mut self) -> Result<(MockData, Self::PeerId), Self::Error> { self.gossip_network.next().await } } @@ -512,17 +595,132 @@ mod tests { let message = message(1); + let peer_id = random_peer_id(); + test_data + .service + .handle_network_event(MockEvent::Messages( + peer_id.clone(), + vec![(PROTOCOL, message.clone().encode().into())], + )) + .expect("Should handle"); + + let (received_message, received_peer_id) = + test_data.next().await.expect("Should receive message"); + assert_eq!(received_message, message); + assert_eq!(received_peer_id, peer_id); + + test_data.cleanup().await + } + + #[tokio::test] + async fn test_send_to_connected() { + let mut test_data = TestData::prepare().await; + + let peer_id = random_peer_id(); + + let message = message(1); + test_data .service - .handle_network_event(MockEvent::Messages(vec![( - PROTOCOL, - message.clone().encode().into(), - )])) + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) .expect("Should handle"); + test_data + .service + .send(message.clone(), peer_id.clone(), PROTOCOL); + + let expected = (message.encode(), peer_id, PROTOCOL); + assert_eq!( - test_data.next().await.expect("Should receive message"), - message, + test_data + .network + .send_message + .next() + .await + .expect("Should receive message"), + expected, + ); + + test_data.cleanup().await + } + + #[tokio::test] + async fn test_no_send_to_disconnected() { + let mut test_data = TestData::prepare().await; + + let peer_id = random_peer_id(); + + let message = message(1); + + test_data.service.send(message, peer_id, PROTOCOL); + + test_data.cleanup().await + } + + #[tokio::test] + async fn test_send_to_random_connected() { + let mut test_data = TestData::prepare().await; + + let peer_id = random_peer_id(); + + let message = message(1); + + test_data + .service + .handle_network_event(MockEvent::StreamOpened(peer_id.clone(), PROTOCOL)) + .expect("Should handle"); + + test_data.service.send_to_random( + message.clone(), + iter::once(peer_id.clone()).collect(), + PROTOCOL, + ); + + let expected = (message.encode(), peer_id, PROTOCOL); + + assert_eq!( + test_data + .network + .send_message + .next() + .await + .expect("Should receive message"), + expected, + ); + + test_data.cleanup().await + } + + #[tokio::test] + async fn test_send_to_random_disconnected() { + let mut test_data = TestData::prepare().await; + + let peer_id = random_peer_id(); + let other_peer_id = random_peer_id(); + + let message = message(1); + + test_data + .service + .handle_network_event(MockEvent::StreamOpened(other_peer_id.clone(), PROTOCOL)) + .expect("Should handle"); + + test_data.service.send_to_random( + message.clone(), + iter::once(peer_id.clone()).collect(), + PROTOCOL, + ); + + let expected = (message.encode(), other_peer_id, PROTOCOL); + + assert_eq!( + test_data + .network + .send_message + .next() + .await + .expect("Should receive message"), + expected, ); test_data.cleanup().await diff --git a/finality-aleph/src/network/session/service.rs b/finality-aleph/src/network/session/service.rs index 080368397d..8225260e2f 100644 --- a/finality-aleph/src/network/session/service.rs +++ b/finality-aleph/src/network/session/service.rs @@ -385,7 +385,7 @@ where } }, maybe_authentication = self.gossip_network.next() => { - let authentication = maybe_authentication.map_err(Error::GossipNetwork)?; + let (authentication, _) = maybe_authentication.map_err(Error::GossipNetwork)?; trace!(target: "aleph-network", "Manager received an authentication from network"); match authentication.try_into() { Ok(message) => { diff --git a/finality-aleph/src/network/substrate.rs b/finality-aleph/src/network/substrate.rs index 57d0c9745b..123423b9a8 100644 --- a/finality-aleph/src/network/substrate.rs +++ b/finality-aleph/src/network/substrate.rs @@ -171,8 +171,9 @@ impl EventStream for NetworkEventStream { Err(_) => continue, } } - NotificationsReceived { messages, .. } => { + NotificationsReceived { messages, remote } => { return Some(Messages( + remote, messages .into_iter() .filter_map(|(protocol, data)| { diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 77445be02c..5d6733f1b5 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -227,10 +227,13 @@ impl TestData { for versioned_authentication in Vec::>::from(handler.authentication().unwrap()) { - self.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - versioned_authentication.encode().into(), - )])); + self.network.emit_event(MockEvent::Messages( + authority.auth_peer_id(), + vec![( + Protocol::Authentication, + versioned_authentication.encode().into(), + )], + )); } } } @@ -334,10 +337,13 @@ async fn test_forwards_authentication_broadcast() { for versioned_authentication in Vec::>::from(sending_peer_handler.authentication().unwrap()) { - test_data.network.emit_event(MockEvent::Messages(vec![( - Protocol::Authentication, - versioned_authentication.encode().into(), - )])); + test_data.network.emit_event(MockEvent::Messages( + sending_peer.auth_peer_id(), + vec![( + Protocol::Authentication, + versioned_authentication.encode().into(), + )], + )); } for _ in 0..2 {