diff --git a/protocols/autonat/src/behaviour.rs b/protocols/autonat/src/behaviour.rs index 5e6193b8700..06ce8e86e83 100644 --- a/protocols/autonat/src/behaviour.rs +++ b/protocols/autonat/src/behaviour.rs @@ -35,12 +35,13 @@ use libp2p_request_response::{ self as request_response, ProtocolSupport, RequestId, ResponseChannel, }; use libp2p_swarm::{ + behaviour::ToSwarm, behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, FromSwarm, }, ConnectionHandler, ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, PollParameters, + PollParameters, }; use std::{ collections::{HashMap, VecDeque}, @@ -209,12 +210,7 @@ pub struct Behaviour { last_probe: Option, - pending_actions: VecDeque< - NetworkBehaviourAction< - ::OutEvent, - ::ConnectionHandler, - >, - >, + pending_to_swarm: VecDeque>, probe_id: ProbeId, @@ -242,7 +238,7 @@ impl Behaviour { throttled_servers: Vec::new(), throttled_clients: Vec::new(), last_probe: None, - pending_actions: VecDeque::new(), + pending_to_swarm: VecDeque::new(), probe_id: ProbeId(0), listen_addresses: Default::default(), external_addresses: Default::default(), @@ -339,10 +335,8 @@ impl Behaviour { role_override: Endpoint::Dialer, } => { if let Some(event) = self.as_server().on_outbound_connection(&peer, address) { - self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( - event, - ))); + self.pending_to_swarm + .push_back(ToSwarm::::GenerateEvent(Event::InboundProbe(event))); } } ConnectedPoint::Dialer { @@ -402,10 +396,8 @@ impl Behaviour { error, })); if let Some(event) = self.as_server().on_outbound_dial_error(peer_id, error) { - self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::InboundProbe( - event, - ))); + self.pending_to_swarm + .push_back(ToSwarm::::GenerateEvent(Event::InboundProbe(event))); } } @@ -438,14 +430,20 @@ impl NetworkBehaviour for Behaviour { as NetworkBehaviour>::ConnectionHandler; type OutEvent = Event; - fn poll(&mut self, cx: &mut Context<'_>, params: &mut impl PollParameters) -> Poll { + fn poll( + &mut self, + cx: &mut Context<'_>, + params: &mut impl PollParameters, + ) -> Poll> { loop { - if let Some(event) = self.pending_actions.pop_front() { + if let Some(event) = self.pending_to_swarm.pop_front() { return Poll::Ready(event); } match self.inner.poll(cx, params) { - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { + Poll::Ready( + ToSwarm::>::GenerateEvent(event), + ) => { let (events, action) = match event { request_response::Event::Message { message: request_response::Message::Response { .. }, @@ -464,16 +462,16 @@ impl NetworkBehaviour for Behaviour { request_response::Event::ResponseSent { .. } => (VecDeque::new(), None), }; - self.pending_actions.extend( + self.pending_to_swarm.extend( events .into_iter() - .map(NetworkBehaviourAction::GenerateEvent) + .map(ToSwarm::::GenerateEvent) .chain(action), ); continue; } Poll::Ready(action) => { - self.pending_actions + self.pending_to_swarm .push_back(action.map_out(|_| unreachable!())); continue; } @@ -482,10 +480,8 @@ impl NetworkBehaviour for Behaviour { match self.as_client().poll_auto_probe(cx) { Poll::Ready(event) => { - self.pending_actions - .push_back(NetworkBehaviourAction::GenerateEvent(Event::OutboundProbe( - event, - ))); + self.pending_to_swarm + .push_back(ToSwarm::::GenerateEvent(Event::OutboundProbe(event))); continue; } Poll::Pending => {} @@ -568,18 +564,13 @@ impl NetworkBehaviour for Behaviour { } } -type Action = NetworkBehaviourAction< - ::OutEvent, - ::ConnectionHandler, ->; - // Trait implemented for `AsClient` and `AsServer` to handle events from the inner [`request_response::Behaviour`] Protocol. trait HandleInnerEvent { fn handle_event( &mut self, params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option); + ) -> (VecDeque, Option>); } trait GlobalIp { diff --git a/protocols/autonat/src/behaviour/as_client.rs b/protocols/autonat/src/behaviour/as_client.rs index 76ddebce2dc..f804961833e 100644 --- a/protocols/autonat/src/behaviour/as_client.rs +++ b/protocols/autonat/src/behaviour/as_client.rs @@ -18,20 +18,18 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::ResponseError; +use crate::{Behaviour, ResponseError}; use super::{ - Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus, - ProbeId, + AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, NatStatus, ProbeId, }; use futures::FutureExt; use futures_timer::Delay; use instant::Instant; use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId}; use libp2p_request_response::{self as request_response, OutboundFailure, RequestId}; -use libp2p_swarm::{ - AddressScore, ExternalAddresses, ListenAddresses, NetworkBehaviourAction, PollParameters, -}; +use libp2p_swarm::behaviour::ToSwarm; +use libp2p_swarm::{AddressScore, ExternalAddresses, ListenAddresses, PollParameters}; use rand::{seq::SliceRandom, thread_rng}; use std::{ collections::{HashMap, VecDeque}, @@ -109,7 +107,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { &mut self, params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option) { + ) -> (VecDeque, Option>) { let mut events = VecDeque::new(); let mut action = None; match event { @@ -158,7 +156,7 @@ impl<'a> HandleInnerEvent for AsClient<'a> { .find_map(|r| (r.addr == address).then_some(r.score)) .unwrap_or(AddressScore::Finite(0)); if let AddressScore::Finite(finite_score) = score { - action = Some(NetworkBehaviourAction::ReportObservedAddr { + action = Some(ToSwarm::::ReportObservedAddr { address, score: AddressScore::Finite(finite_score + 1), }); diff --git a/protocols/autonat/src/behaviour/as_server.rs b/protocols/autonat/src/behaviour/as_server.rs index 455ac3d16b3..1ea2a5cef08 100644 --- a/protocols/autonat/src/behaviour/as_server.rs +++ b/protocols/autonat/src/behaviour/as_server.rs @@ -19,17 +19,19 @@ // DEALINGS IN THE SOFTWARE. use super::{ - Action, AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, ProbeId, + AutoNatCodec, Config, DialRequest, DialResponse, Event, HandleInnerEvent, ProbeId, ResponseError, }; +use crate::Behaviour; use instant::Instant; use libp2p_core::{connection::ConnectionId, multiaddr::Protocol, Multiaddr, PeerId}; use libp2p_request_response::{ self as request_response, InboundFailure, RequestId, ResponseChannel, }; +use libp2p_swarm::behaviour::ToSwarm; use libp2p_swarm::{ dial_opts::{DialOpts, PeerCondition}, - DialError, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + DialError, NetworkBehaviour, PollParameters, }; use std::{ collections::{HashMap, HashSet, VecDeque}, @@ -98,7 +100,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { &mut self, _params: &mut impl PollParameters, event: request_response::Event, - ) -> (VecDeque, Option) { + ) -> (VecDeque, Option>) { let mut events = VecDeque::new(); let mut action = None; match event { @@ -130,7 +132,7 @@ impl<'a> HandleInnerEvent for AsServer<'a> { addresses: addrs.clone(), })); - action = Some(NetworkBehaviourAction::Dial { + action = Some(ToSwarm::::Dial { opts: DialOpts::peer_id(peer) .condition(PeerCondition::Always) .override_dial_concurrency_factor(NonZeroU8::new(1).expect("1 > 0")) diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour.rs index 15dfe078bfe..c0a7f5d5225 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour.rs @@ -26,11 +26,13 @@ use either::Either; use libp2p_core::connection::{ConnectedPoint, ConnectionId}; use libp2p_core::multiaddr::Protocol; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; +use libp2p_swarm::behaviour::{ + ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, ToSwarm, +}; use libp2p_swarm::dial_opts::{self, DialOpts}; use libp2p_swarm::{ ConnectionHandler, ConnectionHandlerUpgrErr, ExternalAddresses, IntoConnectionHandler, - NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + NetworkBehaviour, NotifyHandler, PollParameters, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::task::{Context, Poll}; @@ -68,7 +70,7 @@ pub enum UpgradeError { pub struct Behaviour { /// Queue of actions to return when polled. - queued_actions: VecDeque, + queued_events: VecDeque, /// All direct (non-relayed) connections. direct_connections: HashMap>, @@ -81,7 +83,7 @@ pub struct Behaviour { impl Behaviour { pub fn new(local_peer_id: PeerId) -> Self { Behaviour { - queued_actions: Default::default(), + queued_events: Default::default(), direct_connections: Default::default(), external_addresses: Default::default(), local_peer_id, @@ -108,22 +110,19 @@ impl Behaviour { // connection upgrade by initiating a direct connection to A. // // https://github.com/libp2p/specs/blob/master/relay/DCUtR.md#the-protocol - self.queued_actions.extend([ - ActionBuilder::Connect { + self.queued_events.extend([ + ToSwarmBuilder::Connect { peer_id, attempt: 1, handler: NotifyHandler::One(connection_id), }, - NetworkBehaviourAction::GenerateEvent( - Event::InitiatedDirectConnectionUpgrade { - remote_peer_id: peer_id, - local_relayed_addr: match connected_point { - ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), - ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), - }, + ToSwarmBuilder::from(Event::InitiatedDirectConnectionUpgrade { + remote_peer_id: peer_id, + local_relayed_addr: match connected_point { + ConnectedPoint::Listener { local_addr, .. } => local_addr.clone(), + ConnectedPoint::Dialer { .. } => unreachable!("Due to outer if."), }, - ) - .into(), + }), ]); } } else { @@ -147,14 +146,14 @@ impl Behaviour { { let peer_id = peer_id.expect("Peer of `Prototype::DirectConnection` is always known."); if attempt < MAX_NUMBER_OF_UPGRADE_ATTEMPTS { - self.queued_actions.push_back(ActionBuilder::Connect { + self.queued_events.push_back(ToSwarmBuilder::Connect { peer_id, handler: NotifyHandler::One(relayed_connection_id), attempt: attempt + 1, }); } else { - self.queued_actions.extend([ - NetworkBehaviourAction::NotifyHandler { + self.queued_events.extend([ + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(relayed_connection_id), event: Either::Left( @@ -162,7 +161,7 @@ impl Behaviour { ), } .into(), - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + ToSwarm::::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, error: UpgradeError::Dial, }) @@ -217,24 +216,22 @@ impl NetworkBehaviour for Behaviour { inbound_connect, remote_addr, }) => { - self.queued_actions.extend([ - ActionBuilder::AcceptInboundConnect { + self.queued_events.extend([ + ToSwarmBuilder::AcceptInboundConnect { peer_id: event_source, handler: NotifyHandler::One(connection), inbound_connect, }, - NetworkBehaviourAction::GenerateEvent( - Event::RemoteInitiatedDirectConnectionUpgrade { - remote_peer_id: event_source, - remote_relayed_addr: remote_addr, - }, - ) + ToSwarm::::GenerateEvent(Event::RemoteInitiatedDirectConnectionUpgrade { + remote_peer_id: event_source, + remote_relayed_addr: remote_addr, + }) .into(), ]); } Either::Left(handler::relayed::Event::InboundNegotiationFailed { error }) => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source, error: UpgradeError::Handler(error), }) @@ -242,8 +239,8 @@ impl NetworkBehaviour for Behaviour { ); } Either::Left(handler::relayed::Event::InboundConnectNegotiated(remote_addrs)) => { - self.queued_actions.push_back( - NetworkBehaviourAction::Dial { + self.queued_events.push_back( + ToSwarm::::Dial { opts: DialOpts::peer_id(event_source) .addresses(remote_addrs) .condition(dial_opts::PeerCondition::Always) @@ -257,8 +254,8 @@ impl NetworkBehaviour for Behaviour { ); } Either::Left(handler::relayed::Event::OutboundNegotiationFailed { error }) => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source, error: UpgradeError::Handler(error), }) @@ -269,8 +266,8 @@ impl NetworkBehaviour for Behaviour { remote_addrs, attempt, }) => { - self.queued_actions.push_back( - NetworkBehaviourAction::Dial { + self.queued_events.push_back( + ToSwarm::::Dial { opts: DialOpts::peer_id(event_source) .condition(dial_opts::PeerCondition::Always) .addresses(remote_addrs) @@ -289,8 +286,8 @@ impl NetworkBehaviour for Behaviour { relayed_connection_id, }, )) => { - self.queued_actions.extend([ - NetworkBehaviourAction::NotifyHandler { + self.queued_events.extend([ + ToSwarm::::NotifyHandler { peer_id: event_source, handler: NotifyHandler::One(relayed_connection_id), event: Either::Left( @@ -298,11 +295,9 @@ impl NetworkBehaviour for Behaviour { ), } .into(), - NetworkBehaviourAction::GenerateEvent( - Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: event_source, - }, - ) + ToSwarm::::GenerateEvent(Event::DirectConnectionUpgradeSucceeded { + remote_peer_id: event_source, + }) .into(), ]); } @@ -310,12 +305,8 @@ impl NetworkBehaviour for Behaviour { }; } - fn poll( - &mut self, - _cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(action) = self.queued_actions.pop_front() { + fn poll(&mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { + if let Some(action) = self.queued_events.pop_front() { return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } @@ -346,10 +337,10 @@ impl NetworkBehaviour for Behaviour { } } -/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// A [`ToSwarm`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Behaviour::poll`]. -enum ActionBuilder { - Done(NetworkBehaviourAction), +enum ToSwarmBuilder { + Done(ToSwarm), Connect { attempt: u8, handler: NotifyHandler, @@ -362,18 +353,24 @@ enum ActionBuilder { }, } -impl From> for ActionBuilder { - fn from(action: NetworkBehaviourAction) -> Self { +impl From for ToSwarmBuilder { + fn from(e: Event) -> Self { + ToSwarmBuilder::Done(ToSwarm::::GenerateEvent(e)) + } +} + +impl From> for ToSwarmBuilder { + fn from(action: ToSwarm) -> Self { Self::Done(action) } } -impl ActionBuilder { +impl ToSwarmBuilder { fn build( self, local_peer_id: PeerId, external_addresses: &ExternalAddresses, - ) -> NetworkBehaviourAction { + ) -> ToSwarm { let obs_addrs = || { external_addresses .iter() @@ -384,12 +381,12 @@ impl ActionBuilder { }; match self { - ActionBuilder::Done(action) => action, - ActionBuilder::AcceptInboundConnect { + ToSwarmBuilder::Done(action) => action, + ToSwarmBuilder::AcceptInboundConnect { inbound_connect, handler, peer_id, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::::NotifyHandler { handler, peer_id, event: Either::Left(handler::relayed::Command::AcceptInboundConnect { @@ -397,11 +394,11 @@ impl ActionBuilder { obs_addrs: obs_addrs(), }), }, - ActionBuilder::Connect { + ToSwarmBuilder::Connect { attempt, handler, peer_id, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::::NotifyHandler { handler, peer_id, event: Either::Left(handler::relayed::Command::Connect { diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 776c0e8551b..de22a4d56d2 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -27,10 +27,9 @@ use crate::FloodsubConfig; use cuckoofilter::{CuckooError, CuckooFilter}; use fnv::FnvHashSet; use libp2p_core::{connection::ConnectionId, PeerId}; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; +use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm, ToSwarm}; use libp2p_swarm::{ - dial_opts::DialOpts, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, OneShotHandler, - PollParameters, + dial_opts::DialOpts, NetworkBehaviour, NotifyHandler, OneShotHandler, PollParameters, }; use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler}; use log::warn; @@ -42,12 +41,7 @@ use std::{collections::VecDeque, iter}; /// Network behaviour that handles the floodsub protocol. pub struct Floodsub { /// Events that need to be yielded to the outside when polling. - events: VecDeque< - NetworkBehaviourAction< - FloodsubEvent, - OneShotHandler, - >, - >, + events: VecDeque>, config: FloodsubConfig, @@ -92,24 +86,23 @@ impl Floodsub { // Send our topics to this node if we're already connected to it. if self.connected_peers.contains_key(&peer_id) { for topic in self.subscribed_topics.iter().cloned() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic, - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic, + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } } if self.target_peers.insert(peer_id) { let handler = self.new_handler(); - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::::Dial { opts: DialOpts::peer_id(peer_id).build(), handler, }); @@ -131,18 +124,17 @@ impl Floodsub { } for peer in self.connected_peers.keys() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic: topic.clone(), - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.clone(), + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } self.subscribed_topics.push(topic); @@ -163,18 +155,17 @@ impl Floodsub { self.subscribed_topics.remove(pos); for peer in self.connected_peers.keys() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic: topic.clone(), - action: FloodsubSubscriptionAction::Unsubscribe, - }], - }, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id: *peer, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic: topic.clone(), + action: FloodsubSubscriptionAction::Unsubscribe, + }], + }, + }); } true @@ -240,9 +231,10 @@ impl Floodsub { ); } if self.config.subscribe_local_messages { - self.events.push_back(NetworkBehaviourAction::GenerateEvent( - FloodsubEvent::Message(message.clone()), - )); + self.events + .push_back(ToSwarm::::GenerateEvent(FloodsubEvent::Message( + message.clone(), + ))); } } // Don't publish the message if we have to check subscriptions @@ -266,15 +258,14 @@ impl Floodsub { continue; } - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: *peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - }, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id: *peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + }, + }); } } @@ -294,18 +285,17 @@ impl Floodsub { // We need to send our subscriptions to the newly-connected node. if self.target_peers.contains(&peer_id) { for topic in self.subscribed_topics.iter().cloned() { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: FloodsubRpc { - messages: Vec::new(), - subscriptions: vec![FloodsubSubscription { - topic, - action: FloodsubSubscriptionAction::Subscribe, - }], - }, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: FloodsubRpc { + messages: Vec::new(), + subscriptions: vec![FloodsubSubscription { + topic, + action: FloodsubSubscriptionAction::Subscribe, + }], + }, + }); } } @@ -332,7 +322,7 @@ impl Floodsub { // try to reconnect. if self.target_peers.contains(&peer_id) { let handler = self.new_handler(); - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::::Dial { opts: DialOpts::peer_id(peer_id).build(), handler, }); @@ -371,7 +361,7 @@ impl NetworkBehaviour for Floodsub { if !remote_peer_topics.contains(&subscription.topic) { remote_peer_topics.push(subscription.topic.clone()); } - self.events.push_back(NetworkBehaviourAction::GenerateEvent( + self.events.push_back(ToSwarm::::GenerateEvent( FloodsubEvent::Subscribed { peer_id: propagation_source, topic: subscription.topic, @@ -385,7 +375,7 @@ impl NetworkBehaviour for Floodsub { { remote_peer_topics.remove(pos); } - self.events.push_back(NetworkBehaviourAction::GenerateEvent( + self.events.push_back(ToSwarm::::GenerateEvent( FloodsubEvent::Unsubscribed { peer_id: propagation_source, topic: subscription.topic, @@ -421,8 +411,7 @@ impl NetworkBehaviour for Floodsub { .any(|t| message.topics.iter().any(|u| t == u)) { let event = FloodsubEvent::Message(message.clone()); - self.events - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.events.push_back(ToSwarm::::GenerateEvent(event)); } // Propagate the message to everyone else who is subscribed to any of the topics. @@ -459,20 +448,15 @@ impl NetworkBehaviour for Floodsub { } for (peer_id, rpc) in rpcs_to_dispatch { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler: NotifyHandler::Any, - event: rpc, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event: rpc, + }); } } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c361fc4fdbc..42c741c5cf8 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -67,6 +67,7 @@ use crate::types::{ }; use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; use crate::{rpc_proto, TopicScoreParams}; +use libp2p_swarm::behaviour::ToSwarm; use std::{cmp::Ordering::Equal, fmt::Debug}; use wasm_timer::Interval; @@ -3443,11 +3444,7 @@ where } } - fn poll( - &mut self, - cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event.map_in(|e: Arc| { // clone send event reference if others references are present diff --git a/protocols/identify/src/behaviour.rs b/protocols/identify/src/behaviour.rs index beb264da789..33eeabf4e91 100644 --- a/protocols/identify/src/behaviour.rs +++ b/protocols/identify/src/behaviour.rs @@ -23,11 +23,13 @@ use crate::protocol::{Info, Protocol, UpgradeError}; use libp2p_core::{ connection::ConnectionId, multiaddr, ConnectedPoint, Multiaddr, PeerId, PublicKey, }; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}; +use libp2p_swarm::behaviour::{ + ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, ToSwarm, +}; use libp2p_swarm::{ dial_opts::DialOpts, AddressScore, ConnectionHandler, ConnectionHandlerUpgrErr, DialError, - ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, - NetworkBehaviourAction, NotifyHandler, PollParameters, + ExternalAddresses, IntoConnectionHandler, ListenAddresses, NetworkBehaviour, NotifyHandler, + PollParameters, }; use lru::LruCache; use std::num::NonZeroUsize; @@ -43,7 +45,7 @@ use std::{ /// about them, and answers identify queries from other nodes. /// /// All external addresses of the local node supposedly observed by remotes -/// are reported via [`NetworkBehaviourAction::ReportObservedAddr`] with a +/// are reported via [`ToSwarm::ReportObservedAddr`] with a /// [score](AddressScore) of `1`. pub struct Behaviour { config: Config, @@ -54,7 +56,7 @@ pub struct Behaviour { /// with current information about the local peer. requests: Vec, /// Pending events to be emitted when polled. - events: VecDeque>, + events: VecDeque>, /// The addresses of all peers that we have discovered. discovered_peers: PeerCache, @@ -200,7 +202,7 @@ impl Behaviour { self.requests.push(request); let handler = self.new_handler(); - self.events.push_back(NetworkBehaviourAction::Dial { + self.events.push_back(ToSwarm::::Dial { opts: DialOpts::peer_id(p).build(), handler, }); @@ -268,27 +270,24 @@ impl NetworkBehaviour for Behaviour { let observed = info.observed_addr.clone(); self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Received { + .push_back(ToSwarm::::GenerateEvent(Event::Received { peer_id, info, })); - self.events - .push_back(NetworkBehaviourAction::ReportObservedAddr { - address: observed, - score: AddressScore::Finite(1), - }); + self.events.push_back(ToSwarm::::ReportObservedAddr { + address: observed, + score: AddressScore::Finite(1), + }); } handler::Event::Identification(peer) => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Sent { + .push_back(ToSwarm::::GenerateEvent(Event::Sent { peer_id: peer, })); } handler::Event::IdentificationPushed => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Pushed { - peer_id, - })); + .push_back(ToSwarm::::GenerateEvent(Event::Pushed { peer_id })); } handler::Event::Identify => { self.requests.push(Request { @@ -298,7 +297,7 @@ impl NetworkBehaviour for Behaviour { } handler::Event::IdentificationError(error) => { self.events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Error { + .push_back(ToSwarm::::GenerateEvent(Event::Error { peer_id, error, })); @@ -310,7 +309,7 @@ impl NetworkBehaviour for Behaviour { &mut self, _cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -320,7 +319,7 @@ impl NetworkBehaviour for Behaviour { Some(Request { peer_id, protocol: Protocol::Push, - }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { + }) => Poll::Ready(ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::Any, event: InEvent { @@ -337,7 +336,7 @@ impl NetworkBehaviour for Behaviour { Some(Request { peer_id, protocol: Protocol::Identify(connection_id), - }) => Poll::Ready(NetworkBehaviourAction::NotifyHandler { + }) => Poll::Ready(ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection_id), event: InEvent { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 42081b22108..c725b8e5a13 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -41,12 +41,11 @@ use fnv::{FnvHashMap, FnvHashSet}; use instant::Instant; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; use libp2p_swarm::behaviour::{ - AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, + AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, ToSwarm, }; use libp2p_swarm::{ dial_opts::{self, DialOpts}, - DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + DialError, ExternalAddresses, ListenAddresses, NetworkBehaviour, NotifyHandler, PollParameters, }; use log::{debug, info, warn}; use smallvec::SmallVec; @@ -101,7 +100,7 @@ pub struct Kademlia { connection_idle_timeout: Duration, /// Queued events to return when the behaviour is being polled. - queued_events: VecDeque>>, + queued_events: VecDeque>, listen_addresses: ListenAddresses, @@ -521,20 +520,19 @@ where match self.kbuckets.entry(&key) { kbucket::Entry::Present(mut entry, _) => { if entry.value().insert(address) { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer: *peer, - is_new_peer: false, - addresses: entry.value().clone(), - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )) + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer: *peer, + is_new_peer: false, + addresses: entry.value().clone(), + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )) } RoutingUpdate::Success } @@ -551,20 +549,19 @@ where }; match entry.insert(addresses.clone(), status) { kbucket::InsertResult::Inserted => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer: *peer, - is_new_peer: true, - addresses, - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer: *peer, + is_new_peer: true, + addresses, + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )); RoutingUpdate::Success } kbucket::InsertResult::Full => { @@ -573,7 +570,7 @@ where } kbucket::InsertResult::Pending { disconnected } => { let handler = self.new_handler(); - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()).build(), handler, }); @@ -728,15 +725,14 @@ where let stats = QueryStats::empty(); if let Some(record) = record { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), - step, - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(record))), + step, + stats, + }, + )); } id @@ -976,18 +972,17 @@ where let stats = QueryStats::empty(); if !providers.is_empty() { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { - key, - providers, - })), - step, - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { + key, + providers, + })), + step, + stats, + }, + )); } id } @@ -1135,20 +1130,19 @@ where } if let Some(address) = address { if entry.value().insert(address) { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutingUpdated { - peer, - is_new_peer: false, - addresses: entry.value().clone(), - old_peer: None, - bucket_range: self - .kbuckets - .bucket(&key) - .map(|b| b.range()) - .expect("Not kbucket::Entry::SelfEntry."), - }, - )) + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutingUpdated { + peer, + is_new_peer: false, + addresses: entry.value().clone(), + old_peer: None, + bucket_range: self + .kbuckets + .bucket(&key) + .map(|b| b.range()) + .expect("Not kbucket::Entry::SelfEntry."), + }, + )) } } } @@ -1169,16 +1163,14 @@ where } match (address, self.kbucket_inserts) { (None, _) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::UnroutablePeer { peer }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::UnroutablePeer { peer }, + )); } (Some(a), KademliaBucketInserts::Manual) => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address: a }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutablePeer { peer, address: a }, + )); } (Some(a), KademliaBucketInserts::OnConnected) => { let addresses = Addresses::new(a); @@ -1195,25 +1187,20 @@ where .map(|b| b.range()) .expect("Not kbucket::Entry::SelfEntry."), }; - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent(event)); + self.queued_events.push_back(ToSwarm::GenerateEvent(event)); } kbucket::InsertResult::Full => { debug!("Bucket full. Peer not added to routing table: {}", peer); let address = addresses.first().clone(); - self.queued_events.push_back( - NetworkBehaviourAction::GenerateEvent( - KademliaEvent::RoutablePeer { peer, address }, - ), - ); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::RoutablePeer { peer, address }, + )); } kbucket::InsertResult::Pending { disconnected } => { let address = addresses.first().clone(); - self.queued_events.push_back( - NetworkBehaviourAction::GenerateEvent( - KademliaEvent::PendingRoutablePeer { peer, address }, - ), - ); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::PendingRoutablePeer { peer, address }, + )); // `disconnected` might already be in the process of re-connecting. // In other words `disconnected` might have already re-connected but @@ -1223,7 +1210,7 @@ where // Only try dialing peer if not currently connected. if !self.connected_peers.contains(disconnected.preimage()) { let handler = self.new_handler(); - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(disconnected.into_preimage()) .build(), handler, @@ -1627,16 +1614,15 @@ where // If the (alleged) publisher is the local node, do nothing. The record of // the original publisher should never change as a result of replication // and the publisher is always assumed to have the "right" value. - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::PutRecordRes { + key: record.key, + value: record.value, + request_id, + }, + }); return; } @@ -1687,40 +1673,37 @@ where record.key, record.value.len() ); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::PutRecord { - source, - connection, - record: None, - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::PutRecord { + source, + connection, + record: None, }, - )); + }, + )); } Err(e) => { info!("Record not stored: {:?}", e); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::Reset(request_id), - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::Reset(request_id), + }); return; } }, KademliaStoreInserts::FilterBoth => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::PutRecord { - source, - connection, - record: Some(record.clone()), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::PutRecord { + source, + connection, + record: Some(record.clone()), }, - )); + }, + )); } } } @@ -1732,16 +1715,15 @@ where // closest nodes to the target. In addition returning // [`KademliaHandlerIn::PutRecordRes`] does not reveal any internal // information to a possibly malicious remote node. - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::PutRecordRes { - key: record.key, - value: record.value, - request_id, - }, - }) + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::PutRecordRes { + key: record.key, + value: record.value, + request_id, + }, + }) } /// Processes a provider record received from a peer. @@ -1760,22 +1742,20 @@ where return; } - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::AddProvider { record: None }, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::AddProvider { record: None }, + }, + )); } KademliaStoreInserts::FilterBoth => { - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::AddProvider { - record: Some(record), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::AddProvider { + record: Some(record), }, - )); + }, + )); } } } @@ -1848,12 +1828,11 @@ where .position(|(p, _)| p == &peer_id) .map(|p| q.inner.pending_rpcs.remove(p)) }) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); } self.connected_peers.insert(peer_id); @@ -2047,24 +2026,22 @@ where KademliaHandlerEvent::FindNodeReq { key, request_id } => { let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::FindNode { - num_closer_peers: closer_peers.len(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::FindNode { + num_closer_peers: closer_peers.len(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::FindNodeRes { - closer_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::FindNodeRes { + closer_peers, + request_id, + }, + }); } KademliaHandlerEvent::FindNodeRes { @@ -2078,26 +2055,24 @@ where let provider_peers = self.provider_peers(&key, &source); let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::GetProvider { - num_closer_peers: closer_peers.len(), - num_provider_peers: provider_peers.len(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::GetProvider { + num_closer_peers: closer_peers.len(), + num_provider_peers: provider_peers.len(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetProvidersRes { - closer_peers, - provider_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::GetProvidersRes { + closer_peers, + provider_peers, + request_id, + }, + }); } KademliaHandlerEvent::GetProvidersRes { @@ -2119,20 +2094,19 @@ where *providers_found += provider_peers.len(); let providers = provider_peers.iter().map(|p| p.node_id).collect(); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id: user_data, - result: QueryResult::GetProviders(Ok( - GetProvidersOk::FoundProviders { - key: key.clone(), - providers, - }, - )), - step: step.clone(), - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetProviders(Ok( + GetProvidersOk::FoundProviders { + key: key.clone(), + providers, + }, + )), + step: step.clone(), + stats, + }, + )); *step = step.next(); } } @@ -2177,26 +2151,24 @@ where let closer_peers = self.find_closest(&kbucket::Key::new(key), &source); - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::InboundRequest { - request: InboundRequest::GetRecord { - num_closer_peers: closer_peers.len(), - present_locally: record.is_some(), - }, + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::InboundRequest { + request: InboundRequest::GetRecord { + num_closer_peers: closer_peers.len(), + present_locally: record.is_some(), }, - )); + }, + )); - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: source, - handler: NotifyHandler::One(connection), - event: KademliaHandlerIn::GetRecordRes { - record, - closer_peers, - request_id, - }, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id: source, + handler: NotifyHandler::One(connection), + event: KademliaHandlerIn::GetRecordRes { + record, + closer_peers, + request_id, + }, + }); } KademliaHandlerEvent::GetRecordRes { @@ -2220,17 +2192,16 @@ where record, }; - self.queued_events - .push_back(NetworkBehaviourAction::GenerateEvent( - KademliaEvent::OutboundQueryProgressed { - id: user_data, - result: QueryResult::GetRecord(Ok( - GetRecordOk::FoundRecord(record), - )), - step: step.clone(), - stats, - }, - )); + self.queued_events.push_back(ToSwarm::GenerateEvent( + KademliaEvent::OutboundQueryProgressed { + id: user_data, + result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord( + record, + ))), + step: step.clone(), + stats, + }, + )); *step = step.next(); } else { @@ -2291,11 +2262,7 @@ where }; } - fn poll( - &mut self, - cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { let now = Instant::now(); // Calculate the available capacity for queries triggered by background jobs. @@ -2354,7 +2321,7 @@ where addresses: value, old_peer: entry.evicted.map(|n| n.key.into_preimage()), }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } // Look for a finished query. @@ -2362,12 +2329,12 @@ where match self.queries.poll(now) { QueryPoolState::Finished(q) => { if let Some(event) = self.query_finished(q) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } } QueryPoolState::Timeout(q) => { if let Some(event) = self.query_timeout(q) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::GenerateEvent(event)); } } QueryPoolState::Waiting(Some((query, peer_id))) => { @@ -2386,16 +2353,15 @@ where } if self.connected_peers.contains(&peer_id) { - self.queued_events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id, - event, - handler: NotifyHandler::Any, - }); + self.queued_events.push_back(ToSwarm::NotifyHandler { + peer_id, + event, + handler: NotifyHandler::Any, + }); } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id, event)); let handler = self.new_handler(); - self.queued_events.push_back(NetworkBehaviourAction::Dial { + self.queued_events.push_back(ToSwarm::Dial { opts: DialOpts::peer_id(peer_id).build(), handler, }); diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 815a23c9bdc..b07086fc77a 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -28,11 +28,8 @@ use crate::Config; use futures::Stream; use if_watch::IfEvent; use libp2p_core::{Multiaddr, PeerId}; -use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; -use libp2p_swarm::{ - dummy, ConnectionHandler, ListenAddresses, NetworkBehaviour, NetworkBehaviourAction, - PollParameters, -}; +use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm, ToSwarm}; +use libp2p_swarm::{dummy, ConnectionHandler, ListenAddresses, NetworkBehaviour, PollParameters}; use smallvec::SmallVec; use std::collections::hash_map::{Entry, HashMap}; use std::{cmp, fmt, io, net::IpAddr, pin::Pin, task::Context, task::Poll, time::Instant}; @@ -227,11 +224,7 @@ where } } - fn poll( - &mut self, - cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { // Poll ifwatch. while let Poll::Ready(Some(event)) = Pin::new(&mut self.if_watch).poll_next(cx) { match event { @@ -286,7 +279,7 @@ where let event = Event::Discovered(DiscoveredAddrsIter { inner: discovered.into_iter(), }); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::::GenerateEvent(event)); } // Emit expired event. let now = Instant::now(); @@ -305,7 +298,7 @@ where let event = Event::Expired(ExpiredAddrsIter { inner: expired.into_iter(), }); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::::GenerateEvent(event)); } if let Some(closest_expiration) = closest_expiration { let mut timer = P::Timer::at(closest_expiration); diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 6e481500df9..0c8b0eabf9e 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -48,9 +48,8 @@ mod protocol; use handler::Handler; pub use handler::{Config, Failure, Success}; use libp2p_core::{connection::ConnectionId, PeerId}; -use libp2p_swarm::{ - behaviour::FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; +use libp2p_swarm::behaviour::ToSwarm; +use libp2p_swarm::{behaviour, behaviour::FromSwarm, NetworkBehaviour, PollParameters}; use std::{ collections::VecDeque, task::{Context, Poll}, @@ -127,11 +126,7 @@ impl NetworkBehaviour for Behaviour { self.events.push_front(Event { peer, result }) } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Some(e) = self.events.pop_back() { let Event { result, peer } = &e; @@ -141,16 +136,13 @@ impl NetworkBehaviour for Behaviour { _ => {} } - Poll::Ready(NetworkBehaviourAction::GenerateEvent(e)) + Poll::Ready(ToSwarm::::GenerateEvent(e)) } else { Poll::Pending } } - fn on_swarm_event( - &mut self, - event: libp2p_swarm::behaviour::FromSwarm, - ) { + fn on_swarm_event(&mut self, event: behaviour::FromSwarm) { match event { FromSwarm::ConnectionEstablished(_) | FromSwarm::ConnectionClosed(_) diff --git a/protocols/relay/src/v2/client.rs b/protocols/relay/src/v2/client.rs index d9a2d977588..cddf00c36d4 100644 --- a/protocols/relay/src/v2/client.rs +++ b/protocols/relay/src/v2/client.rs @@ -34,11 +34,10 @@ use futures::ready; use futures::stream::StreamExt; use libp2p_core::connection::ConnectionId; use libp2p_core::PeerId; -use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm}; +use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm, ToSwarm}; use libp2p_swarm::dial_opts::DialOpts; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + ConnectionHandlerUpgrErr, NegotiatedSubstream, NetworkBehaviour, NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, VecDeque}; use std::io::{Error, ErrorKind, IoSlice}; @@ -247,9 +246,9 @@ impl NetworkBehaviour for Client { &mut self, cx: &mut Context<'_>, _poll_parameters: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if let Some(event) = self.queued_actions.pop_front() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + return Poll::Ready(ToSwarm::::GenerateEvent(event)); } let action = match ready!(self.from_transport.poll_next_unpin(cx)) { @@ -263,7 +262,7 @@ impl NetworkBehaviour for Client { .get(&relay_peer_id) .and_then(|cs| cs.get(0)) { - Some(connection_id) => NetworkBehaviourAction::NotifyHandler { + Some(connection_id) => ToSwarm::::NotifyHandler { peer_id: relay_peer_id, handler: NotifyHandler::One(*connection_id), event: Either::Left(handler::In::Reserve { to_listener }), @@ -273,7 +272,7 @@ impl NetworkBehaviour for Client { self.local_peer_id, Some(handler::In::Reserve { to_listener }), ); - NetworkBehaviourAction::Dial { + ToSwarm::::Dial { opts: DialOpts::peer_id(relay_peer_id) .addresses(vec![relay_addr]) .extend_addresses_through_behaviour() @@ -295,7 +294,7 @@ impl NetworkBehaviour for Client { .get(&relay_peer_id) .and_then(|cs| cs.get(0)) { - Some(connection_id) => NetworkBehaviourAction::NotifyHandler { + Some(connection_id) => ToSwarm::::NotifyHandler { peer_id: relay_peer_id, handler: NotifyHandler::One(*connection_id), event: Either::Left(handler::In::EstablishCircuit { @@ -311,7 +310,7 @@ impl NetworkBehaviour for Client { dst_peer_id, }), ); - NetworkBehaviourAction::Dial { + ToSwarm::::Dial { opts: DialOpts::peer_id(relay_peer_id) .addresses(vec![relay_addr]) .extend_addresses_through_behaviour() diff --git a/protocols/relay/src/v2/relay.rs b/protocols/relay/src/v2/relay.rs index 97f7099c95d..6abd7664794 100644 --- a/protocols/relay/src/v2/relay.rs +++ b/protocols/relay/src/v2/relay.rs @@ -30,10 +30,9 @@ use instant::Instant; use libp2p_core::connection::ConnectionId; use libp2p_core::multiaddr::Protocol; use libp2p_core::PeerId; -use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm}; +use libp2p_swarm::behaviour::{ConnectionClosed, FromSwarm, ToSwarm}; use libp2p_swarm::{ - ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, - NotifyHandler, PollParameters, + ConnectionHandlerUpgrErr, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, }; use std::collections::{hash_map, HashMap, HashSet, VecDeque}; use std::num::NonZeroU32; @@ -199,8 +198,8 @@ pub struct Relay { reservations: HashMap>, circuits: CircuitsTracker, - /// Queue of actions to return when polled. - queued_actions: VecDeque, + /// Queue of events to return when polled. + queued_events: VecDeque, external_addresses: ExternalAddresses, } @@ -212,7 +211,7 @@ impl Relay { local_peer_id, reservations: Default::default(), circuits: Default::default(), - queued_actions: Default::default(), + queued_events: Default::default(), external_addresses: Default::default(), } } @@ -239,8 +238,8 @@ impl Relay { // Only emit [`CircuitClosed`] for accepted requests. .filter(|c| matches!(c.status, CircuitStatus::Accepted)) { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitClosed { src_peer_id: circuit.src_peer_id, dst_peer_id: circuit.dst_peer_id, error: Some(std::io::ErrorKind::ConnectionAborted.into()), @@ -335,7 +334,7 @@ impl NetworkBehaviour for Relay { .all(|limiter| { limiter.try_next(event_source, endpoint.get_remote_address(), now) }) { - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyReservationReq { @@ -351,14 +350,14 @@ impl NetworkBehaviour for Relay { .or_default() .insert(connection); - Action::AcceptReservationPrototype { + ToSwarmBuilder::AcceptReservationPrototype { handler: NotifyHandler::One(connection), peer_id: event_source, inbound_reservation_req, } }; - self.queued_actions.push_back(action); + self.queued_events.push_back(action); } handler::Event::ReservationReqAccepted { renewed } => { // Ensure local eventual consistent reservation state matches handler (source of @@ -368,8 +367,8 @@ impl NetworkBehaviour for Relay { .or_default() .insert(connection); - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAccepted { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::ReservationReqAccepted { src_peer_id: event_source, renewed, }) @@ -377,8 +376,8 @@ impl NetworkBehaviour for Relay { ); } handler::Event::ReservationReqAcceptFailed { error } => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqAcceptFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::ReservationReqAcceptFailed { src_peer_id: event_source, error, }) @@ -386,16 +385,16 @@ impl NetworkBehaviour for Relay { ); } handler::Event::ReservationReqDenied {} => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenied { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::ReservationReqDenied { src_peer_id: event_source, }) .into(), ); } handler::Event::ReservationReqDenyFailed { error } => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationReqDenyFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::ReservationReqDenyFailed { src_peer_id: event_source, error, }) @@ -419,8 +418,8 @@ impl NetworkBehaviour for Relay { } } - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::ReservationTimedOut { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::ReservationTimedOut { src_peer_id: event_source, }) .into(), @@ -449,7 +448,7 @@ impl NetworkBehaviour for Relay { limiter.try_next(event_source, endpoint.get_remote_address(), now) }) { // Deny circuit exceeding limits. - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyCircuitReq { @@ -472,7 +471,7 @@ impl NetworkBehaviour for Relay { dst_connection_id: *dst_conn, }); - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(*dst_conn), peer_id: event_source, event: Either::Left(handler::In::NegotiateOutboundConnect { @@ -485,7 +484,7 @@ impl NetworkBehaviour for Relay { } } else { // Deny circuit request if no reservation present. - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(connection), peer_id: event_source, event: Either::Left(handler::In::DenyCircuitReq { @@ -495,11 +494,11 @@ impl NetworkBehaviour for Relay { }), } }; - self.queued_actions.push_back(action.into()); + self.queued_events.push_back(action.into()); } handler::Event::CircuitReqReceiveFailed { error } => { - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqReceiveFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqReceiveFailed { src_peer_id: event_source, error, }) @@ -514,8 +513,8 @@ impl NetworkBehaviour for Relay { self.circuits.remove(circuit_id); } - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenied { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqDenied { src_peer_id: event_source, dst_peer_id, }) @@ -531,8 +530,8 @@ impl NetworkBehaviour for Relay { self.circuits.remove(circuit_id); } - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqDenyFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqDenyFailed { src_peer_id: event_source, dst_peer_id, error, @@ -549,8 +548,8 @@ impl NetworkBehaviour for Relay { dst_stream, dst_pending_data, } => { - self.queued_actions.push_back( - NetworkBehaviourAction::NotifyHandler { + self.queued_events.push_back( + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: Either::Left(handler::In::AcceptAndDriveCircuit { @@ -573,8 +572,8 @@ impl NetworkBehaviour for Relay { status, error, } => { - self.queued_actions.push_back( - NetworkBehaviourAction::NotifyHandler { + self.queued_events.push_back( + ToSwarm::::NotifyHandler { handler: NotifyHandler::One(src_connection_id), peer_id: src_peer_id, event: Either::Left(handler::In::DenyCircuitReq { @@ -585,8 +584,8 @@ impl NetworkBehaviour for Relay { } .into(), ); - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqOutboundConnectFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqOutboundConnectFailed { src_peer_id, dst_peer_id: event_source, error, @@ -599,8 +598,8 @@ impl NetworkBehaviour for Relay { circuit_id, } => { self.circuits.accepted(circuit_id); - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAccepted { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqAccepted { src_peer_id: event_source, dst_peer_id, }) @@ -613,8 +612,8 @@ impl NetworkBehaviour for Relay { error, } => { self.circuits.remove(circuit_id); - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitReqAcceptFailed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitReqAcceptFailed { src_peer_id: event_source, dst_peer_id, error, @@ -629,8 +628,8 @@ impl NetworkBehaviour for Relay { } => { self.circuits.remove(circuit_id); - self.queued_actions.push_back( - NetworkBehaviourAction::GenerateEvent(Event::CircuitClosed { + self.queued_events.push_back( + ToSwarm::::GenerateEvent(Event::CircuitClosed { src_peer_id: event_source, dst_peer_id, error, @@ -641,12 +640,8 @@ impl NetworkBehaviour for Relay { } } - fn poll( - &mut self, - _cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { - if let Some(action) = self.queued_actions.pop_front() { + fn poll(&mut self, _cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { + if let Some(action) = self.queued_events.pop_front() { return Poll::Ready(action.build(self.local_peer_id, &self.external_addresses)); } @@ -744,11 +739,11 @@ impl Add for CircuitId { } } -/// A [`NetworkBehaviourAction`], either complete, or still requiring data from [`PollParameters`] +/// A [`ToSwarm`], either complete, or still requiring data from [`PollParameters`] /// before being returned in [`Relay::poll`]. #[allow(clippy::large_enum_variant)] -enum Action { - Done(NetworkBehaviourAction), +enum ToSwarmBuilder { + Done(ToSwarm), AcceptReservationPrototype { inbound_reservation_req: inbound_hop::ReservationReq, handler: NotifyHandler, @@ -756,25 +751,25 @@ enum Action { }, } -impl From> for Action { - fn from(action: NetworkBehaviourAction) -> Self { +impl From> for ToSwarmBuilder { + fn from(action: ToSwarm) -> Self { Self::Done(action) } } -impl Action { +impl ToSwarmBuilder { fn build( self, local_peer_id: PeerId, external_addresses: &ExternalAddresses, - ) -> NetworkBehaviourAction { + ) -> ToSwarm { match self { - Action::Done(action) => action, - Action::AcceptReservationPrototype { + ToSwarmBuilder::Done(action) => action, + ToSwarmBuilder::AcceptReservationPrototype { inbound_reservation_req, handler, peer_id, - } => NetworkBehaviourAction::NotifyHandler { + } => ToSwarm::::NotifyHandler { handler, peer_id, event: Either::Left(handler::In::AcceptReservationReq { diff --git a/protocols/rendezvous/src/client.rs b/protocols/rendezvous/src/client.rs index 599fc8f508f..60d06545066 100644 --- a/protocols/rendezvous/src/client.rs +++ b/protocols/rendezvous/src/client.rs @@ -32,22 +32,16 @@ use libp2p_core::connection::ConnectionId; use libp2p_core::identity::error::SigningError; use libp2p_core::identity::Keypair; use libp2p_core::{Multiaddr, PeerId, PeerRecord}; -use libp2p_swarm::behaviour::FromSwarm; +use libp2p_swarm::behaviour::{FromSwarm, ToSwarm}; use libp2p_swarm::{ - CloseConnection, ExternalAddresses, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, - PollParameters, + CloseConnection, ExternalAddresses, NetworkBehaviour, NotifyHandler, PollParameters, }; use std::collections::{HashMap, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, - >, + events: VecDeque>, keypair: Keypair, pending_register_requests: Vec<(Namespace, PeerId, Option)>, @@ -80,7 +74,7 @@ impl Behaviour { /// Register our external addresses in the given namespace with the given rendezvous peer. /// /// External addresses are either manually added via [`libp2p_swarm::Swarm::add_external_address`] or reported - /// by other [`NetworkBehaviour`]s via [`NetworkBehaviourAction::ReportObservedAddr`]. + /// by other [`NetworkBehaviour`]s via [`ToSwarm::ReportObservedAddr`]. pub fn register(&mut self, namespace: Namespace, rendezvous_node: PeerId, ttl: Option) { self.pending_register_requests .push((namespace, rendezvous_node, ttl)); @@ -88,14 +82,13 @@ impl Behaviour { /// Unregister ourselves from the given namespace with the given rendezvous peer. pub fn unregister(&mut self, namespace: Namespace, rendezvous_node: PeerId) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::UnregisterRequest(namespace), - }, - handler: NotifyHandler::Any, - }); + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::UnregisterRequest(namespace), + }, + handler: NotifyHandler::Any, + }); } /// Discover other peers at a given rendezvous peer. @@ -112,18 +105,17 @@ impl Behaviour { limit: Option, rendezvous_node: PeerId, ) { - self.events - .push_back(NetworkBehaviourAction::NotifyHandler { - peer_id: rendezvous_node, - event: handler::OutboundInEvent::NewSubstream { - open_info: OpenInfo::DiscoverRequest { - namespace: ns, - cookie, - limit, - }, + self.events.push_back(ToSwarm::::NotifyHandler { + peer_id: rendezvous_node, + event: handler::OutboundInEvent::NewSubstream { + open_info: OpenInfo::DiscoverRequest { + namespace: ns, + cookie, + limit, }, - handler: NotifyHandler::Any, - }); + }, + handler: NotifyHandler::Any, + }); } } @@ -206,7 +198,7 @@ impl NetworkBehaviour for Behaviour { handler::OutboundOutEvent::OutboundError { error, .. } => { log::warn!("Connection with peer {} failed: {}", peer_id, error); - vec![NetworkBehaviourAction::CloseConnection { + vec![ToSwarm::::CloseConnection { peer_id, connection: CloseConnection::One(connection_id), }] @@ -216,11 +208,7 @@ impl NetworkBehaviour for Behaviour { self.events.extend(new_events); } - fn poll( - &mut self, - cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event); } @@ -232,13 +220,13 @@ impl NetworkBehaviour for Behaviour { let external_addresses = self.external_addresses.iter().cloned().collect::>(); if external_addresses.is_empty() { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::NoExternalAddresses), - )); + return Poll::Ready(ToSwarm::::GenerateEvent(Event::RegisterFailed( + RegisterError::NoExternalAddresses, + ))); } let action = match PeerRecord::new(&self.keypair, external_addresses) { - Ok(peer_record) => NetworkBehaviourAction::NotifyHandler { + Ok(peer_record) => ToSwarm::::NotifyHandler { peer_id: rendezvous_node, event: handler::OutboundInEvent::NewSubstream { open_info: OpenInfo::RegisterRequest(NewRegistration { @@ -249,7 +237,7 @@ impl NetworkBehaviour for Behaviour { }, handler: NotifyHandler::Any, }, - Err(signing_error) => NetworkBehaviourAction::GenerateEvent(Event::RegisterFailed( + Err(signing_error) => ToSwarm::::GenerateEvent(Event::RegisterFailed( RegisterError::FailedToMakeRecord(signing_error), )), }; @@ -261,7 +249,7 @@ impl NetworkBehaviour for Behaviour { futures::ready!(self.expiring_registrations.poll_next_unpin(cx)) { self.discovered_peers.remove(&expired_registration); - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(Event::Expired { + return Poll::Ready(ToSwarm::::GenerateEvent(Event::Expired { peer: expired_registration.0, })); } @@ -294,28 +282,23 @@ fn handle_outbound_event( peer_id: PeerId, discovered_peers: &mut HashMap<(PeerId, Namespace), Vec>, expiring_registrations: &mut FuturesUnordered>, -) -> Vec< - NetworkBehaviourAction< - Event, - SubstreamConnectionHandler, - >, -> { +) -> Vec> { match event { outbound::OutEvent::Registered { namespace, ttl } => { - vec![NetworkBehaviourAction::GenerateEvent(Event::Registered { + vec![ToSwarm::::GenerateEvent(Event::Registered { rendezvous_node: peer_id, ttl, namespace, })] } outbound::OutEvent::RegisterFailed(namespace, error) => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::RegisterFailed(RegisterError::Remote { + vec![ToSwarm::::GenerateEvent(Event::RegisterFailed( + RegisterError::Remote { rendezvous_node: peer_id, namespace, error, - }), - )] + }, + ))] } outbound::OutEvent::Discovered { registrations, @@ -339,20 +322,18 @@ fn handle_outbound_event( .boxed() })); - vec![NetworkBehaviourAction::GenerateEvent(Event::Discovered { + vec![ToSwarm::::GenerateEvent(Event::Discovered { rendezvous_node: peer_id, registrations, cookie, })] } outbound::OutEvent::DiscoverFailed { namespace, error } => { - vec![NetworkBehaviourAction::GenerateEvent( - Event::DiscoverFailed { - rendezvous_node: peer_id, - namespace, - error, - }, - )] + vec![ToSwarm::::GenerateEvent(Event::DiscoverFailed { + rendezvous_node: peer_id, + namespace, + error, + })] } } } diff --git a/protocols/rendezvous/src/server.rs b/protocols/rendezvous/src/server.rs index 4126b6e3e28..fcb8d9c20ba 100644 --- a/protocols/rendezvous/src/server.rs +++ b/protocols/rendezvous/src/server.rs @@ -29,10 +29,8 @@ use futures::stream::FuturesUnordered; use futures::{FutureExt, StreamExt}; use libp2p_core::connection::ConnectionId; use libp2p_core::PeerId; -use libp2p_swarm::behaviour::FromSwarm; -use libp2p_swarm::{ - CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, -}; +use libp2p_swarm::behaviour::{FromSwarm, ToSwarm}; +use libp2p_swarm::{CloseConnection, NetworkBehaviour, NotifyHandler, PollParameters}; use std::collections::{HashMap, HashSet, VecDeque}; use std::iter::FromIterator; use std::task::{Context, Poll}; @@ -40,9 +38,7 @@ use std::time::Duration; use void::Void; pub struct Behaviour { - events: VecDeque< - NetworkBehaviourAction>, - >, + events: VecDeque>, registrations: Registrations, } @@ -133,7 +129,7 @@ impl NetworkBehaviour for Behaviour { handler::InboundOutEvent::InboundError { error, .. } => { log::warn!("Connection with peer {} failed: {}", peer_id, error); - vec![NetworkBehaviourAction::CloseConnection { + vec![ToSwarm::::CloseConnection { peer_id, connection: CloseConnection::One(connection), }] @@ -144,15 +140,11 @@ impl NetworkBehaviour for Behaviour { self.events.extend(new_events); } - fn poll( - &mut self, - cx: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Poll::Ready(ExpiredRegistration(registration)) = self.registrations.poll(cx) { - return Poll::Ready(NetworkBehaviourAction::GenerateEvent( - Event::RegistrationExpired(registration), - )); + return Poll::Ready(ToSwarm::::GenerateEvent(Event::RegistrationExpired( + registration, + ))); } if let Some(event) = self.events.pop_front() { @@ -186,7 +178,7 @@ fn handle_inbound_event( connection: ConnectionId, id: InboundSubstreamId, registrations: &mut Registrations, -) -> Vec>> { +) -> Vec> { match event { // bad registration inbound::OutEvent::RegistrationRequested(registration) @@ -195,7 +187,7 @@ fn handle_inbound_event( let error = ErrorCode::NotAuthorized; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -203,7 +195,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineRegisterRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + ToSwarm::::GenerateEvent(Event::PeerNotRegistered { peer: peer_id, namespace: registration.namespace, error, @@ -216,7 +208,7 @@ fn handle_inbound_event( match registrations.add(registration) { Ok(registration) => { vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -226,7 +218,7 @@ fn handle_inbound_event( }, }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerRegistered { + ToSwarm::::GenerateEvent(Event::PeerRegistered { peer: peer_id, registration, }), @@ -236,7 +228,7 @@ fn handle_inbound_event( let error = ErrorCode::InvalidTtl; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -244,7 +236,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineRegisterRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::PeerNotRegistered { + ToSwarm::::GenerateEvent(Event::PeerNotRegistered { peer: peer_id, namespace, error, @@ -262,7 +254,7 @@ fn handle_inbound_event( let discovered = registrations.cloned().collect::>(); vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -273,7 +265,7 @@ fn handle_inbound_event( }, }, }, - NetworkBehaviourAction::GenerateEvent(Event::DiscoverServed { + ToSwarm::::GenerateEvent(Event::DiscoverServed { enquirer: peer_id, registrations: discovered, }), @@ -283,7 +275,7 @@ fn handle_inbound_event( let error = ErrorCode::InvalidCookie; vec![ - NetworkBehaviourAction::NotifyHandler { + ToSwarm::::NotifyHandler { peer_id, handler: NotifyHandler::One(connection), event: handler::InboundInEvent::NotifyInboundSubstream { @@ -291,7 +283,7 @@ fn handle_inbound_event( message: inbound::InEvent::DeclineDiscoverRequest(error), }, }, - NetworkBehaviourAction::GenerateEvent(Event::DiscoverNotServed { + ToSwarm::::GenerateEvent(Event::DiscoverNotServed { enquirer: peer_id, error, }), @@ -301,7 +293,7 @@ fn handle_inbound_event( inbound::OutEvent::UnregisterRequested(namespace) => { registrations.remove(namespace.clone(), peer_id); - vec![NetworkBehaviourAction::GenerateEvent( + vec![ToSwarm::::GenerateEvent( Event::PeerUnregistered { peer: peer_id, namespace, diff --git a/protocols/request-response/src/lib.rs b/protocols/request-response/src/lib.rs index f5fa3067f80..2811f1c9c0a 100644 --- a/protocols/request-response/src/lib.rs +++ b/protocols/request-response/src/lib.rs @@ -71,10 +71,11 @@ pub use handler::ProtocolSupport; use futures::channel::oneshot; use handler::{Handler, RequestProtocol}; use libp2p_core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId}; +use libp2p_swarm::behaviour::ToSwarm; use libp2p_swarm::{ behaviour::{AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm}, dial_opts::DialOpts, - IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters, + IntoConnectionHandler, NetworkBehaviour, NotifyHandler, PollParameters, }; use smallvec::SmallVec; use std::{ @@ -348,8 +349,7 @@ where /// The protocol codec for reading and writing requests and responses. codec: TCodec, /// Pending events to return from `poll`. - pending_events: - VecDeque, Handler>>, + pending_events: VecDeque>, /// The currently connected peers, their pending outbound and inbound responses and their known, /// reachable addresses, if any. connected: HashMap>, @@ -417,7 +417,7 @@ where if let Some(request) = self.try_send_request(peer, request) { let handler = self.new_handler(); - self.pending_events.push_back(NetworkBehaviourAction::Dial { + self.pending_events.push_back(ToSwarm::::Dial { opts: DialOpts::peer_id(*peer).build(), handler, }); @@ -538,7 +538,7 @@ where let conn = &mut connections[ix]; conn.pending_inbound_responses.insert(request.request_id); self.pending_events - .push_back(NetworkBehaviourAction::NotifyHandler { + .push_back(ToSwarm::::NotifyHandler { peer_id: *peer, handler: NotifyHandler::One(conn.id), event: request, @@ -674,24 +674,20 @@ where for request_id in connection.pending_outbound_responses { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer: peer_id, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::InboundFailure { + peer: peer_id, + request_id, + error: InboundFailure::ConnectionClosed, + })); } for request_id in connection.pending_inbound_responses { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer: peer_id, - request_id, - error: OutboundFailure::ConnectionClosed, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::OutboundFailure { + peer: peer_id, + request_id, + error: OutboundFailure::ConnectionClosed, + })); } } @@ -709,13 +705,11 @@ where if let Some(pending) = self.pending_outbound_requests.remove(&peer) { for request in pending { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id: request.request_id, - error: OutboundFailure::DialFailure, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::OutboundFailure { + peer, + request_id: request.request_id, + error: OutboundFailure::DialFailure, + })); } } } @@ -794,7 +788,7 @@ where response, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { + .push_back(ToSwarm::::GenerateEvent(Event::Message { peer, message, })); @@ -811,7 +805,7 @@ where channel, }; self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::Message { + .push_back(ToSwarm::::GenerateEvent(Event::Message { peer, message, })); @@ -824,13 +818,11 @@ where // Connection closed after `Event::Request` has been emitted. None => { self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ConnectionClosed, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::ConnectionClosed, + })); } } } @@ -842,7 +834,7 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent(Event::ResponseSent { + .push_back(ToSwarm::::GenerateEvent(Event::ResponseSent { peer, request_id, })); @@ -855,13 +847,11 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::ResponseOmission, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::ResponseOmission, + })); } handler::Event::OutboundTimeout(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); @@ -871,13 +861,11 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id, - error: OutboundFailure::Timeout, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::Timeout, + })); } handler::Event::InboundTimeout(request_id) => { // Note: `Event::InboundTimeout` is emitted both for timing @@ -887,13 +875,11 @@ where self.remove_pending_outbound_response(&peer, connection, request_id); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::Timeout, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::Timeout, + })); } handler::Event::OutboundUnsupportedProtocols(request_id) => { let removed = self.remove_pending_inbound_response(&peer, connection, &request_id); @@ -903,35 +889,27 @@ where ); self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::OutboundFailure { - peer, - request_id, - error: OutboundFailure::UnsupportedProtocols, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::OutboundFailure { + peer, + request_id, + error: OutboundFailure::UnsupportedProtocols, + })); } handler::Event::InboundUnsupportedProtocols(request_id) => { // Note: No need to call `self.remove_pending_outbound_response`, // `Event::Request` was never emitted for this request and // thus request was never added to `pending_outbound_responses`. self.pending_events - .push_back(NetworkBehaviourAction::GenerateEvent( - Event::InboundFailure { - peer, - request_id, - error: InboundFailure::UnsupportedProtocols, - }, - )); + .push_back(ToSwarm::::GenerateEvent(Event::InboundFailure { + peer, + request_id, + error: InboundFailure::UnsupportedProtocols, + })); } } } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ev); } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD { diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index e6f0a92c8ec..ef48ead00f3 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -5,9 +5,14 @@ - Add `estblished_in` to `SwarmEvent::ConnectionEstablished`. See [PR 3134]. +- Introduce `ToSwarm` type alias as an alternative to `NetworkBehaviourAction`. + `ToSwarm` should generally be preferred because it only requires the corresponding `NetworkBehaviour` as a type parameter. + See [PR XXXX]. + [PR 3170]: https://github.com/libp2p/rust-libp2p/pull/3170 [PR 3134]: https://github.com/libp2p/rust-libp2p/pull/3134 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 +[PR XXXX]: https://github.com/libp2p/rust-libp2p/pull/XXXX # 0.41.1 diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 141d6c3efbd..79d21846249 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -28,7 +28,7 @@ pub use listen_addresses::ListenAddresses; use crate::dial_opts::DialOpts; use crate::handler::{ConnectionHandler, IntoConnectionHandler}; -use crate::{AddressRecord, AddressScore, DialError}; +use crate::{AddressRecord, AddressScore, DialError, TBehaviourOutEvent, THandler}; use libp2p_core::{ connection::ConnectionId, transport::ListenerId, ConnectedPoint, Multiaddr, PeerId, }; @@ -142,7 +142,7 @@ pub trait NetworkBehaviour: 'static { /// (ie. the objects returned by `new_handler`) can communicate by passing messages. Messages /// sent from the handler to the behaviour are injected with [`NetworkBehaviour::inject_event`], /// and the behaviour can send a message to the handler by making [`NetworkBehaviour::poll`] - /// return [`NetworkBehaviourAction::NotifyHandler`]. + /// return [`ToSwarm::NotifyHandler`]. /// /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and /// connection closing. @@ -386,7 +386,7 @@ pub trait NetworkBehaviour: 'static { &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll>; + ) -> Poll>; } /// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to. @@ -428,6 +428,12 @@ pub trait PollParameters { fn local_peer_id(&self) -> &PeerId; } +pub type ToSwarm = NetworkBehaviourAction< + TBehaviourOutEvent, + THandler, + THandlerInEvent>, +>; + /// An action that a [`NetworkBehaviour`] can trigger in the [`Swarm`] /// in whose context it is executing. /// @@ -470,7 +476,7 @@ pub enum NetworkBehaviourAction< /// # use libp2p_swarm::{ /// # DialError, IntoConnectionHandler, KeepAlive, NegotiatedSubstream, /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ConnectionHandler, - /// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, + /// # ConnectionHandlerEvent, ConnectionHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, behaviour::ToSwarm /// # }; /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition}; /// # use libp2p_yamux as yamux; @@ -552,14 +558,14 @@ pub enum NetworkBehaviourAction< /// // the precious message is not lost and we can return it back to the user. /// let msg = handler.message.unwrap(); /// self.outbox_to_swarm - /// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) + /// .push_back(ToSwarm::GenerateEvent(msg)) /// } /// # /// # fn poll( /// # &mut self, /// # _: &mut Context<'_>, /// # _: &mut impl PollParameters, - /// # ) -> Poll> { + /// # ) -> Poll> { /// # if let Some(action) = self.outbox_to_swarm.pop_front() { /// # return Poll::Ready(action); /// # } @@ -676,7 +682,7 @@ pub enum NetworkBehaviourAction< /// with the given peer. /// /// Note: Closing a connection via - /// [`NetworkBehaviourAction::CloseConnection`] does not inform the + /// [`ToSwarm::CloseConnection`] does not inform the /// corresponding [`ConnectionHandler`]. /// Closing a connection via a [`ConnectionHandler`] can be done /// either in a collaborative manner across [`ConnectionHandler`]s diff --git a/swarm/src/behaviour/either.rs b/swarm/src/behaviour/either.rs index 4154db1a0de..15c2d8a104c 100644 --- a/swarm/src/behaviour/either.rs +++ b/swarm/src/behaviour/either.rs @@ -18,9 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::behaviour::{ - self, inject_from_swarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, -}; +use crate::behaviour::{self, inject_from_swarm, NetworkBehaviour, PollParameters, ToSwarm}; use crate::handler::either::IntoEitherHandler; use either::Either; use libp2p_core::{Multiaddr, PeerId}; @@ -97,7 +95,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { let event = match self { Either::Left(behaviour) => futures::ready!(behaviour.poll(cx, params)) .map_out(Either::Left) diff --git a/swarm/src/behaviour/toggle.rs b/swarm/src/behaviour/toggle.rs index 81255a40274..d0fe6f38497 100644 --- a/swarm/src/behaviour/toggle.rs +++ b/swarm/src/behaviour/toggle.rs @@ -18,14 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::behaviour::{inject_from_swarm, FromSwarm}; +use crate::behaviour::{inject_from_swarm, FromSwarm, ToSwarm}; use crate::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, IntoConnectionHandler, KeepAlive, ListenUpgradeError, SubstreamProtocol, }; use crate::upgrade::SendWrapper; -use crate::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::{NetworkBehaviour, PollParameters}; use either::Either; use libp2p_core::{ either::{EitherError, EitherOutput}, @@ -108,7 +108,7 @@ where &mut self, cx: &mut Context<'_>, params: &mut impl PollParameters, - ) -> Poll> { + ) -> Poll> { if let Some(inner) = self.inner.as_mut() { inner.poll(cx, params).map(|action| { action.map_handler(|h| ToggleIntoConnectionHandler { inner: Some(h) }) diff --git a/swarm/src/dial_opts.rs b/swarm/src/dial_opts.rs index edc69484b68..5495eb187ae 100644 --- a/swarm/src/dial_opts.rs +++ b/swarm/src/dial_opts.rs @@ -26,7 +26,7 @@ use std::num::NonZeroU8; /// Options to configure a dial to a known or unknown peer. /// /// Used in [`Swarm::dial`](crate::Swarm::dial) and -/// [`NetworkBehaviourAction::Dial`](crate::behaviour::NetworkBehaviourAction::Dial). +/// [`NetworkBehaviourAction::Dial`](crate::behaviour::ToSwarm::Dial). /// /// To construct use either of: /// diff --git a/swarm/src/dummy.rs b/swarm/src/dummy.rs index 4ec58581c2e..e4c01681123 100644 --- a/swarm/src/dummy.rs +++ b/swarm/src/dummy.rs @@ -1,4 +1,4 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm}; use crate::handler::{ ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, }; @@ -25,11 +25,7 @@ impl NetworkBehaviour for Behaviour { void::unreachable(event) } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { Poll::Pending } diff --git a/swarm/src/keep_alive.rs b/swarm/src/keep_alive.rs index bd1ed812b8b..ee9618eb8c3 100644 --- a/swarm/src/keep_alive.rs +++ b/swarm/src/keep_alive.rs @@ -1,4 +1,4 @@ -use crate::behaviour::{FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters}; +use crate::behaviour::{FromSwarm, NetworkBehaviour, PollParameters, ToSwarm}; use crate::handler::{ ConnectionEvent, ConnectionHandlerEvent, FullyNegotiatedInbound, FullyNegotiatedOutbound, KeepAlive, SubstreamProtocol, @@ -30,11 +30,7 @@ impl NetworkBehaviour for Behaviour { void::unreachable(event) } - fn poll( - &mut self, - _: &mut Context<'_>, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters) -> Poll> { Poll::Pending } diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a2ef648d6e6..05cf5cfd881 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -704,7 +704,7 @@ where /// order in which addresses are used to connect to) as well as /// how long the address is retained in the list, depending on /// how frequently it is reported by the `NetworkBehaviour` via - /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly + /// [`ToSwarm::ReportObservedAddr`] or explicitly /// through this method. pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult { let result = self.external_addrs.add(a.clone(), s); @@ -2061,7 +2061,7 @@ mod tests { /// Establishes multiple connections between two peers, /// after which one peer disconnects the other - /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`]. /// /// The test expects both behaviours to be notified via pairs of /// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] calls. @@ -2129,7 +2129,7 @@ mod tests { /// Establishes multiple connections between two peers, /// after which one peer closes a single connection - /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`]. + /// using [`ToSwarm::CloseConnection`] returned by a [`NetworkBehaviour`]. /// /// The test expects both behaviours to be notified via pairs of /// [`NetworkBehaviour::inject_connection_established`] / [`NetworkBehaviour::inject_connection_closed`] calls. diff --git a/swarm/src/test.rs b/swarm/src/test.rs index 94a5fbfef54..c632dac864f 100644 --- a/swarm/src/test.rs +++ b/swarm/src/test.rs @@ -20,7 +20,7 @@ use crate::behaviour::{ ConnectionClosed, ConnectionEstablished, DialFailure, ExpiredExternalAddr, ExpiredListenAddr, - FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, + FromSwarm, ListenerClosed, ListenerError, NewExternalAddr, NewListenAddr, NewListener, ToSwarm, }; use crate::{ ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction, @@ -80,11 +80,7 @@ where self.addresses.get(p).map_or(Vec::new(), |v| v.clone()) } - fn poll( - &mut self, - _: &mut Context, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll> { self.next_action.take().map_or(Poll::Pending, Poll::Ready) } @@ -466,11 +462,7 @@ where self.inner.inject_event(p, c, e); } - fn poll( - &mut self, - cx: &mut Context, - args: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, cx: &mut Context, args: &mut impl PollParameters) -> Poll> { self.poll += 1; self.inner.poll(cx, args) } diff --git a/swarm/tests/swarm_derive.rs b/swarm/tests/swarm_derive.rs index 84fb3bf4683..115274ff794 100644 --- a/swarm/tests/swarm_derive.rs +++ b/swarm/tests/swarm_derive.rs @@ -21,6 +21,7 @@ use futures::StreamExt; use libp2p_identify as identify; use libp2p_ping as ping; +use libp2p_swarm::behaviour::ToSwarm; use libp2p_swarm::{behaviour::FromSwarm, dummy, NetworkBehaviour, SwarmEvent}; use std::fmt::Debug; @@ -372,9 +373,7 @@ fn generated_out_event_derive_debug() { fn custom_out_event_no_type_parameters() { use libp2p_core::connection::ConnectionId; use libp2p_core::PeerId; - use libp2p_swarm::{ - ConnectionHandler, IntoConnectionHandler, NetworkBehaviourAction, PollParameters, - }; + use libp2p_swarm::{ConnectionHandler, IntoConnectionHandler, PollParameters}; use std::task::Context; use std::task::Poll; @@ -399,11 +398,7 @@ fn custom_out_event_no_type_parameters() { void::unreachable(message); } - fn poll( - &mut self, - _ctx: &mut Context, - _: &mut impl PollParameters, - ) -> Poll> { + fn poll(&mut self, _ctx: &mut Context, _: &mut impl PollParameters) -> Poll> { Poll::Pending }