From 106f91b8ddc7a8cef098063a0a7114aad6634de2 Mon Sep 17 00:00:00 2001 From: hashmap Date: Mon, 6 May 2019 19:42:25 +0200 Subject: [PATCH 1/7] first pass --- p2p/src/conn.rs | 36 ++++++++++----- p2p/src/peer.rs | 12 ++--- servers/src/grin/dandelion_monitor.rs | 59 ++++++++++++++---------- servers/src/grin/seed.rs | 7 ++- servers/src/grin/server.rs | 64 ++++++++++++++++++++++----- servers/src/grin/sync/syncer.rs | 6 +-- src/bin/tui/ui.rs | 5 ++- 7 files changed, 129 insertions(+), 60 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index e2e400a834..5591a7e30d 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -24,7 +24,11 @@ use std::fs::File; use std::io::{self, Read, Write}; use std::net::{Shutdown, TcpStream}; use std::sync::{mpsc, Arc}; -use std::{cmp, thread, time}; +use std::{ + cmp, + thread::{self, JoinHandle}, + time, +}; use crate::core::ser; use crate::core::ser::FixedLength; @@ -171,6 +175,8 @@ pub struct Tracker { pub send_channel: mpsc::SyncSender>, /// Channel to close the connection pub close_channel: mpsc::Sender<()>, + // we need Option to take ownhership of the handle in stop() + peer_thread: Option>, } impl Tracker { @@ -190,15 +196,23 @@ impl Tracker { } /// Schedule this connection to safely close via the async close_channel. - pub fn close(&self) { + pub fn close(&mut self) { let _ = self.close_channel.send(()); + if let Some(peer_thread) = self.peer_thread.take() { + // wait only if other thread is calling us, eg shutdown + if thread::current().id() != peer_thread.thread().id() { + if let Err(e) = peer_thread.join() { + error!("failed to stop peer thread: {:?}", e); + } + } + } } } /// Start listening on the provided connection and wraps it. Does not hang /// the current thread, instead just returns a future and the Connection /// itself. -pub fn listen(stream: TcpStream, handler: H) -> Tracker +pub fn listen(stream: TcpStream, handler: H) -> io::Result where H: MessageHandler, { @@ -213,21 +227,22 @@ where stream .set_nonblocking(true) .expect("Non-blocking IO not available."); - poll( + let peer_thread = poll( stream, handler, send_rx, close_rx, received_bytes.clone(), sent_bytes.clone(), - ); + )?; - Tracker { + Ok(Tracker { sent_bytes: sent_bytes.clone(), received_bytes: received_bytes.clone(), send_channel: send_tx, close_channel: close_tx, - } + peer_thread: Some(peer_thread), + }) } fn poll( @@ -237,14 +252,15 @@ fn poll( close_rx: mpsc::Receiver<()>, received_bytes: Arc>, sent_bytes: Arc>, -) where +) -> io::Result> +where H: MessageHandler, { // Split out tcp stream out into separate reader/writer halves. let mut reader = conn.try_clone().expect("clone conn for reader failed"); let mut writer = conn.try_clone().expect("clone conn for writer failed"); - let _ = thread::Builder::new() + thread::Builder::new() .name("peer".to_string()) .spawn(move || { let sleep_time = time::Duration::from_millis(5); @@ -299,5 +315,5 @@ fn poll( .unwrap_or("?".to_owned()) ); let _ = conn.shutdown(Shutdown::Both); - }); + }) } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b1df25d17..36eb58a7f7 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -62,17 +62,17 @@ impl fmt::Debug for Peer { impl Peer { // Only accept and connect can be externally used to build a peer - fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> Peer { + fn new(info: PeerInfo, conn: TcpStream, adapter: Arc) -> std::io::Result { let state = Arc::new(RwLock::new(State::Connected)); let tracking_adapter = TrackingAdapter::new(adapter); let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); - let connection = Mutex::new(conn::listen(conn, handler)); - Peer { + let connection = Mutex::new(conn::listen(conn, handler)?); + Ok(Peer { info, state, tracking_adapter, connection, - } + }) } pub fn accept( @@ -85,7 +85,7 @@ impl Peer { debug!("accept: handshaking from {:?}", conn.peer_addr()); let info = hs.accept(capab, total_difficulty, &mut conn); match info { - Ok(info) => Ok(Peer::new(info, conn, adapter)), + Ok(info) => Ok(Peer::new(info, conn, adapter)?), Err(e) => { debug!( "accept: handshaking from {:?} failed with error: {:?}", @@ -111,7 +111,7 @@ impl Peer { debug!("connect: handshaking with {:?}", conn.peer_addr()); let info = hs.initiate(capab, total_difficulty, self_addr, &mut conn); match info { - Ok(info) => Ok(Peer::new(info, conn, adapter)), + Ok(info) => Ok(Peer::new(info, conn, adapter)?), Err(e) => { debug!( "connect: handshaking with {:?} failed with error: {:?}", diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 3316d4d347..f13d1324c7 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -16,7 +16,7 @@ use chrono::prelude::Utc; use rand::{thread_rng, Rng}; use std::sync::Arc; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; @@ -39,42 +39,53 @@ pub fn monitor_transactions( adapter: Arc, verifier_cache: Arc>, stop_state: Arc>, -) { +) -> std::io::Result> { debug!("Started Dandelion transaction monitor."); - let _ = thread::Builder::new() + thread::Builder::new() .name("dandelion".to_string()) .spawn(move || { + let run_interval = Duration::from_secs(10); + let mut last_run = Instant::now() + .checked_sub(Duration::from_secs(20)) + .unwrap_or_else(|| Instant::now()); loop { // Halt Dandelion monitor if we have been notified that we are stopping. if stop_state.lock().is_stopped() { break; } - if !adapter.is_stem() { - let _ = - process_fluff_phase(&dandelion_config, &tx_pool, &adapter, &verifier_cache) - .map_err(|e| { - error!("dand_mon: Problem processing fluff phase. {:?}", e); - }); + if last_run.elapsed() > run_interval { + if !adapter.is_stem() { + let _ = process_fluff_phase( + &dandelion_config, + &tx_pool, + &adapter, + &verifier_cache, + ) + .map_err(|e| { + error!("dand_mon: Problem processing fluff phase. {:?}", e); + }); + } + + // Now find all expired entries based on embargo timer. + let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| { + error!("dand_mon: Problem processing expired entries. {:?}", e); + }); + + // Handle the tx above *before* we transition to next epoch. + // This gives us an opportunity to do the final "fluff" before we start + // stemming on the subsequent epoch. + if adapter.is_expired() { + adapter.next_epoch(); + } + last_run = Instant::now(); } - // Now find all expired entries based on embargo timer. - let _ = process_expired_entries(&dandelion_config, &tx_pool).map_err(|e| { - error!("dand_mon: Problem processing expired entries. {:?}", e); - }); - - // Handle the tx above *before* we transition to next epoch. - // This gives us an opportunity to do the final "fluff" before we start - // stemming on the subsequent epoch. - if adapter.is_expired() { - adapter.next_epoch(); - } - - // Monitor loops every 10s. - thread::sleep(Duration::from_secs(10)); + // Monitor loops every 10s, but check stop flag every second. + thread::sleep(Duration::from_secs(1)); } - }); + }) } // Query the pool for transactions older than the cutoff. diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index cda2462541..292986fd45 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -55,8 +55,8 @@ pub fn connect_and_monitor( seed_list: Box Vec + Send>, preferred_peers: Option>, stop_state: Arc>, -) { - let _ = thread::Builder::new() +) -> std::io::Result> { + thread::Builder::new() .name("seed".to_string()) .spawn(move || { let peers = p2p_server.peers.clone(); @@ -77,7 +77,6 @@ pub fn connect_and_monitor( let mut prev_expire_check = MIN_DATE.and_hms(0, 0, 0); let mut prev_ping = Utc::now(); let mut start_attempt = 0; - let mut connecting_history: HashMap> = HashMap::new(); loop { @@ -136,7 +135,7 @@ pub fn connect_and_monitor( thread::sleep(time::Duration::from_secs(1)); } - }); + }) } fn monitor_peers( diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index d337c6bb77..2b25aff300 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -21,7 +21,10 @@ use std::fs::File; use std::io::prelude::*; use std::path::Path; use std::sync::Arc; -use std::{thread, time}; +use std::{ + thread::{self, JoinHandle}, + time, +}; use fs2::FileExt; @@ -67,6 +70,10 @@ pub struct Server { pub stop_state: Arc>, /// Maintain a lock_file so we do not run multiple Grin nodes from same dir. lock_file: Arc, + connect_thread: Option>, + sync_thread: JoinHandle<()>, + dandelion_thread: JoinHandle<()>, + p2p_thread: JoinHandle<()>, } impl Server { @@ -208,6 +215,8 @@ impl Server { pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); + let mut connect_thread = None; + if config.p2p_config.seeding_type != p2p::Seeding::Programmatic { let seeder = match config.p2p_config.seeding_type { p2p::Seeding::None => { @@ -226,13 +235,13 @@ impl Server { _ => unreachable!(), }; - seed::connect_and_monitor( + connect_thread = Some(seed::connect_and_monitor( p2p_server.clone(), config.p2p_config.capabilities, seeder, config.p2p_config.peers_preferred.clone(), stop_state.clone(), - ); + )?); } // Defaults to None (optional) in config file. @@ -240,17 +249,21 @@ impl Server { let skip_sync_wait = config.skip_sync_wait.unwrap_or(false); sync_state.update(SyncStatus::AwaitingPeers(!skip_sync_wait)); - sync::run_sync( + let sync_thread = sync::run_sync( sync_state.clone(), p2p_server.peers.clone(), shared_chain.clone(), stop_state.clone(), - ); + )?; let p2p_inner = p2p_server.clone(); - let _ = thread::Builder::new() + let p2p_thread = thread::Builder::new() .name("p2p-server".to_string()) - .spawn(move || p2p_inner.listen()); + .spawn(move || { + if let Err(e) = p2p_inner.listen() { + error!("P2P server failed with erorr: {:?}", e); + } + })?; info!("Starting rest apis at: {}", &config.api_http_addr); let api_secret = get_first_line(config.api_secret_path.clone()); @@ -269,6 +282,7 @@ impl Server { } }; + // TODO fix API shutdown and join this thread api::start_rest_apis( config.api_http_addr.clone(), shared_chain.clone(), @@ -279,13 +293,13 @@ impl Server { ); info!("Starting dandelion monitor: {}", &config.api_http_addr); - dandelion_monitor::monitor_transactions( + let dandelion_thread = dandelion_monitor::monitor_transactions( config.dandelion_config.clone(), tx_pool.clone(), pool_net_adapter.clone(), verifier_cache.clone(), stop_state.clone(), - ); + )?; warn!("Grin server started."); Ok(Server { @@ -300,6 +314,10 @@ impl Server { }, stop_state, lock_file, + connect_thread, + sync_thread, + p2p_thread, + dandelion_thread, }) } @@ -489,9 +507,33 @@ impl Server { } /// Stop the server. - pub fn stop(&self) { - self.p2p.stop(); + pub fn stop(self) { self.stop_state.lock().stop(); + + if let Some(connect_thread) = self.connect_thread { + match connect_thread.join() { + Err(e) => error!("failed to join to connect_and_monitor thread: {:?}", e), + Ok(_) => info!("connect_and_monitor thread stopped"), + } + } else { + info!("No active connect_and_monitor thread") + } + + match self.sync_thread.join() { + Err(e) => error!("failed to join to sync thread: {:?}", e), + Ok(_) => info!("sync thread stopped"), + } + + match self.dandelion_thread.join() { + Err(e) => error!("failed to join to dandelion_monitor thread: {:?}", e), + Ok(_) => info!("dandelion_monitor thread stopped"), + } + + self.p2p.stop(); + match self.p2p_thread.join() { + Err(e) => error!("failed to join to p2p thread: {:?}", e), + Ok(_) => info!("p2p thread stopped"), + } let _ = self.lock_file.unlock(); } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 99d504becf..a98835ec96 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -31,13 +31,13 @@ pub fn run_sync( peers: Arc, chain: Arc, stop_state: Arc>, -) { - let _ = thread::Builder::new() +) -> std::io::Result> { + thread::Builder::new() .name("sync".to_string()) .spawn(move || { let runner = SyncRunner::new(sync_state, peers, chain, stop_state); runner.sync_loop(); - }); + }) } pub struct SyncRunner { diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs index c4f078a45d..e38b06838e 100644 --- a/src/bin/tui/ui.rs +++ b/src/bin/tui/ui.rs @@ -26,7 +26,7 @@ use cursive::theme::{BaseColor, BorderStyle, Color, Theme}; use cursive::traits::Boxable; use cursive::traits::Identifiable; use cursive::utils::markup::StyledString; -use cursive::views::{LinearLayout, Panel, StackView, TextView, ViewBox}; +use cursive::views::{Dialog, LinearLayout, Panel, StackView, TextView, ViewBox}; use cursive::Cursive; use std::sync::mpsc; @@ -173,8 +173,9 @@ impl Controller { while let Some(message) = self.rx.try_iter().next() { match message { ControllerMessage::Shutdown => { - server.stop(); self.ui.stop(); + server.stop(); + return; } } } From 63727c1dd9d178daedba70a9b280852be450c012 Mon Sep 17 00:00:00 2001 From: hashmap Date: Tue, 7 May 2019 10:02:05 +0200 Subject: [PATCH 2/7] checkpoint --- servers/src/common/types.rs | 1 + servers/src/grin/server.rs | 36 +++++++++++++++++++----------------- src/bin/tui/status.rs | 1 + src/bin/tui/ui.rs | 4 +++- 4 files changed, 24 insertions(+), 18 deletions(-) diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 1b7c9f34d9..9b119446ca 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -344,6 +344,7 @@ pub enum SyncStatus { current_height: u64, highest_height: u64, }, + Shutdown, } /// Current sync state. Encapsulates the current SyncStatus. diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 2b25aff300..14959393c1 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -508,27 +508,29 @@ impl Server { /// Stop the server. pub fn stop(self) { - self.stop_state.lock().stop(); - - if let Some(connect_thread) = self.connect_thread { - match connect_thread.join() { - Err(e) => error!("failed to join to connect_and_monitor thread: {:?}", e), - Ok(_) => info!("connect_and_monitor thread stopped"), + { + self.sync_state.update(SyncStatus::Shutdown); + self.stop_state.lock().stop(); + + if let Some(connect_thread) = self.connect_thread { + match connect_thread.join() { + Err(e) => error!("failed to join to connect_and_monitor thread: {:?}", e), + Ok(_) => info!("connect_and_monitor thread stopped"), + } + } else { + info!("No active connect_and_monitor thread") } - } else { - info!("No active connect_and_monitor thread") - } - match self.sync_thread.join() { - Err(e) => error!("failed to join to sync thread: {:?}", e), - Ok(_) => info!("sync thread stopped"), - } + match self.sync_thread.join() { + Err(e) => error!("failed to join to sync thread: {:?}", e), + Ok(_) => info!("sync thread stopped"), + } - match self.dandelion_thread.join() { - Err(e) => error!("failed to join to dandelion_monitor thread: {:?}", e), - Ok(_) => info!("dandelion_monitor thread stopped"), + match self.dandelion_thread.join() { + Err(e) => error!("failed to join to dandelion_monitor thread: {:?}", e), + Ok(_) => info!("dandelion_monitor thread stopped"), + } } - self.p2p.stop(); match self.p2p_thread.join() { Err(e) => error!("failed to join to p2p thread: {:?}", e), diff --git a/src/bin/tui/status.rs b/src/bin/tui/status.rs index 6651c66b30..d5bb776576 100644 --- a/src/bin/tui/status.rs +++ b/src/bin/tui/status.rs @@ -199,6 +199,7 @@ impl TUIStatusListener for TUIStatusView { }; format!("Downloading blocks: {}%, step 4/4", percent) } + SyncStatus::Shutdown => "Shutting down, closing connections".to_string(), } }; /*let basic_mining_config_status = { diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs index e38b06838e..4f12826695 100644 --- a/src/bin/tui/ui.rs +++ b/src/bin/tui/ui.rs @@ -173,8 +173,10 @@ impl Controller { while let Some(message) = self.rx.try_iter().next() { match message { ControllerMessage::Shutdown => { - self.ui.stop(); + let stats = server.get_server_stats().unwrap(); + status::TUIStatusView::update(&mut self.ui.cursive, &stats); server.stop(); + self.ui.stop(); return; } } From 87204838b01b8efa1def1623155ad5a6a265bf67 Mon Sep 17 00:00:00 2001 From: hashmap Date: Tue, 7 May 2019 19:46:50 +0200 Subject: [PATCH 3/7] Remove stop status mutex --- chain/src/chain.rs | 39 +-------------------------- chain/tests/data_file_integrity.rs | 6 ++--- chain/tests/mine_simple_chain.rs | 6 ++--- chain/tests/test_coinbase_maturity.rs | 4 +-- etc/gen_gen/src/bin/gen_gen.rs | 2 +- p2p/src/conn.rs | 6 +++++ p2p/src/serv.rs | 12 ++++----- p2p/tests/peer_handshake.rs | 4 +-- servers/src/grin/dandelion_monitor.rs | 6 ++--- servers/src/grin/seed.rs | 8 +++--- servers/src/grin/server.rs | 19 +++++++------ servers/src/grin/sync/body_sync.rs | 1 + servers/src/grin/sync/syncer.rs | 10 +++---- servers/src/mining/test_miner.rs | 8 +++--- src/bin/tui/ui.rs | 7 +++-- util/src/lib.rs | 30 +++++++++------------ 16 files changed, 66 insertions(+), 102 deletions(-) diff --git a/chain/src/chain.rs b/chain/src/chain.rs index c0ec8e4331..b908935d3a 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -32,7 +32,7 @@ use crate::types::{ BlockStatus, ChainAdapter, NoStatus, Options, Tip, TxHashSetRoots, TxHashsetWriteStatus, }; use crate::util::secp::pedersen::{Commitment, RangeProof}; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::RwLock; use grin_store::Error::NotFoundErr; use std::collections::HashMap; use std::fs::{self, File}; @@ -150,7 +150,6 @@ pub struct Chain { // POW verification function pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, archive_mode: bool, - stop_state: Arc>, genesis: BlockHeader, } @@ -165,16 +164,7 @@ impl Chain { pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>, verifier_cache: Arc>, archive_mode: bool, - stop_state: Arc>, ) -> Result { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished chain initialization. - let stop_state_local = stop_state.clone(); - let stop_lock = stop_state_local.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let store = Arc::new(store::ChainStore::new(&db_root)?); // open the txhashset, creating a new one if necessary @@ -192,7 +182,6 @@ impl Chain { pow_verifier, verifier_cache, archive_mode, - stop_state, genesis: genesis.header.clone(), }) } @@ -281,15 +270,6 @@ impl Chain { /// or false if it has added to a fork (or orphan?). fn process_block_single(&self, b: Block, opts: Options) -> Result, Error> { let (maybe_new_head, prev_head) = { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this single block. - // We take care to write both the txhashset *and* the batch while we - // have the stop_state lock. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -379,15 +359,6 @@ impl Chain { /// This is only ever used during sync and is based on sync_head. /// We update header_head here if our total work increases. pub fn sync_block_headers(&self, headers: &[BlockHeader], opts: Options) -> Result<(), Error> { - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this single block. - // We take care to write both the txhashset *and* the batch while we - // have the stop_state lock. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - let mut txhashset = self.txhashset.write(); let batch = self.store.batch()?; let mut ctx = self.new_ctx(opts, batch, &mut txhashset)?; @@ -1094,14 +1065,6 @@ impl Chain { } } - // Note: We take a lock on the stop_state here and do not release it until - // we have finished processing this chain compaction operation. - // We want to avoid shutting the node down in the middle of compacting the data. - let stop_lock = self.stop_state.lock(); - if stop_lock.is_stopped() { - return Err(ErrorKind::Stopped.into()); - } - // Take a write lock on the txhashet and start a new writeable db batch. let mut txhashset = self.txhashset.write(); let mut batch = self.store.batch()?; diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs index 9e8b9a8166..e07fab839e 100644 --- a/chain/tests/data_file_integrity.rs +++ b/chain/tests/data_file_integrity.rs @@ -21,7 +21,7 @@ use self::core::libtx; use self::core::pow::{self, Difficulty}; use self::core::{consensus, genesis}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::{RwLock, StopState}; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -47,7 +47,7 @@ fn setup(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap() } @@ -61,7 +61,7 @@ fn reload_chain(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap() } diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 32fc81173e..2961928414 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -23,7 +23,7 @@ use self::core::libtx::{self, build, reward}; use self::core::pow::Difficulty; use self::core::{consensus, global, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::{RwLock, StopState}; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -47,7 +47,7 @@ fn setup(dir_name: &str, genesis: Block) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap() } @@ -565,7 +565,7 @@ fn actual_diff_iter_output() { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap(); let iter = chain.difficulty_iter().unwrap(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index 6f4575c1ee..2e7c607e36 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -20,7 +20,7 @@ use self::core::libtx::{self, build}; use self::core::pow::Difficulty; use self::core::{consensus, pow}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{Mutex, RwLock, StopState}; +use self::util::{RwLock, StopState}; use chrono::Duration; use env_logger; use grin_chain as chain; @@ -53,7 +53,7 @@ fn test_coinbase_maturity() { pow::verify_size, verifier_cache, false, - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap(); diff --git a/etc/gen_gen/src/bin/gen_gen.rs b/etc/gen_gen/src/bin/gen_gen.rs index 829ba42346..b9abbb2c6b 100644 --- a/etc/gen_gen/src/bin/gen_gen.rs +++ b/etc/gen_gen/src/bin/gen_gen.rs @@ -281,7 +281,7 @@ fn setup_chain(dir_name: &str, genesis: core::core::Block) -> chain::Chain { core::pow::verify_size, verifier_cache, false, - Arc::new(util::Mutex::new(util::StopState::new())), + Arc::new(util::StopState::new()), ) .unwrap() } diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 5591a7e30d..370fb111c1 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -201,9 +201,15 @@ impl Tracker { if let Some(peer_thread) = self.peer_thread.take() { // wait only if other thread is calling us, eg shutdown if thread::current().id() != peer_thread.thread().id() { + debug!("waiting for thread {:?} exit", peer_thread.thread().id()); if let Err(e) = peer_thread.join() { error!("failed to stop peer thread: {:?}", e); } + } else { + debug!( + "stopping thread {:?} within the same thread", + peer_thread.thread().id() + ); } } } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 7b7ff05f2d..3afaf17c66 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -32,7 +32,7 @@ use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, }; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; use chrono::prelude::{DateTime, Utc}; /// P2P server implementation, handling bootstrapping to find and connect to @@ -42,7 +42,7 @@ pub struct Server { capabilities: Capabilities, handshake: Arc, pub peers: Arc, - stop_state: Arc>, + stop_state: Arc, } // TODO TLS @@ -54,7 +54,7 @@ impl Server { config: P2PConfig, adapter: Arc, genesis: Hash, - stop_state: Arc>, + stop_state: Arc, ) -> Result { Ok(Server { config: config.clone(), @@ -76,7 +76,7 @@ impl Server { let sleep_time = Duration::from_millis(5); loop { // Pause peer ingress connection request. Only for tests. - if self.stop_state.lock().is_paused() { + if self.stop_state.is_paused() { thread::sleep(Duration::from_secs(1)); continue; } @@ -100,7 +100,7 @@ impl Server { debug!("Couldn't establish new client connection: {:?}", e); } } - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } thread::sleep(sleep_time); @@ -213,7 +213,7 @@ impl Server { } pub fn stop(&self) { - self.stop_state.lock().stop(); + self.stop_state.stop(); self.peers.stop(); } diff --git a/p2p/tests/peer_handshake.rs b/p2p/tests/peer_handshake.rs index 3afe32c809..3f72bd2df7 100644 --- a/p2p/tests/peer_handshake.rs +++ b/p2p/tests/peer_handshake.rs @@ -16,7 +16,7 @@ use grin_core as core; use grin_p2p as p2p; use grin_util as util; -use grin_util::{Mutex, StopState}; +use grin_util::StopState; use std::net::{SocketAddr, TcpListener, TcpStream}; use std::sync::Arc; @@ -56,7 +56,7 @@ fn peer_handshake() { p2p_config.clone(), net_adapter.clone(), Hash::from_vec(&vec![]), - Arc::new(Mutex::new(StopState::new())), + Arc::new(StopState::new()), ) .unwrap(), ); diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index f13d1324c7..685d36c9d6 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -23,7 +23,7 @@ use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; use crate::pool::{DandelionConfig, Pool, PoolEntry, PoolError, TransactionPool, TxSource}; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::{RwLock, StopState}; /// A process to monitor transactions in the stempool. /// With Dandelion, transaction can be broadcasted in stem or fluff phase. @@ -38,7 +38,7 @@ pub fn monitor_transactions( tx_pool: Arc>, adapter: Arc, verifier_cache: Arc>, - stop_state: Arc>, + stop_state: Arc, ) -> std::io::Result> { debug!("Started Dandelion transaction monitor."); @@ -51,7 +51,7 @@ pub fn monitor_transactions( .unwrap_or_else(|| Instant::now()); loop { // Halt Dandelion monitor if we have been notified that we are stopping. - if stop_state.lock().is_stopped() { + if stop_state.is_stopped() { break; } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 292986fd45..9fbecde6ba 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -30,7 +30,7 @@ use crate::core::global; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::p2p::ChainAdapter; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; // DNS Seeds with contact email associated const MAINNET_DNS_SEEDS: &'static [&'static str] = &[ @@ -54,7 +54,7 @@ pub fn connect_and_monitor( capabilities: p2p::Capabilities, seed_list: Box Vec + Send>, preferred_peers: Option>, - stop_state: Arc>, + stop_state: Arc, ) -> std::io::Result> { thread::Builder::new() .name("seed".to_string()) @@ -80,12 +80,12 @@ pub fn connect_and_monitor( let mut connecting_history: HashMap> = HashMap::new(); loop { - if stop_state.lock().is_stopped() { + if stop_state.is_stopped() { break; } // Pause egress peer connection request. Only for tests. - if stop_state.lock().is_paused() { + if stop_state.is_paused() { thread::sleep(time::Duration::from_secs(1)); continue; } diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index 14959393c1..dbc769f7e6 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -47,7 +47,7 @@ use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; use crate::util::file::get_first_line; -use crate::util::{Mutex, RwLock, StopState}; +use crate::util::{RwLock, StopState}; /// Grin server holding internal structures. pub struct Server { @@ -67,7 +67,7 @@ pub struct Server { /// To be passed around to collect stats and info state_info: ServerStateInfo, /// Stop flag - pub stop_state: Arc>, + pub stop_state: Arc, /// Maintain a lock_file so we do not run multiple Grin nodes from same dir. lock_file: Arc, connect_thread: Option>, @@ -149,7 +149,7 @@ impl Server { Some(b) => b, }; - let stop_state = Arc::new(Mutex::new(StopState::new())); + let stop_state = Arc::new(StopState::new()); // Shared cache for verification results. // We cache rangeproof verification and kernel signature verification. @@ -187,7 +187,6 @@ impl Server { pow::verify_size, verifier_cache.clone(), archive_mode, - stop_state.clone(), )?); pool_adapter.set_chain(shared_chain.clone()); @@ -366,7 +365,7 @@ impl Server { pub fn start_test_miner( &self, wallet_listener_url: Option, - stop_state: Arc>, + stop_state: Arc, ) { info!("start_test_miner - start",); let sync_state = self.sync_state.clone(); @@ -510,7 +509,7 @@ impl Server { pub fn stop(self) { { self.sync_state.update(SyncStatus::Shutdown); - self.stop_state.lock().stop(); + self.stop_state.stop(); if let Some(connect_thread) = self.connect_thread { match connect_thread.join() { @@ -541,7 +540,7 @@ impl Server { /// Pause the p2p server. pub fn pause(&self) { - self.stop_state.lock().pause(); + self.stop_state.pause(); thread::sleep(time::Duration::from_secs(1)); self.p2p.pause(); } @@ -549,12 +548,12 @@ impl Server { /// Resume p2p server. /// TODO - We appear not to resume the p2p server (peer connections) here? pub fn resume(&self) { - self.stop_state.lock().resume(); + self.stop_state.resume(); } /// Stops the test miner without stopping the p2p layer - pub fn stop_test_miner(&self, stop: Arc>) { - stop.lock().stop(); + pub fn stop_test_miner(&self, stop: Arc) { + stop.stop(); info!("stop_test_miner - stop",); } } diff --git a/servers/src/grin/sync/body_sync.rs b/servers/src/grin/sync/body_sync.rs index 15c2b93322..687258642a 100644 --- a/servers/src/grin/sync/body_sync.rs +++ b/servers/src/grin/sync/body_sync.rs @@ -141,6 +141,7 @@ impl BodySync { if let Some(peer) = peers_iter.next() { if let Err(e) = peer.send_block_request(*hash) { debug!("Skipped request to {}: {:?}", peer.info.addr, e); + peer.stop(); } else { self.blocks_requested += 1; } diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index a98835ec96..98653a0c69 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -24,13 +24,13 @@ use crate::grin::sync::body_sync::BodySync; use crate::grin::sync::header_sync::HeaderSync; use crate::grin::sync::state_sync::StateSync; use crate::p2p; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; pub fn run_sync( sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, + stop_state: Arc, ) -> std::io::Result> { thread::Builder::new() .name("sync".to_string()) @@ -44,7 +44,7 @@ pub struct SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, + stop_state: Arc, } impl SyncRunner { @@ -52,7 +52,7 @@ impl SyncRunner { sync_state: Arc, peers: Arc, chain: Arc, - stop_state: Arc>, + stop_state: Arc, ) -> SyncRunner { SyncRunner { sync_state, @@ -140,7 +140,7 @@ impl SyncRunner { // Main syncing loop loop { - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } diff --git a/servers/src/mining/test_miner.rs b/servers/src/mining/test_miner.rs index d671855ea5..832fc32446 100644 --- a/servers/src/mining/test_miner.rs +++ b/servers/src/mining/test_miner.rs @@ -29,14 +29,14 @@ use crate::core::core::{Block, BlockHeader}; use crate::core::global; use crate::mining::mine_block; use crate::pool; -use crate::util::{Mutex, StopState}; +use crate::util::StopState; pub struct Miner { config: StratumServerConfig, chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop_state: Arc>, + stop_state: Arc, // Just to hold the port we're on, so this miner can be identified // while watching debug output @@ -51,7 +51,7 @@ impl Miner { chain: Arc, tx_pool: Arc>, verifier_cache: Arc>, - stop_state: Arc>, + stop_state: Arc, ) -> Miner { Miner { config, @@ -136,7 +136,7 @@ impl Miner { let mut key_id = None; loop { - if self.stop_state.lock().is_stopped() { + if self.stop_state.is_stopped() { break; } diff --git a/src/bin/tui/ui.rs b/src/bin/tui/ui.rs index 4f12826695..d8d7a95db0 100644 --- a/src/bin/tui/ui.rs +++ b/src/bin/tui/ui.rs @@ -26,7 +26,7 @@ use cursive::theme::{BaseColor, BorderStyle, Color, Theme}; use cursive::traits::Boxable; use cursive::traits::Identifiable; use cursive::utils::markup::StyledString; -use cursive::views::{Dialog, LinearLayout, Panel, StackView, TextView, ViewBox}; +use cursive::views::{LinearLayout, Panel, StackView, TextView, ViewBox}; use cursive::Cursive; use std::sync::mpsc; @@ -173,10 +173,9 @@ impl Controller { while let Some(message) = self.rx.try_iter().next() { match message { ControllerMessage::Shutdown => { - let stats = server.get_server_stats().unwrap(); - status::TUIStatusView::update(&mut self.ui.cursive, &stats); - server.stop(); self.ui.stop(); + println!("Shutdown in progress, please wait"); + server.stop(); return; } } diff --git a/util/src/lib.rs b/util/src/lib.rs index 93883e951f..d4577ed43b 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -53,8 +53,8 @@ pub mod read_write; // other utils #[allow(unused_imports)] use std::ops::Deref; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - mod hex; pub use crate::hex::*; @@ -112,49 +112,45 @@ pub fn to_base64(s: &str) -> String { /// Global stopped/paused state shared across various subcomponents of Grin. /// -/// Arc> allows the chain to lock the stop_state during critical processing. -/// Other subcomponents cannot abruptly shutdown the server during block/header processing. -/// This should prevent the chain ever ending up in an inconsistent state on restart. -/// /// "Stopped" allows a clean shutdown of the Grin server. /// "Paused" is used in some tests to allow nodes to reach steady state etc. /// pub struct StopState { - stopped: bool, - paused: bool, + stopped: AtomicBool, + paused: AtomicBool, } impl StopState { /// Create a new stop_state in default "running" state. pub fn new() -> StopState { StopState { - stopped: false, - paused: false, + stopped: AtomicBool::new(false), + paused: AtomicBool::new(false), } } /// Check if we are stopped. pub fn is_stopped(&self) -> bool { - self.stopped + self.stopped.load(Ordering::Relaxed) } /// Check if we are paused. pub fn is_paused(&self) -> bool { - self.paused + self.paused.load(Ordering::Relaxed) } /// Stop the server. - pub fn stop(&mut self) { - self.stopped = true; + pub fn stop(&self) { + self.stopped.store(true, Ordering::Relaxed) } /// Pause the server (only used in tests). - pub fn pause(&mut self) { - self.paused = true; + pub fn pause(&self) { + self.paused.store(true, Ordering::Relaxed) } /// Resume a paused server (only used in tests). - pub fn resume(&mut self) { - self.paused = false; + pub fn resume(&self) { + self.paused.store(false, Ordering::Relaxed) } } From 7aece30ab6190f4f577430a6cfe67dcee8d5f8a9 Mon Sep 17 00:00:00 2001 From: hashmap Date: Thu, 9 May 2019 15:57:33 +0200 Subject: [PATCH 4/7] remove some deadlocks --- p2p/src/conn.rs | 106 +++++++++++++++++++++------------------ p2p/src/peer.rs | 52 ++++++++++++------- p2p/src/protocol.rs | 10 ++-- p2p/src/serv.rs | 10 ++-- servers/src/grin/seed.rs | 45 +++++++++++------ 5 files changed, 133 insertions(+), 90 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 370fb111c1..7da51568da 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -44,7 +44,7 @@ pub trait MessageHandler: Send + 'static { &self, msg: Message<'a>, writer: &'a mut dyn Write, - received_bytes: Arc>, + tracker: Arc, ) -> Result>, Error>; } @@ -131,15 +131,12 @@ impl<'a> Response<'a> { }) } - fn write(mut self, sent_bytes: Arc>) -> Result<(), Error> { + fn write(mut self, tracker: Arc) -> Result<(), Error> { let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?; msg.append(&mut self.body); write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?; - // Increase sent bytes counter - { - let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc(msg.len() as u64); - } + tracker.inc_sent(msg.len() as u64); + if let Some(mut file) = self.attachment { let mut buf = [0u8; 8000]; loop { @@ -149,8 +146,7 @@ impl<'a> Response<'a> { write_all(&mut self.stream, &buf[..n], time::Duration::from_secs(10))?; // Increase sent bytes "quietly" without incrementing the counter. // (In a loop here for the single attachment). - let mut sent_bytes = sent_bytes.write(); - sent_bytes.inc_quiet(n as u64); + tracker.inc_quiet_sent(n as u64); } Err(e) => return Err(From::from(e)), } @@ -166,11 +162,7 @@ impl<'a> Response<'a> { pub const SEND_CHANNEL_CAP: usize = 10; -pub struct Tracker { - /// Bytes we've sent. - pub sent_bytes: Arc>, - /// Bytes we've received. - pub received_bytes: Arc>, +pub struct ConnHandle { /// Channel to allow sending data through the connection pub send_channel: mpsc::SyncSender>, /// Channel to close the connection @@ -179,8 +171,8 @@ pub struct Tracker { peer_thread: Option>, } -impl Tracker { - pub fn send(&self, body: T, msg_type: Type) -> Result<(), Error> +impl ConnHandle { + pub fn send(&self, body: T, msg_type: Type) -> Result where T: ser::Writeable, { @@ -189,15 +181,18 @@ impl Tracker { self.send_channel.try_send(buf)?; // Increase sent bytes counter - let mut sent_bytes = self.sent_bytes.write(); - sent_bytes.inc(buf_len as u64); + //let mut sent_bytes = self.sent_bytes.write(); + //sent_bytes.inc(buf_len as u64); - Ok(()) + Ok(buf_len as u64) } /// Schedule this connection to safely close via the async close_channel. pub fn close(&mut self) { - let _ = self.close_channel.send(()); + if self.close_channel.send(()).is_err() { + debug!("peer's close_channel is disconnected, must be stopped already"); + return; + } if let Some(peer_thread) = self.peer_thread.take() { // wait only if other thread is calling us, eg shutdown if thread::current().id() != peer_thread.thread().id() { @@ -207,7 +202,7 @@ impl Tracker { } } else { debug!( - "stopping thread {:?} within the same thread", + "attempt to stop thread {:?} from itself", peer_thread.thread().id() ); } @@ -215,36 +210,56 @@ impl Tracker { } } +pub struct Tracker { + /// Bytes we've sent. + pub sent_bytes: Arc>, + /// Bytes we've received. + pub received_bytes: Arc>, +} + +impl Tracker { + pub fn new() -> Tracker { + let received_bytes = Arc::new(RwLock::new(RateCounter::new())); + let sent_bytes = Arc::new(RwLock::new(RateCounter::new())); + Tracker { + received_bytes, + sent_bytes, + } + } + + pub fn inc_received(&self, size: u64) { + self.received_bytes.write().inc(size); + } + + pub fn inc_sent(&self, size: u64) { + self.sent_bytes.write().inc(size); + } + + pub fn inc_quiet_received(&self, size: u64) { + self.received_bytes.write().inc_quiet(size); + } + + pub fn inc_quiet_sent(&self, size: u64) { + self.sent_bytes.write().inc_quiet(size); + } +} + /// Start listening on the provided connection and wraps it. Does not hang /// the current thread, instead just returns a future and the Connection /// itself. -pub fn listen(stream: TcpStream, handler: H) -> io::Result +pub fn listen(stream: TcpStream, tracker: Arc, handler: H) -> io::Result where H: MessageHandler, { let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); let (close_tx, close_rx) = mpsc::channel(); - // Counter of number of bytes received - let received_bytes = Arc::new(RwLock::new(RateCounter::new())); - // Counter of number of bytes sent - let sent_bytes = Arc::new(RwLock::new(RateCounter::new())); - stream .set_nonblocking(true) .expect("Non-blocking IO not available."); - let peer_thread = poll( - stream, - handler, - send_rx, - close_rx, - received_bytes.clone(), - sent_bytes.clone(), - )?; - - Ok(Tracker { - sent_bytes: sent_bytes.clone(), - received_bytes: received_bytes.clone(), + let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?; + + Ok(ConnHandle { send_channel: send_tx, close_channel: close_tx, peer_thread: Some(peer_thread), @@ -256,8 +271,7 @@ fn poll( handler: H, send_rx: mpsc::Receiver>, close_rx: mpsc::Receiver<()>, - received_bytes: Arc>, - sent_bytes: Arc>, + tracker: Arc, ) -> io::Result> where H: MessageHandler, @@ -283,16 +297,12 @@ where ); // Increase received bytes counter - let received = received_bytes.clone(); - { - let mut received_bytes = received_bytes.write(); - received_bytes.inc(MsgHeader::LEN as u64 + msg.header.msg_len); - } + tracker.inc_received(MsgHeader::LEN as u64 + msg.header.msg_len); if let Some(Some(resp)) = - try_break!(handler.consume(msg, &mut writer, received)) + try_break!(handler.consume(msg, &mut writer, tracker.clone())) { - try_break!(resp.write(sent_bytes.clone())); + try_break!(resp.write(tracker.clone())); } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 36eb58a7f7..a291dec17b 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -44,6 +44,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500; enum State { Connected, Banned, + Stopping, } pub struct Peer { @@ -51,7 +52,8 @@ pub struct Peer { state: Arc>, // set of all hashes known to this peer (so no need to send) tracking_adapter: TrackingAdapter, - connection: Mutex, + tracker: Arc, + handle: Mutex, } impl fmt::Debug for Peer { @@ -66,12 +68,14 @@ impl Peer { let state = Arc::new(RwLock::new(State::Connected)); let tracking_adapter = TrackingAdapter::new(adapter); let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); - let connection = Mutex::new(conn::listen(conn, handler)?); + let tracker = Arc::new(conn::Tracker::new()); + let handle = Mutex::new(conn::listen(conn, tracker.clone(), handler)?); Ok(Peer { info, state, tracking_adapter, - connection, + tracker, + handle, }) } @@ -152,7 +156,7 @@ impl Peer { } } - // default to allowing peer connection if we do not explicitly allow or deny + // default to allowing peer tracker if we do not explicitly allow or deny // the peer false } @@ -167,6 +171,11 @@ impl Peer { State::Banned == *self.state.read() } + /// Whether this peer has been stopping. + pub fn is_stopping(&self) -> bool { + State::Banned == *self.state.read() + } + /// Whether this peer is stuck on sync. pub fn is_stuck(&self) -> (bool, Difficulty) { let peer_live_info = self.info.live_info.read(); @@ -181,30 +190,26 @@ impl Peer { /// Whether the peer is considered abusive, mostly for spammy nodes pub fn is_abusive(&self) -> bool { - let conn = self.connection.lock(); - let rec = conn.received_bytes.read(); - let sent = conn.sent_bytes.read(); + let rec = self.tracker.received_bytes.read(); + let sent = self.tracker.sent_bytes.read(); rec.count_per_min() > MAX_PEER_MSG_PER_MIN || sent.count_per_min() > MAX_PEER_MSG_PER_MIN } /// Number of bytes sent to the peer pub fn last_min_sent_bytes(&self) -> Option { - let conn = self.connection.lock(); - let sent_bytes = conn.sent_bytes.read(); + let sent_bytes = self.tracker.sent_bytes.read(); Some(sent_bytes.bytes_per_min()) } /// Number of bytes received from the peer pub fn last_min_received_bytes(&self) -> Option { - let conn = self.connection.lock(); - let received_bytes = conn.received_bytes.read(); + let received_bytes = self.tracker.received_bytes.read(); Some(received_bytes.bytes_per_min()) } pub fn last_min_message_counts(&self) -> Option<(u64, u64)> { - let conn = self.connection.lock(); - let received_bytes = conn.received_bytes.read(); - let sent_bytes = conn.sent_bytes.read(); + let received_bytes = self.tracker.received_bytes.read(); + let sent_bytes = self.tracker.sent_bytes.read(); Some((sent_bytes.count_per_min(), received_bytes.count_per_min())) } @@ -213,9 +218,19 @@ impl Peer { *self.state.write() = State::Banned; } - /// Send a msg with given msg_type to our peer via the connection. + /// Set this peer status to stopping + pub fn set_stopping(&self) { + *self.state.write() = State::Stopping; + } + + /// Send a msg with given msg_type to our peer via the tracker. fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { - self.connection.lock().send(msg, msg_type) + if self.is_stopping() { + return Err(Error::ConnectionClose); + } + let bytes = self.handle.lock().send(msg, msg_type)?; + self.tracker.inc_sent(bytes); + Ok(()) } /// Send a ping to the remote peer, providing our local difficulty and @@ -379,9 +394,10 @@ impl Peer { ) } - /// Stops the peer, closing its connection + /// Stops the peer, closing its tracker pub fn stop(&self) { - self.connection.lock().close(); + self.set_stopping(); + self.handle.lock().close(); } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 200d1f3703..e538185a7a 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -18,9 +18,8 @@ use std::fs::{self, File, OpenOptions}; use std::io::{BufWriter, Write}; use std::sync::Arc; -use crate::conn::{Message, MessageHandler, Response}; +use crate::conn::{Message, MessageHandler, Response, Tracker}; use crate::core::core::{self, hash::Hash, CompactBlock}; -use crate::util::{RateCounter, RwLock}; use chrono::prelude::Utc; use crate::msg::{ @@ -45,7 +44,7 @@ impl MessageHandler for Protocol { &self, mut msg: Message<'a>, writer: &'a mut dyn Write, - received_bytes: Arc>, + tracker: Arc, ) -> Result>, Error> { let adapter = &self.adapter; @@ -312,10 +311,7 @@ impl MessageHandler for Protocol { // Increase received bytes quietly (without affecting the counters). // Otherwise we risk banning a peer as "abusive". - { - let mut received_bytes = received_bytes.write(); - received_bytes.inc_quiet(size as u64); - } + tracker.inc_quiet_received(size as u64) } tmp_zip .into_inner() diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 3afaf17c66..05c2862622 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -110,7 +110,11 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. - pub fn connect(&self, addr: PeerAddr) -> Result, Error> { + pub fn connect(&self, addr: PeerAddr) -> Result<(Arc, bool), Error> { + if self.stop_state.is_stopped() { + return Err(Error::ConnectionClose); + } + if Peer::is_denied(&self.config, addr) { debug!("connect_peer: peer {} denied, not connecting.", addr); return Err(Error::ConnectionClose); @@ -128,7 +132,7 @@ impl Server { if let Some(p) = self.peers.get_connected_peer(addr) { // if we're already connected to the addr, just return the peer trace!("connect_peer: already connected {}", addr); - return Ok(p); + return Ok((p, false)); } trace!( @@ -152,7 +156,7 @@ impl Server { )?; let peer = Arc::new(peer); self.peers.add_connected(peer.clone())?; - Ok(peer) + Ok((peer, true)) } Err(e) => { trace!( diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 9fbecde6ba..94319aafdb 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -101,7 +101,7 @@ pub fn connect_and_monitor( // with exponential backoff if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) { // try to connect to any address sent to the channel - listen_for_addrs( + let connecting_threads = listen_for_addrs( peers.clone(), p2p_server.clone(), capabilities, @@ -109,6 +109,14 @@ pub fn connect_and_monitor( &mut connecting_history, ); + debug!("waiting for connecting requests to finish"); + for th in connecting_threads { + if let Err(e) = th.join() { + error!("failed to join for a connecting request thread: {:?}", e); + } + } + debug!("done waiting for connecting requests to finish"); + // monitor additional peers if we need to add more monitor_peers( peers.clone(), @@ -288,16 +296,17 @@ fn listen_for_addrs( capab: p2p::Capabilities, rx: &mpsc::Receiver, connecting_history: &mut HashMap>, -) { +) -> Vec> { // Pull everything currently on the queue off the queue. // Does not block so addrs may be empty. // We will take(max_peers) from this later but we want to drain the rx queue // here to prevent it backing up. let addrs: Vec = rx.try_iter().collect(); + let mut threads = vec![]; // If we have a healthy number of outbound peers then we are done here. if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() { - return; + return threads; } // Try to connect to (up to max peers) peer addresses. @@ -325,17 +334,24 @@ fn listen_for_addrs( let peers_c = peers.clone(); let p2p_c = p2p.clone(); - let _ = thread::Builder::new() - .name("peer_connect".to_string()) - .spawn(move || match p2p_c.connect(addr) { - Ok(p) => { - let _ = p.send_peer_request(capab); - let _ = peers_c.update_state(addr, p2p::State::Healthy); - } - Err(_) => { - let _ = peers_c.update_state(addr, p2p::State::Defunct); - } - }); + threads.push( + thread::Builder::new() + .name("peer_connect".to_string()) + .spawn(move || match p2p_c.connect(addr) { + Ok((p, true)) => { + if p.send_peer_request(capab).is_ok() { + let _ = peers_c.update_state(addr, p2p::State::Healthy); + } + } + Ok((_, false)) => { + debug!("peer_connect: peer is already connected"); + } + Err(_) => { + let _ = peers_c.update_state(addr, p2p::State::Defunct); + } + }) + .expect("failed to launch peer_connect thread"), + ); } // shrink the connecting history. @@ -351,6 +367,7 @@ fn listen_for_addrs( connecting_history.remove(&addr); } } + threads } pub fn dns_seeds() -> Box Vec + Send> { From eed759d91ea90a0dc2c5ea7e288ce63000e9d310 Mon Sep 17 00:00:00 2001 From: hashmap Date: Fri, 10 May 2019 14:11:26 +0200 Subject: [PATCH 5/7] Rewrite stop channel handling --- p2p/src/conn.rs | 76 +++++++++++++++++++++------------ p2p/src/peer.rs | 47 +++++++++++--------- p2p/src/peers.rs | 31 ++++++++++---- p2p/src/serv.rs | 19 ++++++--- servers/src/grin/seed.rs | 45 +++++++------------ servers/src/grin/sync/syncer.rs | 11 ++++- 6 files changed, 136 insertions(+), 93 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 7da51568da..9998675ed7 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -162,37 +162,24 @@ impl<'a> Response<'a> { pub const SEND_CHANNEL_CAP: usize = 10; -pub struct ConnHandle { - /// Channel to allow sending data through the connection - pub send_channel: mpsc::SyncSender>, +pub struct StopHandle { /// Channel to close the connection pub close_channel: mpsc::Sender<()>, // we need Option to take ownhership of the handle in stop() peer_thread: Option>, } -impl ConnHandle { - pub fn send(&self, body: T, msg_type: Type) -> Result - where - T: ser::Writeable, - { - let buf = write_to_buf(body, msg_type)?; - let buf_len = buf.len(); - self.send_channel.try_send(buf)?; - - // Increase sent bytes counter - //let mut sent_bytes = self.sent_bytes.write(); - //sent_bytes.inc(buf_len as u64); - - Ok(buf_len as u64) - } - +impl StopHandle { /// Schedule this connection to safely close via the async close_channel. - pub fn close(&mut self) { + pub fn stop(&self) { if self.close_channel.send(()).is_err() { debug!("peer's close_channel is disconnected, must be stopped already"); return; } + } + + pub fn stop_and_wait(&mut self) { + self.stop(); if let Some(peer_thread) = self.peer_thread.take() { // wait only if other thread is calling us, eg shutdown if thread::current().id() != peer_thread.thread().id() { @@ -210,6 +197,28 @@ impl ConnHandle { } } +pub struct ConnHandle { + /// Channel to allow sending data through the connection + pub send_channel: mpsc::SyncSender>, +} + +impl ConnHandle { + pub fn send(&self, body: T, msg_type: Type) -> Result + where + T: ser::Writeable, + { + let buf = write_to_buf(body, msg_type)?; + let buf_len = buf.len(); + self.send_channel.try_send(buf)?; + + // Increase sent bytes counter + //let mut sent_bytes = self.sent_bytes.write(); + //sent_bytes.inc(buf_len as u64); + + Ok(buf_len as u64) + } +} + pub struct Tracker { /// Bytes we've sent. pub sent_bytes: Arc>, @@ -247,7 +256,11 @@ impl Tracker { /// Start listening on the provided connection and wraps it. Does not hang /// the current thread, instead just returns a future and the Connection /// itself. -pub fn listen(stream: TcpStream, tracker: Arc, handler: H) -> io::Result +pub fn listen( + stream: TcpStream, + tracker: Arc, + handler: H, +) -> io::Result<(ConnHandle, StopHandle)> where H: MessageHandler, { @@ -259,11 +272,15 @@ where .expect("Non-blocking IO not available."); let peer_thread = poll(stream, handler, send_rx, close_rx, tracker)?; - Ok(ConnHandle { - send_channel: send_tx, - close_channel: close_tx, - peer_thread: Some(peer_thread), - }) + Ok(( + ConnHandle { + send_channel: send_tx, + }, + StopHandle { + close_channel: close_tx, + peer_thread: Some(peer_thread), + }, + )) } fn poll( @@ -310,7 +327,12 @@ where let maybe_data = retry_send.or_else(|_| send_rx.try_recv()); retry_send = Err(()); if let Ok(data) = maybe_data { - let written = try_break!(writer.write_all(&data[..]).map_err(&From::from)); + let written = try_break!(write_all( + &mut writer, + &data[..], + std::time::Duration::from_secs(10) + ) + .map_err(&From::from)); if written.is_none() { retry_send = Ok(data); } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index a291dec17b..62d17c84d0 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -44,7 +44,6 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500; enum State { Connected, Banned, - Stopping, } pub struct Peer { @@ -53,7 +52,11 @@ pub struct Peer { // set of all hashes known to this peer (so no need to send) tracking_adapter: TrackingAdapter, tracker: Arc, - handle: Mutex, + send_handle: Mutex, + // we need a special lock for stop operation, can't reuse handle mutex for that + // because it may be locked by different reasons, so we should wait for that, close + // mutex can be taken only during shutdown, it happens once + stop_handle: Mutex, } impl fmt::Debug for Peer { @@ -69,13 +72,16 @@ impl Peer { let tracking_adapter = TrackingAdapter::new(adapter); let handler = Protocol::new(Arc::new(tracking_adapter.clone()), info.clone()); let tracker = Arc::new(conn::Tracker::new()); - let handle = Mutex::new(conn::listen(conn, tracker.clone(), handler)?); + let (sendh, stoph) = conn::listen(conn, tracker.clone(), handler)?; + let send_handle = Mutex::new(sendh); + let stop_handle = Mutex::new(stoph); Ok(Peer { info, state, tracking_adapter, tracker, - handle, + send_handle, + stop_handle, }) } @@ -171,11 +177,6 @@ impl Peer { State::Banned == *self.state.read() } - /// Whether this peer has been stopping. - pub fn is_stopping(&self) -> bool { - State::Banned == *self.state.read() - } - /// Whether this peer is stuck on sync. pub fn is_stuck(&self) -> (bool, Difficulty) { let peer_live_info = self.info.live_info.read(); @@ -218,17 +219,9 @@ impl Peer { *self.state.write() = State::Banned; } - /// Set this peer status to stopping - pub fn set_stopping(&self) { - *self.state.write() = State::Stopping; - } - /// Send a msg with given msg_type to our peer via the tracker. fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { - if self.is_stopping() { - return Err(Error::ConnectionClose); - } - let bytes = self.handle.lock().send(msg, msg_type)?; + let bytes = self.send_handle.lock().send(msg, msg_type)?; self.tracker.inc_sent(bytes); Ok(()) } @@ -394,10 +387,22 @@ impl Peer { ) } - /// Stops the peer, closing its tracker + /// Stops the peer pub fn stop(&self) { - self.set_stopping(); - self.handle.lock().close(); + debug!("Stopping peer without waiting {:?}", self.info.addr); + match self.stop_handle.try_lock() { + Some(handle) => handle.stop(), + None => error!("can't get stop lock for peer"), + } + } + + /// Stops the peer and wait until peer's thread exit + pub fn stop_and_wait(&self) { + debug!("Stopping peer {:?}", self.info.addr); + match self.stop_handle.try_lock() { + Some(mut handle) => handle.stop_and_wait(), + None => error!("can't get stop lock for peer"), + } } } diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 3fe18f7085..4e8d6c2b00 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -26,21 +26,22 @@ use crate::core::core; use crate::core::core::hash::{Hash, Hashed}; use crate::core::global; use crate::core::pow::Difficulty; -use chrono::prelude::*; -use chrono::Duration; - use crate::peer::Peer; use crate::store::{PeerData, PeerStore, State}; use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; +use crate::util::StopState; +use chrono::prelude::*; +use chrono::Duration; pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, config: P2PConfig, + stop_state: StopState, } impl Peers { @@ -50,6 +51,7 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), + stop_state: StopState::new(), } } @@ -67,6 +69,9 @@ impl Peers { }; debug!("Saving newly connected peer {}.", peer_data.addr); self.save_peer(&peer_data)?; + if self.stop_state.is_stopped() { + return Err(Error::ConnectionClose); + } self.peers.write().insert(peer_data.addr, peer.clone()); Ok(()) @@ -224,6 +229,9 @@ impl Peers { Ok(_) => debug!("ban reason {:?} was sent to {}", ban_reason, peer_addr), }; peer.set_banned(); + if self.stop_state.is_stopped() { + return; + } peer.stop(); self.peers.write().remove(&peer.info.addr); } @@ -263,8 +271,10 @@ impl Peers { "Error sending {:?} to peer {:?}: {:?}", obj_name, &p.info.addr, e ); - p.stop(); - self.peers.write().remove(&p.info.addr); + if !self.stop_state.is_stopped() { + p.stop(); + self.peers.write().remove(&p.info.addr); + } } } @@ -330,8 +340,10 @@ impl Peers { for p in self.connected_peers().iter() { if let Err(e) = p.send_ping(total_difficulty, height) { debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e); - p.stop(); - self.peers.write().remove(&p.info.addr); + if !self.stop_state.is_stopped() { + p.stop(); + self.peers.write().remove(&p.info.addr); + } } } } @@ -435,6 +447,9 @@ impl Peers { // now clean up peer map based on the list to remove { + if self.stop_state.is_stopped() { + return; + } let mut peers = self.peers.write(); for addr in rm { let _ = peers.get(&addr).map(|peer| peer.stop()); @@ -446,7 +461,7 @@ impl Peers { pub fn stop(&self) { let mut peers = self.peers.write(); for (_, peer) in peers.drain() { - peer.stop(); + peer.stop_and_wait(); } } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 05c2862622..9bd5c90311 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -88,9 +88,13 @@ impl Server { if self.check_undesirable(&stream) { continue; } - if let Err(e) = self.handle_new_peer(stream) { - debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); - let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); + match self.handle_new_peer(stream) { + Err(Error::ConnectionClose) => debug!("shutting down, ignoring a new peer"), + Err(e) => { + debug!("Error accepting peer {}: {:?}", peer_addr.to_string(), e); + let _ = self.peers.add_banned(peer_addr, ReasonForBan::BadHandshake); + } + Ok(_) => {} } } Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { @@ -110,7 +114,7 @@ impl Server { /// Asks the server to connect to a new peer. Directly returns the peer if /// we're already connected to the provided address. - pub fn connect(&self, addr: PeerAddr) -> Result<(Arc, bool), Error> { + pub fn connect(&self, addr: PeerAddr) -> Result, Error> { if self.stop_state.is_stopped() { return Err(Error::ConnectionClose); } @@ -132,7 +136,7 @@ impl Server { if let Some(p) = self.peers.get_connected_peer(addr) { // if we're already connected to the addr, just return the peer trace!("connect_peer: already connected {}", addr); - return Ok((p, false)); + return Ok(p); } trace!( @@ -156,7 +160,7 @@ impl Server { )?; let peer = Arc::new(peer); self.peers.add_connected(peer.clone())?; - Ok((peer, true)) + Ok(peer) } Err(e) => { trace!( @@ -172,6 +176,9 @@ impl Server { } fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> { + if self.stop_state.is_stopped() { + return Err(Error::ConnectionClose); + } let total_diff = self.peers.total_difficulty()?; // accept the peer and add it to the server map diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index 94319aafdb..5ccd9af99e 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -101,7 +101,7 @@ pub fn connect_and_monitor( // with exponential backoff if Utc::now() - prev > Duration::seconds(cmp::min(20, 1 << start_attempt)) { // try to connect to any address sent to the channel - let connecting_threads = listen_for_addrs( + listen_for_addrs( peers.clone(), p2p_server.clone(), capabilities, @@ -109,14 +109,6 @@ pub fn connect_and_monitor( &mut connecting_history, ); - debug!("waiting for connecting requests to finish"); - for th in connecting_threads { - if let Err(e) = th.join() { - error!("failed to join for a connecting request thread: {:?}", e); - } - } - debug!("done waiting for connecting requests to finish"); - // monitor additional peers if we need to add more monitor_peers( peers.clone(), @@ -296,17 +288,16 @@ fn listen_for_addrs( capab: p2p::Capabilities, rx: &mpsc::Receiver, connecting_history: &mut HashMap>, -) -> Vec> { +) { // Pull everything currently on the queue off the queue. // Does not block so addrs may be empty. // We will take(max_peers) from this later but we want to drain the rx queue // here to prevent it backing up. let addrs: Vec = rx.try_iter().collect(); - let mut threads = vec![]; // If we have a healthy number of outbound peers then we are done here. if peers.peer_count() > peers.peer_outbound_count() && peers.healthy_peers_mix() { - return threads; + return; } // Try to connect to (up to max peers) peer addresses. @@ -334,24 +325,19 @@ fn listen_for_addrs( let peers_c = peers.clone(); let p2p_c = p2p.clone(); - threads.push( - thread::Builder::new() - .name("peer_connect".to_string()) - .spawn(move || match p2p_c.connect(addr) { - Ok((p, true)) => { - if p.send_peer_request(capab).is_ok() { - let _ = peers_c.update_state(addr, p2p::State::Healthy); - } - } - Ok((_, false)) => { - debug!("peer_connect: peer is already connected"); + thread::Builder::new() + .name("peer_connect".to_string()) + .spawn(move || match p2p_c.connect(addr) { + Ok(p) => { + if p.send_peer_request(capab).is_ok() { + let _ = peers_c.update_state(addr, p2p::State::Healthy); } - Err(_) => { - let _ = peers_c.update_state(addr, p2p::State::Defunct); - } - }) - .expect("failed to launch peer_connect thread"), - ); + } + Err(_) => { + let _ = peers_c.update_state(addr, p2p::State::Defunct); + } + }) + .expect("failed to launch peer_connect thread"); } // shrink the connecting history. @@ -367,7 +353,6 @@ fn listen_for_addrs( connecting_history.remove(&addr); } } - threads } pub fn dns_seeds() -> Box Vec + Send> { diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index 98653a0c69..b32a571522 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -77,6 +77,9 @@ impl SyncRunner { let mut n = 0; const MIN_PEERS: usize = 3; loop { + if self.stop_state.is_stopped() { + break; + } let wp = self.peers.more_or_same_work_peers()?; // exit loop when: // * we have more than MIN_PEERS more_or_same_work peers @@ -167,7 +170,13 @@ impl SyncRunner { unwrap_or_restart_loop!(self.chain.compact()); } - thread::sleep(time::Duration::from_secs(10)); + // sllep for 10 secs but check stop signal every second + for _ in 1..10 { + thread::sleep(time::Duration::from_secs(1)); + if self.stop_state.is_stopped() { + break; + } + } continue; } From 4ea756caf3bcbb93d9536739459fd75f55dc388b Mon Sep 17 00:00:00 2001 From: hashmap Date: Fri, 10 May 2019 19:47:57 +0200 Subject: [PATCH 6/7] fix deadlock in peers object --- p2p/src/peers.rs | 159 +++++++++++++++++++++++++++++++---------------- 1 file changed, 104 insertions(+), 55 deletions(-) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 4e8d6c2b00..f38708b35c 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -32,16 +32,16 @@ use crate::types::{ Capabilities, ChainAdapter, Error, NetAdapter, P2PConfig, PeerAddr, PeerInfo, ReasonForBan, TxHashSetRead, MAX_PEER_ADDRS, }; -use crate::util::StopState; use chrono::prelude::*; use chrono::Duration; +const LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); + pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, config: P2PConfig, - stop_state: StopState, } impl Peers { @@ -51,13 +51,19 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), - stop_state: StopState::new(), } } /// Adds the peer to our internal peer mapping. Note that the peer is still /// returned so the server can run it. pub fn add_connected(&self, peer: Arc) -> Result<(), Error> { + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return Err(Error::Timeout); + } + }; let peer_data = PeerData { addr: peer.info.addr, capabilities: peer.info.capabilities, @@ -69,10 +75,7 @@ impl Peers { }; debug!("Saving newly connected peer {}.", peer_data.addr); self.save_peer(&peer_data)?; - if self.stop_state.is_stopped() { - return Err(Error::ConnectionClose); - } - self.peers.write().insert(peer_data.addr, peer.clone()); + peers.insert(peer_data.addr, peer.clone()); Ok(()) } @@ -94,14 +97,26 @@ impl Peers { } pub fn is_known(&self, addr: PeerAddr) -> bool { - self.peers.read().contains_key(&addr) + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return false; + } + }; + peers.contains_key(&addr) } /// Get vec of peers we are currently connected to. pub fn connected_peers(&self) -> Vec> { - let mut res = self - .peers - .read() + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return vec![]; + } + }; + let mut res = peers .values() .filter(|p| p.is_connected()) .cloned() @@ -119,7 +134,14 @@ impl Peers { /// Get a peer we're connected to by address. pub fn get_connected_peer(&self, addr: PeerAddr) -> Option> { - self.peers.read().get(&addr).map(|p| p.clone()) + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return None; + } + }; + peers.get(&addr).map(|p| p.clone()) } /// Number of peers currently connected to. @@ -208,9 +230,7 @@ impl Peers { pub fn is_banned(&self, peer_addr: PeerAddr) -> bool { if let Ok(peer) = self.store.get_peer(peer_addr) { - if peer.flags == State::Banned { - return true; - } + return peer.flags == State::Banned; } false } @@ -219,6 +239,7 @@ impl Peers { pub fn ban_peer(&self, peer_addr: PeerAddr, ban_reason: ReasonForBan) { if let Err(e) = self.update_state(peer_addr, State::Banned) { error!("Couldn't ban {}: {:?}", peer_addr, e); + return; } if let Some(peer) = self.get_connected_peer(peer_addr) { @@ -229,11 +250,16 @@ impl Peers { Ok(_) => debug!("ban reason {:?} was sent to {}", ban_reason, peer_addr), }; peer.set_banned(); - if self.stop_state.is_stopped() { - return; - } peer.stop(); - self.peers.write().remove(&peer.info.addr); + + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return; + } + }; + peers.remove(&peer.info.addr); } } @@ -271,10 +297,16 @@ impl Peers { "Error sending {:?} to peer {:?}: {:?}", obj_name, &p.info.addr, e ); - if !self.stop_state.is_stopped() { - p.stop(); - self.peers.write().remove(&p.info.addr); - } + + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + break; + } + }; + p.stop(); + peers.remove(&p.info.addr); } } @@ -340,10 +372,15 @@ impl Peers { for p in self.connected_peers().iter() { if let Err(e) = p.send_ping(total_difficulty, height) { debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e); - if !self.stop_state.is_stopped() { - p.stop(); - self.peers.write().remove(&p.info.addr); - } + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + break; + } + }; + p.stop(); + peers.remove(&p.info.addr); } } } @@ -399,33 +436,42 @@ impl Peers { let mut rm = vec![]; // build a list of peers to be cleaned up - for peer in self.peers.read().values() { - if peer.is_banned() { - debug!("clean_peers {:?}, peer banned", peer.info.addr); - rm.push(peer.info.addr.clone()); - } else if !peer.is_connected() { - debug!("clean_peers {:?}, not connected", peer.info.addr); - rm.push(peer.info.addr.clone()); - } else if peer.is_abusive() { - if let Some(counts) = peer.last_min_message_counts() { - debug!( - "clean_peers {:?}, abusive ({} sent, {} recv)", - peer.info.addr, counts.0, counts.1, - ); + { + let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("clean_peers: can't get peers lock"); + return; } - let _ = self.update_state(peer.info.addr, State::Banned); - rm.push(peer.info.addr.clone()); - } else { - let (stuck, diff) = peer.is_stuck(); - match self.adapter.total_difficulty() { - Ok(total_difficulty) => { - if stuck && diff < total_difficulty { - debug!("clean_peers {:?}, stuck peer", peer.info.addr); - let _ = self.update_state(peer.info.addr, State::Defunct); - rm.push(peer.info.addr.clone()); + }; + for peer in peers.values() { + if peer.is_banned() { + debug!("clean_peers {:?}, peer banned", peer.info.addr); + rm.push(peer.info.addr.clone()); + } else if !peer.is_connected() { + debug!("clean_peers {:?}, not connected", peer.info.addr); + rm.push(peer.info.addr.clone()); + } else if peer.is_abusive() { + if let Some(counts) = peer.last_min_message_counts() { + debug!( + "clean_peers {:?}, abusive ({} sent, {} recv)", + peer.info.addr, counts.0, counts.1, + ); + } + let _ = self.update_state(peer.info.addr, State::Banned); + rm.push(peer.info.addr.clone()); + } else { + let (stuck, diff) = peer.is_stuck(); + match self.adapter.total_difficulty() { + Ok(total_difficulty) => { + if stuck && diff < total_difficulty { + debug!("clean_peers {:?}, stuck peer", peer.info.addr); + let _ = self.update_state(peer.info.addr, State::Defunct); + rm.push(peer.info.addr.clone()); + } } + Err(e) => error!("failed to get total difficulty: {:?}", e), } - Err(e) => error!("failed to get total difficulty: {:?}", e), } } } @@ -447,10 +493,13 @@ impl Peers { // now clean up peer map based on the list to remove { - if self.stop_state.is_stopped() { - return; - } - let mut peers = self.peers.write(); + let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { + Some(peers) => peers, + None => { + error!("failed to get peers lock"); + return; + } + }; for addr in rm { let _ = peers.get(&addr).map(|peer| peer.stop()); peers.remove(&addr); From 235ebdabbc652594fd92512ddd50d35526cd2339 Mon Sep 17 00:00:00 2001 From: hashmap Date: Fri, 10 May 2019 20:13:40 +0200 Subject: [PATCH 7/7] add missing test fixes --- chain/tests/data_file_integrity.rs | 4 +--- chain/tests/mine_simple_chain.rs | 2 -- chain/tests/test_coinbase_maturity.rs | 1 - p2p/src/conn.rs | 5 ----- p2p/src/peer.rs | 4 ++-- p2p/src/peers.rs | 16 ++++++++-------- servers/src/grin/sync/syncer.rs | 2 +- 7 files changed, 12 insertions(+), 22 deletions(-) diff --git a/chain/tests/data_file_integrity.rs b/chain/tests/data_file_integrity.rs index e07fab839e..f4f90cce53 100644 --- a/chain/tests/data_file_integrity.rs +++ b/chain/tests/data_file_integrity.rs @@ -21,7 +21,7 @@ use self::core::libtx; use self::core::pow::{self, Difficulty}; use self::core::{consensus, genesis}; use self::keychain::{ExtKeychain, ExtKeychainPath, Keychain}; -use self::util::{RwLock, StopState}; +use self::util::RwLock; use chrono::Duration; use grin_chain as chain; use grin_core as core; @@ -47,7 +47,6 @@ fn setup(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(StopState::new()), ) .unwrap() } @@ -61,7 +60,6 @@ fn reload_chain(dir_name: &str) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(StopState::new()), ) .unwrap() } diff --git a/chain/tests/mine_simple_chain.rs b/chain/tests/mine_simple_chain.rs index 2961928414..80da3b0ef3 100644 --- a/chain/tests/mine_simple_chain.rs +++ b/chain/tests/mine_simple_chain.rs @@ -47,7 +47,6 @@ fn setup(dir_name: &str, genesis: Block) -> Chain { pow::verify_size, verifier_cache, false, - Arc::new(StopState::new()), ) .unwrap() } @@ -565,7 +564,6 @@ fn actual_diff_iter_output() { pow::verify_size, verifier_cache, false, - Arc::new(StopState::new()), ) .unwrap(); let iter = chain.difficulty_iter().unwrap(); diff --git a/chain/tests/test_coinbase_maturity.rs b/chain/tests/test_coinbase_maturity.rs index 2e7c607e36..540c6ef8df 100644 --- a/chain/tests/test_coinbase_maturity.rs +++ b/chain/tests/test_coinbase_maturity.rs @@ -53,7 +53,6 @@ fn test_coinbase_maturity() { pow::verify_size, verifier_cache, false, - Arc::new(StopState::new()), ) .unwrap(); diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 9998675ed7..2d5510bfcf 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -210,11 +210,6 @@ impl ConnHandle { let buf = write_to_buf(body, msg_type)?; let buf_len = buf.len(); self.send_channel.try_send(buf)?; - - // Increase sent bytes counter - //let mut sent_bytes = self.sent_bytes.write(); - //sent_bytes.inc(buf_len as u64); - Ok(buf_len as u64) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 62d17c84d0..83bb43b2e0 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -162,7 +162,7 @@ impl Peer { } } - // default to allowing peer tracker if we do not explicitly allow or deny + // default to allowing peer connection if we do not explicitly allow or deny // the peer false } @@ -219,7 +219,7 @@ impl Peer { *self.state.write() = State::Banned; } - /// Send a msg with given msg_type to our peer via the tracker. + /// Send a msg with given msg_type to our peer via the connection. fn send(&self, msg: T, msg_type: Type) -> Result<(), Error> { let bytes = self.send_handle.lock().send(msg, msg_type)?; self.tracker.inc_sent(bytes); diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index f38708b35c..cba04e8e41 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -60,7 +60,7 @@ impl Peers { let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("add_connected: failed to get peers lock"); return Err(Error::Timeout); } }; @@ -100,7 +100,7 @@ impl Peers { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("is_known: failed to get peers lock"); return false; } }; @@ -112,7 +112,7 @@ impl Peers { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("connected_peers: failed to get peers lock"); return vec![]; } }; @@ -137,7 +137,7 @@ impl Peers { let peers = match self.peers.try_read_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("get_connected_peer: failed to get peers lock"); return None; } }; @@ -255,7 +255,7 @@ impl Peers { let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("ban_peer: failed to get peers lock"); return; } }; @@ -301,7 +301,7 @@ impl Peers { let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("broadcast: failed to get peers lock"); break; } }; @@ -375,7 +375,7 @@ impl Peers { let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("check_all: failed to get peers lock"); break; } }; @@ -496,7 +496,7 @@ impl Peers { let mut peers = match self.peers.try_write_for(LOCK_TIMEOUT) { Some(peers) => peers, None => { - error!("failed to get peers lock"); + error!("clean_peers: failed to get peers lock"); return; } }; diff --git a/servers/src/grin/sync/syncer.rs b/servers/src/grin/sync/syncer.rs index b32a571522..3ef061b7e5 100644 --- a/servers/src/grin/sync/syncer.rs +++ b/servers/src/grin/sync/syncer.rs @@ -170,7 +170,7 @@ impl SyncRunner { unwrap_or_restart_loop!(self.chain.compact()); } - // sllep for 10 secs but check stop signal every second + // sleep for 10 secs but check stop signal every second for _ in 1..10 { thread::sleep(time::Duration::from_secs(1)); if self.stop_state.is_stopped() {