Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor p2p reader #3433

Merged
merged 8 commits into from
Sep 28, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ siphasher = "0.3"
log = "0.4"
chrono = { version = "0.4.11", features = ["serde"] }
zeroize = { version = "1.1", features =["zeroize_derive"] }
bytes = "0.5"

keychain = { package = "grin_keychain", path = "../keychain", version = "4.1.0-alpha.1" }
util = { package = "grin_util", path = "../util", version = "4.1.0-alpha.1" }
Expand Down
110 changes: 110 additions & 0 deletions core/src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use crate::core::hash::{DefaultHashable, Hash, Hashed};
use crate::global::PROTOCOL_VERSION;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use bytes::Buf;
use keychain::{BlindingFactor, Identifier, IDENTIFIER_SIZE};
use std::convert::TryInto;
use std::fmt::{self, Debug};
Expand Down Expand Up @@ -77,6 +78,12 @@ impl From<io::Error> for Error {
}
}

impl From<io::ErrorKind> for Error {
fn from(e: io::ErrorKind) -> Error {
Error::IOErr(format!("{}", io::Error::from(e)), e)
}
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Expand Down Expand Up @@ -561,6 +568,109 @@ impl<'a> Reader for StreamingReader<'a> {
}
}

/// Protocol version-aware wrapper around a `Buf` impl
antiochp marked this conversation as resolved.
Show resolved Hide resolved
pub struct BufReader<'a, B: Buf> {
inner: &'a mut B,
version: ProtocolVersion,
bytes_read: usize,
}

impl<'a, B: Buf> BufReader<'a, B> {
/// Construct a new BufReader
pub fn new(buf: &'a mut B, version: ProtocolVersion) -> Self {
Self {
inner: buf,
version,
bytes_read: 0,
}
}

/// Check whether the buffer has enough bytes remaining to perform a read
fn has_remaining(&mut self, len: usize) -> Result<(), Error> {
if self.inner.remaining() >= len {
self.bytes_read += len;
Ok(())
} else {
Err(io::ErrorKind::UnexpectedEof.into())
}
}

/// The total bytes read
pub fn bytes_read(&self) -> u64 {
self.bytes_read as u64
}

/// Convenience function to read from the buffer and deserialize
pub fn body<T: Readable>(&mut self) -> Result<T, Error> {
T::read(self)
}
}

impl<'a, B: Buf> Reader for BufReader<'a, B> {
fn read_u8(&mut self) -> Result<u8, Error> {
self.has_remaining(1)?;
Ok(self.inner.get_u8())
}

fn read_u16(&mut self) -> Result<u16, Error> {
self.has_remaining(2)?;
Ok(self.inner.get_u16())
}

fn read_u32(&mut self) -> Result<u32, Error> {
self.has_remaining(4)?;
Ok(self.inner.get_u32())
}

fn read_u64(&mut self) -> Result<u64, Error> {
self.has_remaining(8)?;
Ok(self.inner.get_u64())
}

fn read_i32(&mut self) -> Result<i32, Error> {
self.has_remaining(4)?;
Ok(self.inner.get_i32())
}

fn read_i64(&mut self) -> Result<i64, Error> {
self.has_remaining(8)?;
Ok(self.inner.get_i64())
}

fn read_bytes_len_prefix(&mut self) -> Result<Vec<u8>, Error> {
let len = self.read_u64()?;
self.read_fixed_bytes(len as usize)
}

fn read_fixed_bytes(&mut self, len: usize) -> Result<Vec<u8>, Error> {
// not reading more than 100k bytes in a single read
if len > 100_000 {
return Err(Error::TooLargeReadErr);
}
self.has_remaining(len)?;

let mut buf = vec![0; len];
self.inner.copy_to_slice(&mut buf[..]);
Ok(buf)
}

fn expect_u8(&mut self, val: u8) -> Result<u8, Error> {
let b = self.read_u8()?;
if b == val {
Ok(b)
} else {
Err(Error::UnexpectedData {
expected: vec![val],
received: vec![b],
})
}
}

fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}

