Skip to content

Commit

Permalink
Switch compression algorithm from zstd to lz4 (#2112)
Browse files Browse the repository at this point in the history
Advantages:
* Faster compilation times
* 3x faster encode and decode
* Pure Rust crate
* Can compress in wasm

Disadvantages:
* Larger `.rrd` files (lz4 compresses less than zstd):

| example .rrd  | zstd          | lz4           |
| ------------- | ------------- | ------------- |
| api_demo      | 48 kB         | 93 kB         |
| car           | 120 kB        | 375 kB        |
| clock         | 28 kB         | 53 kB         |
| colmap        | 227 MB        | 241 MB        |
| deep_sdf      | 19 MB         | 20 MB         |
| dicom         | 39 MB         | 64 MB         |
| nyud          | 535 MB        | 634 MB        |
| plots         | 89 kB         | 163 kB        |
| raw_mesh      | 1.5 MB        | 5.6 MB        |
| text_logging  | 1.9 kB        | 3.0 kB        |

I'm not sure what the best trade-off is here. I'm going to check the
compilation times too.

Note that these compilation times impact our Rust users, as well as our
contributors.

### Checklist
* [x] I have read and agree to the [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)

<!-- This line will get updated when the PR build summary job finishes.
-->
PR Build Summary: https://build.rerun.io/pr/2112
  • Loading branch information
emilk committed May 25, 2023
1 parent 154d8e5 commit 24d9266
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 103 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions crates/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ all-features = true
default = []

## Enable loading data from an .rrd file.
decoder = ["dep:rmp-serde", "dep:zstd", "dep:ruzstd"]
decoder = ["dep:rmp-serde", "dep:lz4_flex"]

# Enable encoding of log messages to an .rrd file/stream:
encoder = ["dep:rmp-serde", "dep:zstd"]
encoder = ["dep:rmp-serde", "dep:lz4_flex"]


[dependencies]
Expand All @@ -42,16 +42,15 @@ web-time.workspace = true

# Optional external dependencies:
rmp-serde = { version = "1", optional = true }
lz4_flex = { version = "0.10", optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
puffin.workspace = true
zstd = { version = "0.12.0", optional = true } # native only

# Web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
ruzstd = { version = "0.3.0", optional = true } # works on wasm, in contrast to zstd
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3.52", features = ["Window"] }
Expand Down
90 changes: 9 additions & 81 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,8 @@ pub enum DecodeError {
#[error("Failed to read: {0}")]
Read(std::io::Error),

#[cfg(not(target_arch = "wasm32"))]
#[error("Zstd error: {0}")]
Zstd(std::io::Error),

#[cfg(target_arch = "wasm32")]
#[error("Zstd error: {0}")]
RuzstdInit(ruzstd::frame_decoder::FrameDecoderError),

#[cfg(target_arch = "wasm32")]
#[error("Zstd read error: {0}")]
RuzstdRead(std::io::Error),
#[error("lz4 error: {0}")]
Lz4(std::io::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
Expand All @@ -60,74 +51,12 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {
}

// ----------------------------------------------------------------------------
// native decode:

#[cfg(not(target_arch = "wasm32"))]
pub struct Decoder<'r, R: std::io::BufRead> {
zdecoder: zstd::stream::Decoder<'r, R>,
buffer: Vec<u8>,
}

#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader<R>> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();

let mut header = [0_u8; 4];
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder = zstd::stream::read::Decoder::new(read).map_err(DecodeError::Zstd)?;
Ok(Self {
zdecoder,
buffer: vec![],
})
}
}

#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
type Item = Result<LogMsg, DecodeError>;

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
use std::io::Read as _;

let mut len = [0_u8; 8];
self.zdecoder.read_exact(&mut len).ok()?;
let len = u64::from_le_bytes(len) as usize;

self.buffer.resize(len, 0);

{
crate::profile_scope!("zstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(DecodeError::Zstd(err)));
}
}

crate::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
}
}

// ----------------------------------------------------------------------------
// wasm decode:

#[cfg(target_arch = "wasm32")]
pub struct Decoder<R: std::io::Read> {
zdecoder: ruzstd::StreamingDecoder<R>,
lz4_decoder: lz4_flex::frame::FrameDecoder<R>,
buffer: Vec<u8>,
}

