diff --git a/core/network/src/behaviour.rs b/core/network/src/behaviour.rs index 2899234938c72..8d5c3fe9cce15 100644 --- a/core/network/src/behaviour.rs +++ b/core/network/src/behaviour.rs @@ -17,50 +17,53 @@ use crate::{ debug_info, discovery::DiscoveryBehaviour, discovery::DiscoveryOut, DiscoveryNetBehaviour, event::DhtEvent }; +use crate::{ExHashT, specialization::NetworkSpecialization}; +use crate::protocol::{CustomMessageOutcome, Protocol}; use futures::prelude::*; use libp2p::NetworkBehaviour; -use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey}; -use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction}; -use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters}; +use libp2p::core::{Multiaddr, PeerId, PublicKey}; +use libp2p::core::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; use libp2p::multihash::Multihash; #[cfg(not(target_os = "unknown"))] use libp2p::core::swarm::toggle::Toggle; #[cfg(not(target_os = "unknown"))] use libp2p::mdns::{Mdns, MdnsEvent}; use log::warn; +use runtime_primitives::traits::Block as BlockT; use std::iter; use void; -/// General behaviour of the network. +/// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] -pub struct Behaviour { - /// Main protocol that handles everything except the discovery and the technicalities. - user_protocol: UserBehaviourWrap, +#[behaviour(out_event = "BehaviourOut", poll_method = "poll")] +pub struct Behaviour, H: ExHashT> { + /// All the substrate-specific protocols. + substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. - debug_info: debug_info::DebugInfoBehaviour, + debug_info: debug_info::DebugInfoBehaviour>, /// Discovers nodes of the network. Defined below. - discovery: DiscoveryBehaviour, + discovery: DiscoveryBehaviour>, /// Discovers nodes on the local network. #[cfg(not(target_os = "unknown"))] - mdns: Toggle>, + mdns: Toggle>>, /// Queue of events to produce for the outside. #[behaviour(ignore)] - events: Vec>, + events: Vec>, } -/// A wrapper for the behavbour event that adds DHT-related event variant. -pub enum BehaviourOut { - Behaviour(TBehaviourEv), +/// Event generated by `Behaviour`. +pub enum BehaviourOut { + SubstrateAction(CustomMessageOutcome), Dht(DhtEvent), } -impl Behaviour { +impl, H: ExHashT> Behaviour { /// Builds a new `Behaviour`. pub fn new( - user_protocol: TBehaviour, + substrate: Protocol, user_agent: String, local_public_key: PublicKey, known_addresses: Vec<(PeerId, Multiaddr)>, @@ -74,7 +77,7 @@ impl Behaviour Behaviour &TBehaviour { - &self.user_protocol.0 + pub fn user_protocol(&self) -> &Protocol { + &self.substrate } /// Returns a mutable reference to the user protocol. - pub fn user_protocol_mut(&mut self) -> &mut TBehaviour { - &mut self.user_protocol.0 + pub fn user_protocol_mut(&mut self) -> &mut Protocol { + &mut self.substrate } /// Start querying a record from the DHT. Will later produce either a `ValueFound` or a `ValueNotFound` event. @@ -133,23 +136,22 @@ impl Behaviour NetworkBehaviourEventProcess for -Behaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess for +Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for -Behaviour { - fn inject_event(&mut self, event: UserEventWrap) { - self.events.push(BehaviourOut::Behaviour(event.0)); +impl, H: ExHashT> NetworkBehaviourEventProcess> for +Behaviour { + fn inject_event(&mut self, event: CustomMessageOutcome) { + self.events.push(BehaviourOut::SubstrateAction(event)); } } -impl NetworkBehaviourEventProcess - for Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { let debug_info::DebugInfoEvent::Identified { peer_id, mut info } = event; if !info.protocol_version.contains("substrate") { @@ -165,17 +167,16 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::Discovered(peer_id) => { - self.user_protocol.0.add_discovered_nodes(iter::once(peer_id)); + self.substrate.add_discovered_nodes(iter::once(peer_id)); } DiscoveryOut::ValueFound(results) => { self.events.push(BehaviourOut::Dht(DhtEvent::ValueFound(results))); @@ -194,21 +195,20 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for - Behaviour - where TBehaviour: DiscoveryNetBehaviour { +impl, H: ExHashT> NetworkBehaviourEventProcess for + Behaviour { fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { - self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); + self.substrate.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id)); }, MdnsEvent::Expired(_) => {} } } } -impl Behaviour { - fn poll(&mut self) -> Async>> { +impl, H: ExHashT> Behaviour { + fn poll(&mut self) -> Async>> { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))) } @@ -216,71 +216,3 @@ impl Behaviour(TInner); -/// Event produced by `UserBehaviourWrap`. -pub struct UserEventWrap(TInner); -impl NetworkBehaviour for UserBehaviourWrap { - type ProtocolsHandler = TInner::ProtocolsHandler; - type OutEvent = UserEventWrap; - fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() } - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.0.addresses_of_peer(peer_id) - } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.0.inject_connected(peer_id, endpoint) - } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.0.inject_disconnected(peer_id, endpoint) - } - fn inject_node_event( - &mut self, - peer_id: PeerId, - event: <::Handler as ProtocolsHandler>::OutEvent - ) { - self.0.inject_node_event(peer_id, event) - } - fn poll( - &mut self, - params: &mut impl PollParameters - ) -> Async< - NetworkBehaviourAction< - <::Handler as ProtocolsHandler>::InEvent, - Self::OutEvent - > - > { - match self.0.poll(params) { - Async::NotReady => Async::NotReady, - Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => - Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))), - Async::Ready(NetworkBehaviourAction::DialAddress { address }) => - Async::Ready(NetworkBehaviourAction::DialAddress { address }), - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), - } - } - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) { - self.0.inject_addr_reach_failure(peer_id, addr, error) - } - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - self.0.inject_dial_failure(peer_id) - } - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.0.inject_new_listen_addr(addr) - } - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.0.inject_expired_listen_addr(addr) - } - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.0.inject_new_external_addr(addr) - } -} diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 9b3d86322b5ee..dc2b7a8af12af 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -169,7 +169,6 @@ mod discovery; mod on_demand_layer; #[macro_use] mod protocol; -mod protocol_behaviour; mod service; mod transport; diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index 61f79225cd82a..83f59e2208bbf 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -14,8 +14,13 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . +use crate::{DiscoveryNetBehaviour, ProtocolId}; +use crate::custom_proto::{CustomProto, CustomProtoOut}; use futures::prelude::*; -use libp2p::PeerId; +use libp2p::{Multiaddr, PeerId}; +use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; +use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler}; use primitives::storage::StorageKey; use consensus::{import_queue::IncomingBlock, import_queue::Origin, BlockOrigin}; use runtime_primitives::{generic::BlockId, ConsensusEngineId, Justification}; @@ -103,6 +108,13 @@ pub struct Protocol, H: ExHashT> { context_data: ContextData, // Connected peers pending Status message. handshaking_peers: HashMap, + /// Used to report reputation changes. + peerset_handle: peerset::PeersetHandle, + transaction_pool: Arc>, + /// When asked for a proof of finality, we use this struct to build one. + finality_proof_provider: Option>>, + /// Handles opening the unique substream and sending and receiving raw messages. + behaviour: CustomProto, Substream>, } /// A peer from whom we have received a Status message. @@ -146,27 +158,18 @@ pub struct PeerInfo { pub best_number: ::Number, } -/// Context passed as input to the methods of `protocol.rs` and that is used to communicate back -/// with the network. -pub trait NetworkOut { - /// Adjusts the reputation of the peer. Use this to point out that a peer has been malign or - /// irresponsible or appeared lazy. - fn report_peer(&mut self, who: PeerId, reputation: i32); - - /// Force disconnecting from a peer. - fn disconnect_peer(&mut self, who: PeerId); - - /// Send a message to a peer. - fn send_message(&mut self, who: PeerId, message: Message); +struct OnDemandIn<'a, B: BlockT> { + behaviour: &'a mut CustomProto, Substream>, + peerset: peerset::PeersetHandle, } -impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut { +impl<'a, B: BlockT> OnDemandNetwork for OnDemandIn<'a, B> { fn report_peer(&mut self, who: &PeerId, reputation: i32) { - NetworkOut::report_peer(**self, who.clone(), reputation) + self.peerset.report_peer(who.clone(), reputation) } fn disconnect_peer(&mut self, who: &PeerId) { - NetworkOut::disconnect_peer(**self, who.clone()) + self.behaviour.disconnect_peer(who) } fn send_header_request(&mut self, who: &PeerId, id: RequestId, block: <::Header as HeaderT>::Number) { @@ -175,7 +178,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut block, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_read_request(&mut self, who: &PeerId, id: RequestId, block: ::Hash, key: Vec) { @@ -185,7 +188,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_read_child_request( @@ -203,7 +206,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_call_request( @@ -221,7 +224,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut data, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_changes_request( @@ -243,7 +246,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut key, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } fn send_body_request( @@ -265,7 +268,7 @@ impl<'a, 'b, B: BlockT> OnDemandNetwork for &'a mut &'b mut dyn NetworkOut max, }); - NetworkOut::send_message(**self, who.clone(), message) + self.behaviour.send_packet(who, message) } } @@ -287,29 +290,34 @@ pub trait Context { /// Protocol context. struct ProtocolContext<'a, B: 'a + BlockT, H: 'a + ExHashT> { - network_out: &'a mut dyn NetworkOut, + behaviour: &'a mut CustomProto, Substream>, context_data: &'a mut ContextData, + peerset_handle: &'a peerset::PeersetHandle, } impl<'a, B: BlockT + 'a, H: 'a + ExHashT> ProtocolContext<'a, B, H> { - fn new(context_data: &'a mut ContextData, network_out: &'a mut dyn NetworkOut) -> Self { - ProtocolContext { network_out, context_data } + fn new( + context_data: &'a mut ContextData, + behaviour: &'a mut CustomProto, Substream>, + peerset_handle: &'a peerset::PeersetHandle, + ) -> Self { + ProtocolContext { context_data, peerset_handle, behaviour } } } impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_out.report_peer(who, reputation) + self.peerset_handle.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { - self.network_out.disconnect_peer(who) + self.behaviour.disconnect_peer(&who) } fn send_consensus(&mut self, who: PeerId, consensus: ConsensusMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::Consensus(consensus) ) @@ -317,8 +325,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, fn send_chain_specific(&mut self, who: PeerId, message: Vec) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::ChainSpecific(message) ) @@ -327,11 +335,11 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> Context for ProtocolContext<'a, B, impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, B, H> { fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.network_out.report_peer(who, reputation) + self.peerset_handle.report_peer(who, reputation) } fn disconnect_peer(&mut self, who: PeerId) { - self.network_out.disconnect_peer(who) + self.behaviour.disconnect_peer(&who) } fn client(&self) -> &dyn Client { @@ -340,8 +348,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, fn send_finality_proof_request(&mut self, who: PeerId, request: FinalityProofRequestMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::FinalityProofRequest(request) ) @@ -349,8 +357,8 @@ impl<'a, B: BlockT + 'a, H: ExHashT + 'a> SyncContext for ProtocolContext<'a, fn send_block_request(&mut self, who: PeerId, request: BlockRequestMessage) { send_message( + self.behaviour, &mut self.context_data.peers, - self.network_out, who, GenericMessage::BlockRequest(request) ) @@ -386,10 +394,18 @@ impl, H: ExHashT> Protocol { chain: Arc>, checker: Arc>, specialization: S, - ) -> error::Result> { + transaction_pool: Arc>, + finality_proof_provider: Option>>, + protocol_id: ProtocolId, + peerset_config: peerset::PeersetConfig, + ) -> error::Result<(Protocol, peerset::PeersetHandle)> { let info = chain.info(); let sync = ChainSync::new(config.roles, &info); - Ok(Protocol { + let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset_config); + let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); + let behaviour = CustomProto::new(protocol_id, versions, peerset); + + let protocol = Protocol { tick_timeout: tokio_timer::Interval::new_interval(TICK_TIMEOUT), propagate_timeout: tokio_timer::Interval::new_interval(PROPAGATE_TIMEOUT), config: config, @@ -403,7 +419,48 @@ impl, H: ExHashT> Protocol { specialization: specialization, consensus_gossip: ConsensusGossip::new(), handshaking_peers: HashMap::new(), - }) + transaction_pool, + finality_proof_provider, + peerset_handle: peerset_handle.clone(), + behaviour, + }; + + Ok((protocol, peerset_handle)) + } + + /// Returns the list of all the peers we have an open channel to. + pub fn open_peers(&self) -> impl Iterator { + self.behaviour.open_peers() + } + + /// Returns true if we have a channel open with this node. + pub fn is_open(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_open(peer_id) + } + + /// Disconnects the given peer if we are connected to it. + pub fn disconnect_peer(&mut self, peer_id: &PeerId) { + self.behaviour.disconnect_peer(peer_id) + } + + /// Returns true if we try to open protocols with the given peer. + pub fn is_enabled(&self, peer_id: &PeerId) -> bool { + self.behaviour.is_enabled(peer_id) + } + + /// Sends a message to a peer. + /// + /// Has no effect if the custom protocol is not open with the given peer. + /// + /// Also note that even we have a valid open substream, it may in fact be already closed + /// without us knowing, in which case the packet will not be received. + pub fn send_packet(&mut self, target: &PeerId, message: Message) { + self.behaviour.send_packet(target, message) + } + + /// Returns the state of the peerset manager, for debugging purposes. + pub fn peerset_debug_info(&mut self) -> serde_json::Value { + self.behaviour.peerset_debug_info() } /// Returns the number of peers we're connected to. @@ -438,24 +495,11 @@ impl, H: ExHashT> Protocol { /// Starts a new data demand request. /// /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_on_demand_request(&mut self, mut network_out: &mut dyn NetworkOut, rq: RequestData) { - self.on_demand_core.add_request(&mut network_out, rq); - } - - pub fn poll( - &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized) - ) -> Poll { - while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { - self.tick(network_out); - } - - while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { - self.propagate_extrinsics(network_out, transaction_pool); - } - - Ok(Async::NotReady) + pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { + self.on_demand_core.add_request(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, rq); } fn is_on_demand_response(&self, who: &PeerId, response_id: message::RequestId) -> bool { @@ -464,7 +508,6 @@ impl, H: ExHashT> Protocol { fn handle_response( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, response: &message::BlockResponse ) -> Option> { @@ -479,8 +522,8 @@ impl, H: ExHashT> Protocol { return request.map(|(_, r)| r) } trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); - network_out.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); + self.behaviour.disconnect_peer(&who); } None } @@ -505,64 +548,61 @@ impl, H: ExHashT> Protocol { pub fn on_custom_message( &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized), who: PeerId, message: Message, - finality_proof_provider: Option<&dyn FinalityProofProvider> ) -> CustomMessageOutcome { match message { - GenericMessage::Status(s) => self.on_status_message(network_out, who, s), - GenericMessage::BlockRequest(r) => self.on_block_request(network_out, who, r), + GenericMessage::Status(s) => self.on_status_message(who, s), + GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { // Note, this is safe because only `ordinary bodies` and `remote bodies` are received in this matter. if self.is_on_demand_response(&who, r.id) { - self.on_remote_body_response(network_out, who, r); + self.on_remote_body_response(who, r); } else { - if let Some(request) = self.handle_response(network_out, who.clone(), &r) { - let outcome = self.on_block_response(network_out, who.clone(), request, r); + if let Some(request) = self.handle_response(who.clone(), &r) { + let outcome = self.on_block_response(who.clone(), request, r); self.update_peer_info(&who); return outcome } } }, GenericMessage::BlockAnnounce(announce) => { - let outcome = self.on_block_announce(network_out, who.clone(), announce); + let outcome = self.on_block_announce(who.clone(), announce); self.update_peer_info(&who); return outcome; }, GenericMessage::Transactions(m) => - self.on_extrinsics(network_out, transaction_pool, who, m), - GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(network_out, who, request), + self.on_extrinsics(who, m), + GenericMessage::RemoteCallRequest(request) => self.on_remote_call_request(who, request), GenericMessage::RemoteCallResponse(response) => - self.on_remote_call_response(network_out, who, response), + self.on_remote_call_response(who, response), GenericMessage::RemoteReadRequest(request) => - self.on_remote_read_request(network_out, who, request), + self.on_remote_read_request(who, request), GenericMessage::RemoteReadResponse(response) => - self.on_remote_read_response(network_out, who, response), + self.on_remote_read_response(who, response), GenericMessage::RemoteHeaderRequest(request) => - self.on_remote_header_request(network_out, who, request), + self.on_remote_header_request(who, request), GenericMessage::RemoteHeaderResponse(response) => - self.on_remote_header_response(network_out, who, response), + self.on_remote_header_response(who, response), GenericMessage::RemoteChangesRequest(request) => - self.on_remote_changes_request(network_out, who, request), + self.on_remote_changes_request(who, request), GenericMessage::RemoteChangesResponse(response) => - self.on_remote_changes_response(network_out, who, response), + self.on_remote_changes_response(who, response), GenericMessage::FinalityProofRequest(request) => - self.on_finality_proof_request(network_out, who, request, finality_proof_provider), + self.on_finality_proof_request(who, request), GenericMessage::FinalityProofResponse(response) => - return self.on_finality_proof_response(network_out, who, response), + return self.on_finality_proof_response(who, response), GenericMessage::Consensus(msg) => { if self.context_data.peers.get(&who).map_or(false, |peer| peer.info.protocol_version > 2) { self.consensus_gossip.on_incoming( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, msg, ); } } other => self.specialization.on_message( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, &mut Some(other), ), @@ -571,10 +611,10 @@ impl, H: ExHashT> Protocol { CustomMessageOutcome::None } - fn send_message(&mut self, network_out: &mut dyn NetworkOut, who: PeerId, message: Message) { + fn send_message(&mut self, who: PeerId, message: Message) { send_message::( + &mut self.behaviour, &mut self.context_data.peers, - network_out, who, message, ); @@ -583,31 +623,28 @@ impl, H: ExHashT> Protocol { /// Locks `self` and returns a context plus the `ConsensusGossip` struct. pub fn consensus_gossip_lock<'a>( &'a mut self, - network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut ConsensusGossip) { - let context = ProtocolContext::new(&mut self.context_data, network_out); + let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); (context, &mut self.consensus_gossip) } /// Locks `self` and returns a context plus the network specialization. pub fn specialization_lock<'a>( &'a mut self, - network_out: &'a mut dyn NetworkOut ) -> (impl Context + 'a, &'a mut S) { - let context = ProtocolContext::new(&mut self.context_data, network_out); + let context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); (context, &mut self.specialization) } /// Gossip a consensus message to the network. pub fn gossip_consensus_message( &mut self, - network_out: &mut dyn NetworkOut, topic: B::Hash, engine_id: ConsensusEngineId, message: Vec, recipient: GossipMessageRecipient, ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); let message = ConsensusMessage { data: message, engine_id }; match recipient { GossipMessageRecipient::BroadcastToAll => @@ -615,19 +652,19 @@ impl, H: ExHashT> Protocol { GossipMessageRecipient::BroadcastNew => self.consensus_gossip.multicast(&mut context, topic, message, false), GossipMessageRecipient::Peer(who) => - self.send_message(network_out, who, GenericMessage::Consensus(message)), + self.send_message(who, GenericMessage::Consensus(message)), } } /// Called when a new peer is connected - pub fn on_peer_connected(&mut self, network_out: &mut dyn NetworkOut, who: PeerId) { + pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); self.handshaking_peers.insert(who.clone(), HandshakingPeer { timestamp: time::Instant::now() }); - self.send_status(network_out, who); + self.send_status(who); } /// Called by peer when it is disconnecting - pub fn on_peer_disconnected(&mut self, mut network_out: &mut dyn NetworkOut, peer: PeerId) { + pub fn on_peer_disconnected(&mut self, peer: PeerId) { trace!(target: "sync", "Disconnecting {}", peer); // lock all the the peer lists so that add/remove peer events are in order let removed = { @@ -635,20 +672,23 @@ impl, H: ExHashT> Protocol { self.context_data.peers.remove(&peer) }; if let Some(peer_data) = removed { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); if peer_data.info.protocol_version > 2 { self.consensus_gossip.peer_disconnected(&mut context, peer.clone()); } self.sync.peer_disconnected(&mut context, peer.clone()); self.specialization.on_disconnect(&mut context, peer.clone()); - self.on_demand_core.on_disconnect(&mut network_out, peer); + self.on_demand_core.on_disconnect(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, peer); } } /// Called as a back-pressure mechanism if the networking detects that the peer cannot process /// our messaging rate fast enough. - pub fn on_clogged_peer(&self, network_out: &mut dyn NetworkOut, who: PeerId, _msg: Option>) { - network_out.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); + pub fn on_clogged_peer(&self, who: PeerId, _msg: Option>) { + self.peerset_handle.report_peer(who.clone(), CLOGGED_PEER_REPUTATION_CHANGE); // Print some diagnostics. if let Some(peer) = self.context_data.peers.get(&who) { @@ -663,7 +703,6 @@ impl, H: ExHashT> Protocol { fn on_block_request( &mut self, - network_out: &mut dyn NetworkOut, peer: PeerId, request: message::BlockRequest ) { @@ -677,8 +716,8 @@ impl, H: ExHashT> Protocol { // sending block requests to the node that is unable to serve it is considered a bad behavior if !self.config.roles.is_full() { trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); - network_out.disconnect_peer(peer.clone()); - network_out.report_peer(peer, i32::min_value()); + self.behaviour.disconnect_peer(&peer); + self.peerset_handle.report_peer(peer, i32::min_value()); return; } @@ -736,12 +775,16 @@ impl, H: ExHashT> Protocol { blocks: blocks, }; trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(network_out, peer, GenericMessage::BlockResponse(response)) + self.send_message(peer, GenericMessage::BlockResponse(response)) + } + + /// Adjusts the reputation of a node. + pub fn report_peer(&self, who: PeerId, reputation: i32) { + self.peerset_handle.report_peer(who, reputation) } fn on_block_response( &mut self, - network_out: &mut dyn NetworkOut, peer: PeerId, request: message::BlockRequest, response: message::BlockResponse, @@ -765,7 +808,7 @@ impl, H: ExHashT> Protocol { // justifications are imported asynchronously (#1482) if request.fields == message::BlockAttributes::JUSTIFICATION { let outcome = self.sync.on_block_justification_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), peer, response ); @@ -778,7 +821,7 @@ impl, H: ExHashT> Protocol { } else { let outcome = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), peer, request, response @@ -794,14 +837,19 @@ impl, H: ExHashT> Protocol { /// Perform time based maintenance. /// /// > **Note**: This method normally doesn't have to be called except for testing purposes. - pub fn tick(&mut self, mut network_out: &mut dyn NetworkOut) { - self.consensus_gossip.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); - self.maintain_peers(network_out); - self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, network_out)); - self.on_demand_core.maintain_peers(&mut network_out); + pub fn tick(&mut self) { + self.consensus_gossip.tick( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) + ); + self.maintain_peers(); + self.sync.tick(&mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle)); + self.on_demand_core.maintain_peers(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }); } - fn maintain_peers(&mut self, network_out: &mut dyn NetworkOut) { + fn maintain_peers(&mut self) { let tick = time::Instant::now(); let mut aborting = Vec::new(); { @@ -822,20 +870,22 @@ impl, H: ExHashT> Protocol { } } - self.specialization.maintain_peers(&mut ProtocolContext::new(&mut self.context_data, network_out)); + self.specialization.maintain_peers( + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle) + ); for p in aborting { - network_out.disconnect_peer(p.clone()); - network_out.report_peer(p, TIMEOUT_REPUTATION_CHANGE); + self.behaviour.disconnect_peer(&p); + self.peerset_handle.report_peer(p, TIMEOUT_REPUTATION_CHANGE); } } /// Called by peer to report status - fn on_status_message(&mut self, mut network_out: &mut dyn NetworkOut, who: PeerId, status: message::Status) { + fn on_status_message(&mut self, who: PeerId, status: message::Status) { trace!(target: "sync", "New peer {} {:?}", who, status); let protocol_version = { if self.context_data.peers.contains_key(&who) { debug!("Unexpected status packet from {}", who); - network_out.report_peer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE); + self.peerset_handle.report_peer(who, UNEXPECTED_STATUS_REPUTATION_CHANGE); return; } if status.genesis_hash != self.genesis_hash { @@ -844,14 +894,14 @@ impl, H: ExHashT> Protocol { "Peer is on different chain (our genesis: {} theirs: {})", self.genesis_hash, status.genesis_hash ); - network_out.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); + self.behaviour.disconnect_peer(&who); return; } if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version { trace!(target: "protocol", "Peer {:?} using unsupported protocol version {}", who, status.version); - network_out.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); + self.behaviour.disconnect_peer(&who); return; } @@ -859,8 +909,8 @@ impl, H: ExHashT> Protocol { // we're not interested in light peers if status.roles.is_light() { debug!(target: "sync", "Peer {} is unable to serve light requests", who); - network_out.report_peer(who.clone(), i32::min_value()); - network_out.disconnect_peer(who); + self.peerset_handle.report_peer(who.clone(), i32::min_value()); + self.behaviour.disconnect_peer(&who); return; } @@ -876,8 +926,8 @@ impl, H: ExHashT> Protocol { .saturated_into::(); if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE { debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who); - network_out.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); - network_out.disconnect_peer(who); + self.peerset_handle.report_peer(who.clone(), PEER_BEHIND_US_LIGHT_REPUTATION_CHANGE); + self.behaviour.disconnect_peer(&who); return; } } @@ -914,8 +964,11 @@ impl, H: ExHashT> Protocol { }; let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone(); - self.on_demand_core.on_connect(&mut network_out, who.clone(), status.roles, status.best_number); - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + self.on_demand_core.on_connect(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who.clone(), status.roles, status.best_number); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.new_peer(&mut context, who.clone(), info); if protocol_version > 2 { self.consensus_gossip.new_peer(&mut context, who.clone(), status.roles); @@ -926,8 +979,6 @@ impl, H: ExHashT> Protocol { /// Called when peer sends us new extrinsics fn on_extrinsics( &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized), who: PeerId, extrinsics: message::Transactions ) { @@ -939,8 +990,8 @@ impl, H: ExHashT> Protocol { trace!(target: "sync", "Received {} extrinsics from {}", extrinsics.len(), who); if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { for t in extrinsics { - if let Some(hash) = transaction_pool.import(&t) { - network_out.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); + if let Some(hash) = self.transaction_pool.import(&t) { + self.peerset_handle.report_peer(who.clone(), NEW_EXTRINSIC_REPUTATION_CHANGE); peer.known_extrinsics.insert(hash); } else { trace!(target: "sync", "Extrinsic rejected"); @@ -952,8 +1003,6 @@ impl, H: ExHashT> Protocol { /// Call when we must propagate ready extrinsics to peers. pub fn propagate_extrinsics( &mut self, - network_out: &mut dyn NetworkOut, - transaction_pool: &(impl TransactionPool + ?Sized) ) { debug!(target: "sync", "Propagating extrinsics"); @@ -962,7 +1011,7 @@ impl, H: ExHashT> Protocol { return; } - let extrinsics = transaction_pool.transactions(); + let extrinsics = self.transaction_pool.transactions(); let mut propagated_to = HashMap::new(); for (who, peer) in self.context_data.peers.iter_mut() { let (hashes, to_send): (Vec<_>, Vec<_>) = extrinsics @@ -979,18 +1028,18 @@ impl, H: ExHashT> Protocol { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - network_out.send_message(who.clone(), GenericMessage::Transactions(to_send)) + self.behaviour.send_packet(who, GenericMessage::Transactions(to_send)) } } - transaction_pool.on_broadcasted(propagated_to); + self.transaction_pool.on_broadcasted(propagated_to); } /// Make sure an important block is propagated to peers. /// /// In chain-based consensus, we often need to make sure non-best forks are /// at least temporarily synced. - pub fn announce_block(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash) { + pub fn announce_block(&mut self, hash: B::Hash) { let header = match self.context_data.chain.header(&BlockId::Hash(hash)) { Ok(Some(header)) => header, Ok(None) => { @@ -1009,12 +1058,12 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { trace!(target: "sync", "Reannouncing block {:?} to {}", hash, who); peer.known_blocks.insert(hash); - network_out.send_message(who.clone(), message.clone()) + self.behaviour.send_packet(who, message.clone()) } } /// Send Status message - fn send_status(&mut self, network_out: &mut dyn NetworkOut, who: PeerId) { + fn send_status(&mut self, who: PeerId) { let info = self.context_data.chain.info(); let status = message::generic::Status { version: CURRENT_VERSION, @@ -1026,12 +1075,11 @@ impl, H: ExHashT> Protocol { chain_status: self.specialization.status(), }; - self.send_message(network_out, who, GenericMessage::Status(status)) + self.send_message(who, GenericMessage::Status(status)) } fn on_block_announce( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, announce: message::BlockAnnounce ) -> CustomMessageOutcome { @@ -1042,9 +1090,12 @@ impl, H: ExHashT> Protocol { peer.known_blocks.insert(hash.clone()); } } - self.on_demand_core.on_block_announce(&mut network_out, who.clone(), *header.number()); + self.on_demand_core.on_block_announce(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who.clone(), *header.number()); let try_import = self.sync.on_block_announce( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), hash, &header, @@ -1064,7 +1115,7 @@ impl, H: ExHashT> Protocol { // to import header from announced block let's construct response to request that normally would have // been sent over network (but it is not in our case) let blocks_to_import = self.sync.on_block_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who.clone(), message::generic::BlockRequest { id: 0, @@ -1096,10 +1147,10 @@ impl, H: ExHashT> Protocol { /// Call this when a block has been imported in the import queue and we should announce it on /// the network. - pub fn on_block_imported(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash, header: &B::Header) { + pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { self.sync.update_chain_info(header); self.specialization.on_block_imported( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), hash.clone(), header, ); @@ -1116,24 +1167,23 @@ impl, H: ExHashT> Protocol { for (who, ref mut peer) in self.context_data.peers.iter_mut() { if peer.known_blocks.insert(hash.clone()) { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - network_out.send_message(who.clone(), message.clone()) + self.behaviour.send_packet(who, message.clone()) } } } /// Call this when a block has been finalized. The sync layer may have some additional /// requesting to perform. - pub fn on_block_finalized(&mut self, network_out: &mut dyn NetworkOut, hash: B::Hash, header: &B::Header) { + pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { self.sync.on_block_finalized( &hash, *header.number(), - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), ); } fn on_remote_call_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteCallRequest, ) { @@ -1157,13 +1207,12 @@ impl, H: ExHashT> Protocol { request.block, error ); - network_out.report_peer(who.clone(), RPC_FAILED_REPUTATION_CHANGE); + self.peerset_handle.report_peer(who.clone(), RPC_FAILED_REPUTATION_CHANGE); Default::default() } }; self.send_message( - network_out, who, GenericMessage::RemoteCallResponse(message::RemoteCallResponse { id: request.id, @@ -1176,9 +1225,9 @@ impl, H: ExHashT> Protocol { /// /// Uses `protocol` to queue a new justification request and tries to dispatch all pending /// requests. - pub fn request_justification(&mut self, network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor) { + pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { let mut context = - ProtocolContext::new(&mut self.context_data, network_out); + ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.request_justification(&hash, number, &mut context); } @@ -1192,18 +1241,17 @@ impl, H: ExHashT> Protocol { /// errors. pub fn blocks_processed( &mut self, - network_out: &mut dyn NetworkOut, processed_blocks: Vec, has_error: bool ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.blocks_processed(&mut context, processed_blocks, has_error); } /// Restart the sync process. - pub fn restart(&mut self, network_out: &mut dyn NetworkOut) { + pub fn restart(&mut self) { let peers = self.context_data.peers.clone(); - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.restart(&mut context, |peer_id| peers.get(peer_id).map(|i| i.info.clone())); } @@ -1227,11 +1275,10 @@ impl, H: ExHashT> Protocol { /// Queues a new finality proof request and tries to dispatch all pending requests. pub fn request_finality_proof( &mut self, - network_out: &mut dyn NetworkOut, hash: &B::Hash, number: NumberFor ) { - let mut context = ProtocolContext::new(&mut self.context_data, network_out); + let mut context = ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle); self.sync.request_finality_proof(&hash, number, &mut context); } @@ -1245,17 +1292,18 @@ impl, H: ExHashT> Protocol { fn on_remote_call_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteCallResponse ) { trace!(target: "sync", "Remote call response {} from {}", response.id, who); - self.on_demand_core.on_remote_call_response(&mut network_out, who, response); + self.on_demand_core.on_remote_call_response(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_read_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteReadRequest, ) { @@ -1275,7 +1323,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteReadResponse(message::RemoteReadResponse { id: request.id, @@ -1286,17 +1333,18 @@ impl, H: ExHashT> Protocol { fn on_remote_read_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteReadResponse ) { trace!(target: "sync", "Remote read response {} from {}", response.id, who); - self.on_demand_core.on_remote_read_response(&mut network_out, who, response); + self.on_demand_core.on_remote_read_response(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_header_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteHeaderRequest>, ) { @@ -1315,7 +1363,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteHeaderResponse(message::RemoteHeaderResponse { id: request.id, @@ -1327,17 +1374,18 @@ impl, H: ExHashT> Protocol { fn on_remote_header_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteHeaderResponse, ) { trace!(target: "sync", "Remote header proof response {} from {}", response.id, who); - self.on_demand_core.on_remote_header_response(&mut network_out, who, response); + self.on_demand_core.on_remote_header_response(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_remote_changes_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::RemoteChangesRequest, ) { @@ -1375,7 +1423,6 @@ impl, H: ExHashT> Protocol { } }; self.send_message( - network_out, who, GenericMessage::RemoteChangesResponse(message::RemoteChangesResponse { id: request.id, @@ -1389,7 +1436,6 @@ impl, H: ExHashT> Protocol { fn on_remote_changes_response( &mut self, - mut network_out: &mut dyn NetworkOut, who: PeerId, response: message::RemoteChangesResponse, B::Hash>, ) { @@ -1398,18 +1444,19 @@ impl, H: ExHashT> Protocol { who, response.max ); - self.on_demand_core.on_remote_changes_response(&mut network_out, who, response); + self.on_demand_core.on_remote_changes_response(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, who, response); } fn on_finality_proof_request( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, request: message::FinalityProofRequest, - finality_proof_provider: Option<&dyn FinalityProofProvider> ) { trace!(target: "sync", "Finality proof request from {} for {}", who, request.block); - let finality_proof = finality_proof_provider.as_ref() + let finality_proof = self.finality_proof_provider.as_ref() .ok_or_else(|| String::from("Finality provider is not configured")) .and_then(|provider| provider.prove_finality(request.block, &request.request).map_err(|e| e.to_string()) @@ -1426,7 +1473,6 @@ impl, H: ExHashT> Protocol { }, }; self.send_message( - network_out, who, GenericMessage::FinalityProofResponse(message::FinalityProofResponse { id: 0, @@ -1438,13 +1484,12 @@ impl, H: ExHashT> Protocol { fn on_finality_proof_response( &mut self, - network_out: &mut dyn NetworkOut, who: PeerId, response: message::FinalityProofResponse, ) -> CustomMessageOutcome { trace!(target: "sync", "Finality proof response from {} for {}", who, response.block); let outcome = self.sync.on_block_finality_proof_data( - &mut ProtocolContext::new(&mut self.context_data, network_out), + &mut ProtocolContext::new(&mut self.context_data, &mut self.behaviour, &self.peerset_handle), who, response, ); @@ -1458,11 +1503,13 @@ impl, H: ExHashT> Protocol { fn on_remote_body_response( &mut self, - mut network_out: &mut dyn NetworkOut, peer: PeerId, response: message::BlockResponse ) { - self.on_demand_core.on_remote_body_response(&mut network_out, peer, response); + self.on_demand_core.on_remote_body_response(OnDemandIn { + behaviour: &mut self.behaviour, + peerset: self.peerset_handle.clone(), + }, peer, response); } } @@ -1476,8 +1523,8 @@ pub enum CustomMessageOutcome { } fn send_message( + behaviour: &mut CustomProto, Substream>, peers: &mut HashMap>, - network_out: &mut dyn NetworkOut, who: PeerId, mut message: Message, ) { @@ -1492,5 +1539,132 @@ fn send_message( peer.block_request = Some((time::Instant::now(), r.clone())); } } - network_out.send_message(who, message); + behaviour.send_packet(&who, message); +} + +impl, H: ExHashT> NetworkBehaviour for +Protocol { + type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; + type OutEvent = CustomMessageOutcome; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + self.behaviour.new_handler() + } + + fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { + self.behaviour.addresses_of_peer(peer_id) + } + + fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_connected(peer_id, endpoint) + } + + fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + self.behaviour.inject_disconnected(peer_id, endpoint) + } + + fn inject_node_event( + &mut self, + peer_id: PeerId, + event: <::Handler as ProtocolsHandler>::OutEvent, + ) { + self.behaviour.inject_node_event(peer_id, event) + } + + fn poll( + &mut self, + params: &mut impl PollParameters, + ) -> Async< + NetworkBehaviourAction< + <::Handler as ProtocolsHandler>::InEvent, + Self::OutEvent + > + > { + while let Ok(Async::Ready(_)) = self.tick_timeout.poll() { + self.tick(); + } + + while let Ok(Async::Ready(_)) = self.propagate_timeout.poll() { + self.propagate_extrinsics(); + } + + let event = match self.behaviour.poll(params) { + Async::NotReady => return Async::NotReady, + Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, + Async::Ready(NetworkBehaviourAction::DialAddress { address }) => + return Async::Ready(NetworkBehaviourAction::DialAddress { address }), + Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => + return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), + Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => + return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), + Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => + return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), + }; + + let outcome = match event { + CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { + debug_assert!( + version <= CURRENT_VERSION as u8 + && version >= MIN_VERSION as u8 + ); + self.on_peer_connected(peer_id); + CustomMessageOutcome::None + } + CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { + self.on_peer_disconnected(peer_id); + CustomMessageOutcome::None + }, + CustomProtoOut::CustomMessage { peer_id, message } => + self.on_custom_message(peer_id, message), + CustomProtoOut::Clogged { peer_id, messages } => { + debug!(target: "sync", "{} clogging messages:", messages.len()); + for msg in messages.into_iter().take(5) { + debug!(target: "sync", "{:?}", msg); + self.on_clogged_peer(peer_id.clone(), Some(msg)); + } + CustomMessageOutcome::None + } + }; + + if let CustomMessageOutcome::None = outcome { + Async::NotReady + } else { + Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) + } + } + + fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { + self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error + ) { + self.behaviour.inject_addr_reach_failure(peer_id, addr, error) + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + self.behaviour.inject_dial_failure(peer_id) + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_listen_addr(addr) + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_expired_listen_addr(addr) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + self.behaviour.inject_new_external_addr(addr) + } +} + +impl, H: ExHashT> DiscoveryNetBehaviour for Protocol { + fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { + self.behaviour.add_discovered_nodes(peer_ids) + } } diff --git a/core/network/src/protocol_behaviour.rs b/core/network/src/protocol_behaviour.rs deleted file mode 100644 index 70fb487055772..0000000000000 --- a/core/network/src/protocol_behaviour.rs +++ /dev/null @@ -1,468 +0,0 @@ -// Copyright 2019 Parity Technologies (UK) Ltd. -// This file is part of Substrate. - -// Substrate is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Substrate is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Substrate. If not, see . - -//! Implementation of libp2p's `NetworkBehaviour` trait that handles everything Substrate-specific. - -use crate::{ExHashT, DiscoveryNetBehaviour, ProtocolId}; -use crate::custom_proto::{CustomProto, CustomProtoOut}; -use crate::chain::{Client, FinalityProofProvider}; -use crate::protocol::{self, event::Event, CustomMessageOutcome, Protocol, ProtocolConfig, sync::SyncState}; -use crate::protocol::{PeerInfo, NetworkOut, message::Message, on_demand::RequestData}; -use crate::protocol::consensus_gossip::MessageRecipient as GossipMessageRecipient; -use crate::protocol::specialization::NetworkSpecialization; -use crate::service::TransactionPool; - -use client::light::fetcher::FetchChecker; -use futures::prelude::*; -use consensus::import_queue::SharedFinalityProofRequestBuilder; -use log::debug; -use libp2p::{PeerId, Multiaddr}; -use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; -use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; -use libp2p::core::protocols_handler::{ProtocolsHandler, IntoProtocolsHandler}; -use runtime_primitives::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId}; -use std::sync::Arc; - -/// Implementation of `NetworkBehaviour` that handles everything related to Substrate and Polkadot. -pub struct ProtocolBehaviour, H: ExHashT> { - /// Handles opening the unique substream and sending and receiving raw messages. - behaviour: CustomProto, Substream>, - /// Handles the logic behind the raw messages that we receive. - protocol: Protocol, - /// Used to report reputation changes. - peerset_handle: peerset::PeersetHandle, - transaction_pool: Arc>, - /// When asked for a proof of finality, we use this struct to build one. - finality_proof_provider: Option>>, -} - -impl, H: ExHashT> ProtocolBehaviour { - /// Builds a new `ProtocolBehaviour`. - pub fn new( - config: ProtocolConfig, - chain: Arc>, - checker: Arc>, - specialization: S, - transaction_pool: Arc>, - finality_proof_provider: Option>>, - protocol_id: ProtocolId, - versions: &[u8], - peerset: peerset::Peerset, - peerset_handle: peerset::PeersetHandle, - ) -> crate::error::Result { - let protocol = Protocol::new(config, chain, checker, specialization)?; - let behaviour = CustomProto::new(protocol_id, versions, peerset); - - Ok(ProtocolBehaviour { - protocol, - behaviour, - peerset_handle, - transaction_pool, - finality_proof_provider, - }) - } - - /// Returns the list of all the peers we have an open channel to. - pub fn open_peers(&self) -> impl Iterator { - self.behaviour.open_peers() - } - - /// Returns true if we have a channel open with this node. - pub fn is_open(&self, peer_id: &PeerId) -> bool { - self.behaviour.is_open(peer_id) - } - - /// Disconnects the given peer if we are connected to it. - pub fn disconnect_peer(&mut self, peer_id: &PeerId) { - self.behaviour.disconnect_peer(peer_id) - } - - /// Adjusts the reputation of a node. - pub fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.peerset_handle.report_peer(who, reputation) - } - - /// Returns true if we try to open protocols with the given peer. - pub fn is_enabled(&self, peer_id: &PeerId) -> bool { - self.behaviour.is_enabled(peer_id) - } - - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Message) { - self.behaviour.send_packet(target, message) - } - - /// Returns the state of the peerset manager, for debugging purposes. - pub fn peerset_debug_info(&mut self) -> serde_json::Value { - self.behaviour.peerset_debug_info() - } - - /// Returns the number of peers we're connected to. - pub fn num_connected_peers(&self) -> usize { - self.protocol.num_connected_peers() - } - - /// Returns the number of peers we're connected to and that are being queried. - pub fn num_active_peers(&self) -> usize { - self.protocol.num_active_peers() - } - - /// Current global sync state. - pub fn sync_state(&self) -> SyncState { - self.protocol.sync_state() - } - - /// Target sync block number. - pub fn best_seen_block(&self) -> Option> { - self.protocol.best_seen_block() - } - - /// Number of peers participating in syncing. - pub fn num_sync_peers(&self) -> u32 { - self.protocol.num_sync_peers() - } - - /// Starts a new data demand request. - /// - /// The parameter contains a `Sender` where the result, once received, must be sent. - pub(crate) fn add_on_demand_request(&mut self, rq: RequestData) { - self.protocol.add_on_demand_request( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - rq - ); - } - - /// Returns information about all the peers we are connected to after the handshake message. - pub fn peers_info(&self) -> impl Iterator)> { - self.protocol.peers_info() - } - - /// Locks `self` and gives access to the protocol and a context that can be used in order to - /// use `consensus_gossip_lock` or `specialization_lock`. - /// - /// **Important**: ONLY USE THIS FUNCTION TO CALL `consensus_gossip_lock` or `specialization_lock`. - /// This function is a very bad API. - pub fn protocol_context_lock<'a>( - &'a mut self, - ) -> (&'a mut Protocol, LocalNetworkOut<'a, B>) { - let net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; - (&mut self.protocol, net_out) - } - - /// Gossip a consensus message to the network. - pub fn gossip_consensus_message( - &mut self, - topic: B::Hash, - engine_id: ConsensusEngineId, - message: Vec, - recipient: GossipMessageRecipient, - ) { - self.protocol.gossip_consensus_message( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - topic, - engine_id, - message, - recipient - ); - } - - /// Call when we must propagate ready extrinsics to peers. - pub fn propagate_extrinsics(&mut self) { - self.protocol.propagate_extrinsics( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - &*self.transaction_pool - ) - } - - /// Make sure an important block is propagated to peers. - /// - /// In chain-based consensus, we often need to make sure non-best forks are - /// at least temporarily synced. - pub fn announce_block(&mut self, hash: B::Hash) { - self.protocol.announce_block( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - hash - ) - } - - /// Call this when a block has been imported in the import queue and we should announce it on - /// the network. - pub fn on_block_imported(&mut self, hash: B::Hash, header: &B::Header) { - self.protocol.on_block_imported( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - hash, - header - ) - } - - /// Call this when a block has been finalized. The sync layer may have some additional - /// requesting to perform. - pub fn on_block_finalized(&mut self, hash: B::Hash, header: &B::Header) { - self.protocol.on_block_finalized( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - hash, - header - ) - } - - /// Request a justification for the given block. - /// - /// Uses `protocol` to queue a new justification request and tries to dispatch all pending - /// requests. - pub fn request_justification(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.request_justification( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - hash, - number - ) - } - - /// Clears all pending justification requests. - pub fn clear_justification_requests(&mut self) { - self.protocol.clear_justification_requests() - } - - /// A batch of blocks have been processed, with or without errors. - /// Call this when a batch of blocks have been processed by the import queue, with or without - /// errors. - pub fn blocks_processed( - &mut self, - processed_blocks: Vec, - has_error: bool, - ) { - self.protocol.blocks_processed( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - processed_blocks, - has_error, - ) - } - - /// Restart the sync process. - pub fn restart(&mut self) { - let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; - self.protocol.restart(&mut net_out); - } - - /// Notify about successful import of the given block. - pub fn block_imported(&mut self, hash: &B::Hash, number: NumberFor) { - self.protocol.block_imported(hash, number) - } - - pub fn set_finality_proof_request_builder(&mut self, request_builder: SharedFinalityProofRequestBuilder) { - self.protocol.set_finality_proof_request_builder(request_builder) - } - - /// Call this when a justification has been processed by the import queue, with or without - /// errors. - pub fn justification_import_result(&mut self, hash: B::Hash, number: NumberFor, success: bool) { - self.protocol.justification_import_result(hash, number, success) - } - - /// The networking-level event has happened. - pub fn on_event(&mut self, event: Event) { - self.protocol.on_event(event); - } - - /// Request a finality proof for the given block. - /// - /// Queues a new finality proof request and tries to dispatch all pending requests. - pub fn request_finality_proof( - &mut self, - hash: &B::Hash, - number: NumberFor, - ) { - self.protocol.request_finality_proof( - &mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }, - &hash, - number, - ); - } - - pub fn finality_proof_import_result( - &mut self, - request_block: (B::Hash, NumberFor), - finalization_result: Result<(B::Hash, NumberFor), ()>, - ) { - self.protocol.finality_proof_import_result(request_block, finalization_result) - } - - pub fn tick(&mut self) { - self.protocol.tick(&mut LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }); - } -} - -impl, H: ExHashT> NetworkBehaviour for -ProtocolBehaviour { - type ProtocolsHandler = , Substream> as NetworkBehaviour>::ProtocolsHandler; - type OutEvent = CustomMessageOutcome; - - fn new_handler(&mut self) -> Self::ProtocolsHandler { - self.behaviour.new_handler() - } - - fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec { - self.behaviour.addresses_of_peer(peer_id) - } - - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_connected(peer_id, endpoint) - } - - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { - self.behaviour.inject_disconnected(peer_id, endpoint) - } - - fn inject_node_event( - &mut self, - peer_id: PeerId, - event: <::Handler as ProtocolsHandler>::OutEvent, - ) { - self.behaviour.inject_node_event(peer_id, event) - } - - fn poll( - &mut self, - params: &mut impl PollParameters, - ) -> Async< - NetworkBehaviourAction< - <::Handler as ProtocolsHandler>::InEvent, - Self::OutEvent - > - > { - let mut net_out = LocalNetworkOut { inner: &mut self.behaviour, peerset_handle: &self.peerset_handle }; - match self.protocol.poll(&mut net_out, &*self.transaction_pool) { - Ok(Async::Ready(v)) => void::unreachable(v), - Ok(Async::NotReady) => {} - Err(err) => void::unreachable(err), - } - - let event = match self.behaviour.poll(params) { - Async::NotReady => return Async::NotReady, - Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) => ev, - Async::Ready(NetworkBehaviourAction::DialAddress { address }) => - return Async::Ready(NetworkBehaviourAction::DialAddress { address }), - Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => - return Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }), - Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) => - return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }), - Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) => - return Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }), - }; - - let mut network_out = LocalNetworkOut { - inner: &mut self.behaviour, - peerset_handle: &self.peerset_handle, - }; - - let outcome = match event { - CustomProtoOut::CustomProtocolOpen { peer_id, version, .. } => { - debug_assert!( - version <= protocol::CURRENT_VERSION as u8 - && version >= protocol::MIN_VERSION as u8 - ); - self.protocol.on_peer_connected(&mut network_out, peer_id); - CustomMessageOutcome::None - } - CustomProtoOut::CustomProtocolClosed { peer_id, .. } => { - self.protocol.on_peer_disconnected(&mut network_out, peer_id); - CustomMessageOutcome::None - }, - CustomProtoOut::CustomMessage { peer_id, message } => - self.protocol.on_custom_message( - &mut network_out, - &*self.transaction_pool, - peer_id, - message, - self.finality_proof_provider.as_ref().map(|p| &**p) - ), - CustomProtoOut::Clogged { peer_id, messages } => { - debug!(target: "sync", "{} clogging messages:", messages.len()); - for msg in messages.into_iter().take(5) { - debug!(target: "sync", "{:?}", msg); - self.protocol.on_clogged_peer(&mut network_out, peer_id.clone(), Some(msg)); - } - CustomMessageOutcome::None - } - }; - - if let CustomMessageOutcome::None = outcome { - Async::NotReady - } else { - Async::Ready(NetworkBehaviourAction::GenerateEvent(outcome)) - } - } - - fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) { - self.behaviour.inject_replaced(peer_id, closed_endpoint, new_endpoint) - } - - fn inject_addr_reach_failure( - &mut self, - peer_id: Option<&PeerId>, - addr: &Multiaddr, - error: &dyn std::error::Error - ) { - self.behaviour.inject_addr_reach_failure(peer_id, addr, error) - } - - fn inject_dial_failure(&mut self, peer_id: &PeerId) { - self.behaviour.inject_dial_failure(peer_id) - } - - fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_new_listen_addr(addr) - } - - fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_expired_listen_addr(addr) - } - - fn inject_new_external_addr(&mut self, addr: &Multiaddr) { - self.behaviour.inject_new_external_addr(addr) - } -} - -impl, H: ExHashT> DiscoveryNetBehaviour - for ProtocolBehaviour { - fn add_discovered_nodes(&mut self, peer_ids: impl Iterator) { - self.behaviour.add_discovered_nodes(peer_ids) - } -} - -/// Has to be public for stupid API reasons. This should be made private again ASAP. -pub struct LocalNetworkOut<'a, B: BlockT> { - inner: &'a mut CustomProto, Substream>, - peerset_handle: &'a peerset::PeersetHandle, -} - -impl<'a, B: BlockT> NetworkOut for LocalNetworkOut<'a, B> { - fn report_peer(&mut self, who: PeerId, reputation: i32) { - self.peerset_handle.report_peer(who, reputation) - } - - fn disconnect_peer(&mut self, who: PeerId) { - self.inner.disconnect_peer(&who) - } - - fn send_message(&mut self, who: PeerId, message: Message) { - self.inner.send_packet(&who, message) - } -} diff --git a/core/network/src/service.rs b/core/network/src/service.rs index fbe095e2ecdf9..5aece788d1610 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -22,11 +22,11 @@ use std::time::Duration; use log::{warn, error, info}; use libp2p::core::swarm::NetworkBehaviour; -use libp2p::core::{nodes::Substream, transport::boxed::Boxed, muxing::StreamMuxerBox}; +use libp2p::core::{transport::boxed::Boxed, muxing::StreamMuxerBox}; use libp2p::{Multiaddr, multihash::Multihash}; use futures::{prelude::*, sync::oneshot, sync::mpsc}; use parking_lot::{Mutex, RwLock}; -use crate::protocol_behaviour::ProtocolBehaviour; +use crate::protocol::Protocol; use crate::{behaviour::{Behaviour, BehaviourOut}, parse_str_addr}; use crate::{NetworkState, NetworkStateNotConnectedPeer, NetworkStatePeer}; use crate::{transport, config::NodeKeyConfig, config::NonReservedPeerMode}; @@ -149,14 +149,13 @@ impl, H: ExHashT> NetworkWorker } } - // Build the peerset. - let (peerset, peerset_handle) = peerset::Peerset::from_config(peerset::PeersetConfig { + let peerset_config = peerset::PeersetConfig { in_peers: params.network_config.in_peers, out_peers: params.network_config.out_peers, bootnodes, reserved_only: params.network_config.non_reserved_mode == NonReservedPeerMode::Deny, reserved_nodes, - }); + }; // Private and public keys configuration. if let NodeKeyConfig::Secp256k1(_) = params.network_config.node_key { @@ -171,7 +170,7 @@ impl, H: ExHashT> NetworkWorker let is_offline = Arc::new(AtomicBool::new(true)); let is_major_syncing = Arc::new(AtomicBool::new(false)); let peers: Arc>>> = Arc::new(Default::default()); - let protocol = ProtocolBehaviour::new( + let (protocol, peerset_handle) = Protocol::new( protocol::ProtocolConfig { roles: params.roles }, params.chain, params.on_demand.as_ref().map(|od| od.checker().clone()) @@ -180,9 +179,7 @@ impl, H: ExHashT> NetworkWorker params.transaction_pool, params.finality_proof_provider, params.protocol_id, - &((protocol::MIN_VERSION as u8)..=(protocol::CURRENT_VERSION as u8)).collect::>(), - peerset, - peerset_handle.clone(), + peerset_config, )?; // Build the swarm. @@ -727,13 +724,13 @@ impl, H: ExHashT> Future for Ne ProtocolMsg::BlockFinalized(hash, header) => network_service.user_protocol_mut().on_block_finalized(hash, &header), ProtocolMsg::ExecuteWithSpec(task) => { - let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, spec) = protocol.specialization_lock(&mut net_out); + let protocol = network_service.user_protocol_mut(); + let (mut context, spec) = protocol.specialization_lock(); task.call_box(spec, &mut context); }, ProtocolMsg::ExecuteWithGossip(task) => { - let (protocol, mut net_out) = network_service.user_protocol_mut().protocol_context_lock(); - let (mut context, gossip) = protocol.consensus_gossip_lock(&mut net_out); + let protocol = network_service.user_protocol_mut(); + let (mut context, gossip) = protocol.consensus_gossip_lock(); task.call_box(gossip, &mut context); } ProtocolMsg::GossipConsensusMessage(topic, engine_id, message, recipient) => @@ -774,7 +771,7 @@ impl, H: ExHashT> Future for Ne let outcome = match poll_value { Ok(Async::NotReady) => break, - Ok(Async::Ready(Some(BehaviourOut::Behaviour(outcome)))) => outcome, + Ok(Async::Ready(Some(BehaviourOut::SubstrateAction(outcome)))) => outcome, Ok(Async::Ready(Some(BehaviourOut::Dht(ev)))) => { network_service.user_protocol_mut() .on_event(Event::Dht(ev)); @@ -812,5 +809,5 @@ impl, H: ExHashT> Future for Ne /// The libp2p swarm, customized for our needs. type Swarm = libp2p::core::Swarm< Boxed<(PeerId, StreamMuxerBox), io::Error>, - Behaviour, CustomMessageOutcome, Substream> + Behaviour >;