Skip to content

Commit

Permalink
SDK: stream to disk with save feature (#1405)
Browse files Browse the repository at this point in the history
* Refactor LogMsg-encoder to be a `struct`

* Small cleanup of Sender

* Refactor Session: extract the web-viewer connection

* Refactor Session::connect

* Log SDK: stream log data to file on disk

* Rust examples: --save streams to disk

* Python examples: start streaming to disk right away

* typo

Co-authored-by: Clement Rey <[email protected]>

---------

Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
emilk and teh-cmc authored Feb 26, 2023
1 parent 90db32b commit c97747a
Show file tree
Hide file tree
Showing 10 changed files with 241 additions and 146 deletions.
85 changes: 62 additions & 23 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,79 @@
//! Saving/loading [`LogMsg`]:es to/from a file.
//! Encoding/decoding [`LogMsg`]:es as `.rrd` files.
use crate::LogMsg;

// ----------------------------------------------------------------------------
// native encode:

#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
mut write: impl std::io::Write,
) -> anyhow::Result<()> {
crate::profile_function!();
mod encoder {
use anyhow::Context as _;
use std::io::Write as _;

write.write_all(b"RRF0").context("header")?;
write.write_all(&[0, 0, 0, 0]).context("header")?; // reserved for future use
use crate::LogMsg;

let level = 3;
let mut encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;
/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
zstd_encoder: zstd::stream::Encoder<'static, W>,
buffer: Vec<u8>,
}

let mut buffer = vec![];
impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> anyhow::Result<Self> {
write.write_all(b"RRF0").context("header")?;
write.write_all(&[0, 0, 0, 0]).context("header")?; // reserved for future use

for message in messages {
buffer.clear();
rmp_serde::encode::write_named(&mut buffer, message).context("MessagePack encoding")?;
encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
encoder.write_all(&buffer).context("zstd write")?;
}
let level = 3;
let zstd_encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;

encoder.finish().context("zstd finish")?;
Ok(Self {
zstd_encoder,
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> anyhow::Result<()> {
let Self {
zstd_encoder,
buffer,
} = self;

buffer.clear();
rmp_serde::encode::write_named(buffer, message).context("MessagePack encoding")?;

Ok(())
zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
zstd_encoder.write_all(buffer).context("zstd write")?;

Ok(())
}

pub fn finish(self) -> anyhow::Result<()> {
self.zstd_encoder.finish().context("zstd finish")?;
Ok(())
}
}

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
) -> anyhow::Result<()> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(message)?;
}
encoder.finish()
}
}

#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
pub use encoder::*;

// ----------------------------------------------------------------------------
// native
// native decode:

#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -95,7 +134,7 @@ impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
}

// ----------------------------------------------------------------------------
// wasm:
// wasm decode:

#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
Expand Down
26 changes: 10 additions & 16 deletions crates/re_sdk/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::{net::SocketAddr, path::PathBuf};

#[derive(Debug, Clone, PartialEq, Eq)]
enum RerunBehavior {
Connect(SocketAddr),
Save(PathBuf),
#[cfg(feature = "web_viewer")]
Serve,
Connect(SocketAddr),
Spawn,
}

