Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stream .rrd files #2412

Merged
merged 46 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
46c1a34
temp
jprochazk Jun 12, 2023
1643efc
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 13, 2023
1d2efde
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 13, 2023
15d726c
start implementing stream decoder
jprochazk Jun 13, 2023
f0a5aad
remove prints
jprochazk Jun 13, 2023
46e1432
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 14, 2023
4461bd4
update ehttp
jprochazk Jun 14, 2023
ace4ab8
use `ops::ControlFlow`
jprochazk Jun 14, 2023
d27c313
`finish` each chunk during encoding
jprochazk Jun 14, 2023
18ab421
`read_options` from unsized slice
jprochazk Jun 14, 2023
ecf93d8
simplify stream decoder
jprochazk Jun 14, 2023
999a3ff
do not filter tests in launch.json
jprochazk Jun 14, 2023
b222755
update ehttp commit
jprochazk Jun 14, 2023
41e4ce2
use lz4 blocks and simplify stream decoder further
jprochazk Jun 14, 2023
ec52b1c
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 14, 2023
c60c2be
fix lints
jprochazk Jun 14, 2023
c89074f
update ehttp after rebase on upstream master
jprochazk Jun 14, 2023
daf3f75
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 15, 2023
d51f9fd
update to `RRF2`
jprochazk Jun 15, 2023
65fcd39
update naming + add comment about message header len equality
jprochazk Jun 15, 2023
b536af7
some comments about state machine
jprochazk Jun 15, 2023
f3c661e
Update crates/re_log_encoding/src/decoder/stream.rs
jprochazk Jun 15, 2023
04c4fd7
clarify `ChunkBuffer.buffer` and `try_read` behavior
jprochazk Jun 15, 2023
d713cf8
rename `cursor` to `buffer_fill` and update comment
jprochazk Jun 15, 2023
a78512b
make `try_read` infallible
jprochazk Jun 15, 2023
bcaf5b8
prevent `try_read` from returning same bytes for the same n
jprochazk Jun 15, 2023
1825280
add more unit tests
jprochazk Jun 15, 2023
1277f3d
Update crates/re_log_encoding/src/stream_rrd_from_http.rs
jprochazk Jun 15, 2023
bb5af62
remove dead code
jprochazk Jun 15, 2023
8106e94
fix lints
jprochazk Jun 15, 2023
1d5de37
fix typo
jprochazk Jun 15, 2023
96cf7bb
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 15, 2023
64b2de5
add assert against incomplete read
jprochazk Jun 15, 2023
31f4cae
update old rrd version message
jprochazk Jun 15, 2023
3a87459
add stream end debug log
jprochazk Jun 15, 2023
0e91dc2
switch ehttp to `emilk/ehttp@master`
jprochazk Jun 15, 2023
f3d2bf5
Update crates/re_log_encoding/src/decoder/stream.rs
jprochazk Jun 15, 2023
b80731a
early-out on empty chunks
jprochazk Jun 15, 2023
38d3403
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 15, 2023
ae5664c
pin the ehttp version
emilk Jun 15, 2023
8b2cf30
update Cargo.lock
emilk Jun 15, 2023
14efd77
Improve OldRrdVersion error message
emilk Jun 15, 2023
12c645f
build_demo_app.py: build Python SDK before running examples
emilk Jun 15, 2023
f5f44f3
update formatting
jprochazk Jun 15, 2023
e5026f0
Merge branch 'main' into jan/stream-rrd
jprochazk Jun 15, 2023
0c82335
update ehttp
jprochazk Jun 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,44 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Launch tests",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"test",
"-p=re_log_encoding",
"--no-run",
"--lib",
"--all-features"
],
"filter": {
"kind": "lib"
}
},
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'rerun' colmap.rrd from url",
"type": "lldb",
"request": "launch",
"cargo": {
"args": [
"build",
"--package=rerun-cli",
"--features=native_viewer"
],
"filter": {
"name": "rerun",
"kind": "bin"
}
},
"args": [
"https://demo.rerun.io/commit/0f89b62/examples/colmap/data.rrd"
],
"cwd": "${workspaceFolder}"
},
{
"name": "Debug 'rerun' data.rrd",
"type": "lldb",
Expand Down
18 changes: 16 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ re_smart_channel.workspace = true
re_tracing.workspace = true

# External:
ehttp = "0.2"
ehttp = { git = "https://github.com/jprochazk/ehttp.git", features = [
"streaming",
] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should update this to point to the main repo before merge

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emilk/ehttp#28 has been merged, so we can update this now

parking_lot.workspace = true
thiserror.workspace = true
web-time.workspace = true
Expand Down
123 changes: 63 additions & 60 deletions crates/re_log_encoding/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
//! Decoding [`LogMsg`]:es from `.rrd` files/streams.

pub mod stream;

use re_log_types::LogMsg;

use crate::FileHeader;
use crate::MessageHeader;
use crate::{Compression, EncodingOptions, Serializer};

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -41,7 +45,7 @@ pub enum DecodeError {
Read(std::io::Error),

#[error("lz4 error: {0}")]
Lz4(std::io::Error),
Lz4(lz4_flex::block::DecompressError),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
Expand All @@ -60,71 +64,50 @@ pub fn decode_bytes(bytes: &[u8]) -> Result<Vec<LogMsg>, DecodeError> {

// ----------------------------------------------------------------------------

enum Decompressor<R: std::io::Read> {
Uncompressed(R),
Lz4(lz4_flex::frame::FrameDecoder<R>),
}
pub fn read_options(bytes: &[u8]) -> Result<EncodingOptions, DecodeError> {
let mut read = std::io::Cursor::new(bytes);

impl<R: std::io::Read> Decompressor<R> {
fn new(compression: Compression, read: R) -> Self {
match compression {
Compression::Off => Self::Uncompressed(read),
Compression::LZ4 => Self::Lz4(lz4_flex::frame::FrameDecoder::new(read)),
}
let FileHeader {
magic,
version,
options,
} = FileHeader::decode(&mut read)?;

if &magic == b"RRF0" {
emilk marked this conversation as resolved.
Show resolved Hide resolved
return Err(DecodeError::OldRrdVersion);
} else if &magic != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}

pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), DecodeError> {
use std::io::Read as _;
warn_on_version_mismatch(version);

match self {
Decompressor::Uncompressed(read) => read.read_exact(buf).map_err(DecodeError::Read),
Decompressor::Lz4(lz4) => lz4.read_exact(buf).map_err(DecodeError::Lz4),
}
match options.serializer {
Serializer::MsgPack => {}
}
}

// ----------------------------------------------------------------------------
Ok(options)
}

pub struct Decoder<R: std::io::Read> {
decompressor: Decompressor<R>,
buffer: Vec<u8>,
compression: Compression,
read: R,
uncompressed: Vec<u8>,
compressed: Vec<u8>,
}

impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
re_tracing::profile_function!();

{
let mut header = [0_u8; 4];
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header == b"RRF0" {
return Err(DecodeError::OldRrdVersion);
} else if &header != crate::RRD_HEADER {
return Err(DecodeError::NotAnRrd);
}
}

{
let mut version_bytes = [0_u8; 4];
read.read_exact(&mut version_bytes)
.map_err(DecodeError::Read)?;
warn_on_version_mismatch(version_bytes);
}

let options = {
let mut options_bytes = [0_u8; 4];
read.read_exact(&mut options_bytes)
.map_err(DecodeError::Read)?;
EncodingOptions::from_bytes(options_bytes)?
};

match options.serializer {
Serializer::MsgPack => {}
}
let mut data = [0_u8; FileHeader::SIZE];
read.read_exact(&mut data).map_err(DecodeError::Read)?;
let compression = read_options(&data)?.compression;

Ok(Self {
decompressor: Decompressor::new(options.compression, read),
buffer: vec![],
compression,
read,
uncompressed: vec![],
compressed: vec![],
jprochazk marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
Expand All @@ -135,21 +118,41 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
fn next(&mut self) -> Option<Self::Item> {
re_tracing::profile_function!();

let mut len = [0_u8; 8];
self.decompressor.read_exact(&mut len).ok()?;
let len = u64::from_le_bytes(len) as usize;

self.buffer.resize(len, 0);
let header = match MessageHeader::decode(&mut self.read) {
Ok(header) => header,
Err(err) => match err {
DecodeError::Read(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
return None
}
other => return Some(Err(other)),
},
};

{
re_tracing::profile_scope!("lz4");
if let Err(err) = self.decompressor.read_exact(&mut self.buffer) {
return Some(Err(err));
match self.compression {
Compression::Off => {
self.uncompressed.resize(header.uncompressed as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.uncompressed) {
return Some(Err(DecodeError::Read(err)));
}
}
Compression::LZ4 => {
self.compressed.resize(header.compressed as usize, 0);
if let Err(err) = self.read.read_exact(&mut self.compressed) {
return Some(Err(DecodeError::Read(err)));
}
self.uncompressed.resize(header.uncompressed as usize, 0);

re_tracing::profile_scope!("lz4");
if let Err(err) =
lz4_flex::block::decompress_into(&self.compressed, &mut self.uncompressed)
{
return Some(Err(DecodeError::Lz4(err)));
}
}
}

re_tracing::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
match rmp_serde::from_slice(&self.uncompressed) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(err.into())),
}
Expand Down
Loading