Skip to content

Commit

Permalink
introduce RecordingStream
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 27, 2023
1 parent 67dc616 commit 14130c5
Show file tree
Hide file tree
Showing 19 changed files with 1,031 additions and 604 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
Expand Down
23 changes: 0 additions & 23 deletions crates/re_sdk/src/global.rs

This file was deleted.

16 changes: 4 additions & 12 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,15 @@
// ----------------
// Private modules:

#[cfg(feature = "global_session")]
mod global;

mod log_sink;
mod msg_sender;
mod session;
mod recording_stream;

// -------------
// Public items:

#[cfg(feature = "global_session")]
pub use self::global::global_session;

pub use self::msg_sender::{MsgSender, MsgSenderError};
pub use self::session::{Session, SessionBuilder};
pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};

pub use re_sdk_comms::default_server_addr;

Expand All @@ -49,9 +43,7 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::log_sink::{
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
};
pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
Expand Down Expand Up @@ -153,7 +145,7 @@ pub fn decide_logging_enabled(default_enabled: bool) -> bool {

// ----------------------------------------------------------------------------

/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`Session::new`].
/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`RecordingStream::new`].
#[track_caller] // track_caller so that we can see if we are being called from an official example.
pub fn new_recording_info(
application_id: impl Into<re_log_types::ApplicationId>,
Expand Down
96 changes: 53 additions & 43 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;

use parking_lot::RwLock;
use re_log_types::LogMsg;

/// Where the SDK sends its log messages.
Expand All @@ -6,47 +9,31 @@ pub trait LogSink: Send + Sync + 'static {
fn send(&self, msg: LogMsg);

/// Send all these log messages.
#[inline]
fn send_all(&self, messages: Vec<LogMsg>) {
for msg in messages {
self.send(msg);
}
}

/// Drain all buffered [`LogMsg`]es and return them.
///
/// Only applies to sinks that maintain a backlog.
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
vec![]
}

/// Wait until all logged data have been sent to the remove server (if any).
fn flush(&self) {}

/// If the TCP session is disconnected, allow it to quit early and drop unsent messages.
fn drop_msgs_if_disconnected(&self) {}

/// Returns `false` if this sink just discards all messages.
fn is_enabled(&self) -> bool {
true
}
}

// ----------------------------------------------------------------------------

struct DisabledSink;

impl LogSink for DisabledSink {
fn send(&self, _msg: LogMsg) {
// It's intended that the logging SDK should drop messages earlier than this if logging is disabled.
re_log::debug_once!("Logging is disabled, dropping message(s).");
}

fn is_enabled(&self) -> bool {
false
}
}
/// Blocks until all pending data in the sink's send buffers has been fully flushed.
///
/// See also [`LogSink::drop_if_disconnected`].
#[inline]
fn flush_blocking(&self) {}

/// A sink that does nothing. All log messages are just dropped.
pub fn disabled() -> Box<dyn LogSink> {
Box::new(DisabledSink)
/// Drops all pending data currently sitting in the sink's send buffers if it is unable to
/// flush it for any reason (e.g. a broken TCP connection for a [`TcpSink`]).
#[inline]
fn drop_if_disconnected(&self) {}
}

// ----------------------------------------------------------------------------
Expand All @@ -57,26 +44,30 @@ pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);

impl BufferedSink {
/// An empty buffer.
#[inline]
pub fn new() -> Self {
Self::default()
}
}

impl LogSink for BufferedSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}

#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
std::mem::take(&mut self.0.lock())
}
}

/// Store log messages directly in memory
/// Store log messages directly in memory.
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
Expand All @@ -88,37 +79,52 @@ pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
#[inline]
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
self.0.write().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
self.0.write().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);
pub struct MemorySinkStorage(Arc<RwLock<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
/// Write access to the inner array of [`LogMsg`].
#[inline]
fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Vec<LogMsg>> {
self.0.write()
}

/// Read access to the inner array of [`LogMsg`].
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Vec<LogMsg>> {
self.0.read()
}

/// Consumes and returns the inner array of [`LogMsg`].
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
std::mem::take(&mut *self.0.write())
}

