Skip to content

Commit

Permalink
remove the error_channel and simplify how we close peer connections (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
antiochp authored May 3, 2019
1 parent d3b4526 commit 6c54c90
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 84 deletions.
34 changes: 14 additions & 20 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ pub trait MessageHandler: Send + 'static {
// Macro to simplify the boilerplate around async I/O error handling,
// especially with WouldBlock kind of errors.
macro_rules! try_break {
($chan:ident, $inner:expr) => {
($inner:expr) => {
match $inner {
Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
Err(Error::Store(_))
| Err(Error::Chain(_))
| Err(Error::Internal)
| Err(Error::NoDandelionRelay) => None,
Err(e) => {
let _ = $chan.send(e);
Err(ref e) => {
debug!("try_break: exit the loop: {:?}", e);
break;
}
}
Expand Down Expand Up @@ -171,8 +171,6 @@ pub struct Tracker {
pub send_channel: mpsc::SyncSender<Vec<u8>>,
/// Channel to close the connection
pub close_channel: mpsc::Sender<()>,
/// Channel to check for errors on the connection
pub error_channel: mpsc::Receiver<Error>,
}

impl Tracker {
Expand Down Expand Up @@ -201,7 +199,6 @@ where
{
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
let (close_tx, close_rx) = mpsc::channel();
let (error_tx, error_rx) = mpsc::channel();

// Counter of number of bytes received
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
Expand All @@ -215,7 +212,6 @@ where
stream,
handler,
send_rx,
error_tx,
close_rx,
received_bytes.clone(),
sent_bytes.clone(),
Expand All @@ -226,15 +222,13 @@ where
received_bytes: received_bytes.clone(),
send_channel: send_tx,
close_channel: close_tx,
error_channel: error_rx,
}
}

fn poll<H>(
conn: TcpStream,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<RateCounter>>,
sent_bytes: Arc<RwLock<RateCounter>>,
Expand All @@ -252,7 +246,7 @@ fn poll<H>(
let mut retry_send = Err(());
loop {
// check the read end
if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) {
if let Some(h) = try_break!(read_header(&mut reader, None)) {
let msg = Message::from_header(h, &mut reader);

trace!(
Expand All @@ -269,36 +263,36 @@ fn poll<H>(
}

if let Some(Some(resp)) =
try_break!(error_tx, handler.consume(msg, &mut writer, received))
try_break!(handler.consume(msg, &mut writer, received))
{
try_break!(error_tx, resp.write(sent_bytes.clone()));
try_break!(resp.write(sent_bytes.clone()));
}
}

// check the write end, use or_else so try_recv is lazily eval'd
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
retry_send = Err(());
if let Ok(data) = maybe_data {
let written =
try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from));
let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
if written.is_none() {
retry_send = Ok(data);
}
}

// check the close channel
if let Ok(_) = close_rx.try_recv() {
debug!(
"Connection close with {} initiated by us",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
break;
}

thread::sleep(sleep_time);
}

debug!(
"Shutting down connection with {}",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
let _ = conn.shutdown(Shutdown::Both);
});
}
65 changes: 6 additions & 59 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
/// For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop.
enum State {
Connected,
Disconnected,
Banned,
// Banned from Peers side, by ban_peer().
// This could happen when error in block (or compact block) received, header(s) received,
// or txhashset received.
}

pub struct Peer {
Expand Down Expand Up @@ -173,9 +169,12 @@ impl Peer {
false
}

/// Whether this peer is still connected.
/// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool {
self.check_connection()
if self.connection.is_none() {
return false;
}
State::Connected == *self.state.read()
}

/// Whether this peer has been banned.
Expand Down Expand Up @@ -409,61 +408,9 @@ impl Peer {
/// Stops the peer, closing its connection
pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock());
let _ = conn.lock().close_channel.send(());
}
}

fn check_connection(&self) -> bool {
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {
let mut state = self.state.write();
if State::Banned != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!(
"Client {} corrupted, will disconnect ({:?}).",
self.info.addr, e
);
stop_with_connection(&connection);
}
false
}
Ok(e) => {
let need_stop = {
let mut state = self.state.write();
if State::Disconnected != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!("Client {} connection lost: {:?}", self.info.addr, e);
stop_with_connection(&connection);
}
false
}
Err(_) => {
let state = self.state.read();
State::Connected == *state
}
}
}
}

fn stop_with_connection(connection: &conn::Tracker) {
let _ = connection.close_channel.send(());
}

/// Adapter implementation that forwards everything to an underlying adapter
Expand Down
19 changes: 14 additions & 5 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ impl Peers {
};
peer.set_banned();
peer.stop();
self.peers.write().remove(&peer.info.addr);
}
}

Expand Down Expand Up @@ -257,7 +258,14 @@ impl Peers {
match inner(&p) {
Ok(true) => count += 1,
Ok(false) => (),
Err(e) => debug!("Error sending {} to peer: {:?}", obj_name, e),
Err(e) => {
debug!(
"Error sending {:?} to peer {:?}: {:?}",
obj_name, &p.info.addr, e
);
p.stop();
self.peers.write().remove(&p.info.addr);
}
}

if count >= num_peers {
Expand Down Expand Up @@ -319,10 +327,11 @@ impl Peers {
/// Ping all our connected peers. Always automatically expects a pong back
/// or disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
let peers_map = self.peers.read();
for p in peers_map.values() {
if p.is_connected() {
let _ = p.send_ping(total_difficulty, height);
for p in self.connected_peers().iter() {
if let Err(e) = p.send_ping(total_difficulty, height) {
debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
p.stop();
self.peers.write().remove(&p.info.addr);
}
}
}
Expand Down

0 comments on commit 6c54c90

Please sign in to comment.