Skip to content

Commit

Permalink
Stream .rrd files (#2412)
Browse files Browse the repository at this point in the history
<!--
Open the PR up as a draft until you feel it is ready for a proper
review.

Do not make PR:s from your own `main` branch, as that makes it difficult
for reviewers to add their own fixes.

Add any improvements to the branch as new commits to make it easier for
reviewers to follow the progress. All commits will be squashed to a
single commit once the PR is merged into `main`.

Make sure you mention any issues that this PR closes in the description,
as well as any other related issues.

To get an auto-generated PR description you can put "copilot:summary" or
"copilot:walkthrough" anywhere.
-->

### What

Closes #2262

Instead of waiting for the `.rrd` file to finish downloading,
`stream_rrd_from_http` now uses `ehttp::streaming` and parses messages
as data is streamed in.

As part of this change, compression now uses the LZ4 block format
instead of the LZ4 frame format. **This is a breaking change to the RRD
format**.

| example                | `main`     | `jan/stream-rrd` | change  |
| ---------------------- | ---------- | ---------------- | ------- |
| signed_distance_fields | 21.328 MB  | 21.420 MB        | +0.43%  |
| deep_sdf               | 21.329 MB  | 21.329 MB        | 0.00%   |
| dicom                  | 67.657 MB  | 67.657 MB        | 0.00%   |
| raw_mesh               | 109.070 KB | 110.960 KB       | +1.73%  |
| clock                  | 54.901 KB  | 80.439 KB        | +46.52% |
| dicom_mri              | 67.656 MB  | 66.434 MB        | -1.81%  |
| api_demo               | 320.549 KB | 388.021 KB       | +21.05% |
| structure_from_motion  | 252.978 MB | 253.735 MB       | +0.30%  |
| car                    | 402.542 KB | 391.056 KB       | -2.85%  |
| rgbd                   | 168.420 MB | 167.382 MB       | -0.62%  |
| colmap                 | 253.054 MB | 253.054 MB       | 0.00%   |
| plots                  | 149.360 KB | 211.155 KB       | +41.37% |
| nyud                   | 168.467 MB | 168.467 MB       | 0.00%   |
| text_logging           | 2.732 KB   | 3.091 KB         | +13.14% |

### Checklist
* [ ] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [ ] I've included a screenshot or gif (if applicable)

<!-- This line will get updated when the PR build summary job finishes.
-->
PR Build Summary: https://build.rerun.io/pr/2412

<!-- pr-link-docs:start -->
Docs preview: https://rerun.io/preview/0c82335/docs
Examples preview: https://rerun.io/preview/0c82335/examples
<!-- pr-link-docs:end -->

---------

Co-authored-by: Emil Ernerfeldt <[email protected]>
  • Loading branch information
jprochazk and emilk authored Jun 15, 2023
1 parent bbe8e1e commit 2514b72
Show file tree
Hide file tree
Showing 13 changed files with 801 additions and 234 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/reusable_build_web_demo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
env:
COMMIT_HASH: ${{ env.SHORT_SHA }}
run: |
python3 scripts/build_demo_app.py --skip-wasm-build
python3 scripts/build_demo_app.py --skip-build
- name: Upload web demo assets
uses: actions/upload-artifact@v3
Expand Down
38 changes: 38 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,44 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch tests",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"test",
"-p=re_log_encoding",
"--no-run",
"--lib",
"--all-features"
],
"filter": {
"kind": "lib"
}
},
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'rerun' colmap.rrd from url",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"build",
"--package=rerun-cli",
"--features=native_viewer"
],
"filter": {
"name": "rerun",
"kind": "bin"
}
},
"args": [
"https://demo.rerun.io/commit/0f89b62/examples/colmap/data.rrd"
],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'rerun' data.rrd",
"type": "lldb",
Expand Down
20 changes: 18 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ re_smart_channel.workspace = true
re_tracing.workspace = true

