Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce usage of unwrap in p2p crate #2627

Merged
merged 1 commit into from
Feb 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions p2p/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,18 @@ impl<'a> Response<'a> {
resp_type: Type,
body: T,
stream: &'a mut dyn Write,
) -> Response<'a> {
let body = ser::ser_vec(&body).unwrap();
Response {
) -> Result<Response<'a>, Error> {
let body = ser::ser_vec(&body)?;
Ok(Response {
resp_type,
body,
stream,
attachment: None,
}
})
}

fn write(mut self, sent_bytes: Arc<RwLock<RateCounter>>) -> Result<(), Error> {
let mut msg =
ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64)).unwrap();
let mut msg = ser::ser_vec(&MsgHeader::new(self.resp_type, self.body.len() as u64))?;
msg.append(&mut self.body);
write_all(&mut self.stream, &msg[..], time::Duration::from_secs(10))?;
// Increase sent bytes counter
Expand Down Expand Up @@ -177,7 +176,7 @@ impl Tracker {
where
T: ser::Writeable,
{
let buf = write_to_buf(body, msg_type);
let buf = write_to_buf(body, msg_type)?;
let buf_len = buf.len();
self.send_channel.try_send(buf)?;

Expand Down
38 changes: 19 additions & 19 deletions p2p/src/msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,26 +160,26 @@ pub fn read_message<T: Readable>(stream: &mut dyn Read, msg_type: Type) -> Resul
read_body(&header, stream)
}

pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Vec<u8> {
pub fn write_to_buf<T: Writeable>(msg: T, msg_type: Type) -> Result<Vec<u8>, Error> {
// prepare the body first so we know its serialized length
let mut body_buf = vec![];
ser::serialize(&mut body_buf, &msg).unwrap();
ser::serialize(&mut body_buf, &msg)?;

// build and serialize the header using the body size
let mut msg_buf = vec![];
let blen = body_buf.len() as u64;
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen)).unwrap();
ser::serialize(&mut msg_buf, &MsgHeader::new(msg_type, blen))?;
msg_buf.append(&mut body_buf);

msg_buf
Ok(msg_buf)
}

