Skip to content

Commit

Permalink
Refactor re_sdk::Session (#1528)
Browse files Browse the repository at this point in the history
* Refactor: clean up Session slightly

* Simplify the API of RerunArgs

* CI: run `cargo check -p rerun --no-default-features --features sdk`

* Fix errors and warnings when using only the `sdk` feature of rerun

* Make LogSink: Sync

* Refactor: clean up the contents of Session

* Move the tokio runtime out of Session

* Clean up Session a bit

* Refactor sink names and modules

* Clone Session as PythonSession

The Python API have different needs from the Rust API

* justfile: make sure our just-scripts use `set -euo pipefail`

* Add just rs-lint

* lint.py: white-list "./web_viewer/re_viewer_debug.js"

* Run `typos` in `just lint`

* Simplify `Session`, and add `SessionBuilder`

* Use --deny-warnings instead of setting RUSTFLAGS

less re-compiling

* rust.yml: replace `-D` with more explicit `--deny`

* Make ahash a workspace dependency

* Remove typos from just py-lint again since it runs on CI

* Make Session: Clone

* Make sure `Session` is `Send` and `Sync`

* Make `tracing` and `tracing-subscriber` workspace dependencies

* Less `mut Session`

* Remove lint of `dbg!` (clippy checks it now)

* MsgSender::send can be used with both Session and LogSink

* bug fix

* Cleanup

* Add Session::sink to access the underlying sink

* Document the built-in log level names

* Simplify TCP client by removing the `set_addr` method

* Update TcpClient documentation

* Use thiserror to report LogMsg encoding errors

* Replace some more `anyhow` with `thiserror`

* fix copy-paste bug

Co-authored-by: Jeremy Leibs <[email protected]>

* fix copy-paste bug

Co-authored-by: Jeremy Leibs <[email protected]>

* Sleep longer

* Simplify code with SessionBuilder::buffered

---------

Co-authored-by: Jeremy Leibs <[email protected]>
  • Loading branch information
emilk and jleibs authored Mar 9, 2023
1 parent c8c91a4 commit 93edfca
Show file tree
Hide file tree
Showing 49 changed files with 998 additions and 600 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ env:
# web_sys_unstable_apis is required to enable the web_sys clipboard API which egui_web uses
# https://rustwasm.github.io/wasm-bindgen/api/web_sys/struct.Clipboard.html
# https://rustwasm.github.io/docs/wasm-bindgen/web-sys/unstable-apis.html
RUSTFLAGS: --cfg=web_sys_unstable_apis -D warnings
RUSTFLAGS: --cfg=web_sys_unstable_apis --deny warnings

# See https://github.com/ericseppanen/cargo-cranky/issues/8
RUSTDOCFLAGS: -D warnings -D rustdoc::missing_crate_level_docs
RUSTDOCFLAGS: --deny warnings --deny rustdoc::missing_crate_level_docs

permissions:
# deployments permission to deploy GitHub pages website
Expand Down Expand Up @@ -122,14 +122,21 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: cranky
args: --all-targets --all-features -- -D warnings
args: --all-targets --all-features -- --deny warnings

- name: Check no default features
uses: actions-rs/cargo@v1
with:
command: check
args: --locked --no-default-features --features __ci --lib

# Check a few important permutations of the feature flags for our `rerun` library:
- name: Check rerun with --features sdk
uses: actions-rs/cargo@v1
with:
command: check
args: --locked --no-default-features --features sdk

- name: Test doc-tests
uses: actions-rs/cargo@v1
with:
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ re_web_viewer_server = { path = "crates/re_web_viewer_server", version = "0.3.0"
re_ws_comms = { path = "crates/re_ws_comms", version = "0.3.0" }
rerun = { path = "crates/rerun", version = "0.3.0" }

ahash = "0.8"
anyhow = "1.0"
arrow2 = "0.16"
arrow2_convert = "0.4.2"
Expand Down Expand Up @@ -77,6 +78,8 @@ puffin = "0.14"
thiserror = "1.0"
time = { version = "0.3", features = ["wasm-bindgen"] }
tokio = "1.24"
tracing = "0.1"
tracing-subscriber = "0.3"
wgpu = { version = "0.15", default-features = false }
wgpu-core = { version = "0.15", default-features = false }

Expand Down
2 changes: 1 addition & 1 deletion crates/re_analytics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ web-sys = { version = "0.3.58", features = ["Storage"] }


[dev-dependencies]
tracing-subscriber = "0.3"
tracing-subscriber.workspace = true


[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/re_arrow_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ re_log_types.workspace = true
re_log.workspace = true

# External dependencies:
ahash = "0.8"
ahash.workspace = true
anyhow.workspace = true
arrow2 = { workspace = true, features = [
"compute_concatenate",
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ re_log.workspace = true
re_smart_channel.workspace = true
re_string_interner.workspace = true

ahash = "0.8"
ahash.workspace = true
anyhow = "1.0"
document-features = "0.2"
itertools = "0.10"
Expand Down
4 changes: 2 additions & 2 deletions crates/re_log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ all-features = true

[dependencies]
log-once = "0.4"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }

# web dependencies:
[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ re_string_interner.workspace = true
re_tuid.workspace = true

# External
ahash = "0.8"
ahash.workspace = true
array-init = "2.1.0"
arrow2 = { workspace = true, features = ["io_ipc", "io_print"] }
arrow2_convert.workspace = true
Expand Down
8 changes: 8 additions & 0 deletions crates/re_log_types/src/component_types/text_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ use crate::msg_bundle::Component;
#[derive(Clone, Debug, ArrowField, ArrowSerialize, ArrowDeserialize, PartialEq, Eq)]
pub struct TextEntry {
pub body: String,

// Recommended to be one of:
// * `"CRITICAL"`
// * `"ERROR"`
// * `"WARN"`
// * `"INFO"`
// * `"DEBUG"`
// * `"TRACE"`
pub level: Option<String>,
}

Expand Down
37 changes: 25 additions & 12 deletions crates/re_log_types/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,62 +8,75 @@ use crate::LogMsg;
#[cfg(feature = "save")]
#[cfg(not(target_arch = "wasm32"))]
mod encoder {
use anyhow::Context as _;
use std::io::Write as _;

use crate::LogMsg;

/// On failure to encode or serialize a [`LogMsg`].
#[derive(thiserror::Error, Debug)]
pub enum EncodeError {
#[error("Failed to write: {0}")]
Write(std::io::Error),

#[error("Zstd error: {0}")]
Zstd(std::io::Error),

#[error("MsgPack error: {0}")]
MsgPack(#[from] rmp_serde::encode::Error),
}

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
pub struct Encoder<W: std::io::Write> {
zstd_encoder: zstd::stream::Encoder<'static, W>,
buffer: Vec<u8>,
}

impl<W: std::io::Write> Encoder<W> {
pub fn new(mut write: W) -> anyhow::Result<Self> {
pub fn new(mut write: W) -> Result<Self, EncodeError> {
let rerun_version = re_build_info::CrateVersion::parse(env!("CARGO_PKG_VERSION"));

write.write_all(b"RRF0").context("header")?;
write.write_all(b"RRF0").map_err(EncodeError::Write)?;
write
.write_all(&rerun_version.to_bytes())
.context("header")?;
.map_err(EncodeError::Write)?;

let level = 3;
let zstd_encoder = zstd::stream::Encoder::new(write, level).context("zstd start")?;
let zstd_encoder =
zstd::stream::Encoder::new(write, level).map_err(EncodeError::Zstd)?;

Ok(Self {
zstd_encoder,
buffer: vec![],
})
}

pub fn append(&mut self, message: &LogMsg) -> anyhow::Result<()> {
pub fn append(&mut self, message: &LogMsg) -> Result<(), EncodeError> {
let Self {
zstd_encoder,
buffer,
} = self;

buffer.clear();
rmp_serde::encode::write_named(buffer, message).context("MessagePack encoding")?;
rmp_serde::encode::write_named(buffer, message)?;

zstd_encoder
.write_all(&(buffer.len() as u64).to_le_bytes())
.context("zstd write")?;
zstd_encoder.write_all(buffer).context("zstd write")?;
.map_err(EncodeError::Zstd)?;
zstd_encoder.write_all(buffer).map_err(EncodeError::Zstd)?;

Ok(())
}

pub fn finish(self) -> anyhow::Result<()> {
self.zstd_encoder.finish().context("zstd finish")?;
pub fn finish(self) -> Result<(), EncodeError> {
self.zstd_encoder.finish().map_err(EncodeError::Zstd)?;
Ok(())
}
}

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
) -> anyhow::Result<()> {
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(message)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/re_memory/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ all-features = true
re_format.workspace = true
re_log.workspace = true

ahash = "0.8"
ahash.workspace = true
backtrace = { version = "0.3" }
emath.workspace = true
instant = { version = "0.1", features = ["wasm-bindgen"] }
Expand Down
4 changes: 2 additions & 2 deletions crates/re_renderer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ serde = ["dep:serde"]
re_error.workspace = true
re_log.workspace = true

ahash = "0.8"
ahash.workspace = true
anyhow.workspace = true
bitflags = "1.3"
bytemuck = { version = "1.12", features = ["derive"] }
Expand Down Expand Up @@ -92,7 +92,7 @@ instant = { version = "0.1", features = ["wasm-bindgen"] }
log = "0.4"
pollster = "0.3"
rand = "0.8"
tracing = "0.1"
tracing.workspace = true
winit = "0.28.1"
zip = { version = "0.6", default-features = false, features = ["deflate"] }

Expand Down
16 changes: 3 additions & 13 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,16 @@ demo = []
glam = ["re_log_types/glam"]

## Add the `global_session` method.
global_session = ["dep:once_cell", "dep:parking_lot"]
global_session = ["dep:once_cell"]

## Integration with the [`image`](https://crates.io/crates/image/) crate.
image = ["re_log_types/image"]

# Add a tokio runtime to the `Session` type.
tokio_runtime = ["dep:tokio"]


[dependencies]
re_build_info.workspace = true
re_error.workspace = true
re_log_types.workspace = true
re_log_types = { workspace = true, features = ["save"] }
re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }
Expand All @@ -50,18 +47,11 @@ crossbeam = "0.8"
document-features = "0.2"
lazy_static.workspace = true
nohash-hasher = "0.2"
parking_lot.workspace = true
thiserror.workspace = true

# Optional dependencies:
once_cell = { version = "1.12", optional = true }
parking_lot = { version = "0.12", optional = true }

# Native dependencies:
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, optional = true, features = [
"macros",
"rt-multi-thread",
] }


[dev-dependencies]
Expand Down
80 changes: 80 additions & 0 deletions crates/re_sdk/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::{path::PathBuf, sync::mpsc::Sender};

use parking_lot::Mutex;

use re_log_types::LogMsg;

/// Errors that can occur when creating a [`FileSink`].
#[derive(thiserror::Error, Debug)]
pub enum FileSinkError {
/// Error creating the file.
#[error("Failed to create file {0}: {1}")]
CreateFile(PathBuf, std::io::Error),

/// Error spawning the file writer thread.
#[error("Failed to spawn thread: {0}")]
SpawnThread(std::io::Error),

/// Error encoding a log message.
#[error("Failed to encode LogMsg: {0}")]
LogMsgEncode(#[from] re_log_types::encoding::EncodeError),
}

/// Stream log messages to an `.rrd` file.
pub struct FileSink {
// None = quit
tx: Mutex<Sender<Option<LogMsg>>>,
join_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for FileSink {
fn drop(&mut self) {
self.tx.lock().send(None).ok();
if let Some(join_handle) = self.join_handle.take() {
join_handle.join().ok();
}
}
}

impl FileSink {
/// Start writing log messages to a file at the given path.
pub fn new(path: impl Into<std::path::PathBuf>) -> Result<Self, FileSinkError> {
let (tx, rx) = std::sync::mpsc::channel();

let path = path.into();

re_log::debug!("Saving file to {path:?}…");

let file = std::fs::File::create(&path)
.map_err(|err| FileSinkError::CreateFile(path.clone(), err))?;
let mut encoder = re_log_types::encoding::Encoder::new(file)?;

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
if let Err(err) = encoder.finish() {
re_log::error!("Failed to save log stream to {path:?}: {err}");
} else {
re_log::debug!("Log stream saved to {path:?}");
}
})
.map_err(FileSinkError::SpawnThread)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
})
}
}

impl crate::sink::LogSink for FileSink {
fn send(&self, msg: LogMsg) {
self.tx.lock().send(Some(msg)).ok();
}
}
Loading

1 comment on commit 93edfca

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rust Benchmark

Benchmark suite Current: 93edfca Previous: c8c91a4 Ratio
datastore/insert/batch/rects/insert 557404 ns/iter (± 1931) 548497 ns/iter (± 2367) 1.02
datastore/latest_at/batch/rects/query 1821 ns/iter (± 8) 1813 ns/iter (± 19) 1.00
datastore/latest_at/missing_components/primary 356 ns/iter (± 1) 358 ns/iter (± 0) 0.99
datastore/latest_at/missing_components/secondaries 424 ns/iter (± 1) 428 ns/iter (± 1) 0.99
datastore/range/batch/rects/query 152292 ns/iter (± 498) 154861 ns/iter (± 1385) 0.98
mono_points_arrow/generate_message_bundles 52935324 ns/iter (± 946607) 47672724 ns/iter (± 865707) 1.11
mono_points_arrow/generate_messages 140038594 ns/iter (± 1121718) 124757765 ns/iter (± 1181023) 1.12
mono_points_arrow/encode_log_msg 165644765 ns/iter (± 1298114) 153929398 ns/iter (± 963943) 1.08
mono_points_arrow/encode_total 362959572 ns/iter (± 1770620) 326350073 ns/iter (± 1562282) 1.11
mono_points_arrow/decode_log_msg 188895962 ns/iter (± 1148497) 176527715 ns/iter (± 826705) 1.07
mono_points_arrow/decode_message_bundles 75769133 ns/iter (± 2391124) 64587172 ns/iter (± 803674) 1.17
mono_points_arrow/decode_total 262326604 ns/iter (± 1900899) 239435670 ns/iter (± 1442385) 1.10
batch_points_arrow/generate_message_bundles 333138 ns/iter (± 1004) 321217 ns/iter (± 2360) 1.04
batch_points_arrow/generate_messages 6392 ns/iter (± 21) 6074 ns/iter (± 38) 1.05
batch_points_arrow/encode_log_msg 375721 ns/iter (± 1652) 367448 ns/iter (± 1512) 1.02
batch_points_arrow/encode_total 729575 ns/iter (± 2652) 713399 ns/iter (± 3010) 1.02
batch_points_arrow/decode_log_msg 353623 ns/iter (± 928) 345738 ns/iter (± 1971) 1.02
batch_points_arrow/decode_message_bundles 2099 ns/iter (± 16) 2087 ns/iter (± 13) 1.01
batch_points_arrow/decode_total 357774 ns/iter (± 3224) 353766 ns/iter (± 940) 1.01
arrow_mono_points/insert 7259721552 ns/iter (± 149385179) 6018780763 ns/iter (± 22048901) 1.21
arrow_mono_points/query 1743373 ns/iter (± 36429) 1707188 ns/iter (± 9562) 1.02
arrow_batch_points/insert 2772973 ns/iter (± 78195) 2685137 ns/iter (± 18624) 1.03
arrow_batch_points/query 16888 ns/iter (± 47) 16849 ns/iter (± 93) 1.00
arrow_batch_vecs/insert 41866 ns/iter (± 141) 41667 ns/iter (± 1063) 1.00
arrow_batch_vecs/query 389761 ns/iter (± 1875) 388567 ns/iter (± 1289) 1.00
tuid/Tuid::random 34 ns/iter (± 0) 34 ns/iter (± 0) 1

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.