Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove error_channel and simplify how we close peer connections #2796

Merged
merged 1 commit into from
May 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ pub trait MessageHandler: Send + 'static {
// Macro to simplify the boilerplate around async I/O error handling,
// especially with WouldBlock kind of errors.
macro_rules! try_break {
($chan:ident, $inner:expr) => {
($inner:expr) => {
match $inner {
Ok(v) => Some(v),
Err(Error::Connection(ref e)) if e.kind() == io::ErrorKind::WouldBlock => None,
Err(Error::Store(_))
| Err(Error::Chain(_))
| Err(Error::Internal)
| Err(Error::NoDandelionRelay) => None,
Err(e) => {
let _ = $chan.send(e);
Err(ref e) => {
debug!("try_break: exit the loop: {:?}", e);
break;
}
}
Expand Down Expand Up @@ -171,8 +171,6 @@ pub struct Tracker {
pub send_channel: mpsc::SyncSender<Vec<u8>>,
/// Channel to close the connection
pub close_channel: mpsc::Sender<()>,
/// Channel to check for errors on the connection
pub error_channel: mpsc::Receiver<Error>,
}

impl Tracker {
Expand Down Expand Up @@ -201,7 +199,6 @@ where
{
let (send_tx, send_rx) = mpsc::sync_channel(SEND_CHANNEL_CAP);
let (close_tx, close_rx) = mpsc::channel();
let (error_tx, error_rx) = mpsc::channel();

// Counter of number of bytes received
let received_bytes = Arc::new(RwLock::new(RateCounter::new()));
Expand All @@ -215,7 +212,6 @@ where
stream,
handler,
send_rx,
error_tx,
close_rx,
received_bytes.clone(),
sent_bytes.clone(),
Expand All @@ -226,15 +222,13 @@ where
received_bytes: received_bytes.clone(),
send_channel: send_tx,
close_channel: close_tx,
error_channel: error_rx,
}
}

fn poll<H>(
conn: TcpStream,
handler: H,
send_rx: mpsc::Receiver<Vec<u8>>,
error_tx: mpsc::Sender<Error>,
close_rx: mpsc::Receiver<()>,
received_bytes: Arc<RwLock<RateCounter>>,
sent_bytes: Arc<RwLock<RateCounter>>,
Expand All @@ -252,7 +246,7 @@ fn poll<H>(
let mut retry_send = Err(());
loop {
// check the read end
if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) {
if let Some(h) = try_break!(read_header(&mut reader, None)) {
let msg = Message::from_header(h, &mut reader);

trace!(
Expand All @@ -269,36 +263,36 @@ fn poll<H>(
}

if let Some(Some(resp)) =
try_break!(error_tx, handler.consume(msg, &mut writer, received))
try_break!(handler.consume(msg, &mut writer, received))
{
try_break!(error_tx, resp.write(sent_bytes.clone()));
try_break!(resp.write(sent_bytes.clone()));
}
}

// check the write end, use or_else so try_recv is lazily eval'd
let maybe_data = retry_send.or_else(|_| send_rx.try_recv());
retry_send = Err(());
if let Ok(data) = maybe_data {
let written =
try_break!(error_tx, writer.write_all(&data[..]).map_err(&From::from));
let written = try_break!(writer.write_all(&data[..]).map_err(&From::from));
if written.is_none() {
retry_send = Ok(data);
}
}

// check the close channel
if let Ok(_) = close_rx.try_recv() {
debug!(
"Connection close with {} initiated by us",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
break;
}

thread::sleep(sleep_time);
}

debug!(
"Shutting down connection with {}",
conn.peer_addr()
.map(|a| a.to_string())
.unwrap_or("?".to_owned())
);
let _ = conn.shutdown(Shutdown::Both);
});
}
65 changes: 6 additions & 59 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,7 @@ const MAX_PEER_MSG_PER_MIN: u64 = 500;
/// For example: 'Disconnected' state here could still be 'Healthy' and could reconnect in next loop.
enum State {
Connected,
Disconnected,
Banned,
// Banned from Peers side, by ban_peer().
// This could happen when error in block (or compact block) received, header(s) received,
// or txhashset received.
}

pub struct Peer {
Expand Down Expand Up @@ -173,9 +169,12 @@ impl Peer {
false
}

/// Whether this peer is still connected.
/// Whether this peer is currently connected.
pub fn is_connected(&self) -> bool {
self.check_connection()
if self.connection.is_none() {
hashmap marked this conversation as resolved.
Show resolved Hide resolved
return false;
}
State::Connected == *self.state.read()
}

/// Whether this peer has been banned.
Expand Down Expand Up @@ -409,61 +408,9 @@ impl Peer {
/// Stops the peer, closing its connection
pub fn stop(&self) {
if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock());
let _ = conn.lock().close_channel.send(());
}
}

fn check_connection(&self) -> bool {
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {
let mut state = self.state.write();
if State::Banned != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!(
"Client {} corrupted, will disconnect ({:?}).",
self.info.addr, e
);
stop_with_connection(&connection);
}
false
}
Ok(e) => {
let need_stop = {
let mut state = self.state.write();
if State::Disconnected != *state {
*state = State::Disconnected;
true
} else {
false
}
};
if need_stop {
debug!("Client {} connection lost: {:?}", self.info.addr, e);
stop_with_connection(&connection);
}
false
}
Err(_) => {
let state = self.state.read();
State::Connected == *state
}
}
}
}

fn stop_with_connection(connection: &conn::Tracker) {
let _ = connection.close_channel.send(());
}

/// Adapter implementation that forwards everything to an underlying adapter
Expand Down
19 changes: 14 additions & 5 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl Peers {
};
peer.set_banned();
peer.stop();
self.peers.write().remove(&peer.info.addr);
}
}

Expand Down Expand Up @@ -256,7 +257,14 @@ impl Peers {
match inner(&p) {
Ok(true) => count += 1,
Ok(false) => (),
Err(e) => debug!("Error sending {} to peer: {:?}", obj_name, e),
Err(e) => {
debug!(
"Error sending {:?} to peer {:?}: {:?}",
obj_name, &p.info.addr, e
);
p.stop();
self.peers.write().remove(&p.info.addr);
hashmap marked this conversation as resolved.
Show resolved Hide resolved
}
}

if count >= num_peers {
Expand Down Expand Up @@ -318,10 +326,11 @@ impl Peers {
/// Ping all our connected peers. Always automatically expects a pong back
/// or disconnects. This acts as a liveness test.
pub fn check_all(&self, total_difficulty: Difficulty, height: u64) {
let peers_map = self.peers.read();
for p in peers_map.values() {
if p.is_connected() {
let _ = p.send_ping(total_difficulty, height);
for p in self.connected_peers().iter() {
if let Err(e) = p.send_ping(total_difficulty, height) {
debug!("Error pinging peer {:?}: {:?}", &p.info.addr, e);
p.stop();
self.peers.write().remove(&p.info.addr);
}
}
}
Expand Down