Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify peer connection handling #2801

Merged
merged 7 commits into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ impl Tracker {

Ok(())
}

/// Schedule this connection to safely close via the async close_channel.
pub fn close(&self) {
let _ = self.close_channel.send(());
}
}

/// Start listening on the provided connection and wraps it. Does not hang
Expand Down
132 changes: 52 additions & 80 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
use crate::core::ser::Writeable;
use crate::core::{core, global};
use crate::handshake::Handshake;
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest};
use crate::msg::{self, BanReason, GetPeerAddrs, Locator, Ping, TxHashSetRequest, Type};
use crate::protocol::Protocol;
use crate::types::{
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
Expand All @@ -50,16 +51,7 @@ pub struct Peer {
state: Arc<RwLock<State>>,
// set of all hashes known to this peer (so no need to send)
tracking_adapter: TrackingAdapter,
connection: Option<Mutex<conn::Tracker>>,
}

macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::ConnectionClose),
}
};
connection: Mutex<conn::Tracker>,
}

impl fmt::Debug for Peer {
Expand All @@ -70,26 +62,30 @@ impl fmt::Debug for Peer {

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> Peer {
let state = Arc::new(RwLock::new(State::Connected));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let connection = Mutex::new(conn::listen(conn, handler));
Peer {
info,
state: Arc::new(RwLock::new(State::Connected)),
tracking_adapter: TrackingAdapter::new(adapter),
connection: None,
state,
tracking_adapter,
connection,
}
}

pub fn accept(
conn: &mut TcpStream,
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, conn);
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, adapter)),
Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
Expand All @@ -105,17 +101,17 @@ impl Peer {
}

pub fn connect(
conn: &mut TcpStream,
mut conn: TcpStream,
capab: Capabilities,
total_difficulty: Difficulty,
self_addr: PeerAddr,
hs: &Handshake,
na: Arc<dyn NetAdapter>,
adapter: Arc<dyn NetAdapter>,
) -> Result<Peer, Error> {
debug!("connect: handshaking with {:?}", conn.peer_addr());
let info = hs.initiate(capab, total_difficulty, self_addr, conn);
let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn);
match info {
Ok(peer_info) => Ok(Peer::new(peer_info, na)),
Ok(info) => Ok(Peer::new(info, conn, adapter)),
Err(e) => {
debug!(
"connect: handshaking with {:?} failed with error: {:?}",
Expand All @@ -130,14 +126,6 @@ impl Peer {
}
}

/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: TcpStream) {
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}

pub fn is_denied(config: &P2PConfig, peer_addr: PeerAddr) -> bool {
if let Some(ref denied) = config.peers_deny {
if denied.contains(&peer_addr) {
Expand Down Expand Up @@ -171,9 +159,6 @@ impl Peer {

/// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool {
if self.connection.is_none() {
return false;
}
State::Connected == *self.state.read()
}

Expand All @@ -196,76 +181,65 @@ impl Peer {

/// Whether the peer is considered abusive, mostly for spammy nodes
pub fn is_abusive(&self) -> bool {
if let Some(ref conn) = self.connection {
let conn = conn.lock();
let rec = conn.received_bytes.read();
let sent = conn.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN
|| sent.count_per_min() > MAX_PEER_MSG_PER_MIN
} else {
false
}
let conn = self.connection.lock();
let rec = conn.received_bytes.read();
let sent = conn.sent_bytes.read();
rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN
}

/// Number of bytes sent to the peer
pub fn last_min_sent_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let sent_bytes = conn.sent_bytes.read();
return Some(sent_bytes.bytes_per_min());
}
None
let conn = self.connection.lock();
let sent_bytes = conn.sent_bytes.read();
Some(sent_bytes.bytes_per_min())
}

/// Number of bytes received from the peer
pub fn last_min_received_bytes(&self) -> Option<u64> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
return Some(received_bytes.bytes_per_min());
}
None
let conn = self.connection.lock();
let received_bytes = conn.received_bytes.read();
Some(received_bytes.bytes_per_min())
}

pub fn last_min_message_counts(&self) -> Option<(u64, u64)> {
if let Some(ref tracker) = self.connection {
let conn = tracker.lock();
let received_bytes = conn.received_bytes.read();
let sent_bytes = conn.sent_bytes.read();
return Some((sent_bytes.count_per_min(), received_bytes.count_per_min()));
}
None
let conn = self.connection.lock();
let received_bytes = conn.received_bytes.read();
let sent_bytes = conn.sent_bytes.read();
Some((sent_bytes.count_per_min(), received_bytes.count_per_min()))
}

/// Set this peer status to banned
pub fn set_banned(&self) {
*self.state.write() = State::Banned;
}

/// Send a msg with given msg_type to our peer via the connection.
fn send<T: Writeable>(&self, msg: T, msg_type: Type) -> Result<(), Error> {
self.connection.lock().send(msg, msg_type)
}

/// Send a ping to the remote peer, providing our local difficulty and
/// height
pub fn send_ping(&self, total_difficulty: Difficulty, height: u64) -> Result<(), Error> {
let ping_msg = Ping {
total_difficulty,
height,
};
connection!(self).send(ping_msg, msg::Type::Ping)
self.send(ping_msg, msg::Type::Ping)
}

/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
connection!(self)
.send(ban_reason_msg, msg::Type::BanReason)
.map(|_| ())
self.send(ban_reason_msg, msg::Type::BanReason).map(|_| ())
}

