Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 42 additions & 110 deletions core/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,53 @@
use crate::{
debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent
};
use crate::{ExHashT, specialization::NetworkSpecialization};
use crate::protocol::{CustomMessageOutcome, Protocol};
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess};
use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox};
use libp2p::multihash::Multihash;
#[cfg(not(target_os = "unknown"))]
use libp2p::core::swarm::toggle::Toggle;
#[cfg(not(target_os = "unknown"))]
use libp2p::mdns::{Mdns, MdnsEvent};
use log::warn;
use runtime_primitives::traits::Block as BlockT;
use std::iter;
use void;

/// General behaviour of the network.
/// General behaviour of the network. Combines all protocols together.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourOut<TBehaviourEv>", poll_method = "poll")]
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
/// Main protocol that handles everything except the discovery and the technicalities.
user_protocol: UserBehaviourWrap<TBehaviour>,
#[behaviour(out_event = "BehaviourOut<B>", poll_method = "poll")]
pub struct Behaviour<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> {
/// All the substrate-specific protocols.
substrate: Protocol<B, S, H>,
/// Periodically pings and identifies the nodes we are connected to, and store information in a
/// cache.
debug_info: debug_info::DebugInfoBehaviour<TSubstream>,
debug_info: debug_info::DebugInfoBehaviour<Substream<StreamMuxerBox>>,
/// Discovers nodes of the network. Defined below.
discovery: DiscoveryBehaviour<TSubstream>,
discovery: DiscoveryBehaviour<Substream<StreamMuxerBox>>,
/// Discovers nodes on the local network.
#[cfg(not(target_os = "unknown"))]
mdns: Toggle<Mdns<TSubstream>>,
mdns: Toggle<Mdns<Substream<StreamMuxerBox>>>,

/// Queue of events to produce for the outside.
#[behaviour(ignore)]
events: Vec<BehaviourOut<TBehaviourEv>>,
events: Vec<BehaviourOut<B>>,
}

/// A wrapper for the behavbour event that adds DHT-related event variant.
pub enum BehaviourOut<TBehaviourEv> {
Behaviour(TBehaviourEv),
/// Event generated by `Behaviour`.
pub enum BehaviourOut<B: BlockT> {
SubstrateAction(CustomMessageOutcome<B>),
Dht(DhtEvent),
}

impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
/// Builds a new `Behaviour`.
pub fn new(
user_protocol: TBehaviour,
substrate: Protocol<B, S, H>,
user_agent: String,
local_public_key: PublicKey,
known_addresses: Vec<(PeerId, Multiaddr)>,
Expand All @@ -74,7 +77,7 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
}

Behaviour {
user_protocol: UserBehaviourWrap(user_protocol),
substrate,
debug_info,
discovery: DiscoveryBehaviour::new(local_public_key, known_addresses),
#[cfg(not(target_os = "unknown"))]
Expand Down Expand Up @@ -113,13 +116,13 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
}

/// Returns a shared reference to the user protocol.
pub fn user_protocol(&self) -> &TBehaviour {
&self.user_protocol.0
pub fn user_protocol(&self) -> &Protocol<B, S, H> {
&self.substrate
}

/// Returns a mutable reference to the user protocol.
pub fn user_protocol_mut(&mut self) -> &mut TBehaviour {
&mut self.user_protocol.0
pub fn user_protocol_mut(&mut self) -> &mut Protocol<B, S, H> {
&mut self.substrate
}

/// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event.
Expand All @@ -133,23 +136,22 @@ impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, T
}
}

impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<void::Void> for
Behaviour<B, S, H> {
fn inject_event(&mut self, event: void::Void) {
void::unreachable(event)
}
}

impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
self.events.push(BehaviourOut::Behaviour(event.0));
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<CustomMessageOutcome<B>> for
Behaviour<B, S, H> {
fn inject_event(&mut self, event: CustomMessageOutcome<B>) {
self.events.push(BehaviourOut::SubstrateAction(event));
}
}

impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<debug_info::DebugInfoEvent>
for Behaviour<B, S, H> {
fn inject_event(&mut self, event: debug_info::DebugInfoEvent) {
let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event;
if !info.protocol_version.contains("substrate") {
Expand All @@ -165,17 +167,16 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<debug_in
for addr in &info.listen_addrs {
self.discovery.add_self_reported_address(&peer_id, addr.clone());
}
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
self.substrate.add_discovered_nodes(iter::once(peer_id.clone()));
}
}

impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<DiscoveryOut>
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
for Behaviour<B, S, H> {
fn inject_event(&mut self, out: DiscoveryOut) {
match out {
DiscoveryOut::Discovered(peer_id) => {
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id));
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
Expand All @@ -194,93 +195,24 @@ impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<Discover
}

#[cfg(not(target_os = "unknown"))]
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
where TBehaviour: DiscoveryNetBehaviour {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> NetworkBehaviourEventProcess<MdnsEvent> for
Behaviour<B, S, H> {
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
self.substrate.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
},
MdnsEvent::Expired(_) => {}
}
}
}

impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TBehaviourEv>>> {
impl<B: BlockT, S: NetworkSpecialization<B>, H: ExHashT> Behaviour<B, S, H> {
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<B>>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
}

Async::NotReady
}
}

/// Because of limitations with the network behaviour custom derive and trait impl duplication, we
/// have to wrap the user protocol into a struct.
pub struct UserBehaviourWrap<TInner>(TInner);
/// Event produced by `UserBehaviourWrap`.
pub struct UserEventWrap<TInner>(TInner);
impl<TInner: NetworkBehaviour> NetworkBehaviour for UserBehaviourWrap<TInner> {
type ProtocolsHandler = TInner::ProtocolsHandler;
type OutEvent = UserEventWrap<TInner::OutEvent>;
fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() }
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.0.addresses_of_peer(peer_id)
}
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
self.0.inject_connected(peer_id, endpoint)
}
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
self.0.inject_disconnected(peer_id, endpoint)
}
fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
) {
self.0.inject_node_event(peer_id, event)
}
fn poll(
&mut self,
params: &mut impl PollParameters
) -> Async<
NetworkBehaviourAction<
<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
Self::OutEvent
>
> {
match self.0.poll(params) {
Async::NotReady => Async::NotReady,
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))),
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
Async::Ready(NetworkBehaviourAction::DialAddress { address }),
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
}
}
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint)
}
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
self.0.inject_addr_reach_failure(peer_id, addr, error)
}
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
self.0.inject_dial_failure(peer_id)
}
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
self.0.inject_new_listen_addr(addr)
}
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
self.0.inject_expired_listen_addr(addr)
}
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
self.0.inject_new_external_addr(addr)
}
}
1 change: 0 additions & 1 deletion core/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ mod discovery;
mod on_demand_layer;
#[macro_use]
mod protocol;
mod protocol_behaviour;
mod service;
mod transport;

Expand Down
Loading