Skip to content

Commit

Permalink
Fix premature pausing when reaching end of still-streaming stream (#2106
Browse files Browse the repository at this point in the history
)

* Fix premature pausing when reaching end of still-streaming stream
* Release channel after first web callback

---------

Co-authored-by: Jeremy Leibs <[email protected]>
  • Loading branch information
emilk and jleibs authored May 12, 2023
1 parent 0a19555 commit 223e0bd
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 28 deletions.
11 changes: 9 additions & 2 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ mod web_event_listener {
/// var rrd = new Uint8Array(...); // Get an RRD from somewhere
/// window.postMessage(rrd, "*")
/// ```
pub fn stream_rrd_from_event_listener(on_msg: Arc<dyn Fn(LogMsg) + Send>) {
pub fn stream_rrd_from_event_listener(mut on_msg: Option<Arc<dyn Fn(LogMsg) + Send>>) {
let window = web_sys::window().expect("no global `window` exists");
let closure =
Closure::wrap(Box::new(
Expand All @@ -62,7 +62,14 @@ mod web_event_listener {
let uint8_array = Uint8Array::new(&message_event.data());
let result: Vec<u8> = uint8_array.to_vec();

crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone());
// On the first incoming message_event, take the on_msg callback
// so that the channel drops dropped when we are done. This is
// necessary to allow the viewer to know that no more data is coming.
// TODO(jleibs): In live-streaming mode we don't want to do this and
// will instead want to clone the arc.
if let Some(on_msg) = on_msg.take() {
crate::stream_rrd_from_http::decode_rrd(result, on_msg);
}
}
Err(js_val) => {
re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
Expand Down
47 changes: 42 additions & 5 deletions crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A channel that keeps track of latency and queue length.
use std::sync::{
atomic::{AtomicU64, Ordering::Relaxed},
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
Arc,
};

Expand Down Expand Up @@ -64,7 +64,12 @@ fn smart_channel_with_stats<T: Send>(
tx,
stats: stats.clone(),
};
let receiver = Receiver { rx, stats, source };
let receiver = Receiver {
rx,
stats,
source,
connected: AtomicBool::new(true),
};
(sender, receiver)
}

Expand Down Expand Up @@ -121,25 +126,57 @@ pub struct Receiver<T: Send> {
rx: crossbeam::channel::Receiver<(Instant, T)>,
stats: Arc<SharedStats>,
source: Source,
connected: AtomicBool,
}

impl<T: Send> Receiver<T> {
/// Are we still connected?
///
/// Once false, we will never be connected again: the source has run dry.
///
/// This is only updated once one of the receive methods fails.
pub fn is_connected(&self) -> bool {
self.connected.load(Relaxed)
}

pub fn recv(&self) -> Result<T, RecvError> {
let (sent, msg) = self.rx.recv()?;
let (sent, msg) = match self.rx.recv() {
Ok(x) => x,
Err(RecvError) => {
self.connected.store(false, Relaxed);
return Err(RecvError);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn try_recv(&self) -> Result<T, TryRecvError> {
let (sent, msg) = self.rx.try_recv()?;
let (sent, msg) = match self.rx.try_recv() {
Ok(x) => x,
Err(err) => {
if err == TryRecvError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn recv_timeout(&self, timeout: std::time::Duration) -> Result<T, RecvTimeoutError> {
let (sent, msg) = self.rx.recv_timeout(timeout)?;
let (sent, msg) = match self.rx.recv_timeout(timeout) {
Ok(x) => x,
Err(err) => {
if err == RecvTimeoutError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
Expand Down
24 changes: 15 additions & 9 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl eframe::App for App {
log_db,
&self.re_ui,
&self.component_ui_registry,
self.rx.source(),
&self.rx,
);
}

Expand Down Expand Up @@ -964,7 +964,7 @@ impl AppState {
log_db: &LogDb,
re_ui: &re_ui::ReUi,
component_ui_registry: &ComponentUiRegistry,
data_source: &re_smart_channel::Source,
rx: &Receiver<LogMsg>,
) {
crate::profile_function!();

Expand All @@ -984,7 +984,7 @@ impl AppState {
let rec_cfg = recording_config_entry(
recording_configs,
selected_rec_id.clone(),
data_source,
rx.source(),
log_db,
);
let selected_app_id = log_db
Expand Down Expand Up @@ -1024,12 +1024,18 @@ impl AppState {
.blueprint_panel_and_viewport(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
ctx.rec_cfg
.time_ctrl
.move_time(log_db.times_per_timeline(), ui.ctx().input(|i| i.stable_dt));
if ctx.rec_cfg.time_ctrl.play_state() == PlayState::Playing {
ui.ctx().request_repaint();
{
// We move the time at the very end of the frame,
// so we have one frame to see the first data before we move the time.
let dt = ui.ctx().input(|i| i.stable_dt);
let more_data_is_coming = rx.is_connected();
let needs_repaint =
ctx.rec_cfg
.time_ctrl
.update(log_db.times_per_timeline(), dt, more_data_is_coming);
if needs_repaint == re_viewer_context::NeedsRepaint::Yes {
ui.ctx().request_repaint();
}
}

if WATERMARK {
Expand Down
4 changes: 2 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ impl WebHandle {
);
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(
Arc::new(move |msg| {
Some(Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
})),
);

Box::new(crate::App::from_receiver(
Expand Down
6 changes: 6 additions & 0 deletions crates/re_viewer_context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ slotmap::new_key_type! {
pub struct DataBlueprintGroupHandle;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NeedsRepaint {
Yes,
No,
}

// ---------------------------------------------------------------------------

/// Profiling macro for feature "puffin"
Expand Down
37 changes: 27 additions & 10 deletions crates/re_viewer_context/src/time_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::collections::{BTreeMap, BTreeSet};
use re_data_store::TimesPerTimeline;
use re_log_types::{Duration, TimeInt, TimeRange, TimeRangeF, TimeReal, TimeType, Timeline};

use crate::NeedsRepaint;

/// The time range we are currently zoomed in on.
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
pub struct TimeView {
Expand Down Expand Up @@ -111,12 +113,18 @@ impl Default for TimeControl {
}

impl TimeControl {
/// Update the current time
pub fn move_time(&mut self, times_per_timeline: &TimesPerTimeline, stable_dt: f32) {
/// Move the time forward (if playing), and perhaps pause if we've reached the end.
#[must_use]
pub fn update(
&mut self,
times_per_timeline: &TimesPerTimeline,
stable_dt: f32,
more_data_is_coming: bool,
) -> NeedsRepaint {
self.select_a_valid_timeline(times_per_timeline);

let Some(full_range) = self.full_range(times_per_timeline) else {
return;
return NeedsRepaint::No; // we have no data on this timeline yet, so bail
};

match self.play_state() {
Expand All @@ -132,6 +140,7 @@ impl TimeControl {
full_range.min
})
});
NeedsRepaint::No
}
PlayState::Playing => {
let dt = stable_dt.min(0.1) * self.speed;
Expand All @@ -141,11 +150,17 @@ impl TimeControl {
.entry(self.timeline)
.or_insert_with(|| TimeState::new(full_range.min));

if self.looping == Looping::Off && state.time >= full_range.max {
// We've reached the end - stop playing.
if self.looping == Looping::Off && full_range.max <= state.time {
// We've reached the end of the data
state.time = full_range.max.into();
self.pause();
return;

if more_data_is_coming {
// then let's wait for it without pausing!
return NeedsRepaint::No; // ui will wake up when more data arrives
} else {
self.pause();
return NeedsRepaint::No;
}
}

let loop_range = match self.looping {
Expand All @@ -166,10 +181,12 @@ impl TimeControl {
}

if let Some(loop_range) = loop_range {
if state.time > loop_range.max {
state.time = loop_range.min;
if loop_range.max < state.time {
state.time = loop_range.min; // loop!
}
}

NeedsRepaint::Yes
}
PlayState::Following => {
// Set the time to the max:
Expand All @@ -181,7 +198,7 @@ impl TimeControl {
entry.get_mut().time = full_range.max.into();
}
}
// no need for request_repaint - we already repaint when new data arrives
NeedsRepaint::No // no need for request_repaint - we already repaint when new data arrives
}
}
}
Expand Down

0 comments on commit 223e0bd

Please sign in to comment.