/// Convert the stored messages into an in-memory Rerun log file
/// Convert the stored messages into an in-memory Rerun log file.
#[inline]
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
re_log_encoding::encoder::encode(self.read().iter(), &mut buffer)?;
Ok(buffer.into_inner())
}
}
Expand All @@ -133,6 +139,7 @@ pub struct TcpSink {
impl TcpSink {
/// Connect to the given address in a background thread.
/// Retries until successful.
#[inline]
pub fn new(addr: std::net::SocketAddr) -> Self {
Self {
client: re_sdk_comms::Client::new(addr),
Expand All @@ -141,15 +148,18 @@ impl TcpSink {
}

impl LogSink for TcpSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.client.send(msg);
}

fn flush(&self) {
#[inline]
fn flush_blocking(&self) {
self.client.flush();
}

fn drop_msgs_if_disconnected(&self) {
#[inline]
fn drop_if_disconnected(&self) {
self.client.drop_if_disconnected();
}
}
47 changes: 15 additions & 32 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::borrow::Borrow;

use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId, RowId};
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RowId};

use crate::{
components::Transform,
log::{DataCell, LogMsg},
sink::LogSink,
log::DataCell,
time::{Time, TimeInt, TimePoint, Timeline},
Component, EntityPath, SerializableComponent, Session,
Component, EntityPath, RecordingStream, SerializableComponent,
};

// TODO(#1619): Rust SDK batching

// ---

// TODO: effectively this is nothing but a rowbuilder? except sometimes it creates table due to
// limitations..

/// Errors that can occur when constructing or sending messages
/// using [`MsgSender`].
#[derive(thiserror::Error, Debug)]
Expand All @@ -35,7 +35,7 @@ pub enum MsgSenderError {
///
/// ```ignore
/// fn log_coordinate_space(
/// session: &Session,
/// rec_stream: &RecordingStream,
/// ent_path: impl Into<EntityPath>,
/// axes: &str,
/// ) -> anyhow::Result<()> {
Expand All @@ -46,12 +46,13 @@ pub enum MsgSenderError {
/// MsgSender::new(ent_path)
/// .with_timeless(true)
/// .with_component(&[view_coords])?
/// .send(session)
/// .send(rec_stream)
/// .map_err(Into::into)
/// }
/// ```
// TODO(#1619): this whole thing needs to be rethought to incorporate batching and datatables.
pub struct MsgSender {
// TODO
// TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so
// this must be known as soon as we spawn the builder.
// This won't be true anymore once batch insertions land.
Expand Down Expand Up @@ -231,42 +232,24 @@ impl MsgSender {

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
pub fn send(self, session: &Session) -> Result<(), DataTableError> {
self.send_to_sink(session.recording_id(), session.borrow())
}

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
fn send_to_sink(
self,
recording_id: RecordingId,
sink: &dyn LogSink,
) -> Result<(), DataTableError> {
if !sink.is_enabled() {
// TODO: wtf is this a DataTableError?
pub fn send(self, rec_stream: &RecordingStream) -> Result<(), DataTableError> {
if !rec_stream.is_enabled() {
return Ok(()); // silently drop the message
}

let [row_standard, row_transforms, row_splats] = self.into_rows();

if let Some(row_transforms) = row_transforms {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_transforms.into_table().to_arrow_msg()?,
));
rec_stream.record_row(row_transforms);
}
if let Some(row_splats) = row_splats {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_splats.into_table().to_arrow_msg()?,
));
rec_stream.record_row(row_splats);
}
// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_standard.into_table().to_arrow_msg()?,
));
rec_stream.record_row(row_standard);
}

Ok(())
Expand Down
Loading

0 comments on commit 14130c5

Please sign in to comment.