10 changes: 6 additions & 4 deletions client/finality-grandpa-warp-sync/src/lib.rs
@@ -18,7 +18,7 @@
//! [`crate::request_responses::RequestResponsesBehaviour`].

use codec::Decode;
use sc_network::config::{ProtocolId, IncomingRequest, RequestResponseConfig};
use sc_network::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig};
use sc_client_api::Backend;
use sp_runtime::traits::NumberFor;
use futures::channel::{mpsc, oneshot};
@@ -113,7 +113,7 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
fn handle_request(
&self,
payload: Vec<u8>,
pending_response: oneshot::Sender<Vec<u8>>
pending_response: oneshot::Sender<OutgoingResponse>
) -> Result<(), HandleRequestError>
where NumberFor<TBlock>: sc_finality_grandpa::BlockNumberOps,
{
@@ -124,8 +124,10 @@ impl<TBlock: BlockT, TBackend: Backend<TBlock>> GrandpaWarpSyncRequestHandler<TB
self.backend.blockchain(), request.begin, Some(WARP_SYNC_FRAGMENTS_LIMIT), Some(&mut cache)
)?;

pending_response.send(response)
.map_err(|_| HandleRequestError::SendResponse)
pending_response.send(OutgoingResponse {
result: Ok(response),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}

/// Run [`GrandpaWarpSyncRequestHandler`].
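For context, here is a minimal, hypothetical sketch (not part of this diff) of how a handler like the one above could use the new `OutgoingResponse` to reject a request it cannot serve and penalize the requesting peer. The function name, penalty value, and reason string are assumptions; only `OutgoingResponse`, `ReputationChange`, and the `oneshot` sender come from the code in this change.

use futures::channel::oneshot;
use sc_network::config::OutgoingResponse;
use sc_network::ReputationChange;

/// Hypothetical error path of a request handler built on the new signature.
fn reject_request(pending_response: oneshot::Sender<OutgoingResponse>) -> Result<(), ()> {
    // Hypothetical penalty; a real handler would pick a value and reason fitting its protocol.
    let penalty = ReputationChange::new(-(1 << 12), "invalid request");

    pending_response.send(OutgoingResponse {
        // `Err(())`: no payload is sent back and the request is treated as failed.
        result: Err(()),
        // Emitted by the behaviour as `Event::ReputationChanges` and applied to the peer.
        reputation_changes: vec![penalty],
    }).map_err(|_| ())
}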
5 changes: 5 additions & 0 deletions client/network/src/behaviour.rs
@@ -369,6 +369,11 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<request_responses::Even
peer, protocol, duration, result,
});
},
request_responses::Event::ReputationChanges { peer, changes } => {
for change in changes {
self.substrate.report_peer(peer, change);
}
}
}
}
}
10 changes: 6 additions & 4 deletions client/network/src/block_request_handler.rs
@@ -21,7 +21,7 @@ use codec::{Encode, Decode};
use crate::chain::Client;
use crate::config::ProtocolId;
use crate::protocol::{message::BlockAttributes};
use crate::request_responses::{IncomingRequest, ProtocolConfig};
use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
use crate::schema::v1::block_request::FromBlock;
use crate::schema::v1::{BlockResponse, Direction};
use futures::channel::{mpsc, oneshot};
@@ -85,7 +85,7 @@ impl <B: BlockT> BlockRequestHandler<B> {
fn handle_request(
&self,
payload: Vec<u8>,
pending_response: oneshot::Sender<Vec<u8>>
pending_response: oneshot::Sender<OutgoingResponse>
) -> Result<(), HandleRequestError> {
let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;

@@ -181,8 +181,10 @@ impl <B: BlockT> BlockRequestHandler<B> {
let mut data = Vec::with_capacity(res.encoded_len());
res.encode(&mut data)?;

pending_response.send(data)
.map_err(|_| HandleRequestError::SendResponse)
pending_response.send(OutgoingResponse {
result: Ok(data),
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}

/// Run [`BlockRequestHandler`].
6 changes: 5 additions & 1 deletion client/network/src/config.rs
@@ -23,7 +23,11 @@

pub use crate::chain::Client;
pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand};
pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig};
pub use crate::request_responses::{
IncomingRequest,
OutgoingResponse,
ProtocolConfig as RequestResponseConfig,
};
pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr};

// Note: this re-export shouldn't be part of the public API of the crate and will be removed in
96 changes: 75 additions & 21 deletions client/network/src/request_responses.rs
@@ -55,6 +55,7 @@ use std::{
pin::Pin, task::{Context, Poll}, time::Duration,
};
use wasm_timer::Instant;
use crate::ReputationChange;

pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId};

Expand Down Expand Up @@ -114,8 +115,27 @@ pub struct IncomingRequest {
/// [`ProtocolConfig::max_request_size`].
pub payload: Vec<u8>,

/// Channel to send back the response to.
pub pending_response: oneshot::Sender<Vec<u8>>,
/// Channel to send back the response.
///
/// There are two ways to indicate that handling the request failed:
///
/// 1. Drop `pending_response`, thereby leaving the reputation of the peer unchanged.
///
/// 2. Send an `Err(())` via `pending_response`, optionally including reputation changes for
/// the given peer.
pub pending_response: oneshot::Sender<OutgoingResponse>,
}

/// Response for an incoming request, to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
/// The payload of the response.
///
/// `Err(())` if none is available, e.g. due to an error while handling the request.
pub result: Result<Vec<u8>, ()>,
/// Reputation changes accrued while handling the request. To be applied to the reputation of
/// the peer sending the request.
pub reputation_changes: Vec<ReputationChange>,
}

/// Event generated by the [`RequestResponsesBehaviour`].
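The two failure modes documented on `pending_response` above, restated as a brief sketch. The handler names are illustrative only, and `ReputationChange::new_fatal` is assumed to be available here via the `crate::ReputationChange` import added in this change; everything else uses items defined in this file.

// 1. Drop the sender without answering; the peer's reputation is left untouched.
fn fail_silently(pending_response: oneshot::Sender<OutgoingResponse>) {
    drop(pending_response);
}

// 2. Answer with `Err(())` and attach reputation changes; the behaviour emits them
//    as `Event::ReputationChanges` so they can be applied to the peer.
fn fail_and_report(pending_response: oneshot::Sender<OutgoingResponse>) {
    let _ = pending_response.send(OutgoingResponse {
        result: Err(()),
        reputation_changes: vec![ReputationChange::new_fatal("misbehaving request")],
    });
}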
@@ -150,6 +170,12 @@ pub enum Event {
/// Result of the request.
result: Result<(), RequestFailure>
},

/// A request protocol handler issued reputation changes for the given peer.
ReputationChanges {
peer: PeerId,
changes: Vec<ReputationChange>,
}
}

/// Combination of a protocol name and a request id.
@@ -198,10 +224,11 @@ pub struct RequestResponsesBehaviour {

/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
peer: PeerId,
request_id: RequestId,
protocol: Cow<'static, str>,
inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
response: Vec<u8>,
response: OutgoingResponse,
}

impl RequestResponsesBehaviour {
@@ -406,30 +433,45 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Poll to see if any response is ready to be sent back.
while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
let RequestProcessingOutcome {
peer,
request_id,
protocol: protocol_name,
inner_channel,
response
response: OutgoingResponse {
result,
reputation_changes,
},
} = match outcome {
Some(outcome) => outcome,
// The response builder was too busy and thus the request was dropped. This is
// The response builder was too busy or handling the request failed. This is
// later on reported as an `InboundFailure::Omission`.
None => continue,
};

if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(response)) {
// Note: Failure is handled further below when receiving `InboundFailure`
// event from `RequestResponse` behaviour.
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
if let Ok(payload) = result {
if let Some((protocol, _)) = self.protocols.get_mut(&*protocol_name) {
if let Err(_) = protocol.send_response(inner_channel, Ok(payload)) {
// Note: Failure is handled further below when receiving
// `InboundFailure` event from `RequestResponse` behaviour.
log::debug!(
target: "sub-libp2p",
"Failed to send response for {:?} on protocol {:?} due to a \
timeout or due to the connection to the peer being closed. \
Dropping response",
request_id, protocol_name,
);
}
}
}

if !reputation_changes.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(
Event::ReputationChanges{
peer,
changes: reputation_changes,
},
));
}
}

// Poll request-responses protocols.
@@ -505,7 +547,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// `InboundFailure::Omission` event.
if let Ok(response) = rx.await {
Some(RequestProcessingOutcome {
request_id, protocol, inner_channel: channel, response
peer, request_id, protocol, inner_channel: channel, response
})
} else {
None
@@ -851,7 +893,10 @@ mod tests {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this is a response".to_vec());
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
});
}
}.boxed().into()).unwrap();

@@ -934,7 +979,10 @@ mod tests {
pool.spawner().spawn_obj(async move {
while let Some(rq) = rx.next().await {
assert_eq!(rq.payload, b"this is a request");
let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec());
let _ = rq.pending_response.send(super::OutgoingResponse {
result: Ok(b"this response exceeds the limit".to_vec()),
reputation_changes: Vec::new(),
});
}
}.boxed().into()).unwrap();

@@ -1100,11 +1148,17 @@

protocol_1_request.unwrap()
.pending_response
.send(b"this is a response".to_vec())
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
})
.unwrap();
protocol_2_request.unwrap()
.pending_response
.send(b"this is a response".to_vec())
.send(OutgoingResponse {
result: Ok(b"this is a response".to_vec()),
reputation_changes: Vec::new(),
})
.unwrap();
}.boxed().into()).unwrap();
