Skip to content

Commit

Permalink
Remove Abomonation for encoding MessageHeader (#457)
Browse files Browse the repository at this point in the history
While this does not remove the dependency on Abomonation, it ensures that
reading and writing MessageHeader structs from the network does not use
Abomonation.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Aug 14, 2024
1 parent 59a9aea commit f659c51
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
1 change: 1 addition & 0 deletions communication/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ default = ["getopts"]
[dependencies]
getopts = { version = "0.2.14", optional = true }
bincode = { version = "1.0", optional = true }
byteorder = "1.5"
serde_derive = "1.0"
serde = "1.0"
abomonation = "0.7"
Expand Down
62 changes: 44 additions & 18 deletions communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ use std::thread;
use std::thread::sleep;
use std::time::Duration;

use abomonation::{encode, decode};
use byteorder::{ReadBytesExt, WriteBytesExt};

// This constant is sent along immediately after establishing a TCP stream, so
// that it is easy to sniff out Timely traffic when it is multiplexed with
// other traffic on the same port.
const HANDSHAKE_MAGIC: u64 = 0xc2f1fb770118add9;

/// The byte order for writing message headers and stream initialization.
type ByteOrder = byteorder::BigEndian;

/// Framing data for each `Vec<u8>` transmission, indicating a typed channel, the source and
/// destination workers, and the length in bytes.
// *Warning*: Adding, removing and altering fields requires to adjust the implementation below!
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct MessageHeader {
/// index of channel.
Expand All @@ -32,30 +36,51 @@ pub struct MessageHeader {
}

impl MessageHeader {

/// The number of `usize` fields in [MessageHeader].
const FIELDS: usize = 5;

/// Returns a header when there is enough supporting data
#[inline]
pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
unsafe { decode::<MessageHeader>(bytes) }
.and_then(|(header, remaining)| {
if remaining.len() >= header.length {
Some(header.clone())
}
else {
None
}
})
let mut cursor = io::Cursor::new(&bytes[..]);
let mut buffer = [0; Self::FIELDS];
cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
let header = MessageHeader {
// Order must match writing order.
channel: buffer[0] as usize,
source: buffer[1] as usize,
target: buffer[2] as usize,
length: buffer[3] as usize,
seqno: buffer[4] as usize,
};

if bytes.len() >= header.required_bytes() {
Some(header)
} else {
None
}
}

/// Writes the header as binary data.
#[inline]
pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> ::std::io::Result<()> {
unsafe { encode(self, writer) }
pub fn write_to<W: ::std::io::Write>(&self, writer: &mut W) -> Result<()> {
let mut buffer = [0u8; std::mem::size_of::<u64>() * Self::FIELDS];
let mut cursor = io::Cursor::new(&mut buffer[..]);
// Order must match reading order.
cursor.write_u64::<ByteOrder>(self.channel as u64)?;
cursor.write_u64::<ByteOrder>(self.source as u64)?;
cursor.write_u64::<ByteOrder>(self.target as u64)?;
cursor.write_u64::<ByteOrder>(self.length as u64)?;
cursor.write_u64::<ByteOrder>(self.seqno as u64)?;

writer.write_all(&buffer[..])
}

/// The number of bytes required for the header and data.
#[inline]
pub fn required_bytes(&self) -> usize {
::std::mem::size_of::<MessageHeader>() + self.length
std::mem::size_of::<u64>() * Self::FIELDS + self.length
}
}

Expand Down Expand Up @@ -89,8 +114,8 @@ pub fn start_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bo
match TcpStream::connect(address) {
Ok(mut stream) => {
stream.set_nodelay(true).expect("set_nodelay call failed");
unsafe { encode(&HANDSHAKE_MAGIC, &mut stream) }.expect("failed to encode/send handshake magic");
unsafe { encode(&(my_index as u64), &mut stream) }.expect("failed to encode/send worker index");
stream.write_u64::<ByteOrder>(HANDSHAKE_MAGIC).expect("failed to encode/send handshake magic");
stream.write_u64::<ByteOrder>(my_index as u64).expect("failed to encode/send worker index");
if noisy { println!("worker {}:\tconnection to worker {}", my_index, index); }
break Some(stream);
},
Expand All @@ -115,12 +140,13 @@ pub fn await_connections(addresses: Arc<Vec<String>>, my_index: usize, noisy: bo
stream.set_nodelay(true).expect("set_nodelay call failed");
let mut buffer = [0u8;16];
stream.read_exact(&mut buffer)?;
let (magic, mut buffer) = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode magic");
if magic != &HANDSHAKE_MAGIC {
let mut cursor = io::Cursor::new(buffer);
let magic = cursor.read_u64::<ByteOrder>().expect("failed to decode magic");
if magic != HANDSHAKE_MAGIC {
return Err(io::Error::new(io::ErrorKind::InvalidData,
"received incorrect timely handshake"));
}
let identifier = unsafe { decode::<u64>(&mut buffer) }.expect("failed to decode worker index").0.clone() as usize;
let identifier = cursor.read_u64::<ByteOrder>().expect("failed to decode worker index") as usize;
results[identifier - my_index - 1] = Some(stream);
if noisy { println!("worker {}:\tconnection from worker {}", my_index, identifier); }
}
Expand Down

0 comments on commit f659c51

Please sign in to comment.