diff --git a/finality-aleph/src/lib.rs b/finality-aleph/src/lib.rs index 7dc23e2bc9..d6772d6aa8 100644 --- a/finality-aleph/src/lib.rs +++ b/finality-aleph/src/lib.rs @@ -22,12 +22,11 @@ use tokio::time::Duration; use crate::{ abft::{CurrentNetworkData, LegacyNetworkData}, aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData}, - network::Split, + network::{protocol_name, Split}, session::{ first_block_of_session, last_block_of_session, session_id_from_block_num, SessionBoundaries, SessionId, }, - substrate_network::protocol_name, VersionedTryFromError::{ExpectedNewGotOld, ExpectedOldGotNew}, }; @@ -45,7 +44,6 @@ mod nodes; mod party; mod session; mod session_map; -mod substrate_network; mod tcp_network; #[cfg(test)] pub mod testing; diff --git a/finality-aleph/src/network/gossip/mock.rs b/finality-aleph/src/network/gossip/mock.rs new file mode 100644 index 0000000000..076dc644f8 --- /dev/null +++ b/finality-aleph/src/network/gossip/mock.rs @@ -0,0 +1,132 @@ +use std::{collections::VecDeque, fmt, sync::Arc}; + +use async_trait::async_trait; +use futures::{ + channel::{mpsc, oneshot}, + StreamExt, +}; +use parking_lot::Mutex; + +use crate::{ + network::{ + gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork}, + mock::Channel, + }, + validator_network::mock::MockPublicKey, +}; + +pub type MockEvent = Event; + +pub struct MockEventStream(mpsc::UnboundedReceiver); + +#[async_trait] +impl EventStream for MockEventStream { + async fn next_event(&mut self) -> Option { + self.0.next().await + } +} + +pub struct MockNetworkSender { + sender: mpsc::UnboundedSender<(Vec, MockPublicKey, Protocol)>, + peer_id: MockPublicKey, + protocol: Protocol, + error: Result<(), MockSenderError>, +} + +#[async_trait] +impl NetworkSender for MockNetworkSender { + type SenderError = MockSenderError; + + async fn send<'a>( + &'a self, + data: impl Into> + Send + Sync + 'static, + ) -> Result<(), MockSenderError> { + self.error?; + self.sender + .unbounded_send((data.into(), self.peer_id.clone(), self.protocol)) + .unwrap(); + Ok(()) + } +} + +#[derive(Clone)] +pub struct MockRawNetwork { + pub send_message: Channel<(Vec, MockPublicKey, Protocol)>, + pub event_sinks: Arc>>>, + event_stream_taken_oneshot: Arc>>>, + pub create_sender_errors: Arc>>, + pub send_errors: Arc>>, +} + +#[derive(Debug, Copy, Clone)] +pub struct MockSenderError; + +impl fmt::Display for MockSenderError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Some error message") + } +} + +impl std::error::Error for MockSenderError {} + +impl RawNetwork for MockRawNetwork { + type SenderError = MockSenderError; + type NetworkSender = MockNetworkSender; + type PeerId = MockPublicKey; + type EventStream = MockEventStream; + + fn event_stream(&self) -> Self::EventStream { + let (tx, rx) = mpsc::unbounded(); + self.event_sinks.lock().push(tx); + // Necessary for tests to detect when service takes event_stream + if let Some(tx) = self.event_stream_taken_oneshot.lock().take() { + tx.send(()).unwrap(); + } + MockEventStream(rx) + } + + fn sender( + &self, + peer_id: Self::PeerId, + protocol: Protocol, + ) -> Result { + self.create_sender_errors + .lock() + .pop_front() + .map_or(Ok(()), Err)?; + let error = self.send_errors.lock().pop_front().map_or(Ok(()), Err); + Ok(MockNetworkSender { + sender: self.send_message.0.clone(), + peer_id, + protocol, + error, + }) + } +} + +impl MockRawNetwork { + pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self { + MockRawNetwork { + send_message: Channel::new(), + event_sinks: Arc::new(Mutex::new(vec![])), + event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))), + create_sender_errors: Arc::new(Mutex::new(VecDeque::new())), + send_errors: Arc::new(Mutex::new(VecDeque::new())), + } + } + + pub fn emit_event(&mut self, event: MockEvent) { + for sink in &*self.event_sinks.lock() { + sink.unbounded_send(event.clone()).unwrap(); + } + } + + // Consumes the network asserting there are no unreceived messages in the channels. + pub async fn close_channels(self) { + self.event_sinks.lock().clear(); + // We disable it until tests regarding new substrate network protocol are created. + // assert!(self.add_reserved.close().await.is_none()); + // assert!(self.remove_reserved.close().await.is_none()); + assert!(self.send_message.close().await.is_none()); + } +} diff --git a/finality-aleph/src/network/gossip/mod.rs b/finality-aleph/src/network/gossip/mod.rs new file mode 100644 index 0000000000..c6ba22d633 --- /dev/null +++ b/finality-aleph/src/network/gossip/mod.rs @@ -0,0 +1,78 @@ +//! A P2P-based gossip network, for now only for sending broadcasts. +use std::{ + fmt::{Debug, Display}, + hash::Hash, +}; + +use bytes::Bytes; + +use crate::network::Data; + +#[cfg(test)] +pub mod mock; +mod service; + +pub use service::Service; + +#[async_trait::async_trait] +/// Interface for the gossip network, currently only supports broadcasting and receiving data. +pub trait Network: Send + 'static { + type Error: Display + Send; + + /// Broadcast data to all directly connected peers. Network-wide broadcasts have to be + /// implemented on top of this abstraction. Note that there might be no currently connected + /// peers, so there are no guarantees any single call sends anything even if no errors are + /// returned, retry appropriately. + fn broadcast(&mut self, data: D) -> Result<(), Self::Error>; + + /// Receive some data from the network. + async fn next(&mut self) -> Result; +} + +/// The Authentication protocol is used for validator discovery. +#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] +pub enum Protocol { + Authentication, +} + +/// Abstraction over a sender to the raw network. +#[async_trait::async_trait] +pub trait NetworkSender: Send + Sync + 'static { + type SenderError: std::error::Error; + + /// A method for sending data. Returns Error if not connected to the peer. + async fn send<'a>( + &'a self, + data: impl Into> + Send + Sync + 'static, + ) -> Result<(), Self::SenderError>; +} + +#[derive(Clone)] +pub enum Event

