Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 26, 2023
1 parent eba29b7 commit 36dc901
Show file tree
Hide file tree
Showing 17 changed files with 993 additions and 541 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
Expand Down
23 changes: 0 additions & 23 deletions crates/re_sdk/src/global.rs

This file was deleted.

10 changes: 2 additions & 8 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,15 @@
// ----------------
// Private modules:

#[cfg(feature = "global_session")]
mod global;

mod log_sink;
mod msg_sender;
mod session;
mod recording_context;

// -------------
// Public items:

#[cfg(feature = "global_session")]
pub use self::global::global_session;

pub use self::msg_sender::{MsgSender, MsgSenderError};
pub use self::session::{Session, SessionBuilder};
pub use self::recording_context::{RecordingContext, RecordingContextBuilder};

pub use re_sdk_comms::default_server_addr;

Expand Down
60 changes: 53 additions & 7 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;

use parking_lot::RwLock;
use re_log_types::LogMsg;

/// Where the SDK sends its log messages.
Expand All @@ -6,71 +9,93 @@ pub trait LogSink: Send + Sync + 'static {
fn send(&self, msg: LogMsg);

/// Send all these log messages.
#[inline]
fn send_all(&self, messages: Vec<LogMsg>) {
for msg in messages {
self.send(msg);
}
}

// TODO: this needs to go
// TODO: rename this
/// Drain all buffered [`LogMsg`]es and return them.
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
vec![]
}

// TODO: flush_blocking?
/// Wait until all logged data have been sent to the remove server (if any).
#[inline]
fn flush(&self) {}

// TODO: that's very leaky and weird?
// TODO: rename this
/// If the TCP session is disconnected, allow it to quit early and drop unsent messages.
#[inline]
fn drop_msgs_if_disconnected(&self) {}

// TODO: this has to go
/// Returns `false` if this sink just discards all messages.
#[inline]
fn is_enabled(&self) -> bool {
true
}
}

// ----------------------------------------------------------------------------

// TODO: this has to go

struct DisabledSink;

impl LogSink for DisabledSink {
#[inline]
fn send(&self, _msg: LogMsg) {
// It's intended that the logging SDK should drop messages earlier than this if logging is disabled.
re_log::debug_once!("Logging is disabled, dropping message(s).");
}

#[inline]
fn is_enabled(&self) -> bool {
false
}
}

/// A sink that does nothing. All log messages are just dropped.
#[inline]
pub fn disabled() -> Box<dyn LogSink> {
Box::new(DisabledSink)
}

// ----------------------------------------------------------------------------

// TODO: this has to go

/// Store log messages in memory until you call [`LogSink::drain_backlog`].
#[derive(Default)]
pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);

impl BufferedSink {
/// An empty buffer.
#[inline]
pub fn new() -> Self {
Self::default()
}
}

impl LogSink for BufferedSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}

#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
std::mem::take(&mut self.0.lock())
}
Expand All @@ -88,35 +113,52 @@ pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
#[inline]
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
self.0.write().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
self.0.write().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);
pub struct MemorySinkStorage(Arc<RwLock<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
#[inline]
fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Vec<LogMsg>> {
self.0.write()
}

// TODO
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Vec<LogMsg>> {
self.0.read()
}

// TODO
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
std::mem::take(&mut *self.0.write())
}

// TODO: that sounds like the greatest opportunity ever to do a perfect batch then?
/// Convert the stored messages into an in-memory Rerun log file
#[inline]
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let messages = self.write();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
Ok(buffer.into_inner())
Expand All @@ -133,6 +175,7 @@ pub struct TcpSink {
impl TcpSink {
/// Connect to the given address in a background thread.
/// Retries until successful.
#[inline]
pub fn new(addr: std::net::SocketAddr) -> Self {
Self {
client: re_sdk_comms::Client::new(addr),
Expand All @@ -141,14 +184,17 @@ impl TcpSink {
}

impl LogSink for TcpSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.client.send(msg);
}

#[inline]
fn flush(&self) {
self.client.flush();
}

#[inline]
fn drop_msgs_if_disconnected(&self) {
self.client.drop_if_disconnected();
}
Expand Down
43 changes: 13 additions & 30 deletions crates/re_sdk/src/msg_sender.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use std::borrow::Borrow;

use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RecordingId, RowId};
use re_log_types::{component_types::InstanceKey, DataRow, DataTableError, RowId};

use crate::{
components::Transform,
log::{DataCell, LogMsg},
sink::LogSink,
log::DataCell,
time::{Time, TimeInt, TimePoint, Timeline},
Component, EntityPath, SerializableComponent, Session,
Component, EntityPath, RecordingContext, SerializableComponent,
};

// TODO(#1619): Rust SDK batching

// ---

// TODO: effectively this is nothing but a rowbuilder? except sometimes it creates table due to
// limitations..

/// Errors that can occur when constructing or sending messages
/// using [`MsgSender`].
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -52,6 +52,7 @@ pub enum MsgSenderError {
/// ```
// TODO(#1619): this whole thing needs to be rethought to incorporate batching and datatables.
pub struct MsgSender {
// TODO
// TODO(cmc): At the moment, a `MsgBundle` can only contain data for a single entity, so
// this must be known as soon as we spawn the builder.
// This won't be true anymore once batch insertions land.
Expand Down Expand Up @@ -231,42 +232,24 @@ impl MsgSender {

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
pub fn send(self, session: &Session) -> Result<(), DataTableError> {
self.send_to_sink(session.recording_id(), session.borrow())
}

/// Consumes, packs, sanity checks and finally sends the message to the currently configured
/// target of the SDK.
fn send_to_sink(
self,
recording_id: RecordingId,
sink: &dyn LogSink,
) -> Result<(), DataTableError> {
if !sink.is_enabled() {
// TODO: wtf is this a DataTableError?
pub fn send(self, rec_ctx: &RecordingContext) -> Result<(), DataTableError> {
if !rec_ctx.is_enabled() {
return Ok(()); // silently drop the message
}

let [row_standard, row_transforms, row_splats] = self.into_rows();

if let Some(row_transforms) = row_transforms {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_transforms.into_table().to_arrow_msg()?,
));
rec_ctx.record_row(row_transforms);
}
if let Some(row_splats) = row_splats {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_splats.into_table().to_arrow_msg()?,
));
rec_ctx.record_row(row_splats);
}
// Always the primary component last so range-based queries will include the other data.
// Since the primary component can't be splatted it must be in msg_standard, see(#1215).
if let Some(row_standard) = row_standard {
sink.send(LogMsg::ArrowMsg(
recording_id,
row_standard.into_table().to_arrow_msg()?,
));
rec_ctx.record_row(row_standard);
}

Ok(())
Expand Down
Loading

0 comments on commit 36dc901

Please sign in to comment.