Skip to content

Commit

Permalink
Show e2e latency in metric ui in top panel (#4502)
Browse files Browse the repository at this point in the history
### What
When the top panel metrics are enabled, you will now see the e2e
latency, from when `log` is called in the SDK to when the data is added
to the data store in the viewer.

This is only accurate if the logging SDK is running on the same machine
as the viewer (or if the clocks on the two machines are very closely
synced).

Example when running
`examples/python/live_camera_edge_detection/main.py`:


![image](https://github.com/rerun-io/rerun/assets/1148717/ce7bd082-d06b-4220-8f9e-ad2d7513dd2f)


### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested the web demo (if applicable):
  * Full build: [app.rerun.io](https://app.rerun.io/pr/4502/index.html)
* Partial build:
[app.rerun.io](https://app.rerun.io/pr/4502/index.html?manifest_url=https://app.rerun.io/version/nightly/examples_manifest.json)
- Useful for quick testing when changes do not affect examples in any
way
* [x] The PR title and labels are set such as to maximize their
usefulness for the next release's CHANGELOG

- [PR Build Summary](https://build.rerun.io/pr/4502)
- [Docs
preview](https://rerun.io/preview/34843c640f3e411bbca05917de81e68c696a8844/docs)
<!--DOCS-PREVIEW-->
- [Examples
preview](https://rerun.io/preview/34843c640f3e411bbca05917de81e68c696a8844/examples)
<!--EXAMPLES-PREVIEW-->
- [Recent benchmark results](https://build.rerun.io/graphs/crates.html)
- [Wasm size tracking](https://build.rerun.io/graphs/sizes.html)
  • Loading branch information
emilk authored Dec 14, 2023
1 parent fd7e962 commit 003f167
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ re_types_core.workspace = true
ahash.workspace = true
document-features.workspace = true
egui_plot.workspace = true
emath.workspace = true
getrandom.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
parking_lot.workspace = true
rmp-serde = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
thiserror.workspace = true
Expand Down
103 changes: 96 additions & 7 deletions crates/re_data_store/src/store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;

use itertools::Itertools;
use nohash_hasher::IntMap;
use parking_lot::Mutex;

use re_arrow_store::{
DataStore, DataStoreConfig, GarbageCollectionOptions, StoreEvent, StoreSubscriber,
Expand Down Expand Up @@ -108,6 +109,8 @@ pub struct StoreDb {

/// Stores all components for all entities for all timelines.
data_store: DataStore,

stats: IngestionStatistics,
}

impl StoreDb {
Expand All @@ -121,10 +124,11 @@ impl StoreDb {
times_per_timeline: Default::default(),
tree: crate::EntityTree::root(),
data_store: re_arrow_store::DataStore::new(
store_id,
store_id.clone(),
InstanceKey::name(),
DataStoreConfig::default(),
),
stats: IngestionStatistics::new(store_id),
}
}

Expand Down Expand Up @@ -224,6 +228,11 @@ impl StoreDb {
self.entity_path_from_hash.values().sorted().collect()
}

#[inline]
pub fn ingestion_stats(&self) -> &IngestionStatistics {
&self.stats
}

#[inline]
pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
self.entity_path_from_hash.get(entity_path_hash)
Expand Down Expand Up @@ -302,22 +311,25 @@ impl StoreDb {
//
// This might result in a [`ClearCascade`] if the events trigger one or more immediate
// and/or pending clears.
let store_events = &[store_event];
self.times_per_timeline.on_events(store_events);
let clear_cascade = self.tree.on_store_additions(store_events);
let original_store_events = &[store_event];
self.times_per_timeline.on_events(original_store_events);
let clear_cascade = self.tree.on_store_additions(original_store_events);

// Second-pass: update the [`DataStore`] by applying the [`ClearCascade`].
//
// This will in turn generate new [`StoreEvent`]s that our internal views need to be
// notified of, again!
let store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&store_events);
let clear_cascade = self.tree.on_store_additions(&store_events);
let new_store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&new_store_events);
let clear_cascade = self.tree.on_store_additions(&new_store_events);

// Clears don't affect `Clear` components themselves, therefore we cannot have recursive
// cascades, thus this whole process must stabilize after one iteration.
debug_assert!(clear_cascade.is_empty());

// We inform the stats last, since it measures e2e latency.
self.stats.on_events(original_store_events);

Ok(())
}

Expand Down Expand Up @@ -462,6 +474,7 @@ impl StoreDb {
times_per_timeline,
tree,
data_store: _,
stats: _,
} = self;

times_per_timeline.on_events(store_events);
Expand All @@ -477,3 +490,79 @@ impl StoreDb {
.map(|info| (info.application_id.0.as_str(), info.started))
}
}

// ----------------------------------------------------------------------------

/// Statistics about the ingestion of data into a store,
/// e.g. the end-to-end latency from when `log` was called in the SDK
/// to when the data was added to the data store in the viewer.
pub struct IngestionStatistics {
    /// The store we are tracking statistics for; events for other stores are ignored.
    store_id: StoreId,
    /// Rolling history of end-to-end latencies, in seconds, keyed by ingestion wall-clock time.
    ///
    /// Kept behind a `Mutex` so it can be updated (flushed) from `&self`
    /// in `current_e2e_latency_sec`.
    e2e_latency_sec_history: Mutex<emath::History<f32>>,
}

impl StoreSubscriber for IngestionStatistics {
    /// Unique name identifying this subscriber.
    fn name(&self) -> String {
        // NOTE(review): the "rerun.testing" prefix looks like it may have been
        // copied from a test subscriber — confirm whether a non-testing
        // namespace is intended here.
        "rerun.testing.store_subscribers.IngestionStatistics".into()
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Feed every event that belongs to our store into the latency statistics.
    fn on_events(&mut self, events: &[StoreEvent]) {
        for event in events {
            if event.store_id != self.store_id {
                continue; // not our store
            }
            self.on_new_row_id(event.row_id);
        }
    }
}

impl IngestionStatistics {
    /// Create empty statistics for the given store.
    pub fn new(store_id: StoreId) -> Self {
        Self {
            store_id,
            e2e_latency_sec_history: Mutex::new(emath::History::new(
                // min samples 0: we stop displaying e2e latency if input stops.
                // max samples 1024: don't waste too much memory on this -
                // we just need enough to get a good average.
                0..1024,
                // max age 1s: don't keep too long of a rolling average,
                // or the stats get outdated.
                1.0,
            )),
        }
    }

    /// Record the e2e latency of a newly ingested row, based on the timestamp
    /// embedded in its [`RowId`].
    fn on_new_row_id(&mut self, row_id: RowId) {
        let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() else {
            return; // system clock is before the unix epoch - nothing sensible to measure
        };
        let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;

        // This only makes sense if the clocks are very good, i.e. if the recording was on the same machine!
        let Some(nanos_since_log) =
            nanos_since_epoch.checked_sub(row_id.nanoseconds_since_epoch())
        else {
            return; // the row-id timestamp is ahead of our clock - clocks are out of sync
        };

        let now = nanos_since_epoch as f64 / 1e9;
        let sec_since_log = nanos_since_log as f32 / 1e9;

        self.e2e_latency_sec_history.lock().add(now, sec_since_log);
    }

    /// What is the mean latency between the time data was logged in the SDK and the time it was ingested?
    ///
    /// This is based on the clocks of the viewer and the SDK being in sync,
    /// so if the recording was done on another machine, this is likely very inaccurate.
    pub fn current_e2e_latency_sec(&self) -> Option<f32> {
        let mut e2e_latency_sec_history = self.e2e_latency_sec_history.lock();

        if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
            let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;
            let now = nanos_since_epoch as f64 / 1e9;
            e2e_latency_sec_history.flush(now); // make sure the average is up-to-date.
        }

        e2e_latency_sec_history.average()
    }
}
6 changes: 6 additions & 0 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ impl RowId {
pub fn incremented_by(&self, n: u64) -> Self {
Self(self.0.incremented_by(n))
}

/// When the `RowId` was created, in nanoseconds since unix epoch.
///
/// Delegates to the inner id. Used to estimate end-to-end latency at ingestion
/// time — only meaningful if the SDK and viewer clocks are (roughly) in sync.
#[inline]
pub fn nanoseconds_since_epoch(&self) -> u64 {
    self.0.nanoseconds_since_epoch()
}
}

impl SizeBytes for RowId {
Expand Down
19 changes: 16 additions & 3 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,9 @@ impl RecordingStream {
timeless: bool,
arch: &impl AsComponents,
) -> RecordingStreamResult<()> {
self.log_component_batches(
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(
row_id,
ent_path,
timeless,
arch.as_component_batches()
Expand Down Expand Up @@ -802,6 +804,17 @@ impl RecordingStream {
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(row_id, ent_path, timeless, comp_batches)
}

fn log_component_batches_impl<'a>(
&self,
row_id: RowId,
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
if !self.is_enabled() {
return Ok(()); // silently drop the message
Expand Down Expand Up @@ -854,7 +867,7 @@ impl RecordingStream {
None
} else {
Some(DataRow::from_cells(
RowId::new(),
row_id,
timepoint.clone(),
ent_path.clone(),
num_instances as _,
Expand All @@ -868,7 +881,7 @@ impl RecordingStream {
} else {
splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
Some(DataRow::from_cells(
RowId::new(),
row_id.incremented_by(1), // we need a unique RowId from what is used for the instanced data
timepoint,
ent_path,
1,
Expand Down
7 changes: 6 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,11 @@ impl App {

let component_ui_registry = re_data_ui::create_component_ui_registry();

// TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.;
let long_time_ago = web_time::Instant::now()
.checked_sub(web_time::Duration::from_secs(1_000_000_000))
.unwrap_or(web_time::Instant::now());

Self {
build_info,
startup_options,
Expand All @@ -231,7 +236,7 @@ impl App {

style_panel_open: false,

latest_queue_interest: web_time::Instant::now(), // TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
latest_queue_interest: long_time_ago,

frame_time_history: egui::util::History::new(1..100, 0.5),

Expand Down
80 changes: 75 additions & 5 deletions crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ fn top_bar_ui(
ui.separator();
frame_time_label_ui(ui, app);
memory_use_label_ui(ui, gpu_resource_stats);
input_latency_label_ui(ui, app);

latency_ui(ui, app, store_context);
}

ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
Expand Down Expand Up @@ -352,7 +353,68 @@ fn memory_use_label_ui(ui: &mut egui::Ui, gpu_resource_stats: &WgpuResourcePoolS
}
}

fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
/// Shows e2e latency when available, with queue latency as hover detail;
/// otherwise falls back to showing just the input-queue latency.
fn latency_ui(ui: &mut egui::Ui, app: &mut App, store_context: Option<&StoreContext<'_>>) {
    let Some(response) = e2e_latency_ui(ui, store_context) else {
        // If we don't know the e2e latency we can still show the queue latency.
        input_queue_latency_ui(ui, app);
        return;
    };

    // Show queue latency on hover, as that is part of this.
    // For instance, if the framerate is really bad we have less time to ingest incoming data,
    // leading to an ever-increasing input queue.
    let rx = app.msg_receive_set();
    let queue_len = rx.queue_len();
    let latency_sec = rx.latency_ns() as f32 / 1e9;

    // empty queue == unreliable latency
    if queue_len > 0 {
        response.on_hover_ui(|ui| {
            ui.label(format!(
                "Queue latency: {}, length: {}",
                latency_text(latency_sec),
                format_number(queue_len),
            ));

            ui.label(
                "When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
                We call this the queue latency.");
        });
    }
}

/// Shows the e2e latency.
///
/// Returns the label's [`egui::Response`] (so the caller can attach more hover
/// info), or `None` if no e2e latency is available/plausible.
fn e2e_latency_ui(
    ui: &mut egui::Ui,
    store_context: Option<&StoreContext<'_>>,
) -> Option<egui::Response> {
    let e2e_latency_sec = store_context?
        .recording?
        .ingestion_stats()
        .current_e2e_latency_sec()?;

    if e2e_latency_sec > 60.0 {
        return None; // Probably an old recording and not live data.
    }

    let response = ui.weak(format!("latency: {}", latency_text(e2e_latency_sec)));

    let hover_text = "End-to-end latency from when the data was logged by the SDK to when it is shown in the viewer.\n\
        This includes time for encoding, network latency, and decoding.\n\
        It is also affected by the framerate of the viewer.\n\
        This latency is inaccurate if the logging was done on a different machine, since it is clock-based.";

    Some(response.on_hover_text(hover_text))
}

/// Shows the latency in the input queue.
fn input_queue_latency_ui(ui: &mut egui::Ui, app: &mut App) {
let rx = app.msg_receive_set();

if rx.is_empty() {
Expand All @@ -374,12 +436,12 @@ fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
ui.separator();
if is_latency_interesting {
let text = format!(
"Latency: {:.2}s, queue: {}",
latency_sec,
"Queue latency: {}, length: {}",
latency_text(latency_sec),
format_number(queue_len),
);
let hover_text =
"When more data is arriving over network than the Rerun Viewer can index, a queue starts building up, leading to latency and increased RAM use.\n\
"When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
This latency does NOT include network latency.";

if latency_sec < app.app_options().warn_latency {
Expand All @@ -394,3 +456,11 @@ fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
}
}
}

/// Format a latency (in seconds) for display:
/// sub-second values as whole milliseconds, otherwise seconds with one decimal.
fn latency_text(latency_sec: f32) -> String {
    match latency_sec {
        sec if sec < 1.0 => format!("{:.0} ms", sec * 1e3),
        sec => format!("{sec:.1} s"),
    }
}
6 changes: 5 additions & 1 deletion crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,10 @@ fn rr_log_impl(
data_row: CDataRow,
inject_time: bool,
) -> Result<(), CError> {
// Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
// TODO(emilk): move to before we arrow-serialize the data
let row_id = re_sdk::log::RowId::new();

let stream = recording_stream(stream)?;

let CDataRow {
Expand Down Expand Up @@ -649,7 +653,7 @@ fn rr_log_impl(
}

let data_row = DataRow::from_cells(
re_sdk::log::RowId::new(),
row_id,
TimePoint::default(), // we use the one in the recording stream for now
entity_path,
num_instances,
Expand Down
Loading

0 comments on commit 003f167

Please sign in to comment.