#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();
Expand All @@ -140,15 +69,14 @@ impl<R: std::io::Read> Decoder<R> {
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder = ruzstd::StreamingDecoder::new(read).map_err(DecodeError::RuzstdInit)?;
let lz4_decoder = lz4_flex::frame::FrameDecoder::new(read);
Ok(Self {
zdecoder,
lz4_decoder,
buffer: vec![],
})
}
}

#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Iterator for Decoder<R> {
type Item = Result<LogMsg, DecodeError>;

Expand All @@ -157,15 +85,15 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
use std::io::Read as _;

let mut len = [0_u8; 8];
self.zdecoder.read_exact(&mut len).ok()?;
self.lz4_decoder.read_exact(&mut len).ok()?;
let len = u64::from_le_bytes(len) as usize;

self.buffer.resize(len, 0);

{
crate::profile_scope!("ruzstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(DecodeError::RuzstdRead(err)));
crate::profile_scope!("lz4");
if let Err(err) = self.lz4_decoder.read_exact(&mut self.buffer) {
return Some(Err(DecodeError::Lz4(err)));
}
}

Expand Down
34 changes: 18 additions & 16 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! Encoding of [`LogMsg`]es as a binary stream, e.g. to store in an `.rrd` file, or send over network.
// TODO(1316): switch to using lz4flex - see https://github.com/rerun-io/rerun/issues/1316#issuecomment-1510967893

use std::io::Write as _;

use re_log_types::LogMsg;
Expand All @@ -12,8 +10,11 @@ pub enum EncodeError {
#[error("Failed to write: {0}")]
Write(std::io::Error),

#[error("Zstd error: {0}")]
Zstd(std::io::Error),
#[error("lz4 error: {0}")]
Lz4Write(std::io::Error),

#[error("lz4 error: {0}")]
Lz4Finish(lz4_flex::frame::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::encode::Error),
Expand Down Expand Up @@ -43,13 +44,13 @@ pub fn encode_to_bytes<'a>(
/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
/// Set to None when finished.
zstd_encoder: Option<zstd::stream::Encoder<'static, W>>,
lz4_encoder: Option<lz4_flex::frame::FrameEncoder<W>>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Drop for Encoder<W> {
fn drop(&mut self) {
if self.zstd_encoder.is_some() {
if self.lz4_encoder.is_some() {
re_log::warn!("Encoder dropped without calling finish()!");
if let Err(err) = self.finish() {
re_log::error!("Failed to finish encoding: {err}");
Expand All @@ -67,29 +68,30 @@ impl<W: std::io::Write> Encoder<W> {
.write_all(&rerun_version.to_bytes())
.map_err(EncodeError::Write)?;

let level = 3;
let zstd_encoder = zstd::stream::Encoder::new(write, level).map_err(EncodeError::Zstd)?;
let lz4_encoder = lz4_flex::frame::FrameEncoder::new(write);

Ok(Self {
zstd_encoder: Some(zstd_encoder),
lz4_encoder: Some(lz4_encoder),
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> {
let Self {
zstd_encoder,
lz4_encoder,
buffer,
} = self;

if let Some(zstd_encoder) = zstd_encoder {
if let Some(lz4_encoder) = lz4_encoder {
buffer.clear();
rmp_serde::encode::write_named(buffer, message)?;

zstd_encoder
lz4_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.map_err(EncodeError::Zstd)?;
zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?;
.map_err(EncodeError::Lz4Write)?;
lz4_encoder
.write_all(buffer)
.map_err(EncodeError::Lz4Write)?;

Ok(())
} else {
Expand All @@ -98,8 +100,8 @@ impl<W: std::io::Write> Encoder<W> {
}

pub fn finish(&mut self) -> Result<(), EncodeError> {
if let Some(zstd_encoder) = self.zstd_encoder.take() {
zstd_encoder.finish().map_err(EncodeError::Zstd)?;
if let Some(lz4_encoder) = self.lz4_encoder.take() {
lz4_encoder.finish().map_err(EncodeError::Lz4Finish)?;
Ok(())
} else {
re_log::warn!("Encoder::finish called twice");
Expand Down

0 comments on commit 24d9266

Please sign in to comment.