Simpler, sturdier stateful time tracking in both SDKs #2506

Merged (8 commits, Jun 26, 2023)
9 changes: 5 additions & 4 deletions crates/re_sdk/src/msg_sender.rs
@@ -93,7 +93,7 @@ impl MsgSender
Self {
entity_path: ent_path.into(),

timepoint: [(Timeline::log_time(), Time::now().into())].into(),
timepoint: TimePoint::default(),
Member

It looks like we're no longer using this timepoint anywhere right now, as we always seem to use the ones set on the recording (plus the ones the recording generates when adding a row).
I still find it very useful to have this here additionally (it also makes a lot of sense logically), but it feels like we're missing something in the (code) docs and examples about this 🤔: the MsgSender::timepoint doc string doesn't talk about it, and we don't seem to have an example that uses with_timestamp anymore.

Member Author

We're still using that timestamp the same way we did before: it gets added directly into the DataRows that MsgSender generates (MsgSender::into_rows).

The only somewhat weird part is that these timestamps can be overridden by RecordingStream if its internal clock already has values for the same timelines.
The reason this is weird is that MsgSender shouldn't exist anymore to begin with; it's just annoying and serves no real purpose at this point. It'll go away with codegen. In the meantime I'll mark with_timestamp & friends as #[doc(hidden)] and update the getting-started guide.

timeless: false,

num_instances: None,
@@ -116,7 +116,7 @@ impl MsgSender
let ent_path = re_log_types::EntityPath::from_file_path_as_single_string(file_path);
let cell = re_components::data_cell_from_file_path(file_path)?;

let mut timepoint = TimePoint::from([(Timeline::log_time(), Time::now().into())]);
let mut timepoint = TimePoint::default();

// This may sound like a good idea, but that means `rerun *.jpg` will
// actually act like it is playing a bunch of files over time, perhaps over many years.
@@ -294,16 +294,17 @@ impl MsgSender
return Ok(()); // silently drop the message
}

let timeless = self.timeless;
let [row_standard, row_splats] = self.into_rows();

if let Some(row_splats) = row_splats {
rec_stream.record_row(row_splats);
rec_stream.record_row(row_splats, !timeless);
}

// Always send the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see #1215.
if let Some(row_standard) = row_standard {
rec_stream.record_row(row_standard);
rec_stream.record_row(row_standard, !timeless);
}

Ok(())
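The `!timeless` argument above ties the two flags together: a timeless row must not receive the stream's clock, so the `inject_time` flag is simply the negation. A toy sketch of that relationship, using plain `HashMap`s and hypothetical names (`stamp_row`, `clock`) rather than the real SDK types:

```rust
use std::collections::HashMap;

/// Timeless rows keep an empty timepoint; timeful rows receive the
/// stream's current clock. Names here are illustrative, not the real API.
fn stamp_row(timeless: bool, clock: &HashMap<String, i64>) -> HashMap<String, i64> {
    // `inject_time` is just the negation of `timeless`, mirroring the
    // `!timeless` argument passed to record_row in the diff above.
    let inject_time = !timeless;
    if inject_time {
        clock.clone()
    } else {
        HashMap::new()
    }
}
```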
34 changes: 21 additions & 13 deletions crates/re_sdk/src/recording_stream.rs
@@ -599,11 +599,7 @@ impl RecordingStream
///
/// This is a convenience wrapper for [`Self::record_msg`].
#[inline]
pub fn record_path_op(
&self,
timepoint: re_log_types::TimePoint,
path_op: re_log_types::PathOp,
) {
pub fn record_path_op(&self, path_op: re_log_types::PathOp) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_path_op() ignored");
return;
@@ -613,18 +609,21 @@
this.info.store_id.clone(),
re_log_types::EntityPathOpMsg {
row_id: re_log_types::RowId::random(),
time_point: timepoint,
time_point: self.now(),
path_op,
},
));
}

/// Records a single [`DataRow`].
///
/// If `inject_time` is set to `true`, the row's timestamp data will be overridden using the
/// [`RecordingStream`]'s internal clock.
///
/// Internally, incoming [`DataRow`]s are automatically coalesced into larger [`DataTable`]s to
/// optimize for transport.
#[inline]
pub fn record_row(&self, mut row: DataRow) {
pub fn record_row(&self, mut row: DataRow, inject_time: bool) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to record_row() ignored");
return;
@@ -635,8 +634,17 @@
//
// NOTE: We're incrementing the current tick still.
let tick = this.tick.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if !row.timepoint().is_timeless() {
row.timepoint.insert(Timeline::log_tick(), tick.into());
if inject_time {
// Get the current time on all timelines, for the current recording, on the current
// thread...
let mut now = self.now();
// ...and then also inject the current recording tick into it.
now.insert(Timeline::log_tick(), tick.into());

// Inject all these times into the row, overriding conflicting times, if any.
for (timeline, time) in now {
row.timepoint.insert(timeline, time);
}
}

this.batcher.push_row(row);
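The time-injection step above boils down to: take a snapshot of the stream's per-timeline clock, add the recording tick, and merge the result into the row's timepoint with the clock winning on conflicts. A minimal self-contained sketch of that logic, modeling the clock and timepoint as plain `HashMap`s (the struct and field names are assumptions for illustration, not the real `RecordingStream` internals):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, Ordering};

/// Toy stand-in for the stream's internal state: a per-timeline clock
/// plus the recording tick counter.
struct Clock {
    tick: AtomicI64,
    now: HashMap<String, i64>,
}

impl Clock {
    /// Stamp a row's timepoint with the current clock values and tick,
    /// overriding any conflicting per-row times.
    fn inject_time(&self, row_timepoint: &mut HashMap<String, i64>) {
        // `fetch_add` returns the pre-increment value: the first row gets tick 0.
        let tick = self.tick.fetch_add(1, Ordering::Relaxed);

        let mut now = self.now.clone();
        now.insert("log_tick".to_owned(), tick);

        for (timeline, time) in now {
            // `insert` overwrites existing entries, so the stream's clock
            // wins over whatever the row already carried on that timeline.
            row_timepoint.insert(timeline, time);
        }
    }
}
```

Note that timelines present only on the row survive the merge; only timelines the clock also knows about get overridden, which matches the behavior described in the review thread above.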
@@ -956,7 +964,7 @@ mod tests {
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
@@ -1021,7 +1029,7 @@
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let storage = rec_stream.memory();
@@ -1101,7 +1109,7 @@
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

{
@@ -1169,7 +1177,7 @@
let mut table = data_table_example(false);
table.compute_all_size_bytes();
for row in table.to_rows() {
rec_stream.record_row(row);
rec_stream.record_row(row, false);
}

let mut msgs = {