Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix crash on decoding old .rrd files #1579

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn log_messages() {
fn decode_log_msg(mut bytes: &[u8]) -> LogMsg {
let mut messages = re_log_types::encoding::Decoder::new(&mut bytes)
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, _>>()
.unwrap();
assert!(bytes.is_empty());
assert_eq!(messages.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn encode_log_msgs(messages: &[LogMsg]) -> Vec<u8> {
fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
let messages = re_log_types::encoding::Decoder::new(&mut bytes)
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, _>>()
.unwrap();
assert!(bytes.is_empty());
messages
Expand Down
69 changes: 49 additions & 20 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,34 @@ fn warn_on_version_mismatch(encoded_version: [u8; 4]) {
}
}

// ----------------------------------------------------------------------------

/// On failure to encode or serialize a [`LogMsg`].
#[cfg(feature = "load")]
#[derive(thiserror::Error, Debug)]
pub enum DecodeError {
#[error("Not an .rrd file")]
NotAnRrd,

#[error("Failed to read: {0}")]
Read(std::io::Error),

#[cfg(not(target_arch = "wasm32"))]
#[error("Zstd error: {0}")]
Zstd(std::io::Error),

#[cfg(target_arch = "wasm32")]
#[error("Zstd error: {0}")]
RuzstdInit(ruzstd::frame_decoder::FrameDecoderError),

#[cfg(target_arch = "wasm32")]
#[error("Zstd read error: {0}")]
RuzstdRead(std::io::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::decode::Error),
}

// ----------------------------------------------------------------------------
// native decode:

Expand All @@ -121,17 +149,18 @@ pub struct Decoder<'r, R: std::io::BufRead> {
#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader<R>> {
pub fn new(mut read: R) -> anyhow::Result<Self> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();
use anyhow::Context as _;

let mut header = [0_u8; 4];
read.read_exact(&mut header).context("missing header")?;
anyhow::ensure!(&header == b"RRF0", "Not a rerun file");
read.read_exact(&mut header).context("missing header")?;
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder = zstd::stream::read::Decoder::new(read).context("zstd")?;
let zdecoder = zstd::stream::read::Decoder::new(read).map_err(DecodeError::Zstd)?;
Ok(Self {
zdecoder,
buffer: vec![],
Expand All @@ -142,7 +171,7 @@ impl<'r, R: std::io::Read> Decoder<'r, std::io::BufReader<R>> {
#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
type Item = anyhow::Result<LogMsg>;
type Item = Result<LogMsg, DecodeError>;

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
Expand All @@ -157,14 +186,14 @@ impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
{
crate::profile_scope!("zstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(anyhow::anyhow!("zstd: {err}")));
return Some(Err(DecodeError::Zstd(err)));
}
}

crate::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(anyhow::anyhow!("MessagePack: {err}"))),
Err(err) => Some(Err(err.into())),
}
}
}
Expand All @@ -182,18 +211,18 @@ pub struct Decoder<R: std::io::Read> {
#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Decoder<R> {
pub fn new(mut read: R) -> anyhow::Result<Self> {
pub fn new(mut read: R) -> Result<Self, DecodeError> {
crate::profile_function!();
use anyhow::Context as _;

let mut header = [0_u8; 4];
read.read_exact(&mut header).context("missing header")?;
anyhow::ensure!(&header == b"RRF0", "Not a rerun file");
read.read_exact(&mut header).context("missing header")?;
read.read_exact(&mut header).map_err(DecodeError::Read)?;
if &header != b"RRF0" {
return Err(DecodeError::NotAnRrd);
}
read.read_exact(&mut header).map_err(DecodeError::Read)?;
warn_on_version_mismatch(header);

let zdecoder =
ruzstd::StreamingDecoder::new(read).map_err(|err| anyhow::anyhow!("ruzstd: {err}"))?;
let zdecoder = ruzstd::StreamingDecoder::new(read).map_err(DecodeError::RuzstdInit)?;
Ok(Self {
zdecoder,
buffer: vec![],
Expand All @@ -204,7 +233,7 @@ impl<R: std::io::Read> Decoder<R> {
#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
impl<R: std::io::Read> Iterator for Decoder<R> {
type Item = anyhow::Result<LogMsg>;
type Item = Result<LogMsg, DecodeError>;

fn next(&mut self) -> Option<Self::Item> {
crate::profile_function!();
Expand All @@ -219,14 +248,14 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
{
crate::profile_scope!("ruzstd");
if let Err(err) = self.zdecoder.read_exact(&mut self.buffer) {
return Some(Err(anyhow::anyhow!("ruzstd: {err}")));
return Some(Err(DecodeError::RuzstdRead(err)));
}
}

crate::profile_scope!("MsgPack deser");
match rmp_serde::from_read(&mut self.buffer.as_slice()) {
Ok(msg) => Some(Ok(msg)),
Err(err) => Some(Err(anyhow::anyhow!("MessagePack: {err}"))),
Err(err) => Some(Err(err.into())),
}
}
}
Expand Down Expand Up @@ -257,7 +286,7 @@ fn test_encode_decode() {

let decoded_messages = Decoder::new(&mut file.as_slice())
.unwrap()
.collect::<anyhow::Result<Vec<LogMsg>>>()
.collect::<Result<Vec<LogMsg>, DecodeError>>()
.unwrap();

assert_eq!(messages, decoded_messages);
Expand Down
10 changes: 9 additions & 1 deletion crates/rerun/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,19 @@ fn load_file_to_channel(path: &std::path::Path) -> anyhow::Result<Receiver<LogMs
path: path.to_owned(),
});

let path = path.to_owned();
std::thread::Builder::new()
.name("rrd_file_reader".into())
.spawn(move || {
for msg in decoder {
tx.send(msg.unwrap()).ok();
match msg {
Ok(msg) => {
tx.send(msg).ok();
}
Err(err) => {
re_log::warn_once!("Failed to decode message in {path:?}: {err}");
}
}
}
})
.expect("Failed to spawn thread");
Expand Down