Skip to content

Commit

Permalink
Use the same RRD encoding for the SDK comms as for everything else (#…
Browse files Browse the repository at this point in the history
…2065)

* Use the same RRD encoding for the SDK comms as for everything else

This comes with two benefits:
* We get nice error messages on version mismatch
* We get compression of the TCP stream

There is also a downside: we need to pay for the slow zstd encoding
and decoding.

Closes #2003

* Use let-else to reduce rightward drift
  • Loading branch information
emilk authored and jprochazk committed May 11, 2023
1 parent 066c068 commit b25f96c
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ pub enum DecodeError {
MsgPack(#[from] rmp_serde::decode::Error),
}

// ----------------------------------------------------------------------------

pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
let mut msgs = vec![];
for msg in decoder {
msgs.push(msg?);
}
Ok(msgs)
}

// ----------------------------------------------------------------------------
// native decode:

Expand Down
20 changes: 20 additions & 0 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
// TODO(1316): switch to using lz4flex - see https://github.com/rerun-io/rerun/issues/1316#issuecomment-1510967893

use std::io::Write as _;

use re_log_types::LogMsg;
Expand All @@ -20,6 +22,24 @@ pub enum EncodeError {
AlreadyFinished,
}

// ----------------------------------------------------------------------------

pub fn encode_to_bytes<'a>(
msgs: impl IntoIterator<Item = &'a LogMsg>,
) -> Result<Vec<u8>, EncodeError> {
let mut bytes: Vec<u8> = vec![];
{
let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?;
for msg in msgs {
encoder.append(msg)?;
}
encoder.finish()?;
}
Ok(bytes)
}

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
/// Set to None when finished.
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ server = []

[dependencies]
re_log.workspace = true
re_log_encoding.workspace = true
re_log_types = { workspace = true, features = ["serde"] }
re_smart_channel.workspace = true

Expand Down
37 changes: 23 additions & 14 deletions crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,26 +188,35 @@ fn msg_encode(
loop {
select! {
recv(msg_rx) -> msg_msg => {
if let Ok(msg_msg) = msg_msg {
let packet_msg = match &msg_msg {
MsgMsg::LogMsg(log_msg) => {
let packet = crate::encode_log_msg(log_msg);
re_log::trace!("Encoded message of size {}", packet.len());
PacketMsg::Packet(packet)
let Ok(msg_msg) = msg_msg else {
return; // channel has closed
};

let packet_msg = match &msg_msg {
MsgMsg::LogMsg(log_msg) => {
match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) {
Ok(packet) => {
re_log::trace!("Encoded message of size {}", packet.len());
Some(PacketMsg::Packet(packet))
}
Err(err) => {
re_log::error_once!("Failed to encode log message: {err}");
None
}
}
MsgMsg::Flush => PacketMsg::Flush,
};
}
MsgMsg::Flush => Some(PacketMsg::Flush),
};

if let Some(packet_msg) = packet_msg {
if packet_tx.send(packet_msg).is_err() {
re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
return;
}
if msg_drop_tx.send(msg_msg).is_err() {
re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition");
return;
}
} else {
return; // channel has closed
}
if msg_drop_tx.send(msg_msg).is_err() {
re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition");
return;
}
}
recv(quit_rx) -> _quit_msg => {
Expand Down
25 changes: 0 additions & 25 deletions crates/re_sdk_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ mod server;
#[cfg(feature = "server")]
pub use server::{serve, ServerOptions};

use re_log_types::LogMsg;

pub type Result<T> = anyhow::Result<T>;

pub const PROTOCOL_VERSION: u16 = 0;
Expand All @@ -31,26 +29,3 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876;
pub fn default_server_addr() -> std::net::SocketAddr {
std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT))
}

const PREFIX: [u8; 4] = *b"RR00";

pub fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
use bincode::Options as _;
let mut bytes = PREFIX.to_vec();
bincode::DefaultOptions::new()
.serialize_into(&mut bytes, log_msg)
.unwrap();
bytes
}

pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg> {
let payload = data
.strip_prefix(&PREFIX)
.ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?;

use anyhow::Context as _;
use bincode::Options as _;
bincode::DefaultOptions::new()
.deserialize(payload)
.context("bincode")
}
30 changes: 15 additions & 15 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,25 +151,25 @@ async fn run_client(
packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet).await?;

re_log::trace!("Received log message of size {packet_size}.");
re_log::trace!("Received packet of size {packet_size}.");

congestion_manager.register_latency(tx.latency_sec());

let msg = crate::decode_log_msg(&packet)?;

if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}
for msg in re_log_encoding::decoder::decode_bytes(&packet)? {
if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}

if congestion_manager.should_send(&msg) {
tx.send(msg)?;
} else {
re_log::warn_once!(
"Input latency is over the max ({} s) - dropping packets.",
options.max_latency_sec
);
if congestion_manager.should_send(&msg) {
tx.send(msg)?;
} else {
re_log::warn_once!(
"Input latency is over the max ({} s) - dropping packets.",
options.max_latency_sec
);
}
}
}
}
Expand Down

0 comments on commit b25f96c

Please sign in to comment.