Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TxHashSet Download Improvement #2984

Merged
merged 8 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ impl Chain {
h: Hash,
txhashset_data: File,
status: &dyn TxHashsetWriteStatus,
) -> Result<(), Error> {
) -> Result<bool, Error> {
status.on_setup();

// Initial check whether this txhashset is needed or not
Expand All @@ -879,7 +879,14 @@ impl Chain {
return Err(ErrorKind::InvalidTxHashSet("not needed".to_owned()).into());
}

let header = self.get_block_header(&h)?;
let header = match self.get_block_header(&h) {
Ok(header) => header,
Err(_) => {
warn!("txhashset_write: cannot find block header");
// This is a bannable reason
return Ok(true);
}
};

// Write txhashset to sandbox (in the Grin specific tmp dir)
let sandbox_dir = self.get_tmp_dir();
Expand Down Expand Up @@ -977,7 +984,7 @@ impl Chain {
self.check_orphans(header.height + 1);

status.on_done();
Ok(())
Ok(false)
}

/// Cleanup old blocks from the db.
Expand Down
12 changes: 11 additions & 1 deletion p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::fs::File;
use std::io::Read;
use std::net::{Shutdown, TcpStream};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use crate::chain;
Expand Down Expand Up @@ -60,6 +61,8 @@ pub struct Peer {
// because it may be locked by different reasons, so we should wait for that, close
// mutex can be taken only during shutdown, it happens once
stop_handle: Mutex<conn::StopHandle>,
// Whether or not we requested a txhashset from this peer
state_sync_requested: Arc<AtomicBool>,
}

impl fmt::Debug for Peer {
Expand All @@ -72,8 +75,13 @@ impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, conn: TcpStream, adapter: Arc<dyn NetAdapter>) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
let state_sync_requested = Arc::new(AtomicBool::new(false));
let tracking_adapter = TrackingAdapter::new(adapter);
let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone());
let handler = Protocol::new(
Arc::new(tracking_adapter.clone()),
info.clone(),
state_sync_requested.clone(),
);
let tracker = Arc::new(conn::Tracker::new());
let (sendh, stoph) = conn::listen(conn, info.version, tracker.clone(), handler)?;
let send_handle = Mutex::new(sendh);
Expand All @@ -85,6 +93,7 @@ impl Peer {
tracker,
send_handle,
stop_handle,
state_sync_requested,
})
}

Expand Down Expand Up @@ -387,6 +396,7 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.state_sync_requested.store(true, Ordering::Relaxed);
self.send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
Expand Down
6 changes: 3 additions & 3 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,15 +688,15 @@ impl ChainAdapter for Peers {
txhashset_data: File,
peer_info: &PeerInfo,
) -> Result<bool, chain::Error> {
if !self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
if self.adapter.txhashset_write(h, txhashset_data, peer_info)? {
debug!(
"Received a bad txhashset data from {}, the peer will be banned",
peer_info.addr
);
self.ban_peer(peer_info.addr, ReasonForBan::BadTxHashSet);
Ok(false)
} else {
Ok(true)
} else {
Ok(false)
}
}

Expand Down
22 changes: 19 additions & 3 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,28 @@ use rand::{thread_rng, Rng};
use std::cmp;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Instant;
use tempfile::tempfile;

pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
}

impl Protocol {
pub fn new(adapter: Arc<dyn NetAdapter>, peer_info: PeerInfo) -> Protocol {
Protocol { adapter, peer_info }
pub fn new(
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
) -> Protocol {
Protocol {
adapter,
peer_info,
state_sync_requested,
}
}
}

Expand Down Expand Up @@ -356,6 +366,12 @@ impl MessageHandler for Protocol {
);
return Err(Error::BadMessage);
}
if !self.state_sync_requested.load(Ordering::Relaxed) {
error!("handle_payload: txhashset archive received but from the wrong peer",);
return Err(Error::BadMessage);
}
// Update the sync state requested status
self.state_sync_requested.store(false, Ordering::Relaxed);

let download_start_time = Utc::now();
self.adapter
Expand Down Expand Up @@ -425,7 +441,7 @@ impl MessageHandler for Protocol {

debug!(
"handle_payload: txhashset archive for {} at {}, DONE. Data Ok: {}",
sm_arch.hash, sm_arch.height, res
sm_arch.hash, sm_arch.height, !res
);

if let Err(e) = fs::remove_file(tmp.clone()) {
Expand Down
31 changes: 20 additions & 11 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,22 +419,31 @@ impl p2p::ChainAdapter for NetToChainAdapter {
// check status again after download, in case 2 txhashsets made it somehow
if let SyncStatus::TxHashsetDownload { .. } = self.sync_state.status() {
} else {
return Ok(true);
return Ok(false);
}

if let Err(e) = self
match self
.chain()
.txhashset_write(h, txhashset_data, self.sync_state.as_ref())
{
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);

let is_good_data = !e.is_bad_data();
self.sync_state.set_sync_error(e);
Ok(is_good_data)
} else {
info!("Received valid txhashset data for {}.", h);
Ok(true)
Ok(is_bad_data) => {
if is_bad_data {
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: bad data");
self.sync_state.set_sync_error(
chain::ErrorKind::TxHashSetErr("bad txhashset data".to_string()).into(),
);
} else {
info!("Received valid txhashset data for {}.", h);
}
Ok(is_bad_data)
}
Err(e) => {
self.chain().clean_txhashset_sandbox();
error!("Failed to save txhashset archive: {}", e);
self.sync_state.set_sync_error(e);
Ok(false)
}
}
}

Expand Down