Skip to content

Commit

Permalink
pass peer_info around rather than peer_addr (includes protocol versio…
Browse files Browse the repository at this point in the history
…n) (#2761)
  • Loading branch information
antiochp authored Apr 18, 2019
1 parent ea99532 commit 13c6160
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 112 deletions.
35 changes: 21 additions & 14 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ impl Peer {
/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: TcpStream) {
let addr = self.info.addr;
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, addr);
let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}

Expand Down Expand Up @@ -533,9 +532,13 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}

fn transaction_received(
Expand All @@ -556,34 +559,38 @@ impl ChainAdapter for TrackingAdapter {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
peer_info: &PeerInfo,
_was_requested: bool,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
self.adapter.block_received(b, peer_info, self.has_req(bh))
}

fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
self.adapter.compact_block_received(cb, peer_info)
}

fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
self.adapter.header_received(bh, peer_info)
}

fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr)
self.adapter.headers_received(bh, peer_info)
}

fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Expand All @@ -606,9 +613,9 @@ impl ChainAdapter for TrackingAdapter {
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
self.adapter.txhashset_write(h, txhashset_data, peer_info)
}

fn txhashset_download_update(
Expand Down
66 changes: 30 additions & 36 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use chrono::Duration;
use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State};
use crate::types::{
Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan,
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS,
};

Expand Down Expand Up @@ -104,12 +104,10 @@ impl Peers {
}

pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers();
let res = peers
self.connected_peers()
.into_iter()
.filter(|x| x.info.direction == Direction::Outbound)
.collect::<Vec<_>>();
res
.filter(|x| x.info.is_outbound())
.collect()
}

/// Get a peer we're connected to by address.
Expand All @@ -119,20 +117,12 @@ impl Peers {

/// Number of peers currently connected to.
pub fn peer_count(&self) -> u32 {
self.peers
.read()
.values()
.filter(|x| x.is_connected())
.count() as u32
self.connected_peers().len() as u32
}

/// Number of outbound peers currently connected to.
pub fn peer_outbound_count(&self) -> u32 {
self.peers
.read()
.values()
.filter(|x| x.is_connected() && x.info.is_outbound())
.count() as u32
self.outgoing_connected_peers().len() as u32
}

// Return vec of connected peers that currently advertise more work
Expand Down Expand Up @@ -498,8 +488,12 @@ impl ChainAdapter for Peers {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, addr)
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}

fn transaction_received(
Expand All @@ -513,18 +507,18 @@ impl ChainAdapter for Peers {
fn block_received(
&self,
b: core::Block,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
was_requested: bool,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested)? {
if !self.adapter.block_received(b, peer_info, was_requested)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad block {} from {}, the peer will be banned",
hash, peer_addr
hash, peer_info.addr,
);
self.ban_peer(peer_addr, ReasonForBan::BadBlock);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlock);
Ok(false)
} else {
Ok(true)
Expand All @@ -534,17 +528,17 @@ impl ChainAdapter for Peers {
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr)? {
if !self.adapter.compact_block_received(cb, peer_info)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad compact block {} from {}, the peer will be banned",
hash, peer_addr
hash, peer_info.addr
);
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
self.ban_peer(peer_info.addr, ReasonForBan::BadCompactBlock);
Ok(false)
} else {
Ok(true)
Expand All @@ -554,12 +548,12 @@ impl ChainAdapter for Peers {
fn header_received(
&self,
bh: core::BlockHeader,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? {
if !self.adapter.header_received(bh, peer_info)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false)
} else {
Ok(true)
Expand All @@ -569,12 +563,12 @@ impl ChainAdapter for Peers {
fn headers_received(
&self,
headers: &[core::BlockHeader],
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? {
if !self.adapter.headers_received(headers, peer_info)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false)
} else {
Ok(true)
Expand All @@ -601,14 +595,14 @@ impl ChainAdapter for Peers {
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? {
if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
&peer_addr
peer_info.addr
);
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false)
} else {
Ok(true)
Expand Down
28 changes: 14 additions & 14 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type,
};
use crate::types::{Error, NetAdapter, PeerAddr};
use crate::types::{Error, NetAdapter, PeerInfo};

pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
addr: PeerAddr,
peer_info: PeerInfo,
}

impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, addr: PeerAddr) -> Protocol {
Protocol { adapter, addr }
pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol {
Protocol { adapter, peer_info }
}
}

Expand All @@ -52,18 +52,18 @@ impl MessageHandler for Protocol {
// If we received a msg from a banned peer then log and drop it.
// If we are getting a lot of these then maybe we are not cleaning
// banned peers up correctly?
if adapter.is_banned(self.addr.clone()) {
if adapter.is_banned(self.peer_info.addr) {
debug!(
"handler: consume: peer {:?} banned, received: {:?}, dropping.",
self.addr, msg.header.msg_type,
self.peer_info.addr, msg.header.msg_type,
);
return Ok(None);
}

match msg.header.msg_type {
Type::Ping => {
let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height);
adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height);

Ok(Some(Response::new(
Type::Pong,
Expand All @@ -77,7 +77,7 @@ impl MessageHandler for Protocol {

Type::Pong => {
let pong: Pong = msg.body()?;
adapter.peer_difficulty(self.addr, pong.total_difficulty, pong.height);
adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height);
Ok(None)
}

Expand All @@ -93,7 +93,7 @@ impl MessageHandler for Protocol {
"handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len
);
adapter.tx_kernel_received(h, self.addr)?;
adapter.tx_kernel_received(h, &self.peer_info)?;
Ok(None)
}

Expand Down Expand Up @@ -155,7 +155,7 @@ impl MessageHandler for Protocol {

// we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false)?;
adapter.block_received(b, &self.peer_info, false)?;
Ok(None)
}

Expand All @@ -176,7 +176,7 @@ impl MessageHandler for Protocol {
);
let b: core::CompactBlock = msg.body()?;

adapter.compact_block_received(b, self.addr)?;
adapter.compact_block_received(b, &self.peer_info)?;
Ok(None)
}

Expand All @@ -197,7 +197,7 @@ impl MessageHandler for Protocol {
// we can go request it from some of our peers
Type::Header => {
let header: core::BlockHeader = msg.body()?;
adapter.header_received(header, self.addr)?;
adapter.header_received(header, &self.peer_info)?;
Ok(None)
}

Expand All @@ -217,7 +217,7 @@ impl MessageHandler for Protocol {
headers.push(header);
total_bytes_read += bytes_read;
}
adapter.headers_received(&headers, self.addr)?;
adapter.headers_received(&headers, &self.peer_info)?;
}

// Now check we read the correct total number of bytes off the stream.
Expand Down Expand Up @@ -335,7 +335,7 @@ impl MessageHandler for Protocol {
let tmp_zip = File::open(tmp)?;
let res = self
.adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr)?;
.txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?;

debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
Expand Down
Loading

0 comments on commit 13c6160

Please sign in to comment.