Skip to content

Commit

Permalink
TxHashSet Download Improvement (#2984)
Browse files Browse the repository at this point in the history
* Ban on cannot get block header in txhashset_write

* Rusfmt

* Fix typo

* Missing error handling

* Rustfmt

* Only accept txhashset from corresponding peer

* Switch to AtomicBool instead of RwLock<bool>

* Rustfmt
  • Loading branch information
quentinlesceller authored and antiochp committed Aug 1, 2019
1 parent 4bd3aa1 commit f79d05b
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 21 deletions.
13 changes: 10 additions & 3 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ impl Chain {
h: Hash,
txhashset_data: File,
status: &dyn TxHashsetWriteStatus,
) -> Result<(), Error> {
) -> Result<bool, Error> {
status.on_setup();

// Initial check whether this txhashset is needed or not
Expand All @@ -879,7 +879,14 @@ impl Chain {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
}

let header = self.get_block_header(&h)?;
let header = match self.get_block_header(&h) {
Ok(header) => header,
Err(_) => {
warn!("txhashset_write: cannot find block header");
// This is a bannable reason
return Ok(true);
}
};

// Write txhashset to sandbox (in the Grin specific tmp dir)
let sandbox_dir = self.get_tmp_dir();
Expand Down Expand Up @@ -977,7 +984,7 @@ impl Chain {
self.check_orphans(header.height + 1);

status.on_done();
Ok(())
Ok(false)
}

/// Cleanup old blocks from the db.
Expand Down
12 changes: 11 additions & 1 deletion p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fs::File;
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::chain;
Expand Down Expand Up @@ -60,6 +61,8 @@ pub struct Peer {
// because it may be locked by different reasons, so we should wait for that, close
// mutex can be taken only during shutdown, it happens once
stop_handle: Mutex<conn::StopHandle>,
// Whether or not we requested a txhashset from this peer
state_sync_requested: Arc<AtomicBool>,
}

impl fmt::Debug for Peer {
Expand All @@ -72,8 +75,13 @@ impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
let state_sync_requested = Arc::new(AtomicBool::new(false));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let handler = Protocol::new(
Arc::new(tracking_adapter.clone()),
info.clone(),
state_sync_requested.clone(),
);
let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?;
let send_handle = Mutex::new(sendh);
Expand All @@ -85,6 +93,7 @@ impl Peer {
tracker,
send_handle,
stop_handle,
state_sync_requested,
})
}

Expand Down Expand Up @@ -387,6 +396,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.state_sync_requested.store(true, Ordering::Relaxed);
self.send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,15 @@ impl ChainAdapter for Peers {
txhashset_data: File,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
if self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
peer_info.addr
);
self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false)
} else {
Ok(true)
} else {
Ok(false)
}
}

Expand Down
22 changes: 19 additions & 3 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,28 @@ use rand::{thread_rng, Rng};
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tempfile::tempfile;

pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
}

impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol {
Protocol { adapter, peer_info }
pub fn new(
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
) -> Protocol {
Protocol {
adapter,
peer_info,
state_sync_requested,
}
}
}

Expand Down Expand Up @@ -356,6 +366,12 @@ impl MessageHandler for Protocol {
);
return Err(Error::BadMessage);
}
if !self.state_sync_requested.load(Ordering::Relaxed) {
error!("handle_payload: txhashset archive received but from the wrong peer",);
return Err(Error::BadMessage);
}
// Update the sync state requested status
self.state_sync_requested.store(false, Ordering::Relaxed);

let download_start_time = Utc::now();
self.adapter
Expand Down Expand Up @@ -425,7 +441,7 @@ impl MessageHandler for Protocol {

debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
sm_arch.hash, sm_arch.height, res
sm_arch.hash, sm_arch.height, !res
);

if let Err(e) = fs::remove_file(tmp.clone()) {
Expand Down
31 changes: 20 additions & 11 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,22 +419,31 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else {
return Ok(true);
return Ok(false);
}

if let Err(e) = self
match self
.chain()
.txhashset_write(h, txhashset_data, self.sync_state.as_ref())
{
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);

let is_good_data = !e.is_bad_data();
self.sync_state.set_sync_error(e);
Ok(is_good_data)
} else {
info!("Received valid txhashset data for {}.", h);
Ok(true)
Ok(is_bad_data) => {
if is_bad_data {
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: bad data");
self.sync_state.set_sync_error(
chain::ErrorKind::TxHashSetErr("bad txhashset data".to_string()).into(),
);
} else {
info!("Received valid txhashset data for {}.", h);
}
Ok(is_bad_data)
}
Err(e) => {
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);
self.sync_state.set_sync_error(e);
Ok(false)
}
}
}

Expand Down

0 comments on commit f79d05b

Please sign in to comment.