Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return Result from methods of ChainAdapter #2722

Merged
merged 3 commits into from
Apr 8, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ chrono = { version = "0.4.4", features = ["serde"] }

grin_core = { path = "../core", version = "1.0.3" }
grin_store = { path = "../store", version = "1.0.3" }
grin_chain = { path = "../chain", version = "1.0.3" }
grin_util = { path = "../util", version = "1.0.3" }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use lmdb_zero as lmdb;

#[macro_use]
extern crate grin_core as core;
use grin_chain as chain;
use grin_util as util;

#[macro_use]
Expand Down
43 changes: 33 additions & 10 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::fs::File;
use std::net::{Shutdown, TcpStream};
use std::sync::Arc;

use crate::chain;
use crate::conn;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::pow::Difficulty;
Expand Down Expand Up @@ -513,24 +514,28 @@ impl TrackingAdapter {
}

impl ChainAdapter for TrackingAdapter {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}

fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}

fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(kernel_hash);
self.adapter.tx_kernel_received(kernel_hash, addr)
}

fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
// Do not track the tx hash for stem txs.
// Otherwise we fail to handle the subsequent fluff or embargo expiration
// correctly.
Expand All @@ -541,27 +546,40 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, addr: PeerAddr, _was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
addr: PeerAddr,
_was_requested: bool,
) -> Result<bool, chain::Error> {
let bh = b.hash();
self.push_recv(bh);
self.adapter.block_received(b, addr, self.has_req(bh))
}

fn compact_block_received(&self, cb: core::CompactBlock, addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.push_recv(cb.hash());
self.adapter.compact_block_received(cb, addr)
}

fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> bool {
fn header_received(&self, bh: core::BlockHeader, addr: PeerAddr) -> Result<bool, chain::Error> {
self.push_recv(bh.hash());
self.adapter.header_received(bh, addr)
}

fn headers_received(&self, bh: &[core::BlockHeader], addr: PeerAddr) -> bool {
fn headers_received(
&self,
bh: &[core::BlockHeader],
addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.headers_received(bh, addr)
}

fn locate_headers(&self, locator: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(locator)
}

Expand All @@ -577,7 +595,12 @@ impl ChainAdapter for TrackingAdapter {
self.adapter.txhashset_receive_ready()
}

fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
self.adapter.txhashset_write(h, txhashset_data, peer_addr)
}

Expand Down
116 changes: 77 additions & 39 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;

use rand::{thread_rng, Rng};

use crate::chain;
use crate::core::core;
use crate::core::core::hash::{Hash, Hashed};
use crate::core::global;
Expand Down Expand Up @@ -171,42 +172,48 @@ impl Peers {

// Return vec of connected peers that currently advertise more work
// (total_difficulty) than we do.
pub fn more_work_peers(&self) -> Vec<Arc<Peer>> {
pub fn more_work_peers(&self) -> Result<Vec<Arc<Peer>>, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return vec![];
return Ok(vec![]);
}

let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;

let mut max_peers = peers
.into_iter()
.filter(|x| x.info.total_difficulty() > total_difficulty)
.collect::<Vec<_>>();

thread_rng().shuffle(&mut max_peers);
max_peers
Ok(max_peers)
}

// Return number of connected peers that currently advertise more/same work
// (total_difficulty) than/as we do.
pub fn more_or_same_work_peers(&self) -> usize {
pub fn more_or_same_work_peers(&self) -> Result<usize, chain::Error> {
let peers = self.connected_peers();
if peers.len() == 0 {
return 0;
return Ok(0);
}

let total_difficulty = self.total_difficulty();
let total_difficulty = self.total_difficulty()?;

peers
Ok(peers
.iter()
.filter(|x| x.info.total_difficulty() >= total_difficulty)
.count()
.count())
}

/// Returns single random peer with more work than us.
pub fn more_work_peer(&self) -> Option<Arc<Peer>> {
self.more_work_peers().pop()
match self.more_work_peers() {
Ok(mut peers) => peers.pop(),
Err(e) => {
error!("failed to get more work peers: {:?}", e);
None
}
}
}

/// Return vec of connected peers that currently have the most worked
Expand Down Expand Up @@ -452,10 +459,15 @@ impl Peers {
rm.push(peer.info.addr.clone());
} else {
let (stuck, diff) = peer.is_stuck();
if stuck && diff < self.adapter.total_difficulty() {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
match self.adapter.total_difficulty() {
Ok(total_difficulty) => {
if stuck && diff < total_difficulty {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
}
}
Err(e) => error!("failed to get total difficulty: {:?}", e),
}
}
}
Expand Down Expand Up @@ -529,81 +541,102 @@ impl Peers {
}

impl ChainAdapter for Peers {
fn total_difficulty(&self) -> Difficulty {
fn total_difficulty(&self) -> Result<Difficulty, chain::Error> {
self.adapter.total_difficulty()
}

fn total_height(&self) -> u64 {
fn total_height(&self) -> Result<u64, chain::Error> {
self.adapter.total_height()
}

fn get_transaction(&self, kernel_hash: Hash) -> Option<core::Transaction> {
self.adapter.get_transaction(kernel_hash)
}

fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) {
fn tx_kernel_received(&self, kernel_hash: Hash, addr: PeerAddr) -> Result<bool, chain::Error> {
self.adapter.tx_kernel_received(kernel_hash, addr)
}

fn transaction_received(&self, tx: core::Transaction, stem: bool) {
fn transaction_received(
&self,
tx: core::Transaction,
stem: bool,
) -> Result<bool, chain::Error> {
self.adapter.transaction_received(tx, stem)
}

fn block_received(&self, b: core::Block, peer_addr: PeerAddr, was_requested: bool) -> bool {
fn block_received(
&self,
b: core::Block,
peer_addr: PeerAddr,
was_requested: bool,
) -> Result<bool, chain::Error> {
let hash = b.hash();
if !self.adapter.block_received(b, peer_addr, was_requested) {
if !self.adapter.block_received(b, peer_addr, was_requested)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad block {} from {}, the peer will be banned",
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn compact_block_received(&self, cb: core::CompactBlock, peer_addr: PeerAddr) -> bool {
fn compact_block_received(
&self,
cb: core::CompactBlock,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
let hash = cb.hash();
if !self.adapter.compact_block_received(cb, peer_addr) {
if !self.adapter.compact_block_received(cb, peer_addr)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad compact block {} from {}, the peer will be banned",
hash, peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadCompactBlock);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn header_received(&self, bh: core::BlockHeader, peer_addr: PeerAddr) -> bool {
if !self.adapter.header_received(bh, peer_addr) {
fn header_received(
&self,
bh: core::BlockHeader,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.header_received(bh, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn headers_received(&self, headers: &[core::BlockHeader], peer_addr: PeerAddr) -> bool {
if !self.adapter.headers_received(headers, peer_addr) {
fn headers_received(
&self,
headers: &[core::BlockHeader],
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.headers_received(headers, peer_addr)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_addr, ReasonForBan::BadBlockHeader);
false
Ok(false)
} else {
true
Ok(true)
}
}

fn locate_headers(&self, hs: &[Hash]) -> Vec<core::BlockHeader> {
fn locate_headers(&self, hs: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(hs)
}

Expand All @@ -619,16 +652,21 @@ impl ChainAdapter for Peers {
self.adapter.txhashset_receive_ready()
}

fn txhashset_write(&self, h: Hash, txhashset_data: File, peer_addr: PeerAddr) -> bool {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr) {
fn txhashset_write(
&self,
h: Hash,
txhashset_data: File,
peer_addr: PeerAddr,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_addr)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
&peer_addr
);
self.ban_peer(peer_addr, ReasonForBan::BadTxHashSet);
false
Ok(false)
} else {
true
Ok(true)
}
}

Expand Down
Loading