Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show e2e latency in metric ui in top panel #4502

Merged
merged 11 commits into from
Dec 14, 2023
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/re_data_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ re_types_core.workspace = true
ahash.workspace = true
document-features.workspace = true
egui_plot.workspace = true
emath.workspace = true
getrandom.workspace = true
itertools.workspace = true
nohash-hasher.workspace = true
parking_lot.workspace = true
rmp-serde = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive", "rc"], optional = true }
thiserror.workspace = true
Expand Down
68 changes: 68 additions & 0 deletions crates/re_data_store/src/store_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::BTreeMap;

use itertools::Itertools;
use nohash_hasher::IntMap;
use parking_lot::Mutex;

use re_arrow_store::{
DataStore, DataStoreConfig, GarbageCollectionOptions, StoreEvent, StoreSubscriber,
Expand Down Expand Up @@ -108,6 +109,8 @@ pub struct StoreDb {

/// Stores all components for all entities for all timelines.
data_store: DataStore,

stats: IngestionStatistics,
}

impl StoreDb {
Expand All @@ -125,6 +128,7 @@ impl StoreDb {
InstanceKey::name(),
DataStoreConfig::default(),
),
stats: Default::default(),
}
}

Expand Down Expand Up @@ -224,6 +228,11 @@ impl StoreDb {
self.entity_path_from_hash.values().sorted().collect()
}

/// Statistics about data ingestion into this store (see [`IngestionStatistics`],
/// e.g. the rolling end-to-end latency history).
#[inline]
pub fn ingestion_stats(&self) -> &IngestionStatistics {
    &self.stats
}

#[inline]
pub fn entity_path_from_hash(&self, entity_path_hash: &EntityPathHash) -> Option<&EntityPath> {
self.entity_path_from_hash.get(entity_path_hash)
Expand Down Expand Up @@ -298,6 +307,8 @@ impl StoreDb {
DEFAULT_INSERT_ROW_STEP_SIZE,
)?;

let row_id = store_event.row_id;

// First-pass: update our internal views by notifying them of resulting [`StoreEvent`]s.
//
// This might result in a [`ClearCascade`] if the events trigger one or more immediate
Expand All @@ -318,6 +329,8 @@ impl StoreDb {
// cascades, thus this whole process must stabilize after one iteration.
debug_assert!(clear_cascade.is_empty());

self.stats.on_new_row_id(row_id);
emilk marked this conversation as resolved.
Show resolved Hide resolved

Ok(())
}

Expand Down Expand Up @@ -462,6 +475,7 @@ impl StoreDb {
times_per_timeline,
tree,
data_store: _,
stats: _,
} = self;

times_per_timeline.on_events(store_events);
Expand All @@ -477,3 +491,57 @@ impl StoreDb {
.map(|info| (info.application_id.0.as_str(), info.started))
}
}

// ----------------------------------------------------------------------------

/// Statistics about how data is ingested into a store,
/// e.g. the end-to-end latency from SDK logging to viewer ingestion.
pub struct IngestionStatistics {
    /// Rolling history of end-to-end latency samples, in seconds,
    /// keyed by the wall-clock time at which each sample was taken.
    ///
    /// Kept behind a mutex so samples can be recorded through a shared reference.
    e2e_latency_sec_history: Mutex<emath::History<f32>>,
}

impl Default for IngestionStatistics {
    fn default() -> Self {
        // 0 min samples: we stop displaying e2e latency if input stops.
        // 1024 max samples: don't waste too much memory on this -
        // we just need enough to get a good average.
        // 1.0s max age: don't keep too long of a rolling average,
        // or the stats get outdated.
        let history = emath::History::new(0..1024, 1.0);
        Self {
            e2e_latency_sec_history: Mutex::new(history),
        }
    }
}

