Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions protocols/identify/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

- Update to `libp2p-swarm` `v0.37.0`.

- Extend log message on second identify push stream with peer ID.

# 0.36.1

- Allow at most one inbound identify push stream.
Expand Down
40 changes: 35 additions & 5 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,48 @@ use futures::prelude::*;
use futures_timer::Delay;
use libp2p_core::either::{EitherError, EitherOutput};
use libp2p_core::upgrade::{EitherUpgrade, InboundUpgrade, OutboundUpgrade, SelectUpgrade};
use libp2p_core::{ConnectedPoint, PeerId};
use libp2p_swarm::{
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive,
NegotiatedSubstream, SubstreamProtocol,
ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, IntoConnectionHandler,
KeepAlive, NegotiatedSubstream, SubstreamProtocol,
};
use log::warn;
use smallvec::SmallVec;
use std::{io, pin::Pin, task::Context, task::Poll, time::Duration};

pub struct IdentifyHandlerProto {
initial_delay: Duration,
interval: Duration,
}

impl IdentifyHandlerProto {
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
IdentifyHandlerProto {
initial_delay,
interval,
}
}
}

impl IntoConnectionHandler for IdentifyHandlerProto {
type Handler = IdentifyHandler;

fn into_handler(self, remote_peer_id: &PeerId, _endpoint: &ConnectedPoint) -> Self::Handler {
IdentifyHandler::new(self.initial_delay, self.interval, *remote_peer_id)
}

fn inbound_protocol(&self) -> <Self::Handler as ConnectionHandler>::InboundProtocol {
SelectUpgrade::new(IdentifyProtocol, IdentifyPushProtocol::inbound())
}
}

/// Protocol handler for sending and receiving identification requests.
///
/// Outbound requests are sent periodically. The handler performs expects
/// at least one identification request to be answered by the remote before
/// permitting the underlying connection to be closed.
pub struct IdentifyHandler {
remote_peer_id: PeerId,
inbound_identify_push: Option<BoxFuture<'static, Result<IdentifyInfo, UpgradeError>>>,
/// Pending events to yield.
events: SmallVec<
Expand Down Expand Up @@ -81,8 +109,9 @@ pub struct IdentifyPush(pub IdentifyInfo);

impl IdentifyHandler {
/// Creates a new `IdentifyHandler`.
pub fn new(initial_delay: Duration, interval: Duration) -> Self {
pub fn new(initial_delay: Duration, interval: Duration, remote_peer_id: PeerId) -> Self {
IdentifyHandler {
remote_peer_id,
inbound_identify_push: Default::default(),
events: SmallVec::new(),
trigger_next_identify: Delay::new(initial_delay),
Expand Down Expand Up @@ -120,8 +149,9 @@ impl ConnectionHandler for IdentifyHandler {
EitherOutput::Second(fut) => {
if self.inbound_identify_push.replace(fut).is_some() {
warn!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated: We might want to upgrade to tracing at some point so we can emit these values in a structured way.

Or we wait until log has a structured key-value support.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 for structured logging, though not something I will focus on any time soon. Don't have an opinion on crates providing structured logging.

"New inbound identify push stream while still upgrading previous one. \
Replacing previous with new.",
"New inbound identify push stream from {} while still \
upgrading previous one. Replacing previous with new.",
self.remote_peer_id,
);
}
}
Expand Down
10 changes: 5 additions & 5 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::handler::{IdentifyHandler, IdentifyHandlerEvent, IdentifyPush};
use crate::handler::{IdentifyHandlerEvent, IdentifyHandlerProto, IdentifyPush};
use crate::protocol::{IdentifyInfo, ReplySubstream, UpgradeError};
use futures::prelude::*;
use libp2p_core::{
Expand Down Expand Up @@ -54,7 +54,7 @@ pub struct Identify {
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<IdentifyEvent, IdentifyHandler>>,
events: VecDeque<NetworkBehaviourAction<IdentifyEvent, IdentifyHandlerProto>>,
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
Expand Down Expand Up @@ -208,11 +208,11 @@ impl Identify {
}

impl NetworkBehaviour for Identify {
type ConnectionHandler = IdentifyHandler;
type ConnectionHandler = IdentifyHandlerProto;
type OutEvent = IdentifyEvent;

fn new_handler(&mut self) -> Self::ConnectionHandler {
IdentifyHandler::new(self.config.initial_delay, self.config.interval)
IdentifyHandlerProto::new(self.config.initial_delay, self.config.interval)
}

fn inject_connection_established(
Expand Down Expand Up @@ -296,7 +296,7 @@ impl NetworkBehaviour for Identify {
&mut self,
peer_id: PeerId,
connection: ConnectionId,
event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
event: <<Self::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent,
) {
match event {
IdentifyHandlerEvent::Identified(mut info) => {
Expand Down