/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::Block)?;
self.send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
Expand All @@ -280,7 +254,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
connection!(self).send(b, msg::Type::CompactBlock)?;
self.send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
Expand All @@ -295,7 +269,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
connection!(self).send(bh, msg::Type::Header)?;
self.send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
Expand All @@ -310,7 +284,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
connection!(self).send(h, msg::Type::TransactionKernel)?;
self.send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
Expand Down Expand Up @@ -338,7 +312,7 @@ impl Peer {

if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::Transaction)?;
self.send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
Expand All @@ -355,38 +329,38 @@ impl Peer {
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
connection!(self).send(tx, msg::Type::StemTransaction)
self.send(tx, msg::Type::StemTransaction)
}

/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders)
self.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}

pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
connection!(self).send(&h, msg::Type::GetTransaction)
self.send(&h, msg::Type::GetTransaction)
}

/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
connection!(self).send(&h, msg::Type::GetBlock)
self.send(&h, msg::Type::GetBlock)
}

/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
connection!(self).send(&h, msg::Type::GetCompactBlock)
self.send(&h, msg::Type::GetCompactBlock)
}

pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
connection!(self).send(
self.send(
&GetPeerAddrs {
capabilities: capab,
},
Expand All @@ -399,17 +373,15 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
connection!(self).send(
self.send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() {
let _ = conn.lock().close_channel.send(());
}
self.connection.lock().close();
}
}

Expand Down
14 changes: 6 additions & 8 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,18 @@ impl Server {
addr
);
match TcpStream::connect_timeout(&addr.0, Duration::from_secs(10)) {
Ok(mut stream) => {
Ok(stream) => {
let addr = SocketAddr::new(self.config.host, self.config.port);
let total_diff = self.peers.total_difficulty()?;

let mut peer = Peer::connect(
&mut stream,
let peer = Peer::connect(
stream,
self.capabilities,
total_diff,
PeerAddr(addr),
&self.handshake,
self.peers.clone(),
)?;
peer.start(stream);
let peer = Arc::new(peer);
self.peers.add_connected(peer.clone())?;
Ok(peer)
Expand All @@ -168,18 +167,17 @@ impl Server {
}
}

fn handle_new_peer(&self, mut stream: TcpStream) -> Result<(), Error> {
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
let total_diff = self.peers.total_difficulty()?;

// accept the peer and add it to the server map
let mut peer = Peer::accept(
&mut stream,
let peer = Peer::accept(
stream,
self.capabilities,
total_diff,
&self.handshake,
self.peers.clone(),
)?;
peer.start(stream);
self.peers.add_connected(Arc::new(peer))?;
Ok(())
}
Expand Down
7 changes: 3 additions & 4 deletions p2p/tests/peer_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ fn peer_handshake() {
thread::sleep(time::Duration::from_secs(1));

let addr = SocketAddr::new(p2p_config.host, p2p_config.port);
let mut socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();
let socket = TcpStream::connect_timeout(&addr, time::Duration::from_secs(10)).unwrap();

let my_addr = PeerAddr("127.0.0.1:5000".parse().unwrap());
let mut peer = Peer::connect(
&mut socket,
let peer = Peer::connect(
socket,
p2p::Capabilities::UNKNOWN,
Difficulty::min(),
my_addr,
Expand All @@ -82,7 +82,6 @@ fn peer_handshake() {

assert!(peer.info.user_agent.ends_with(env!("CARGO_PKG_VERSION")));

peer.start(socket);
thread::sleep(time::Duration::from_secs(1));

peer.send_ping(Difficulty::min(), 0).unwrap();
Expand Down