Show e2e latency in metric ui in top panel #4502

Merged
merged 11 commits on Dec 14, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered.)

2 changes: 2 additions & 0 deletions crates/re_data_store/Cargo.toml
@@ -37,9 +37,11 @@ re_types_core.workspace = true
ahash.workspace = true
document-features.workspace = true
egui_plot.workspace = true
emath.workspace = true
getrandom.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
parking_lot.workspace = true
rmp-serde = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
thiserror.workspace = true
103 changes: 96 additions & 7 deletions crates/re_data_store/src/store_db.rs
@@ -2,6 +2,7 @@ use std::collections::BTreeMap;

use itertools::Itertools;
use nohash_hasher::IntMap;
use parking_lot::Mutex;

use re_arrow_store::{
DataStore, DataStoreConfig, GarbageCollectionOptions, StoreEvent, StoreSubscriber,
@@ -108,6 +109,8 @@ pub struct StoreDb {

/// Stores all components for all entities for all timelines.
data_store: DataStore,

stats: IngestionStatistics,
}

impl StoreDb {
@@ -121,10 +124,11 @@ impl StoreDb {
times_per_timeline: Default::default(),
tree: crate::EntityTree::root(),
data_store: re_arrow_store::DataStore::new(
store_id,
store_id.clone(),
InstanceKey::name(),
DataStoreConfig::default(),
),
stats: IngestionStatistics::new(store_id),
}
}

@@ -224,6 +228,11 @@ impl StoreDb {
self.entity_path_from_hash.values().sorted().collect()
}

#[inline]
pub fn ingestion_stats(&self) -> &IngestionStatistics {
&self.stats
}

#[inline]
pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
self.entity_path_from_hash.get(entity_path_hash)
@@ -302,22 +311,25 @@ impl StoreDb {
//
// This might result in a [`ClearCascade`] if the events trigger one or more immediate
// and/or pending clears.
let store_events = &[store_event];
self.times_per_timeline.on_events(store_events);
let clear_cascade = self.tree.on_store_additions(store_events);
let original_store_events = &[store_event];
self.times_per_timeline.on_events(original_store_events);
let clear_cascade = self.tree.on_store_additions(original_store_events);

// Second-pass: update the [`DataStore`] by applying the [`ClearCascade`].
//
// This will in turn generate new [`StoreEvent`]s that our internal views need to be
// notified of, again!
let store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&store_events);
let clear_cascade = self.tree.on_store_additions(&store_events);
let new_store_events = self.on_clear_cascade(clear_cascade);
self.times_per_timeline.on_events(&new_store_events);
let clear_cascade = self.tree.on_store_additions(&new_store_events);

// Clears don't affect `Clear` components themselves, therefore we cannot have recursive
// cascades, thus this whole process must stabilize after one iteration.
debug_assert!(clear_cascade.is_empty());

// We update the stats last, since they measure e2e latency.
self.stats.on_events(original_store_events);

Ok(())
}

@@ -462,6 +474,7 @@ impl StoreDb {
times_per_timeline,
tree,
data_store: _,
stats: _,
} = self;

times_per_timeline.on_events(store_events);
@@ -477,3 +490,79 @@
.map(|info| (info.application_id.0.as_str(), info.started))
}
}

// ----------------------------------------------------------------------------

pub struct IngestionStatistics {
store_id: StoreId,
e2e_latency_sec_history: Mutex<emath::History<f32>>,
}

impl StoreSubscriber for IngestionStatistics {
fn name(&self) -> String {
"rerun.testing.store_subscribers.IngestionStatistics".into()
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

fn on_events(&mut self, events: &[StoreEvent]) {
for event in events {
if event.store_id == self.store_id {
self.on_new_row_id(event.row_id);
}
}
}
}

impl IngestionStatistics {
pub fn new(store_id: StoreId) -> Self {
let min_samples = 0; // 0: we stop displaying e2e latency if input stops
let max_samples = 1024; // don't waste too much memory on this - we just need enough to get a good average
let max_age = 1.0; // don't keep too long of a rolling average, or the stats get outdated.
Self {
store_id,
e2e_latency_sec_history: Mutex::new(emath::History::new(
min_samples..max_samples,
max_age,
)),
}
}

fn on_new_row_id(&mut self, row_id: RowId) {
if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;

// This only makes sense if the clocks are very good, i.e. if the recording was on the same machine!
if let Some(nanos_since_log) =
nanos_since_epoch.checked_sub(row_id.nanoseconds_since_epoch())
{
let now = nanos_since_epoch as f64 / 1e9;
let sec_since_log = nanos_since_log as f32 / 1e9;

self.e2e_latency_sec_history.lock().add(now, sec_since_log);
}
}
}

/// What is the mean latency between the time data was logged in the SDK and the time it was ingested?
///
/// This is based on the clocks of the viewer and the SDK being in sync,
/// so if the recording was done on another machine, this is likely very inaccurate.
pub fn current_e2e_latency_sec(&self) -> Option<f32> {
let mut e2e_latency_sec_history = self.e2e_latency_sec_history.lock();

if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;
let now = nanos_since_epoch as f64 / 1e9;
e2e_latency_sec_history.flush(now); // make sure the average is up-to-date.
}

e2e_latency_sec_history.average()
}
}
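
Side note on the `Mutex`: `current_e2e_latency_sec` takes `&self` but must `flush` (i.e. mutate) the history before averaging, hence the interior mutability. Below is a minimal sketch of how `emath::History` behaves with the parameters chosen above; the timestamps and latency samples are made up for illustration:

use emath::History;

fn history_demo() {
    // Same parameters as IngestionStatistics::new: keep at most 1024 samples,
    // each for at most 1 second. min_samples = 0 means an empty history yields
    // average() == None, so the latency label disappears when input stops.
    let mut hist: History<f32> = History::new(0..1024, 1.0);

    hist.add(10.0, 0.050); // (wall-clock time in seconds, latency sample)
    hist.add(10.5, 0.070);
    assert!((hist.average().unwrap() - 0.060).abs() < 1e-6);

    hist.flush(12.0); // both samples are now older than max_age...
    assert_eq!(hist.average(), None); // ...so nothing is reported
}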
6 changes: 6 additions & 0 deletions crates/re_log_types/src/data_row.rs
@@ -185,6 +185,12 @@ impl RowId {
pub fn incremented_by(&self, n: u64) -> Self {
Self(self.0.incremented_by(n))
}

/// When the `RowId` was created, in nanoseconds since the Unix epoch.
#[inline]
pub fn nanoseconds_since_epoch(&self) -> u64 {
self.0.nanoseconds_since_epoch()
}
}

impl SizeBytes for RowId {
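
Taken together with `IngestionStatistics::on_new_row_id` above, the latency estimate boils down to clock arithmetic on the timestamp embedded in every `RowId`. A condensed sketch of that viewer-side computation (a restatement of the logic above, not new API):

use re_log_types::RowId;

/// Seconds between when `row_id` was minted in the SDK and "now" in the viewer.
/// Returns `None` if the system clock is unavailable, or if it is behind the
/// clock that minted the `RowId` (e.g. logging happened on another machine).
fn e2e_latency_sec(row_id: &RowId) -> Option<f32> {
    let now_ns = web_time::SystemTime::UNIX_EPOCH
        .elapsed()
        .ok()?
        .as_nanos() as u64;
    // checked_sub guards against clock skew: better to report nothing
    // than a nonsensical negative latency.
    let since_log_ns = now_ns.checked_sub(row_id.nanoseconds_since_epoch())?;
    Some(since_log_ns as f32 / 1e9)
}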
19 changes: 16 additions & 3 deletions crates/re_sdk/src/recording_stream.rs
@@ -768,7 +768,9 @@ impl RecordingStream {
timeless: bool,
arch: &impl AsComponents,
) -> RecordingStreamResult<()> {
self.log_component_batches(
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(
row_id,
ent_path,
timeless,
arch.as_component_batches()
@@ -802,6 +804,17 @@
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(row_id, ent_path, timeless, comp_batches)
}

fn log_component_batches_impl<'a>(
&self,
row_id: RowId,
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
if !self.is_enabled() {
return Ok(()); // silently drop the message
@@ -854,7 +867,7 @@
None
} else {
Some(DataRow::from_cells(
RowId::new(),
row_id,
timepoint.clone(),
ent_path.clone(),
num_instances as _,
@@ -868,7 +881,7 @@
} else {
splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
Some(DataRow::from_cells(
RowId::new(),
row_id.incremented_by(1), // we need a RowId distinct from the one used for the instanced data
timepoint,
ent_path,
1,
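
With this change, the `RowId` for a user-facing log call is minted at the public API boundary, before any serialization or batching. A hypothetical end-to-end usage that exercises this path (the application id, entity path, and point data are made-up examples, assuming the SDK builder API of this era):

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rec = rerun::RecordingStreamBuilder::new("rerun_example_e2e_latency").spawn()?;

    // RowId::new() now runs first thing inside log(), stamping the data with
    // the SDK-side wall-clock time that the viewer later compares against.
    rec.log("points", &rerun::Points3D::new([(0.0, 0.0, 0.0)]))?;
    Ok(())
}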
7 changes: 6 additions & 1 deletion crates/re_viewer/src/app.rs
@@ -206,6 +206,11 @@ impl App {

let component_ui_registry = re_data_ui::create_component_ui_registry();

// TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
let long_time_ago = web_time::Instant::now()
.checked_sub(web_time::Duration::from_secs(1_000_000_000))
.unwrap_or(web_time::Instant::now());

Self {
build_info,
startup_options,
@@ -230,7 +235,7 @@

style_panel_open: false,

latest_queue_interest: web_time::Instant::now(), // TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
latest_queue_interest: long_time_ago,

frame_time_history: egui::util::History::new(1..100, 0.5),

80 changes: 75 additions & 5 deletions crates/re_viewer/src/ui/top_panel.rs
@@ -63,7 +63,8 @@ fn top_bar_ui(
ui.separator();
frame_time_label_ui(ui, app);
memory_use_label_ui(ui, gpu_resource_stats);
input_latency_label_ui(ui, app);

latency_ui(ui, app, store_context);
}

ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
@@ -352,7 +353,68 @@ fn memory_use_label_ui(ui: &mut egui::Ui, gpu_resource_stats: &WgpuResourcePoolS
}
}

fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
fn latency_ui(ui: &mut egui::Ui, app: &mut App, store_context: Option<&StoreContext<'_>>) {
if let Some(response) = e2e_latency_ui(ui, store_context) {
// Show queue latency on hover, as that is part of this.
// For instance, if the framerate is really bad we have less time to ingest incoming data,
// leading to an ever-increasing input queue.
let rx = app.msg_receive_set();
let queue_len = rx.queue_len();
let latency_sec = rx.latency_ns() as f32 / 1e9;
// empty queue == unreliable latency
if 0 < queue_len {
response.on_hover_ui(|ui| {
ui.label(format!(
"Queue latency: {}, length: {}",
latency_text(latency_sec),
format_number(queue_len),
));

ui.label(
"When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
We call this the queue latency.");
});
}
} else {
// If we don't know the e2e latency we can still show the queue latency.
input_queue_latency_ui(ui, app);
}
}

/// Shows the e2e latency.
fn e2e_latency_ui(
ui: &mut egui::Ui,
store_context: Option<&StoreContext<'_>>,
) -> Option<egui::Response> {
let Some(store_context) = store_context else {
return None;
};

let Some(recording) = store_context.recording else {
return None;
};

let Some(e2e_latency_sec) = recording.ingestion_stats().current_e2e_latency_sec() else {
return None;
};

if e2e_latency_sec > 60.0 {
return None; // Probably an old recording and not live data.
}

let text = format!("latency: {}", latency_text(e2e_latency_sec));
let response = ui.weak(text);

let hover_text = "End-to-end latency from when the data was logged by the SDK to when it is shown in the viewer.\n\
This includes time for encoding, network latency, and decoding.\n\
It is also affected by the framerate of the viewer.\n\
This latency is inaccurate if the logging was done on a different machine, since it is clock-based.";

Some(response.on_hover_text(hover_text))
}

/// Shows the latency in the input queue.
fn input_queue_latency_ui(ui: &mut egui::Ui, app: &mut App) {
let rx = app.msg_receive_set();

if rx.is_empty() {
@@ -374,12 +436,12 @@ fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
ui.separator();
if is_latency_interesting {
let text = format!(
"Latency: {:.2}s, queue: {}",
latency_sec,
"Queue latency: {}, length: {}",
latency_text(latency_sec),
format_number(queue_len),
);
let hover_text =
"When more data is arriving over network than the Rerun Viewer can index, a queue starts building up, leading to latency and increased RAM use.\n\
"When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
This latency does NOT include network latency.";

if latency_sec < app.app_options().warn_latency {
@@ -394,3 +456,11 @@
}
}
}

fn latency_text(latency_sec: f32) -> String {
if latency_sec < 1.0 {
format!("{:.0} ms", 1e3 * latency_sec)
} else {
format!("{latency_sec:.1} s")
}
}
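
For illustration, a couple of values through `latency_text` (inputs chosen arbitrarily): sub-second latencies render as whole milliseconds, anything longer as seconds with one decimal.

assert_eq!(latency_text(0.012), "12 ms");
assert_eq!(latency_text(2.345), "2.3 s");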
6 changes: 5 additions & 1 deletion crates/rerun_c/src/lib.rs
@@ -579,6 +579,10 @@ fn rr_log_impl(
data_row: CDataRow,
inject_time: bool,
) -> Result<(), CError> {
// Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
// TODO(emilk): move to before we arrow-serialize the data
let row_id = re_sdk::log::RowId::new();

let stream = recording_stream(stream)?;

let CDataRow {
@@ -649,7 +653,7 @@
}

let data_row = DataRow::from_cells(
re_sdk::log::RowId::new(),
row_id,
TimePoint::default(), // we use the one in the recording stream for now
entity_path,
num_instances,