Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass peer_info around rather than peer_addr (includes protocol version) #2761

Merged
merged 1 commit into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,8 @@ impl Peer {
/// Main peer loop listening for messages and forwarding to the rest of the
/// system.
pub fn start(&mut self, conn: TcpStream) {
let addr = self.info.addr;
let adapter = Arc::new(self.tracking_adapter.clone());
let handler = Protocol::new(adapter, addr);
let handler = Protocol::new(adapter, self.info.clone());
self.connection = Some(Mutex::new(conn::listen(conn, handler)));
}

Expand Down Expand Up @@ -533,9 +532,13 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}

fn transaction_received(
Expand All @@ -556,34 +559,38 @@ impl ChainAdapter for TrackingAdapter {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
peer_info: &PeerInfo,
_was_requested: bool,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
self.adapter.block_received(b, peer_info, self.has_req(bh))
}

fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
self.adapter.compact_block_received(cb, peer_info)
}

fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
fn header_received(
&self,
bh: core::BlockHeader,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
self.adapter.header_received(bh, peer_info)
}

fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr)
self.adapter.headers_received(bh, peer_info)
}

fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Expand All @@ -606,9 +613,9 @@ impl ChainAdapter for TrackingAdapter {
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
self.adapter.txhashset_write(h, txhashset_data, peer_info)
}

