Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::Split,
network::{protocol_name, Split},
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
},
substrate_network::protocol_name,
VersionedTryFromError::{ExpectedNewGotOld, ExpectedOldGotNew},
};

Expand All @@ -45,7 +44,6 @@ mod nodes;
mod party;
mod session;
mod session_map;
mod substrate_network;
mod tcp_network;
#[cfg(test)]
pub mod testing;
Expand Down
132 changes: 132 additions & 0 deletions finality-aleph/src/network/gossip/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
use std::{collections::VecDeque, fmt, sync::Arc};

use async_trait::async_trait;
use futures::{
channel::{mpsc, oneshot},
StreamExt,
};
use parking_lot::Mutex;

use crate::{
network::{
gossip::{Event, EventStream, NetworkSender, Protocol, RawNetwork},
mock::Channel,
},
validator_network::mock::MockPublicKey,
};

pub type MockEvent = Event<MockPublicKey>;

pub struct MockEventStream(mpsc::UnboundedReceiver<MockEvent>);

#[async_trait]
impl EventStream<MockPublicKey> for MockEventStream {
async fn next_event(&mut self) -> Option<MockEvent> {
self.0.next().await
}
}

pub struct MockNetworkSender {
sender: mpsc::UnboundedSender<(Vec<u8>, MockPublicKey, Protocol)>,
peer_id: MockPublicKey,
protocol: Protocol,
error: Result<(), MockSenderError>,
}

#[async_trait]
impl NetworkSender for MockNetworkSender {
type SenderError = MockSenderError;

async fn send<'a>(
&'a self,
data: impl Into<Vec<u8>> + Send + Sync + 'static,
) -> Result<(), MockSenderError> {
self.error?;
self.sender
.unbounded_send((data.into(), self.peer_id.clone(), self.protocol))
.unwrap();
Ok(())
}
}

#[derive(Clone)]
pub struct MockRawNetwork {
pub send_message: Channel<(Vec<u8>, MockPublicKey, Protocol)>,
pub event_sinks: Arc<Mutex<Vec<mpsc::UnboundedSender<MockEvent>>>>,
event_stream_taken_oneshot: Arc<Mutex<Option<oneshot::Sender<()>>>>,
pub create_sender_errors: Arc<Mutex<VecDeque<MockSenderError>>>,
pub send_errors: Arc<Mutex<VecDeque<MockSenderError>>>,
}

#[derive(Debug, Copy, Clone)]
pub struct MockSenderError;

impl fmt::Display for MockSenderError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Some error message")
}
}

impl std::error::Error for MockSenderError {}

impl RawNetwork for MockRawNetwork {
type SenderError = MockSenderError;
type NetworkSender = MockNetworkSender;
type PeerId = MockPublicKey;
type EventStream = MockEventStream;

fn event_stream(&self) -> Self::EventStream {
let (tx, rx) = mpsc::unbounded();
self.event_sinks.lock().push(tx);
// Necessary for tests to detect when service takes event_stream
if let Some(tx) = self.event_stream_taken_oneshot.lock().take() {
tx.send(()).unwrap();
}
MockEventStream(rx)
}

fn sender(
&self,
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError> {
self.create_sender_errors
.lock()
.pop_front()
.map_or(Ok(()), Err)?;
let error = self.send_errors.lock().pop_front().map_or(Ok(()), Err);
Ok(MockNetworkSender {
sender: self.send_message.0.clone(),
peer_id,
protocol,
error,
})
}
}

impl MockRawNetwork {
pub fn new(oneshot_sender: oneshot::Sender<()>) -> Self {
MockRawNetwork {
send_message: Channel::new(),
event_sinks: Arc::new(Mutex::new(vec![])),
event_stream_taken_oneshot: Arc::new(Mutex::new(Some(oneshot_sender))),
create_sender_errors: Arc::new(Mutex::new(VecDeque::new())),
send_errors: Arc::new(Mutex::new(VecDeque::new())),
}
}

pub fn emit_event(&mut self, event: MockEvent) {
for sink in &*self.event_sinks.lock() {
sink.unbounded_send(event.clone()).unwrap();
}
}

// Consumes the network asserting there are no unreceived messages in the channels.
pub async fn close_channels(self) {
self.event_sinks.lock().clear();
// We disable it until tests regarding new substrate network protocol are created.
// assert!(self.add_reserved.close().await.is_none());
// assert!(self.remove_reserved.close().await.is_none());
assert!(self.send_message.close().await.is_none());
}
}
78 changes: 78 additions & 0 deletions finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! A P2P-based gossip network, for now only for sending broadcasts.
use std::{
fmt::{Debug, Display},
hash::Hash,
};

use bytes::Bytes;

use crate::network::Data;

#[cfg(test)]
pub mod mock;
mod service;

pub use service::Service;

#[async_trait::async_trait]
/// Interface for the gossip network, currently only supports broadcasting and receiving data.
pub trait Network<D: Data>: Send + 'static {
type Error: Display + Send;

/// Broadcast data to all directly connected peers. Network-wide broadcasts have to be
/// implemented on top of this abstraction. Note that there might be no currently connected
/// peers, so there are no guarantees any single call sends anything even if no errors are
/// returned, retry appropriately.
fn broadcast(&mut self, data: D) -> Result<(), Self::Error>;

/// Receive some data from the network.
async fn next(&mut self) -> Result<D, Self::Error>;
}

/// The Authentication protocol is used for validator discovery.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
Authentication,
}

/// Abstraction over a sender to the raw network.
#[async_trait::async_trait]
pub trait NetworkSender: Send + Sync + 'static {
type SenderError: std::error::Error;

/// A method for sending data. Returns Error if not connected to the peer.
async fn send<'a>(
&'a self,
data: impl Into<Vec<u8>> + Send + Sync + 'static,
) -> Result<(), Self::SenderError>;
}

#[derive(Clone)]
pub enum Event<P> {
StreamOpened(P, Protocol),
StreamClosed(P, Protocol),
Messages(Vec<(Protocol, Bytes)>),
}

#[async_trait::async_trait]
pub trait EventStream<P> {
async fn next_event(&mut self) -> Option<Event<P>>;
}

/// Abstraction over a raw p2p network.
pub trait RawNetwork: Clone + Send + Sync + 'static {
type SenderError: std::error::Error;
type NetworkSender: NetworkSender;
type PeerId: Clone + Debug + Eq + Hash + Send;
type EventStream: EventStream<Self::PeerId>;

/// Returns a stream of events representing what happens on the network.
fn event_stream(&self) -> Self::EventStream;

/// Returns a sender to the given peer using a given protocol. Returns Error if not connected to the peer.
fn sender(
&self,
peer_id: Self::PeerId,
protocol: Protocol,
) -> Result<Self::NetworkSender, Self::SenderError>;
}
Loading