fix(request-response): Report failure when streams are at capacity #5417

Merged (10 commits) on Jun 4, 2024
2 changes: 1 addition & 1 deletion Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -99,7 +99,7 @@ libp2p-pnet = { version = "0.24.0", path = "transports/pnet" }
libp2p-quic = { version = "0.10.3", path = "transports/quic" }
libp2p-relay = { version = "0.17.2", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.14.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.26.2", path = "protocols/request-response" }
libp2p-request-response = { version = "0.27.0", path = "protocols/request-response" }
libp2p-server = { version = "0.12.7", path = "misc/server" }
libp2p-stream = { version = "0.1.0-alpha.1", path = "protocols/stream" }
libp2p-swarm = { version = "0.44.2", path = "swarm" }
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
@@ -1,3 +1,8 @@
## 0.27.0

- Report an outbound failure when the maximum number of concurrent streams is reached.
See [PR 5417](https://github.com/libp2p/rust-libp2p/pull/5417).

## 0.26.2

- Deprecate `Behaviour::add_address` in favor of `Swarm::add_peer_address`.
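For downstream users the new error variant makes this condition actionable. The following is a minimal sketch (not part of this PR) of matching it in a behaviour-event handler; it assumes the usual `libp2p::request_response` re-export, the `peer`/`request_id`/`error` fields shown in the `lib.rs` diff further down, and `tracing` for logging.

```rust
use libp2p::request_response::{self, OutboundFailure};

// `Req` and `Resp` stand in for the codec's request and response types.
fn on_request_response_event<Req, Resp>(event: request_response::Event<Req, Resp>) {
    match event {
        request_response::Event::OutboundFailure {
            peer,
            request_id,
            error: OutboundFailure::MaxStreamsReached,
            ..
        } => {
            // The request never left the local node, so it is safe to retry later,
            // e.g. after backing off or reducing per-peer request concurrency.
            tracing::warn!(%peer, ?request_id, "outbound request dropped: handler at max concurrent streams");
        }
        _ => {}
    }
}
```

Before this change the same situation only produced a `tracing::warn!` line inside the handler, so callers had no programmatic way to react.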
2 changes: 1 addition & 1 deletion protocols/request-response/Cargo.toml
@@ -3,7 +3,7 @@ name = "libp2p-request-response"
edition = "2021"
rust-version = { workspace = true }
description = "Generic Request/Response Protocols"
version = "0.26.2"
version = "0.27.0"
authors = ["Parity Technologies <[email protected]>"]
license = "MIT"
repository = "https://github.com/libp2p/rust-libp2p"
13 changes: 12 additions & 1 deletion protocols/request-response/src/handler.rs
@@ -159,6 +159,9 @@ where
}
};

// Inbound requests are reported to the upper layer from within the worker task,
// so failing to schedule the worker means the upper layer will never learn
// about the inbound request. Because of that, we do not report an inbound failure here.
if self
.worker_streams
.try_push(RequestId::Inbound(request_id), recv.boxed())
@@ -204,7 +207,8 @@
.try_push(RequestId::Outbound(request_id), send.boxed())
.is_err()
{
tracing::warn!("Dropping outbound stream because we are at capacity")
self.pending_events
.push_back(Event::OutboundMaxStreamsReached(request_id));
}
}

@@ -276,6 +280,9 @@ where
/// A response to an inbound request was omitted as a result
/// of dropping the response `sender` of an inbound `Request`.
ResponseOmission(InboundRequestId),
/// An outbound request could not be sent because the maximum number of
/// concurrent streams has been reached.
OutboundMaxStreamsReached(OutboundRequestId),
/// An outbound request timed out while sending the request
/// or waiting for the response.
OutboundTimeout(OutboundRequestId),
@@ -320,6 +327,10 @@ impl<TCodec: Codec> fmt::Debug for Event<TCodec> {
.debug_tuple("Event::ResponseOmission")
.field(request_id)
.finish(),
Event::OutboundMaxStreamsReached(request_id) => f
.debug_tuple("Event::OutboundMaxStreamsReached")
.field(request_id)
.finish(),
Event::OutboundTimeout(request_id) => f
.debug_tuple("Event::OutboundTimeout")
.field(request_id)
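The handler change above follows a simple pattern: when the bounded set of worker streams is full, the rejection is handed back to the caller as an event instead of only being logged. The std-only sketch below illustrates that pattern; `BoundedTasks` and `HandlerEvent` are invented stand-ins, not the crate's actual `futures_bounded` types.

```rust
use std::collections::VecDeque;

// Invented stand-in for the handler's bounded set of worker futures.
struct BoundedTasks<T> {
    tasks: Vec<T>,
    capacity: usize,
}

impl<T> BoundedTasks<T> {
    fn try_push(&mut self, task: T) -> Result<(), T> {
        if self.tasks.len() >= self.capacity {
            // At capacity: give the task back so the caller can react.
            Err(task)
        } else {
            self.tasks.push(task);
            Ok(())
        }
    }
}

// Invented stand-in for the handler's event type.
enum HandlerEvent {
    OutboundMaxStreamsReached(u64),
}

fn schedule_outbound(
    tasks: &mut BoundedTasks<u64>,
    pending_events: &mut VecDeque<HandlerEvent>,
    request_id: u64,
) {
    if tasks.try_push(request_id).is_err() {
        // Previously the real handler only emitted `tracing::warn!` here; the failure
        // now travels up to the behaviour, which reports it as an `OutboundFailure`.
        pending_events.push_back(HandlerEvent::OutboundMaxStreamsReached(request_id));
    }
}
```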
14 changes: 14 additions & 0 deletions protocols/request-response/src/lib.rs
@@ -169,6 +169,8 @@ pub enum Event<TRequest, TResponse, TChannelResponse = TResponse> {
pub enum OutboundFailure {
/// The request could not be sent because a dialing attempt failed.
DialFailure,
/// The request could not be sent because the maximum number of concurrent streams has been reached.
MaxStreamsReached,
/// The request timed out before a response was received.
///
/// It is not known whether the request may have been
@@ -189,6 +191,7 @@ impl fmt::Display for OutboundFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"),
OutboundFailure::MaxStreamsReached => write!(f, "Maximum concurrent streams reached"),
OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"),
OutboundFailure::ConnectionClosed => {
write!(f, "Connection was closed before a response was received")
@@ -877,6 +880,17 @@ where
error: InboundFailure::ResponseOmission,
}));
}
handler::Event::OutboundMaxStreamsReached(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(removed, "Expect request_id to be pending before failing.");

self.pending_events
.push_back(ToSwarm::GenerateEvent(Event::OutboundFailure {
peer,
request_id,
error: OutboundFailure::MaxStreamsReached,
}));
}
handler::Event::OutboundTimeout(request_id) => {
let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
debug_assert!(
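The stream cap that triggers this failure is configured on the behaviour, as the test below also does. A small configuration sketch, assuming the `libp2p` facade re-export; the cap and timeout values are arbitrary.

```rust
use std::time::Duration;
use libp2p::request_response;

fn capped_config() -> request_response::Config {
    request_response::Config::default()
        // Upper bound on concurrent worker streams per connection handler; hitting it
        // on the outbound side now surfaces as `OutboundFailure::MaxStreamsReached`.
        .with_max_concurrent_streams(100)
        .with_request_timeout(Duration::from_secs(10))
}
```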
67 changes: 64 additions & 3 deletions protocols/request-response/tests/error_reporting.rs
@@ -161,6 +161,58 @@ async fn report_outbound_timeout_on_read_response() {
futures::future::select(server_task, client_task).await;
}

#[async_std::test]
async fn report_outbound_failure_on_max_streams() {
let _ = tracing_subscriber::fmt()
.with_env_filter(EnvFilter::from_default_env())
.try_init();

// `swarm2` will be able to handle only 1 stream at a time
let swarm2_config = request_response::Config::default()
.with_request_timeout(Duration::from_millis(100))
.with_max_concurrent_streams(1);

let (peer1_id, mut swarm1) = new_swarm();
let (peer2_id, mut swarm2) = new_swarm_with_config(swarm2_config);

swarm1.listen().with_memory_addr_external().await;
swarm2.connect(&mut swarm1).await;

let swarm1_task = async move {
let _req_id = swarm1
.behaviour_mut()
.send_request(&peer2_id, Action::FailOnMaxStreams);

// Keep the connection alive, otherwise swarm2 may receive `ConnectionClosed` instead
wait_no_events(&mut swarm1).await;
};

// Expects an `OutboundFailure::MaxStreamsReached` failure
let swarm2_task = async move {
let (peer, _inbound_req_id, action, _resp_channel) =
wait_request(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(action, Action::FailOnMaxStreams);

// A task for sending back a response is already scheduled, so the maximum number of
// concurrent streams is reached and no new tasks can be scheduled.
//
// We produce the failure by creating a new request before we respond.
let outbound_req_id = swarm2
.behaviour_mut()
.send_request(&peer1_id, Action::FailOnMaxStreams);

let (peer, req_id_done, error) = wait_outbound_failure(&mut swarm2).await.unwrap();
assert_eq!(peer, peer1_id);
assert_eq!(req_id_done, outbound_req_id);
assert!(matches!(error, OutboundFailure::MaxStreamsReached));
};

let swarm1_task = pin!(swarm1_task);
let swarm2_task = pin!(swarm2_task);
futures::future::select(swarm1_task, swarm2_task).await;
}

#[async_std::test]
async fn report_inbound_failure_on_read_request() {
let _ = tracing_subscriber::fmt()
@@ -332,6 +384,7 @@ enum Action {
FailOnWriteRequest,
FailOnWriteResponse,
TimeoutOnWriteResponse,
FailOnMaxStreams,
}

impl From<Action> for u8 {
@@ -343,6 +396,7 @@ impl From<Action> for u8 {
Action::FailOnWriteRequest => 3,
Action::FailOnWriteResponse => 4,
Action::TimeoutOnWriteResponse => 5,
Action::FailOnMaxStreams => 6,
}
}
}
@@ -358,6 +412,7 @@ impl TryFrom<u8> for Action {
3 => Ok(Action::FailOnWriteRequest),
4 => Ok(Action::FailOnWriteResponse),
5 => Ok(Action::TimeoutOnWriteResponse),
6 => Ok(Action::FailOnMaxStreams),
_ => Err(io::Error::new(io::ErrorKind::Other, "invalid action")),
}
}
@@ -468,11 +523,10 @@ impl Codec for TestCodec {
}
}

fn new_swarm_with_timeout(
timeout: Duration,
fn new_swarm_with_config(
cfg: request_response::Config,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let protocols = iter::once((StreamProtocol::new("/test/1"), ProtocolSupport::Full));
let cfg = request_response::Config::default().with_request_timeout(timeout);

let swarm =
Swarm::new_ephemeral(|_| request_response::Behaviour::<TestCodec>::new(protocols, cfg));
@@ -481,6 +535,13 @@
(peed_id, swarm)
}

fn new_swarm_with_timeout(
timeout: Duration,
) -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
let cfg = request_response::Config::default().with_request_timeout(timeout);
new_swarm_with_config(cfg)
}

fn new_swarm() -> (PeerId, Swarm<request_response::Behaviour<TestCodec>>) {
new_swarm_with_timeout(Duration::from_millis(100))
}