From b81036c3ecb9412bf6bd5a1b583ac9936579ccfe Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Wed, 1 May 2019 15:43:49 +0100 Subject: [PATCH 1/4] wrap MsgHeader in MsgHeaderWrapper for Known/Unknown msg type support. --- p2p/src/conn.rs | 45 +++++++++++-------- p2p/src/msg.rs | 104 +++++++++++++++++++++++++++++++++----------- p2p/src/protocol.rs | 13 ++++-- p2p/src/types.rs | 2 + 4 files changed, 119 insertions(+), 45 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index f105c4db6e..71e994356d 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -28,7 +28,10 @@ use std::{cmp, thread, time}; use crate::core::ser; use crate::core::ser::FixedLength; -use crate::msg::{read_body, read_header, read_item, write_to_buf, MsgHeader, Type}; +use crate::msg::{ + read_body, read_discard, read_header, read_item, write_to_buf, MsgHeader, MsgHeaderWrapper, + Type, +}; use crate::types::Error; use crate::util::read_write::{read_exact, write_all}; use crate::util::{RateCounter, RwLock}; @@ -252,27 +255,35 @@ fn poll( let mut retry_send = Err(()); loop { // check the read end - if let Some(h) = try_break!(error_tx, read_header(&mut reader, None)) { - let msg = Message::from_header(h, &mut reader); + match try_break!(error_tx, read_header(&mut reader, None)) { + Some(MsgHeaderWrapper::Known(header)) => { + let msg = Message::from_header(header, &mut reader); - trace!( - "Received message header, type {:?}, len {}.", - msg.header.msg_type, - msg.header.msg_len - ); + trace!( + "Received message header, type {:?}, len {}.", + msg.header.msg_type, + msg.header.msg_len + ); + + // Increase received bytes counter + received_bytes + .write() + .inc(MsgHeader::LEN as u64 + msg.header.msg_len); - // Increase received bytes counter - let received = received_bytes.clone(); - { - let mut received_bytes = received_bytes.write(); - received_bytes.inc(MsgHeader::LEN as u64 + msg.header.msg_len); + if let Some(Some(resp)) = try_break!( + error_tx, + handler.consume(msg, &mut writer, received_bytes.clone()) + ) { + try_break!(error_tx, resp.write(sent_bytes.clone())); + } } + Some(MsgHeaderWrapper::Unknown(msg_len)) => { + // Increase received bytes counter + received_bytes.write().inc(MsgHeader::LEN as u64 + msg_len); - if let Some(Some(resp)) = - try_break!(error_tx, handler.consume(msg, &mut writer, received)) - { - try_break!(error_tx, resp.write(sent_bytes.clone())); + try_break!(error_tx, read_discard(msg_len, &mut reader)); } + None => {} } // check the write end, use or_else so try_recv is lazily eval'd diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 30a7892af1..18c5a12654 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -119,24 +119,20 @@ fn magic() -> [u8; 2] { /// Read a header from the provided stream without blocking if the /// underlying stream is async. Typically headers will be polled for, so /// we do not want to block. -pub fn read_header(stream: &mut dyn Read, msg_type: Option) -> Result { +/// +/// Note: We return a MsgHeaderWrapper here as we may encounter an unknown msg type. +/// +pub fn read_header( + stream: &mut dyn Read, + msg_type: Option, +) -> Result { let mut head = vec![0u8; MsgHeader::LEN]; if Some(Type::Hand) == msg_type { read_exact(stream, &mut head, time::Duration::from_millis(10), true)?; } else { read_exact(stream, &mut head, time::Duration::from_secs(10), false)?; } - let header = ser::deserialize::(&mut &head[..])?; - let max_len = max_msg_size(header.msg_type); - - // TODO 4x the limits for now to leave ourselves space to change things - if header.msg_len > max_len * 4 { - error!( - "Too large read {}, had {}, wanted {}.", - header.msg_type as u8, max_len, header.msg_len - ); - return Err(Error::Serialization(ser::Error::TooLargeReadErr)); - } + let header = ser::deserialize::(&mut &head[..])?; Ok(header) } @@ -158,13 +154,28 @@ pub fn read_body(h: &MsgHeader, stream: &mut dyn Read) -> Result Result<(), Error> { + let mut buffer = vec![0u8; msg_len as usize]; + read_exact(stream, &mut buffer, time::Duration::from_secs(20), true)?; + Ok(()) +} + /// Reads a full message from the underlying stream. pub fn read_message(stream: &mut dyn Read, msg_type: Type) -> Result { - let header = read_header(stream, Some(msg_type))?; - if header.msg_type != msg_type { - return Err(Error::BadMessage); + match read_header(stream, Some(msg_type))? { + MsgHeaderWrapper::Known(header) => { + if header.msg_type == msg_type { + read_body(&header, stream) + } else { + Err(Error::BadMessage) + } + } + MsgHeaderWrapper::Unknown(msg_len) => { + read_discard(msg_len, stream)?; + Err(Error::UnknownMessage) + } } - read_body(&header, stream) } pub fn write_to_buf(msg: T, msg_type: Type) -> Result, Error> { @@ -191,7 +202,19 @@ pub fn write_message( Ok(()) } +/// A wrapper around a message header. If the header is for an unknown msg type +/// then we will be unable to parse the msg itself (just a bunch of random bytes). +/// But we need to know how many bytes to discard to discard the full message. +#[derive(Clone)] +pub enum MsgHeaderWrapper { + /// A "known" msg type with deserialized msg header. + Known(MsgHeader), + /// An unknown msg type with corresponding msg size in bytes. + Unknown(u64), +} + /// Header of any protocol message, used to identify incoming messages. +#[derive(Clone)] pub struct MsgHeader { magic: [u8; 2], /// Type of the message. @@ -228,19 +251,50 @@ impl Writeable for MsgHeader { } } -impl Readable for MsgHeader { - fn read(reader: &mut dyn Reader) -> Result { +impl Readable for MsgHeaderWrapper { + fn read(reader: &mut dyn Reader) -> Result { let m = magic(); reader.expect_u8(m[0])?; reader.expect_u8(m[1])?; - let (t, len) = ser_multiread!(reader, read_u8, read_u64); + + // Read the msg header. + // We do not yet know if the msg type is one we support locally. + let (t, msg_len) = ser_multiread!(reader, read_u8, read_u64); + + // Attempt to convert the msg type byte into one of our known msg type enum variants. + // Check the msg_len while we are at it. match Type::from_u8(t) { - Some(ty) => Ok(MsgHeader { - magic: m, - msg_type: ty, - msg_len: len, - }), - None => Err(ser::Error::CorruptedData), + Some(msg_type) => { + // TODO 4x the limits for now to leave ourselves space to change things. + let max_len = max_msg_size(msg_type) * 4; + if msg_len > max_len { + error!( + "Too large read {:?}, max_len: {}, msg_len: {}.", + msg_type, max_len, msg_len + ); + return Err(ser::Error::TooLargeReadErr); + } + + Ok(MsgHeaderWrapper::Known(MsgHeader { + magic: m, + msg_type, + msg_len, + })) + } + None => { + // Unknown msg type, but we still want to limit how big the msg is. + // Default to max_block_size (4x as above for space to change things). + let max_len = max_block_size() * 4; + if msg_len > max_len { + error!( + "Too large read (unknown msg type) {:?}, max_len: {}, msg_len: {}.", + t, max_len, msg_len + ); + return Err(ser::Error::TooLargeReadErr); + } + + Ok(MsgHeaderWrapper::Unknown(msg_len)) + } } } } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index 200d1f3703..da9968f8d2 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -353,9 +353,16 @@ impl MessageHandler for Protocol { Ok(None) } - - _ => { - debug!("unknown message type {:?}", msg.header.msg_type); + Type::Error => { + debug!("Received an unexpected msg: {:?}", msg.header.msg_type); + Ok(None) + } + Type::Hand => { + debug!("Received an unexpected msg: {:?}", msg.header.msg_type); + Ok(None) + } + Type::Shake => { + debug!("Received an unexpected msg: {:?}", msg.header.msg_type); Ok(None) } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index b431b7189b..618e7a9644 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -60,6 +60,8 @@ pub enum Error { Connection(io::Error), /// Header type does not match the expected message type BadMessage, + /// Unkown message type + UnknownMessage, MsgLen, Banned, ConnectionClose, From ebcdba1949fff7c9763f10ad7bea5e22356dbf11 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Wed, 1 May 2019 16:02:28 +0100 Subject: [PATCH 2/4] cleanup --- p2p/src/msg.rs | 2 +- p2p/src/types.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 18c5a12654..2a7fc1d791 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -173,7 +173,7 @@ pub fn read_message(stream: &mut dyn Read, msg_type: Type) -> Resul } MsgHeaderWrapper::Unknown(msg_len) => { read_discard(msg_len, stream)?; - Err(Error::UnknownMessage) + Err(Error::BadMessage) } } } diff --git a/p2p/src/types.rs b/p2p/src/types.rs index 618e7a9644..b431b7189b 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -60,8 +60,6 @@ pub enum Error { Connection(io::Error), /// Header type does not match the expected message type BadMessage, - /// Unkown message type - UnknownMessage, MsgLen, Banned, ConnectionClose, From 1c252eefb579c1521180d50af443a232125b1b22 Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Thu, 2 May 2019 10:18:53 +0100 Subject: [PATCH 3/4] cleanup based on feedback --- p2p/src/msg.rs | 8 ++++++-- p2p/src/protocol.rs | 10 +--------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/p2p/src/msg.rs b/p2p/src/msg.rs index 2a7fc1d791..a87372568f 100644 --- a/p2p/src/msg.rs +++ b/p2p/src/msg.rs @@ -81,6 +81,11 @@ fn max_block_size() -> u64 { (global::max_block_weight() / consensus::BLOCK_OUTPUT_WEIGHT * 708) as u64 } +// Max msg size when msg type is unknown. +fn default_max_msg_size() -> u64 { + max_block_size() +} + // Max msg size for each msg type. fn max_msg_size(msg_type: Type) -> u64 { match msg_type { @@ -283,8 +288,7 @@ impl Readable for MsgHeaderWrapper { } None => { // Unknown msg type, but we still want to limit how big the msg is. - // Default to max_block_size (4x as above for space to change things). - let max_len = max_block_size() * 4; + let max_len = default_max_msg_size() * 4; if msg_len > max_len { error!( "Too large read (unknown msg type) {:?}, max_len: {}, msg_len: {}.", diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index da9968f8d2..9b3100ce1f 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -353,15 +353,7 @@ impl MessageHandler for Protocol { Ok(None) } - Type::Error => { - debug!("Received an unexpected msg: {:?}", msg.header.msg_type); - Ok(None) - } - Type::Hand => { - debug!("Received an unexpected msg: {:?}", msg.header.msg_type); - Ok(None) - } - Type::Shake => { + Type::Error | Type::Hand | Type::Shake => { debug!("Received an unexpected msg: {:?}", msg.header.msg_type); Ok(None) } From 7eaca0831fe520ebc77d5ced35e67c39308769fa Mon Sep 17 00:00:00 2001 From: antiochp <30642645+antiochp@users.noreply.github.com> Date: Fri, 3 May 2019 16:14:02 +0100 Subject: [PATCH 4/4] rustfmt --- p2p/src/conn.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/p2p/src/conn.rs b/p2p/src/conn.rs index 425583c924..344ddef59c 100644 --- a/p2p/src/conn.rs +++ b/p2p/src/conn.rs @@ -269,9 +269,9 @@ fn poll( .write() .inc(MsgHeader::LEN as u64 + msg.header.msg_len); - if let Some(Some(resp)) = try_break!( - handler.consume(msg, &mut writer, received_bytes.clone()) - ) { + if let Some(Some(resp)) = + try_break!(handler.consume(msg, &mut writer, received_bytes.clone())) + { try_break!(resp.write(sent_bytes.clone())); } }