Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions bin/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use aleph_primitives::AlephSessionApi;
use aleph_runtime::{self, opaque::Block, RuntimeApi, MAX_BLOCK_SIZE};
use finality_aleph::{
run_nonvalidator_node, run_validator_node, AlephBlockImport, AlephConfig,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, SessionPeriod,
JustificationNotification, Metrics, MillisecsPerBlock, Protocol, ProtocolNaming, SessionPeriod,
};
use futures::channel::mpsc;
use log::warn;
use sc_client_api::{Backend, HeaderBackend};
use sc_client_api::{Backend, BlockBackend, HeaderBackend};
use sc_consensus_aura::{ImportQueueParams, SlotProportion, StartAuraParams};
use sc_consensus_slots::BackoffAuthoringBlocksStrategy;
use sc_network::NetworkService;
Expand Down Expand Up @@ -212,14 +212,35 @@ fn setup(
(
RpcHandlers,
Arc<NetworkService<Block, <Block as BlockT>::Hash>>,
ProtocolNaming,
NetworkStarter,
),
ServiceError,
> {
let genesis_hash = client
.block_hash(0)
.ok()
.flatten()
.expect("we should have a hash");
let chain_prefix = match config.chain_spec.fork_id() {
Some(fork_id) => format!("/{}/{}", genesis_hash, fork_id),
None => format!("/{}", genesis_hash),
};
let protocol_naming = ProtocolNaming::new(chain_prefix);
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(
protocol_naming.clone(),
Protocol::Authentication,
));
config
.network
.extra_sets
.push(finality_aleph::peers_set_config(Protocol::Authentication));
.push(finality_aleph::peers_set_config(
protocol_naming.clone(),
Protocol::BlockSync,
));

let (network, system_rpc_tx, tx_handler_controller, network_starter) =
sc_service::build_network(sc_service::BuildNetworkParams {
Expand Down Expand Up @@ -262,7 +283,7 @@ fn setup(
telemetry: telemetry.as_mut(),
})?;

Ok((rpc_handlers, network, network_starter))
Ok((rpc_handlers, network, protocol_naming, network_starter))
}

/// Builds a new service for a full client.
Expand Down Expand Up @@ -308,7 +329,7 @@ pub fn new_authority(
let backoff_authoring_blocks = Some(LimitNonfinalized(aleph_config.max_nonfinalized_blocks()));
let prometheus_registry = config.prometheus_registry().cloned();

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -383,6 +404,7 @@ pub fn new_authority(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};
task_manager.spawn_essential_handle().spawn_blocking(
"aleph",
Expand Down Expand Up @@ -418,7 +440,7 @@ pub fn new_full(
.path(),
);

let (_rpc_handlers, network, network_starter) = setup(
let (_rpc_handlers, network, protocol_naming, network_starter) = setup(
config,
backend.clone(),
&keystore_container,
Expand Down Expand Up @@ -460,6 +482,7 @@ pub fn new_full(
backup_saving_path: backup_path,
external_addresses: aleph_config.external_addresses(),
validator_port: aleph_config.validator_port(),
protocol_naming,
};

task_manager.spawn_essential_handle().spawn_blocking(
Expand Down
19 changes: 10 additions & 9 deletions finality-aleph/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::time::Duration;
use crate::{
abft::{CurrentNetworkData, LegacyNetworkData, CURRENT_VERSION, LEGACY_VERSION},
aggregation::{CurrentRmcNetworkData, LegacyRmcNetworkData},
network::{data::split::Split, protocol_name},
network::data::split::Split,
session::{
first_block_of_session, last_block_of_session, session_id_from_block_num,
SessionBoundaries, SessionId,
Expand Down Expand Up @@ -51,7 +51,7 @@ pub use abft::{Keychain, NodeCount, NodeIndex, Recipient, SignatureSet, SpawnHan
pub use aleph_primitives::{AuthorityId, AuthorityPair, AuthoritySignature};
pub use import::AlephBlockImport;
pub use justification::{AlephJustification, JustificationNotification};
pub use network::Protocol;
pub use network::{Protocol, ProtocolNaming};
pub use nodes::{run_nonvalidator_node, run_validator_node};
pub use session::SessionPeriod;

Expand All @@ -67,21 +67,21 @@ enum Error {
}

/// Returns a NonDefaultSetConfig for the specified protocol.
pub fn peers_set_config(protocol: Protocol) -> sc_network_common::config::NonDefaultSetConfig {
let name = protocol_name(&protocol);

pub fn peers_set_config(
naming: ProtocolNaming,
protocol: Protocol,
) -> sc_network_common::config::NonDefaultSetConfig {
let mut config = sc_network_common::config::NonDefaultSetConfig::new(
name,
naming.protocol_name(&protocol),
// max_notification_size should be larger than the maximum possible honest message size (in bytes).
// Max size of alert is UNIT_SIZE * MAX_UNITS_IN_ALERT ~ 100 * 5000 = 50000 bytes
// Max size of parents response UNIT_SIZE * N_MEMBERS ~ 100 * N_MEMBERS
// When adding other (large) message types we need to make sure this limit is fine.
1024 * 1024,
);

config.set_config = match protocol {
Protocol::Authentication => sc_network_common::config::SetConfig::default(),
};
config.set_config = sc_network_common::config::SetConfig::default();
config.add_fallback_names(naming.fallback_protocol_names(&protocol));
config
}

Expand Down Expand Up @@ -254,6 +254,7 @@ pub struct AlephConfig<B: Block, H: ExHashT, C, SC, BB> {
pub backup_saving_path: Option<PathBuf>,
pub external_addresses: Vec<String>,
pub validator_port: u16,
pub protocol_naming: ProtocolNaming,
}

pub trait BlockchainBackend<B: Block> {
Expand Down
5 changes: 4 additions & 1 deletion finality-aleph/src/network/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ pub trait Network<D: Data>: Send + 'static {
async fn next(&mut self) -> Result<(D, Self::PeerId), Self::Error>;
}

/// The Authentication protocol is used for validator discovery.
/// Protocols used by the network.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub enum Protocol {
/// The authentication protocol is used for validator discovery.
Authentication,
/// The block synchronization protocol.
BlockSync,
}

/// Abstraction over a sender to the raw network.
Expand Down
64 changes: 51 additions & 13 deletions finality-aleph/src/network/gossip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ enum Command<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
pub struct Service<N: RawNetwork, D: Data> {
network: N,
messages_from_user: mpsc::UnboundedReceiver<Command<D, N::PeerId>>,
messages_for_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_authentication_user: mpsc::UnboundedSender<(D, N::PeerId)>,
messages_for_block_sync_user: mpsc::UnboundedSender<(D, N::PeerId)>,
authentication_connected_peers: HashSet<N::PeerId>,
authentication_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
block_sync_connected_peers: HashSet<N::PeerId>,
block_sync_peer_senders: HashMap<N::PeerId, TracingUnboundedSender<D>>,
spawn_handle: SpawnTaskHandle,
}

struct ServiceInterface<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> {
protocol: Protocol,
messages_from_service: mpsc::UnboundedReceiver<(D, P)>,
messages_for_service: mpsc::UnboundedSender<Command<D, P>>,
}
Expand Down Expand Up @@ -70,7 +74,7 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv

fn send_to(&mut self, data: D, peer_id: Self::PeerId) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Send(data, peer_id, Protocol::Authentication))
.unbounded_send(Command::Send(data, peer_id, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -80,17 +84,13 @@ impl<D: Data, P: Clone + Debug + Eq + Hash + Send + 'static> Network<D> for Serv
peer_ids: HashSet<Self::PeerId>,
) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::SendToRandom(
data,
peer_ids,
Protocol::Authentication,
))
.unbounded_send(Command::SendToRandom(data, peer_ids, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

fn broadcast(&mut self, data: D) -> Result<(), Self::Error> {
self.messages_for_service
.unbounded_send(Command::Broadcast(data, Protocol::Authentication))
.unbounded_send(Command::Broadcast(data, self.protocol))
.map_err(|_| Error::ServiceStopped)
}

Expand All @@ -115,20 +115,32 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> (
Service<N, D>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
impl Network<D, Error = Error, PeerId = N::PeerId>,
) {
let (messages_for_user, messages_from_service) = mpsc::unbounded();
let (messages_for_authentication_user, messages_from_authentication_service) =
mpsc::unbounded();
let (messages_for_block_sync_user, messages_from_block_sync_service) = mpsc::unbounded();
let (messages_for_service, messages_from_user) = mpsc::unbounded();
(
Service {
network,
messages_from_user,
messages_for_user,
messages_for_authentication_user,
messages_for_block_sync_user,
spawn_handle,
authentication_connected_peers: HashSet::new(),
authentication_peer_senders: HashMap::new(),
block_sync_connected_peers: HashSet::new(),
block_sync_peer_senders: HashMap::new(),
},
ServiceInterface {
protocol: Protocol::Authentication,
messages_from_service: messages_from_authentication_service,
messages_for_service: messages_for_service.clone(),
},
ServiceInterface {
messages_from_service,
protocol: Protocol::BlockSync,
messages_from_service: messages_from_block_sync_service,
messages_for_service,
},
)
Expand All @@ -141,6 +153,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
) -> Option<&mut TracingUnboundedSender<D>> {
match protocol {
Protocol::Authentication => self.authentication_peer_senders.get_mut(peer),
Protocol::BlockSync => self.block_sync_peer_senders.get_mut(peer),
}
}

Expand Down Expand Up @@ -211,6 +224,7 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
fn protocol_peers(&self, protocol: Protocol) -> &HashSet<N::PeerId> {
match protocol {
Protocol::Authentication => &self.authentication_connected_peers,
Protocol::BlockSync => &self.block_sync_connected_peers,
}
}

Expand Down Expand Up @@ -262,6 +276,12 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_peer_senders.insert(peer.clone(), tx);
rx
}
Protocol::BlockSync => {
let (tx, rx) = tracing_unbounded("mpsc_notification_stream_block_sync");
self.block_sync_connected_peers.insert(peer.clone());
self.block_sync_peer_senders.insert(peer.clone(), tx);
rx
}
};
self.spawn_handle.spawn(
"aleph/network/peer_sender",
Expand All @@ -276,19 +296,33 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
self.authentication_connected_peers.remove(&peer);
self.authentication_peer_senders.remove(&peer);
}
Protocol::BlockSync => {
self.block_sync_connected_peers.remove(&peer);
self.block_sync_peer_senders.remove(&peer);
}
}
}
Messages(peer_id, messages) => {
for (protocol, data) in messages.into_iter() {
match protocol {
Protocol::Authentication => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_user
.messages_for_authentication_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding authentication protocol message: {}", e)
}
},
// This is a bit of a placeholder for now, as we are not yet using this
// protocol. In the future we will not be using the same D as above.
Protocol::BlockSync => match D::decode(&mut &data[..]) {
Ok(data) => self
.messages_for_block_sync_user
.unbounded_send((data, peer_id.clone()))?,
Err(e) => {
warn!(target: "aleph-network", "Error decoding block sync protocol message: {}", e)
}
},
};
}
}
Expand All @@ -303,6 +337,10 @@ impl<N: RawNetwork, D: Data> Service<N, D> {
"authentication connected peers - {:?}; ",
self.authentication_connected_peers.len()
));
status.push_str(&format!(
"block sync connected peers - {:?}; ",
self.block_sync_connected_peers.len()
));

info!(target: "aleph-network", "{}", status);
}
Expand Down Expand Up @@ -379,7 +417,7 @@ mod tests {

// Prepare service
let network = MockRawNetwork::new(event_stream_oneshot_tx);
let (service, gossip_network) =
let (service, gossip_network, _) =
Service::new(network.clone(), task_manager.spawn_handle());
let gossip_network = Box::new(gossip_network);

Expand Down
2 changes: 1 addition & 1 deletion finality-aleph/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod tcp;
#[cfg(test)]
pub use gossip::mock::{MockEvent, MockRawNetwork};
pub use gossip::{Network as GossipNetwork, Protocol, Service as GossipService};
pub use substrate::protocol_name;
pub use substrate::{ProtocolNaming, SubstrateNetwork};

/// Represents the id of an arbitrary node.
pub trait PeerId: PartialEq + Eq + Clone + Debug + Display + Hash + Codec + Send {
Expand Down
Loading