Skip to content

Commit

Permalink
Simpler, sturdier stateful time tracking in both SDKs (#2506)
Browse files Browse the repository at this point in the history
(Probably easier to review commit by commit)

Follow up to my discussion with @abey79 regarding his poor experience
with time tracking, which was summarized in
#2501 (comment):
> - When implementing the `step` timeline of the scalar example, I first
searched for `set_time_sequence` (used in Python API) in the Rust docs,
which I found in `RecordingStream`. Turns out it's not in 0.7 and bugged
on `main` (or rather ignored by `MsgSender`). Again, it compiled and
displayed no error, but no timeline was created.

This PR makes it so that `RecordingStream` is always in charge of
injecting its internal clock into outgoing rows (unless the caller ask
it not to, e.g. because the data is meant to be timeless).
This is pretty similar to what was already in place for `log_tick`,
except it now applies to every timelines, whether they are builtin or
user defined.

- Within the Python SDK, this gets rid of all the existing manual time
injection stuff.
- On the Rust SDK's side, this fixes the issue that `MsgSender` used to
ignore the internal clock altogether (i.e. the stateful time APIs were
not supported at all for Rust users).
- And finally this cleans up the Rust examples a bunch since we now have
access to stateful time.


---

<!-- This line will get updated when the PR build summary job finishes.
-->
PR Build Summary: https://build.rerun.io/pr/2506

<!-- pr-link-docs:start -->
Docs preview: https://rerun.io/preview/178edf5/docs
Examples preview: https://rerun.io/preview/178edf5/examples
<!-- pr-link-docs:end -->
  • Loading branch information
teh-cmc authored and emilk committed Jun 29, 2023
1 parent 29a8537 commit 3ce7d0e
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 185 deletions.
11 changes: 7 additions & 4 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl MsgSender {
Self {
entity_path: ent_path.into(),

timepoint: [(Timeline::log_time(), Time::now().into())].into(),
timepoint: TimePoint::default(),
timeless: false,

num_instances: None,
Expand All @@ -116,7 +116,7 @@ impl MsgSender {
let ent_path = re_log_types::EntityPath::from_file_path_as_single_string(file_path);
let cell = re_components::data_cell_from_file_path(file_path)?;

let mut timepoint = TimePoint::from([(Timeline::log_time(), Time::now().into())]);
let mut timepoint = TimePoint::default();

// This may sounds like a good idea, but that means `rerun *.jpg` will
// actually act like it is playing a bunch of files over time, perhaps over many years.
Expand Down Expand Up @@ -160,6 +160,7 @@ impl MsgSender {
/// `MsgSender` automatically keeps track of the logging time, which is recorded when
/// [`Self::new`] is first called.
#[inline]
#[doc(hidden)]
pub fn with_timepoint(mut self, timepoint: TimePoint) -> Self {
for (timeline, time) in timepoint {
self.timepoint.insert(timeline, time);
Expand All @@ -176,6 +177,7 @@ impl MsgSender {
/// `MsgSender` automatically keeps track of the logging time, which is recorded when
/// [`Self::new`] is first called.
#[inline]
#[doc(hidden)]
pub fn with_time(mut self, timeline: Timeline, time: impl Into<TimeInt>) -> Self {
self.timepoint.insert(timeline, time.into());
self
Expand Down Expand Up @@ -294,16 +296,17 @@ impl MsgSender {
return Ok(()); // silently drop the message
}

let timeless = self.timeless;
let [row_standard, row_splats] = self.into_rows();

if let Some(row_splats) = row_splats {
rec_stream.record_row(row_splats);
rec_stream.record_row(row_splats, !timeless);
}

// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
rec_stream.record_row(row_standard);
rec_stream.record_row(row_standard, !timeless);
}

Ok(())
Expand Down
52 changes: 33 additions & 19 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,11 +599,7 @@ impl RecordingStream {
///
/// This is a convenience wrapper for [`Self::record_msg`].
#[inline]
pub fn record_path_op(
&self,
timepoint: re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
pub fn record_path_op(&self, path_op: re_log_types::PathOp) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_path_op() ignored");
return;
Expand All @@ -613,18 +609,21 @@ impl RecordingStream {
this.info.store_id.clone(),
re_log_types::EntityPathOpMsg {
row_id: re_log_types::RowId::random(),
time_point: timepoint,
time_point: self.now(),
path_op,
},
));
}

/// Records a single [`DataRow`].
///
/// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
/// [`RecordingStream`]'s internal clock.
///
/// Internally, incoming [`DataRow`]s are automatically coalesced into larger [`DataTable`]s to
/// optimize for transport.
#[inline]
pub fn record_row(&self, mut row: DataRow) {
pub fn record_row(&self, mut row: DataRow, inject_time: bool) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_row() ignored");
return;
Expand All @@ -635,8 +634,17 @@ impl RecordingStream {
//
// NOTE: We're incrementing the current tick still.
let tick = this.tick.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if !row.timepoint().is_timeless() {
row.timepoint.insert(Timeline::log_tick(), tick.into());
if inject_time {
// Get the current time on all timelines, for the current recording, on the current
// thread...
let mut now = self.now();
// ...and then also inject the current recording tick into it.
now.insert(Timeline::log_tick(), tick.into());

// Inject all these times into the row, overriding conflicting times, if any.
for (timeline, time) in now {
row.timepoint.insert(timeline, time);
}
}

this.batcher.push_row(row);
Expand Down Expand Up @@ -860,7 +868,11 @@ impl RecordingStream {
/// For example: `rec.set_time_sequence("frame_nr", frame_nr)`.
///
/// You can remove a timeline again using `set_time_sequence("frame_nr", None)`.
pub fn set_time_sequence(&self, timeline: impl Into<TimelineName>, sequence: Option<i64>) {
pub fn set_time_sequence(
&self,
timeline: impl Into<TimelineName>,
sequence: impl Into<Option<i64>>,
) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_sequence() ignored");
return;
Expand All @@ -869,7 +881,7 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
sequence.into().map(TimeInt::from),
);
}

Expand All @@ -880,7 +892,7 @@ impl RecordingStream {
/// For example: `rec.set_time_seconds("sim_time", sim_time_secs)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_seconds(&self, timeline: &str, seconds: Option<f64>) {
pub fn set_time_seconds(&self, timeline: &str, seconds: impl Into<Option<f64>>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_seconds() ignored");
return;
Expand All @@ -889,7 +901,9 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
seconds
.into()
.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
}

Expand All @@ -900,7 +914,7 @@ impl RecordingStream {
/// For example: `rec.set_time_seconds("sim_time", sim_time_nanos)`.
///
/// You can remove a timeline again using `rec.set_time_seconds("sim_time", None)`.
pub fn set_time_nanos(&self, timeline: &str, ns: Option<i64>) {
pub fn set_time_nanos(&self, timeline: &str, ns: impl Into<Option<i64>>) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to set_time_nanos() ignored");
return;
Expand All @@ -909,7 +923,7 @@ impl RecordingStream {
ThreadInfo::set_thread_time(
&this.info.store_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
ns.into().map(|ns| Time::from_ns_since_epoch(ns).into()),
);
}

Expand Down Expand Up @@ -956,7 +970,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
Expand Down Expand Up @@ -1021,7 +1035,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
Expand Down Expand Up @@ -1101,7 +1115,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

{
Expand Down Expand Up @@ -1169,7 +1183,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let mut msgs = {
Expand Down
12 changes: 5 additions & 7 deletions docs/content/getting-started/logging-rust.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ Rerun has rich support for time: whether you want concurrent or disjoint timelin

Let's add our custom timeline:
```rust
let stable_time = Timeline::new("stable_time", TimeType::Time);

for i in 0..400 {
let time = i as f32 * 0.01;

rec_stream.set_time_seconds("stable_time", time as f64);

let times = offsets.iter().map(|offset| time + offset).collect_vec();
let (beads, colors): (Vec<_>, Vec<_>) = points1
.iter()
Expand All @@ -226,15 +226,14 @@ for i in 0..400 {
.unzip();

MsgSender::new("dna/structure/scaffolding/beads")
.with_time(stable_time, Time::from_seconds_since_epoch(time as _))
.with_component(&beads)?
.with_component(&colors)?
.with_splat(Radius(0.06))?
.send(&recording)?;
}
```

First we [declare a name and type](https://docs.rs/rerun/latest/rerun/time/struct.Timeline.html#new) for our `Timeline`, then we pass it to [`MsgSender::with_time`](https://docs.rs/rerun/latest/rerun/struct.MsgSender.html#with_time) along with a timestamp for our data.
First we use [`RecordingStream::set_time_seconds`](https://docs.rs/rerun/latest/rerun/struct.RecordingStream.html#method.set_time_seconds) to declare our own custom `Timeline` and set the current timestamp.
You can add as many timelines and timestamps as you want when logging data.

⚠️ If you run this code as is, the result will be.. surprising: the beads are animating as expected, but everything we've logged until that point is gone! ⚠️
Expand All @@ -246,9 +245,9 @@ Enter...
### Latest At semantics

That's because the Rerun Viewer has switched to displaying your custom timeline by default, but the original data was only logged to the *default* timeline (called `log_time`).
To fix this, go back through the previous logging calls we made and add this:
To fix this, add this at the beginning of the main function:
```rust
.with_time(stable_time, 0)
rec_stream.set_time_seconds("stable_time", 0f64);
```

![logging data - latest at](https://static.rerun.io/0182b4795ca2fed2f2097cfa5f5271115dee0aaf_logging_data8_latest_at.png)
Expand All @@ -270,7 +269,6 @@ Expand the previous loop to also include:
for i in 0..400 {
// ...everything else...
MsgSender::new("dna/structure")
.with_time(stable_time, Time::from_seconds_since_epoch(time as _))
.with_component(&[Transform3D::new(transform::RotationAxisAngle::new(
glam::Vec3::Z,
rerun::transform::Angle::Radians(time / 4.0 * TAU),
Expand Down
Loading

0 comments on commit 3ce7d0e

Please sign in to comment.