Skip to content

Commit

Permalink
Support RRD streams with and without compression. Turn off for SDK co…
Browse files Browse the repository at this point in the history
…mms (#2219)

Closes #2216

Compression for SDK comms was added in
#2065 and is now removed in this PR.

* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)

<!-- This line will get updated when the PR build summary job finishes.
-->
PR Build Summary: https://build.rerun.io/pr/2219
  • Loading branch information
emilk committed May 25, 2023
1 parent de981bb commit 0530990
Show file tree
Hide file tree
Showing 10 changed files with 301 additions and 65 deletions.
4 changes: 3 additions & 1 deletion crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ fn log_messages() {

fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
let mut bytes = vec![];
re_log_encoding::encoder::encode(std::iter::once(log_msg), &mut bytes).unwrap();
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
re_log_encoding::encoder::encode(encoding_options, std::iter::once(log_msg), &mut bytes)
.unwrap();
bytes
}

Expand Down
3 changes: 2 additions & 1 deletion crates/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ criterion_group!(
criterion_main!(benches);

fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let mut bytes = vec![];
re_log_encoding::encoder::encode(messages.iter(), &mut bytes).unwrap();
re_log_encoding::encoder::encode(encoding_options, messages.iter(), &mut bytes).unwrap();
assert!(bytes.len() > messages.len());
bytes
}
Expand Down
104 changes: 84 additions & 20 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
use re_log_types::LogMsg;

use crate::{Compression, EncodingOptions, Serializer};

// ----------------------------------------------------------------------------

fn warn_on_version_mismatch(encoded_version: [u8; 4]) {
Expand Down Expand Up @@ -29,6 +31,12 @@ pub enum DecodeError {
#[error("Not an .rrd file")]
NotAnRrd,

#[error("Found an .rrd file from a Rerun version from 0.5.1 or earlier")]
OldRrdVersion,

#[error("Failed to decode the options: {0}")]
Options(#[from] crate::OptionsError),

#[error("Failed to read: {0}")]
Read(std::io::Error),

Expand All @@ -52,26 +60,70 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {

// ----------------------------------------------------------------------------

/// Byte source for an `.rrd` stream body, with or without LZ4 framing.
///
/// The variant is picked once, from the [`Compression`] passed to `new`.
enum Decompressor<R: std::io::Read> {
    /// Bytes are read straight from the wrapped reader.
    Uncompressed(R),

    /// Bytes are read through an LZ4 frame decoder wrapping the reader.
    Lz4(lz4_flex::frame::FrameDecoder<R>),
}

impl<R: std::io::Read> Decompressor<R> {
    /// Wraps `read` according to `compression`.
    fn new(compression: Compression, read: R) -> Self {
        match compression {
            Compression::Off => Self::Uncompressed(read),
            Compression::LZ4 => {
                let frame_decoder = lz4_flex::frame::FrameDecoder::new(read);
                Self::Lz4(frame_decoder)
            }
        }
    }

    /// Fills `buf` completely from the underlying stream.
    ///
    /// # Errors
    /// A failed read is reported as [`DecodeError::Read`] for the uncompressed
    /// variant and as [`DecodeError::Lz4`] for the LZ4 variant.
    pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
        use std::io::Read as _;

        match self {
            Self::Uncompressed(inner) => inner.read_exact(buf).map_err(DecodeError::Read),
            Self::Lz4(frame_decoder) => frame_decoder.read_exact(buf).map_err(DecodeError::Lz4),
        }
    }
}

// ----------------------------------------------------------------------------

pub struct Decoder<R: std::io::Read> {
lz4_decoder: lz4_flex::frame::FrameDecoder<R>,
decompressor: Decompressor<R>,
buffer: Vec<u8>,
}

impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();

let mut header = [0_u8; 4];
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
{
let mut header = [0_u8; 4];
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header == b"RRF0" {
return Err(DecodeError::OldRrdVersion);
} else if &header != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}
}

{
let mut version_bytes = [0_u8; 4];
read.read_exact(&mut version_bytes)
.map_err(DecodeError::Read)?;
warn_on_version_mismatch(version_bytes);
}

let options = {
let mut options_bytes = [0_u8; 4];
read.read_exact(&mut options_bytes)
.map_err(DecodeError::Read)?;
EncodingOptions::from_bytes(options_bytes)?
};

match options.serializer {
Serializer::MsgPack => {}
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let lz4_decoder = lz4_flex::frame::FrameDecoder::new(read);
Ok(Self {
lz4_decoder,
decompressor: Decompressor::new(options.compression, read),
buffer: vec![],
})
}
Expand All @@ -82,18 +134,17 @@ impl<R: std::io::Read> Iterator for Decoder<R> {

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
use std::io::Read as _;

let mut len = [0_u8; 8];
self.lz4_decoder.read_exact(&mut len).ok()?;
self.decompressor.read_exact(&mut len).ok()?;
let len = u64::from_le_bytes(len) as usize;

self.buffer.resize(len, 0);

{
crate::profile_scope!("lz4");
if let Err(err) = self.lz4_decoder.read_exact(&mut self.buffer) {
return Some(Err(DecodeError::Lz4(err)));
if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
return Some(Err(err));
}
}

Expand Down Expand Up @@ -129,13 +180,26 @@ fn test_encode_decode() {
},
})];