impl IngestionStatistics {
    /// Record the arrival of a new row, using the timestamp embedded in its [`RowId`]
    /// to produce an end-to-end latency sample.
    ///
    /// Takes `&self`: the history is behind a mutex precisely so that samples can be
    /// recorded without exclusive access (consistent with [`Self::current_e2e_latency_sec`]).
    pub fn on_new_row_id(&self, row_id: RowId) {
        if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
            let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;

            // This only makes sense if the clocks are very good, i.e. if the recording was on the same machine!
            if let Some(nanos_since_log) =
                nanos_since_epoch.checked_sub(row_id.nanoseconds_since_epoch())
            {
                let now = nanos_since_epoch as f64 / 1e9;
                let sec_since_log = nanos_since_log as f32 / 1e9;

                self.e2e_latency_sec_history.lock().add(now, sec_since_log);
            }
        }
    }

    /// What is the mean latency between the time data was logged in the SDK and the time it was ingested?
    ///
    /// This is based on the clocks of the viewer and the SDK being in sync,
    /// so if the recording was done on another machine, this is likely very inaccurate.
    pub fn current_e2e_latency_sec(&self) -> Option<f32> {
        let mut e2e_latency_sec_history = self.e2e_latency_sec_history.lock();

        if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
            let nanos_since_epoch = duration_since_epoch.as_nanos() as u64;
            let now = nanos_since_epoch as f64 / 1e9;
            e2e_latency_sec_history.flush(now); // make sure the average is up-to-date.
        }

        e2e_latency_sec_history.average()
    }
}
6 changes: 6 additions & 0 deletions crates/re_log_types/src/data_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ impl RowId {
/// Returns a new `RowId` whose inner id is advanced by `n`
/// (delegates to the inner id's `incremented_by`).
pub fn incremented_by(&self, n: u64) -> Self {
    Self(self.0.incremented_by(n))
}

/// When the `RowId` was created, in nanoseconds since unix epoch.
///
/// Delegates to the inner id; used to estimate end-to-end latency,
/// which assumes the logging and viewing clocks are in sync.
#[inline]
pub fn nanoseconds_since_epoch(&self) -> u64 {
    self.0.nanoseconds_since_epoch()
}
}

