diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs
index 7b1a35354c913..a34f6e0960c47 100644
--- a/client/network/src/behaviour.rs
+++ b/client/network/src/behaviour.rs
@@ -17,14 +17,15 @@
// along with this program. If not, see .
use crate::{
- config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses,
+ config::{ProtocolId, Role},
discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
+ peer_info, request_responses, light_client_requests,
ObservedRole, DhtEvent, ExHashT,
};
use bytes::Bytes;
-use futures::channel::oneshot;
+use futures::{channel::oneshot, stream::StreamExt};
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
@@ -59,8 +60,6 @@ pub struct Behaviour {
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
- /// Light client request handling.
- light_client_handler: light_client_handler::LightClientHandler,
/// Queue of events to produce for the outside.
#[behaviour(ignore)]
@@ -70,6 +69,10 @@ pub struct Behaviour {
#[behaviour(ignore)]
role: Role,
+ /// Light client request handling.
+ #[behaviour(ignore)]
+ light_client_request_sender: light_client_requests::sender::LightClientRequestSender,
+
/// Protocol name used to send out block requests via
/// [`request_responses::RequestResponsesBehaviour`].
#[behaviour(ignore)]
@@ -174,10 +177,10 @@ impl Behaviour {
role: Role,
user_agent: String,
local_public_key: PublicKey,
- light_client_handler: light_client_handler::LightClientHandler,
+ light_client_request_sender: light_client_requests::sender::LightClientRequestSender,
disco_config: DiscoveryConfig,
- // Block request protocol config.
block_request_protocol_config: request_responses::ProtocolConfig,
+ light_client_request_protocol_config: request_responses::ProtocolConfig,
// All remaining request protocol configs.
mut request_response_protocols: Vec,
) -> Result {
@@ -185,13 +188,15 @@ impl Behaviour {
let block_request_protocol_name = block_request_protocol_config.name.to_string();
request_response_protocols.push(block_request_protocol_config);
+ request_response_protocols.push(light_client_request_protocol_config);
+
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
- light_client_handler,
+ light_client_request_sender,
events: VecDeque::new(),
role,
@@ -268,8 +273,11 @@ impl Behaviour {
}
/// Issue a light client request.
- pub fn light_client_request(&mut self, r: light_client_handler::Request) -> Result<(), light_client_handler::Error> {
- self.light_client_handler.request(r)
+ pub fn light_client_request(
+ &mut self,
+ r: light_client_requests::sender::Request,
+ ) -> Result<(), light_client_requests::sender::SendRequestError> {
+ self.light_client_request_sender.request(r)
}
}
@@ -289,13 +297,6 @@ fn reported_roles_to_observed_role(local_role: &Role, remote: &PeerId, roles: Ro
}
}
-impl NetworkBehaviourEventProcess for
-Behaviour {
- fn inject_event(&mut self, event: void::Void) {
- void::unreachable(event)
- }
-}
-
impl NetworkBehaviourEventProcess> for
Behaviour {
fn inject_event(&mut self, event: CustomMessageOutcome) {
@@ -343,12 +344,16 @@ Behaviour {
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(peer_id, number) => {
- self.light_client_handler.update_best_block(&peer_id, number);
+ self.light_client_request_sender.update_best_block(&peer_id, number);
+ }
+ CustomMessageOutcome::SyncConnected(peer_id) => {
+ self.light_client_request_sender.inject_connected(peer_id);
+ self.events.push_back(BehaviourOut::SyncConnected(peer_id))
+ }
+ CustomMessageOutcome::SyncDisconnected(peer_id) => {
+ self.light_client_request_sender.inject_disconnected(peer_id);
+ self.events.push_back(BehaviourOut::SyncDisconnected(peer_id))
}
- CustomMessageOutcome::SyncConnected(peer_id) =>
- self.events.push_back(BehaviourOut::SyncConnected(peer_id)),
- CustomMessageOutcome::SyncDisconnected(peer_id) =>
- self.events.push_back(BehaviourOut::SyncDisconnected(peer_id)),
CustomMessageOutcome::None => {}
}
}
@@ -443,7 +448,20 @@ impl NetworkBehaviourEventProcess
}
impl Behaviour {
- fn poll(&mut self, _: &mut Context, _: &mut impl PollParameters) -> Poll>> {
+ fn poll(
+ &mut self,
+ cx: &mut Context,
+ _: &mut impl PollParameters,
+ ) -> Poll>> {
+ use light_client_requests::sender::OutEvent;
+ while let Poll::Ready(Some(event)) = self.light_client_request_sender.poll_next_unpin(cx) {
+ match event {
+ OutEvent::SendRequest { target, request, pending_response, protocol_name } => {
+ self.request_responses.send_request(&target, &protocol_name, request, pending_response)
+ }
+ }
+ }
+
if let Some(event) = self.events.pop_front() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event))
}
diff --git a/client/network/src/block_request_handler.rs b/client/network/src/block_request_handler.rs
index 1a6c09eff1303..92f21f44f9d1c 100644
--- a/client/network/src/block_request_handler.rs
+++ b/client/network/src/block_request_handler.rs
@@ -39,7 +39,7 @@ const MAX_BLOCKS_IN_RESPONSE: usize = 128;
const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
-pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
+pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
ProtocolConfig {
name: generate_protocol_name(protocol_id).into(),
max_request_size: 1024 * 1024,
@@ -50,7 +50,10 @@ pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
}
/// Generate the block protocol name from chain specific protocol identifier.
-fn generate_protocol_name(protocol_id: ProtocolId) -> String {
+//
+// Visibility `pub(crate)` to allow `crate::light_client_requests::sender` to generate block request
+// protocol name and send block requests.
+pub(crate) fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
let mut s = String::new();
s.push_str("/");
s.push_str(protocol_id.as_ref());
@@ -66,7 +69,7 @@ pub struct BlockRequestHandler {
impl BlockRequestHandler {
/// Create a new [`BlockRequestHandler`].
- pub fn new(protocol_id: ProtocolId, client: Arc>) -> (Self, ProtocolConfig) {
+ pub fn new(protocol_id: &ProtocolId, client: Arc>) -> (Self, ProtocolConfig) {
// Rate of arrival multiplied with the waiting time in the queue equals the queue length.
//
// An average Polkadot sentry node serves less than 5 requests per second. The 95th percentile
@@ -82,6 +85,22 @@ impl BlockRequestHandler {
(Self { client, request_receiver }, protocol_config)
}
+ /// Run [`BlockRequestHandler`].
+ pub async fn run(mut self) {
+ while let Some(request) = self.request_receiver.next().await {
+ let IncomingRequest { peer, payload, pending_response } = request;
+
+ match self.handle_request(payload, pending_response) {
+ Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
+ Err(e) => debug!(
+ target: LOG_TARGET,
+ "Failed to handle block request from {}: {}",
+ peer, e,
+ ),
+ }
+ }
+ }
+
fn handle_request(
&self,
payload: Vec,
@@ -186,22 +205,6 @@ impl BlockRequestHandler {
reputation_changes: Vec::new(),
}).map_err(|_| HandleRequestError::SendResponse)
}
-
- /// Run [`BlockRequestHandler`].
- pub async fn run(mut self) {
- while let Some(request) = self.request_receiver.next().await {
- let IncomingRequest { peer, payload, pending_response } = request;
-
- match self.handle_request(payload, pending_response) {
- Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
- Err(e) => debug!(
- target: LOG_TARGET,
- "Failed to handle block request from {}: {}",
- peer, e,
- ),
- }
- }
- }
}
#[derive(derive_more::Display, derive_more::From)]
diff --git a/client/network/src/config.rs b/client/network/src/config.rs
index 29d238c368a78..3eb53dabf0459 100644
--- a/client/network/src/config.rs
+++ b/client/network/src/config.rs
@@ -111,6 +111,14 @@ pub struct Params {
/// [`block_request_handler::BlockRequestHandler::new`] allowing both outgoing and incoming
/// requests.
pub block_request_protocol_config: RequestResponseConfig,
+
+ /// Request response configuration for the light client request protocol.
+ ///
+ /// Can be constructed either via [`light_client_requests::generate_protocol_config`] allowing
+ /// outgoing but not incoming requests, or constructed via
+ /// [`light_client_requests::handler::LightClientRequestHandler::new`] allowing both outgoing
+ /// and incoming requests.
+ pub light_client_request_protocol_config: RequestResponseConfig,
}
/// Role of the local node.
diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs
index e0941357e8441..c0b8c5e730a11 100644
--- a/client/network/src/gossip/tests.rs
+++ b/client/network/src/gossip/tests.rs
@@ -17,6 +17,7 @@
// along with this program. If not, see .
use crate::block_request_handler::BlockRequestHandler;
+use crate::light_client_requests::handler::LightClientRequestHandler;
use crate::gossip::QueuedSender;
use crate::{config, Event, NetworkService, NetworkWorker};
@@ -96,7 +97,16 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
let block_request_protocol_config = {
let (handler, protocol_config) = BlockRequestHandler::new(
- protocol_id.clone(),
+ &protocol_id,
+ client.clone(),
+ );
+ async_std::task::spawn(handler.run().boxed());
+ protocol_config
+ };
+
+ let light_client_request_protocol_config = {
+ let (handler, protocol_config) = LightClientRequestHandler::new(
+ &protocol_id,
client.clone(),
);
async_std::task::spawn(handler.run().boxed());
@@ -117,6 +127,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration)
),
metrics_registry: None,
block_request_protocol_config,
+ light_client_request_protocol_config,
})
.unwrap();
diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs
index ab7625ff9fe8a..007928ad425f7 100644
--- a/client/network/src/lib.rs
+++ b/client/network/src/lib.rs
@@ -249,7 +249,6 @@ mod behaviour;
mod chain;
mod peer_info;
mod discovery;
-mod light_client_handler;
mod on_demand_layer;
mod protocol;
mod request_responses;
@@ -259,6 +258,7 @@ mod transport;
mod utils;
pub mod block_request_handler;
+pub mod light_client_requests;
pub mod config;
pub mod error;
pub mod gossip;
diff --git a/client/network/src/light_client_handler.rs b/client/network/src/light_client_handler.rs
deleted file mode 100644
index 1062236e25eb3..0000000000000
--- a/client/network/src/light_client_handler.rs
+++ /dev/null
@@ -1,2061 +0,0 @@
-// This file is part of Substrate.
-
-// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
-// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-//! [`NetworkBehaviour`] implementation which handles light client requests.
-//!
-//! Every request is coming in on a separate connection substream which gets
-//! closed after we have sent the response back. Requests and responses are
-//! encoded as protocol buffers (cf. `api.v1.proto`).
-//!
-//! For every outgoing request we likewise open a separate substream.
-
-#![allow(unused)]
-
-use bytes::Bytes;
-use codec::{self, Encode, Decode};
-use crate::{
- chain::Client,
- config::ProtocolId,
- protocol::message::{BlockAttributes, Direction, FromBlock},
- schema,
-};
-use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered};
-use libp2p::{
- core::{
- ConnectedPoint,
- Multiaddr,
- PeerId,
- connection::ConnectionId,
- upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
- upgrade::{OutboundUpgrade, read_one, write_one}
- },
- swarm::{
- AddressRecord,
- NegotiatedSubstream,
- NetworkBehaviour,
- NetworkBehaviourAction,
- NotifyHandler,
- OneShotHandler,
- OneShotHandlerConfig,
- PollParameters,
- SubstreamProtocol,
- }
-};
-use nohash_hasher::IntMap;
-use prost::Message;
-use sc_client_api::{
- StorageProof,
- light::{
- self, RemoteReadRequest, RemoteBodyRequest, ChangesProof,
- RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest,
- }
-};
-use sc_peerset::ReputationChange;
-use sp_core::{
- storage::{ChildInfo, ChildType,StorageKey, PrefixedStorageKey},
- hexdisplay::HexDisplay,
-};
-use smallvec::SmallVec;
-use sp_blockchain::{Error as ClientError};
-use sp_runtime::{
- traits::{Block, Header, NumberFor, Zero},
- generic::BlockId,
-};
-use std::{
- collections::{BTreeMap, VecDeque, HashMap},
- iter,
- io,
- sync::Arc,
- time::Duration,
- task::{Context, Poll}
-};
-use void::Void;
-use wasm_timer::Instant;
-
-/// Reputation change for a peer when a request timed out.
-pub(crate) const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8);
-
-/// Configuration options for `LightClientHandler` behaviour.
-#[derive(Debug, Clone)]
-pub struct Config {
- max_request_size: usize,
- max_response_size: usize,
- max_pending_requests: usize,
- inactivity_timeout: Duration,
- request_timeout: Duration,
- light_protocol: Bytes,
- block_protocol: Bytes,
-}
-
-impl Config {
- /// Create a fresh configuration with the following options:
- ///
- /// - max. request size = 1 MiB
- /// - max. response size = 16 MiB
- /// - max. pending requests = 128
- /// - inactivity timeout = 15s
- /// - request timeout = 15s
- pub fn new(id: &ProtocolId) -> Self {
- let mut c = Config {
- max_request_size: 1 * 1024 * 1024,
- max_response_size: 16 * 1024 * 1024,
- max_pending_requests: 128,
- inactivity_timeout: Duration::from_secs(15),
- request_timeout: Duration::from_secs(15),
- light_protocol: Bytes::new(),
- block_protocol: Bytes::new(),
- };
- c.set_protocol(id);
- c
- }
-
- /// Limit the max. length in bytes of a request.
- pub fn set_max_request_size(&mut self, v: usize) -> &mut Self {
- self.max_request_size = v;
- self
- }
-
- /// Limit the max. length in bytes of a response.
- pub fn set_max_response_size(&mut self, v: usize) -> &mut Self {
- self.max_response_size = v;
- self
- }
-
- /// Limit the max. number of pending requests.
- pub fn set_max_pending_requests(&mut self, v: usize) -> &mut Self {
- self.max_pending_requests = v;
- self
- }
-
- /// Limit the max. duration the connection may remain inactive before closing it.
- pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
- self.inactivity_timeout = v;
- self
- }
-
- /// Limit the max. request duration.
- pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
- self.request_timeout = v;
- self
- }
-
- /// Set protocol to use for upgrade negotiation.
- pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
- let mut vl = Vec::new();
- vl.extend_from_slice(b"/");
- vl.extend_from_slice(id.as_ref().as_bytes());
- vl.extend_from_slice(b"/light/2");
- self.light_protocol = vl.into();
-
- let mut vb = Vec::new();
- vb.extend_from_slice(b"/");
- vb.extend_from_slice(id.as_ref().as_bytes());
- vb.extend_from_slice(b"/sync/2");
- self.block_protocol = vb.into();
-
- self
- }
-}
-
-/// Possible errors while handling light clients.
-#[derive(Debug, thiserror::Error)]
-pub enum Error {
- /// There are currently too many pending request.
- #[error("too many pending requests")]
- TooManyRequests,
- /// The response type does not correspond to the issued request.
- #[error("unexpected response")]
- UnexpectedResponse,
- /// A bad request has been received.
- #[error("bad request: {0}")]
- BadRequest(&'static str),
- /// The chain client errored.
- #[error("client error: {0}")]
- Client(#[from] ClientError),
- /// Encoding or decoding of some data failed.
- #[error("codec error: {0}")]
- Codec(#[from] codec::Error),
-}
-
-/// The possible light client requests we support.
-///
-/// The associated `oneshot::Sender` will be used to convey the result of
-/// their request back to them (cf. `Reply`).
-//
-// This is modeled after light_dispatch.rs's `RequestData` which is not
-// used because we currently only support a subset of those.
-#[derive(Debug)]
-pub enum Request {
- Body {
- request: RemoteBodyRequest,
- sender: oneshot::Sender, ClientError>>
- },
- Header {
- request: light::RemoteHeaderRequest,
- sender: oneshot::Sender>
- },
- Read {
- request: light::RemoteReadRequest,
- sender: oneshot::Sender, Option>>, ClientError>>
- },
- ReadChild {
- request: light::RemoteReadChildRequest,
- sender: oneshot::Sender, Option>>, ClientError>>
- },
- Call {
- request: light::RemoteCallRequest,
- sender: oneshot::Sender, ClientError>>
- },
- Changes {
- request: light::RemoteChangesRequest,
- sender: oneshot::Sender, u32)>, ClientError>>
- }
-}
-
-/// The data to send back to the light client over the oneshot channel.
-//
-// It is unified here in order to be able to return it as a function
-// result instead of delivering it to the client as a side effect of
-// response processing.
-#[derive(Debug)]
-enum Reply {
- VecU8(Vec),
- VecNumberU32(Vec<(::Number, u32)>),
- MapVecU8OptVecU8(HashMap, Option>>),
- Header(B::Header),
- Extrinsics(Vec),
-}
-
-/// Augments a light client request with metadata.
-#[derive(Debug)]
-struct RequestWrapper {
- /// Time when this value was created.
- timestamp: Instant,
- /// Remaining retries.
- retries: usize,
- /// The actual request.
- request: Request,
- /// The peer to send the request to, e.g. `PeerId`.
- peer: P,
- /// The connection to use for sending the request.
- connection: Option,
-}
-
-/// Information we have about some peer.
-#[derive(Debug)]
-struct PeerInfo {
- connections: SmallVec<[(ConnectionId, Multiaddr); crate::MAX_CONNECTIONS_PER_PEER]>,
- best_block: Option>,
- status: PeerStatus,
-}
-
-impl Default for PeerInfo {
- fn default() -> Self {
- PeerInfo {
- connections: SmallVec::new(),
- best_block: None,
- status: PeerStatus::Idle,
- }
- }
-}
-
-type RequestId = u64;
-
-/// A peer is either idle or busy processing a request from us.
-#[derive(Debug, Clone, PartialEq, Eq)]
-enum PeerStatus {
- /// The peer is available.
- Idle,
- /// We wait for the peer to return us a response for the given request ID.
- BusyWith(RequestId),
-}
-
-/// The light client handler behaviour.
-pub struct LightClientHandler {
- /// This behaviour's configuration.
- config: Config,
- /// Blockchain client.
- chain: Arc>,
- /// Verifies that received responses are correct.
- checker: Arc>,
- /// Peer information (addresses, their best block, etc.)
- peers: HashMap>,
- /// Futures sending back response to remote clients.
- responses: FuturesUnordered>,
- /// Pending (local) requests.
- pending_requests: VecDeque>,
- /// Requests on their way to remote peers.
- outstanding: IntMap>,
- /// (Local) Request ID counter
- next_request_id: RequestId,
- /// Handle to use for reporting misbehaviour of peers.
- peerset: sc_peerset::PeersetHandle,
-}
-
-impl LightClientHandler
-where
- B: Block,
-{
- /// Construct a new light client handler.
- pub fn new(
- cfg: Config,
- chain: Arc>,
- checker: Arc>,
- peerset: sc_peerset::PeersetHandle,
- ) -> Self {
- LightClientHandler {
- config: cfg,
- chain,
- checker,
- peers: HashMap::new(),
- responses: FuturesUnordered::new(),
- pending_requests: VecDeque::new(),
- outstanding: IntMap::default(),
- next_request_id: 1,
- peerset,
- }
- }
-
- /// We rely on external information about peers best blocks as we lack the
- /// means to determine it ourselves.
- pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor) {
- if let Some(info) = self.peers.get_mut(peer) {
- log::trace!("new best block for {:?}: {:?}", peer, num);
- info.best_block = Some(num)
- }
- }
-
- /// Issue a new light client request.
- pub fn request(&mut self, req: Request) -> Result<(), Error> {
- if self.pending_requests.len() >= self.config.max_pending_requests {
- return Err(Error::TooManyRequests)
- }
- let rw = RequestWrapper {
- timestamp: Instant::now(),
- retries: retries(&req),
- request: req,
- peer: (), // we do not know the peer yet
- connection: None,
- };
- self.pending_requests.push_back(rw);
- Ok(())
- }
-
- fn next_request_id(&mut self) -> RequestId {
- let id = self.next_request_id;
- self.next_request_id += 1;
- id
- }
-
- /// Remove the given peer.
- ///
- /// If we have a request to this peer in flight, we move it back to
- /// the pending requests queue.
- fn remove_peer(&mut self, peer: &PeerId) {
- if let Some(id) = self.outstanding.iter().find(|(_, rw)| &rw.peer == peer).map(|(k, _)| *k) {
- let rw = self.outstanding.remove(&id).expect("key belongs to entry in this map");
- let rw = RequestWrapper {
- timestamp: rw.timestamp,
- retries: rw.retries,
- request: rw.request,
- peer: (), // need to find another peer
- connection: None,
- };
- self.pending_requests.push_back(rw);
- }
- self.peers.remove(peer);
- }
-
- /// Prepares a request by selecting a suitable peer and connection to send it to.
- ///
- /// If there is currently no suitable peer for the request, the given request
- /// is returned as `Err`.
- fn prepare_request(&self, req: RequestWrapper)
- -> Result<(PeerId, RequestWrapper), RequestWrapper>
- {
- let number = required_block(&req.request);
-
- let mut peer = None;
- for (peer_id, peer_info) in self.peers.iter() {
- if peer_info.status == PeerStatus::Idle {
- match peer_info.best_block {
- Some(n) => if n >= number {
- peer = Some((peer_id, peer_info));
- break
- },
- None => peer = Some((peer_id, peer_info))
- }
- }
- }
-
- if let Some((peer_id, peer_info)) = peer {
- let connection = peer_info.connections.iter().next().map(|(id, _)| *id);
- let rw = RequestWrapper {
- timestamp: req.timestamp,
- retries: req.retries,
- request: req.request,
- peer: peer_id.clone(),
- connection,
- };
- Ok((peer_id.clone(), rw))
- } else {
- Err(req)
- }
- }
-
- /// Process a local request's response from remote.
- ///
- /// If successful, this will give us the actual, checked data we should be
- /// sending back to the client, otherwise an error.
- fn on_response
- ( &mut self
- , peer: &PeerId
- , request: &Request
- , response: Response
- ) -> Result, Error>
- {
- log::trace!("response from {}", peer);
- match response {
- Response::Light(r) => self.on_response_light(peer, request, r),
- Response::Block(r) => self.on_response_block(peer, request, r),
- }
- }
-
- fn on_response_light
- ( &mut self
- , peer: &PeerId
- , request: &Request
- , response: schema::v1::light::Response
- ) -> Result, Error>
- {
- use schema::v1::light::response::Response;
- match response.response {
- Some(Response::RemoteCallResponse(response)) =>
- if let Request::Call { request , .. } = request {
- let proof = Decode::decode(&mut response.proof.as_ref())?;
- let reply = self.checker.check_execution_proof(request, proof)?;
- Ok(Reply::VecU8(reply))
- } else {
- Err(Error::UnexpectedResponse)
- }
- Some(Response::RemoteReadResponse(response)) =>
- match request {
- Request::Read { request, .. } => {
- let proof = Decode::decode(&mut response.proof.as_ref())?;
- let reply = self.checker.check_read_proof(&request, proof)?;
- Ok(Reply::MapVecU8OptVecU8(reply))
- }
- Request::ReadChild { request, .. } => {
- let proof = Decode::decode(&mut response.proof.as_ref())?;
- let reply = self.checker.check_read_child_proof(&request, proof)?;
- Ok(Reply::MapVecU8OptVecU8(reply))
- }
- _ => Err(Error::UnexpectedResponse)
- }
- Some(Response::RemoteChangesResponse(response)) =>
- if let Request::Changes { request, .. } = request {
- let max_block = Decode::decode(&mut response.max.as_ref())?;
- let roots_proof = Decode::decode(&mut response.roots_proof.as_ref())?;
- let roots = {
- let mut r = BTreeMap::new();
- for pair in response.roots {
- let k = Decode::decode(&mut pair.fst.as_ref())?;
- let v = Decode::decode(&mut pair.snd.as_ref())?;
- r.insert(k, v);
- }
- r
- };
- let reply = self.checker.check_changes_proof(&request, light::ChangesProof {
- max_block,
- proof: response.proof,
- roots,
- roots_proof,
- })?;
- Ok(Reply::VecNumberU32(reply))
- } else {
- Err(Error::UnexpectedResponse)
- }
- Some(Response::RemoteHeaderResponse(response)) =>
- if let Request::Header { request, .. } = request {
- let header =
- if response.header.is_empty() {
- None
- } else {
- Some(Decode::decode(&mut response.header.as_ref())?)
- };
- let proof = Decode::decode(&mut response.proof.as_ref())?;
- let reply = self.checker.check_header_proof(&request, header, proof)?;
- Ok(Reply::Header(reply))
- } else {
- Err(Error::UnexpectedResponse)
- }
- None => Err(Error::UnexpectedResponse)
- }
- }
-
- fn on_response_block
- ( &mut self
- , peer: &PeerId
- , request: &Request
- , response: schema::v1::BlockResponse
- ) -> Result, Error>
- {
- let request = if let Request::Body { request , .. } = &request {
- request
- } else {
- return Err(Error::UnexpectedResponse);
- };
-
- let body: Vec<_> = match response.blocks.into_iter().next() {
- Some(b) => b.body,
- None => return Err(Error::UnexpectedResponse),
- };
-
- let body = body.into_iter()
- .map(|mut extrinsic| B::Extrinsic::decode(&mut &extrinsic[..]))
- .collect::>()?;
-
- let body = self.checker.check_body_proof(&request, body)?;
- Ok(Reply::Extrinsics(body))
- }
-
- fn on_remote_call_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::light::RemoteCallRequest
- ) -> Result
- {
- log::trace!("remote call request from {} ({} at {:?})",
- peer,
- request.method,
- request.block,
- );
-
- let block = Decode::decode(&mut request.block.as_ref())?;
-
- let proof = match self.chain.execution_proof(&BlockId::Hash(block), &request.method, &request.data) {
- Ok((_, proof)) => proof,
- Err(e) => {
- log::trace!("remote call request from {} ({} at {:?}) failed with: {}",
- peer,
- request.method,
- request.block,
- e,
- );
- StorageProof::empty()
- }
- };
-
- let response = {
- let r = schema::v1::light::RemoteCallResponse { proof: proof.encode() };
- schema::v1::light::response::Response::RemoteCallResponse(r)
- };
-
- Ok(schema::v1::light::Response { response: Some(response) })
- }
-
- fn on_remote_read_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::light::RemoteReadRequest
- ) -> Result
- {
- if request.keys.is_empty() {
- log::debug!("invalid remote read request sent by {}", peer);
- return Err(Error::BadRequest("remote read request without keys"))
- }
-
- log::trace!("remote read request from {} ({} at {:?})",
- peer,
- fmt_keys(request.keys.first(), request.keys.last()),
- request.block);
-
- let block = Decode::decode(&mut request.block.as_ref())?;
-
- let proof = match self.chain.read_proof(&BlockId::Hash(block), &mut request.keys.iter().map(AsRef::as_ref)) {
- Ok(proof) => proof,
- Err(error) => {
- log::trace!("remote read request from {} ({} at {:?}) failed with: {}",
- peer,
- fmt_keys(request.keys.first(), request.keys.last()),
- request.block,
- error);
- StorageProof::empty()
- }
- };
-
- let response = {
- let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
- schema::v1::light::response::Response::RemoteReadResponse(r)
- };
-
- Ok(schema::v1::light::Response { response: Some(response) })
- }
-
- fn on_remote_read_child_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::light::RemoteReadChildRequest
- ) -> Result
- {
- if request.keys.is_empty() {
- log::debug!("invalid remote child read request sent by {}", peer);
- return Err(Error::BadRequest("remove read child request without keys"))
- }
-
- log::trace!("remote read child request from {} ({} {} at {:?})",
- peer,
- HexDisplay::from(&request.storage_key),
- fmt_keys(request.keys.first(), request.keys.last()),
- request.block);
-
- let block = Decode::decode(&mut request.block.as_ref())?;
-
- let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key);
- let child_info = match ChildType::from_prefixed_key(prefixed_key) {
- Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)),
- None => Err(sp_blockchain::Error::InvalidChildStorageKey),
- };
- let proof = match child_info.and_then(|child_info| self.chain.read_child_proof(
- &BlockId::Hash(block),
- &child_info,
- &mut request.keys.iter().map(AsRef::as_ref)
- )) {
- Ok(proof) => proof,
- Err(error) => {
- log::trace!("remote read child request from {} ({} {} at {:?}) failed with: {}",
- peer,
- HexDisplay::from(&request.storage_key),
- fmt_keys(request.keys.first(), request.keys.last()),
- request.block,
- error);
- StorageProof::empty()
- }
- };
-
- let response = {
- let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
- schema::v1::light::response::Response::RemoteReadResponse(r)
- };
-
- Ok(schema::v1::light::Response { response: Some(response) })
- }
-
- fn on_remote_header_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::light::RemoteHeaderRequest
- ) -> Result
- {
- log::trace!("remote header proof request from {} ({:?})", peer, request.block);
-
- let block = Decode::decode(&mut request.block.as_ref())?;
- let (header, proof) = match self.chain.header_proof(&BlockId::Number(block)) {
- Ok((header, proof)) => (header.encode(), proof),
- Err(error) => {
- log::trace!("remote header proof request from {} ({:?}) failed with: {}",
- peer,
- request.block,
- error);
- (Default::default(), StorageProof::empty())
- }
- };
-
- let response = {
- let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() };
- schema::v1::light::response::Response::RemoteHeaderResponse(r)
- };
-
- Ok(schema::v1::light::Response { response: Some(response) })
- }
-
- fn on_remote_changes_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::light::RemoteChangesRequest
- ) -> Result
- {
- log::trace!("remote changes proof request from {} for key {} ({:?}..{:?})",
- peer,
- if !request.storage_key.is_empty() {
- format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&request.key))
- } else {
- HexDisplay::from(&request.key).to_string()
- },
- request.first,
- request.last);
-
- let first = Decode::decode(&mut request.first.as_ref())?;
- let last = Decode::decode(&mut request.last.as_ref())?;
- let min = Decode::decode(&mut request.min.as_ref())?;
- let max = Decode::decode(&mut request.max.as_ref())?;
- let key = StorageKey(request.key.clone());
- let storage_key = if request.storage_key.is_empty() {
- None
- } else {
- Some(PrefixedStorageKey::new_ref(&request.storage_key))
- };
-
- let proof = match self.chain.key_changes_proof(first, last, min, max, storage_key, &key) {
- Ok(proof) => proof,
- Err(error) => {
- log::trace!("remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}",
- peer,
- format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)),
- request.first,
- request.last,
- error);
-
- light::ChangesProof:: {
- max_block: Zero::zero(),
- proof: Vec::new(),
- roots: BTreeMap::new(),
- roots_proof: StorageProof::empty(),
- }
- }
- };
-
- let response = {
- let r = schema::v1::light::RemoteChangesResponse {
- max: proof.max_block.encode(),
- proof: proof.proof,
- roots: proof.roots.into_iter()
- .map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() })
- .collect(),
- roots_proof: proof.roots_proof.encode(),
- };
- schema::v1::light::response::Response::RemoteChangesResponse(r)
- };
-
- Ok(schema::v1::light::Response { response: Some(response) })
- }
-}
-
-impl NetworkBehaviour for LightClientHandler
-where
- B: Block
-{
- type ProtocolsHandler = OneShotHandler>;
- type OutEvent = Void;
-
- fn new_handler(&mut self) -> Self::ProtocolsHandler {
- let p = InboundProtocol {
- max_request_size: self.config.max_request_size,
- protocol: self.config.light_protocol.clone(),
- };
- let mut cfg = OneShotHandlerConfig::default();
- cfg.keep_alive_timeout = self.config.inactivity_timeout;
- OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
- }
-
- fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec {
- self.peers.get(peer)
- .map(|info| info.connections.iter().map(|(_, a)| a.clone()).collect())
- .unwrap_or_default()
- }
-
- fn inject_connected(&mut self, peer: &PeerId) {
- }
-
- fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) {
- let peer_address = match info {
- ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
- ConnectedPoint::Dialer { address } => address.clone()
- };
-
- log::trace!("peer {} connected with address {}", peer, peer_address);
-
- let entry = self.peers.entry(peer.clone()).or_default();
- entry.connections.push((*conn, peer_address));
- }
-
- fn inject_disconnected(&mut self, peer: &PeerId) {
- log::trace!("peer {} disconnected", peer);
- self.remove_peer(peer)
- }
-
- fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) {
- let peer_address = match info {
- ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
- ConnectedPoint::Dialer { address } => address
- };
-
- log::trace!("connection to peer {} closed: {}", peer, peer_address);
-
- if let Some(info) = self.peers.get_mut(peer) {
- info.connections.retain(|(c, _)| c != conn)
- }
-
- // Add any outstanding requests on the closed connection back to the
- // pending requests.
- if let Some(id) = self.outstanding.iter()
- .find(|(_, rw)| &rw.peer == peer && rw.connection == Some(*conn)) // (*)
- .map(|(id, _)| *id)
- {
- let rw = self.outstanding.remove(&id).expect("by (*)");
- let rw = RequestWrapper {
- timestamp: rw.timestamp,
- retries: rw.retries,
- request: rw.request,
- peer: (), // need to find another peer
- connection: None,
- };
- self.pending_requests.push_back(rw);
- }
- }
-
- fn inject_event(&mut self, peer: PeerId, conn: ConnectionId, event: Event) {
- match event {
- // An incoming request from remote has been received.
- Event::Request(request, mut stream) => {
- log::trace!("incoming request from {}", peer);
- let result = match &request.request {
- Some(schema::v1::light::request::Request::RemoteCallRequest(r)) =>
- self.on_remote_call_request(&peer, r),
- Some(schema::v1::light::request::Request::RemoteReadRequest(r)) =>
- self.on_remote_read_request(&peer, r),
- Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) =>
- self.on_remote_header_request(&peer, r),
- Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) =>
- self.on_remote_read_child_request(&peer, r),
- Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) =>
- self.on_remote_changes_request(&peer, r),
- None => {
- log::debug!("ignoring request without request data from peer {}", peer);
- return
- }
- };
- match result {
- Ok(response) => {
- log::trace!("enqueueing response for peer {}", peer);
- let mut data = Vec::new();
- if let Err(e) = response.encode(&mut data) {
- log::debug!("error encoding response for peer {}: {}", peer, e)
- } else {
- let future = async move {
- if let Err(e) = write_one(&mut stream, data).await {
- log::debug!("error writing response: {}", e)
- }
- };
- self.responses.push(future.boxed())
- }
- }
- Err(Error::BadRequest(_)) => {
- self.remove_peer(&peer);
- self.peerset.report_peer(peer, ReputationChange::new(-(1 << 12), "bad request"))
- }
- Err(e) => log::debug!("error handling request from peer {}: {}", peer, e)
- }
- }
- // A response to one of our own requests has been received.
- Event::Response(id, response) => {
- if let Some(request) = self.outstanding.remove(&id) {
- // We first just check if the response originates from the expected peer
- // and connection.
- if request.peer != peer {
- log::debug!("Expected response from {} instead of {}.", request.peer, peer);
- self.outstanding.insert(id, request);
- self.remove_peer(&peer);
- self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer"));
- return
- }
-
- if let Some(info) = self.peers.get_mut(&peer) {
- if info.status != PeerStatus::BusyWith(id) {
- // If we get here, something is wrong with our internal handling of peer
- // status information. At any time, a single peer processes at most one
- // request from us and its status should contain the request ID we are
- // expecting a response for. If a peer would send us a response with a
- // random ID, we should not have an entry for it with this peer ID in
- // our `outstanding` map, so a malicious peer should not be able to get
- // us here. It is our own fault and must be fixed!
- panic!("unexpected peer status {:?} for {}", info.status, peer);
- }
-
- info.status = PeerStatus::Idle; // Make peer available again.
-
- match self.on_response(&peer, &request.request, response) {
- Ok(reply) => send_reply(Ok(reply), request.request),
- Err(Error::UnexpectedResponse) => {
- log::debug!("unexpected response {} from peer {}", id, peer);
- self.remove_peer(&peer);
- self.peerset.report_peer(peer, ReputationChange::new_fatal("unexpected response from peer"));
- let rw = RequestWrapper {
- timestamp: request.timestamp,
- retries: request.retries,
- request: request.request,
- peer: (),
- connection: None,
- };
- self.pending_requests.push_back(rw);
- }
- Err(other) => {
- log::debug!("error handling response {} from peer {}: {}", id, peer, other);
- self.remove_peer(&peer);
- self.peerset.report_peer(peer, ReputationChange::new_fatal("invalid response from peer"));
- if request.retries > 0 {
- let rw = RequestWrapper {
- timestamp: request.timestamp,
- retries: request.retries - 1,
- request: request.request,
- peer: (),
- connection: None,
- };
- self.pending_requests.push_back(rw)
- } else {
- send_reply(Err(ClientError::RemoteFetchFailed), request.request)
- }
- }
- }
- } else {
- // If we get here, something is wrong with our internal handling of peers.
- // We apparently have an entry in our `outstanding` map and the peer is the one we
- // expected. So, if we can not find an entry for it in our peer information table,
- // then these two collections are out of sync which must not happen and is a clear
- // programmer error that must be fixed!
- panic!("missing peer information for {}; response {}", peer, id);
- }
- } else {
- log::debug!("unexpected response {} from peer {}", id, peer);
- self.remove_peer(&peer);
- self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer"));
- }
- }
- }
- }
-
- fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll> {
- // Process response sending futures.
- while let Poll::Ready(Some(_)) = self.responses.poll_next_unpin(cx) {}
-
- // If we have a pending request to send, try to find an available peer and send it.
- let now = Instant::now();
- while let Some(mut request) = self.pending_requests.pop_front() {
- if now > request.timestamp + self.config.request_timeout {
- if request.retries == 0 {
- send_reply(Err(ClientError::RemoteFetchFailed), request.request);
- continue
- }
- request.timestamp = Instant::now();
- request.retries -= 1
- }
-
-
- match self.prepare_request(request) {
- Err(request) => {
- self.pending_requests.push_front(request);
- log::debug!("no peer available to send request to");
- break
- }
- Ok((peer, request)) => {
- let request_bytes = match serialize_request(&request.request) {
- Ok(bytes) => bytes,
- Err(error) => {
- log::debug!("failed to serialize request: {}", error);
- send_reply(Err(ClientError::RemoteFetchFailed), request.request);
- continue
- }
- };
-
- let (expected, protocol) = match request.request {
- Request::Body { .. } =>
- (ExpectedResponseTy::Block, self.config.block_protocol.clone()),
- _ =>
- (ExpectedResponseTy::Light, self.config.light_protocol.clone()),
- };
-
- let peer_id = peer.clone();
- let handler = request.connection.map_or(NotifyHandler::Any, NotifyHandler::One);
-
- let request_id = self.next_request_id();
- if let Some(p) = self.peers.get_mut(&peer) {
- p.status = PeerStatus::BusyWith(request_id);
- }
- self.outstanding.insert(request_id, request);
-
- let event = OutboundProtocol {
- request_id,
- request: request_bytes,
- expected,
- max_response_size: self.config.max_response_size,
- protocol,
- };
-
- log::trace!("sending request {} to peer {}", request_id, peer_id);
-
- return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
- peer_id,
- handler,
- event,
- })
- }
- }
- }
-
- // Look for ongoing requests that have timed out.
- let mut expired = Vec::new();
- for (id, rw) in &self.outstanding {
- if now > rw.timestamp + self.config.request_timeout {
- log::debug!("request {} timed out", id);
- expired.push(*id)
- }
- }
- for id in expired {
- if let Some(rw) = self.outstanding.remove(&id) {
- self.remove_peer(&rw.peer);
- self.peerset.report_peer(rw.peer.clone(),
- ReputationChange::new(TIMEOUT_REPUTATION_CHANGE, "light request timeout"));
- if rw.retries == 0 {
- send_reply(Err(ClientError::RemoteFetchFailed), rw.request);
- continue
- }
- let rw = RequestWrapper {
- timestamp: Instant::now(),
- retries: rw.retries - 1,
- request: rw.request,
- peer: (),
- connection: None,
- };
- self.pending_requests.push_back(rw)
- }
- }
-
- Poll::Pending
- }
-}
-
-fn required_block(request: &Request) -> NumberFor {
- match request {
- Request::Body { request, .. } => *request.header.number(),
- Request::Header { request, .. } => request.block,
- Request::Read { request, .. } => *request.header.number(),
- Request::ReadChild { request, .. } => *request.header.number(),
- Request::Call { request, .. } => *request.header.number(),
- Request::Changes { request, .. } => request.max_block.0,
- }
-}
-
-fn retries(request: &Request) -> usize {
- let rc = match request {
- Request::Body { request, .. } => request.retry_count,
- Request::Header { request, .. } => request.retry_count,
- Request::Read { request, .. } => request.retry_count,
- Request::ReadChild { request, .. } => request.retry_count,
- Request::Call { request, .. } => request.retry_count,
- Request::Changes { request, .. } => request.retry_count,
- };
- rc.unwrap_or(0)
-}
-
-fn serialize_request(request: &Request) -> Result, prost::EncodeError> {
- let request = match request {
- Request::Body { request, .. } => {
- let rq = schema::v1::BlockRequest {
- fields: BlockAttributes::BODY.to_be_u32(),
- from_block: Some(schema::v1::block_request::FromBlock::Hash(
- request.header.hash().encode(),
- )),
- to_block: Default::default(),
- direction: schema::v1::Direction::Ascending as i32,
- max_blocks: 1,
- };
-
- let mut buf = Vec::with_capacity(rq.encoded_len());
- rq.encode(&mut buf)?;
- return Ok(buf);
- }
- Request::Header { request, .. } => {
- let r = schema::v1::light::RemoteHeaderRequest { block: request.block.encode() };
- schema::v1::light::request::Request::RemoteHeaderRequest(r)
- }
- Request::Read { request, .. } => {
- let r = schema::v1::light::RemoteReadRequest {
- block: request.block.encode(),
- keys: request.keys.clone(),
- };
- schema::v1::light::request::Request::RemoteReadRequest(r)
- }
- Request::ReadChild { request, .. } => {
- let r = schema::v1::light::RemoteReadChildRequest {
- block: request.block.encode(),
- storage_key: request.storage_key.clone().into_inner(),
- keys: request.keys.clone(),
- };
- schema::v1::light::request::Request::RemoteReadChildRequest(r)
- }
- Request::Call { request, .. } => {
- let r = schema::v1::light::RemoteCallRequest {
- block: request.block.encode(),
- method: request.method.clone(),
- data: request.call_data.clone(),
- };
- schema::v1::light::request::Request::RemoteCallRequest(r)
- }
- Request::Changes { request, .. } => {
- let r = schema::v1::light::RemoteChangesRequest {
- first: request.first_block.1.encode(),
- last: request.last_block.1.encode(),
- min: request.tries_roots.1.encode(),
- max: request.max_block.1.encode(),
- storage_key: request.storage_key.clone().map(|s| s.into_inner())
- .unwrap_or_default(),
- key: request.key.clone(),
- };
- schema::v1::light::request::Request::RemoteChangesRequest(r)
- }
- };
-
- let rq = schema::v1::light::Request { request: Some(request) };
- let mut buf = Vec::with_capacity(rq.encoded_len());
- rq.encode(&mut buf)?;
- Ok(buf)
-}
-
-fn send_reply(result: Result, ClientError>, request: Request) {
- fn send(item: T, sender: oneshot::Sender) {
- let _ = sender.send(item); // It is okay if the other end already hung up.
- }
- match request {
- Request::Body { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::Extrinsics(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for body request: {:?}, {:?}", reply, request),
- }
- Request::Header { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::Header(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for header request: {:?}, {:?}", reply, request),
- }
- Request::Read { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for read request: {:?}, {:?}", reply, request),
- }
- Request::ReadChild { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::MapVecU8OptVecU8(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for read child request: {:?}, {:?}", reply, request),
- }
- Request::Call { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::VecU8(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for call request: {:?}, {:?}", reply, request),
- }
- Request::Changes { request, sender } => match result {
- Err(e) => send(Err(e), sender),
- Ok(Reply::VecNumberU32(x)) => send(Ok(x), sender),
- reply => log::error!("invalid reply for changes request: {:?}, {:?}", reply, request),
- }
- }
-}
-
-/// Output type of inbound and outbound substream upgrades.
-#[derive(Debug)]
-pub enum Event {
- /// Incoming request from remote and substream to use for the response.
- Request(schema::v1::light::Request, T),
- /// Incoming response from remote.
- Response(RequestId, Response),
-}
-
-/// Incoming response from remote.
-#[derive(Debug, Clone)]
-pub enum Response {
- /// Incoming light response from remote.
- Light(schema::v1::light::Response),
- /// Incoming block response from remote.
- Block(schema::v1::BlockResponse),
-}
-
-/// Substream upgrade protocol.
-///
-/// Reads incoming requests from remote.
-#[derive(Debug, Clone)]
-pub struct InboundProtocol {
- /// The max. request length in bytes.
- max_request_size: usize,
- /// The protocol to use for upgrade negotiation.
- protocol: Bytes,
-}
-
-impl UpgradeInfo for InboundProtocol {
- type Info = Bytes;
- type InfoIter = iter::Once;
-
- fn protocol_info(&self) -> Self::InfoIter {
- iter::once(self.protocol.clone())
- }
-}
-
-impl InboundUpgrade for InboundProtocol
-where
- T: AsyncRead + AsyncWrite + Unpin + Send + 'static
-{
- type Output = Event;
- type Error = ReadOneError;
- type Future = BoxFuture<'static, Result>;
-
- fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
- let future = async move {
- let vec = read_one(&mut s, self.max_request_size).await?;
- match schema::v1::light::Request::decode(&vec[..]) {
- Ok(r) => Ok(Event::Request(r, s)),
- Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
- }
- };
- future.boxed()
- }
-}
-
-/// Substream upgrade protocol.
-///
-/// Sends a request to remote and awaits the response.
-#[derive(Debug, Clone)]
-pub struct OutboundProtocol {
- /// The serialized protobuf request.
- request: Vec,
- /// Local identifier for the request. Used to associate it with a response.
- request_id: RequestId,
- /// Kind of response expected for this request.
- expected: ExpectedResponseTy,
- /// The max. response length in bytes.
- max_response_size: usize,
- /// The protocol to use for upgrade negotiation.
- protocol: Bytes,
-}
-
-/// Type of response expected from the remote for this request.
-#[derive(Debug, Clone)]
-enum ExpectedResponseTy {
- Light,
- Block,
-}
-
-impl UpgradeInfo for OutboundProtocol {
- type Info = Bytes;
- type InfoIter = iter::Once;
-
- fn protocol_info(&self) -> Self::InfoIter {
- iter::once(self.protocol.clone())
- }
-}
-
-impl OutboundUpgrade for OutboundProtocol
-where
- T: AsyncRead + AsyncWrite + Unpin + Send + 'static
-{
- type Output = Event;
- type Error = ReadOneError;
- type Future = BoxFuture<'static, Result>;
-
- fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future {
- let future = async move {
- write_one(&mut s, &self.request).await?;
- let vec = read_one(&mut s, self.max_response_size).await?;
-
- match self.expected {
- ExpectedResponseTy::Light => {
- schema::v1::light::Response::decode(&vec[..])
- .map(|r| Event::Response(self.request_id, Response::Light(r)))
- .map_err(|e| {
- ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
- })
- },
- ExpectedResponseTy::Block => {
- schema::v1::BlockResponse::decode(&vec[..])
- .map(|r| Event::Response(self.request_id, Response::Block(r)))
- .map_err(|e| {
- ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
- })
- }
- }
- };
- future.boxed()
- }
-}
-
-fn fmt_keys(first: Option<&Vec>, last: Option<&Vec>) -> String {
- if let (Some(first), Some(last)) = (first, last) {
- if first == last {
- HexDisplay::from(first).to_string()
- } else {
- format!("{}..{}", HexDisplay::from(first), HexDisplay::from(last))
- }
- } else {
- String::from("n/a")
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use async_std::task;
- use assert_matches::assert_matches;
- use codec::Encode;
- use crate::{
- chain::Client,
- config::ProtocolId,
- schema,
- };
- use futures::{channel::oneshot, prelude::*};
- use libp2p::{
- PeerId,
- Multiaddr,
- core::{
- ConnectedPoint,
- connection::ConnectionId,
- identity,
- muxing::{StreamMuxerBox, SubstreamRef},
- transport::{Transport, Boxed, memory::MemoryTransport},
- upgrade
- },
- noise::{self, Keypair, X25519, NoiseConfig},
- swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters},
- yamux
- };
- use sc_client_api::{StorageProof, RemoteReadChildRequest, FetchChecker};
- use sp_blockchain::{Error as ClientError};
- use sp_core::storage::ChildInfo;
- use std::{
- collections::{HashMap, HashSet},
- io,
- iter::{self, FromIterator},
- pin::Pin,
- sync::Arc,
- task::{Context, Poll}
- };
- use sp_runtime::{generic::Header, traits::{BlakeTwo256, Block as BlockT, NumberFor}};
- use super::{Event, LightClientHandler, Request, Response, OutboundProtocol, PeerStatus};
- use void::Void;
-
- type Block = sp_runtime::generic::Block, substrate_test_runtime::Extrinsic>;
- type Handler = LightClientHandler;
- type Swarm = libp2p::swarm::Swarm;
-
- fn empty_proof() -> Vec {
- StorageProof::empty().encode()
- }
-
- fn make_swarm(ok: bool, ps: sc_peerset::PeersetHandle, cf: super::Config) -> Swarm {
- let client = Arc::new(substrate_test_runtime_client::new());
- let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData });
- let id_key = identity::Keypair::generate_ed25519();
- let dh_key = Keypair::::new().into_authentic(&id_key).unwrap();
- let local_peer = id_key.public().into_peer_id();
- let transport = MemoryTransport::default()
- .upgrade(upgrade::Version::V1)
- .authenticate(NoiseConfig::xx(dh_key).into_authenticated())
- .multiplex(yamux::YamuxConfig::default())
- .boxed();
- Swarm::new(transport, LightClientHandler::new(cf, client, checker, ps), local_peer)
- }
-
- struct DummyFetchChecker {
- ok: bool,
- _mark: std::marker::PhantomData
- }
-
- impl light::FetchChecker for DummyFetchChecker {
- fn check_header_proof(
- &self,
- _request: &RemoteHeaderRequest,
- header: Option,
- _remote_proof: StorageProof,
- ) -> Result {
- match self.ok {
- true if header.is_some() => Ok(header.unwrap()),
- _ => Err(ClientError::Backend("Test error".into())),
- }
- }
-
- fn check_read_proof(
- &self,
- request: &RemoteReadRequest,
- _: StorageProof,
- ) -> Result, Option>>, ClientError> {
- match self.ok {
- true => Ok(request.keys
- .iter()
- .cloned()
- .map(|k| (k, Some(vec![42])))
- .collect()
- ),
- false => Err(ClientError::Backend("Test error".into())),
- }
- }
-
- fn check_read_child_proof(
- &self,
- request: &RemoteReadChildRequest,
- _: StorageProof,
- ) -> Result, Option>>, ClientError> {
- match self.ok {
- true => Ok(request.keys
- .iter()
- .cloned()
- .map(|k| (k, Some(vec![42])))
- .collect()
- ),
- false => Err(ClientError::Backend("Test error".into())),
- }
- }
-
- fn check_execution_proof(
- &self,
- _: &RemoteCallRequest,
- _: StorageProof,
- ) -> Result, ClientError> {
- match self.ok {
- true => Ok(vec![42]),
- false => Err(ClientError::Backend("Test error".into())),
- }
- }
-
- fn check_changes_proof(
- &self,
- _: &RemoteChangesRequest,
- _: ChangesProof
- ) -> Result, u32)>, ClientError> {
- match self.ok {
- true => Ok(vec![(100u32.into(), 2)]),
- false => Err(ClientError::Backend("Test error".into())),
- }
- }
-
- fn check_body_proof(
- &self,
- _: &RemoteBodyRequest,
- body: Vec
- ) -> Result, ClientError> {
- match self.ok {
- true => Ok(body),
- false => Err(ClientError::Backend("Test error".into())),
- }
- }
- }
-
- fn make_config() -> super::Config {
- super::Config::new(&ProtocolId::from("foo"))
- }
-
- fn dummy_header() -> sp_test_primitives::Header {
- sp_test_primitives::Header {
- parent_hash: Default::default(),
- number: 0,
- state_root: Default::default(),
- extrinsics_root: Default::default(),
- digest: Default::default(),
- }
- }
-
- struct EmptyPollParams(PeerId);
-
- impl PollParameters for EmptyPollParams {
- type SupportedProtocolsIter = iter::Empty>;
- type ListenedAddressesIter = iter::Empty;
- type ExternalAddressesIter = iter::Empty;
-
- fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
- iter::empty()
- }
-
- fn listened_addresses(&self) -> Self::ListenedAddressesIter {
- iter::empty()
- }
-
- fn external_addresses(&self) -> Self::ExternalAddressesIter {
- iter::empty()
- }
-
- fn local_peer_id(&self) -> &PeerId {
- &self.0
- }
- }
-
- fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) {
- let cfg = sc_peerset::SetConfig {
- in_peers: 128,
- out_peers: 128,
- bootnodes: Default::default(),
- reserved_only: false,
- reserved_nodes: Default::default(),
- };
- sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig{ sets: vec![cfg] })
- }
-
- fn make_behaviour
- ( ok: bool
- , ps: sc_peerset::PeersetHandle
- , cf: super::Config
- ) -> LightClientHandler
- {
- let client = Arc::new(substrate_test_runtime_client::new());
- let checker = Arc::new(DummyFetchChecker { ok, _mark: std::marker::PhantomData });
- LightClientHandler::new(cf, client, checker, ps)
- }
-
- fn empty_dialer() -> ConnectedPoint {
- ConnectedPoint::Dialer { address: Multiaddr::empty() }
- }
-
- fn poll(mut b: &mut LightClientHandler) -> Poll> {
- let mut p = EmptyPollParams(PeerId::random());
- match future::poll_fn(|cx| Pin::new(&mut b).poll(cx, &mut p)).now_or_never() {
- Some(a) => Poll::Ready(a),
- None => Poll::Pending
- }
- }
-
- #[test]
- fn disconnects_from_peer_if_told() {
- let peer = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(true, pset.1, make_config());
-
- behaviour.inject_connection_established(&peer, &ConnectionId::new(1), &empty_dialer());
- behaviour.inject_connected(&peer);
- assert_eq!(1, behaviour.peers.len());
-
- behaviour.inject_connection_closed(&peer, &ConnectionId::new(1), &empty_dialer());
- behaviour.inject_disconnected(&peer);
- assert_eq!(0, behaviour.peers.len())
- }
-
- #[test]
- fn disconnects_from_peer_if_request_times_out() {
- let peer0 = PeerId::random();
- let peer1 = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(true, pset.1, make_config());
-
- behaviour.inject_connection_established(&peer0, &ConnectionId::new(1), &empty_dialer());
- behaviour.inject_connected(&peer0);
- behaviour.inject_connection_established(&peer1, &ConnectionId::new(2), &empty_dialer());
- behaviour.inject_connected(&peer1);
-
- // We now know about two peers.
- assert_eq!(HashSet::from_iter(&[peer0.clone(), peer1.clone()]), behaviour.peers.keys().collect::>());
-
- // No requests have been made yet.
- assert!(behaviour.pending_requests.is_empty());
- assert!(behaviour.outstanding.is_empty());
-
- // Issue our first request!
- let chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: Some(1),
- };
- behaviour.request(Request::Call { request, sender: chan.0 }).unwrap();
- assert_eq!(1, behaviour.pending_requests.len());
-
- // The behaviour should now attempt to send the request.
- assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, .. }) => {
- assert!(peer_id == peer0 || peer_id == peer1)
- });
-
- // And we should have one busy peer.
- assert!({
- let (idle, busy): (Vec<_>, Vec<_>) =
- behaviour.peers.iter().partition(|(_, info)| info.status == PeerStatus::Idle);
-
- idle.len() == 1 && busy.len() == 1
- && (idle[0].0 == &peer0 || busy[0].0 == &peer0)
- && (idle[0].0 == &peer1 || busy[0].0 == &peer1)
- });
-
- // No more pending requests, but one should be outstanding.
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(1, behaviour.outstanding.len());
-
- // We now set back the timestamp of the outstanding request to make it expire.
- let request = behaviour.outstanding.values_mut().next().unwrap();
- request.timestamp -= make_config().request_timeout;
-
- // Make progress, but do not expect some action.
- assert_matches!(poll(&mut behaviour), Poll::Pending);
-
- // The request should have timed out by now and the corresponding peer be removed.
- assert_eq!(1, behaviour.peers.len());
- // Since we asked for one retry, the request should be back in the pending queue.
- assert_eq!(1, behaviour.pending_requests.len());
- // No other request should be ongoing.
- assert_eq!(0, behaviour.outstanding.len());
- }
-
- #[test]
- fn disconnects_from_peer_on_incorrect_response() {
- let peer = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(false, pset.1, make_config());
- // ^--- Making sure the response data check fails.
-
- let conn = ConnectionId::new(1);
- behaviour.inject_connection_established(&peer, &conn, &empty_dialer());
- behaviour.inject_connected(&peer);
- assert_eq!(1, behaviour.peers.len());
-
- let chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: Some(1),
- };
- behaviour.request(Request::Call { request, sender: chan.0 }).unwrap();
-
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- poll(&mut behaviour); // Make progress
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(1, behaviour.outstanding.len());
-
- let request_id = *behaviour.outstanding.keys().next().unwrap();
-
- let response = {
- let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
- }
- };
-
- behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, Response::Light(response)));
- assert!(behaviour.peers.is_empty());
-
- poll(&mut behaviour); // More progress
-
- // The request should be back in the pending queue
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- }
-
- #[test]
- fn disconnects_from_peer_on_unexpected_response() {
- let peer = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(true, pset.1, make_config());
-
- let conn = ConnectionId::new(1);
- behaviour.inject_connection_established(&peer, &conn, &empty_dialer());
- behaviour.inject_connected(&peer);
- assert_eq!(1, behaviour.peers.len());
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
-
- // Some unsolicited response
- let response = {
- let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
- }
- };
-
- behaviour.inject_event(peer.clone(), conn, Event::Response(2347895932, Response::Light(response)));
-
- assert!(behaviour.peers.is_empty());
- poll(&mut behaviour);
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- }
-
- #[test]
- fn disconnects_from_peer_on_wrong_response_type() {
- let peer = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(true, pset.1, make_config());
-
- let conn = ConnectionId::new(1);
- behaviour.inject_connection_established(&peer, &conn, &empty_dialer());
- behaviour.inject_connected(&peer);
- assert_eq!(1, behaviour.peers.len());
-
- let chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: Some(1),
- };
- behaviour.request(Request::Call { request, sender: chan.0 }).unwrap();
-
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- poll(&mut behaviour); // Make progress
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(1, behaviour.outstanding.len());
-
- let request_id = *behaviour.outstanding.keys().next().unwrap();
-
- let response = {
- let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() }; // Not a RemoteCallResponse!
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
- }
- };
-
- behaviour.inject_event(peer.clone(), conn, Event::Response(request_id, Response::Light(response)));
- assert!(behaviour.peers.is_empty());
-
- poll(&mut behaviour); // More progress
-
- // The request should be back in the pending queue
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- }
-
- #[test]
- fn receives_remote_failure_after_retry_count_failures() {
- let peer1 = PeerId::random();
- let peer2 = PeerId::random();
- let peer3 = PeerId::random();
- let peer4 = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(false, pset.1, make_config());
- // ^--- Making sure the response data check fails.
-
- let conn1 = ConnectionId::new(1);
- behaviour.inject_connection_established(&peer1, &conn1, &empty_dialer());
- behaviour.inject_connected(&peer1);
- let conn2 = ConnectionId::new(2);
- behaviour.inject_connection_established(&peer2, &conn2, &empty_dialer());
- behaviour.inject_connected(&peer2);
- let conn3 = ConnectionId::new(3);
- behaviour.inject_connection_established(&peer3, &conn3, &empty_dialer());
- behaviour.inject_connected(&peer3);
- let conn4 = ConnectionId::new(3);
- behaviour.inject_connection_established(&peer4, &conn4, &empty_dialer());
- behaviour.inject_connected(&peer4);
- assert_eq!(4, behaviour.peers.len());
-
- let mut chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: Some(3), // Attempt up to three retries.
- };
- behaviour.request(Request::Call { request, sender: chan.0 }).unwrap();
-
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. }));
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(1, behaviour.outstanding.len());
-
- for i in 1 ..= 3 {
- // Construct an invalid response
- let request_id = *behaviour.outstanding.keys().next().unwrap();
- let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone();
- let response = {
- let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteCallResponse(r))
- }
- };
- let conn = ConnectionId::new(i);
- behaviour.inject_event(responding_peer, conn, Event::Response(request_id, Response::Light(response.clone())));
- assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. }));
- assert_matches!(chan.1.try_recv(), Ok(None))
- }
- // Final invalid response
- let request_id = *behaviour.outstanding.keys().next().unwrap();
- let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone();
- let response = {
- let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
- }
- };
- behaviour.inject_event(responding_peer, conn4, Event::Response(request_id, Response::Light(response)));
- assert_matches!(poll(&mut behaviour), Poll::Pending);
- assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed))))
- }
-
- fn issue_request(request: Request) {
- let peer = PeerId::random();
- let pset = peerset();
- let mut behaviour = make_behaviour(true, pset.1, make_config());
-
- let conn = ConnectionId::new(1);
- behaviour.inject_connection_established(&peer, &conn, &empty_dialer());
- behaviour.inject_connected(&peer);
- assert_eq!(1, behaviour.peers.len());
-
- let response = match request {
- Request::Body { .. } => unimplemented!(),
- Request::Header{..} => {
- let r = schema::v1::light::RemoteHeaderResponse {
- header: dummy_header().encode(),
- proof: empty_proof()
- };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteHeaderResponse(r)),
- }
- }
- Request::Read{..} => {
- let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
- }
- }
- Request::ReadChild{..} => {
- let r = schema::v1::light::RemoteReadResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteReadResponse(r)),
- }
- }
- Request::Call{..} => {
- let r = schema::v1::light::RemoteCallResponse { proof: empty_proof() };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteCallResponse(r)),
- }
- }
- Request::Changes{..} => {
- let r = schema::v1::light::RemoteChangesResponse {
- max: iter::repeat(1).take(32).collect(),
- proof: Vec::new(),
- roots: Vec::new(),
- roots_proof: empty_proof()
- };
- schema::v1::light::Response {
- response: Some(schema::v1::light::response::Response::RemoteChangesResponse(r)),
- }
- }
- };
-
- behaviour.request(request).unwrap();
-
- assert_eq!(1, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len());
- assert_matches!(poll(&mut behaviour), Poll::Ready(NetworkBehaviourAction::NotifyHandler { .. }));
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(1, behaviour.outstanding.len());
- assert_eq!(1, *behaviour.outstanding.keys().next().unwrap());
-
- behaviour.inject_event(peer.clone(), conn, Event::Response(1, Response::Light(response)));
-
- poll(&mut behaviour);
-
- assert_eq!(0, behaviour.pending_requests.len());
- assert_eq!(0, behaviour.outstanding.len())
- }
-
- #[test]
- fn receives_remote_call_response() {
- let mut chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: None,
- };
- issue_request(Request::Call { request, sender: chan.0 });
- assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
- }
-
- #[test]
- fn receives_remote_read_response() {
- let mut chan = oneshot::channel();
- let request = light::RemoteReadRequest {
- header: dummy_header(),
- block: Default::default(),
- keys: vec![b":key".to_vec()],
- retry_count: None,
- };
- issue_request(Request::Read { request, sender: chan.0 });
- assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
- }
-
- #[test]
- fn receives_remote_read_child_response() {
- let mut chan = oneshot::channel();
- let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
- let request = light::RemoteReadChildRequest {
- header: dummy_header(),
- block: Default::default(),
- storage_key: child_info.prefixed_storage_key(),
- keys: vec![b":key".to_vec()],
- retry_count: None,
- };
- issue_request(Request::ReadChild { request, sender: chan.0 });
- assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
- }
-
- #[test]
- fn receives_remote_header_response() {
- let mut chan = oneshot::channel();
- let request = light::RemoteHeaderRequest {
- cht_root: Default::default(),
- block: 1,
- retry_count: None,
- };
- issue_request(Request::Header { request, sender: chan.0 });
- assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
- }
-
- #[test]
- fn receives_remote_changes_response() {
- let mut chan = oneshot::channel();
- let request = light::RemoteChangesRequest {
- changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
- zero: (0, Default::default()),
- end: None,
- config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
- }],
- first_block: (1, Default::default()),
- last_block: (100, Default::default()),
- max_block: (100, Default::default()),
- tries_roots: (1, Default::default(), Vec::new()),
- key: Vec::new(),
- storage_key: None,
- retry_count: None,
- };
- issue_request(Request::Changes { request, sender: chan.0 });
- assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_))))
- }
-
- fn send_receive(request: Request) {
- // We start a swarm on the listening side which awaits incoming requests and answers them:
- let local_pset = peerset();
- let local_listen_addr: libp2p::Multiaddr = libp2p::multiaddr::Protocol::Memory(rand::random()).into();
- let mut local_swarm = make_swarm(true, local_pset.1, make_config());
- Swarm::listen_on(&mut local_swarm, local_listen_addr.clone()).unwrap();
-
- // We also start a swarm that makes requests and awaits responses:
- let remote_pset = peerset();
- let mut remote_swarm = make_swarm(true, remote_pset.1, make_config());
-
- // We now schedule a request, dial the remote and let the two swarm work it out:
- remote_swarm.request(request).unwrap();
- Swarm::dial_addr(&mut remote_swarm, local_listen_addr).unwrap();
-
- let future = {
- let a = local_swarm.for_each(|_| future::ready(()));
- let b = remote_swarm.for_each(|_| future::ready(()));
- future::join(a, b).map(|_| ())
- };
-
- task::spawn(future);
- }
-
- #[test]
- fn send_receive_call() {
- let chan = oneshot::channel();
- let request = light::RemoteCallRequest {
- block: Default::default(),
- header: dummy_header(),
- method: "test".into(),
- call_data: vec![],
- retry_count: None,
- };
- send_receive(Request::Call { request, sender: chan.0 });
- assert_eq!(vec![42], task::block_on(chan.1).unwrap().unwrap());
- // ^--- from `DummyFetchChecker::check_execution_proof`
- }
-
- #[test]
- fn send_receive_read() {
- let chan = oneshot::channel();
- let request = light::RemoteReadRequest {
- header: dummy_header(),
- block: Default::default(),
- keys: vec![b":key".to_vec()],
- retry_count: None
- };
- send_receive(Request::Read { request, sender: chan.0 });
- assert_eq!(Some(vec![42]), task::block_on(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap());
- // ^--- from `DummyFetchChecker::check_read_proof`
- }
-
- #[test]
- fn send_receive_read_child() {
- let chan = oneshot::channel();
- let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
- let request = light::RemoteReadChildRequest {
- header: dummy_header(),
- block: Default::default(),
- storage_key: child_info.prefixed_storage_key(),
- keys: vec![b":key".to_vec()],
- retry_count: None,
- };
- send_receive(Request::ReadChild { request, sender: chan.0 });
- assert_eq!(Some(vec![42]), task::block_on(chan.1).unwrap().unwrap().remove(&b":key"[..]).unwrap());
- // ^--- from `DummyFetchChecker::check_read_child_proof`
- }
-
- #[test]
- fn send_receive_header() {
- sp_tracing::try_init_simple();
- let chan = oneshot::channel();
- let request = light::RemoteHeaderRequest {
- cht_root: Default::default(),
- block: 1,
- retry_count: None,
- };
- send_receive(Request::Header { request, sender: chan.0 });
- // The remote does not know block 1:
- assert_matches!(task::block_on(chan.1).unwrap(), Err(ClientError::RemoteFetchFailed));
- }
-
- #[test]
- fn send_receive_changes() {
- let chan = oneshot::channel();
- let request = light::RemoteChangesRequest {
- changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
- zero: (0, Default::default()),
- end: None,
- config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
- }],
- first_block: (1, Default::default()),
- last_block: (100, Default::default()),
- max_block: (100, Default::default()),
- tries_roots: (1, Default::default(), Vec::new()),
- key: Vec::new(),
- storage_key: None,
- retry_count: None,
- };
- send_receive(Request::Changes { request, sender: chan.0 });
- assert_eq!(vec![(100, 2)], task::block_on(chan.1).unwrap().unwrap());
- // ^--- from `DummyFetchChecker::check_changes_proof`
- }
-
- #[test]
- fn body_request_fields_encoded_properly() {
- let (sender, _) = oneshot::channel();
- let serialized_request = serialize_request::(&Request::Body {
- request: RemoteBodyRequest {
- header: dummy_header(),
- retry_count: None,
- },
- sender,
- }).unwrap();
- let deserialized_request = schema::v1::BlockRequest::decode(&serialized_request[..]).unwrap();
- assert!(
- BlockAttributes::from_be_u32(deserialized_request.fields)
- .unwrap()
- .contains(BlockAttributes::BODY)
- );
- }
-}
diff --git a/client/network/src/light_client_requests.rs b/client/network/src/light_client_requests.rs
new file mode 100644
index 0000000000000..f859a35f45b24
--- /dev/null
+++ b/client/network/src/light_client_requests.rs
@@ -0,0 +1,334 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+//! Helpers for outgoing and incoming light client requests.
+
+/// For outgoing light client requests.
+pub mod sender;
+/// For incoming light client requests.
+pub mod handler;
+
+use crate::config::ProtocolId;
+use crate::request_responses::ProtocolConfig;
+
+use std::time::Duration;
+
+/// Generate the light client protocol name from chain specific protocol identifier.
+fn generate_protocol_name(protocol_id: &ProtocolId) -> String {
+ let mut s = String::new();
+ s.push_str("/");
+ s.push_str(protocol_id.as_ref());
+ s.push_str("/light/2");
+ s
+}
+
+/// Generates a [`ProtocolConfig`] for the light client request protocol, refusing incoming requests.
+pub fn generate_protocol_config(protocol_id: &ProtocolId) -> ProtocolConfig {
+ ProtocolConfig {
+ name: generate_protocol_name(protocol_id).into(),
+ max_request_size: 1 * 1024 * 1024,
+ max_response_size: 16 * 1024 * 1024,
+ request_timeout: Duration::from_secs(15),
+ inbound_queue: None,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::request_responses::IncomingRequest;
+ use crate::config::ProtocolId;
+
+ use assert_matches::assert_matches;
+ use futures::executor::{block_on, LocalPool};
+ use futures::task::Spawn;
+ use futures::{channel::oneshot, prelude::*};
+ use libp2p::PeerId;
+ use sc_client_api::StorageProof;
+ use sc_client_api::light::{RemoteCallRequest, RemoteChangesRequest, RemoteHeaderRequest};
+ use sc_client_api::light::{self, RemoteReadRequest, RemoteBodyRequest, ChangesProof};
+ use sc_client_api::{FetchChecker, RemoteReadChildRequest};
+ use sp_blockchain::Error as ClientError;
+ use sp_core::storage::ChildInfo;
+ use sp_runtime::generic::Header;
+ use sp_runtime::traits::{BlakeTwo256, Block as BlockT, NumberFor};
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
+ pub struct DummyFetchChecker {
+ pub ok: bool,
+ pub _mark: std::marker::PhantomData,
+ }
+
+ impl FetchChecker for DummyFetchChecker {
+ fn check_header_proof(
+ &self,
+ _request: &RemoteHeaderRequest,
+ header: Option,
+ _remote_proof: StorageProof,
+ ) -> Result {
+ match self.ok {
+ true if header.is_some() => Ok(header.unwrap()),
+ _ => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+
+ fn check_read_proof(
+ &self,
+ request: &RemoteReadRequest,
+ _: StorageProof,
+ ) -> Result, Option>>, ClientError> {
+ match self.ok {
+ true => Ok(request
+ .keys
+ .iter()
+ .cloned()
+ .map(|k| (k, Some(vec![42])))
+ .collect()),
+ false => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+
+ fn check_read_child_proof(
+ &self,
+ request: &RemoteReadChildRequest,
+ _: StorageProof,
+ ) -> Result, Option>>, ClientError> {
+ match self.ok {
+ true => Ok(request
+ .keys
+ .iter()
+ .cloned()
+ .map(|k| (k, Some(vec![42])))
+ .collect()),
+ false => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+
+ fn check_execution_proof(
+ &self,
+ _: &RemoteCallRequest,
+ _: StorageProof,
+ ) -> Result, ClientError> {
+ match self.ok {
+ true => Ok(vec![42]),
+ false => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+
+ fn check_changes_proof(
+ &self,
+ _: &RemoteChangesRequest,
+ _: ChangesProof,
+ ) -> Result, u32)>, ClientError> {
+ match self.ok {
+ true => Ok(vec![(100u32.into(), 2)]),
+ false => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+
+ fn check_body_proof(
+ &self,
+ _: &RemoteBodyRequest,
+ body: Vec,
+ ) -> Result, ClientError> {
+ match self.ok {
+ true => Ok(body),
+ false => Err(ClientError::Backend("Test error".into())),
+ }
+ }
+ }
+
+ pub fn protocol_id() -> ProtocolId {
+ ProtocolId::from("test")
+ }
+
+ pub fn peerset() -> (sc_peerset::Peerset, sc_peerset::PeersetHandle) {
+ let cfg = sc_peerset::SetConfig {
+ in_peers: 128,
+ out_peers: 128,
+ bootnodes: Default::default(),
+ reserved_only: false,
+ reserved_nodes: Default::default(),
+ };
+ sc_peerset::Peerset::from_config(sc_peerset::PeersetConfig { sets: vec![cfg] })
+ }
+
+ pub fn dummy_header() -> sp_test_primitives::Header {
+ sp_test_primitives::Header {
+ parent_hash: Default::default(),
+ number: 0,
+ state_root: Default::default(),
+ extrinsics_root: Default::default(),
+ digest: Default::default(),
+ }
+ }
+
+ type Block =
+ sp_runtime::generic::Block, substrate_test_runtime::Extrinsic>;
+
+ fn send_receive(request: sender::Request, pool: &LocalPool) {
+ let client = Arc::new(substrate_test_runtime_client::new());
+ let (handler, protocol_config) = handler::LightClientRequestHandler::new(&protocol_id(), client);
+ pool.spawner().spawn_obj(handler.run().boxed().into()).unwrap();
+
+ let (_peer_set, peer_set_handle) = peerset();
+ let mut sender = sender::LightClientRequestSender::::new(
+ &protocol_id(),
+ Arc::new(crate::light_client_requests::tests::DummyFetchChecker {
+ ok: true,
+ _mark: std::marker::PhantomData,
+ }),
+ peer_set_handle,
+ );
+ sender.inject_connected(PeerId::random());
+
+ sender.request(request).unwrap();
+ let sender::OutEvent::SendRequest { pending_response, request, .. } = block_on(sender.next()).unwrap();
+ let (tx, rx) = oneshot::channel();
+ block_on(protocol_config.inbound_queue.unwrap().send(IncomingRequest {
+ peer: PeerId::random(),
+ payload: request,
+ pending_response: tx,
+ })).unwrap();
+ pool.spawner().spawn_obj(async move {
+ pending_response.send(Ok(rx.await.unwrap().result.unwrap())).unwrap();
+ }.boxed().into()).unwrap();
+
+ pool.spawner().spawn_obj(sender.for_each(|_| future::ready(())).boxed().into()).unwrap();
+ }
+
+ #[test]
+ fn send_receive_call() {
+ let chan = oneshot::channel();
+ let request = light::RemoteCallRequest {
+ block: Default::default(),
+ header: dummy_header(),
+ method: "test".into(),
+ call_data: vec![],
+ retry_count: None,
+ };
+
+ let mut pool = LocalPool::new();
+ send_receive(sender::Request::Call {
+ request,
+ sender: chan.0,
+ }, &pool);
+ assert_eq!(vec![42], pool.run_until(chan.1).unwrap().unwrap());
+ // ^--- from `DummyFetchChecker::check_execution_proof`
+ }
+
+ #[test]
+ fn send_receive_read() {
+ let chan = oneshot::channel();
+ let request = light::RemoteReadRequest {
+ header: dummy_header(),
+ block: Default::default(),
+ keys: vec![b":key".to_vec()],
+ retry_count: None,
+ };
+ let mut pool = LocalPool::new();
+ send_receive(sender::Request::Read {
+ request,
+ sender: chan.0,
+ }, &pool);
+ assert_eq!(
+ Some(vec![42]),
+ pool.run_until(chan.1)
+ .unwrap()
+ .unwrap()
+ .remove(&b":key"[..])
+ .unwrap()
+ );
+ // ^--- from `DummyFetchChecker::check_read_proof`
+ }
+
+ #[test]
+ fn send_receive_read_child() {
+ let chan = oneshot::channel();
+ let child_info = ChildInfo::new_default(&b":child_storage:default:sub"[..]);
+ let request = light::RemoteReadChildRequest {
+ header: dummy_header(),
+ block: Default::default(),
+ storage_key: child_info.prefixed_storage_key(),
+ keys: vec![b":key".to_vec()],
+ retry_count: None,
+ };
+ let mut pool = LocalPool::new();
+ send_receive(sender::Request::ReadChild {
+ request,
+ sender: chan.0,
+ }, &pool);
+ assert_eq!(
+ Some(vec![42]),
+ pool.run_until(chan.1)
+ .unwrap()
+ .unwrap()
+ .remove(&b":key"[..])
+ .unwrap()
+ );
+ // ^--- from `DummyFetchChecker::check_read_child_proof`
+ }
+
+ #[test]
+ fn send_receive_header() {
+ sp_tracing::try_init_simple();
+ let chan = oneshot::channel();
+ let request = light::RemoteHeaderRequest {
+ cht_root: Default::default(),
+ block: 1,
+ retry_count: None,
+ };
+ let mut pool = LocalPool::new();
+ send_receive(sender::Request::Header {
+ request,
+ sender: chan.0,
+ }, &pool);
+ // The remote does not know block 1:
+ assert_matches!(
+ pool.run_until(chan.1).unwrap(),
+ Err(ClientError::RemoteFetchFailed)
+ );
+ }
+
+ #[test]
+ fn send_receive_changes() {
+ let chan = oneshot::channel();
+ let request = light::RemoteChangesRequest {
+ changes_trie_configs: vec![sp_core::ChangesTrieConfigurationRange {
+ zero: (0, Default::default()),
+ end: None,
+ config: Some(sp_core::ChangesTrieConfiguration::new(4, 2)),
+ }],
+ first_block: (1, Default::default()),
+ last_block: (100, Default::default()),
+ max_block: (100, Default::default()),
+ tries_roots: (1, Default::default(), Vec::new()),
+ key: Vec::new(),
+ storage_key: None,
+ retry_count: None,
+ };
+ let mut pool = LocalPool::new();
+ send_receive(sender::Request::Changes {
+ request,
+ sender: chan.0,
+ }, &pool);
+ assert_eq!(vec![(100, 2)], pool.run_until(chan.1).unwrap().unwrap());
+ // ^--- from `DummyFetchChecker::check_changes_proof`
+ }
+}
diff --git a/client/network/src/light_client_requests/handler.rs b/client/network/src/light_client_requests/handler.rs
new file mode 100644
index 0000000000000..08de99a0a5de4
--- /dev/null
+++ b/client/network/src/light_client_requests/handler.rs
@@ -0,0 +1,399 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+//! Helper for incoming light client requests.
+//!
+//! Handle (i.e. answer) incoming light client requests from a remote peer received via
+//! [`crate::request_responses::RequestResponsesBehaviour`] with [`LightClientRequestHandler`].
+
+use codec::{self, Encode, Decode};
+use crate::{
+ chain::Client,
+ config::ProtocolId,
+ schema,
+ PeerId,
+};
+use crate::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig};
+use futures::{channel::mpsc, prelude::*};
+use prost::Message;
+use sc_client_api::{
+ StorageProof,
+ light
+};
+use sc_peerset::ReputationChange;
+use sp_core::{
+ storage::{ChildInfo, ChildType,StorageKey, PrefixedStorageKey},
+ hexdisplay::HexDisplay,
+};
+use sp_runtime::{
+ traits::{Block, Zero},
+ generic::BlockId,
+};
+use std::{
+ collections::{BTreeMap},
+ sync::Arc,
+};
+use log::debug;
+
+const LOG_TARGET: &str = "light-client-request-handler";
+
+/// Handler for incoming light client requests from a remote peer.
+pub struct LightClientRequestHandler {
+ request_receiver: mpsc::Receiver,
+ /// Blockchain client.
+ client: Arc>,
+}
+
+impl LightClientRequestHandler {
+ /// Create a new [`BlockRequestHandler`].
+ pub fn new(
+ protocol_id: &ProtocolId,
+ client: Arc>,
+ ) -> (Self, ProtocolConfig) {
+ // For now due to lack of data on light client request handling in production systems, this
+ // value is chosen to match the block request limit.
+ let (tx, request_receiver) = mpsc::channel(20);
+
+ let mut protocol_config = super::generate_protocol_config(protocol_id);
+ protocol_config.inbound_queue = Some(tx);
+
+ (Self { client, request_receiver }, protocol_config)
+ }
+
+ /// Run [`LightClientRequestHandler`].
+ pub async fn run(mut self) {
+ while let Some(request) = self.request_receiver.next().await {
+ let IncomingRequest { peer, payload, pending_response } = request;
+
+ match self.handle_request(peer, payload) {
+ Ok(response_data) => {
+ let response = OutgoingResponse { result: Ok(response_data), reputation_changes: Vec::new() };
+ match pending_response.send(response) {
+ Ok(()) => debug!(
+ target: LOG_TARGET,
+ "Handled light client request from {}.",
+ peer,
+ ),
+ Err(_) => debug!(
+ target: LOG_TARGET,
+ "Failed to handle light client request from {}: {}",
+ peer, HandleRequestError::SendResponse,
+ ),
+ };
+ } ,
+ Err(e) => {
+ debug!(
+ target: LOG_TARGET,
+ "Failed to handle light client request from {}: {}",
+ peer, e,
+ );
+
+ let reputation_changes = match e {
+ HandleRequestError::BadRequest(_) => {
+ vec![ReputationChange::new(-(1 << 12), "bad request")]
+ }
+ _ => Vec::new(),
+ };
+
+ let response = OutgoingResponse { result: Err(()), reputation_changes };
+ if pending_response.send(response).is_err() {
+ debug!(
+ target: LOG_TARGET,
+ "Failed to handle light client request from {}: {}",
+ peer, HandleRequestError::SendResponse,
+ );
+ };
+ },
+ }
+ }
+ }
+
+
+ fn handle_request(
+ &mut self,
+ peer: PeerId,
+ payload: Vec,
+ ) -> Result, HandleRequestError> {
+ let request = schema::v1::light::Request::decode(&payload[..])?;
+
+ let response = match &request.request {
+ Some(schema::v1::light::request::Request::RemoteCallRequest(r)) =>
+ self.on_remote_call_request(&peer, r)?,
+ Some(schema::v1::light::request::Request::RemoteReadRequest(r)) =>
+ self.on_remote_read_request(&peer, r)?,
+ Some(schema::v1::light::request::Request::RemoteHeaderRequest(r)) =>
+ self.on_remote_header_request(&peer, r)?,
+ Some(schema::v1::light::request::Request::RemoteReadChildRequest(r)) =>
+ self.on_remote_read_child_request(&peer, r)?,
+ Some(schema::v1::light::request::Request::RemoteChangesRequest(r)) =>
+ self.on_remote_changes_request(&peer, r)?,
+ None => {
+ return Err(HandleRequestError::BadRequest("Remote request without request data."));
+ }
+ };
+
+ let mut data = Vec::new();
+ response.encode(&mut data)?;
+
+ Ok(data)
+ }
+
+ fn on_remote_call_request(
+ &mut self,
+ peer: &PeerId,
+ request: &schema::v1::light::RemoteCallRequest,
+ ) -> Result {
+ log::trace!(
+ "Remote call request from {} ({} at {:?}).",
+ peer, request.method, request.block,
+ );
+
+ let block = Decode::decode(&mut request.block.as_ref())?;
+
+ let proof = match self.client.execution_proof(
+ &BlockId::Hash(block),
+ &request.method, &request.data,
+ ) {
+ Ok((_, proof)) => proof,
+ Err(e) => {
+ log::trace!(
+ "remote call request from {} ({} at {:?}) failed with: {}",
+ peer, request.method, request.block, e,
+ );
+ StorageProof::empty()
+ }
+ };
+
+ let response = {
+ let r = schema::v1::light::RemoteCallResponse { proof: proof.encode() };
+ schema::v1::light::response::Response::RemoteCallResponse(r)
+ };
+
+ Ok(schema::v1::light::Response { response: Some(response) })
+ }
+
+ fn on_remote_read_request(
+ &mut self,
+ peer: &PeerId,
+ request: &schema::v1::light::RemoteReadRequest,
+ ) -> Result {
+ if request.keys.is_empty() {
+ log::debug!("Invalid remote read request sent by {}.", peer);
+ return Err(HandleRequestError::BadRequest("Remote read request without keys."))
+ }
+
+ log::trace!(
+ "Remote read request from {} ({} at {:?}).",
+ peer, fmt_keys(request.keys.first(), request.keys.last()), request.block,
+ );
+
+ let block = Decode::decode(&mut request.block.as_ref())?;
+
+ let proof = match self.client.read_proof(
+ &BlockId::Hash(block),
+ &mut request.keys.iter().map(AsRef::as_ref),
+ ) {
+ Ok(proof) => proof,
+ Err(error) => {
+ log::trace!(
+ "remote read request from {} ({} at {:?}) failed with: {}",
+ peer, fmt_keys(request.keys.first(), request.keys.last()), request.block, error,
+ );
+ StorageProof::empty()
+ }
+ };
+
+ let response = {
+ let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
+ schema::v1::light::response::Response::RemoteReadResponse(r)
+ };
+
+ Ok(schema::v1::light::Response { response: Some(response) })
+ }
+
+ fn on_remote_read_child_request(
+ &mut self,
+ peer: &PeerId,
+ request: &schema::v1::light::RemoteReadChildRequest,
+ ) -> Result {
+ if request.keys.is_empty() {
+ log::debug!("Invalid remote child read request sent by {}.", peer);
+ return Err(HandleRequestError::BadRequest("Remove read child request without keys."))
+ }
+
+ log::trace!(
+ "Remote read child request from {} ({} {} at {:?}).",
+ peer,
+ HexDisplay::from(&request.storage_key),
+ fmt_keys(request.keys.first(), request.keys.last()),
+ request.block,
+ );
+
+ let block = Decode::decode(&mut request.block.as_ref())?;
+
+ let prefixed_key = PrefixedStorageKey::new_ref(&request.storage_key);
+ let child_info = match ChildType::from_prefixed_key(prefixed_key) {
+ Some((ChildType::ParentKeyId, storage_key)) => Ok(ChildInfo::new_default(storage_key)),
+ None => Err(sp_blockchain::Error::InvalidChildStorageKey),
+ };
+ let proof = match child_info.and_then(|child_info| self.client.read_child_proof(
+ &BlockId::Hash(block),
+ &child_info,
+ &mut request.keys.iter().map(AsRef::as_ref)
+ )) {
+ Ok(proof) => proof,
+ Err(error) => {
+ log::trace!(
+ "remote read child request from {} ({} {} at {:?}) failed with: {}",
+ peer,
+ HexDisplay::from(&request.storage_key),
+ fmt_keys(request.keys.first(), request.keys.last()),
+ request.block,
+ error,
+ );
+ StorageProof::empty()
+ }
+ };
+
+ let response = {
+ let r = schema::v1::light::RemoteReadResponse { proof: proof.encode() };
+ schema::v1::light::response::Response::RemoteReadResponse(r)
+ };
+
+ Ok(schema::v1::light::Response { response: Some(response) })
+ }
+
+ fn on_remote_header_request(
+ &mut self,
+ peer: &PeerId,
+ request: &schema::v1::light::RemoteHeaderRequest,
+ ) -> Result {
+ log::trace!("Remote header proof request from {} ({:?}).", peer, request.block);
+
+ let block = Decode::decode(&mut request.block.as_ref())?;
+ let (header, proof) = match self.client.header_proof(&BlockId::Number(block)) {
+ Ok((header, proof)) => (header.encode(), proof),
+ Err(error) => {
+ log::trace!(
+ "Remote header proof request from {} ({:?}) failed with: {}.",
+ peer, request.block, error
+ );
+ (Default::default(), StorageProof::empty())
+ }
+ };
+
+ let response = {
+ let r = schema::v1::light::RemoteHeaderResponse { header, proof: proof.encode() };
+ schema::v1::light::response::Response::RemoteHeaderResponse(r)
+ };
+
+ Ok(schema::v1::light::Response { response: Some(response) })
+ }
+
+ fn on_remote_changes_request(
+ &mut self,
+ peer: &PeerId,
+ request: &schema::v1::light::RemoteChangesRequest,
+ ) -> Result {
+ log::trace!(
+ "Remote changes proof request from {} for key {} ({:?}..{:?}).",
+ peer,
+ if !request.storage_key.is_empty() {
+ format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&request.key))
+ } else {
+ HexDisplay::from(&request.key).to_string()
+ },
+ request.first,
+ request.last,
+ );
+
+ let first = Decode::decode(&mut request.first.as_ref())?;
+ let last = Decode::decode(&mut request.last.as_ref())?;
+ let min = Decode::decode(&mut request.min.as_ref())?;
+ let max = Decode::decode(&mut request.max.as_ref())?;
+ let key = StorageKey(request.key.clone());
+ let storage_key = if request.storage_key.is_empty() {
+ None
+ } else {
+ Some(PrefixedStorageKey::new_ref(&request.storage_key))
+ };
+
+ let proof = match self.client.key_changes_proof(first, last, min, max, storage_key, &key) {
+ Ok(proof) => proof,
+ Err(error) => {
+ log::trace!(
+ "Remote changes proof request from {} for key {} ({:?}..{:?}) failed with: {}.",
+ peer,
+ format!("{} : {}", HexDisplay::from(&request.storage_key), HexDisplay::from(&key.0)),
+ request.first,
+ request.last,
+ error,
+ );
+
+ light::ChangesProof:: {
+ max_block: Zero::zero(),
+ proof: Vec::new(),
+ roots: BTreeMap::new(),
+ roots_proof: StorageProof::empty(),
+ }
+ }
+ };
+
+ let response = {
+ let r = schema::v1::light::RemoteChangesResponse {
+ max: proof.max_block.encode(),
+ proof: proof.proof,
+ roots: proof.roots.into_iter()
+ .map(|(k, v)| schema::v1::light::Pair { fst: k.encode(), snd: v.encode() })
+ .collect(),
+ roots_proof: proof.roots_proof.encode(),
+ };
+ schema::v1::light::response::Response::RemoteChangesResponse(r)
+ };
+
+ Ok(schema::v1::light::Response { response: Some(response) })
+ }
+}
+
+#[derive(derive_more::Display, derive_more::From)]
+enum HandleRequestError {
+ #[display(fmt = "Failed to decode request: {}.", _0)]
+ DecodeProto(prost::DecodeError),
+ #[display(fmt = "Failed to encode response: {}.", _0)]
+ EncodeProto(prost::EncodeError),
+ #[display(fmt = "Failed to send response.")]
+ SendResponse,
+ /// A bad request has been received.
+ #[display(fmt = "bad request: {}", _0)]
+ BadRequest(&'static str),
+ /// Encoding or decoding of some data failed.
+ #[display(fmt = "codec error: {}", _0)]
+ Codec(codec::Error),
+}
+
+fn fmt_keys(first: Option<&Vec>, last: Option<&Vec>) -> String {
+ if let (Some(first), Some(last)) = (first, last) {
+ if first == last {
+ HexDisplay::from(first).to_string()
+ } else {
+ format!("{}..{}", HexDisplay::from(first), HexDisplay::from(last))
+ }
+ } else {
+ String::from("n/a")
+ }
+}
diff --git a/client/network/src/light_client_requests/sender.rs b/client/network/src/light_client_requests/sender.rs
new file mode 100644
index 0000000000000..652f465d6f250
--- /dev/null
+++ b/client/network/src/light_client_requests/sender.rs
@@ -0,0 +1,1343 @@
+// This file is part of Substrate.
+
+// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+//! Helper for outgoing light client requests.
+//!
+//! Call [`LightClientRequestSender::send_request`] to send out light client requests. It will:
+//!
+//! 1. Build the request.
+//!
+//! 2. Forward the request to [`crate::request_responses::RequestResponsesBehaviour`] via
+//! [`OutEvent::SendRequest`].
+//!
+//! 3. Wait for the response and forward the response via the [`oneshot::Sender`] provided earlier
+//! with [`LightClientRequestSender::send_request`].
+
+use codec::{self, Encode, Decode};
+use crate::{
+ config::ProtocolId,
+ protocol::message::{BlockAttributes},
+ schema,
+ PeerId,
+};
+use crate::request_responses::{RequestFailure, OutboundFailure};
+use futures::{channel::{oneshot}, future::BoxFuture, prelude::*, stream::FuturesUnordered};
+use prost::Message;
+use sc_client_api::{
+ light::{
+ self, RemoteBodyRequest,
+ }
+};
+use sc_peerset::ReputationChange;
+use sp_blockchain::{Error as ClientError};
+use sp_runtime::{
+ traits::{Block, Header, NumberFor},
+};
+use std::{
+ collections::{BTreeMap, VecDeque, HashMap},
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+mod rep {
+ use super::*;
+
+ /// Reputation change for a peer when a request timed out.
+ pub const TIMEOUT: ReputationChange = ReputationChange::new(-(1 << 8), "light client request timeout");
+ /// Reputation change for a peer when a request is refused.
+ pub const REFUSED: ReputationChange = ReputationChange::new(-(1 << 8), "light client request refused");
+}
+
+/// Configuration options for [`LightClientRequestSender`].
+#[derive(Debug, Clone)]
+struct Config {
+ max_pending_requests: usize,
+ light_protocol: String,
+ block_protocol: String,
+}
+
+impl Config {
+ /// Create a new [`LightClientRequestSender`] configuration.
+ pub fn new(id: &ProtocolId) -> Self {
+ Config {
+ max_pending_requests: 128,
+ light_protocol: super::generate_protocol_name(id),
+ block_protocol: crate::block_request_handler::generate_protocol_name(id),
+ }
+ }
+}
+
+/// State machine helping to send out light client requests.
+pub struct LightClientRequestSender {
+ /// This behaviour's configuration.
+ config: Config,
+ /// Verifies that received responses are correct.
+ checker: Arc>,
+ /// Peer information (addresses, their best block, etc.)
+ peers: HashMap>,
+ /// Pending (local) requests.
+ pending_requests: VecDeque>,
+ /// Requests on their way to remote peers.
+ sent_requests: FuturesUnordered, Result, RequestFailure>, oneshot::Canceled>),
+ >>,
+ /// Handle to use for reporting misbehaviour of peers.
+ peerset: sc_peerset::PeersetHandle,
+}
+
+/// Augments a pending light client request with metadata.
+#[derive(Debug)]
+struct PendingRequest {
+ /// Remaining attempts.
+ attempts_left: usize,
+ /// The actual request.
+ request: Request,
+}
+
+impl PendingRequest {
+ fn new(req: Request) -> Self {
+ PendingRequest {
+ // Number of retries + one for the initial attempt.
+ attempts_left: req.retries() + 1,
+ request: req,
+ }
+ }
+
+ fn into_sent(self, peer_id: PeerId) -> SentRequest {
+ SentRequest {
+ attempts_left: self.attempts_left,
+ request: self.request,
+ peer: peer_id,
+ }
+ }
+}
+
+/// Augments a light client request with metadata that is currently being send to a remote.
+#[derive(Debug)]
+struct SentRequest {
+ /// Remaining attempts.
+ attempts_left: usize,
+ /// The actual request.
+ request: Request,
+ /// The peer that the request is send to.
+ peer: PeerId,
+}
+
+impl SentRequest {
+ fn into_pending(self) -> PendingRequest {
+ PendingRequest {
+ attempts_left: self.attempts_left,
+ request: self.request,
+ }
+ }
+}
+
+impl Unpin for LightClientRequestSender {}
+
+impl LightClientRequestSender
+where
+ B: Block,
+{
+ /// Construct a new light client handler.
+ pub fn new(
+ id: &ProtocolId,
+ checker: Arc>,
+ peerset: sc_peerset::PeersetHandle,
+ ) -> Self {
+ LightClientRequestSender {
+ config: Config::new(id),
+ checker,
+ peers: Default::default(),
+ pending_requests: Default::default(),
+ sent_requests: Default::default(),
+ peerset,
+ }
+ }
+
+ /// We rely on external information about peers best blocks as we lack the
+ /// means to determine it ourselves.
+ pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor) {
+ if let Some(info) = self.peers.get_mut(peer) {
+ log::trace!("new best block for {:?}: {:?}", peer, num);
+ info.best_block = Some(num)
+ }
+ }
+
+ /// Issue a new light client request.
+ pub fn request(&mut self, req: Request) -> Result<(), SendRequestError> {
+ if self.pending_requests.len() >= self.config.max_pending_requests {
+ return Err(SendRequestError::TooManyRequests)
+ }
+ self.pending_requests.push_back(PendingRequest::new(req));
+ Ok(())
+ }
+
+ /// Remove the given peer.
+ ///
+ /// In-flight requests to the given peer might fail and be retried. See
+ /// [`::poll_next`].
+ fn remove_peer(&mut self, peer: PeerId) {
+ self.peers.remove(&peer);
+ }
+
+ /// Process a local request's response from remote.
+ ///
+ /// If successful, this will give us the actual, checked data we should be
+ /// sending back to the client, otherwise an error.
+ fn on_response(
+ &mut self,
+ peer: PeerId,
+ request: &Request,
+ response: Response,
+ ) -> Result, Error> {
+ log::trace!("response from {}", peer);
+ match response {
+ Response::Light(r) => self.on_response_light(request, r),
+ Response::Block(r) => self.on_response_block(request, r),
+ }
+ }
+
+ fn on_response_light(
+ &mut self,
+ request: &Request,
+ response: schema::v1::light::Response,
+ ) -> Result, Error> {
+ use schema::v1::light::response::Response;
+ match response.response {
+ Some(Response::RemoteCallResponse(response)) =>
+ if let Request::Call { request , .. } = request {
+ let proof = Decode::decode(&mut response.proof.as_ref())?;
+ let reply = self.checker.check_execution_proof(request, proof)?;
+ Ok(Reply::VecU8(reply))
+ } else {
+ Err(Error::UnexpectedResponse)
+ }
+ Some(Response::RemoteReadResponse(response)) =>
+ match request {
+ Request::Read { request, .. } => {
+ let proof = Decode::decode(&mut response.proof.as_ref())?;
+ let reply = self.checker.check_read_proof(&request, proof)?;
+ Ok(Reply::MapVecU8OptVecU8(reply))
+ }
+ Request::ReadChild { request, .. } => {
+ let proof = Decode::decode(&mut response.proof.as_ref())?;
+ let reply = self.checker.check_read_child_proof(&request, proof)?;
+ Ok(Reply::MapVecU8OptVecU8(reply))
+ }
+ _ => Err(Error::UnexpectedResponse)
+ }
+ Some(Response::RemoteChangesResponse(response)) =>
+ if let Request::Changes { request, .. } = request {
+ let max_block = Decode::decode(&mut response.max.as_ref())?;
+ let roots_proof = Decode::decode(&mut response.roots_proof.as_ref())?;
+ let roots = {
+ let mut r = BTreeMap::new();
+ for pair in response.roots {
+ let k = Decode::decode(&mut pair.fst.as_ref())?;
+ let v = Decode::decode(&mut pair.snd.as_ref())?;
+ r.insert(k, v);
+ }
+ r
+ };
+ let reply = self.checker.check_changes_proof(&request, light::ChangesProof {
+ max_block,
+ proof: response.proof,
+ roots,
+ roots_proof,
+ })?;
+ Ok(Reply::VecNumberU32(reply))
+ } else {
+ Err(Error::UnexpectedResponse)
+ }
+ Some(Response::RemoteHeaderResponse(response)) =>
+ if let Request::Header { request, .. } = request {
+ let header =
+ if response.header.is_empty() {
+ None
+ } else {
+ Some(Decode::decode(&mut response.header.as_ref())?)
+ };
+ let proof = Decode::decode(&mut response.proof.as_ref())?;
+ let reply = self.checker.check_header_proof(&request, header, proof)?;
+ Ok(Reply::Header(reply))
+ } else {
+ Err(Error::UnexpectedResponse)
+ }
+ None => Err(Error::UnexpectedResponse)
+ }
+ }
+
+ fn on_response_block(
+ &mut self,
+ request: &Request,
+ response: schema::v1::BlockResponse,
+ ) -> Result, Error> {
+ let request = if let Request::Body { request , .. } = &request {
+ request
+ } else {
+ return Err(Error::UnexpectedResponse);
+ };
+
+ let body: Vec<_> = match response.blocks.into_iter().next() {
+ Some(b) => b.body,
+ None => return Err(Error::UnexpectedResponse),
+ };
+
+ let body = body.into_iter()
+ .map(|extrinsic| B::Extrinsic::decode(&mut &extrinsic[..]))
+ .collect::>()?;
+
+ let body = self.checker.check_body_proof(&request, body)?;
+ Ok(Reply::Extrinsics(body))
+ }
+
+ /// Signal that the node is connected to the given peer.
+ pub fn inject_connected(&mut self, peer: PeerId) {
+ let prev_entry = self.peers.insert(peer, Default::default());
+ debug_assert!(
+ prev_entry.is_none(),
+ "Expect `inject_connected` to be called for disconnected peer.",
+ );
+ }
+
+ /// Signal that the node disconnected from the given peer.
+ pub fn inject_disconnected(&mut self, peer: PeerId) {
+ self.remove_peer(peer)
+ }
+}
+
+
+impl Stream for LightClientRequestSender {
+ type Item = OutEvent;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll