|
| 1 | +// Copyright 2020 Parity Technologies (UK) Ltd. |
| 2 | +// This file is part of Substrate. |
| 3 | +// |
| 4 | +// Substrate is free software: you can redistribute it and/or modify |
| 5 | +// it under the terms of the GNU General Public License as published by |
| 6 | +// the Free Software Foundation, either version 3 of the License, or |
| 7 | +// (at your option) any later version. |
| 8 | +// |
| 9 | +// Substrate is distributed in the hope that it will be useful, |
| 10 | +// but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 12 | +// GNU General Public License for more details. |
| 13 | +// |
| 14 | +// You should have received a copy of the GNU General Public License |
| 15 | +// along with Substrate. If not, see <http://www.gnu.org/licenses/>. |
| 16 | + |
| 17 | +//! `NetworkBehaviour` implementation which handles incoming finality proof requests. |
| 18 | +//! |
| 19 | +//! Every request is coming in on a separate connection substream which gets |
| 20 | +//! closed after we have sent the response back. Incoming requests are encoded |
| 21 | +//! as protocol buffers (cf. `finality.v1.proto`). |
| 22 | +
|
| 23 | +#![allow(unused)] |
| 24 | + |
| 25 | +use bytes::Bytes; |
| 26 | +use codec::{Encode, Decode}; |
| 27 | +use crate::{ |
| 28 | + chain::FinalityProofProvider, |
| 29 | + config::ProtocolId, |
| 30 | + protocol::{api, message::BlockAttributes} |
| 31 | +}; |
| 32 | +use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; |
| 33 | +use libp2p::{ |
| 34 | + core::{ |
| 35 | + ConnectedPoint, |
| 36 | + Multiaddr, |
| 37 | + PeerId, |
| 38 | + connection::ConnectionId, |
| 39 | + upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, |
| 40 | + upgrade::{DeniedUpgrade, read_one, write_one} |
| 41 | + }, |
| 42 | + swarm::{ |
| 43 | + NegotiatedSubstream, |
| 44 | + NetworkBehaviour, |
| 45 | + NetworkBehaviourAction, |
| 46 | + OneShotHandler, |
| 47 | + OneShotHandlerConfig, |
| 48 | + PollParameters, |
| 49 | + SubstreamProtocol |
| 50 | + } |
| 51 | +}; |
| 52 | +use prost::Message; |
| 53 | +use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; |
| 54 | +use std::{ |
| 55 | + cmp::min, |
| 56 | + io, |
| 57 | + iter, |
| 58 | + sync::Arc, |
| 59 | + time::Duration, |
| 60 | + task::{Context, Poll} |
| 61 | +}; |
| 62 | +use void::{Void, unreachable}; |
| 63 | + |
| 64 | +// Type alias for convenience. |
| 65 | +pub type Error = Box<dyn std::error::Error + 'static>; |
| 66 | + |
| 67 | +/// Configuration options for `FinalityProofRequests`. |
| 68 | +#[derive(Debug, Clone)] |
| 69 | +pub struct Config { |
| 70 | + max_request_len: usize, |
| 71 | + inactivity_timeout: Duration, |
| 72 | + protocol: Bytes, |
| 73 | +} |
| 74 | + |
| 75 | +impl Config { |
| 76 | + /// Create a fresh configuration with the following options: |
| 77 | + /// |
| 78 | + /// - max. request size = 1 MiB |
| 79 | + /// - inactivity timeout = 15s |
| 80 | + pub fn new(id: &ProtocolId) -> Self { |
| 81 | + let mut c = Config { |
| 82 | + max_request_len: 1024 * 1024, |
| 83 | + inactivity_timeout: Duration::from_secs(15), |
| 84 | + protocol: Bytes::new(), |
| 85 | + }; |
| 86 | + c.set_protocol(id); |
| 87 | + c |
| 88 | + } |
| 89 | + |
| 90 | + /// Limit the max. length of incoming finality proof request bytes. |
| 91 | + pub fn set_max_request_len(&mut self, v: usize) -> &mut Self { |
| 92 | + self.max_request_len = v; |
| 93 | + self |
| 94 | + } |
| 95 | + |
| 96 | + /// Limit the max. duration the substream may remain inactive before closing it. |
| 97 | + pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { |
| 98 | + self.inactivity_timeout = v; |
| 99 | + self |
| 100 | + } |
| 101 | + |
| 102 | + /// Set protocol to use for upgrade negotiation. |
| 103 | + pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self { |
| 104 | + let mut v = Vec::new(); |
| 105 | + v.extend_from_slice(b"/"); |
| 106 | + v.extend_from_slice(id.as_bytes()); |
| 107 | + v.extend_from_slice(b"/finality-proof/1"); |
| 108 | + self.protocol = v.into(); |
| 109 | + self |
| 110 | + } |
| 111 | +} |
| 112 | + |
| 113 | +/// The finality proof request handling behaviour. |
| 114 | +pub struct FinalityProofRequests<B: Block> { |
| 115 | + /// This behaviour's configuration. |
| 116 | + config: Config, |
| 117 | + /// How to construct finality proofs. |
| 118 | + finality_proof_provider: Arc<dyn FinalityProofProvider<B>>, |
| 119 | + /// Futures sending back the finality proof request responses. |
| 120 | + outgoing: FuturesUnordered<BoxFuture<'static, ()>>, |
| 121 | +} |
| 122 | + |
| 123 | +impl<B> FinalityProofRequests<B> |
| 124 | +where |
| 125 | + B: Block, |
| 126 | +{ |
| 127 | + /// Initializes the behaviour. |
| 128 | + pub fn new(cfg: Config, finality_proof_provider: Arc<dyn FinalityProofProvider<B>>) -> Self { |
| 129 | + FinalityProofRequests { |
| 130 | + config: cfg, |
| 131 | + finality_proof_provider, |
| 132 | + outgoing: FuturesUnordered::new(), |
| 133 | + } |
| 134 | + } |
| 135 | + |
| 136 | + /// Callback, invoked when a new finality request has been received from remote. |
| 137 | + fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest) |
| 138 | + -> Result<api::v1::finality::FinalityProofResponse, Error> |
| 139 | + { |
| 140 | + let block_hash = Decode::decode(&mut request.block_hash.as_ref())?; |
| 141 | + |
| 142 | + log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash); |
| 143 | + |
| 144 | + let finality_proof = self.finality_proof_provider |
| 145 | + .prove_finality(block_hash, &request.request)? |
| 146 | + .unwrap_or(Vec::new()); |
| 147 | + // Note that an empty Vec is sent if no proof is available. |
| 148 | + |
| 149 | + Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof }) |
| 150 | + } |
| 151 | +} |
| 152 | + |
| 153 | +impl<B> NetworkBehaviour for FinalityProofRequests<B> |
| 154 | +where |
| 155 | + B: Block |
| 156 | +{ |
| 157 | + type ProtocolsHandler = OneShotHandler<Protocol, DeniedUpgrade, Request<NegotiatedSubstream>>; |
| 158 | + type OutEvent = Void; |
| 159 | + |
| 160 | + fn new_handler(&mut self) -> Self::ProtocolsHandler { |
| 161 | + let p = Protocol { |
| 162 | + max_request_len: self.config.max_request_len, |
| 163 | + protocol: self.config.protocol.clone(), |
| 164 | + }; |
| 165 | + let mut cfg = OneShotHandlerConfig::default(); |
| 166 | + cfg.inactive_timeout = self.config.inactivity_timeout; |
| 167 | + OneShotHandler::new(SubstreamProtocol::new(p), cfg) |
| 168 | + } |
| 169 | + |
| 170 | + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec<Multiaddr> { |
| 171 | + Vec::new() |
| 172 | + } |
| 173 | + |
| 174 | + fn inject_connected(&mut self, _peer: &PeerId) { |
| 175 | + } |
| 176 | + |
| 177 | + fn inject_disconnected(&mut self, _peer: &PeerId) { |
| 178 | + } |
| 179 | + |
| 180 | + fn inject_event( |
| 181 | + &mut self, |
| 182 | + peer: PeerId, |
| 183 | + connection: ConnectionId, |
| 184 | + Request(request, mut stream): Request<NegotiatedSubstream> |
| 185 | + ) { |
| 186 | + match self.on_finality_request(&peer, &request) { |
| 187 | + Ok(res) => { |
| 188 | + log::trace!("enqueueing finality response for peer {}", peer); |
| 189 | + let mut data = Vec::with_capacity(res.encoded_len()); |
| 190 | + if let Err(e) = res.encode(&mut data) { |
| 191 | + log::debug!("error encoding finality response for peer {}: {}", peer, e) |
| 192 | + } else { |
| 193 | + let future = async move { |
| 194 | + if let Err(e) = write_one(&mut stream, data).await { |
| 195 | + log::debug!("error writing finality response: {}", e) |
| 196 | + } |
| 197 | + }; |
| 198 | + self.outgoing.push(future.boxed()) |
| 199 | + } |
| 200 | + } |
| 201 | + Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e) |
| 202 | + } |
| 203 | + } |
| 204 | + |
| 205 | + fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<DeniedUpgrade, Void>> { |
| 206 | + while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} |
| 207 | + Poll::Pending |
| 208 | + } |
| 209 | +} |
| 210 | + |
| 211 | +/// The incoming finality proof request. |
| 212 | +/// |
| 213 | +/// Holds the protobuf value and the connection substream which made the |
| 214 | +/// request and over which to send the response. |
| 215 | +#[derive(Debug)] |
| 216 | +pub struct Request<T>(api::v1::finality::FinalityProofRequest, T); |
| 217 | + |
| 218 | +impl<T> From<Void> for Request<T> { |
| 219 | + fn from(v: Void) -> Self { |
| 220 | + unreachable(v) |
| 221 | + } |
| 222 | +} |
| 223 | + |
| 224 | +/// Substream upgrade protocol. |
| 225 | +/// |
| 226 | +/// We attempt to parse an incoming protobuf encoded request (cf. `Request`) |
| 227 | +/// which will be handled by the `FinalityProofRequests` behaviour, i.e. the request |
| 228 | +/// will become visible via `inject_node_event` which then dispatches to the |
| 229 | +/// relevant callback to process the message and prepare a response. |
| 230 | +#[derive(Debug, Clone)] |
| 231 | +pub struct Protocol { |
| 232 | + /// The max. request length in bytes. |
| 233 | + max_request_len: usize, |
| 234 | + /// The protocol to use during upgrade negotiation. |
| 235 | + protocol: Bytes, |
| 236 | +} |
| 237 | + |
| 238 | +impl UpgradeInfo for Protocol { |
| 239 | + type Info = Bytes; |
| 240 | + type InfoIter = iter::Once<Self::Info>; |
| 241 | + |
| 242 | + fn protocol_info(&self) -> Self::InfoIter { |
| 243 | + iter::once(self.protocol.clone()) |
| 244 | + } |
| 245 | +} |
| 246 | + |
| 247 | +impl<T> InboundUpgrade<T> for Protocol |
| 248 | +where |
| 249 | + T: AsyncRead + AsyncWrite + Unpin + Send + 'static |
| 250 | +{ |
| 251 | + type Output = Request<T>; |
| 252 | + type Error = ReadOneError; |
| 253 | + type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>; |
| 254 | + |
| 255 | + fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { |
| 256 | + async move { |
| 257 | + let len = self.max_request_len; |
| 258 | + let vec = read_one(&mut s, len).await?; |
| 259 | + match api::v1::finality::FinalityProofRequest::decode(&vec[..]) { |
| 260 | + Ok(r) => Ok(Request(r, s)), |
| 261 | + Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) |
| 262 | + } |
| 263 | + }.boxed() |
| 264 | + } |
| 265 | +} |
| 266 | + |
0 commit comments