pub fn write_message<T: Writeable>(
stream: &mut dyn Write,
msg: T,
msg_type: Type,
) -> Result<(), Error> {
let buf = write_to_buf(msg, msg_type);
let buf = write_to_buf(msg, msg_type)?;
stream.write_all(&buf[..])?;
Ok(())
}
Expand Down Expand Up @@ -268,11 +268,11 @@ impl Writeable for Hand {
[write_u32, self.capabilities.bits()],
[write_u64, self.nonce]
);
self.total_difficulty.write(writer).unwrap();
self.sender_addr.write(writer).unwrap();
self.receiver_addr.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.sender_addr.write(writer)?;
self.receiver_addr.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
Expand Down Expand Up @@ -323,9 +323,9 @@ impl Writeable for Shake {
[write_u32, self.version],
[write_u32, self.capabilities.bits()]
);
self.total_difficulty.write(writer).unwrap();
writer.write_bytes(&self.user_agent).unwrap();
self.genesis.write(writer).unwrap();
self.total_difficulty.write(writer)?;
writer.write_bytes(&self.user_agent)?;
self.genesis.write(writer)?;
Ok(())
}
}
Expand Down Expand Up @@ -379,7 +379,7 @@ impl Writeable for PeerAddrs {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
writer.write_u32(self.peers.len() as u32)?;
for p in &self.peers {
p.write(writer).unwrap();
p.write(writer)?;
}
Ok(())
}
Expand Down Expand Up @@ -484,8 +484,8 @@ pub struct Ping {

impl Writeable for Ping {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
Expand All @@ -511,8 +511,8 @@ pub struct Pong {

impl Writeable for Pong {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
self.total_difficulty.write(writer).unwrap();
self.height.write(writer).unwrap();
self.total_difficulty.write(writer)?;
self.height.write(writer)?;
Ok(())
}
}
Expand All @@ -537,7 +537,7 @@ pub struct BanReason {
impl Writeable for BanReason {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ser::Error> {
let ban_reason_i32 = self.ban_reason as i32;
ban_reason_i32.write(writer).unwrap();
ban_reason_i32.write(writer)?;
Ok(())
}
}
Expand Down
105 changes: 32 additions & 73 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ pub struct Peer {
connection: Option<Mutex<conn::Tracker>>,
}

macro_rules! connection {
($holder:expr) => {
match $holder.connection.as_ref() {
Some(conn) => conn.lock(),
None => return Err(Error::Internal),
}
};
}

impl Peer {
// Only accept and connect can be externally used to build a peer
fn new(info: PeerInfo, adapter: Arc<dyn NetAdapter>) -> Peer {
Expand Down Expand Up @@ -233,41 +242,23 @@ impl Peer {
total_difficulty,
height,
};
self.connection
.as_ref()
.unwrap()
.lock()
.send(ping_msg, msg::Type::Ping)
connection!(self).send(ping_msg, msg::Type::Ping)
}

/// Send the ban reason before banning
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) {
pub fn send_ban_reason(&self, ban_reason: ReasonForBan) -> Result<(), Error> {
let ban_reason_msg = BanReason { ban_reason };
match self
.connection
.as_ref()
.unwrap()
.lock()
connection!(self)
.send(ban_reason_msg, msg::Type::BanReason)
{
Ok(_) => debug!("Sent ban reason {:?} to {}", ban_reason, self.info.addr),
Err(e) => error!(
"Could not send ban reason {:?} to {}: {:?}",
ban_reason, self.info.addr, e
),
};
.map(|_| ())
}

/// Sends the provided block to the remote peer. The request may be dropped
/// if the remote peer is known to already have the block.
pub fn send_block(&self, b: &core::Block) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::Block)?;
connection!(self).send(b, msg::Type::Block)?;
Ok(true)
} else {
debug!(
Expand All @@ -282,11 +273,7 @@ impl Peer {
pub fn send_compact_block(&self, b: &core::CompactBlock) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(b.hash()) {
trace!("Send compact block {} to {}", b.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(b, msg::Type::CompactBlock)?;
connection!(self).send(b, msg::Type::CompactBlock)?;
Ok(true)
} else {
debug!(
Expand All @@ -301,11 +288,7 @@ impl Peer {
pub fn send_header(&self, bh: &core::BlockHeader) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(bh.hash()) {
debug!("Send header {} to {}", bh.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(bh, msg::Type::Header)?;
connection!(self).send(bh, msg::Type::Header)?;
Ok(true)
} else {
debug!(
Expand All @@ -320,11 +303,7 @@ impl Peer {
pub fn send_tx_kernel_hash(&self, h: Hash) -> Result<bool, Error> {
if !self.tracking_adapter.has_recv(h) {
debug!("Send tx kernel hash {} to {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(h, msg::Type::TransactionKernel)?;
connection!(self).send(h, msg::Type::TransactionKernel)?;
Ok(true)
} else {
debug!(
Expand Down Expand Up @@ -352,11 +331,7 @@ impl Peer {

if !self.tracking_adapter.has_recv(kernel.hash()) {
debug!("Send full tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::Transaction)?;
connection!(self).send(tx, msg::Type::Transaction)?;
Ok(true)
} else {
debug!(
Expand All @@ -373,59 +348,38 @@ impl Peer {
/// embargo).
pub fn send_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> {
debug!("Send (stem) tx {} to {}", tx.hash(), self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(tx, msg::Type::StemTransaction)?;
Ok(())
connection!(self).send(tx, msg::Type::StemTransaction)
}

/// Sends a request for block headers from the provided block locator
pub fn send_header_request(&self, locator: Vec<Hash>) -> Result<(), Error> {
self.connection
.as_ref()
.unwrap()
.lock()
.send(&Locator { hashes: locator }, msg::Type::GetHeaders)
connection!(self).send(&Locator { hashes: locator }, msg::Type::GetHeaders)
}

pub fn send_tx_request(&self, h: Hash) -> Result<(), Error> {
debug!(
"Requesting tx (kernel hash) {} from peer {}.",
h, self.info.addr
);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetTransaction)
connection!(self).send(&h, msg::Type::GetTransaction)
}

/// Sends a request for a specific block by hash
pub fn send_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting block {} from peer {}.", h, self.info.addr);
self.tracking_adapter.push_req(h);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetBlock)
connection!(self).send(&h, msg::Type::GetBlock)
}

/// Sends a request for a specific compact block by hash
pub fn send_compact_block_request(&self, h: Hash) -> Result<(), Error> {
debug!("Requesting compact block {} from {}", h, self.info.addr);
self.connection
.as_ref()
.unwrap()
.lock()
.send(&h, msg::Type::GetCompactBlock)
connection!(self).send(&h, msg::Type::GetCompactBlock)
}

pub fn send_peer_request(&self, capab: Capabilities) -> Result<(), Error> {
trace!("Asking {} for more peers {:?}", self.info.addr, capab);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&GetPeerAddrs {
capabilities: capab,
},
Expand All @@ -438,19 +392,24 @@ impl Peer {
"Asking {} for txhashset archive at {} {}.",
self.info.addr, height, hash
);
self.connection.as_ref().unwrap().lock().send(
connection!(self).send(
&TxHashSetRequest { hash, height },
msg::Type::TxHashSetRequest,
)
}

/// Stops the peer, closing its connection
pub fn stop(&self) {
stop_with_connection(&self.connection.as_ref().unwrap().lock());
if let Some(conn) = self.connection.as_ref() {
stop_with_connection(&conn.lock());
}
}

fn check_connection(&self) -> bool {
let connection = self.connection.as_ref().unwrap().lock();
let connection = match self.connection.as_ref() {
Some(conn) => conn.lock(),
None => return false,
};
match connection.error_channel.try_recv() {
Ok(Error::Serialization(e)) => {
let need_stop = {
Expand Down
Loading