Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make RecordingId a string #2088

Merged
merged 3 commits into from
May 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn log_messages() {
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
recording_id.clone(),
table.to_arrow_msg().unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl LogDb {

pub fn recording_id(&self) -> RecordingId {
if let Some(msg) = &self.recording_msg {
msg.info.recording_id
msg.info.recording_id.clone()
} else {
RecordingId::ZERO
RecordingId::unknown()
}
}

Expand Down
22 changes: 11 additions & 11 deletions crates/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
fn generate_messages(recording_id: &RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|table| LogMsg::ArrowMsg(recording_id, table.to_arrow_msg().unwrap()))
.map(|table| LogMsg::ArrowMsg(recording_id.clone(), table.to_arrow_msg().unwrap()))
.collect()
}

Expand Down Expand Up @@ -89,14 +89,14 @@ fn mono_points_arrow(c: &mut Criterion) {
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -145,14 +145,14 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
});
let tables = [generate_table()];
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &[generate_table()])));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -202,14 +202,14 @@ fn batch_points_arrow(c: &mut Criterion) {
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
70 changes: 48 additions & 22 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ mod data_table_batcher;
#[cfg(feature = "serde")]
pub mod serde_field;

use std::sync::Arc;

pub use self::arrow_msg::ArrowMsg;
pub use self::component::{Component, DeserializableComponent, SerializableComponent};
pub use self::component_types::context;
Expand Down Expand Up @@ -88,39 +90,30 @@ macro_rules! impl_into_enum {
// ----------------------------------------------------------------------------

/// A unique id per recording (a stream of [`LogMsg`]es).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct RecordingId(uuid::Uuid);

impl nohash_hasher::IsEnabled for RecordingId {}

// required for [`nohash_hasher`].
#[allow(clippy::derived_hash_with_manual_eq)]
impl std::hash::Hash for RecordingId {
#[inline]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_u64(self.0.as_u128() as u64);
}
}
pub struct RecordingId(Arc<String>);

impl Default for RecordingId {
fn default() -> Self {
Self::ZERO
Self::unknown()
}
}

impl RecordingId {
/// The recording id:s given to recordings that don't have an ID.
pub const ZERO: RecordingId = RecordingId(uuid::Uuid::nil());
#[inline]
pub fn unknown() -> Self {
"UNKNOWN".into()
}

#[inline]
pub fn random() -> Self {
Self(uuid::Uuid::new_v4())
Self(Arc::new(uuid::Uuid::new_v4().to_string()))
}

#[inline]
pub fn from_uuid(uuid: uuid::Uuid) -> Self {
Self(uuid)
Self(Arc::new(uuid.to_string()))
}
}

Expand All @@ -130,11 +123,44 @@ impl std::fmt::Display for RecordingId {
}
}

impl std::str::FromStr for RecordingId {
type Err = <uuid::Uuid as std::str::FromStr>::Err;
impl From<&str> for RecordingId {
fn from(s: &str) -> Self {
Self(Arc::new(s.to_owned()))
}
}

impl From<String> for RecordingId {
fn from(s: String) -> Self {
Self(Arc::new(s))
}
}
jleibs marked this conversation as resolved.
Show resolved Hide resolved

impl RecordingId {
#[inline]
pub fn as_str(&self) -> &str {
self.0.as_str()
}
}

fn from_str(s: &str) -> Result<Self, Self::Err> {
s.parse().map(Self)
impl AsRef<str> for RecordingId {
#[inline]
fn as_ref(&self) -> &str {
self.as_str()
}
}

impl std::borrow::Borrow<str> for RecordingId {
#[inline]
fn borrow(&self) -> &str {
self.as_str()
}
}

impl std::ops::Deref for RecordingId {
type Target = str;
#[inline]
fn deref(&self) -> &str {
self.as_str()
}
}

Expand Down
36 changes: 18 additions & 18 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ fn forwarding_thread(
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.recording_id, table));
sink.send(LogMsg::ArrowMsg(info.recording_id.clone(), table));
}

select! {
Expand All @@ -534,7 +534,7 @@ fn forwarding_thread(
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.recording_id, table));
sink.send(LogMsg::ArrowMsg(info.recording_id.clone(), table));
}
recv(cmds_rx) -> res => {
let Ok(cmd) = res else {
Expand Down Expand Up @@ -600,7 +600,7 @@ impl RecordingStream {
};

self.record_msg(LogMsg::EntityPathOpMsg(
this.info.recording_id,
this.info.recording_id.clone(),
re_log_types::EntityPathOpMsg {
row_id: re_log_types::RowId::random(),
time_point: timepoint,
Expand Down Expand Up @@ -782,15 +782,15 @@ struct ThreadInfo {
}

impl ThreadInfo {
fn thread_now(rid: RecordingId) -> TimePoint {
fn thread_now(rid: &RecordingId) -> TimePoint {
Self::with(|ti| ti.now(rid))
}

fn set_thread_time(rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
fn set_thread_time(rid: &RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
Self::with(|ti| ti.set_time(rid, timeline, time_int));
}

fn reset_thread_time(rid: RecordingId) {
fn reset_thread_time(rid: &RecordingId) {
Self::with(|ti| ti.reset_time(rid));
}

Expand All @@ -808,25 +808,25 @@ impl ThreadInfo {
})
}

fn now(&self, rid: RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(&rid).cloned().unwrap_or_default();
fn now(&self, rid: &RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
timepoint.insert(Timeline::log_time(), Time::now().into());
timepoint
}

fn set_time(&mut self, rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
fn set_time(&mut self, rid: &RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
if let Some(time_int) = time_int {
self.timepoints
.entry(rid)
.entry(rid.clone())
.or_default()
.insert(timeline, time_int);
} else if let Some(timepoint) = self.timepoints.get_mut(&rid) {
} else if let Some(timepoint) = self.timepoints.get_mut(rid) {
timepoint.remove(&timeline);
}
}

fn reset_time(&mut self, rid: RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(&rid) {
fn reset_time(&mut self, rid: &RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(rid) {
*timepoint = TimePoint::default();
}
}
Expand All @@ -840,7 +840,7 @@ impl RecordingStream {
return TimePoint::default();
};

ThreadInfo::thread_now(this.info.recording_id)
ThreadInfo::thread_now(&this.info.recording_id)
}

/// Set the current time of the recording, for the current calling thread.
Expand All @@ -857,7 +857,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
);
Expand All @@ -877,7 +877,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
Expand All @@ -897,7 +897,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
);
Expand All @@ -914,7 +914,7 @@ impl RecordingStream {
return;
};

ThreadInfo::reset_thread_time(this.info.recording_id);
ThreadInfo::reset_thread_time(&this.info.recording_id);
}
}

Expand Down
Loading