{ + StreamOpened(P, Protocol), + StreamClosed(P, Protocol), + Messages(Vec<(Protocol, Bytes)>), +} + +#[async_trait::async_trait] +pub trait EventStream

{ + async fn next_event(&mut self) -> Option>; +} + +/// Abstraction over a raw p2p network. +pub trait RawNetwork: Clone + Send + Sync + 'static { + type SenderError: std::error::Error; + type NetworkSender: NetworkSender; + type PeerId: Clone + Debug + Eq + Hash + Send; + type EventStream: EventStream; + + /// Returns a stream of events representing what happens on the network. + fn event_stream(&self) -> Self::EventStream; + + /// Returns a sender to the given peer using a given protocol. Returns Error if not connected to the peer. + fn sender( + &self, + peer_id: Self::PeerId, + protocol: Protocol, + ) -> Result; +} diff --git a/finality-aleph/src/network/service.rs b/finality-aleph/src/network/gossip/service.rs similarity index 82% rename from finality-aleph/src/network/service.rs rename to finality-aleph/src/network/gossip/service.rs index 7c231e80bd..6b5ae91f98 100644 --- a/finality-aleph/src/network/service.rs +++ b/finality-aleph/src/network/gossip/service.rs @@ -1,5 +1,6 @@ use std::{ collections::{HashMap, HashSet}, + fmt::{Display, Error as FmtError, Formatter}, future::Future, }; @@ -10,7 +11,10 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use tokio::time; use crate::{ - network::{Data, Event, EventStream, Network, NetworkSender, Protocol}, + network::{ + gossip::{Event, EventStream, Network, NetworkSender, Protocol, RawNetwork}, + Data, + }, STATUS_REPORT_INTERVAL, }; @@ -20,7 +24,7 @@ use crate::{ /// 1. Messages are forwarded to the user. /// 2. Various forms of (dis)connecting, keeping track of all currently connected nodes. /// 3. Outgoing messages, sending them out, using 1.2. to broadcast. -pub struct Service { +pub struct Service { network: N, messages_from_user: mpsc::UnboundedReceiver, messages_for_user: mpsc::UnboundedSender, @@ -29,40 +33,73 @@ pub struct Service { spawn_handle: SpawnTaskHandle, } -/// Input/output channels for the network service. -pub struct IO { - pub messages_from_user: mpsc::UnboundedReceiver, - pub messages_for_user: mpsc::UnboundedSender, +struct ServiceInterface { + messages_from_service: mpsc::UnboundedReceiver, + messages_for_service: mpsc::UnboundedSender, } -impl IO { - pub fn new( - messages_from_user: mpsc::UnboundedReceiver, - messages_for_user: mpsc::UnboundedSender, - ) -> IO { - IO { - messages_from_user, - messages_for_user, +/// What can go wrong when receiving or sending data. +#[derive(Debug)] +pub enum Error { + ServiceStopped, +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use Error::*; + match self { + ServiceStopped => { + write!(f, "gossip network service stopped") + } } } } +#[async_trait::async_trait] +impl Network for ServiceInterface { + type Error = Error; + + fn broadcast(&mut self, data: D) -> Result<(), Self::Error> { + self.messages_for_service + .unbounded_send(data) + .map_err(|_| Error::ServiceStopped) + } + + async fn next(&mut self) -> Result { + self.messages_from_service + .next() + .await + .ok_or(Error::ServiceStopped) + } +} + #[derive(Debug)] enum SendError { MissingSender, SendingFailed, } -impl Service { - pub fn new(network: N, spawn_handle: SpawnTaskHandle, io: IO) -> Service { - Service { - network, - messages_from_user: io.messages_from_user, - messages_for_user: io.messages_for_user, - spawn_handle, - authentication_connected_peers: HashSet::new(), - authentication_peer_senders: HashMap::new(), - } +impl Service { + pub fn new( + network: N, + spawn_handle: SpawnTaskHandle, + ) -> (Service, impl Network) { + let (messages_for_user, messages_from_service) = mpsc::unbounded(); + let (messages_for_service, messages_from_user) = mpsc::unbounded(); + ( + Service { + network, + messages_from_user, + messages_for_user, + spawn_handle, + authentication_connected_peers: HashSet::new(), + authentication_peer_senders: HashMap::new(), + }, + ServiceInterface { + messages_from_service, + messages_for_service, + }, + ) } fn get_sender( @@ -238,18 +275,19 @@ mod tests { use std::collections::HashSet; use codec::Encode; - use futures::{ - channel::{mpsc, oneshot}, - StreamExt, - }; + use futures::channel::oneshot; use sc_service::TaskManager; use tokio::runtime::Handle; - use super::Service; + use super::{Error, Service}; use crate::{ network::{ - mock::{MockData, MockEvent, MockNetwork, MockSenderError}, - NetworkServiceIO, Protocol, + gossip::{ + mock::{MockEvent, MockRawNetwork, MockSenderError}, + Network, + }, + mock::MockData, + Protocol, }, testing::mocks::validator_network::random_peer_id, }; @@ -257,9 +295,9 @@ mod tests { const PROTOCOL: Protocol = Protocol::Authentication; pub struct TestData { - pub network: MockNetwork, - pub messages_from_network: mpsc::UnboundedReceiver, - pub service: Service, + pub network: MockRawNetwork, + gossip_network: Box>, + pub service: Service, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, } @@ -268,23 +306,20 @@ mod tests { async fn prepare() -> Self { let task_manager = TaskManager::new(Handle::current(), None).unwrap(); - // Prepare communication with service - // We can drop the sender, as we will call service.broadcast directly - let (_, messages_from_user) = mpsc::unbounded(); - let (messages_for_user, messages_from_network) = mpsc::unbounded(); - let io = NetworkServiceIO::new(messages_from_user, messages_for_user); // Event stream will never be taken, so we can drop the receiver let (event_stream_oneshot_tx, _) = oneshot::channel(); // Prepare service - let network = MockNetwork::new(event_stream_oneshot_tx); - let service = Service::new(network.clone(), task_manager.spawn_handle(), io); + let network = MockRawNetwork::new(event_stream_oneshot_tx); + let (service, gossip_network) = + Service::new(network.clone(), task_manager.spawn_handle()); + let gossip_network = Box::new(gossip_network); // `TaskManager` needs to be passed, so sender threads are running in background. Self { network, service, - messages_from_network, + gossip_network, _task_manager: task_manager, } } @@ -294,6 +329,19 @@ mod tests { } } + #[async_trait::async_trait] + impl Network for TestData { + type Error = Error; + + fn broadcast(&mut self, data: MockData) -> Result<(), Self::Error> { + self.gossip_network.broadcast(data) + } + + async fn next(&mut self) -> Result { + self.gossip_network.next().await + } + } + fn message(i: u8) -> MockData { vec![i, i + 1, i + 2] } @@ -475,11 +523,7 @@ mod tests { .expect("Should handle"); assert_eq!( - test_data - .messages_from_network - .next() - .await - .expect("Should receive message"), + test_data.next().await.expect("Should receive message"), message, ); diff --git a/finality-aleph/src/network/io.rs b/finality-aleph/src/network/io.rs index 32c890fb23..60bed0de91 100644 --- a/finality-aleph/src/network/io.rs +++ b/finality-aleph/src/network/io.rs @@ -5,51 +5,38 @@ use futures::channel::mpsc; use crate::{ network::{ manager::{DataInSession, VersionedAuthentication}, - AddressingInformation, ConnectionManagerIO, Data, NetworkServiceIO as NetworkIO, - SessionManagerIO, + AddressingInformation, ConnectionManagerIO, Data, GossipNetwork, SessionManagerIO, }, validator_network::{Network as ValidatorNetwork, PublicKey}, }; -type AuthenticationNetworkIO = NetworkIO>; - -type FullIO = ( - ConnectionManagerIO, - AuthenticationNetworkIO, - SessionManagerIO, -); +type FullIO = (ConnectionManagerIO, SessionManagerIO); pub fn setup< D: Data, M: Data + Debug, A: AddressingInformation + TryFrom> + Into>, VN: ValidatorNetwork>, + GN: GossipNetwork>, >( validator_network: VN, -) -> FullIO + gossip_network: GN, +) -> FullIO where A::PeerId: PublicKey, { // Prepare and start the network - let (messages_for_network, messages_from_user) = mpsc::unbounded(); let (commands_for_service, commands_from_user) = mpsc::unbounded(); let (messages_for_service, commands_from_manager) = mpsc::unbounded(); - let (messages_for_user, messages_from_network) = mpsc::unbounded(); let connection_io = ConnectionManagerIO::new( - messages_for_network, commands_from_user, commands_from_manager, - messages_from_network, validator_network, + gossip_network, ); - let channels_for_network = NetworkIO::new(messages_from_user, messages_for_user); let channels_for_session_manager = SessionManagerIO::new(commands_for_service, messages_for_service); - ( - connection_io, - channels_for_network, - channels_for_session_manager, - ) + (connection_io, channels_for_session_manager) } diff --git a/finality-aleph/src/network/manager/service.rs b/finality-aleph/src/network/manager/service.rs index 7f6583a3b6..1b390bd316 100644 --- a/finality-aleph/src/network/manager/service.rs +++ b/finality-aleph/src/network/manager/service.rs @@ -1,7 +1,8 @@ use std::{ cmp, collections::{HashMap, HashSet}, - fmt::Debug, + fmt::{Debug, Display, Error as FmtError, Formatter}, + marker::PhantomData, time::Duration, }; @@ -20,7 +21,8 @@ use crate::{ compatibility::PeerAuthentications, Connections, DataInSession, Discovery, DiscoveryMessage, SessionHandler, SessionHandlerError, VersionedAuthentication, }, - AddressedData, AddressingInformation, ConnectionCommand, Data, NetworkIdentity, PeerId, + AddressedData, AddressingInformation, ConnectionCommand, Data, GossipNetwork, + NetworkIdentity, PeerId, }, validator_network::{Network as ValidatorNetwork, PublicKey}, MillisecsPerBlock, NodeIndex, SessionId, SessionPeriod, STATUS_REPORT_INTERVAL, @@ -137,6 +139,13 @@ where initial_delay: Duration, } +/// Error when trying to forward data from the network to the user, should never be fatal. +#[derive(Debug, PartialEq, Eq)] +pub enum SendError { + UserSend, + NoSession, +} + impl Service where NI::AddressingInformation: TryFrom> + Into>, @@ -451,7 +460,7 @@ where } /// Sends the data to the identified session. - pub fn send_session_data(&self, session_id: &SessionId, data: D) -> Result<(), Error> { + pub fn send_session_data(&self, session_id: &SessionId, data: D) -> Result<(), SendError> { match self .sessions .get(session_id) @@ -459,8 +468,8 @@ where { Some(data_for_user) => data_for_user .unbounded_send(data) - .map_err(|_| Error::UserSend), - None => Err(Error::NoSession), + .map_err(|_| SendError::UserSend), + None => Err(SendError::NoSession), } } @@ -550,27 +559,36 @@ pub struct IO< M: Data, A: AddressingInformation + TryFrom> + Into>, VN: ValidatorNetwork>, + GN: GossipNetwork>, > where A::PeerId: PublicKey, { - authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - authentications_from_network: mpsc::UnboundedReceiver>, validator_network: VN, + gossip_network: GN, + _phantom: PhantomData<(M, A)>, } /// Errors that can happen during the network service operations. #[derive(Debug, PartialEq, Eq)] -pub enum Error { - NetworkSend, - /// Should never be fatal. - UserSend, - /// Should never be fatal. - NoSession, +pub enum Error { CommandsChannel, MessageChannel, - NetworkChannel, + ValidatorNetwork, + GossipNetwork(GE), +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { + use Error::*; + match self { + CommandsChannel => write!(f, "commands channel unexpectedly closed"), + MessageChannel => write!(f, "message channel unexpectedly closed"), + ValidatorNetwork => write!(f, "validator network unexpectedly done"), + GossipNetwork(e) => write!(f, "gossip network unexpectedly done: {}", e), + } + } } impl< @@ -578,23 +596,23 @@ impl< M: Data + Debug, A: AddressingInformation + TryFrom> + Into>, VN: ValidatorNetwork>, - > IO + GN: GossipNetwork>, + > IO where A::PeerId: PublicKey, { pub fn new( - authentications_for_network: mpsc::UnboundedSender>, commands_from_user: mpsc::UnboundedReceiver>, messages_from_user: mpsc::UnboundedReceiver<(D, SessionId, Recipient)>, - authentications_from_network: mpsc::UnboundedReceiver>, validator_network: VN, - ) -> IO { + gossip_network: GN, + ) -> IO { IO { - authentications_for_network, commands_from_user, messages_from_user, - authentications_from_network, validator_network, + gossip_network, + _phantom: PhantomData, } } @@ -603,13 +621,13 @@ where } fn send_authentications( - &self, + &mut self, to_send: Vec>, - ) -> Result<(), Error> { + ) -> Result<(), Error> { for auth in to_send { - self.authentications_for_network - .unbounded_send(auth) - .map_err(|_| Error::NetworkSend)?; + self.gossip_network + .broadcast(auth) + .map_err(Error::GossipNetwork)?; } Ok(()) } @@ -636,7 +654,7 @@ where maybe_command, maybe_message, }: ServiceActions, - ) -> Result<(), Error> { + ) -> Result<(), Error> { if let Some(command) = maybe_command { self.handle_connection_command(command); } @@ -650,7 +668,7 @@ where pub async fn run>( mut self, mut service: Service, - ) -> Result<(), Error> { + ) -> Result<(), Error> { // Initial delay is needed so that Network is fully set up and we received some first discovery broadcasts from other nodes. // Otherwise this might cause first maintenance never working, as it happens before first broadcasts. let mut maintenance = time::interval_at( @@ -686,22 +704,19 @@ where match maybe_data { Some(DataInSession{data, session_id}) => if let Err(e) = service.send_session_data(&session_id, data) { match e { - Error::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), - Error::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), - _ => return Err(e), + SendError::UserSend => trace!(target: "aleph-network", "Failed to send to user in session."), + SendError::NoSession => trace!(target: "aleph-network", "Received message for unknown session."), } }, - None => return Err(Error::NetworkChannel), + None => return Err(Error::ValidatorNetwork), } }, - maybe_authentication = self.authentications_from_network.next() => { + maybe_authentication = self.gossip_network.next() => { + let authentication = maybe_authentication.map_err(Error::GossipNetwork)?; trace!(target: "aleph-network", "Manager received an authentication from network"); - match maybe_authentication { - Some(authentication) => match authentication.try_into() { - Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, - Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), - }, - None => return Err(Error::NetworkChannel), + match authentication.try_into() { + Ok(message) => self.handle_service_actions(service.on_discovery_message(message))?, + Err(e) => warn!(target: "aleph-network", "Error casting versioned authentication to discovery message: {:?}", e), } }, _ = maintenance.tick() => { @@ -724,7 +739,7 @@ mod tests { use futures::{channel::oneshot, StreamExt}; - use super::{Config, Error, Service, ServiceActions, SessionCommand}; + use super::{Config, SendError, Service, ServiceActions, SessionCommand}; use crate::{ network::{ manager::{compatibility::PeerAuthentications, DataInSession, DiscoveryMessage}, @@ -763,7 +778,7 @@ mod tests { assert!(maybe_message.is_none()); assert_eq!( service.send_session_data(&session_id, -43), - Err(Error::NoSession) + Err(SendError::NoSession) ); } @@ -829,7 +844,7 @@ mod tests { assert!(maybe_message.is_none()); assert_eq!( service.send_session_data(&session_id, -43), - Err(Error::NoSession) + Err(SendError::NoSession) ); assert!(data_from_network.next().await.is_none()); } diff --git a/finality-aleph/src/network/mock.rs b/finality-aleph/src/network/mock.rs index 69886efb98..30c3aa56c9 100644 --- a/finality-aleph/src/network/mock.rs +++ b/finality-aleph/src/network/mock.rs @@ -1,22 +1,17 @@ -use std::{collections::VecDeque, fmt, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use aleph_primitives::KEY_TYPE; -use async_trait::async_trait; -use futures::{ - channel::{mpsc, oneshot}, - StreamExt, -}; -use parking_lot::Mutex; +use futures::{channel::mpsc, StreamExt}; use sp_keystore::{testing::KeyStore, CryptoStore}; use tokio::time::timeout; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, - network::{Event, EventStream, Network, NetworkSender, Protocol}, - validator_network::mock::MockPublicKey, AuthorityId, NodeIndex, }; +pub type MockData = Vec; + #[derive(Clone)] pub struct Channel( pub mpsc::UnboundedSender, @@ -67,124 +62,6 @@ impl Default for Channel { } } -pub type MockEvent = Event; - -pub type MockData = Vec; - -pub struct MockEventStream(mpsc::UnboundedReceiver); - -#[async_trait] -impl EventStream for MockEventStream { - async fn next_event(&mut self) -> Option { - self.0.next().await - } -} - -pub struct MockNetworkSender { - sender: mpsc::UnboundedSender<(Vec, MockPublicKey, Protocol)>, - peer_id: MockPublicKey, - protocol: Protocol, - error: Result<(), MockSenderError>, -} - -#[async_trait] -impl NetworkSender for MockNetworkSender { - type SenderError = MockSenderError; - - async fn send<'a>( - &'a self, - data: impl Into> + Send + Sync + 'static, - ) -> Result<(), MockSenderError> { - self.error?; - self.sender - .unbounded_send((data.into(), self.peer_id.clone(), self.protocol)) - .unwrap(); - Ok(()) - } -} - -#[derive(Clone)] -pub struct MockNetwork { - pub send_message: Channel<(Vec, MockPublicKey, Protocol)>, - pub event_sinks: Arc>>>, - event_stream_taken_oneshot: Arc>>>, - pub create_sender_errors: Arc>>, - pub send_errors: Arc>>, -} - -#[derive(Debug, Copy, Clone)] -pub struct MockSenderError; - -impl fmt::Display for MockSenderError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Some error message") - } -} - -impl std::error::Error for MockSenderError {} - -impl Network for MockNetwork { - type SenderError = MockSenderError; - type NetworkSender = MockNetworkSender; - type PeerId = MockPublicKey; - type EventStream = MockEventStream; - - fn event_stream(&self) -> Self::EventStream { - let (tx, rx) = mpsc::unbounded(); - self.event_sinks.lock().push(tx); - // Necessary for tests to detect when service takes event_stream - if let Some(tx) = self.event_stream_taken_oneshot.lock().take() { - tx.send(()).unwrap(); - } - MockEventStream(rx) - } - - fn sender( - &self, - peer_id: Self::PeerId, - protocol: Protocol, - ) -> Result { - self.create_sender_errors - .lock() - .pop_front() - .map_or(Ok(()), Err)?; - let error = self.send_errors.lock().pop_front().map_or(Ok(()), Err); - Ok(MockNetworkSender { - sender: self.send_message.0.clone(), - peer_id, - protocol, - error, - }) - } -} - -impl MockNetwork { - pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self { - MockNetwork { - send_message: Channel::new(), - event_sinks: Arc::new(Mutex::new(vec![])), - event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))), - create_sender_errors: Arc::new(Mutex::new(VecDeque::new())), - send_errors: Arc::new(Mutex::new(VecDeque::new())), - } - } - - pub fn emit_event(&mut self, event: MockEvent) { - for sink in &*self.event_sinks.lock() { - sink.unbounded_send(event.clone()).unwrap(); - } - } - - // Consumes the network asserting there are no unreceived messages in the channels. - pub async fn close_channels(self) { - self.event_sinks.lock().clear(); - // We disable it until tests regarding new substrate network protocol are created. - // assert!(self.add_reserved.close().await.is_none()); - // assert!(self.remove_reserved.close().await.is_none()); - assert!(self.send_message.close().await.is_none()); - } -} - pub async fn crypto_basics( num_crypto_basics: usize, ) -> (Vec<(NodeIndex, AuthorityPen)>, AuthorityVerifier) { diff --git a/finality-aleph/src/network/mod.rs b/finality-aleph/src/network/mod.rs index 0a41c4aedd..4f76b90dcb 100644 --- a/finality-aleph/src/network/mod.rs +++ b/finality-aleph/src/network/mod.rs @@ -4,8 +4,6 @@ use std::{ hash::Hash, }; -use async_trait::async_trait; -use bytes::Bytes; use codec::Codec; use sp_api::NumberFor; use sp_runtime::traits::Block; @@ -13,33 +11,38 @@ use sp_runtime::traits::Block; use crate::abft::Recipient; mod component; +mod gossip; mod io; mod manager; #[cfg(test)] pub mod mock; -mod service; mod session; mod split; +mod substrate; pub use component::{ Network as ComponentNetwork, NetworkExt as ComponentNetworkExt, NetworkMap as ComponentNetworkMap, Receiver as ReceiverComponent, Sender as SenderComponent, SimpleNetwork, }; +pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService}; pub use io::setup as setup_io; use manager::SessionCommand; pub use manager::{ ConnectionIO as ConnectionManagerIO, ConnectionManager, ConnectionManagerConfig, }; -pub use service::{Service, IO as NetworkServiceIO}; pub use session::{Manager as SessionManager, ManagerError, Sender, IO as SessionManagerIO}; pub use split::{split, Split}; +pub use substrate::protocol_name; #[cfg(test)] pub mod testing { use super::manager::LegacyAuthentication; - pub use super::manager::{ - Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, - PeerAuthentications, SessionHandler, VersionedAuthentication, + pub use super::{ + gossip::mock::{MockEvent, MockRawNetwork}, + manager::{ + Authentication, DataInSession, DiscoveryMessage, LegacyDiscoveryMessage, + PeerAuthentications, SessionHandler, VersionedAuthentication, + }, }; use crate::testing::mocks::validator_network::MockAddressingInformation; @@ -96,54 +99,6 @@ pub trait AddressingInformation: Debug + Hash + Codec + Clone + Eq + Send + Sync fn verify(&self) -> bool; } -/// The Authentication protocol is used for validator discovery. -#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] -pub enum Protocol { - Authentication, -} - -/// Abstraction over a sender to network. -#[async_trait] -pub trait NetworkSender: Send + Sync + 'static { - type SenderError: std::error::Error; - - /// A method for sending data. Returns Error if not connected to the peer. - async fn send<'a>( - &'a self, - data: impl Into> + Send + Sync + 'static, - ) -> Result<(), Self::SenderError>; -} - -#[derive(Clone)] -pub enum Event

