Skip to content

Commit

Permalink
Make RecordingId a string
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs committed May 11, 2023
1 parent 8bc6741 commit b9732f1
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 74 deletions.
2 changes: 1 addition & 1 deletion crates/re_data_store/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ fn log_messages() {
);
let table_bytes = live_bytes() - used_bytes_start;
let log_msg = Box::new(LogMsg::ArrowMsg(
recording_id,
recording_id.clone(),
table.to_arrow_msg().unwrap(),
));
let log_msg_bytes = live_bytes() - used_bytes_start;
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl LogDb {

pub fn recording_id(&self) -> RecordingId {
if let Some(msg) = &self.recording_msg {
msg.info.recording_id
msg.info.recording_id.clone()
} else {
RecordingId::ZERO
RecordingId::unknown()
}
}

Expand Down
22 changes: 11 additions & 11 deletions crates/re_log_encoding/benches/msg_encode_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ fn decode_log_msgs(mut bytes: &[u8]) -> Vec<LogMsg> {
messages
}

fn generate_messages(recording_id: RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
fn generate_messages(recording_id: &RecordingId, tables: &[DataTable]) -> Vec<LogMsg> {
tables
.iter()
.map(|table| LogMsg::ArrowMsg(recording_id, table.to_arrow_msg().unwrap()))
.map(|table| LogMsg::ArrowMsg(recording_id.clone(), table.to_arrow_msg().unwrap()))
.collect()
}

Expand Down Expand Up @@ -89,14 +89,14 @@ fn mono_points_arrow(c: &mut Criterion) {
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -145,14 +145,14 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
});
let tables = [generate_table()];
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &[generate_table()])));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &[generate_table()])));
});

let encoded = encode_log_msgs(&messages);
Expand Down Expand Up @@ -202,14 +202,14 @@ fn batch_points_arrow(c: &mut Criterion) {
});
let tables = generate_tables();
group.bench_function("generate_messages", |b| {
b.iter(|| generate_messages(recording_id, &tables));
b.iter(|| generate_messages(&recording_id, &tables));
});
let messages = generate_messages(recording_id, &tables);
let messages = generate_messages(&recording_id, &tables);
group.bench_function("encode_log_msg", |b| {
b.iter(|| encode_log_msgs(&messages));
});
group.bench_function("encode_total", |b| {
b.iter(|| encode_log_msgs(&generate_messages(recording_id, &generate_tables())));
b.iter(|| encode_log_msgs(&generate_messages(&recording_id, &generate_tables())));
});

let encoded = encode_log_msgs(&messages);
Expand Down
43 changes: 21 additions & 22 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ mod data_table_batcher;
#[cfg(feature = "serde")]
pub mod serde_field;

use std::sync::Arc;

pub use self::arrow_msg::ArrowMsg;
pub use self::component::{Component, DeserializableComponent, SerializableComponent};
pub use self::component_types::context;
Expand Down Expand Up @@ -88,39 +90,32 @@ macro_rules! impl_into_enum {
// ----------------------------------------------------------------------------

/// A unique id per recording (a stream of [`LogMsg`]es).
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
pub struct RecordingId(uuid::Uuid);

impl nohash_hasher::IsEnabled for RecordingId {}

// required for [`nohash_hasher`].
#[allow(clippy::derived_hash_with_manual_eq)]
impl std::hash::Hash for RecordingId {
#[inline]
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
state.write_u64(self.0.as_u128() as u64);
}
}
pub struct RecordingId(Arc<String>);

impl Default for RecordingId {
fn default() -> Self {
Self::ZERO
Self::unknown()
}
}

impl RecordingId {
/// The recording id:s given to recordings that don't have an ID.
pub const ZERO: RecordingId = RecordingId(uuid::Uuid::nil());
const UNKNOWN: &str = "UNKNOWN";

#[inline]
pub fn unknown() -> Self {
RecordingId::UNKNOWN.into()
}

#[inline]
pub fn random() -> Self {
Self(uuid::Uuid::new_v4())
Self(Arc::new(uuid::Uuid::new_v4().to_string()))
}

#[inline]
pub fn from_uuid(uuid: uuid::Uuid) -> Self {
Self(uuid)
Self(Arc::new(uuid.to_string()))
}
}

