diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 4a183e219fb88..0eebd1713cc81 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -22,7 +22,7 @@ use crate::{ discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, peer_info, request_responses, light_client_requests, - ObservedRole, DhtEvent, ExHashT, + ObservedRole, DhtEvent, }; use bytes::Bytes; @@ -54,9 +54,9 @@ pub use crate::request_responses::{ /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll")] -pub struct Behaviour { +pub struct Behaviour { /// All the substrate-specific protocols. - substrate: Protocol, + substrate: Protocol, /// Periodically pings and identifies the nodes we are connected to, and store information in a /// cache. peer_info: peer_info::PeerInfoBehaviour, @@ -172,10 +172,10 @@ pub enum BehaviourOut { Dht(DhtEvent, Duration), } -impl Behaviour { +impl Behaviour { /// Builds a new `Behaviour`. pub fn new( - substrate: Protocol, + substrate: Protocol, user_agent: String, local_public_key: PublicKey, light_client_request_sender: light_client_requests::sender::LightClientRequestSender, @@ -256,12 +256,12 @@ impl Behaviour { } /// Returns a shared reference to the user protocol. - pub fn user_protocol(&self) -> &Protocol { + pub fn user_protocol(&self) -> &Protocol { &self.substrate } /// Returns a mutable reference to the user protocol. - pub fn user_protocol_mut(&mut self) -> &mut Protocol { + pub fn user_protocol_mut(&mut self) -> &mut Protocol { &mut self.substrate } @@ -294,15 +294,15 @@ fn reported_roles_to_observed_role(roles: Roles) -> ObservedRole { } } -impl NetworkBehaviourEventProcess for -Behaviour { +impl NetworkBehaviourEventProcess for +Behaviour { fn inject_event(&mut self, event: void::Void) { void::unreachable(event) } } -impl NetworkBehaviourEventProcess> for -Behaviour { +impl NetworkBehaviourEventProcess> for +Behaviour { fn inject_event(&mut self, event: CustomMessageOutcome) { match event { CustomMessageOutcome::BlockImport(origin, blocks) => @@ -362,7 +362,7 @@ Behaviour { } } -impl NetworkBehaviourEventProcess for Behaviour { +impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: request_responses::Event) { match event { request_responses::Event::InboundRequest { peer, protocol, result } => { @@ -386,8 +386,8 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour { +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, event: peer_info::PeerInfoEvent) { let peer_info::PeerInfoEvent::Identified { peer_id, @@ -416,8 +416,8 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess - for Behaviour { +impl NetworkBehaviourEventProcess + for Behaviour { fn inject_event(&mut self, out: DiscoveryOut) { match out { DiscoveryOut::UnroutablePeer(_peer_id) => { @@ -450,7 +450,7 @@ impl NetworkBehaviourEventProcess } } -impl Behaviour { +impl Behaviour { fn poll( &mut self, cx: &mut Context, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 7f8cac95f9d55..d6d4d9d7162f1 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -68,6 +68,9 @@ pub struct Params { /// default. pub executor: Option + Send>>) + Send>>, + /// How to spawn the background task dedicated to the transactions handler. + pub transactions_handler_executor: Box + Send>>) + Send>, + /// Network layer configuration. pub network_config: NetworkConfiguration, diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs index c0b8c5e730a11..c35159168d0fb 100644 --- a/client/network/src/gossip/tests.rs +++ b/client/network/src/gossip/tests.rs @@ -116,6 +116,7 @@ fn build_test_full_node(network_config: config::NetworkConfiguration) let worker = NetworkWorker::new(config::Params { role: config::Role::Full, executor: None, + transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), network_config, chain: client.clone(), on_demand: None, diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 5bd20927869e0..556e71da23831 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -264,6 +264,7 @@ pub mod config; pub mod error; pub mod gossip; pub mod network_state; +pub mod transactions; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index acb5d9101eac9..e1a10b520ba9a 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -17,9 +17,8 @@ // along with this program. If not, see . use crate::{ - ExHashT, chain::Client, - config::{self, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport}, + config::{self, ProtocolId}, error, request_responses::RequestFailure, utils::{interval, LruHashSet}, @@ -27,7 +26,7 @@ use crate::{ use bytes::{Bytes, BytesMut}; use codec::{Decode, DecodeAll, Encode}; -use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; +use futures::{channel::oneshot, prelude::*}; use generic_proto::{GenericProto, GenericProtoOut}; use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}}; use libp2p::request_response::OutboundFailure; @@ -37,10 +36,7 @@ use libp2p::{Multiaddr, PeerId}; use log::{log, Level, trace, debug, warn, error}; use message::{BlockAnnounce, Message}; use message::generic::{Message as GenericMessage, Roles}; -use prometheus_endpoint::{ - Registry, Gauge, Counter, GaugeVec, - PrometheusError, Opts, register, U64 -}; +use prometheus_endpoint::{Registry, Gauge, GaugeVec, PrometheusError, Opts, register, U64}; use prost::Message as _; use sp_consensus::{ BlockOrigin, @@ -55,7 +51,7 @@ use sp_arithmetic::traits::SaturatedConversion; use sync::{ChainSync, SyncState}; use std::borrow::Cow; use std::convert::TryFrom as _; -use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time}; @@ -69,28 +65,16 @@ pub use generic_proto::{NotificationsSink, Ready, NotifsHandlerError}; /// Interval at which we perform time based maintenance const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); -/// Interval at which we propagate transactions; -const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); /// Maximum number of known block hashes to keep for a peer. const MAX_KNOWN_BLOCKS: usize = 1024; // ~32kb per peer + LruHashSet overhead -/// Maximum number of known transaction hashes to keep for a peer. -/// -/// This should be approx. 2 blocks full of transactions for the network to function properly. -const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead. - /// Maximum allowed size for a block announce. const MAX_BLOCK_ANNOUNCE_SIZE: u64 = 1024 * 1024; -/// Maximum allowed size for a transactions notification. -const MAX_TRANSACTIONS_SIZE: u64 = 16 * 1024 * 1024; /// Maximum size used for notifications in the block announce and transaction protocols. // Must be equal to `max(MAX_BLOCK_ANNOUNCE_SIZE, MAX_TRANSACTIONS_SIZE)`. pub(crate) const BLOCK_ANNOUNCES_TRANSACTIONS_SUBSTREAM_SIZE: u64 = 16 * 1024 * 1024; -/// Maximum number of transaction validation request we keep at any moment. -const MAX_PENDING_TRANSACTIONS: usize = 8192; - /// Current protocol version. pub(crate) const CURRENT_VERSION: u32 = 6; /// Lowest version we support @@ -98,11 +82,9 @@ pub(crate) const MIN_VERSION: u32 = 3; /// Identifier of the peerset for the block announces protocol. const HARDCODED_PEERSETS_SYNC: sc_peerset::SetId = sc_peerset::SetId::from(0); -/// Identifier of the peerset for the transactions protocol. -const HARDCODED_PEERSETS_TX: sc_peerset::SetId = sc_peerset::SetId::from(1); /// Number of hardcoded peersets (the constants right above). Any set whose identifier is equal or /// superior to this value corresponds to a user-defined protocol. -const NUM_HARDCODED_PEERSETS: usize = 2; +const NUM_HARDCODED_PEERSETS: usize = 1; /// When light node connects to the full node and the full node is behind light node /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful @@ -117,21 +99,8 @@ mod rep { pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused"); /// Reputation change when we are a light client and a peer is behind us. pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer"); - /// Reputation change when a peer sends us any transaction. - /// - /// This forces node to verify it, thus the negative value here. Once transaction is verified, - /// reputation change should be refunded with `ANY_TRANSACTION_REFUND` - pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction"); - /// Reputation change when a peer sends us any transaction that is not invalid. - pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)"); - /// Reputation change when a peer sends us an transaction that we didn't know about. - pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction"); - /// Reputation change when a peer sends us a bad transaction. - pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction"); /// We received a message that failed to decode. pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message"); - /// We received an unexpected transaction packet. - pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); /// Peer is on unsupported protocol version. @@ -147,7 +116,6 @@ struct Metrics { queued_blocks: Gauge, fork_targets: Gauge, justifications: GaugeVec, - propagated_transactions: Counter, } impl Metrics { @@ -175,62 +143,27 @@ impl Metrics { )?; register(g, r)? }, - propagated_transactions: register(Counter::new( - "sync_propagated_transactions", - "Number of transactions propagated to at least one peer", - )?, r)?, }) } } -#[pin_project::pin_project] -struct PendingTransaction { - #[pin] - validation: TransactionImportFuture, - tx_hash: H, -} - -impl Future for PendingTransaction { - type Output = (H, TransactionImport); - - fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { - let mut this = self.project(); - - if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) { - return Poll::Ready((this.tx_hash.clone(), import_result)); - } - - Poll::Pending - } -} - // Lock must always be taken in order declared here. -pub struct Protocol { +pub struct Protocol { /// Interval at which we call `tick`. tick_timeout: Pin + Send>>, - /// Interval at which we call `propagate_transactions`. - propagate_timeout: Pin + Send>>, /// Pending list of messages to return from `poll` as a priority. pending_messages: VecDeque>, - /// Pending transactions verification tasks. - pending_transactions: FuturesUnordered>, - /// As multiple peers can send us the same transaction, we group - /// these peers using the transaction hash while the transaction is - /// imported. This prevents that we import the same transaction - /// multiple times concurrently. - pending_transactions_peers: HashMap>, config: ProtocolConfig, genesis_hash: B::Hash, sync: ChainSync, // All connected peers - peers: HashMap>, + peers: HashMap>, chain: Arc>, /// List of nodes for which we perform additional logging because they are important for the /// user. important_peers: HashSet, /// Used to report reputation changes. peerset_handle: sc_peerset::PeersetHandle, - transaction_pool: Arc>, /// Handles opening the unique substream and sending and receiving raw messages. behaviour: GenericProto, /// List of notifications protocols that have been registered. @@ -245,15 +178,13 @@ pub struct Protocol { /// Peer information #[derive(Debug)] -struct Peer { +struct Peer { info: PeerInfo, /// Current block request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`]. block_request: Option<( message::BlockRequest, oneshot::Receiver, RequestFailure>>, )>, - /// Holds a set of transactions known to this peer. - known_transactions: LruHashSet, /// Holds a set of blocks known to this peer. known_blocks: LruHashSet, } @@ -336,18 +267,17 @@ fn build_status_message( Message::::Status(status).encode() } -impl Protocol { +impl Protocol { /// Create a new instance. pub fn new( config: ProtocolConfig, chain: Arc>, - transaction_pool: Arc>, protocol_id: ProtocolId, - config_role: &config::Role, network_config: &config::NetworkConfiguration, + notifications_protocols_handshakes: Vec>, block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, - ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { + ) -> error::Result<(Protocol, sc_peerset::PeersetHandle, Vec<(PeerId, Multiaddr)>)> { let info = chain.info(); let sync = ChainSync::new( config.roles, @@ -405,17 +335,6 @@ impl Protocol { == config::NonReservedPeerMode::Deny, }); - // Set number 1 is used for transactions. - // The `reserved_nodes` of this set are later kept in sync with the peers we connect - // to through set 0. - sets.push(sc_peerset::SetConfig { - in_peers: 0, - out_peers: 0, - bootnodes: Vec::new(), - reserved_nodes: HashSet::new(), - reserved_only: true, - }); - for set_cfg in &network_config.extra_sets { let mut reserved_nodes = HashSet::new(); for reserved in set_cfg.set_config.reserved_nodes.iter() { @@ -440,14 +359,6 @@ impl Protocol { }) }; - let transactions_protocol: Cow<'static, str> = Cow::from({ - let mut proto = String::new(); - proto.push_str("/"); - proto.push_str(protocol_id.as_ref()); - proto.push_str("/transactions/1"); - proto - }); - let block_announces_protocol: Cow<'static, str> = Cow::from({ let mut proto = String::new(); proto.push_str("/"); @@ -458,7 +369,6 @@ impl Protocol { let behaviour = { let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::>(); - let handshake_message = Roles::from(config_role).encode(); let best_number = info.best_number; let best_hash = info.best_hash; @@ -477,12 +387,10 @@ impl Protocol { build_status_message::(&config, best_number, best_hash, genesis_hash), peerset, iter::once((block_announces_protocol, block_announces_handshake, MAX_BLOCK_ANNOUNCE_SIZE)) - .chain(iter::once((transactions_protocol, vec![], MAX_TRANSACTIONS_SIZE))) - .chain(network_config.extra_sets.iter().map(|s| ( - s.notifications_protocol.clone(), - handshake_message.clone(), - s.max_notification_size - ))), + .chain(network_config.extra_sets.iter() + .zip(notifications_protocols_handshakes) + .map(|(s, hs)| (s.notifications_protocol.clone(), hs, s.max_notification_size)) + ), ) }; @@ -493,17 +401,13 @@ impl Protocol { let protocol = Protocol { tick_timeout: Box::pin(interval(TICK_TIMEOUT)), - propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), pending_messages: VecDeque::new(), - pending_transactions: FuturesUnordered::new(), - pending_transactions_peers: HashMap::new(), config, peers: HashMap::new(), chain, genesis_hash: info.genesis_hash, sync, important_peers, - transaction_pool, peerset_handle: peerset_handle.clone(), behaviour, notification_protocols: @@ -652,8 +556,8 @@ impl Protocol { debug!(target: "sub-libp2p", "Received unexpected Status"), GenericMessage::BlockAnnounce(announce) => self.push_block_announce_validation(who.clone(), announce), - GenericMessage::Transactions(m) => - self.on_transactions(who, m), + GenericMessage::Transactions(_) => + warn!(target: "sub-libp2p", "Received unexpected Transactions"), GenericMessage::BlockResponse(_) => warn!(target: "sub-libp2p", "Received unexpected BlockResponse"), GenericMessage::RemoteCallResponse(_) => @@ -690,7 +594,7 @@ impl Protocol { who: PeerId, request: message::BlockRequest, ) -> CustomMessageOutcome { - prepare_block_request::(&mut self.peers, who, request) + prepare_block_request::(&mut self.peers, who, request) } /// Called by peer when it is disconnecting. @@ -896,8 +800,6 @@ impl Protocol { best_number: status.best_number }, block_request: None, - known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) - .expect("Constant is nonzero")), known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS) .expect("Constant is nonzero")), }; @@ -928,144 +830,6 @@ impl Protocol { Ok(()) } - /// Called when peer sends us new transactions - fn on_transactions( - &mut self, - who: PeerId, - transactions: message::Transactions, - ) { - // sending transaction to light node is considered a bad behavior - if !self.config.roles.is_full() { - trace!(target: "sync", "Peer {} is trying to send transactions to the light node", who); - self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_TX); - self.peerset_handle.report_peer(who, rep::UNEXPECTED_TRANSACTIONS); - return; - } - - // Accept transactions only when fully synced - if self.sync.status().state != SyncState::Idle { - trace!(target: "sync", "{} Ignoring transactions while syncing", who); - return; - } - - trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who); - if let Some(ref mut peer) = self.peers.get_mut(&who) { - for t in transactions { - if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { - debug!( - target: "sync", - "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit", - MAX_PENDING_TRANSACTIONS, - ); - break; - } - - let hash = self.transaction_pool.hash_of(&t); - peer.known_transactions.insert(hash.clone()); - - self.peerset_handle.report_peer(who.clone(), rep::ANY_TRANSACTION); - - match self.pending_transactions_peers.entry(hash.clone()) { - Entry::Vacant(entry) => { - self.pending_transactions.push(PendingTransaction { - validation: self.transaction_pool.import(t), - tx_hash: hash, - }); - entry.insert(vec![who.clone()]); - }, - Entry::Occupied(mut entry) => { - entry.get_mut().push(who.clone()); - } - } - } - } - } - - fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) { - match import { - TransactionImport::KnownGood => self.peerset_handle.report_peer(who, rep::ANY_TRANSACTION_REFUND), - TransactionImport::NewGood => self.peerset_handle.report_peer(who, rep::GOOD_TRANSACTION), - TransactionImport::Bad => self.peerset_handle.report_peer(who, rep::BAD_TRANSACTION), - TransactionImport::None => {}, - } - } - - /// Propagate one transaction. - pub fn propagate_transaction( - &mut self, - hash: &H, - ) { - debug!(target: "sync", "Propagating transaction [{:?}]", hash); - // Accept transactions only when fully synced - if self.sync.status().state != SyncState::Idle { - return; - } - if let Some(transaction) = self.transaction_pool.transaction(hash) { - let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]); - self.transaction_pool.on_broadcasted(propagated_to); - } - } - - fn do_propagate_transactions( - &mut self, - transactions: &[(H, B::Extrinsic)], - ) -> HashMap> { - let mut propagated_to = HashMap::<_, Vec<_>>::new(); - let mut propagated_transactions = 0; - - for (who, peer) in self.peers.iter_mut() { - // never send transactions to the light node - if !peer.info.roles.is_full() { - continue; - } - - if !self.behaviour.is_open(who, HARDCODED_PEERSETS_TX) { - continue; - } - - let (hashes, to_send): (Vec<_>, Vec<_>) = transactions - .iter() - .filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone())) - .cloned() - .unzip(); - - propagated_transactions += hashes.len(); - - if !to_send.is_empty() { - for hash in hashes { - propagated_to - .entry(hash) - .or_default() - .push(who.to_base58()); - } - trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - self.behaviour.write_notification( - who, - HARDCODED_PEERSETS_TX, - to_send.encode() - ); - } - } - - if let Some(ref metrics) = self.metrics { - metrics.propagated_transactions.inc_by(propagated_transactions as _) - } - - propagated_to - } - - /// Call when we must propagate ready transactions to peers. - pub fn propagate_transactions(&mut self) { - debug!(target: "sync", "Propagating transactions"); - // Accept transactions only when fully synced - if self.sync.status().state != SyncState::Idle { - return; - } - let transactions = self.transaction_pool.transactions(); - let propagated_to = self.do_propagate_transactions(&transactions); - self.transaction_pool.on_broadcasted(propagated_to); - } - /// Make sure an important block is propagated to peers. /// /// In chain-based consensus, we often need to make sure non-best forks are @@ -1317,25 +1081,21 @@ impl Protocol { /// Set whether the syncing peers set is in reserved-only mode. pub fn set_reserved_only(&self, reserved_only: bool) { self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_SYNC, reserved_only); - self.peerset_handle.set_reserved_only(HARDCODED_PEERSETS_TX, reserved_only); } /// Removes a `PeerId` from the list of reserved peers for syncing purposes. pub fn remove_reserved_peer(&self, peer: PeerId) { self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_SYNC, peer.clone()); - self.peerset_handle.remove_reserved_peer(HARDCODED_PEERSETS_TX, peer); } /// Adds a `PeerId` to the list of reserved peers for syncing purposes. pub fn add_reserved_peer(&self, peer: PeerId) { self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_SYNC, peer.clone()); - self.peerset_handle.add_reserved_peer(HARDCODED_PEERSETS_TX, peer); } /// Sets the list of reserved peers for syncing purposes. pub fn set_reserved_peers(&self, peers: HashSet) { self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_SYNC, peers.clone()); - self.peerset_handle.set_reserved_peers(HARDCODED_PEERSETS_TX, peers); } /// Removes a `PeerId` from the list of reserved peers. @@ -1421,8 +1181,8 @@ impl Protocol { } } -fn prepare_block_request( - peers: &mut HashMap>, +fn prepare_block_request( + peers: &mut HashMap>, who: PeerId, request: message::BlockRequest, ) -> CustomMessageOutcome { @@ -1490,7 +1250,7 @@ pub enum CustomMessageOutcome { None, } -impl NetworkBehaviour for Protocol { +impl NetworkBehaviour for Protocol { type ProtocolsHandler = ::ProtocolsHandler; type OutEvent = CustomMessageOutcome; @@ -1619,10 +1379,6 @@ impl NetworkBehaviour for Protocol { self.tick(); } - while let Poll::Ready(Some(())) = self.propagate_timeout.poll_next_unpin(cx) { - self.propagate_transactions(); - } - for (id, request) in self.sync.block_requests() { let event = prepare_block_request(&mut self.peers, id.clone(), request); self.pending_messages.push_back(event); @@ -1631,13 +1387,6 @@ impl NetworkBehaviour for Protocol { let event = prepare_block_request(&mut self.peers, id, request); self.pending_messages.push_back(event); } - if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) { - if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) { - peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result)); - } else { - warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!"); - } - } // Check if there is any block announcement validation finished. while let Poll::Ready(result) = self.sync.poll_block_announce_validation(cx) { @@ -1681,11 +1430,6 @@ impl NetworkBehaviour for Protocol { }; if self.on_sync_peer_connected(peer_id.clone(), handshake).is_ok() { - // Set 1 is kept in sync with the connected peers of set 0. - self.peerset_handle.add_reserved_peer( - HARDCODED_PEERSETS_TX, - peer_id.clone() - ); CustomMessageOutcome::SyncConnected(peer_id) } else { CustomMessageOutcome::None @@ -1705,11 +1449,6 @@ impl NetworkBehaviour for Protocol { match as DecodeAll>::decode_all(&mut &received_handshake[..]) { Ok(handshake) => { if self.on_sync_peer_connected(peer_id.clone(), handshake).is_ok() { - // Set 1 is kept in sync with the connected peers of set 0. - self.peerset_handle.add_reserved_peer( - HARDCODED_PEERSETS_TX, - peer_id.clone() - ); CustomMessageOutcome::SyncConnected(peer_id) } else { CustomMessageOutcome::None @@ -1731,19 +1470,28 @@ impl NetworkBehaviour for Protocol { } } - } else if set_id == HARDCODED_PEERSETS_TX { - // Nothing to do. - CustomMessageOutcome::None } else { - match message::Roles::decode_all(&received_handshake[..]) { - Ok(roles) => + match (message::Roles::decode_all(&received_handshake[..]), self.peers.get(&peer_id)) { + (Ok(roles), _) => CustomMessageOutcome::NotificationStreamOpened { remote: peer_id, protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(), roles, notifications_sink, }, - Err(err) => { + (Err(_), Some(peer)) if received_handshake.is_empty() => { + // As a convenience, we allow opening substreams for "external" + // notification protocols with an empty handshake. This fetches the + // roles from the locally-known roles. + // TODO: remove this after https://github.com/paritytech/substrate/issues/5685 + CustomMessageOutcome::NotificationStreamOpened { + remote: peer_id, + protocol: self.notification_protocols[usize::from(set_id) - NUM_HARDCODED_PEERSETS].clone(), + roles: peer.info.roles, + notifications_sink, + } + }, + (Err(err), _) => { debug!(target: "sync", "Failed to parse remote handshake: {}", err); self.behaviour.disconnect_peer(&peer_id, set_id); self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE); @@ -1753,7 +1501,7 @@ impl NetworkBehaviour for Protocol { } } GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, set_id } => { - if set_id == HARDCODED_PEERSETS_SYNC || set_id == HARDCODED_PEERSETS_TX { + if set_id == HARDCODED_PEERSETS_SYNC { CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamReplaced { @@ -1767,11 +1515,6 @@ impl NetworkBehaviour for Protocol { // Set number 0 is hardcoded the default set of peers we sync from. if set_id == HARDCODED_PEERSETS_SYNC { if self.on_sync_peer_disconnected(peer_id.clone()).is_ok() { - // Set 1 is kept in sync with the connected peers of set 0. - self.peerset_handle.remove_reserved_peer( - HARDCODED_PEERSETS_TX, - peer_id.clone() - ); CustomMessageOutcome::SyncDisconnected(peer_id) } else { log::debug!( @@ -1781,8 +1524,6 @@ impl NetworkBehaviour for Protocol { ); CustomMessageOutcome::None } - } else if set_id == HARDCODED_PEERSETS_TX { - CustomMessageOutcome::None } else { CustomMessageOutcome::NotificationStreamClosed { remote: peer_id, @@ -1815,20 +1556,10 @@ impl NetworkBehaviour for Protocol { CustomMessageOutcome::None } } - HARDCODED_PEERSETS_TX if self.peers.contains_key(&peer_id) => { - if let Ok(m) = as Decode>::decode( - &mut message.as_ref(), - ) { - self.on_transactions(peer_id, m); - } else { - warn!(target: "sub-libp2p", "Failed to decode transactions list"); - } - CustomMessageOutcome::None - } - HARDCODED_PEERSETS_SYNC | HARDCODED_PEERSETS_TX => { + HARDCODED_PEERSETS_SYNC => { debug!( target: "sync", - "Received sync or transaction for peer earlier refused by sync layer: {}", + "Received sync for peer earlier refused by sync layer: {}", peer_id ); CustomMessageOutcome::None diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 9ac7483467b40..74ce9316fc41c 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -41,6 +41,7 @@ use crate::{ light_client_requests, protocol::{ self, + message::generic::Roles, NotifsHandlerError, NotificationsSink, PeerInfo, @@ -49,9 +50,13 @@ use crate::{ event::Event, sync::SyncState, }, + transactions, transport, ReputationChange, + bitswap::Bitswap, }; + +use codec::Encode as _; use futures::{channel::oneshot, prelude::*}; use libp2p::{PeerId, multiaddr, Multiaddr}; use libp2p::core::{ @@ -140,7 +145,7 @@ impl NetworkWorker { /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new(params: Params) -> Result, Error> { + pub fn new(mut params: Params) -> Result, Error> { // Ensure the listen addresses are consistent with the transport. ensure_addresses_consistent_with_transport( params.network_config.listen_addresses.iter(), @@ -171,6 +176,11 @@ impl NetworkWorker { fs::create_dir_all(path)?; } + let transactions_handler_proto = transactions::TransactionsHandlerPrototype::new( + params.protocol_id.clone() + ); + params.network_config.extra_sets.insert(0, transactions_handler_proto.set_config()); + // Private and public keys configuration. let local_identity = params.network_config.node_key.clone().into_keypair()?; let local_public = local_identity.public(); @@ -181,16 +191,17 @@ impl NetworkWorker { local_peer_id.to_base58(), ); + let default_notif_handshake_message = Roles::from(¶ms.role).encode(); let (protocol, peerset_handle, mut known_addresses) = Protocol::new( protocol::ProtocolConfig { roles: From::from(¶ms.role), max_parallel_downloads: params.network_config.max_parallel_downloads, }, params.chain.clone(), - params.transaction_pool, params.protocol_id.clone(), - ¶ms.role, ¶ms.network_config, + iter::once(Vec::new()).chain((0..params.network_config.extra_sets.len() - 1) + .map(|_| default_notif_handshake_message.clone())).collect(), params.block_announce_validator, params.metrics_registry.as_ref(), )?; @@ -234,7 +245,7 @@ impl NetworkWorker { // Build the swarm. let client = params.chain.clone(); - let (mut swarm, bandwidth): (Swarm, _) = { + let (mut swarm, bandwidth): (Swarm, _) = { let user_agent = format!( "{} ({})", params.network_config.client_version, @@ -377,14 +388,14 @@ impl NetworkWorker { // Listen on multiaddresses. for addr in ¶ms.network_config.listen_addresses { - if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { + if let Err(err) = Swarm::::listen_on(&mut swarm, addr.clone()) { warn!(target: "sub-libp2p", "Can't listen on {} because: {:?}", addr, err) } } // Add external addresses. for addr in ¶ms.network_config.public_addresses { - Swarm::::add_external_address(&mut swarm, addr.clone(), AddressScore::Infinite); + Swarm::::add_external_address(&mut swarm, addr.clone(), AddressScore::Infinite); } let external_addresses = Arc::new(Mutex::new(Vec::new())); @@ -404,6 +415,14 @@ impl NetworkWorker { _marker: PhantomData, }); + let (tx_handler, tx_handler_controller) = transactions_handler_proto.build( + service.clone(), + params.role, + params.transaction_pool, + params.metrics_registry.as_ref() + )?; + (params.transactions_handler_executor)(tx_handler.run().boxed()); + Ok(NetworkWorker { external_addresses, num_connected, @@ -415,6 +434,7 @@ impl NetworkWorker { light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()), event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, peers_notifications_sinks, + tx_handler_controller, metrics, boot_node_ids, }) @@ -506,14 +526,14 @@ impl NetworkWorker { /// Returns the local `PeerId`. pub fn local_peer_id(&self) -> &PeerId { - Swarm::::local_peer_id(&self.network_service) + Swarm::::local_peer_id(&self.network_service) } /// Returns the list of addresses we are listening on. /// /// Does **NOT** include a trailing `/p2p/` with our `PeerId`. pub fn listen_addresses(&self) -> impl Iterator { - Swarm::::listeners(&self.network_service) + Swarm::::listeners(&self.network_service) } /// Get network state. @@ -564,9 +584,9 @@ impl NetworkWorker { .collect() }; - let peer_id = Swarm::::local_peer_id(&swarm).to_base58(); - let listened_addresses = Swarm::::listeners(&swarm).cloned().collect(); - let external_addresses = Swarm::::external_addresses(&swarm) + let peer_id = Swarm::::local_peer_id(&swarm).to_base58(); + let listened_addresses = Swarm::::listeners(&swarm).cloned().collect(); + let external_addresses = Swarm::::external_addresses(&swarm) .map(|r| &r.addr) .cloned() .collect(); @@ -1293,7 +1313,7 @@ pub struct NetworkWorker { /// The network service that can be extracted and shared through the codebase. service: Arc>, /// The *actual* network. - network_service: Swarm, + network_service: Swarm, /// The import queue that was passed at initialization. import_queue: Box>, /// Messages from the [`NetworkService`] that must be processed. @@ -1309,6 +1329,8 @@ pub struct NetworkWorker { /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Shared with the [`NetworkService`]. peers_notifications_sinks: Arc), NotificationsSink>>>, + /// Controller for the handler of incoming and outgoing transactions. + tx_handler_controller: transactions::TransactionsHandlerController, } impl Future for NetworkWorker { @@ -1368,9 +1390,9 @@ impl Future for NetworkWorker { ServiceToWorkerMsg::RequestJustification(hash, number) => this.network_service.user_protocol_mut().request_justification(&hash, number), ServiceToWorkerMsg::PropagateTransaction(hash) => - this.network_service.user_protocol_mut().propagate_transaction(&hash), + this.tx_handler_controller.propagate_transaction(hash), ServiceToWorkerMsg::PropagateTransactions => - this.network_service.user_protocol_mut().propagate_transactions(), + this.tx_handler_controller.propagate_transactions(), ServiceToWorkerMsg::GetValue(key) => this.network_service.get_value(&key), ServiceToWorkerMsg::PutValue(key, value) => @@ -1749,7 +1771,7 @@ impl Future for NetworkWorker { // Update the variables shared with the `NetworkService`. this.num_connected.store(num_connected_peers, Ordering::Relaxed); { - let external_addresses = Swarm::::external_addresses(&this.network_service) + let external_addresses = Swarm::::external_addresses(&this.network_service) .map(|r| &r.addr) .cloned() .collect(); @@ -1761,6 +1783,8 @@ impl Future for NetworkWorker { SyncState::Downloading => true, }; + this.tx_handler_controller.set_gossip_enabled(!is_major_syncing); + this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); if let Some(metrics) = this.metrics.as_ref() { @@ -1792,14 +1816,14 @@ impl Unpin for NetworkWorker { } /// The libp2p swarm, customized for our needs. -type Swarm = libp2p::swarm::Swarm>; +type Swarm = libp2p::swarm::Swarm>; // Implementation of `import_queue::Link` trait using the available local variables. -struct NetworkLink<'a, B: BlockT, H: ExHashT> { - protocol: &'a mut Swarm, +struct NetworkLink<'a, B: BlockT> { + protocol: &'a mut Swarm, } -impl<'a, B: BlockT, H: ExHashT> Link for NetworkLink<'a, B, H> { +impl<'a, B: BlockT> Link for NetworkLink<'a, B> { fn blocks_processed( &mut self, imported: usize, diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index f88854963fb95..defb9213a3493 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -116,6 +116,7 @@ fn build_test_full_node(config: config::NetworkConfiguration) let worker = NetworkWorker::new(config::Params { role: config::Role::Full, executor: None, + transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), network_config: config, chain: client.clone(), on_demand: None, diff --git a/client/network/src/transactions.rs b/client/network/src/transactions.rs new file mode 100644 index 0000000000000..e6d807c2cb78d --- /dev/null +++ b/client/network/src/transactions.rs @@ -0,0 +1,486 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2021 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Transactions handling to plug on top of the network service. +//! +//! Usage: +//! +//! - Use [`TransactionsHandlerPrototype::new`] to create a prototype. +//! - Pass the return value of [`TransactionsHandlerPrototype::set_config`] to the network +//! configuration as an extra peers set. +//! - Use [`TransactionsHandlerPrototype::build`] then [`TransactionsHandler::run`] to obtain a +//! `Future` that processes transactions. +//! + +use crate::{ + ExHashT, Event, ObservedRole, + config::{self, ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport}, + error, protocol::message, service::NetworkService, utils::{interval, LruHashSet}, +}; + +use codec::{Decode, Encode}; +use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; +use libp2p::{multiaddr, PeerId}; +use log::{trace, debug, warn}; +use prometheus_endpoint::{ + Registry, Counter, PrometheusError, register, U64 +}; +use sp_runtime::traits::Block as BlockT; +use std::borrow::Cow; +use std::collections::{HashMap, hash_map::Entry}; +use std::sync::{atomic::{AtomicBool, Ordering}, Arc}; +use std::{iter, num::NonZeroUsize, pin::Pin, task::Poll, time}; + +/// Interval at which we propagate transactions; +const PROPAGATE_TIMEOUT: time::Duration = time::Duration::from_millis(2900); + +/// Maximum number of known transaction hashes to keep for a peer. +/// +/// This should be approx. 2 blocks full of transactions for the network to function properly. +const MAX_KNOWN_TRANSACTIONS: usize = 10240; // ~300kb per peer + overhead. + +/// Maximum allowed size for a transactions notification. +const MAX_TRANSACTIONS_SIZE: u64 = 16 * 1024 * 1024; + +/// Maximum number of transaction validation request we keep at any moment. +const MAX_PENDING_TRANSACTIONS: usize = 8192; + +mod rep { + use sc_peerset::ReputationChange as Rep; + /// Reputation change when a peer sends us any transaction. + /// + /// This forces node to verify it, thus the negative value here. Once transaction is verified, + /// reputation change should be refunded with `ANY_TRANSACTION_REFUND` + pub const ANY_TRANSACTION: Rep = Rep::new(-(1 << 4), "Any transaction"); + /// Reputation change when a peer sends us any transaction that is not invalid. + pub const ANY_TRANSACTION_REFUND: Rep = Rep::new(1 << 4, "Any transaction (refund)"); + /// Reputation change when a peer sends us an transaction that we didn't know about. + pub const GOOD_TRANSACTION: Rep = Rep::new(1 << 7, "Good transaction"); + /// Reputation change when a peer sends us a bad transaction. + pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction"); + /// We received an unexpected transaction packet. + pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet"); +} + +struct Metrics { + propagated_transactions: Counter, +} + +impl Metrics { + fn register(r: &Registry) -> Result { + Ok(Metrics { + propagated_transactions: register(Counter::new( + "sync_propagated_transactions", + "Number of transactions propagated to at least one peer", + )?, r)?, + }) + } +} + +#[pin_project::pin_project] +struct PendingTransaction { + #[pin] + validation: TransactionImportFuture, + tx_hash: H, +} + +impl Future for PendingTransaction { + type Output = (H, TransactionImport); + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let mut this = self.project(); + + if let Poll::Ready(import_result) = Pin::new(&mut this.validation).poll_unpin(cx) { + return Poll::Ready((this.tx_hash.clone(), import_result)); + } + + Poll::Pending + } +} + +/// Prototype for a [`TransactionsHandler`]. +pub struct TransactionsHandlerPrototype { + protocol_name: Cow<'static, str>, +} + +impl TransactionsHandlerPrototype { + /// Create a new instance. + pub fn new(protocol_id: ProtocolId) -> Self { + TransactionsHandlerPrototype { + protocol_name: Cow::from({ + let mut proto = String::new(); + proto.push_str("/"); + proto.push_str(protocol_id.as_ref()); + proto.push_str("/transactions/1"); + proto + }) + } + } + + /// Returns the configuration of the set to put in the network configuration. + pub fn set_config(&self) -> config::NonDefaultSetConfig { + config::NonDefaultSetConfig { + notifications_protocol: self.protocol_name.clone(), + max_notification_size: MAX_TRANSACTIONS_SIZE, + set_config: config::SetConfig { + in_peers: 0, + out_peers: 0, + reserved_nodes: Vec::new(), + non_reserved_mode: config::NonReservedPeerMode::Deny, + } + } + } + + /// Turns the prototype into the actual handler. Returns a controller that allows controlling + /// the behaviour of the handler while it's running. + /// + /// Important: the transactions handler is initially disabled and doesn't gossip transactions. + /// You must call [`TransactionsHandlerController::set_gossip_enabled`] to enable it. + pub fn build( + self, + service: Arc>, + local_role: config::Role, + transaction_pool: Arc>, + metrics_registry: Option<&Registry>, + ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { + let event_stream = service.event_stream("transactions-handler").boxed(); + let (to_handler, from_controller) = mpsc::unbounded(); + let gossip_enabled = Arc::new(AtomicBool::new(false)); + + let handler = TransactionsHandler { + protocol_name: self.protocol_name, + propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)), + pending_transactions: FuturesUnordered::new(), + pending_transactions_peers: HashMap::new(), + gossip_enabled: gossip_enabled.clone(), + service, + event_stream, + peers: HashMap::new(), + transaction_pool, + local_role, + from_controller, + metrics: if let Some(r) = metrics_registry { + Some(Metrics::register(r)?) + } else { + None + }, + }; + + let controller = TransactionsHandlerController { + to_handler, + gossip_enabled, + }; + + Ok((handler, controller)) + } +} + +/// Controls the behaviour of a [`TransactionsHandler`] it is connected to. +pub struct TransactionsHandlerController { + to_handler: mpsc::UnboundedSender>, + gossip_enabled: Arc, +} + +impl TransactionsHandlerController { + /// Controls whether transactions are being gossiped on the network. + pub fn set_gossip_enabled(&mut self, enabled: bool) { + self.gossip_enabled.store(enabled, Ordering::Relaxed); + } + + /// You may call this when new transactions are imported by the transaction pool. + /// + /// All transactions will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. + pub fn propagate_transactions(&self) { + let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransactions); + } + + /// You must call when new a transaction is imported by the transaction pool. + /// + /// This transaction will be fetched from the `TransactionPool` that was passed at + /// initialization as part of the configuration and propagated to peers. + pub fn propagate_transaction(&self, hash: H) { + let _ = self.to_handler.unbounded_send(ToHandler::PropagateTransaction(hash)); + } +} + +enum ToHandler { + PropagateTransactions, + PropagateTransaction(H), +} + +/// Handler for transactions. Call [`TransactionsHandler::run`] to start the processing. +pub struct TransactionsHandler { + protocol_name: Cow<'static, str>, + /// Interval at which we call `propagate_transactions`. + propagate_timeout: Pin + Send>>, + /// Pending transactions verification tasks. + pending_transactions: FuturesUnordered>, + /// As multiple peers can send us the same transaction, we group + /// these peers using the transaction hash while the transaction is + /// imported. This prevents that we import the same transaction + /// multiple times concurrently. + pending_transactions_peers: HashMap>, + /// Network service to use to send messages and manage peers. + service: Arc>, + /// Stream of networking events. + event_stream: Pin + Send>>, + // All connected peers + peers: HashMap>, + transaction_pool: Arc>, + gossip_enabled: Arc, + local_role: config::Role, + from_controller: mpsc::UnboundedReceiver>, + /// Prometheus metrics. + metrics: Option, +} + +/// Peer information +#[derive(Debug)] +struct Peer { + /// Holds a set of transactions known to this peer. + known_transactions: LruHashSet, + role: ObservedRole, +} + +impl TransactionsHandler { + /// Turns the [`TransactionsHandler`] into a future that should run forever and not be + /// interrupted. + pub async fn run(mut self) { + loop { + futures::select!{ + _ = self.propagate_timeout.next().fuse() => { + self.propagate_transactions(); + }, + (tx_hash, result) = self.pending_transactions.select_next_some() => { + if let Some(peers) = self.pending_transactions_peers.remove(&tx_hash) { + peers.into_iter().for_each(|p| self.on_handle_transaction_import(p, result)); + } else { + warn!(target: "sub-libp2p", "Inconsistent state, no peers for pending transaction!"); + } + }, + network_event = self.event_stream.next().fuse() => { + if let Some(network_event) = network_event { + self.handle_network_event(network_event).await; + } else { + // Networking has seemingly closed. Closing as well. + return; + } + }, + message = self.from_controller.select_next_some().fuse() => { + match message { + ToHandler::PropagateTransaction(hash) => self.propagate_transaction(&hash), + ToHandler::PropagateTransactions => self.propagate_transactions(), + } + }, + } + } + } + + async fn handle_network_event(&mut self, event: Event) { + match event { + Event::Dht(_) => {}, + Event::SyncConnected { remote } => { + let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) + .collect::(); + let result = self.service.add_peers_to_reserved_set( + self.protocol_name.clone(), + iter::once(addr).collect() + ); + if let Err(err) = result { + log::error!(target: "sync", "Add reserved peer failed: {}", err); + } + }, + Event::SyncDisconnected { remote } => { + let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) + .collect::(); + let result = self.service.remove_peers_from_reserved_set( + self.protocol_name.clone(), + iter::once(addr).collect() + ); + if let Err(err) = result { + log::error!(target: "sync", "Removing reserved peer failed: {}", err); + } + }, + + Event::NotificationStreamOpened { remote, protocol, role } if protocol == self.protocol_name => { + self.peers.insert(remote, Peer { + known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS) + .expect("Constant is nonzero")), + role, + }); + } + Event::NotificationStreamClosed { remote, protocol } if protocol == self.protocol_name => { + self.peers.remove(&remote); + } + + Event::NotificationsReceived { remote, messages } => { + for (protocol, message) in messages { + if protocol != self.protocol_name { + continue; + } + + if let Ok(m) = as Decode>::decode( + &mut message.as_ref(), + ) { + self.on_transactions(remote, m); + } else { + warn!(target: "sub-libp2p", "Failed to decode transactions list"); + } + } + }, + + // Not our concern. + Event::NotificationStreamOpened { .. } | Event::NotificationStreamClosed { .. } => {} + } + } + + /// Called when peer sends us new transactions + fn on_transactions( + &mut self, + who: PeerId, + transactions: message::Transactions, + ) { + // sending transaction to light node is considered a bad behavior + if matches!(self.local_role, config::Role::Light) { + trace!(target: "sync", "Peer {} is trying to send transactions to the light node", who); + self.service.disconnect_peer(who, self.protocol_name.clone()); + self.service.report_peer(who, rep::UNEXPECTED_TRANSACTIONS); + return; + } + + // Accept transactions only when enabled + if !self.gossip_enabled.load(Ordering::Relaxed) { + trace!(target: "sync", "{} Ignoring transactions while disabled", who); + return; + } + + trace!(target: "sync", "Received {} transactions from {}", transactions.len(), who); + if let Some(ref mut peer) = self.peers.get_mut(&who) { + for t in transactions { + if self.pending_transactions.len() > MAX_PENDING_TRANSACTIONS { + debug!( + target: "sync", + "Ignoring any further transactions that exceed `MAX_PENDING_TRANSACTIONS`({}) limit", + MAX_PENDING_TRANSACTIONS, + ); + break; + } + + let hash = self.transaction_pool.hash_of(&t); + peer.known_transactions.insert(hash.clone()); + + self.service.report_peer(who.clone(), rep::ANY_TRANSACTION); + + match self.pending_transactions_peers.entry(hash.clone()) { + Entry::Vacant(entry) => { + self.pending_transactions.push(PendingTransaction { + validation: self.transaction_pool.import(t), + tx_hash: hash, + }); + entry.insert(vec![who.clone()]); + }, + Entry::Occupied(mut entry) => { + entry.get_mut().push(who.clone()); + } + } + } + } + } + + fn on_handle_transaction_import(&mut self, who: PeerId, import: TransactionImport) { + match import { + TransactionImport::KnownGood => self.service.report_peer(who, rep::ANY_TRANSACTION_REFUND), + TransactionImport::NewGood => self.service.report_peer(who, rep::GOOD_TRANSACTION), + TransactionImport::Bad => self.service.report_peer(who, rep::BAD_TRANSACTION), + TransactionImport::None => {}, + } + } + + /// Propagate one transaction. + pub fn propagate_transaction( + &mut self, + hash: &H, + ) { + debug!(target: "sync", "Propagating transaction [{:?}]", hash); + // Accept transactions only when enabled + if !self.gossip_enabled.load(Ordering::Relaxed) { + return; + } + if let Some(transaction) = self.transaction_pool.transaction(hash) { + let propagated_to = self.do_propagate_transactions(&[(hash.clone(), transaction)]); + self.transaction_pool.on_broadcasted(propagated_to); + } + } + + fn do_propagate_transactions( + &mut self, + transactions: &[(H, B::Extrinsic)], + ) -> HashMap> { + let mut propagated_to = HashMap::<_, Vec<_>>::new(); + let mut propagated_transactions = 0; + + for (who, peer) in self.peers.iter_mut() { + // never send transactions to the light node + if !matches!(peer.role, ObservedRole::Full) { + continue; + } + + let (hashes, to_send): (Vec<_>, Vec<_>) = transactions + .iter() + .filter(|&(ref hash, _)| peer.known_transactions.insert(hash.clone())) + .cloned() + .unzip(); + + propagated_transactions += hashes.len(); + + if !to_send.is_empty() { + for hash in hashes { + propagated_to + .entry(hash) + .or_default() + .push(who.to_base58()); + } + trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); + self.service.write_notification( + who.clone(), + self.protocol_name.clone(), + to_send.encode() + ); + } + } + + if let Some(ref metrics) = self.metrics { + metrics.propagated_transactions.inc_by(propagated_transactions as _) + } + + propagated_to + } + + /// Call when we must propagate ready transactions to peers. + fn propagate_transactions(&mut self) { + // Accept transactions only when enabled + if !self.gossip_enabled.load(Ordering::Relaxed) { + return; + } + debug!(target: "sync", "Propagating transactions"); + let transactions = self.transaction_pool.transactions(); + let propagated_to = self.do_propagate_transactions(&transactions); + self.transaction_pool.on_broadcasted(propagated_to); + } +} diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index f523be857507f..6e2380b284783 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -741,6 +741,7 @@ pub trait TestNetFactory: Sized { let network = NetworkWorker::new(sc_network::config::Params { role: Role::Full, executor: None, + transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), network_config, chain: client.clone(), on_demand: None, @@ -831,6 +832,7 @@ pub trait TestNetFactory: Sized { let network = NetworkWorker::new(sc_network::config::Params { role: Role::Light, executor: None, + transactions_handler_executor: Box::new(|task| { async_std::task::spawn(task); }), network_config, chain: client.clone(), on_demand: None, diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 882a6c4062650..971602788cfcd 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -905,6 +905,12 @@ pub fn build_network( spawn_handle.spawn("libp2p-node", fut); })) }, + transactions_handler_executor: { + let spawn_handle = Clone::clone(&spawn_handle); + Box::new(move |fut| { + spawn_handle.spawn("network-transactions-handler", fut); + }) + }, network_config: config.network.clone(), chain: client.clone(), on_demand: on_demand,