diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index f105c4db6e..8a0177cfe2 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -47,7 +47,7 @@ pub trait MessageHandler: Send + 'static { // Macro to simplify the boilerplate around async I/O error handling, // especially with WouldBlock kind of errors. macro_rules! try_break { - ($chan:ident, $inner:expr) => { + ($inner:expr) => { match $inner { Ok(v) => Some(v), Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None, @@ -55,8 +55,8 @@ macro_rules! try_break { | Err(Error::Chain(_)) | Err(Error::Internal) | Err(Error::NoDandelionRelay) => None, - Err(e) => { - let _ = $chan.send(e); + Err(ref e) => { + debug!("try_break: exit the loop: {:?}", e); break; } } @@ -171,8 +171,6 @@ pub struct Tracker { pub send_channel: mpsc::SyncSender>, /// Channel to close the connection pub close_channel: mpsc::Sender<()>, - /// Channel to check for errors on the connection - pub error_channel: mpsc::Receiver, } impl Tracker { @@ -201,7 +199,6 @@ where { let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP); let (close_tx, close_rx) = mpsc::channel(); - let (error_tx, error_rx) = mpsc::channel(); // Counter of number of bytes received let received_bytes = Arc::new(RwLock::new(RateCounter::new())); @@ -215,7 +212,6 @@ where stream, handler, send_rx, - error_tx, close_rx, received_bytes.clone(), sent_bytes.clone(), @@ -226,7 +222,6 @@ where received_bytes: received_bytes.clone(), send_channel: send_tx, close_channel: close_tx, - error_channel: error_rx, } } @@ -234,7 +229,6 @@ fn poll( conn: TcpStream, handler: H, send_rx: mpsc::Receiver>, - error_tx: mpsc::Sender, close_rx: mpsc::Receiver<()>, received_bytes: Arc>, sent_bytes: Arc>, @@ -252,7 +246,7 @@ fn poll( let mut retry_send = Err(()); loop { // check the read end - if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) { + if let Some(h) = try_break!(read_header(&mut reader, None)) { let msg = Message::from_header(h, &mut reader); trace!( @@ -269,9 +263,9 @@ fn poll( } if let Some(Some(resp)) = - try_break!(error_tx, handler.consume(msg, &mut writer, received)) + try_break!(handler.consume(msg, &mut writer, received)) { - try_break!(error_tx, resp.write(sent_bytes.clone())); + try_break!(resp.write(sent_bytes.clone())); } } @@ -279,8 +273,7 @@ fn poll( let maybe_data = retry_send.or_else(|_| send_rx.try_recv()); retry_send = Err(()); if let Ok(data) = maybe_data { - let written = - try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from)); + let written = try_break!(writer.write_all(&data[..]).map_err(&From::from)); if written.is_none() { retry_send = Ok(data); } @@ -288,17 +281,18 @@ fn poll( // check the close channel if let Ok(_) = close_rx.try_recv() { - debug!( - "Connection close with {} initiated by us", - conn.peer_addr() - .map(|a| a.to_string()) - .unwrap_or("?".to_owned()) - ); break; } thread::sleep(sleep_time); } + + debug!( + "Shutting down connection with {}", + conn.peer_addr() + .map(|a| a.to_string()) + .unwrap_or("?".to_owned()) + ); let _ = conn.shutdown(Shutdown::Both); }); } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 86b28591db..2a302db01e 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -42,11 +42,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500; /// For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop. enum State { Connected, - Disconnected, Banned, - // Banned from Peers side, by ban_peer(). - // This could happen when error in block (or compact block) received, header(s) received, - // or txhashset received. } pub struct Peer { @@ -173,9 +169,12 @@ impl Peer { false } - /// Whether this peer is still connected. + /// Whether this peer is currently connected. pub fn is_connected(&self) -> bool { - self.check_connection() + if self.connection.is_none() { + return false; + } + State::Connected == *self.state.read() } /// Whether this peer has been banned. @@ -409,61 +408,9 @@ impl Peer { /// Stops the peer, closing its connection pub fn stop(&self) { if let Some(conn) = self.connection.as_ref() { - stop_with_connection(&conn.lock()); + let _ = conn.lock().close_channel.send(()); } } - - fn check_connection(&self) -> bool { - let connection = match self.connection.as_ref() { - Some(conn) => conn.lock(), - None => return false, - }; - match connection.error_channel.try_recv() { - Ok(Error::Serialization(e)) => { - let need_stop = { - let mut state = self.state.write(); - if State::Banned != *state { - *state = State::Disconnected; - true - } else { - false - } - }; - if need_stop { - debug!( - "Client {} corrupted, will disconnect ({:?}).", - self.info.addr, e - ); - stop_with_connection(&connection); - } - false - } - Ok(e) => { - let need_stop = { - let mut state = self.state.write(); - if State::Disconnected != *state { - *state = State::Disconnected; - true - } else { - false - } - }; - if need_stop { - debug!("Client {} connection lost: {:?}", self.info.addr, e); - stop_with_connection(&connection); - } - false - } - Err(_) => { - let state = self.state.read(); - State::Connected == *state - } - } - } -} - -fn stop_with_connection(connection: &conn::Tracker) { - let _ = connection.close_channel.send(()); } /// Adapter implementation that forwards everything to an underlying adapter diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 6b5986b6be..04252cda4c 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -224,6 +224,7 @@ impl Peers { }; peer.set_banned(); peer.stop(); + self.peers.write().remove(&peer.info.addr); } } @@ -256,7 +257,14 @@ impl Peers { match inner(&p) { Ok(true) => count += 1, Ok(false) => (), - Err(e) => debug!("Error sending {} to peer: {:?}", obj_name, e), + Err(e) => { + debug!( + "Error sending {:?} to peer {:?}: {:?}", + obj_name, &p.info.addr, e + ); + p.stop(); + self.peers.write().remove(&p.info.addr); + } } if count >= num_peers { @@ -318,10 +326,11 @@ impl Peers { /// Ping all our connected peers. Always automatically expects a pong back /// or disconnects. This acts as a liveness test. pub fn check_all(&self, total_difficulty: Difficulty, height: u64) { - let peers_map = self.peers.read(); - for p in peers_map.values() { - if p.is_connected() { - let _ = p.send_ping(total_difficulty, height); + for p in self.connected_peers().iter() { + if let Err(e) = p.send_ping(total_difficulty, height) { + debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e); + p.stop(); + self.peers.write().remove(&p.info.addr); } } }