Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
- fix: public cbor/json codec module
See [PR 5830](https://github.com/libp2p/rust-libp2p/pull/5830).

- feat: add `Behaviour::send_request_with_addresses()`
See [PR 5938](https://github.com/libp2p/rust-libp2p/issues/5938).

## 0.28.0

- Deprecate `void` crate.
Expand Down
24 changes: 21 additions & 3 deletions protocols/request-response/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,24 @@ where
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery, or known addresses of peers must be
/// > managed via [`libp2p_swarm::Swarm::add_peer_address`].
/// > Addresses are automatically removed when dial attempts
/// > to them fail.
/// > Alternatively, [`Behaviour::send_request_with_addresses`]
/// > can be used. Addresses are automatically removed when
/// > dial attempts to them fail.
Comment thread
nazar-pc marked this conversation as resolved.
Outdated
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
self.send_request_with_addresses(peer, request, Vec::new())
}

/// Initiates sending a request, using provided addresses if a connection needs to be
/// established.
///
/// This is very similar to [`Behaviour::send_request`], but uses provided addresses when
/// dialing currently not connected peer.
Comment thread
nazar-pc marked this conversation as resolved.
Outdated
pub fn send_request_with_addresses(
&mut self,
peer: &PeerId,
request: TCodec::Request,
addresses: Vec<Multiaddr>,
) -> OutboundRequestId {
let request_id = self.next_outbound_request_id();
let request = OutboundMessage {
request_id,
Expand All @@ -443,7 +458,10 @@ where

if let Some(request) = self.try_send_request(peer, request) {
self.pending_events.push_back(ToSwarm::Dial {
opts: DialOpts::peer_id(*peer).build(),
opts: DialOpts::peer_id(*peer)
.addresses(addresses)
.extend_addresses_through_behaviour()
.build(),
});
self.pending_outbound_requests
.entry(*peer)
Expand Down
125 changes: 121 additions & 4 deletions protocols/request-response/tests/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ async fn is_response_outbound() {

let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(
[(
StreamProtocol::new("/ping/1"),
request_response::ProtocolSupport::Full,
)],
[(StreamProtocol::new("/ping/1"), ProtocolSupport::Full)],
request_response::Config::default(),
)
});
Expand Down Expand Up @@ -180,6 +177,126 @@ async fn ping_protocol() {
peer2.await;
}

/// Exercises a simple ping protocol where peers are not connected prior to request sending.
#[async_std::test]
#[cfg(feature = "cbor")]
async fn ping_protocol_explicit_address() {
Comment thread
elenaf9 marked this conversation as resolved.
let ping = Ping("ping".to_string().into_bytes());
let pong = Pong("pong".to_string().into_bytes());

let protocols = iter::once((StreamProtocol::new("/ping/1"), ProtocolSupport::Full));
let cfg = request_response::Config::default();

let mut swarm1 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols.clone(), cfg.clone())
});
let peer1_id = *swarm1.local_peer_id();
let mut swarm2 = Swarm::new_ephemeral(|_| {
request_response::cbor::Behaviour::<Ping, Pong>::new(protocols, cfg)
});
let peer2_id = *swarm2.local_peer_id();

let (peer1_listen_addr, _) = swarm1.listen().with_memory_addr_external().await;

let expected_ping = ping.clone();
let expected_pong = pong.clone();

let peer1 = async move {
loop {
match swarm1.next_swarm_event().await.try_into_behaviour_event() {
Ok(request_response::Event::Message {
peer,
message:
request_response::Message::Request {
request, channel, ..
},
..
}) => {
assert_eq!(&request, &expected_ping);
assert_eq!(&peer, &peer2_id);
swarm1
.behaviour_mut()
.send_response(channel, pong.clone())
.unwrap();
}
Ok(request_response::Event::ResponseSent { peer, .. }) => {
assert_eq!(&peer, &peer2_id);
}
Ok(e) => {
panic!("Peer1: Unexpected event: {e:?}")
}
Err(..) => {}
}
}
};

let peer2 = async {
let req_id = swarm2.behaviour_mut().send_request(&peer1_id, ping.clone());
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));

// Can't dial to unknown peer
match swarm2
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::OutboundFailure {
peer, request_id, ..
} => {
assert_eq!(&peer, &peer1_id);
assert_eq!(req_id, request_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}

let req_id = swarm2.behaviour_mut().send_request_with_addresses(
&peer1_id,
ping.clone(),
vec![peer1_listen_addr],
);
assert!(swarm2.behaviour().is_pending_outbound(&peer1_id, &req_id));

// Dial to peer with explicit address succeeds
match swarm2.select_next_some().await {
SwarmEvent::Dialing { peer_id, .. } => {
assert_eq!(&peer_id, &Some(peer1_id));
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
match swarm2.select_next_some().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
assert_eq!(&peer_id, &peer1_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
match swarm2
.next_swarm_event()
.await
.try_into_behaviour_event()
.unwrap()
{
request_response::Event::Message {
peer,
message:
request_response::Message::Response {
request_id,
response,
},
..
} => {
assert_eq!(&response, &expected_pong);
assert_eq!(&peer, &peer1_id);
assert_eq!(req_id, request_id);
}
e => panic!("Peer2: Unexpected event: {e:?}"),
}
};

async_std::task::spawn(Box::pin(peer1));
peer2.await;
}

#[async_std::test]
#[cfg(feature = "cbor")]
async fn emits_inbound_connection_closed_failure() {
Expand Down