Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
cache-all-crates: true

- name: Run clippy
run: cargo clippy --all-features
run: cargo clippy --all-features -- -D warnings

test:
name: Test
Expand Down
10 changes: 4 additions & 6 deletions examples/custom_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,10 @@ async fn main() {
tokio::select! {
_ = executor2.next() => {}
_ = litep2p2.next_event() => {},
event = ping_event_stream2.next() => match event {
Some(PingEvent::Ping { peer, ping }) => tracing::info!(
"ping time with {peer:?}: {ping:?}"
),
_ => {}
}
event = ping_event_stream2.next() =>
if let Some(PingEvent::Ping { peer, ping }) = event {
tracing::info!("ping time with {peer:?}: {ping:?}")
}
}
}
}
16 changes: 7 additions & 9 deletions examples/echo_notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,23 +50,21 @@ async fn client_event_loop(mut litep2p: Litep2p, mut handle: NotificationHandle,
loop {
tokio::select! {
_ = litep2p.next_event() => {}
event = handle.next() => match event.unwrap() {
NotificationEvent::NotificationStreamOpened { .. } => break,
_ => {},
}
event = handle.next() =>
if let NotificationEvent::NotificationStreamOpened { .. } = event.unwrap() {
break
}
}
}

// after the substream is open, send notification to server and print the response to stdout
loop {
tokio::select! {
_ = litep2p.next_event() => {}
event = handle.next() => match event.unwrap() {
NotificationEvent::NotificationReceived { peer, notification } => {
event = handle.next() =>
if let NotificationEvent::NotificationReceived { peer, notification } = event.unwrap() {
println!("received response from server ({peer:?}): {notification:?}");
}
_ => {},
},
},
_ = tokio::time::sleep(Duration::from_secs(3)) => {
handle.send_sync_notification(peer, vec![1, 3, 3, 7]).unwrap();
}
Expand Down
4 changes: 2 additions & 2 deletions src/addresses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ use crate::PeerId;
///
/// # Note
///
/// - The addresses are reported to the identify protocol and are used by other nodes
/// to establish a connection with the local node.
/// - The addresses are reported to the identify protocol and are used by other nodes to establish a
/// connection with the local node.
///
/// - Users must ensure that the addresses are reachable from the network.
#[derive(Debug, Clone)]
Expand Down
2 changes: 1 addition & 1 deletion src/codec/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod tests {
#[test]
fn decoding_smaller_payloads() {
let mut codec = Identity::new(100);
let bytes = vec![3u8; 64];
let bytes = [3u8; 64];
let mut bytes = BytesMut::from(&bytes[..]);

let decoded = codec.decode(&mut bytes);
Expand Down
3 changes: 1 addition & 2 deletions src/crypto/noise/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,8 @@ fn parse_and_verify_peer_id(
let identity = payload.identity_key.ok_or(NegotiationError::PeerIdMissing)?;
let remote_public_key = PublicKey::from_protobuf_encoding(&identity)?;
let remote_key_signature =
payload.identity_sig.ok_or(NegotiationError::BadSignature).map_err(|err| {
payload.identity_sig.ok_or(NegotiationError::BadSignature).inspect_err(|_err| {
tracing::debug!(target: LOG_TARGET, "payload without signature");
err
})?;

let peer_id = PeerId::from_public_key(&remote_public_key);
Expand Down
19 changes: 5 additions & 14 deletions src/mock/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,38 +102,29 @@ impl DummySubstream {
impl Sink<bytes::Bytes> for DummySubstream {
type Error = SubstreamError;

fn poll_ready<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn start_send(self: Pin<&mut Self>, _item: bytes::Bytes) -> Result<(), SubstreamError> {
Ok(())
}

fn poll_flush<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SubstreamError>> {
Poll::Pending
}

fn poll_close<'a>(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
) -> Poll<Result<(), SubstreamError>> {
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), SubstreamError>> {
Poll::Ready(Ok(()))
}
}

impl Stream for DummySubstream {
type Item = Result<BytesMut, SubstreamError>;

fn poll_next<'a>(
fn poll_next(
self: Pin<&mut Self>,
_cx: &mut Context<'a>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<BytesMut, SubstreamError>>> {
Poll::Pending
}
Expand Down
38 changes: 17 additions & 21 deletions src/multistream_select/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ mod tests {
futures_ringbuf::Endpoint::pair(100, 100);

let server = tokio::spawn(async move {
let protos = vec!["/proto2"];
let protos = ["/proto2"];

let multistream = b"/multistream/1.0.0\n";
let len = multistream.len();
Expand Down Expand Up @@ -638,7 +638,7 @@ mod tests {
let (client_connection, mut server_connection) = futures_ringbuf::Endpoint::pair(100, 100);

let server = tokio::spawn(async move {
let protos = vec!["/proto2"];
let protos = ["/proto2"];

let multistream = b"/multistream/1.0.0\n";
let len = multistream.len();
Expand Down Expand Up @@ -695,7 +695,7 @@ mod tests {
let (client_connection, mut server_connection) = futures_ringbuf::Endpoint::pair(100, 100);

let server = tokio::spawn(async move {
let protos = vec!["/proto2"];
let protos = ["/proto2"];

let multistream = b"/multistream/1.0.0\n";
let len = multistream.len();
Expand Down Expand Up @@ -809,7 +809,7 @@ mod tests {
// send only header line
let mut bytes = BytesMut::with_capacity(32);
let message = Message::Header(HeaderLine::V1);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
Expand All @@ -828,7 +828,7 @@ mod tests {
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
]);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();
Expand All @@ -841,12 +841,9 @@ mod tests {

#[test]
fn negotiate_main_protocol() {
let message = webrtc_encode_multistream_message(
vec![Message::Protocol(
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
)]
.into_iter(),
)
let message = webrtc_encode_multistream_message(vec![Message::Protocol(
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
)])
.unwrap()
.freeze();

Expand All @@ -857,20 +854,18 @@ mod tests {
.unwrap();

match dialer_state.register_response(message.to_vec()) {
Ok(HandshakeResult::Succeeded(negotiated)) =>
assert_eq!(negotiated, ProtocolName::from("/13371338/proto/1")),
Ok(HandshakeResult::Succeeded(negotiated)) => {
assert_eq!(negotiated, ProtocolName::from("/13371338/proto/1"))
}
event => panic!("invalid event {event:?}"),
}
}

#[test]
fn negotiate_fallback_protocol() {
let message = webrtc_encode_multistream_message(
vec![Message::Protocol(
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
)]
.into_iter(),
)
let message = webrtc_encode_multistream_message(vec![Message::Protocol(
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
)])
.unwrap()
.freeze();

Expand All @@ -881,8 +876,9 @@ mod tests {
.unwrap();

match dialer_state.register_response(message.to_vec()) {
Ok(HandshakeResult::Succeeded(negotiated)) =>
assert_eq!(negotiated, ProtocolName::from("/sup/proto/1")),
Ok(HandshakeResult::Succeeded(negotiated)) => {
assert_eq!(negotiated, ProtocolName::from("/sup/proto/1"))
}
_ => panic!("invalid event"),
}
}
Expand Down
34 changes: 14 additions & 20 deletions src/multistream_select/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,20 +405,17 @@ mod tests {

#[test]
fn webrtc_listener_negotiate_works() {
let mut local_protocols = vec![
let mut local_protocols = [
ProtocolName::from("/13371338/proto/1"),
ProtocolName::from("/sup/proto/1"),
ProtocolName::from("/13371338/proto/2"),
ProtocolName::from("/13371338/proto/3"),
ProtocolName::from("/13371338/proto/4"),
];
let message = webrtc_encode_multistream_message(
vec![
Message::Protocol(Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap()),
Message::Protocol(Protocol::try_from(&b"/sup/proto/1"[..]).unwrap()),
]
.into_iter(),
)
let message = webrtc_encode_multistream_message(vec![
Message::Protocol(Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap()),
Message::Protocol(Protocol::try_from(&b"/sup/proto/1"[..]).unwrap()),
])
.unwrap()
.freeze();

Expand All @@ -433,7 +430,7 @@ mod tests {

#[test]
fn invalid_message() {
let mut local_protocols = vec![
let mut local_protocols = [
ProtocolName::from("/13371338/proto/1"),
ProtocolName::from("/sup/proto/1"),
ProtocolName::from("/13371338/proto/2"),
Expand All @@ -455,7 +452,7 @@ mod tests {

#[test]
fn only_header_line_received() {
let mut local_protocols = vec![
let mut local_protocols = [
ProtocolName::from("/13371338/proto/1"),
ProtocolName::from("/sup/proto/1"),
ProtocolName::from("/13371338/proto/2"),
Expand All @@ -466,7 +463,7 @@ mod tests {
// send only header line
let mut bytes = BytesMut::with_capacity(32);
let message = Message::Header(HeaderLine::V1);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

match webrtc_listener_negotiate(&mut local_protocols.iter(), bytes.freeze()) {
Err(error) => assert!(std::matches!(
Expand All @@ -481,7 +478,7 @@ mod tests {

#[test]
fn header_line_missing() {
let mut local_protocols = vec![
let mut local_protocols = [
ProtocolName::from("/13371338/proto/1"),
ProtocolName::from("/sup/proto/1"),
ProtocolName::from("/13371338/proto/2"),
Expand All @@ -495,7 +492,7 @@ mod tests {
Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(),
Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(),
]);
let _ = message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

match webrtc_listener_negotiate(&mut local_protocols.iter(), bytes.freeze()) {
Err(error) => assert!(std::matches!(
Expand All @@ -510,19 +507,16 @@ mod tests {

#[test]
fn protocol_not_supported() {
let mut local_protocols = vec![
let mut local_protocols = [
ProtocolName::from("/13371338/proto/1"),
ProtocolName::from("/sup/proto/1"),
ProtocolName::from("/13371338/proto/2"),
ProtocolName::from("/13371338/proto/3"),
ProtocolName::from("/13371338/proto/4"),
];
let message = webrtc_encode_multistream_message(
vec![Message::Protocol(
Protocol::try_from(&b"/13371339/proto/1"[..]).unwrap(),
)]
.into_iter(),
)
let message = webrtc_encode_multistream_message(vec![Message::Protocol(
Protocol::try_from(&b"/13371339/proto/1"[..]).unwrap(),
)])
.unwrap()
.freeze();

Expand Down
2 changes: 1 addition & 1 deletion src/peer_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl<'de> Deserialize<'de> for PeerId {

struct PeerIdVisitor;

impl<'de> Visitor<'de> for PeerIdVisitor {
impl Visitor<'_> for PeerIdVisitor {
type Value = PeerId;

fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ConnectionHandle {
protocol: protocol.clone(),
fallback_names,
substream_id,
connection_id: self.connection_id.clone(),
connection_id: self.connection_id,
permit,
})
.map_err(|error| match error {
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/libp2p/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ impl Identify {
"outbound substream opened"
);

let local_peer_id = self.local_peer_id.clone();
let local_peer_id = self.local_peer_id;

self.pending_outbound.push(Box::pin(async move {
let payload =
Expand Down Expand Up @@ -484,7 +484,7 @@ mod tests {
// Create two instances of litep2p
let (mut litep2p1, mut event_stream1, peer1) = create_litep2p();
let (mut litep2p2, mut event_stream2, _peer2) = create_litep2p();
let litep2p1_address = litep2p1.listen_addresses().into_iter().next().unwrap();
let litep2p1_address = litep2p1.listen_addresses().next().unwrap();

let multiaddr: Multiaddr = "/ip6/::9/tcp/111".parse().unwrap();
// Litep2p1 is now reporting the new address.
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/libp2p/kademlia/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
let mut iter = bucket.closest_iter(&target);
let mut prev = None;

while let Some(node) = iter.next() {
for node in iter {
if let Some(distance) = prev {
assert!(distance < target.distance(&node.key));
}
Expand Down Expand Up @@ -177,7 +177,7 @@ mod tests {
let mut prev = None;
let mut num_peers = 0usize;

while let Some(node) = iter.next() {
for node in iter {
if let Some(distance) = prev {
assert!(distance < target.distance(&node.key));
}
Expand Down
11 changes: 5 additions & 6 deletions src/protocol/libp2p/kademlia/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,11 @@ impl QueryExecutor {
self.futures.push(Box::pin(async move {
match tokio::time::timeout(WRITE_TIMEOUT, substream.send_framed(message)).await {
// Timeout error.
Err(_) =>
return QueryContext {
peer,
query_id: None,
result: QueryResult::Timeout,
},
Err(_) => QueryContext {
peer,
query_id: None,
result: QueryResult::Timeout,
},
// Writing message to substream failed.
Ok(Err(_)) => QueryContext {
peer,
Expand Down
Loading
Loading