impl SizeBytes for RowId {
Expand Down
23 changes: 16 additions & 7 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,9 @@ impl RecordingStream {
timeless: bool,
arch: &impl AsComponents,
) -> RecordingStreamResult<()> {
self.log_component_batches(
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(
row_id,
ent_path,
timeless,
arch.as_component_batches()
Expand Down Expand Up @@ -802,6 +804,17 @@ impl RecordingStream {
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
self.log_component_batches_impl(row_id, ent_path, timeless, comp_batches)
}

fn log_component_batches_impl<'a>(
&self,
row_id: RowId,
ent_path: impl Into<EntityPath>,
timeless: bool,
comp_batches: impl IntoIterator<Item = &'a dyn ComponentBatch>,
) -> RecordingStreamResult<()> {
if !self.is_enabled() {
return Ok(()); // silently drop the message
Expand Down Expand Up @@ -854,7 +867,7 @@ impl RecordingStream {
None
} else {
Some(DataRow::from_cells(
RowId::new(),
row_id,
timepoint.clone(),
ent_path.clone(),
num_instances as _,
Expand All @@ -868,11 +881,7 @@ impl RecordingStream {
} else {
splatted.push(DataCell::from_native([InstanceKey::SPLAT]));
Some(DataRow::from_cells(
RowId::new(),
timepoint,
ent_path,
1,
splatted,
row_id, timepoint, ent_path, 1, splatted,
)?)
};

Expand Down
7 changes: 6 additions & 1 deletion crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ impl App {

let component_ui_registry = re_data_ui::create_component_ui_registry();

// TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
let long_time_ago = web_time::Instant::now()
.checked_sub(web_time::Duration::from_secs(1_000_000_000))
.unwrap_or(web_time::Instant::now());

Self {
build_info,
startup_options,
Expand All @@ -230,7 +235,7 @@ impl App {

style_panel_open: false,

latest_queue_interest: web_time::Instant::now(), // TODO(emilk): `Instant::MIN` when we have our own `Instant` that supports it.
latest_queue_interest: long_time_ago,

frame_time_history: egui::util::History::new(1..100, 0.5),

Expand Down
79 changes: 74 additions & 5 deletions crates/re_viewer/src/ui/top_panel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ fn top_bar_ui(
ui.separator();
frame_time_label_ui(ui, app);
memory_use_label_ui(ui, gpu_resource_stats);
input_latency_label_ui(ui, app);

latency_ui(ui, app, store_context);
}

ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
Expand Down Expand Up @@ -352,7 +353,67 @@ fn memory_use_label_ui(ui: &mut egui::Ui, gpu_resource_stats: &WgpuResourcePoolS
}
}

fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
/// Shows latency information in the top bar.
///
/// Prefers the end-to-end latency; when that is unavailable,
/// falls back to showing the input-queue latency inline.
fn latency_ui(ui: &mut egui::Ui, app: &mut App, store_context: Option<&StoreContext<'_>>) {
    let Some(response) = e2e_latency_ui(ui, store_context) else {
        // If we don't know the e2e latency we can still show the queue latency.
        input_queue_latency_ui(ui, app);
        return;
    };

    // Show queue latency on hover, as that is part of this.
    // For instance, if the framerate is really bad we have less time to ingest incoming data,
    // leading to an ever-increasing input queue.
    let rx = app.msg_receive_set();
    let queue_len = rx.queue_len();
    let latency_sec = rx.latency_ns() as f32 / 1e9;

    // empty queue == unreliable latency
    if queue_len > 0 {
        response.on_hover_ui(|ui| {
            let queue_line = format!(
                "Queue latency: {}, length: {}",
                latency_text(latency_sec),
                format_number(queue_len),
            );
            ui.label(queue_line);

            ui.label(
                "When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
                We call this the queue latency.",
            );
        });
    }
}

/// Shows the e2e latency.
///
/// Returns `None` (showing nothing) when there is no active recording,
/// no latency estimate yet, or the latency is too large to be live data.
fn e2e_latency_ui(
    ui: &mut egui::Ui,
    store_context: Option<&StoreContext<'_>>,
) -> Option<egui::Response> {
    let e2e_latency_sec = store_context?
        .recording?
        .ingestion_stats()
        .current_e2e_latency_sec()?;

    if 60.0 < e2e_latency_sec {
        return None; // Probably an old recording and not live data.
    }

    let text = format!("e2e latency: {}", latency_text(e2e_latency_sec));

    let hover_text = "End-to-end latency from when the data was logged by the SDK to when it is shown in the viewer.\n\
        This includes time for encoding, network latency, and decoding.\n\
        It is also affected by the framerate of the viewer.\n\
        This latency is inaccurate if the logging was done on a different machine, since it is clock-based.";

    Some(ui.weak(text).on_hover_text(hover_text))
}

/// Shows the latency in the input queue.
fn input_queue_latency_ui(ui: &mut egui::Ui, app: &mut App) {
let rx = app.msg_receive_set();

if rx.is_empty() {
Expand All @@ -374,12 +435,12 @@ fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
ui.separator();
if is_latency_interesting {
let text = format!(
"Latency: {:.2}s, queue: {}",
latency_sec,
"Queue latency: {}, length: {}",
latency_text(latency_sec),
format_number(queue_len),
);
let hover_text =
"When more data is arriving over network than the Rerun Viewer can index, a queue starts building up, leading to latency and increased RAM use.\n\
"When more data is arriving over network than the Rerun Viewer can ingest, a queue starts building up, leading to latency and increased RAM use.\n\
This latency does NOT include network latency.";

if latency_sec < app.app_options().warn_latency {
Expand All @@ -394,3 +455,11 @@ fn input_latency_label_ui(ui: &mut egui::Ui, app: &mut App) {
}
}
}

/// Formats a latency (given in seconds) compactly:
/// whole milliseconds below one second, otherwise seconds with one decimal.
fn latency_text(latency_sec: f32) -> String {
    let sub_second = latency_sec < 1.0;
    if sub_second {
        let latency_ms = latency_sec * 1e3;
        format!("{latency_ms:.0}ms")
    } else {
        format!("{latency_sec:.1}s")
    }
}
4 changes: 3 additions & 1 deletion crates/rerun_c/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,8 @@ fn rr_log_impl(
data_row: CDataRow,
inject_time: bool,
) -> Result<(), CError> {
let row_id = re_sdk::log::RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
emilk marked this conversation as resolved.
Show resolved Hide resolved

let stream = recording_stream(stream)?;

let CDataRow {
Expand Down Expand Up @@ -649,7 +651,7 @@ fn rr_log_impl(
}

let data_row = DataRow::from_cells(
re_sdk::log::RowId::new(),
row_id,
TimePoint::default(), // we use the one in the recording stream for now
entity_path,
num_instances,
Expand Down
4 changes: 3 additions & 1 deletion rerun_py/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub fn build_data_row_from_components(
components: &PyDict,
time_point: &TimePoint,
) -> PyResult<DataRow> {
let row_id = RowId::new(); // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
emilk marked this conversation as resolved.
Show resolved Hide resolved

let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
components.iter().map(|(name, array)| {
let name = name.downcast::<PyString>()?.to_str()?;
Expand All @@ -66,7 +68,7 @@ pub fn build_data_row_from_components(

let num_instances = cells.first().map_or(0, |cell| cell.num_instances());
let row = DataRow::from_cells(
RowId::new(),
row_id,
time_point.clone(),
entity_path.clone(),
num_instances,
Expand Down
Loading