# External:
ehttp = "0.2"
ehttp = { version = "0.3", features = ["streaming"] }
parking_lot.workspace = true
thiserror.workspace = true
web-time.workspace = true
Expand Down
128 changes: 67 additions & 61 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
//! Decoding [`LogMsg`]:es from `.rrd` files/streams.
pub mod stream;

use re_log_types::LogMsg;

use crate::FileHeader;
use crate::MessageHeader;
use crate::OLD_RRD_HEADERS;
use crate::{Compression, EncodingOptions, Serializer};

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -31,7 +36,7 @@ pub enum DecodeError {
#[error("Not an .rrd file")]
NotAnRrd,

#[error("Found an .rrd file from a Rerun version from 0.5.1 or earlier")]
#[error("Found an .rrd file from an old, incompatible Rerun version")]
OldRrdVersion,

#[error("Failed to decode the options: {0}")]
Expand All @@ -41,7 +46,7 @@ pub enum DecodeError {
Read(std::io::Error),

#[error("lz4 error: {0}")]
Lz4(std::io::Error),
Lz4(lz4_flex::block::DecompressError),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
Expand All @@ -60,71 +65,50 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {

// ----------------------------------------------------------------------------

enum Decompressor<R: std::io::Read> {
Uncompressed(R),
Lz4(lz4_flex::frame::FrameDecoder<R>),
}
pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
let mut read = std::io::Cursor::new(bytes);

impl<R: std::io::Read> Decompressor<R> {
fn new(compression: Compression, read: R) -> Self {
match compression {
Compression::Off => Self::Uncompressed(read),
Compression::LZ4 => Self::Lz4(lz4_flex::frame::FrameDecoder::new(read)),
}
let FileHeader {
magic,
version,
options,
} = FileHeader::decode(&mut read)?;

if OLD_RRD_HEADERS.contains(&magic) {
return Err(DecodeError::OldRrdVersion);
} else if &magic != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}

pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
use std::io::Read as _;
warn_on_version_mismatch(version);

match self {
Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read),
Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4),
}
match options.serializer {
Serializer::MsgPack => {}
}
}

// ----------------------------------------------------------------------------
Ok(options)
}

pub struct Decoder<R: std::io::Read> {
decompressor: Decompressor<R>,
buffer: Vec<u8>,
compression: Compression,
read: R,
uncompressed: Vec<u8>, // scratch space
compressed: Vec<u8>, // scratch space
}

impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
re_tracing::profile_function!();

{
let mut header = [0_u8; 4];
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header == b"RRF0" {
return Err(DecodeError::OldRrdVersion);
} else if &header != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}
}

{
let mut version_bytes = [0_u8; 4];
read.read_exact(&mut version_bytes)
.map_err(DecodeError::Read)?;
warn_on_version_mismatch(version_bytes);
}

let options = {
let mut options_bytes = [0_u8; 4];
read.read_exact(&mut options_bytes)
.map_err(DecodeError::Read)?;
EncodingOptions::from_bytes(options_bytes)?
};

match options.serializer {
Serializer::MsgPack => {}
}
let mut data = [0_u8; FileHeader::SIZE];
read.read_exact(&mut data).map_err(DecodeError::Read)?;
let compression = read_options(&data)?.compression;

Ok(Self {
decompressor: Decompressor::new(options.compression, read),
buffer: vec![],
compression,
read,
uncompressed: vec![],
compressed: vec![],
})
}
}
Expand All @@ -135,21 +119,43 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
fn next(&mut self) -> Option<Self::Item> {
re_tracing::profile_function!();

let mut len = [0_u8; 8];
self.decompressor.read_exact(&mut len).ok()?;
let len = u64::from_le_bytes(len) as usize;

self.buffer.resize(len, 0);
let header = match MessageHeader::decode(&mut self.read) {
Ok(header) => header,
Err(err) => match err {
DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return None
}
other => return Some(Err(other)),
},
};

