Make every RecordingId typed and preclude the existence of 'Defaults' #2110

Merged: 8 commits, May 15, 2023

crates/re_data_store/examples/memory_usage.rs (4 changes: 2 additions & 2 deletions)

@@ -48,7 +48,7 @@ fn live_bytes() -> usize {
 
 // ----------------------------------------------------------------------------
 
-use re_log_types::{entity_path, DataRow, RecordingId, RowId};
+use re_log_types::{entity_path, DataRow, RecordingId, RecordingType, RowId};
 
 fn main() {
     log_messages();
@@ -91,7 +91,7 @@ fn log_messages() {
 
     const NUM_POINTS: usize = 1_000;
 
-    let recording_id = RecordingId::random();
+    let recording_id = RecordingId::random(RecordingType::Data);
     let timeline = Timeline::new_sequence("frame_nr");
     let mut time_point = TimePoint::default();
     time_point.insert(timeline, TimeInt::from(0));

crates/re_data_store/src/log_db.rs (27 changes: 18 additions & 9 deletions)

@@ -159,8 +159,10 @@ impl EntityDb {
 // ----------------------------------------------------------------------------
 
 /// A in-memory database built from a stream of [`LogMsg`]es.
-#[derive(Default)]
 pub struct LogDb {
+    /// The [`RecordingId`] for this log.
+    recording_id: RecordingId,
+
     /// All [`EntityPathOpMsg`]s ever received.
     entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,
 
@@ -175,6 +177,16 @@ pub struct LogDb {
 }
 
 impl LogDb {
+    pub fn new(recording_id: RecordingId) -> Self {
+        Self {
+            recording_id,
+            entity_op_msgs: Default::default(),
+            data_source: None,
+            recording_msg: None,
+            entity_db: Default::default(),
+        }
+    }
+
     pub fn recording_msg(&self) -> Option<&BeginRecordingMsg> {
         self.recording_msg.as_ref()
     }
@@ -183,12 +195,8 @@ impl LogDb {
         self.recording_msg().map(|msg| &msg.info)
     }
 
-    pub fn recording_id(&self) -> RecordingId {
-        if let Some(msg) = &self.recording_msg {
-            msg.info.recording_id.clone()
-        } else {
-            RecordingId::unknown()
-        }
+    pub fn recording_id(&self) -> &RecordingId {
+        &self.recording_id
     }
 
     pub fn timelines(&self) -> impl ExactSizeIterator<Item = &Timeline> {
@@ -208,7 +216,7 @@
             + self.entity_db.data_store.num_temporal_rows() as usize
     }
 
-    pub fn is_default(&self) -> bool {
+    pub fn is_empty(&self) -> bool {
         self.recording_msg.is_none() && self.num_rows() == 0
     }
 
@@ -227,7 +235,7 @@
                 self.entity_db.add_path_op(*row_id, time_point, path_op);
             }
             LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
-            LogMsg::Goodbye(_) => {}
+            LogMsg::Goodbye(_, _) => {}
         }
 
         Ok(())
@@ -264,6 +272,7 @@
         let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline();
 
         let Self {
+            recording_id: _,
            entity_op_msgs,
            data_source: _,
            recording_msg: _,
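
Note for reviewers: with `Default` gone, a `LogDb` can only exist with a concrete, typed `RecordingId`. A minimal sketch of what a call site looks like after this change, assuming `LogDb` is re-exported from the `re_data_store` crate root (as the viewer uses it):

    use re_data_store::LogDb;
    use re_log_types::{RecordingId, RecordingType};

    fn main() {
        // Callers must now pick the recording type up front; there is no
        // LogDb::default() or RecordingId::unknown() to fall back on.
        let recording_id = RecordingId::random(RecordingType::Data);
        let log_db = LogDb::new(recording_id);

        // A freshly constructed database reports "empty", not "default".
        assert!(log_db.is_empty());
        println!("recording: {}", log_db.recording_id());
    }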

crates/re_data_ui/src/log_msg.rs (2 changes: 1 addition & 1 deletion)

@@ -18,7 +18,7 @@ impl DataUi for LogMsg {
             LogMsg::BeginRecordingMsg(msg) => msg.data_ui(ctx, ui, verbosity, query),
             LogMsg::EntityPathOpMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
             LogMsg::ArrowMsg(_, msg) => msg.data_ui(ctx, ui, verbosity, query),
-            LogMsg::Goodbye(_) => {
+            LogMsg::Goodbye(_, _) => {
                 ui.label("Goodbye");
             }
         }

crates/re_log_encoding/benches/msg_encode_benchmark.rs (8 changes: 4 additions & 4 deletions)

@@ -6,7 +6,7 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 use re_log_types::{
     datagen::{build_frame_nr, build_some_colors, build_some_point2d},
-    entity_path, DataRow, DataTable, Index, LogMsg, RecordingId, RowId, TableId,
+    entity_path, DataRow, DataTable, Index, LogMsg, RecordingId, RecordingType, RowId, TableId,
 };
 
 use criterion::{criterion_group, criterion_main, Criterion};
@@ -81,7 +81,7 @@ fn mono_points_arrow(c: &mut Criterion) {
     }
 
     {
-        let recording_id = RecordingId::random();
+        let recording_id = RecordingId::random(RecordingType::Data);
         let mut group = c.benchmark_group("mono_points_arrow");
         group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
         group.bench_function("generate_message_bundles", |b| {
@@ -137,7 +137,7 @@ fn mono_points_arrow_batched(c: &mut Criterion) {
     }
 
     {
-        let recording_id = RecordingId::random();
+        let recording_id = RecordingId::random(RecordingType::Data);
         let mut group = c.benchmark_group("mono_points_arrow_batched");
         group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
         group.bench_function("generate_message_bundles", |b| {
@@ -194,7 +194,7 @@ fn batch_points_arrow(c: &mut Criterion) {
     }
 
     {
-        let recording_id = RecordingId::random();
+        let recording_id = RecordingId::random(RecordingType::Data);
         let mut group = c.benchmark_group("batch_points_arrow");
         group.throughput(criterion::Throughput::Elements(NUM_POINTS as _));
         group.bench_function("generate_message_bundles", |b| {

crates/re_log_encoding/src/decoder.rs (4 changes: 2 additions & 2 deletions)

@@ -184,14 +184,14 @@ impl<R: std::io::Read> Iterator for Decoder<R> {
 fn test_encode_decode() {
     use re_log_types::{
         ApplicationId, BeginRecordingMsg, LogMsg, RecordingId, RecordingInfo, RecordingSource,
-        RowId, Time,
+        RecordingType, RowId, Time,
     };
 
     let messages = vec![LogMsg::BeginRecordingMsg(BeginRecordingMsg {
         row_id: RowId::random(),
         info: RecordingInfo {
             application_id: ApplicationId("test".to_owned()),
-            recording_id: RecordingId::random(),
+            recording_id: RecordingId::random(RecordingType::Data),
             is_official_example: true,
             started: Time::now(),
             recording_source: RecordingSource::RustSdk {

crates/re_log_types/src/lib.rs (102 changes: 47 additions & 55 deletions)

@@ -89,78 +89,70 @@ macro_rules! impl_into_enum {
 
 // ----------------------------------------------------------------------------
 
-/// A unique id per recording (a stream of [`LogMsg`]es).
-#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
+/// What type of `Recording` this is.
+///
+/// `Data` recordings contain user-data logged via `log_` API calls.
+/// `Blueprint` recordings describe how that data is laid out.
+///
+/// Both of these types can go over the same stream and be stored in the
+/// same datastore, but the viewer wants to treat them very differently.
+#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
 #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
-pub struct RecordingId(Arc<String>);
-
-impl Default for RecordingId {
-    fn default() -> Self {
-        Self::unknown()
-    }
+pub enum RecordingType {
+    Data,
+    Blueprint,
 }
 
-impl RecordingId {
+impl std::fmt::Display for RecordingType {
     #[inline]
-    pub fn unknown() -> Self {
-        "UNKNOWN".into()
-    }
-
-    #[inline]
-    pub fn random() -> Self {
-        Self(Arc::new(uuid::Uuid::new_v4().to_string()))
-    }
-
-    #[inline]
-    pub fn from_uuid(uuid: uuid::Uuid) -> Self {
-        Self(Arc::new(uuid.to_string()))
-    }
-}
-
-impl std::fmt::Display for RecordingId {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
-    }
-}
-
-impl From<&str> for RecordingId {
-    fn from(s: &str) -> Self {
-        Self(Arc::new(s.to_owned()))
+        match self {
+            Self::Data => "Data".fmt(f),
+            Self::Blueprint => "Blueprint".fmt(f),
+        }
     }
 }
 
-impl From<String> for RecordingId {
-    fn from(s: String) -> Self {
-        Self(Arc::new(s))
-    }
+/// A unique id per recording (a stream of [`LogMsg`]es).
+#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
+#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
+pub struct RecordingId {
+    variant: RecordingType,
+    id: Arc<String>,
 }
 
 impl RecordingId {
     #[inline]
-    pub fn as_str(&self) -> &str {
-        self.0.as_str()
+    pub fn random(variant: RecordingType) -> Self {
+        Self {
+            variant,
+            id: Arc::new(uuid::Uuid::new_v4().to_string()),
+        }
     }
-}
 
-impl AsRef<str> for RecordingId {
     #[inline]
-    fn as_ref(&self) -> &str {
-        self.as_str()
+    pub fn from_uuid(variant: RecordingType, uuid: uuid::Uuid) -> Self {
+        Self {
+            variant,
+            id: Arc::new(uuid.to_string()),
+        }
     }
-}
 
-impl std::borrow::Borrow<str> for RecordingId {
     #[inline]
-    fn borrow(&self) -> &str {
-        self.as_str()
+    pub fn from_string(variant: RecordingType, str: String) -> Self {
+        Self {
+            variant,
+            id: Arc::new(str),
+        }
     }
 }
 
-impl std::ops::Deref for RecordingId {
-    type Target = str;
+impl std::fmt::Display for RecordingId {
     #[inline]
-    fn deref(&self) -> &str {
-        self.as_str()
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self { variant, id } = self;
+        f.write_fmt(format_args!("{variant}:{id}"))?;
+        Ok(())
     }
 }
 
@@ -221,17 +213,17 @@ pub enum LogMsg {
     ArrowMsg(RecordingId, ArrowMsg),
 
     /// Sent when the client shuts down the connection.
-    Goodbye(RowId),
+    Goodbye(RecordingId, RowId),
 }
 
 impl LogMsg {
-    pub fn recording_id(&self) -> Option<&RecordingId> {
+    pub fn recording_id(&self) -> &RecordingId {
         match self {
-            Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
+            Self::BeginRecordingMsg(msg) => &msg.info.recording_id,
             Self::EntityPathOpMsg(recording_id, _) | Self::ArrowMsg(recording_id, _) => {
-                Some(recording_id)
+                recording_id
             }
-            Self::Goodbye(_) => None,
+            Self::Goodbye(recording_id, _) => recording_id,
         }
     }
 }
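
Note for reviewers: the doc comment above spells out the Data vs. Blueprint split, and the new constructors force that choice at every call site. A small sketch of the resulting API, based only on the signatures added in this file:

    use re_log_types::{RecordingId, RecordingType};

    fn main() {
        // Every id now carries its RecordingType; unknown()/Default are gone.
        let data_id = RecordingId::random(RecordingType::Data);
        let blueprint_id =
            RecordingId::from_string(RecordingType::Blueprint, "my-blueprint".to_owned());

        // Display prefixes the variant, e.g. "Data:<uuid>" and "Blueprint:my-blueprint".
        println!("{data_id}");
        println!("{blueprint_id}");
    }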

crates/re_sdk/src/lib.rs (3 changes: 2 additions & 1 deletion)

@@ -19,6 +19,7 @@ mod recording_stream;
 pub use self::msg_sender::{MsgSender, MsgSenderError};
 pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};
 
+use re_log_types::RecordingType;
 pub use re_sdk_comms::default_server_addr;
 
 pub use re_log_types::{
@@ -155,7 +156,7 @@ pub fn new_recording_info(
 ) -> re_log_types::RecordingInfo {
     re_log_types::RecordingInfo {
         application_id: application_id.into(),
-        recording_id: RecordingId::random(),
+        recording_id: RecordingId::random(RecordingType::Data),
         is_official_example: called_from_official_rust_example(),
         started: re_log_types::Time::now(),
         recording_source: re_log_types::RecordingSource::RustSdk {
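
Note for reviewers: SDK call sites stay unchanged; `new_recording_info` still takes only an application id and now fills in a Data-typed recording id behind the scenes. A minimal, illustrative sketch (assuming `ApplicationId` converts from `&str` as elsewhere in the SDK):

    fn main() {
        // The helper now always produces a RecordingType::Data recording id.
        let info = re_sdk::new_recording_info("my_app");
        println!("{}", info.recording_id);
    }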

crates/re_sdk/src/recording_stream.rs (31 changes: 27 additions & 4 deletions)

@@ -4,8 +4,8 @@ use ahash::HashMap;
 use crossbeam::channel::{Receiver, Sender};
 use re_log_types::{
     ApplicationId, DataRow, DataTable, DataTableBatcher, DataTableBatcherConfig,
-    DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, Time, TimeInt,
-    TimePoint, TimeType, Timeline, TimelineName,
+    DataTableBatcherError, LogMsg, RecordingId, RecordingInfo, RecordingSource, RecordingType,
+    RowId, Time, TimeInt, TimePoint, TimeType, Timeline, TimelineName,
 };
 
 use crate::sink::{LogSink, MemorySinkStorage};
@@ -44,6 +44,7 @@ pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
 /// ```
 pub struct RecordingStreamBuilder {
     application_id: ApplicationId,
+    recording_type: RecordingType,
     recording_id: Option<RecordingId>,
     recording_source: Option<RecordingSource>,
 
@@ -74,6 +75,7 @@ impl RecordingStreamBuilder {
 
         Self {
             application_id,
+            recording_type: RecordingType::Data,
             recording_id: None,
             recording_source: None,
 
@@ -246,6 +248,7 @@
     pub fn into_args(self) -> (bool, RecordingInfo, DataTableBatcherConfig) {
         let Self {
             application_id,
+            recording_type,
             recording_id,
             recording_source,
             default_enabled,
@@ -255,7 +258,7 @@ impl RecordingStreamBuilder {
         } = self;
 
         let enabled = enabled.unwrap_or_else(|| crate::decide_logging_enabled(default_enabled));
-        let recording_id = recording_id.unwrap_or_else(RecordingId::random);
+        let recording_id = recording_id.unwrap_or(RecordingId::random(recording_type));
         let recording_source = recording_source.unwrap_or_else(|| RecordingSource::RustSdk {
             rustc_version: env!("RE_BUILD_RUSTC_VERSION").into(),
             llvm_version: env!("RE_BUILD_LLVM_VERSION").into(),
@@ -336,6 +339,13 @@ impl Drop for RecordingStreamInner {
         // sending data down the pipeline.
         self.batcher.flush_blocking();
         self.cmds_tx.send(Command::PopPendingTables).ok();
+        // Announce we're gracefully leaving to the other end.
+        self.cmds_tx
+            .send(Command::RecordMsg(LogMsg::Goodbye(
+                self.info.recording_id.clone(),
+                RowId::random(),
+            )))
+            .ok();
         self.cmds_tx.send(Command::Shutdown).ok();
         if let Some(handle) = self.batcher_to_sink_handle.take() {
             handle.join().ok();
@@ -762,12 +772,25 @@ impl RecordingStream {
         Ok(())
     }
 
-    /// Swaps the underlying sink for a [`crate::sink::BufferedSink`].
+    /// Swaps the underlying sink for a [`crate::sink::BufferedSink`], making sure to first send a
+    /// `Goodbye` message down the sink to let the other end know of the graceful disconnection.
     ///
     /// This is a convenience wrapper for [`Self::set_sink`] that upholds the same guarantees in
     /// terms of data durability and ordering.
     /// See [`Self::set_sink`] for more information.
     pub fn disconnect(&self) {
+        let Some(this) = &*self.inner else {
+            re_log::warn_once!("Recording disabled - call to disconnect() ignored");
+            return;
+        };
+
+        this.cmds_tx
+            .send(Command::RecordMsg(LogMsg::Goodbye(
+                this.info.recording_id.clone(),
+                RowId::random(),
+            )))
+            .ok();
+
        self.set_sink(Box::new(crate::sink::BufferedSink::new()));
    }
}
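
Note for reviewers: the updated doc comment describes the behaviour; here is a hedged sketch of what a caller sees, assuming the builder's `buffered()` sink helper (not part of this diff) is used for setup:

    use re_sdk::RecordingStreamBuilder;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // The builder now tracks a RecordingType and defaults it to Data,
        // so the resulting RecordingId is always typed.
        let rec_stream = RecordingStreamBuilder::new("goodbye_example").buffered()?;

        // ... log data through `rec_stream` ...

        // disconnect() (and Drop) now emit a LogMsg::Goodbye carrying the
        // recording id before swapping in a BufferedSink, so the receiving
        // end knows which recording ended gracefully.
        rec_stream.disconnect();
        Ok(())
    }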