Skip to content

Commit

Permalink
[2.x.x] deserialization now protocol version aware (#2824)
Browse files Browse the repository at this point in the history
* introduce protocol version to deserialize and read

* thread protocol version through our reader

* example protocol version access in kernel read

* fix our StreamingReader impl (WouldBlock woes)

* debug log progress of txhashset download
  • Loading branch information
antiochp authored Jun 27, 2019
1 parent 9398578 commit 5aaf2d0
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 114 deletions.
8 changes: 5 additions & 3 deletions api/src/handlers/pool_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use super::utils::w;
use crate::core::core::hash::Hashed;
use crate::core::core::Transaction;
use crate::core::ser;
use crate::core::ser::{self, ProtocolVersion};
use crate::pool;
use crate::rest::*;
use crate::router::{Handler, ResponseFuture};
Expand Down Expand Up @@ -64,7 +64,6 @@ impl PoolPushHandler {

let fluff = params.get("fluff").is_some();
let pool_arc = match w(&self.tx_pool) {
//w(&self.tx_pool).clone();
Ok(p) => p,
Err(e) => return Box::new(err(e)),
};
Expand All @@ -76,7 +75,10 @@ impl PoolPushHandler {
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx_bin| {
ser::deserialize(&mut &tx_bin[..])
// TODO - pass protocol version in via the api call?
let version = ProtocolVersion::default();

ser::deserialize(&mut &tx_bin[..], version)
.map_err(|e| ErrorKind::RequestError(format!("Bad request: {}", e)).into())
})
.and_then(move |tx: Transaction| {
Expand Down
2 changes: 1 addition & 1 deletion api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub struct Status {
impl Status {
pub fn from_tip_and_peers(current_tip: chain::Tip, connections: u32) -> Status {
Status {
protocol_version: p2p::msg::ProtocolVersion::default().into(),
protocol_version: ser::ProtocolVersion::default().into(),
user_agent: p2p::msg::USER_AGENT.to_string(),
connections: connections,
tip: Tip::from_tip(current_tip),
Expand Down
4 changes: 2 additions & 2 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::core::core::{
};
use crate::core::global;
use crate::core::pow;
use crate::core::ser::{Readable, StreamingReader};
use crate::core::ser::{ProtocolVersion, Readable, StreamingReader};
use crate::error::{Error, ErrorKind};
use crate::pipe;
use crate::store;
Expand Down Expand Up @@ -647,7 +647,7 @@ impl Chain {
/// TODO - Write this data to disk and validate the rebuilt kernel MMR.
pub fn kernel_data_write(&self, reader: &mut Read) -> Result<(), Error> {
let mut count = 0;
let mut stream = StreamingReader::new(reader, Duration::from_secs(1));
let mut stream = StreamingReader::new(reader, ProtocolVersion::default(), Duration::from_secs(1));
while let Ok(_kernel) = TxKernelEntry::read(&mut stream) {
count += 1;
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/core/merkle_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
use crate::core::hash::Hash;
use crate::core::pmmr;
use crate::ser;
use crate::ser::{PMMRIndexHashable, Readable, Reader, Writeable, Writer};
use crate::ser::{PMMRIndexHashable, ProtocolVersion, Readable, Reader, Writeable, Writer};
use crate::util;

/// Merkle proof errors.
Expand Down Expand Up @@ -85,7 +85,7 @@ impl MerkleProof {
/// Convert hex string representation back to a Merkle proof instance
pub fn from_hex(hex: &str) -> Result<MerkleProof, String> {
let bytes = util::from_hex(hex.to_string()).unwrap();
let res = ser::deserialize(&mut &bytes[..])
let res = ser::deserialize(&mut &bytes[..], ProtocolVersion::default())
.map_err(|_| "failed to deserialize a Merkle Proof".to_string())?;
Ok(res)
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ impl Writeable for TxKernel {

impl Readable for TxKernel {
fn read(reader: &mut dyn Reader) -> Result<TxKernel, ser::Error> {
// We have access to the protocol version here.
// This may be a protocol version based on a peer connection
// or the version used locally for db storage.
// We can handle version specific deserialization here.
let _version = reader.protocol_version();

Ok(TxKernel {
features: KernelFeatures::read(reader)?,
fee: reader.read_u64()?,
Expand Down Expand Up @@ -338,7 +344,7 @@ impl Writeable for TxKernelEntry {
}

impl Readable for TxKernelEntry {
fn read(reader: &mut Reader) -> Result<TxKernelEntry, ser::Error> {
fn read(reader: &mut dyn Reader) -> Result<TxKernelEntry, ser::Error> {
let kernel = TxKernel::read(reader)?;
Ok(TxKernelEntry { kernel })
}
Expand Down Expand Up @@ -1523,7 +1529,7 @@ mod test {

let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap();
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::Plain);
assert_eq!(kernel2.lock_height, 0);
assert_eq!(kernel2.excess, commit);
Expand All @@ -1541,7 +1547,7 @@ mod test {

let mut vec = vec![];
ser::serialize(&mut vec, &kernel).expect("serialized failed");
let kernel2: TxKernel = ser::deserialize(&mut &vec[..]).unwrap();
let kernel2: TxKernel = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(kernel2.features, KernelFeatures::HeightLocked);
assert_eq!(kernel2.lock_height, 100);
assert_eq!(kernel2.excess, commit);
Expand Down
96 changes: 91 additions & 5 deletions core/src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ use crate::util::secp::pedersen::{Commitment, RangeProof};
use crate::util::secp::Signature;
use crate::util::secp::{ContextFlag, Secp256k1};
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use std::fmt::Debug;
use std::fmt::{self, Debug};
use std::io::{self, Read, Write};
use std::marker;
use std::time::Duration;
use std::{cmp, error, fmt};
use std::{cmp, error};

/// Possible errors deriving from serializing or deserializing.
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -209,6 +209,9 @@ pub trait Reader {
/// Consumes a byte from the reader, producing an error if it doesn't have
/// the expected value
fn expect_u8(&mut self, val: u8) -> Result<u8, Error>;
/// Access to underlying protocol version to support
/// version specific deserialization logic.
fn protocol_version(&self) -> ProtocolVersion;
}

/// Trait that every type that can be serialized as binary must implement.
Expand Down Expand Up @@ -275,6 +278,54 @@ where
Ok(res)
}

/// Protocol version for serialization/deserialization.
/// Note: This is used in various places including but limited to
/// the p2p layer and our local db storage layer.
/// We may speak multiple versions to various peers and a potentially *different*
/// version for our local db.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Ord, PartialOrd, PartialEq, Serialize)]
pub struct ProtocolVersion(pub u32);

impl Default for ProtocolVersion {
fn default() -> ProtocolVersion {
ProtocolVersion(1)
}
}

impl fmt::Display for ProtocolVersion {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0)
}
}

impl ProtocolVersion {
/// We need to specify a protocol version for our local database.
/// Regardless of specific version used when sending/receiving data between peers
/// we need to take care with serialization/deserialization of data locally in the db.
pub fn local_db() -> ProtocolVersion {
ProtocolVersion(1)
}
}

impl From<ProtocolVersion> for u32 {
fn from(v: ProtocolVersion) -> u32 {
v.0
}
}

impl Writeable for ProtocolVersion {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), Error> {
writer.write_u32(self.0)
}
}

impl Readable for ProtocolVersion {
fn read(reader: &mut dyn Reader) -> Result<ProtocolVersion, Error> {
let version = reader.read_u32()?;
Ok(ProtocolVersion(version))
}
}

/// Trait that every type that can be deserialized from binary must implement.
/// Reads directly to a Reader, a utility type thinly wrapping an
/// underlying Read implementation.
Expand All @@ -287,11 +338,24 @@ where
}

/// Deserializes a Readable from any std::io::Read implementation.
pub fn deserialize<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
let mut reader = BinReader { source };
pub fn deserialize<T: Readable>(
source: &mut dyn Read,
version: ProtocolVersion,
) -> Result<T, Error> {
let mut reader = BinReader::new(source, version);
T::read(&mut reader)
}

/// Deserialize a Readable based on our local db version protocol.
pub fn deserialize_db<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::local_db())
}

/// Deserialize a Readable based on our local "default" version protocol.
pub fn deserialize_default<T: Readable>(source: &mut dyn Read) -> Result<T, Error> {
deserialize(source, ProtocolVersion::default())
}

/// Serializes a Writeable into any std::io::Write implementation.
pub fn serialize<W: Writeable>(sink: &mut dyn Write, thing: &W) -> Result<(), Error> {
let mut writer = BinWriter { sink };
Expand All @@ -309,6 +373,14 @@ pub fn ser_vec<W: Writeable>(thing: &W) -> Result<Vec<u8>, Error> {
/// Utility to read from a binary source
pub struct BinReader<'a> {
source: &'a mut dyn Read,
version: ProtocolVersion,
}

impl<'a> BinReader<'a> {
/// Constructor for a new BinReader for the provided source and protocol version.
pub fn new(source: &'a mut dyn Read, version: ProtocolVersion) -> BinReader<'a> {
BinReader { source, version }
}
}

fn map_io_err(err: io::Error) -> Error {
Expand Down Expand Up @@ -366,22 +438,32 @@ impl<'a> Reader for BinReader<'a> {
})
}
}

fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}

/// A reader that reads straight off a stream.
/// Tracks total bytes read so we can verify we read the right number afterwards.
pub struct StreamingReader<'a> {
total_bytes_read: u64,
version: ProtocolVersion,
stream: &'a mut dyn Read,
timeout: Duration,
}

impl<'a> StreamingReader<'a> {
/// Create a new streaming reader with the provided underlying stream.
/// Also takes a duration to be used for each individual read_exact call.
pub fn new(stream: &'a mut dyn Read, timeout: Duration) -> StreamingReader<'a> {
pub fn new(
stream: &'a mut dyn Read,
version: ProtocolVersion,
timeout: Duration,
) -> StreamingReader<'a> {
StreamingReader {
total_bytes_read: 0,
version,
stream,
timeout,
}
Expand Down Expand Up @@ -450,6 +532,10 @@ impl<'a> Reader for StreamingReader<'a> {
})
}
}

fn protocol_version(&self) -> ProtocolVersion {
self.version
}
}

impl Readable for Commitment {
Expand Down
8 changes: 4 additions & 4 deletions core/tests/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ fn serialize_deserialize_header_version() {
assert_eq!(vec1, vec2);

// Check we can successfully deserialize a header_version.
let version: HeaderVersion = ser::deserialize(&mut &vec2[..]).unwrap();
let version: HeaderVersion = ser::deserialize_default(&mut &vec2[..]).unwrap();
assert_eq!(version.0, 1)
}

Expand All @@ -236,7 +236,7 @@ fn serialize_deserialize_block_header() {

let mut vec = Vec::new();
ser::serialize(&mut vec, &header1).expect("serialization failed");
let header2: BlockHeader = ser::deserialize(&mut &vec[..]).unwrap();
let header2: BlockHeader = ser::deserialize_default(&mut &vec[..]).unwrap();

assert_eq!(header1.hash(), header2.hash());
assert_eq!(header1, header2);
Expand All @@ -253,7 +253,7 @@ fn serialize_deserialize_block() {

let mut vec = Vec::new();
ser::serialize(&mut vec, &b).expect("serialization failed");
let b2: Block = ser::deserialize(&mut &vec[..]).unwrap();
let b2: Block = ser::deserialize_default(&mut &vec[..]).unwrap();

assert_eq!(b.hash(), b2.hash());
assert_eq!(b.header, b2.header);
Expand Down Expand Up @@ -447,7 +447,7 @@ fn serialize_deserialize_compact_block() {
cb1.header.timestamp =
origin_ts - Duration::nanoseconds(origin_ts.timestamp_subsec_nanos() as i64);

let cb2: CompactBlock = ser::deserialize(&mut &vec[..]).unwrap();
let cb2: CompactBlock = ser::deserialize_default(&mut &vec[..]).unwrap();

assert_eq!(cb1.header, cb2.header);
assert_eq!(cb1.kern_ids(), cb2.kern_ids());
Expand Down
6 changes: 3 additions & 3 deletions core/tests/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn simple_tx_ser_deser() {
let tx = tx2i1o();
let mut vec = Vec::new();
ser::serialize(&mut vec, &tx).expect("serialization failed");
let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap();
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();
assert_eq!(dtx.fee(), 2);
assert_eq!(dtx.inputs().len(), 2);
assert_eq!(dtx.outputs().len(), 1);
Expand All @@ -63,11 +63,11 @@ fn tx_double_ser_deser() {

let mut vec = Vec::new();
assert!(ser::serialize(&mut vec, &btx).is_ok());
let dtx: Transaction = ser::deserialize(&mut &vec[..]).unwrap();
let dtx: Transaction = ser::deserialize_default(&mut &vec[..]).unwrap();

let mut vec2 = Vec::new();
assert!(ser::serialize(&mut vec2, &btx).is_ok());
let dtx2: Transaction = ser::deserialize(&mut &vec2[..]).unwrap();
let dtx2: Transaction = ser::deserialize_default(&mut &vec2[..]).unwrap();

assert_eq!(btx.hash(), dtx.hash());
assert_eq!(dtx.hash(), dtx2.hash());
Expand Down
5 changes: 2 additions & 3 deletions core/tests/merkle_proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ mod vec_backend;

use self::core::core::merkle_proof::MerkleProof;
use self::core::core::pmmr::PMMR;
use self::core::ser;
use self::core::ser::PMMRIndexHashable;
use self::core::ser::{self, PMMRIndexHashable};
use crate::vec_backend::{TestElem, VecBackend};
use grin_core as core;

Expand All @@ -39,7 +38,7 @@ fn merkle_proof_ser_deser() {

let mut vec = Vec::new();
ser::serialize(&mut vec, &proof).expect("serialization failed");
let proof_2: MerkleProof = ser::deserialize(&mut &vec[..]).unwrap();
let proof_2: MerkleProof = ser::deserialize_default(&mut &vec[..]).unwrap();

assert_eq!(proof, proof_2);
}
Expand Down
2 changes: 1 addition & 1 deletion core/tests/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn test_output_ser_deser() {

let mut vec = vec![];
ser::serialize(&mut vec, &out).expect("serialized failed");
let dout: Output = ser::deserialize(&mut &vec[..]).unwrap();
let dout: Output = ser::deserialize_default(&mut &vec[..]).unwrap();

assert_eq!(dout.features, OutputFeatures::Plain);
assert_eq!(dout.commit, out.commit);
Expand Down
Loading

0 comments on commit 5aaf2d0

Please sign in to comment.