{
re_tracing::profile_scope!("lz4");
if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
return Some(Err(err));
match self.compression {
Compression::Off => {
self.uncompressed
.resize(header.uncompressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.uncompressed) {
return Some(Err(DecodeError::Read(err)));
}
}
Compression::LZ4 => {
self.compressed.resize(header.compressed_len as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.compressed) {
return Some(Err(DecodeError::Read(err)));
}
self.uncompressed
.resize(header.uncompressed_len as usize, 0);

re_tracing::profile_scope!("lz4");
if let Err(err) =
lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
{
return Some(Err(DecodeError::Lz4(err)));
}
}
}

re_tracing::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
match rmp_serde::from_slice(&self.uncompressed) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
Expand Down
Loading

1 comment on commit 2514b72

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark 'Rust Benchmark'.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.25.

Benchmark suite Current: 2514b72 Previous: 646825f Ratio
datastore/num_rows=1000/num_instances=1000/packed=false/insert/default 5072292 ns/iter (± 272494) 2899822 ns/iter (± 6886) 1.75
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at/default 408 ns/iter (± 9) 308 ns/iter (± 1) 1.32
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/primary/default 299 ns/iter (± 0) 230 ns/iter (± 0) 1.30
datastore/num_rows=1000/num_instances=1000/packed=false/latest_at_missing/secondaries/default 460 ns/iter (± 0) 341 ns/iter (± 1) 1.35
datastore/num_rows=1000/num_instances=1000/packed=false/range/default 5485391 ns/iter (± 93362) 2891988 ns/iter (± 6364) 1.90
datastore/num_rows=1000/num_instances=1000/gc/default 2726824 ns/iter (± 15564) 1743447 ns/iter (± 3514) 1.56
mono_points_arrow/generate_message_bundles 37738630 ns/iter (± 2023763) 29806987 ns/iter (± 909757) 1.27
mono_points_arrow/encode_log_msg 232505209 ns/iter (± 2256473) 139628591 ns/iter (± 842594) 1.67
mono_points_arrow/encode_total 425415092 ns/iter (± 7494942) 331815362 ns/iter (± 1431340) 1.28
mono_points_arrow_batched/generate_message_bundles 32028484 ns/iter (± 1263381) 18997538 ns/iter (± 76228) 1.69
mono_points_arrow_batched/generate_messages 8963053 ns/iter (± 402528) 3761085 ns/iter (± 12598) 2.38
mono_points_arrow_batched/encode_log_msg 660298 ns/iter (± 3141) 417734 ns/iter (± 1930) 1.58
mono_points_arrow_batched/encode_total 42121584 ns/iter (± 1174215) 23479694 ns/iter (± 114011) 1.79
mono_points_arrow_batched/decode_log_msg 508852 ns/iter (± 6057) 299741 ns/iter (± 699) 1.70
mono_points_arrow_batched/decode_message_bundles 12685150 ns/iter (± 563370) 7746989 ns/iter (± 8893) 1.64
mono_points_arrow_batched/decode_total 13379340 ns/iter (± 196697) 8200242 ns/iter (± 25366) 1.63
batch_points_arrow/encode_log_msg 75667 ns/iter (± 1554) 56432 ns/iter (± 129) 1.34
arrow_mono_points/insert 2892026758 ns/iter (± 39016864) 1764956963 ns/iter (± 6957212) 1.64
arrow_mono_points/query 1405652 ns/iter (± 31143) 952998 ns/iter (± 4479) 1.47
arrow_batch_points/insert 1502782 ns/iter (± 7050) 1189988 ns/iter (± 3605) 1.26
arrow_batch_points/query 17203 ns/iter (± 27) 12379 ns/iter (± 4) 1.39
arrow_batch_vecs/insert 31462 ns/iter (± 283) 23985 ns/iter (± 36) 1.31
arrow_batch_vecs/query 476271 ns/iter (± 857) 301347 ns/iter (± 832) 1.58

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.