impl Readable for Commitment {
fn read<R: Reader>(reader: &mut R) -> Result<Commitment, Error> {
let a = reader.read_fixed_bytes(PEDERSEN_COMMITMENT_SIZE)?;
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde_derive = "1"
tempfile = "3.1"
log = "0.4"
chrono = { version = "0.4.11", features = ["serde"] }
bytes = "0.5"

grin_core = { path = "../core", version = "4.1.0-alpha.1" }
grin_store = { path = "../store", version = "4.1.0-alpha.1" }
Expand Down
137 changes: 137 additions & 0 deletions p2p/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
use crate::core::ser::{BufReader, ProtocolVersion, Readable};
use crate::msg::{MsgHeader, MsgHeaderWrapper};
use crate::types::{AttachmentMeta, AttachmentUpdate, Error};
use bytes::{BufMut, Bytes, BytesMut};
use std::cmp::min;
use std::io::Read;
use std::net::TcpStream;
use std::time::{Duration, Instant};
use MsgHeaderWrapper::*;
use State::*;

const HEADER_IO_TIMEOUT: Duration = Duration::from_millis(2000);
pub const BODY_IO_TIMEOUT: Duration = Duration::from_millis(60000);

enum State {
None,
Header(MsgHeaderWrapper),
Attachment(usize, AttachmentMeta, Instant),
}

impl State {
fn take(&mut self) -> Self {
std::mem::replace(self, State::None)
}

fn is_none(&self) -> bool {
match self {
State::None => true,
_ => false,
}
}
}

pub enum Output {
antiochp marked this conversation as resolved.
Show resolved Hide resolved
Known(MsgHeader, Bytes),
Unknown(u64, u8),
Attachment(AttachmentUpdate, Bytes),
}

pub struct Codec {
pub version: ProtocolVersion,
stream: TcpStream,
buffer: BytesMut,
state: State,
}

impl Codec {
pub fn new(version: ProtocolVersion, stream: TcpStream) -> Self {
Self {
version,
stream,
buffer: BytesMut::with_capacity(8 * 1024),
state: None,
}
}

/// Destroy the codec and return the reader
pub fn stream(self) -> TcpStream {
self.stream
}

/// Inform codec next `len` bytes are an attachment
/// Panics if already reading a body
pub fn expect_attachment(&mut self, meta: AttachmentMeta) {
assert!(self.state.is_none());
self.state = Attachment(meta.size, meta, Instant::now());
}

/// Length of the next item we are expecting, could be header, body or attachment chunk
fn next_len(&self) -> usize {
match &self.state {
None => MsgHeader::LEN,
Header(Known(header)) => header.msg_len as usize,
Header(Unknown(len, _)) => *len as usize,
Attachment(left, _, _) => min(*left, 48_000),
}
}

/// Set stream timeout depending on the next expected item
fn set_stream_timeout(&self) -> Result<(), Error> {
let timeout = match &self.state {
None => HEADER_IO_TIMEOUT,
_ => BODY_IO_TIMEOUT,
};
self.stream.set_read_timeout(Some(timeout))?;
Ok(())
}

/// Blocking read of the next message
pub fn read(&mut self) -> Result<Output, Error> {
loop {
let next_len = self.next_len();
self.buffer.reserve(next_len);
for _ in 0..next_len {
self.buffer.put_u8(0);
}
let mut buf = self.buffer.split_to(next_len);
self.set_stream_timeout()?;
self.stream.read_exact(&mut buf[..])?;
let mut raw = buf.freeze();
match self.state.take() {
None => {
// Parse header and keep reading
let mut reader = BufReader::new(&mut raw, self.version);
let header = MsgHeaderWrapper::read(&mut reader)?;
self.state = Header(header);
}
Header(Known(header)) => {
// Return message
return Ok(Output::Known(header, raw));
}
Header(Unknown(len, msg_type)) => {
// Discard body and return
return Ok(Output::Unknown(len, msg_type));
}
Attachment(mut left, meta, mut now) => {
left -= next_len;
if now.elapsed().as_secs() > 10 {
now = Instant::now();
debug!("attachment: {}/{}", meta.size - left, meta.size);
}
let update = AttachmentUpdate {
read: next_len,
left,
meta: meta.clone(),
};
if left > 0 {
self.state = Attachment(left, meta, now);
} else {
debug!("attachment: DONE");
}
return Ok(Output::Attachment(update, raw));
}
}
}
}
}
Loading