Expand All @@ -130,11 +125,15 @@ impl std::fmt::Display for RecordingId {
}
}

impl std::str::FromStr for RecordingId {
type Err = <uuid::Uuid as std::str::FromStr>::Err;
impl From<&str> for RecordingId {
fn from(s: &str) -> Self {
Self(Arc::new(s.to_owned()))
}
}

fn from_str(s: &str) -> Result<Self, Self::Err> {
s.parse().map(Self)
impl From<String> for RecordingId {
fn from(s: String) -> Self {
Self(Arc::new(s))
}
}

Expand Down
36 changes: 18 additions & 18 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ fn forwarding_thread(
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.recording_id, table));
sink.send(LogMsg::ArrowMsg(info.recording_id.clone(), table));
}

select! {
Expand All @@ -534,7 +534,7 @@ fn forwarding_thread(
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.recording_id, table));
sink.send(LogMsg::ArrowMsg(info.recording_id.clone(), table));
}
recv(cmds_rx) -> res => {
let Ok(cmd) = res else {
Expand Down Expand Up @@ -600,7 +600,7 @@ impl RecordingStream {
};

self.record_msg(LogMsg::EntityPathOpMsg(
this.info.recording_id,
this.info.recording_id.clone(),
re_log_types::EntityPathOpMsg {
row_id: re_log_types::RowId::random(),
time_point: timepoint,
Expand Down Expand Up @@ -782,15 +782,15 @@ struct ThreadInfo {
}

impl ThreadInfo {
fn thread_now(rid: RecordingId) -> TimePoint {
fn thread_now(rid: &RecordingId) -> TimePoint {
Self::with(|ti| ti.now(rid))
}

fn set_thread_time(rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
fn set_thread_time(rid: &RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
Self::with(|ti| ti.set_time(rid, timeline, time_int));
}

fn reset_thread_time(rid: RecordingId) {
fn reset_thread_time(rid: &RecordingId) {
Self::with(|ti| ti.reset_time(rid));
}

Expand All @@ -808,25 +808,25 @@ impl ThreadInfo {
})
}

fn now(&self, rid: RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(&rid).cloned().unwrap_or_default();
fn now(&self, rid: &RecordingId) -> TimePoint {
let mut timepoint = self.timepoints.get(rid).cloned().unwrap_or_default();
timepoint.insert(Timeline::log_time(), Time::now().into());
timepoint
}

fn set_time(&mut self, rid: RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
fn set_time(&mut self, rid: &RecordingId, timeline: Timeline, time_int: Option<TimeInt>) {
if let Some(time_int) = time_int {
self.timepoints
.entry(rid)
.entry(rid.clone())
.or_default()
.insert(timeline, time_int);
} else if let Some(timepoint) = self.timepoints.get_mut(&rid) {
} else if let Some(timepoint) = self.timepoints.get_mut(rid) {
timepoint.remove(&timeline);
}
}

fn reset_time(&mut self, rid: RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(&rid) {
fn reset_time(&mut self, rid: &RecordingId) {
if let Some(timepoint) = self.timepoints.get_mut(rid) {
*timepoint = TimePoint::default();
}
}
Expand All @@ -840,7 +840,7 @@ impl RecordingStream {
return TimePoint::default();
};

ThreadInfo::thread_now(this.info.recording_id)
ThreadInfo::thread_now(&this.info.recording_id)
}

/// Set the current time of the recording, for the current calling thread.
Expand All @@ -857,7 +857,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Sequence),
sequence.map(TimeInt::from),
);
Expand All @@ -877,7 +877,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
seconds.map(|secs| Time::from_seconds_since_epoch(secs).into()),
);
Expand All @@ -897,7 +897,7 @@ impl RecordingStream {
};

ThreadInfo::set_thread_time(
this.info.recording_id,
&this.info.recording_id,
Timeline::new(timeline, TimeType::Time),
ns.map(|ns| Time::from_ns_since_epoch(ns).into()),
);
Expand All @@ -914,7 +914,7 @@ impl RecordingStream {
return;
};

ThreadInfo::reset_thread_time(this.info.recording_id);
ThreadInfo::reset_thread_time(&this.info.recording_id);
}
}

Expand Down
Loading

0 comments on commit b9732f1

Please sign in to comment.