{ - StreamOpened(P, Protocol), - StreamClosed(P, Protocol), - Messages(Vec<(Protocol, Bytes)>), -} - -#[async_trait] -pub trait EventStream

{ - async fn next_event(&mut self) -> Option>; -} - -/// Abstraction over a network. -pub trait Network: Clone + Send + Sync + 'static { - type SenderError: std::error::Error; - type NetworkSender: NetworkSender; - type PeerId: Clone + Debug + Eq + Hash + Send; - type EventStream: EventStream; - - /// Returns a stream of events representing what happens on the network. - fn event_stream(&self) -> Self::EventStream; - - /// Returns a sender to the given peer using a given protocol. Returns Error if not connected to the peer. - fn sender( - &self, - peer_id: Self::PeerId, - protocol: Protocol, - ) -> Result; -} - /// Abstraction for requesting own network addressing information. pub trait NetworkIdentity { type PeerId: PeerId; diff --git a/finality-aleph/src/substrate_network.rs b/finality-aleph/src/network/substrate.rs similarity index 97% rename from finality-aleph/src/substrate_network.rs rename to finality-aleph/src/network/substrate.rs index 0f3e0b2bae..57d0c9745b 100644 --- a/finality-aleph/src/substrate_network.rs +++ b/finality-aleph/src/network/substrate.rs @@ -17,7 +17,10 @@ use sp_api::NumberFor; use sp_consensus::SyncOracle; use sp_runtime::traits::Block; -use crate::network::{Event, EventStream, Network, NetworkSender, Protocol, RequestBlocks}; +use crate::network::{ + gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork}, + RequestBlocks, +}; impl RequestBlocks for Arc> { fn request_justification(&self, hash: &B::Hash, number: NumberFor) { @@ -191,7 +194,7 @@ impl EventStream for NetworkEventStream { } } -impl Network for Arc> { +impl RawNetwork for Arc> { type SenderError = SenderError; type NetworkSender = SubstrateNetworkSender; type PeerId = PeerId; diff --git a/finality-aleph/src/nodes/validator_node.rs b/finality-aleph/src/nodes/validator_node.rs index a167ca065e..81b651b44e 100644 --- a/finality-aleph/src/nodes/validator_node.rs +++ b/finality-aleph/src/nodes/validator_node.rs @@ -12,8 +12,7 @@ use sp_runtime::traits::Block; use crate::{ crypto::AuthorityPen, network::{ - setup_io, ConnectionManager, ConnectionManagerConfig, Service as NetworkService, - SessionManager, + setup_io, ConnectionManager, ConnectionManagerConfig, GossipService, SessionManager, }, nodes::{setup_justification_handler, JustificationParams}, party::{ @@ -92,6 +91,10 @@ where validator_network_service.run(exit).await }); + let (gossip_network_service, gossip_network) = + GossipService::new(network.clone(), spawn_handle.clone()); + let gossip_network_task = async move { gossip_network_service.run().await }; + let block_requester = network.clone(); let map_updater = SessionMapUpdater::<_, _, B>::new( AuthorityProviderImpl::new(client.clone()), @@ -106,7 +109,7 @@ where let (authority_justification_tx, handler_task) = setup_justification_handler(JustificationParams { justification_rx, - network: network.clone(), + network, client: client.clone(), blockchain_backend, metrics: metrics.clone(), @@ -115,7 +118,7 @@ where session_map: session_authorities.clone(), }); - let (connection_io, network_io, session_io) = setup_io(validator_network); + let (connection_io, session_io) = setup_io(validator_network, gossip_network); let connection_manager = ConnectionManager::new( network_identity, @@ -123,22 +126,19 @@ where ); let connection_manager_task = async move { - connection_io - .run(connection_manager) - .await - .expect("Failed to run connection manager") + if let Err(e) = connection_io.run(connection_manager).await { + panic!("Failed to run connection manager: {}", e); + } }; let session_manager = SessionManager::new(session_io); - let network = NetworkService::new(network.clone(), spawn_handle.clone(), network_io); - let network_task = async move { network.run().await }; spawn_handle.spawn("aleph/justification_handler", None, handler_task); debug!(target: "aleph-party", "JustificationHandler has started."); spawn_handle.spawn("aleph/connection_manager", None, connection_manager_task); - spawn_handle.spawn("aleph/network", None, network_task); - debug!(target: "aleph-party", "Network has started."); + spawn_handle.spawn("aleph/gossip_network", None, gossip_network_task); + debug!(target: "aleph-party", "Gossip network has started."); let party = ConsensusParty::new(ConsensusPartyParams { session_authorities, diff --git a/finality-aleph/src/testing/network.rs b/finality-aleph/src/testing/network.rs index 1bfa47ac83..2cb7dd7e2f 100644 --- a/finality-aleph/src/testing/network.rs +++ b/finality-aleph/src/testing/network.rs @@ -12,14 +12,14 @@ use tokio::{runtime::Handle, task::JoinHandle, time::timeout}; use crate::{ crypto::{AuthorityPen, AuthorityVerifier}, network::{ - mock::{crypto_basics, MockData, MockEvent, MockNetwork}, + mock::{crypto_basics, MockData}, setup_io, testing::{ authentication, legacy_authentication, DataInSession, LegacyDiscoveryMessage, - SessionHandler, VersionedAuthentication, + MockEvent, MockRawNetwork, SessionHandler, VersionedAuthentication, }, AddressingInformation, ConnectionManager, ConnectionManagerConfig, DataNetwork, - NetworkIdentity, Protocol, Service as NetworkService, SessionManager, + GossipService, NetworkIdentity, Protocol, SessionManager, }, testing::mocks::validator_network::{ random_address_from, MockAddressingInformation, MockNetwork as MockValidatorNetwork, @@ -72,12 +72,12 @@ struct TestData { pub authorities: Vec, pub authority_verifier: AuthorityVerifier, pub session_manager: SessionManager, - pub network: MockNetwork, + pub network: MockRawNetwork, pub validator_network: MockValidatorNetwork>, network_manager_exit_tx: oneshot::Sender<()>, - network_service_exit_tx: oneshot::Sender<()>, + gossip_service_exit_tx: oneshot::Sender<()>, network_manager_handle: JoinHandle<()>, - network_service_handle: JoinHandle<()>, + gossip_service_handle: JoinHandle<()>, // `TaskManager` can't be dropped for `SpawnTaskHandle` to work _task_manager: TaskManager, } @@ -100,11 +100,14 @@ async fn prepare_one_session_test_data() -> TestData { // Prepare Network let (event_stream_tx, event_stream_rx) = oneshot::channel(); let (network_manager_exit_tx, network_manager_exit_rx) = oneshot::channel(); - let (network_service_exit_tx, network_service_exit_rx) = oneshot::channel(); - let network = MockNetwork::new(event_stream_tx); + let (gossip_service_exit_tx, gossip_service_exit_rx) = oneshot::channel(); + let network = MockRawNetwork::new(event_stream_tx); let validator_network = MockValidatorNetwork::new(); - let (connection_io, network_io, session_io) = setup_io(validator_network.clone()); + let (gossip_service, gossip_network) = + GossipService::new(network.clone(), task_manager.spawn_handle()); + + let (connection_io, session_io) = setup_io(validator_network.clone(), gossip_network); let connection_manager = ConnectionManager::new( authorities[0].clone(), @@ -113,9 +116,6 @@ async fn prepare_one_session_test_data() -> TestData { let session_manager = SessionManager::new(session_io); - let network_service = - NetworkService::new(network.clone(), task_manager.spawn_handle(), network_io); - let network_manager_task = async move { tokio::select! { _ = connection_io @@ -124,14 +124,14 @@ async fn prepare_one_session_test_data() -> TestData { }; }; - let network_service_task = async move { + let gossip_service_task = async move { tokio::select! { - _ = network_service.run() => { }, - _ = network_service_exit_rx => { }, + _ = gossip_service.run() => { }, + _ = gossip_service_exit_rx => { }, }; }; let network_manager_handle = tokio::spawn(network_manager_task); - let network_service_handle = tokio::spawn(network_service_task); + let gossip_service_handle = tokio::spawn(gossip_service_task); event_stream_rx.await.unwrap(); @@ -142,9 +142,9 @@ async fn prepare_one_session_test_data() -> TestData { network, validator_network, network_manager_exit_tx, - network_service_exit_tx, + gossip_service_exit_tx, network_manager_handle, - network_service_handle, + gossip_service_handle, _task_manager: task_manager, } } @@ -279,9 +279,9 @@ impl TestData { async fn cleanup(self) { self.network_manager_exit_tx.send(()).unwrap(); - self.network_service_exit_tx.send(()).unwrap(); + self.gossip_service_exit_tx.send(()).unwrap(); self.network_manager_handle.await.unwrap(); - self.network_service_handle.await.unwrap(); + self.gossip_service_handle.await.unwrap(); while self.network.send_message.try_next().await.is_some() {} self.network.close_channels().await; self.validator_network.close_channels().await;