fn txhashset_download_update(
Expand Down
66 changes: 30 additions & 36 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use chrono::Duration;
use crate::peer::Peer;
use crate::store::{PeerData, PeerStore, State};
use crate::types::{
Capabilities, ChainAdapter, Direction, Error, NetAdapter, P2PConfig, PeerAddr, ReasonForBan,
Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan,
TxHashSetRead, MAX_PEER_ADDRS,
};

Expand Down Expand Up @@ -104,12 +104,10 @@ impl Peers {
}

pub fn outgoing_connected_peers(&self) -> Vec<Arc<Peer>> {
let peers = self.connected_peers();
let res = peers
self.connected_peers()
.into_iter()
.filter(|x| x.info.direction == Direction::Outbound)
.collect::<Vec<_>>();
res
.filter(|x| x.info.is_outbound())
.collect()
}

/// Get a peer we're connected to by address.
Expand All @@ -119,20 +117,12 @@ impl Peers {

/// Number of peers currently connected to.
pub fn peer_count(&self) -> u32 {
self.peers
.read()
.values()
.filter(|x| x.is_connected())
.count() as u32
self.connected_peers().len() as u32
}

/// Number of outbound peers currently connected to.
pub fn peer_outbound_count(&self) -> u32 {
self.peers
.read()
.values()
.filter(|x| x.is_connected() && x.info.is_outbound())
.count() as u32
self.outgoing_connected_peers().len() as u32
}

// Return vec of connected peers that currently advertise more work
Expand Down Expand Up @@ -498,8 +488,12 @@ impl ChainAdapter for Peers {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, addr)
fn tx_kernel_received(
&self,
kernel_hash: Hash,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, peer_info)
}

fn transaction_received(
Expand All @@ -513,18 +507,18 @@ impl ChainAdapter for Peers {
fn block_received(
&self,
b: core::Block,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
was_requested: bool,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested)? {
if !self.adapter.block_received(b, peer_info, was_requested)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad block {} from {}, the peer will be banned",
hash, peer_addr
hash, peer_info.addr,
);
self.ban_peer(peer_addr, ReasonForBan::BadBlock);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlock);
Ok(false)
} else {
Ok(true)
Expand All @@ -534,17 +528,17 @@ impl ChainAdapter for Peers {
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr)? {
if !self.adapter.compact_block_received(cb, peer_info)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad compact block {} from {}, the peer will be banned",
hash, peer_addr
hash, peer_info.addr
);
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
self.ban_peer(peer_info.addr, ReasonForBan::BadCompactBlock);
Ok(false)
} else {
Ok(true)
Expand All @@ -554,12 +548,12 @@ impl ChainAdapter for Peers {
fn header_received(
&self,
bh: core::BlockHeader,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? {
if !self.adapter.header_received(bh, peer_info)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false)
} else {
Ok(true)
Expand All @@ -569,12 +563,12 @@ impl ChainAdapter for Peers {
fn headers_received(
&self,
headers: &[core::BlockHeader],
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? {
if !self.adapter.headers_received(headers, peer_info)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
self.ban_peer(peer_info.addr, ReasonForBan::BadBlockHeader);
Ok(false)
} else {
Ok(true)
Expand All @@ -601,14 +595,14 @@ impl ChainAdapter for Peers {
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? {
if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
&peer_addr
peer_info.addr
);
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false)
} else {
Ok(true)
Expand Down
28 changes: 14 additions & 14 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ use crate::msg::{
BanReason, GetPeerAddrs, Headers, Locator, PeerAddrs, Ping, Pong, TxHashSetArchive,
TxHashSetRequest, Type,
};
use crate::types::{Error, NetAdapter, PeerAddr};
use crate::types::{Error, NetAdapter, PeerInfo};

pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
addr: PeerAddr,
peer_info: PeerInfo,
}

impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, addr: PeerAddr) -> Protocol {
Protocol { adapter, addr }
pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol {
Protocol { adapter, peer_info }
}
}

Expand All @@ -52,18 +52,18 @@ impl MessageHandler for Protocol {
// If we received a msg from a banned peer then log and drop it.
// If we are getting a lot of these then maybe we are not cleaning
// banned peers up correctly?
if adapter.is_banned(self.addr.clone()) {
if adapter.is_banned(self.peer_info.addr) {
debug!(
"handler: consume: peer {:?} banned, received: {:?}, dropping.",
self.addr, msg.header.msg_type,
self.peer_info.addr, msg.header.msg_type,
);
return Ok(None);
}

match msg.header.msg_type {
Type::Ping => {
let ping: Ping = msg.body()?;
adapter.peer_difficulty(self.addr, ping.total_difficulty, ping.height);
adapter.peer_difficulty(self.peer_info.addr, ping.total_difficulty, ping.height);

Ok(Some(Response::new(
Type::Pong,
Expand All @@ -77,7 +77,7 @@ impl MessageHandler for Protocol {

Type::Pong => {
let pong: Pong = msg.body()?;
adapter.peer_difficulty(self.addr, pong.total_difficulty, pong.height);
adapter.peer_difficulty(self.peer_info.addr, pong.total_difficulty, pong.height);
Ok(None)
}

Expand All @@ -93,7 +93,7 @@ impl MessageHandler for Protocol {
"handle_payload: received tx kernel: {}, msg_len: {}",
h, msg.header.msg_len
);
adapter.tx_kernel_received(h, self.addr)?;
adapter.tx_kernel_received(h, &self.peer_info)?;
Ok(None)
}

Expand Down Expand Up @@ -155,7 +155,7 @@ impl MessageHandler for Protocol {

// we can't know at this level whether we requested the block or not,
// the boolean should be properly set in higher level adapter
adapter.block_received(b, self.addr, false)?;
adapter.block_received(b, &self.peer_info, false)?;
Ok(None)
}

Expand All @@ -176,7 +176,7 @@ impl MessageHandler for Protocol {
);
let b: core::CompactBlock = msg.body()?;

adapter.compact_block_received(b, self.addr)?;
adapter.compact_block_received(b, &self.peer_info)?;
Ok(None)
}

Expand All @@ -197,7 +197,7 @@ impl MessageHandler for Protocol {
// we can go request it from some of our peers
Type::Header => {
let header: core::BlockHeader = msg.body()?;
adapter.header_received(header, self.addr)?;
adapter.header_received(header, &self.peer_info)?;
Ok(None)
}

Expand All @@ -217,7 +217,7 @@ impl MessageHandler for Protocol {
headers.push(header);
total_bytes_read += bytes_read;
}
adapter.headers_received(&headers, self.addr)?;
adapter.headers_received(&headers, &self.peer_info)?;
}

// Now check we read the correct total number of bytes off the stream.
Expand Down Expand Up @@ -335,7 +335,7 @@ impl MessageHandler for Protocol {
let tmp_zip = File::open(tmp)?;
let res = self
.adapter
.txhashset_write(sm_arch.hash, tmp_zip, self.addr)?;
.txhashset_write(sm_arch.hash, tmp_zip, &self.peer_info)?;

debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
Expand Down
Loading