Expand Down Expand Up @@ -58,33 +58,27 @@ pub struct RerunArgs {

impl RerunArgs {
/// Run common Rerun script setup actions. Connect to the viewer if necessary.
pub fn on_startup(&self, session: &mut Session) -> bool {
///
/// Returns `true` if you should call `session.spawn`.
pub fn on_startup(&self, session: &mut Session) -> anyhow::Result<bool> {
match self.to_behavior() {
RerunBehavior::Connect(addr) => session.connect(addr),
RerunBehavior::Spawn => return true,
RerunBehavior::Save(path) => session.save(path)?,
#[cfg(feature = "web_viewer")]
RerunBehavior::Serve => session.serve(true),
RerunBehavior::Save(_) => {}
RerunBehavior::Spawn => return Ok(true),
}

false
Ok(false)
}

/// Run common post-actions. Sleep if serving the web viewer.
pub fn on_teardown(&self, session: &mut Session) -> anyhow::Result<()> {
let behavior = self.to_behavior();

pub fn on_teardown(&self) {
#[cfg(feature = "web_viewer")]
if behavior == RerunBehavior::Serve {
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C");
if self.to_behavior() == RerunBehavior::Serve {
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C"); // TODO(emilk): sleep in `drop` instead?
std::thread::sleep(std::time::Duration::from_secs(1_000_000));
}

if let RerunBehavior::Save(path) = behavior {
session.save(path)?;
}

Ok(())
}

fn to_behavior(&self) -> RerunBehavior {
Expand Down
57 changes: 57 additions & 0 deletions crates/re_sdk/src/file_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use anyhow::Context as _;

use re_log_types::LogMsg;

pub struct FileWriter {
// None = quit
tx: std::sync::mpsc::Sender<Option<LogMsg>>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for FileWriter {
fn drop(&mut self) {
self.tx.send(None).ok();
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().ok();
}
}
}

impl FileWriter {
pub fn new(path: impl Into<std::path::PathBuf>) -> anyhow::Result<Self> {
let (tx, rx) = std::sync::mpsc::channel();

let path = path.into();

re_log::debug!("Saving file to {path:?}…");

let file = std::fs::File::create(&path).with_context(|| format!("Path: {:?}", path))?;
let mut encoder = re_log_types::encoding::Encoder::new(file)?;

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
if let Err(err) = encoder.finish() {
re_log::error!("Failed to save log stream to {path:?}: {err}");
} else {
re_log::debug!("Log stream saved to {path:?}");
}
})
.context("Failed to spawn thread")?;

Ok(Self {
tx,
join_handle: Some(join_handle),
})
}

pub fn write(&self, msg: LogMsg) {
self.tx.send(Some(msg)).ok();
}
}
4 changes: 4 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
#![warn(missing_docs)] // Let's keep the this crate well-documented!

// Send data to a rerun session
#[cfg(not(target_arch = "wasm32"))]
mod file_writer;
mod global;
mod msg_sender;
#[cfg(feature = "web_viewer")]
mod remote_viewer_server;
mod session;

pub use self::global::{global_session, global_session_with_default_enabled};
Expand Down
59 changes: 59 additions & 0 deletions crates/re_sdk/src/remote_viewer_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use re_log_types::LogMsg;

/// Hosts two servers:
/// * A web-server, serving the web-viewer
/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s).
pub struct RemoteViewerServer {
web_server_join_handle: tokio::task::JoinHandle<()>,
sender: re_smart_channel::Sender<LogMsg>,
}

impl Drop for RemoteViewerServer {
fn drop(&mut self) {
re_log::info!("Shutting down web server.");
self.web_server_join_handle.abort();
}
}

impl RemoteViewerServer {
pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self {
let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk);

let web_server_join_handle = tokio_rt.spawn(async move {
// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT)
.await
.unwrap();
let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx)); // TODO(emilk): use tokio_rt ?

// This is the server that serves the Wasm+HTML:
let web_port = 9090;
let web_server = re_web_server::WebServer::new(web_port);
let web_server_handle = tokio::spawn(async move {
web_server.serve().await.unwrap();
});

let ws_server_url = re_ws_comms::default_server_url();
let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
} else {
re_log::info!("Web server is running - view it at {viewer_url}");
}

ws_server_handle.await.unwrap().unwrap();
web_server_handle.await.unwrap();
});

Self {
web_server_join_handle,
sender: rerun_tx,
}
}

pub fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to web server: {err}");
}
}
}
Loading

1 comment on commit c97747a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: c97747a Previous: 90db32b Ratio
datastore/insert/batch/rects/insert 562683 ns/iter (± 5452) 547348 ns/iter (± 2815) 1.03
datastore/latest_at/batch/rects/query 1872 ns/iter (± 11) 1830 ns/iter (± 5) 1.02
datastore/latest_at/missing_components/primary 352 ns/iter (± 3) 358 ns/iter (± 0) 0.98
datastore/latest_at/missing_components/secondaries 413 ns/iter (± 5) 425 ns/iter (± 2) 0.97
datastore/range/batch/rects/query 152009 ns/iter (± 1789) 152112 ns/iter (± 580) 1.00
mono_points_arrow/generate_message_bundles 43940291 ns/iter (± 1271642) 45855910 ns/iter (± 840388) 0.96
mono_points_arrow/generate_messages 124306090 ns/iter (± 1361079) 125104798 ns/iter (± 1052527) 0.99
mono_points_arrow/encode_log_msg 148283826 ns/iter (± 1441515) 152105812 ns/iter (± 1019000) 0.97
mono_points_arrow/encode_total 322695077 ns/iter (± 3011993) 326032772 ns/iter (± 1243889) 0.99
mono_points_arrow/decode_log_msg 172713411 ns/iter (± 1540497) 177194253 ns/iter (± 828451) 0.97
mono_points_arrow/decode_message_bundles 62192589 ns/iter (± 782759) 64301048 ns/iter (± 714563) 0.97
mono_points_arrow/decode_total 233713482 ns/iter (± 2179769) 238352334 ns/iter (± 1853136) 0.98
batch_points_arrow/generate_message_bundles 328899 ns/iter (± 3928) 325314 ns/iter (± 1175) 1.01
batch_points_arrow/generate_messages 6101 ns/iter (± 62) 6245 ns/iter (± 33) 0.98
batch_points_arrow/encode_log_msg 370951 ns/iter (± 3365) 373141 ns/iter (± 1733) 0.99
batch_points_arrow/encode_total 710103 ns/iter (± 6365) 715043 ns/iter (± 2797) 0.99
batch_points_arrow/decode_log_msg 350493 ns/iter (± 2018) 350709 ns/iter (± 1761) 1.00
batch_points_arrow/decode_message_bundles 2023 ns/iter (± 16) 2031 ns/iter (± 12) 1.00
batch_points_arrow/decode_total 354190 ns/iter (± 1031) 356021 ns/iter (± 1070) 0.99
arrow_mono_points/insert 6054023341 ns/iter (± 22243189) 6105989507 ns/iter (± 13623479) 0.99
arrow_mono_points/query 1703940 ns/iter (± 23239) 1737889 ns/iter (± 10434) 0.98
arrow_batch_points/insert 2660869 ns/iter (± 28520) 2679472 ns/iter (± 10887) 0.99
arrow_batch_points/query 17451 ns/iter (± 177) 17490 ns/iter (± 145) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 1) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.