Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK: stream to disk with save feature #1405

Merged
merged 8 commits into from
Feb 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 62 additions & 23 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,79 @@
//! Saving/loading [`LogMsg`]:es to/from a file.
//! Encoding/decoding [`LogMsg`]:es as `.rrd` files.

use crate::LogMsg;

// ----------------------------------------------------------------------------
// native encode:

#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
mut write: impl std::io::Write,
) -> anyhow::Result<()> {
crate::profile_function!();
mod encoder {
use anyhow::Context as _;
use std::io::Write as _;

write.write_all(b"RRF0").context("header")?;
write.write_all(&[0, 0, 0, 0]).context("header")?; // reserved for future use
use crate::LogMsg;

let level = 3;
let mut encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;
/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
zstd_encoder: zstd::stream::Encoder<'static, W>,
buffer: Vec<u8>,
}

let mut buffer = vec![];
impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> anyhow::Result<Self> {
write.write_all(b"RRF0").context("header")?;
write.write_all(&[0, 0, 0, 0]).context("header")?; // reserved for future use

for message in messages {
buffer.clear();
rmp_serde::encode::write_named(&mut buffer, message).context("MessagePack encoding")?;
encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
encoder.write_all(&buffer).context("zstd write")?;
}
let level = 3;
let zstd_encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;

encoder.finish().context("zstd finish")?;
Ok(Self {
zstd_encoder,
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> anyhow::Result<()> {
let Self {
zstd_encoder,
buffer,
} = self;

buffer.clear();
rmp_serde::encode::write_named(buffer, message).context("MessagePack encoding")?;

Ok(())
zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
zstd_encoder.write_all(buffer).context("zstd write")?;

Ok(())
}

pub fn finish(self) -> anyhow::Result<()> {
self.zstd_encoder.finish().context("zstd finish")?;
Ok(())
}
}

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
) -> anyhow::Result<()> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(message)?;
}
encoder.finish()
}
}

#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
pub use encoder::*;

// ----------------------------------------------------------------------------
// native
// native decode:

#[cfg(feature = "load")]
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -95,7 +134,7 @@ impl<'r, R: std::io::BufRead> Iterator for Decoder<'r, R> {
}

// ----------------------------------------------------------------------------
// wasm:
// wasm decode:

#[cfg(feature = "load")]
#[cfg(target_arch = "wasm32")]
Expand Down
26 changes: 10 additions & 16 deletions crates/re_sdk/src/clap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use std::{net::SocketAddr, path::PathBuf};

#[derive(Debug, Clone, PartialEq, Eq)]
enum RerunBehavior {
Connect(SocketAddr),
Save(PathBuf),
#[cfg(feature = "web_viewer")]
Serve,
Connect(SocketAddr),
Spawn,
}

