Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the same RRD encoding for the SDK comms as for everything else #2065

Merged
merged 2 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ pub enum DecodeError {
MsgPack(#[from] rmp_serde::decode::Error),
}

// ----------------------------------------------------------------------------

pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
let decoder = Decoder::new(std::io::Cursor::new(bytes))?;
let mut msgs = vec![];
for msg in decoder {
msgs.push(msg?);
}
Ok(msgs)
}

// ----------------------------------------------------------------------------
// native decode:

Expand Down
20 changes: 20 additions & 0 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
// TODO(1316): switch to using lz4flex - see https://github.com/rerun-io/rerun/issues/1316#issuecomment-1510967893

use std::io::Write as _;

use re_log_types::LogMsg;
Expand All @@ -20,6 +22,24 @@ pub enum EncodeError {
AlreadyFinished,
}

// ----------------------------------------------------------------------------

pub fn encode_to_bytes<'a>(
msgs: impl IntoIterator<Item = &'a LogMsg>,
) -> Result<Vec<u8>, EncodeError> {
let mut bytes: Vec<u8> = vec![];
{
let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?;
for msg in msgs {
encoder.append(msg)?;
}
encoder.finish()?;
}
Ok(bytes)
}

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
/// Set to None when finished.
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk_comms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ server = []

[dependencies]
re_log.workspace = true
re_log_encoding.workspace = true
re_log_types = { workspace = true, features = ["serde"] }
re_smart_channel.workspace = true

Expand Down
23 changes: 16 additions & 7 deletions crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,25 @@ fn msg_encode(
if let Ok(msg_msg) = msg_msg {
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
let packet_msg = match &msg_msg {
MsgMsg::LogMsg(log_msg) => {
let packet = crate::encode_log_msg(log_msg);
re_log::trace!("Encoded message of size {}", packet.len());
PacketMsg::Packet(packet)
match re_log_encoding::encoder::encode_to_bytes(std::iter::once(log_msg)) {
Ok(packet) => {
re_log::trace!("Encoded message of size {}", packet.len());
Some(PacketMsg::Packet(packet))
}
Err(err) => {
re_log::error_once!("Failed to encode log message: {err}");
None
}
}
}
MsgMsg::Flush => PacketMsg::Flush,
MsgMsg::Flush => Some(PacketMsg::Flush),
};

if packet_tx.send(packet_msg).is_err() {
re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
return;
if let Some(packet_msg) = packet_msg {
if packet_tx.send(packet_msg).is_err() {
re_log::error!("Failed to send message to tcp_sender thread. Likely a shutdown race-condition.");
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}
if msg_drop_tx.send(msg_msg).is_err() {
re_log::error!("Failed to send message to msg_drop thread. Likely a shutdown race-condition");
Expand Down
25 changes: 0 additions & 25 deletions crates/re_sdk_comms/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ mod server;
#[cfg(feature = "server")]
pub use server::{serve, ServerOptions};

use re_log_types::LogMsg;

pub type Result<T> = anyhow::Result<T>;

pub const PROTOCOL_VERSION: u16 = 0;
Expand All @@ -31,26 +29,3 @@ pub const DEFAULT_SERVER_PORT: u16 = 9876;
pub fn default_server_addr() -> std::net::SocketAddr {
std::net::SocketAddr::from(([127, 0, 0, 1], DEFAULT_SERVER_PORT))
}

const PREFIX: [u8; 4] = *b"RR00";

pub fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
use bincode::Options as _;
let mut bytes = PREFIX.to_vec();
bincode::DefaultOptions::new()
.serialize_into(&mut bytes, log_msg)
.unwrap();
bytes
}

pub fn decode_log_msg(data: &[u8]) -> Result<LogMsg> {
let payload = data
.strip_prefix(&PREFIX)
.ok_or_else(|| anyhow::format_err!("Message didn't start with the correct prefix"))?;

use anyhow::Context as _;
use bincode::Options as _;
bincode::DefaultOptions::new()
.deserialize(payload)
.context("bincode")
}
30 changes: 15 additions & 15 deletions crates/re_sdk_comms/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,25 +151,25 @@ async fn run_client(
packet.resize(packet_size as usize, 0_u8);
stream.read_exact(&mut packet).await?;

re_log::trace!("Received log message of size {packet_size}.");
re_log::trace!("Received packet of size {packet_size}.");

congestion_manager.register_latency(tx.latency_sec());

let msg = crate::decode_log_msg(&packet)?;

if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}
for msg in re_log_encoding::decoder::decode_bytes(&packet)? {
if matches!(msg, LogMsg::Goodbye(_)) {
re_log::debug!("Received goodbye message.");
tx.send(msg)?;
return Ok(());
}

if congestion_manager.should_send(&msg) {
tx.send(msg)?;
} else {
re_log::warn_once!(
"Input latency is over the max ({} s) - dropping packets.",
options.max_latency_sec
);
if congestion_manager.should_send(&msg) {
tx.send(msg)?;
} else {
re_log::warn_once!(
"Input latency is over the max ({} s) - dropping packets.",
options.max_latency_sec
);
}
}
}
}
Expand Down