From f79d05ba5305ab805d6004dc095a648e4205624a Mon Sep 17 00:00:00 2001 From: Quentin Le Sceller Date: Thu, 1 Aug 2019 18:46:06 +0200 Subject: [PATCH] TxHashSet Download Improvement (#2984) * Ban on cannot get block header in txhashset_write * Rusfmt * Fix typo * Missing error handling * Rustfmt * Only accept txhashset from corresponding peer * Switch to AtomicBool instead of RwLock * Rustfmt --- chain/src/chain.rs | 13 ++++++++++--- p2p/src/peer.rs | 12 +++++++++++- p2p/src/peers.rs | 6 +++--- p2p/src/protocol.rs | 22 +++++++++++++++++++--- servers/src/common/adapters.rs | 31 ++++++++++++++++++++----------- 5 files changed, 63 insertions(+), 21 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 51f5b45d1d..7640cd9fde 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -869,7 +869,7 @@ impl Chain { h: Hash, txhashset_data: File, status: &dyn TxHashsetWriteStatus, - ) -> Result<(), Error> { + ) -> Result { status.on_setup(); // Initial check whether this txhashset is needed or not @@ -879,7 +879,14 @@ impl Chain { return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into()); } - let header = self.get_block_header(&h)?; + let header = match self.get_block_header(&h) { + Ok(header) => header, + Err(_) => { + warn!("txhashset_write: cannot find block header"); + // This is a bannable reason + return Ok(true); + } + }; // Write txhashset to sandbox (in the Grin specific tmp dir) let sandbox_dir = self.get_tmp_dir(); @@ -977,7 +984,7 @@ impl Chain { self.check_orphans(header.height + 1); status.on_done(); - Ok(()) + Ok(false) } /// Cleanup old blocks from the db. diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index cedbd193d8..6927ce5652 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -18,6 +18,7 @@ use std::fs::File; use std::io::Read; use std::net::{Shutdown, TcpStream}; use std::path::PathBuf; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use crate::chain; @@ -60,6 +61,8 @@ pub struct Peer { // because it may be locked by different reasons, so we should wait for that, close // mutex can be taken only during shutdown, it happens once stop_handle: Mutex, + // Whether or not we requested a txhashset from this peer + state_sync_requested: Arc, } impl fmt::Debug for Peer { @@ -72,8 +75,13 @@ impl Peer { // Only accept and connect can be externally used to build a peer fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> std::io::Result { let state = Arc::new(RwLock::new(State::Connected)); + let state_sync_requested = Arc::new(AtomicBool::new(false)); let tracking_adapter = TrackingAdapter::new(adapter); - let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); + let handler = Protocol::new( + Arc::new(tracking_adapter.clone()), + info.clone(), + state_sync_requested.clone(), + ); let tracker = Arc::new(conn::Tracker::new()); let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?; let send_handle = Mutex::new(sendh); @@ -85,6 +93,7 @@ impl Peer { tracker, send_handle, stop_handle, + state_sync_requested, }) } @@ -387,6 +396,7 @@ impl Peer { "Asking {} for txhashset archive at {} {}.", self.info.addr, height, hash ); + self.state_sync_requested.store(true, Ordering::Relaxed); self.send( &TxHashSetRequest { hash, height }, msg::Type::TxHashSetRequest, diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index a2fc7c51b6..5c1ec247ac 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -688,15 +688,15 @@ impl ChainAdapter for Peers { txhashset_data: File, peer_info: &PeerInfo, ) -> Result { - if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? { + if self.adapter.txhashset_write(h, txhashset_data, peer_info)? { debug!( "Received a bad txhashset data from {}, the peer will be banned", peer_info.addr ); self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet); - Ok(false) - } else { Ok(true) + } else { + Ok(false) } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 9e08266fc4..87f4da1b44 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -25,6 +25,7 @@ use rand::{thread_rng, Rng}; use std::cmp; use std::fs::{self, File, OpenOptions}; use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Instant; use tempfile::tempfile; @@ -32,11 +33,20 @@ use tempfile::tempfile; pub struct Protocol { adapter: Arc, peer_info: PeerInfo, + state_sync_requested: Arc, } impl Protocol { - pub fn new(adapter: Arc, peer_info: PeerInfo) -> Protocol { - Protocol { adapter, peer_info } + pub fn new( + adapter: Arc, + peer_info: PeerInfo, + state_sync_requested: Arc, + ) -> Protocol { + Protocol { + adapter, + peer_info, + state_sync_requested, + } } } @@ -356,6 +366,12 @@ impl MessageHandler for Protocol { ); return Err(Error::BadMessage); } + if !self.state_sync_requested.load(Ordering::Relaxed) { + error!("handle_payload: txhashset archive received but from the wrong peer",); + return Err(Error::BadMessage); + } + // Update the sync state requested status + self.state_sync_requested.store(false, Ordering::Relaxed); let download_start_time = Utc::now(); self.adapter @@ -425,7 +441,7 @@ impl MessageHandler for Protocol { debug!( "handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}", - sm_arch.hash, sm_arch.height, res + sm_arch.hash, sm_arch.height, !res ); if let Err(e) = fs::remove_file(tmp.clone()) { diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index 3a2c9502f9..698850d466 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -419,22 +419,31 @@ impl p2p::ChainAdapter for NetToChainAdapter { // check status again after download, in case 2 txhashsets made it somehow if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() { } else { - return Ok(true); + return Ok(false); } - if let Err(e) = self + match self .chain() .txhashset_write(h, txhashset_data, self.sync_state.as_ref()) { - self.chain().clean_txhashset_sandbox(); - error!("Failed to save txhashset archive: {}", e); - - let is_good_data = !e.is_bad_data(); - self.sync_state.set_sync_error(e); - Ok(is_good_data) - } else { - info!("Received valid txhashset data for {}.", h); - Ok(true) + Ok(is_bad_data) => { + if is_bad_data { + self.chain().clean_txhashset_sandbox(); + error!("Failed to save txhashset archive: bad data"); + self.sync_state.set_sync_error( + chain::ErrorKind::TxHashSetErr("bad txhashset data".to_string()).into(), + ); + } else { + info!("Received valid txhashset data for {}.", h); + } + Ok(is_bad_data) + } + Err(e) => { + self.chain().clean_txhashset_sandbox(); + error!("Failed to save txhashset archive: {}", e); + self.sync_state.set_sync_error(e); + Ok(false) + } } }