Expand Down Expand Up @@ -58,33 +58,27 @@ pub struct RerunArgs {

impl RerunArgs {
/// Run common Rerun script setup actions. Connect to the viewer if necessary.
pub fn on_startup(&self, session: &mut Session) -> bool {
///
/// Returns `true` if you should call `session.spawn`.
pub fn on_startup(&self, session: &mut Session) -> anyhow::Result<bool> {
match self.to_behavior() {
RerunBehavior::Connect(addr) => session.connect(addr),
RerunBehavior::Spawn => return true,
RerunBehavior::Save(path) => session.save(path)?,
#[cfg(feature = "web_viewer")]
RerunBehavior::Serve => session.serve(true),
RerunBehavior::Save(_) => {}
RerunBehavior::Spawn => return Ok(true),
}

false
Ok(false)
}

/// Run common post-actions. Sleep if serving the web viewer.
pub fn on_teardown(&self, session: &mut Session) -> anyhow::Result<()> {
let behavior = self.to_behavior();

pub fn on_teardown(&self) {
#[cfg(feature = "web_viewer")]
if behavior == RerunBehavior::Serve {
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C");
if self.to_behavior() == RerunBehavior::Serve {
eprintln!("Sleeping while serving the web viewer. Abort with Ctrl-C"); // TODO(emilk): sleep in `drop` instead?
std::thread::sleep(std::time::Duration::from_secs(1_000_000));
}

if let RerunBehavior::Save(path) = behavior {
session.save(path)?;
}

Ok(())
}

fn to_behavior(&self) -> RerunBehavior {
Expand Down
57 changes: 57 additions & 0 deletions crates/re_sdk/src/file_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use anyhow::Context as _;

use re_log_types::LogMsg;

pub struct FileWriter {
// None = quit
tx: std::sync::mpsc::Sender<Option<LogMsg>>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for FileWriter {
fn drop(&mut self) {
self.tx.send(None).ok();
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().ok();
}
}
}

impl FileWriter {
pub fn new(path: impl Into<std::path::PathBuf>) -> anyhow::Result<Self> {
let (tx, rx) = std::sync::mpsc::channel();

let path = path.into();

re_log::debug!("Saving file to {path:?}…");

let file = std::fs::File::create(&path).with_context(|| format!("Path: {:?}", path))?;
let mut encoder = re_log_types::encoding::Encoder::new(file)?;

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
if let Err(err) = encoder.finish() {
re_log::error!("Failed to save log stream to {path:?}: {err}");
} else {
re_log::debug!("Log stream saved to {path:?}");
}
})
.context("Failed to spawn thread")?;

Ok(Self {
tx,
join_handle: Some(join_handle),
})
}

pub fn write(&self, msg: LogMsg) {
self.tx.send(Some(msg)).ok();
}
}
4 changes: 4 additions & 0 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
#![warn(missing_docs)] // Let's keep the this crate well-documented!

// Send data to a rerun session
#[cfg(not(target_arch = "wasm32"))]
mod file_writer;
mod global;
mod msg_sender;
#[cfg(feature = "web_viewer")]
mod remote_viewer_server;
mod session;

pub use self::global::{global_session, global_session_with_default_enabled};
Expand Down
59 changes: 59 additions & 0 deletions crates/re_sdk/src/remote_viewer_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use re_log_types::LogMsg;

/// Hosts two servers:
/// * A web-server, serving the web-viewer
/// * A `WebSocket` server, server [`LogMsg`]es to remote viewer(s).
pub struct RemoteViewerServer {
web_server_join_handle: tokio::task::JoinHandle<()>,
sender: re_smart_channel::Sender<LogMsg>,
}

impl Drop for RemoteViewerServer {
fn drop(&mut self) {
re_log::info!("Shutting down web server.");
self.web_server_join_handle.abort();
}
}

impl RemoteViewerServer {
pub fn new(tokio_rt: &tokio::runtime::Runtime, open_browser: bool) -> Self {
let (rerun_tx, rerun_rx) = re_smart_channel::smart_channel(re_smart_channel::Source::Sdk);

let web_server_join_handle = tokio_rt.spawn(async move {
// This is the server which the web viewer will talk to:
let ws_server = re_ws_comms::Server::new(re_ws_comms::DEFAULT_WS_SERVER_PORT)
.await
.unwrap();
let ws_server_handle = tokio::spawn(ws_server.listen(rerun_rx)); // TODO(emilk): use tokio_rt ?

// This is the server that serves the Wasm+HTML:
let web_port = 9090;
let web_server = re_web_server::WebServer::new(web_port);
let web_server_handle = tokio::spawn(async move {
web_server.serve().await.unwrap();
});

let ws_server_url = re_ws_comms::default_server_url();
let viewer_url = format!("http://127.0.0.1:{web_port}?url={ws_server_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
} else {
re_log::info!("Web server is running - view it at {viewer_url}");
}

ws_server_handle.await.unwrap().unwrap();
web_server_handle.await.unwrap();
});

Self {
web_server_join_handle,
sender: rerun_tx,
}
}

pub fn send(&self, msg: LogMsg) {
if let Err(err) = self.sender.send(msg) {
re_log::error_once!("Failed to send log message to web server: {err}");
}
}
}
Loading