let mut file = vec![];
crate::encoder::encode(messages.iter(), &mut file).unwrap();
let options = [
EncodingOptions {
compression: Compression::Off,
serializer: Serializer::MsgPack,
},
EncodingOptions {
compression: Compression::LZ4,
serializer: Serializer::MsgPack,
},
];

for options in options {
let mut file = vec![];
crate::encoder::encode(options, messages.iter(), &mut file).unwrap();

let decoded_messages = Decoder::new(&mut file.as_slice())
.unwrap()
.collect::<Result<Vec<LogMsg>, DecodeError>>()
.unwrap();
let decoded_messages = Decoder::new(&mut file.as_slice())
.unwrap()
.collect::<Result<Vec<LogMsg>, DecodeError>>()
.unwrap();

assert_eq!(messages, decoded_messages);
assert_eq!(messages, decoded_messages);
}
}
141 changes: 104 additions & 37 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ use std::io::Write as _;

use re_log_types::LogMsg;

use crate::{Compression, EncodingOptions};

// ----------------------------------------------------------------------------

/// On failure to encode or serialize a [`LogMsg`].
#[derive(thiserror::Error, Debug)]
pub enum EncodeError {
Expand All @@ -26,11 +30,12 @@ pub enum EncodeError {
// ----------------------------------------------------------------------------

pub fn encode_to_bytes<'a>(
options: EncodingOptions,
msgs: impl IntoIterator<Item = &'a LogMsg>,
) -> Result<Vec<u8>, EncodeError> {
let mut bytes: Vec<u8> = vec![];
{
let mut encoder = Encoder::new(std::io::Cursor::new(&mut bytes))?;
let mut encoder = Encoder::new(options, std::io::Cursor::new(&mut bytes))?;
for msg in msgs {
encoder.append(msg)?;
}
Expand All @@ -41,14 +46,42 @@ pub fn encode_to_bytes<'a>(

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
/// Set to None when finished.
struct Lz4Compressor<W: std::io::Write> {
/// `None` if finished.
lz4_encoder: Option<lz4_flex::frame::FrameEncoder<W>>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Drop for Encoder<W> {
impl<W: std::io::Write> Lz4Compressor<W> {
    /// Starts a new LZ4 frame encoder that emits into `write`.
    pub fn new(write: W) -> Self {
        let frame_encoder = lz4_flex::frame::FrameEncoder::new(write);
        Self {
            lz4_encoder: Some(frame_encoder),
        }
    }

    /// Feeds `bytes` into the LZ4 frame encoder.
    ///
    /// # Errors
    /// * [`EncodeError::Lz4Write`] if the encoder fails to accept the bytes.
    /// * [`EncodeError::AlreadyFinished`] if `finish` has already taken the encoder.
    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
        match self.lz4_encoder.as_mut() {
            Some(encoder) => encoder.write_all(bytes).map_err(EncodeError::Lz4Write),
            None => Err(EncodeError::AlreadyFinished),
        }
    }

    /// Finishes the LZ4 frame and discards the encoder.
    ///
    /// A second call only logs a warning and still returns `Ok(())`.
    ///
    /// # Errors
    /// [`EncodeError::Lz4Finish`] if the frame encoder fails to finish.
    pub fn finish(&mut self) -> Result<(), EncodeError> {
        match self.lz4_encoder.take() {
            // `finish` hands back the inner writer, which is not needed here.
            Some(encoder) => encoder
                .finish()
                .map(drop)
                .map_err(EncodeError::Lz4Finish),
            None => {
                re_log::warn!("Encoder::finish called twice");
                Ok(())
            }
        }
    }
}

impl<W: std::io::Write> Drop for Lz4Compressor<W> {
fn drop(&mut self) {
if self.lz4_encoder.is_some() {
re_log::warn!("Encoder dropped without calling finish()!");
Expand All @@ -59,73 +92,107 @@ impl<W: std::io::Write> Drop for Encoder<W> {
}
}

/// Output sink for encoded messages, either raw or LZ4-compressed.
// NOTE(review): the lint is presumably silenced because the `Lz4` variant is
// much larger than a bare writer; confirm before removing the `allow`.
#[allow(clippy::large_enum_variant)]
enum Compressor<W: std::io::Write> {
    /// Bytes go straight to the writer.
    Off(W),

    /// Bytes go through an [`Lz4Compressor`].
    Lz4(Lz4Compressor<W>),
}

impl<W: std::io::Write> Compressor<W> {
    /// Wraps `write` according to `compression`.
    pub fn new(compression: Compression, write: W) -> Self {
        match compression {
            Compression::Off => Self::Off(write),
            Compression::LZ4 => {
                let lz4 = Lz4Compressor::new(write);
                Self::Lz4(lz4)
            }
        }
    }

    /// Writes one length-prefixed chunk: the length of `bytes` as a
    /// little-endian `u64`, followed by `bytes` itself.
    ///
    /// # Errors
    /// [`EncodeError::Write`] when the raw writer fails; otherwise whatever
    /// [`Lz4Compressor::write`] returns.
    pub fn write(&mut self, bytes: &[u8]) -> Result<(), EncodeError> {
        let header = u64::to_le_bytes(bytes.len() as u64);

        match self {
            Self::Off(sink) => sink
                .write_all(&header)
                .and_then(|()| sink.write_all(bytes))
                .map_err(EncodeError::Write),
            Self::Lz4(lz4) => lz4.write(&header).and_then(|()| lz4.write(bytes)),
        }
    }

    /// Finishes the stream: a no-op for `Off`, delegated to the compressor for `Lz4`.
    pub fn finish(&mut self) -> Result<(), EncodeError> {
        match self {
            Self::Lz4(lz4) => lz4.finish(),
            Self::Off(_) => Ok(()),
        }
    }
}

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
compressor: Compressor<W>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> Result<Self, EncodeError> {
pub fn new(options: EncodingOptions, mut write: W) -> Result<Self, EncodeError> {
let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION"));

write.write_all(b"RRF0").map_err(EncodeError::Write)?;
write
.write_all(crate::RRD_HEADER)
.map_err(EncodeError::Write)?;
write
.write_all(&rerun_version.to_bytes())
.map_err(EncodeError::Write)?;
write
.write_all(&options.to_bytes())
.map_err(EncodeError::Write)?;

let lz4_encoder = lz4_flex::frame::FrameEncoder::new(write);
match options.serializer {
crate::Serializer::MsgPack => {}
}

Ok(Self {
lz4_encoder: Some(lz4_encoder),
compressor: Compressor::new(options.compression, write),
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> {
let Self {
lz4_encoder,
buffer,
} = self;
let Self { compressor, buffer } = self;

if let Some(lz4_encoder) = lz4_encoder {
buffer.clear();
rmp_serde::encode::write_named(buffer, message)?;
buffer.clear();
rmp_serde::encode::write_named(buffer, message)?;

lz4_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.map_err(EncodeError::Lz4Write)?;
lz4_encoder
.write_all(buffer)
.map_err(EncodeError::Lz4Write)?;

Ok(())
} else {
Err(EncodeError::AlreadyFinished)
}
compressor.write(buffer)
}

pub fn finish(&mut self) -> Result<(), EncodeError> {
if let Some(lz4_encoder) = self.lz4_encoder.take() {
lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?;
Ok(())
} else {
re_log::warn!("Encoder::finish called twice");
Ok(())
}
self.compressor.finish()
}
}

pub fn encode<'a>(
options: EncodingOptions,
messages: impl Iterator<Item = &'a LogMsg>,
write: &mut impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
let mut encoder = Encoder::new(options, write)?;
for message in messages {
encoder.append(message)?;
}
encoder.finish()
}

pub fn encode_owned(
options: EncodingOptions,
messages: impl Iterator<Item = LogMsg>,
write: impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
let mut encoder = Encoder::new(options, write)?;
for message in messages {
encoder.append(&message)?;
}
Expand Down
Loading

0 comments on commit 0530990

Please sign in to comment.