Skip to content

Commit

Permalink
Simplify peer connection handling (#2801)
Browse files Browse the repository at this point in the history
* connection no longer wrapped in an Option in peer

* introduce peer.send()

* remove some Arc indirection

* self.send() cleanup

* extract Peer:new() from connect and accept

* fixup

* cleanup
  • Loading branch information
antiochp authored May 3, 2019
1 parent 6c54c90 commit 4ef4212
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 92 deletions.
5 changes: 5 additions & 0 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ impl Tracker {

Ok(())
}

/// Schedule this connection to safely close via the async close_channel.
pub fn close(&self) {
let _ = self.close_channel.send(());
}
}

/// Start listening on the provided connection and wraps it. Does not hang
Expand Down
132 changes: 52 additions & 80 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest};
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
Expand All @@ -50,16 +51,7 @@ pub struct Peer {
state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
connection: Option<Mutex<conn::Tracker>>,
}

macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::ConnectionClose),
}
};
connection: Mutex<conn::Tracker>,
}

impl fmt::Debug for Peer {
Expand All @@ -70,26 +62,30 @@ impl fmt::Debug for Peer {

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> Peer {
let state = Arc::new(RwLock::new(State::Connected));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let connection = Mutex::new(conn::listen(conn, handler));
Peer {
info,
state: Arc::new(RwLock::new(State::Connected)),
tracking_adapter: TrackingAdapter::new(adapter),
connection: None,
state,
tracking_adapter,
connection,
}
}

pub fn accept(
conn: &mut TcpStream,
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, conn);
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, adapter)),
Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
Expand All @@ -105,17 +101,17 @@ impl Peer {
}

pub fn connect(
conn: &mut TcpStream,
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
hs: &Handshake,
na: Arc<dyn NetAdapter>,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("connect: handshaking with {:?}", conn.peer_addr());
let info = hs.initiate(capab, total_difficulty, self_addr, conn);
let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, na)),
Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
Expand All @@ -130,14 +126,6 @@ impl Peer {
}
}

/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: TcpStream) {
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}

pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool {
if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer_addr) {
Expand Down Expand Up @@ -171,9 +159,6 @@ impl Peer {

/// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool {
if self.connection.is_none() {
return false;
}
State::Connected == *self.state.read()
}

Expand All @@ -196,76 +181,65 @@ impl Peer {

/// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool {
if let Some(ref conn) = self.connection {
let conn = conn.lock();
let rec = conn.received_bytes.read();
let sent = conn.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
|| sent.count_per_min() > MAX_PEER_MSG_PER_MIN
} else {
false
}
let conn = self.connection.lock();
let rec = conn.received_bytes.read();
let sent = conn.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
}

/// Number of bytes sent to the peer
pub fn last_min_sent_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let sent_bytes = conn.sent_bytes.read();
return Some(sent_bytes.bytes_per_min());
}
None
let conn = self.connection.lock();
let sent_bytes = conn.sent_bytes.read();
Some(sent_bytes.bytes_per_min())
}

/// Number of bytes received from the peer
pub fn last_min_received_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
return Some(received_bytes.bytes_per_min());
}
None
let conn = self.connection.lock();
let received_bytes = conn.received_bytes.read();
Some(received_bytes.bytes_per_min())
}

pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
let sent_bytes = conn.sent_bytes.read();
return Some((sent_bytes.count_per_min(), received_bytes.count_per_min()));
}
None
let conn = self.connection.lock();
let received_bytes = conn.received_bytes.read();
let sent_bytes = conn.sent_bytes.read();
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
}

/// Set this peer status to banned
pub fn set_banned(&self) {
*self.state.write() = State::Banned;
}

/// Send a msg with given msg_type to our peer via the connection.
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
self.connection.lock().send(msg, msg_type)
}

/// Send a ping to the remote peer, providing our local difficulty and
/// height
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
let ping_msg = Ping {
total_difficulty,
height,
};
connection!(self).send(ping_msg, msg::Type::Ping)
self.send(ping_msg, msg::Type::Ping)
}

/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
connection!(self)
.send(ban_reason_msg, msg::Type::BanReason)
.map(|_| ())
self.send(ban_reason_msg, msg::Type::BanReason).map(|_| ())
}

/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::Block)?;
self.send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
Expand All @@ -280,7 +254,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::CompactBlock)?;
self.send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
Expand All @@ -295,7 +269,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
connection!(self).send(bh, msg::Type::Header)?;
self.send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
Expand All @@ -310,7 +284,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
connection!(self).send(h, msg::Type::TransactionKernel)?;
self.send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
Expand Down Expand Up @@ -338,7 +312,7 @@ impl Peer {

if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::Transaction)?;
self.send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
Expand All @@ -355,38 +329,38 @@ impl Peer {
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::StemTransaction)
self.send(tx, msg::Type::StemTransaction)
}

/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders)
self.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}

pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
connection!(self).send(&h, msg::Type::GetTransaction)
self.send(&h, msg::Type::GetTransaction)
}

/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
connection!(self).send(&h, msg::Type::GetBlock)
self.send(&h, msg::Type::GetBlock)
}

/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
connection!(self).send(&h, msg::Type::GetCompactBlock)
self.send(&h, msg::Type::GetCompactBlock)
}

pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
connection!(self).send(
self.send(
&GetPeerAddrs {
capabilities: capab,
},
Expand All @@ -399,17 +373,15 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
connection!(self).send(
self.send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() {
let _ = conn.lock().close_channel.send(());
}
self.connection.lock().close();
}
}

Expand Down
14 changes: 6 additions & 8 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,18 @@ impl Server {
addr
);
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(mut stream) => {
Ok(stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty()?;

let mut peer = Peer::connect(
&mut stream,
let peer = Peer::connect(
stream,
self.capabilities,
total_diff,
PeerAddr(addr),
&self.handshake,
self.peers.clone(),
)?;
peer.start(stream);
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
Expand All @@ -168,18 +167,17 @@ impl Server {
}
}

fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> {
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty()?;

// accept the peer and add it to the server map
let mut peer = Peer::accept(
&mut stream,
let peer = Peer::accept(
stream,
self.capabilities,
total_diff,
&self.handshake,
self.peers.clone(),
)?;
peer.start(stream);
self.peers.add_connected(Arc::new(peer))?;
Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions p2p/tests/peer_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ fn peer_handshake() {
thread::sleep(time::Duration::from_secs(1));

let addr = SocketAddr::new(p2p_config.host, p2p_config.port);
let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();
let socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();

let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap());
let mut peer = Peer::connect(
&mut socket,
let peer = Peer::connect(
socket,
p2p::Capabilities::UNKNOWN,
Difficulty::min(),
my_addr,
Expand All @@ -82,7 +82,6 @@ fn peer_handshake() {

assert!(peer.info.user_agent.ends_with(env!("CARGO_PKG_VERSION")));

peer.start(socket);
thread::sleep(time::Duration::from_secs(1));

peer.send_ping(Difficulty::min(), 0).unwrap();
Expand Down

0 comments on commit 4ef4212

Please sign in to comment.