Skip to content

Commit

Permalink
Return Result from methods of ChainAdapter (#2722)
Browse files Browse the repository at this point in the history
Most of the methods return nothing or bool which is used to decide if a
sender of a message should be banned or not. However underlying chain
implementation may fail so we need a way to reflect this fact in API.

Also it allows to reduce number of unwraps and makes the code more robust.
  • Loading branch information
hashmap authored Apr 8, 2019
1 parent cdc17c6 commit 94732b0
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 156 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ chrono = { version = "0.4.4", features = ["serde"] }

grin_core = { path = "../core", version = "1.0.3" }
grin_store = { path = "../store", version = "1.0.3" }
grin_chain = { path = "../chain", version = "1.0.3" }
grin_util = { path = "../util", version = "1.0.3" }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use lmdb_zero as lmdb;

#[macro_use]
extern crate grin_core as core;
use grin_chain as chain;
use grin_util as util;

#[macro_use]
Expand Down
43 changes: 33 additions & 10 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;

use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
Expand Down Expand Up @@ -513,24 +514,28 @@ impl TrackingAdapter {
}

impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}

fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}

fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
}

fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
// Do not track the tx hash for stem txs.
// Otherwise we fail to handle the subsequent fluff or embargo expiration
// correctly.
Expand All @@ -541,27 +546,40 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
_was_requested: bool,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
}

fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
}

fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
}

fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool {
fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr)
}

fn locate_headers(&self, locator: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(locator)
}

Expand All @@ -577,7 +595,12 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.txhashset_receive_ready()
}

fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
}

Expand Down
116 changes: 77 additions & 39 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use rand::{thread_rng, Rng};

use crate::chain;
use crate::core::core;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::global;
Expand Down Expand Up @@ -171,42 +172,48 @@ impl Peers {

// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
return Ok(vec![]);
}

let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;

let mut max_peers = peers
.into_iter()
.filter(|x| x.info.total_difficulty() > total_difficulty)
.collect::<Vec<_>>();

thread_rng().shuffle(&mut max_peers);
max_peers
Ok(max_peers)
}

// Return number of connected peers that currently advertise more/same work
// (total_difficulty) than/as we do.
pub fn more_or_same_work_peers(&self) -> usize {
pub fn more_or_same_work_peers(&self) -> Result<usize, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return 0;
return Ok(0);
}

let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;

peers
Ok(peers
.iter()
.filter(|x| x.info.total_difficulty() >= total_difficulty)
.count()
.count())
}

/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
self.more_work_peers().pop()
match self.more_work_peers() {
Ok(mut peers) => peers.pop(),
Err(e) => {
error!("failed to get more work peers: {:?}", e);
None
}
}
}

/// Return vec of connected peers that currently have the most worked
Expand Down Expand Up @@ -452,10 +459,15 @@ impl Peers {
rm.push(peer.info.addr.clone());
} else {
let (stuck, diff) = peer.is_stuck();
if stuck && diff < self.adapter.total_difficulty() {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
match self.adapter.total_difficulty() {
Ok(total_difficulty) => {
if stuck && diff < total_difficulty {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
}
}
Err(e) => error!("failed to get total difficulty: {:?}", e),
}
}
}
Expand Down Expand Up @@ -529,81 +541,102 @@ impl Peers {
}

impl ChainAdapter for Peers {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}

fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}

fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, addr)
}

fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
peer_addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested) {
if !self.adapter.block_received(b, peer_addr, was_requested)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad block {} from {}, the peer will be banned",
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr) {
if !self.adapter.compact_block_received(cb, peer_addr)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad compact block {} from {}, the peer will be banned",
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool {
if !self.adapter.header_received(bh, peer_addr) {
fn header_received(
&self,
bh: core::BlockHeader,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool {
if !self.adapter.headers_received(headers, peer_addr) {
fn headers_received(
&self,
headers: &[core::BlockHeader],
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn locate_headers(&self, hs: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, hs: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(hs)
}

Expand All @@ -619,16 +652,21 @@ impl ChainAdapter for Peers {
self.adapter.txhashset_receive_ready()
}

fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
&peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
false
Ok(false)
} else {
true
Ok(true)
}
}

Expand Down
Loading

0 comments on commit 94732b0

Please sign in to comment.