Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
RUSTC_WORKSPACE_WRAPPER: sccache
with:
command: clippy
args: -- --no-deps -D warnings
args: --all-targets --all-features -- --no-deps -D warnings

- name: Run Unit Test Suite
uses: actions-rs/cargo@v1
Expand Down
4 changes: 2 additions & 2 deletions e2e-tests/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::accounts::{get_sudo_key, get_validators_keys, get_validators_seeds, N

static GLOBAL_CONFIG: Lazy<Config> = Lazy::new(|| {
let node = get_env("NODE_URL").unwrap_or_else(|| "ws://127.0.0.1:9943".to_string());
let validator_count = get_env("VALIDATOR_COUNT").unwrap_or_else(|| 5);
let validator_count = get_env("VALIDATOR_COUNT").unwrap_or(5);
let validators_seeds = env::var("VALIDATORS_SEEDS")
.ok()
.map(|s| s.split(',').map(|s| s.to_string()).collect());
Expand Down Expand Up @@ -41,7 +41,7 @@ where
{
env::var(name).ok().map(|v| {
v.parse()
.expect(&format!("Failed to parse env var {}", name))
.unwrap_or_else(|_| panic!("Failed to parse env var {}", name))
})
}

Expand Down
24 changes: 22 additions & 2 deletions finality-aleph/src/network/clique/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ use std::{

use codec::{Decode, Encode};
use futures::{
channel::{mpsc, oneshot},
StreamExt,
channel::{mpsc, mpsc::UnboundedReceiver, oneshot},
Future, StreamExt,
};
use log::info;
use rand::Rng;
use tokio::io::{duplex, AsyncRead, AsyncWrite, DuplexStream, ReadBuf};

use crate::network::{
clique::{
protocols::{ProtocolError, ResultForService},
ConnectionInfo, Dialer, Listener, Network, PeerAddressInfo, PublicKey, SecretKey,
Splittable, LOG_TARGET,
},
Expand Down Expand Up @@ -290,6 +291,12 @@ impl<D: Data> MockNetwork<D> {
}
}

impl<D: Data> Default for MockNetwork<D> {
fn default() -> Self {
Self::new()
}
}

/// Bidirectional in-memory stream that closes abruptly after a specified
/// number of poll_write calls.
#[derive(Debug)]
Expand Down Expand Up @@ -535,3 +542,16 @@ impl UnreliableConnectionMaker {
}
}
}

pub struct MockPrelims<D> {
pub id_incoming: MockPublicKey,
pub pen_incoming: MockSecretKey,
pub id_outgoing: MockPublicKey,
pub pen_outgoing: MockSecretKey,
pub incoming_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
pub outgoing_handle: Pin<Box<dyn Future<Output = Result<(), ProtocolError<MockPublicKey>>>>>,
pub data_from_incoming: UnboundedReceiver<D>,
pub data_from_outgoing: Option<UnboundedReceiver<D>>,
pub result_from_incoming: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
pub result_from_outgoing: UnboundedReceiver<ResultForService<MockPublicKey, D>>,
}
156 changes: 59 additions & 97 deletions finality-aleph/src/network/clique/protocols/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,74 +125,59 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(

#[cfg(test)]
mod tests {
use futures::{
channel::{mpsc, mpsc::UnboundedReceiver},
pin_mut, FutureExt, StreamExt,
};
use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt};

use super::{incoming, outgoing, ProtocolError};
use crate::network::clique::{
mock::{key, MockPublicKey, MockSecretKey, MockSplittable},
protocols::{ConnectionType, ResultForService},
mock::{key, MockPrelims, MockSplittable},
protocols::ConnectionType,
Data,
};

fn prepare<D: Data>() -> (
MockPublicKey,
MockSecretKey,
MockPublicKey,
MockSecretKey,
impl futures::Future<Output = Result<(), ProtocolError<MockPublicKey>>>,
impl futures::Future<Output = Result<(), ProtocolError<MockPublicKey>>>,
UnboundedReceiver<D>,
UnboundedReceiver<ResultForService<MockPublicKey, D>>,
UnboundedReceiver<ResultForService<MockPublicKey, D>>,
) {
fn prepare<D: Data>() -> MockPrelims<D> {
let (stream_incoming, stream_outgoing) = MockSplittable::new(4096);
let (id_incoming, pen_incoming) = key();
let (id_outgoing, pen_outgoing) = key();
assert_ne!(id_incoming, id_outgoing);
let (incoming_result_for_service, result_from_incoming) = mpsc::unbounded();
let (outgoing_result_for_service, result_from_outgoing) = mpsc::unbounded();
let (data_for_user, data_from_incoming) = mpsc::unbounded::<D>();
let incoming_handle = incoming(
let incoming_handle = Box::pin(incoming(
stream_incoming,
pen_incoming.clone(),
incoming_result_for_service,
data_for_user,
);
let outgoing_handle = outgoing(
));
let outgoing_handle = Box::pin(outgoing(
stream_outgoing,
pen_outgoing.clone(),
id_incoming.clone(),
outgoing_result_for_service,
);
(
));
MockPrelims {
id_incoming,
pen_incoming,
id_outgoing,
pen_outgoing,
incoming_handle,
outgoing_handle,
data_from_incoming,
data_from_outgoing: None,
result_from_incoming,
result_from_outgoing,
)
}
}

#[tokio::test]
async fn send_data() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
mut data_from_incoming,
_result_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
) = prepare::<Vec<i32>>();
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand All @@ -201,7 +186,7 @@ mod tests {
_ = &mut incoming_handle => panic!("incoming process unexpectedly finished"),
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped");
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyOutgoing);
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
Expand Down Expand Up @@ -231,17 +216,15 @@ mod tests {

#[tokio::test]
async fn closed_by_parent_service() {
let (
_id_incoming,
_pen_incoming,
let MockPrelims {
id_outgoing,
_pen_outgoing,
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(incoming_handle);
Expand All @@ -251,7 +234,7 @@ mod tests {
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
received = result_from_incoming.next() => {
// we drop the exit oneshot channel, thus finishing incoming_handle
let (received_id, _, connection_type) = received.expect("the chennel shouldn't be dropped");
let (received_id, _, connection_type) = received.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyIncoming);
assert_eq!(received_id, id_outgoing);
},
Expand All @@ -263,17 +246,14 @@ mod tests {

#[tokio::test]
async fn parent_service_dead() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(result_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -291,17 +271,14 @@ mod tests {

#[tokio::test]
async fn parent_user_dead() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
data_from_incoming,
_result_from_incoming,
result_from_incoming: _result_from_incoming,
mut result_from_outgoing,
) = prepare::<Vec<i32>>();
..
} = prepare::<Vec<i32>>();
std::mem::drop(data_from_incoming);
let incoming_handle = incoming_handle.fuse();
let outgoing_handle = outgoing_handle.fuse();
Expand All @@ -311,7 +288,7 @@ mod tests {
_ = &mut incoming_handle => panic!("incoming process unexpectedly finished"),
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the chennel shouldn't be dropped");
let (_, maybe_data_for_outgoing, connection_type) = result.expect("the channel shouldn't be dropped");
assert_eq!(connection_type, ConnectionType::LegacyOutgoing);
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
Expand All @@ -332,17 +309,14 @@ mod tests {

#[tokio::test]
async fn sender_dead_before_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
_result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(outgoing_handle);
match incoming_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -353,17 +327,14 @@ mod tests {

#[tokio::test]
async fn sender_dead_after_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let incoming_handle = incoming_handle.fuse();
pin_mut!(incoming_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -382,17 +353,14 @@ mod tests {

#[tokio::test]
async fn receiver_dead_before_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
_result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
data_from_incoming: _data_from_incoming,
result_from_incoming: _result_from_incoming,
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
std::mem::drop(incoming_handle);
match outgoing_handle.await {
Err(ProtocolError::HandshakeError(_)) => (),
Expand All @@ -403,17 +371,14 @@ mod tests {

#[tokio::test]
async fn receiver_dead_after_handshake() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand All @@ -434,17 +399,14 @@ mod tests {

#[tokio::test]
async fn receiver_dead_after_handshake_try_send_error() {
let (
_id_incoming,
_pen_incoming,
_id_outgoing,
_pen_outgoing,
let MockPrelims {
incoming_handle,
outgoing_handle,
_data_from_incoming,
data_from_incoming: _data_from_incoming,
mut result_from_incoming,
_result_from_outgoing,
) = prepare::<Vec<i32>>();
result_from_outgoing: _result_from_outgoing,
..
} = prepare::<Vec<i32>>();
let outgoing_handle = outgoing_handle.fuse();
pin_mut!(outgoing_handle);
let (_, _exit, connection_type) = tokio::select! {
Expand Down
Loading