Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 27, 2023
1 parent eba29b7 commit a3ed552
Show file tree
Hide file tree
Showing 20 changed files with 1,037 additions and 611 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions crates/re_log_types/src/data_table_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,16 +179,16 @@ fn data_table_batcher_config() {
// ---

/// Implements an asynchronous batcher that coalesces [`DataRow`]s into [`DataTable`]s based upon
/// the threshold defined in the associated [`DataTableBatcherConfig`].
/// the thresholds defined in the associated [`DataTableBatcherConfig`].
///
/// ## Multithreading and ordering
///
/// `DataTableBatcher` can be cheaply clone and used freely across any number of threads.
/// [`DataTableBatcher`] can be cheaply clone and used freely across any number of threads.
///
/// Internally, all operations are linearized into a pipeline:
/// - All operations sent by a given thread will take effect in the same exact order as that
/// thread originally sent them in from its point of view.
/// - There is no defined global ordering across multiple threads.
/// thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been batched; no more, no less.
Expand Down Expand Up @@ -242,7 +242,7 @@ impl Command {
}

impl DataTableBatcher {
/// Creates a new `DataTableBatcher` using the passed in `config`.
/// Creates a new [`DataTableBatcher`] using the passed in `config`.
///
/// The returned object must be kept in scope: dropping it will trigger a clean shutdown of the
/// batcher.
Expand Down Expand Up @@ -292,7 +292,7 @@ impl DataTableBatcher {
///
/// This will call [`DataRow::compute_all_size_bytes`] from the batching thread!
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
/// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees.
#[inline]
pub fn push_row(&self, row: DataRow) {
self.inner.push_row(row);
Expand All @@ -301,15 +301,15 @@ impl DataTableBatcher {
/// Initiates a flush of the pipeline and returns immediately.
///
/// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
/// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees.
#[inline]
pub fn flush_async(&self) {
self.inner.flush_async();
}

/// Initiates a flush the batching pipeline and waits for it to propagate.
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
/// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees.
#[inline]
pub fn flush_blocking(&self) {
self.inner.flush_blocking();
Expand All @@ -321,7 +321,7 @@ impl DataTableBatcher {
///
/// Shutting down the batcher will close this channel.
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
/// See [`DataTableBatcher`] docs for ordering semantics and multithreading guarantees.
pub fn tables(&self) -> Receiver<DataTable> {
self.inner.rx_tables.clone()
}
Expand Down
1 change: 1 addition & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
Expand Down
23 changes: 0 additions & 23 deletions crates/re_sdk/src/global.rs

This file was deleted.

16 changes: 4 additions & 12 deletions crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,15 @@
// ----------------
// Private modules:

#[cfg(feature = "global_session")]
mod global;

mod log_sink;
mod msg_sender;
mod session;
mod recording_context;

// -------------
// Public items:

#[cfg(feature = "global_session")]
pub use self::global::global_session;

pub use self::msg_sender::{MsgSender, MsgSenderError};
pub use self::session::{Session, SessionBuilder};
pub use self::recording_context::{RecordingContext, RecordingContextBuilder};

pub use re_sdk_comms::default_server_addr;

Expand All @@ -49,9 +43,7 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::log_sink::{
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
};
pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
Expand Down Expand Up @@ -153,7 +145,7 @@ pub fn decide_logging_enabled(default_enabled: bool) -> bool {

// ----------------------------------------------------------------------------

/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`Session::new`].
/// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`RecordingContext::new`].
#[track_caller] // track_caller so that we can see if we are being called from an official example.
pub fn new_recording_info(
application_id: impl Into<re_log_types::ApplicationId>,
Expand Down
96 changes: 53 additions & 43 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::Arc;

use parking_lot::RwLock;
use re_log_types::LogMsg;

/// Where the SDK sends its log messages.
Expand All @@ -6,47 +9,31 @@ pub trait LogSink: Send + Sync + 'static {
fn send(&self, msg: LogMsg);

/// Send all these log messages.
#[inline]
fn send_all(&self, messages: Vec<LogMsg>) {
for msg in messages {
self.send(msg);
}
}

/// Drain all buffered [`LogMsg`]es and return them.
///
/// Only applies to sinks that maintain a backlog.
#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
vec![]
}

/// Wait until all logged data have been sent to the remove server (if any).
fn flush(&self) {}

/// If the TCP session is disconnected, allow it to quit early and drop unsent messages.
fn drop_msgs_if_disconnected(&self) {}

/// Returns `false` if this sink just discards all messages.
fn is_enabled(&self) -> bool {
true
}
}

// ----------------------------------------------------------------------------

struct DisabledSink;

impl LogSink for DisabledSink {
fn send(&self, _msg: LogMsg) {
// It's intended that the logging SDK should drop messages earlier than this if logging is disabled.
re_log::debug_once!("Logging is disabled, dropping message(s).");
}

fn is_enabled(&self) -> bool {
false
}
}
/// Blocks until all pending data in the sink's send buffers has been fully flushed.
///
/// See also [`LogSink::drop_if_disconnected`].
#[inline]
fn flush_blocking(&self) {}

/// A sink that does nothing. All log messages are just dropped.
pub fn disabled() -> Box<dyn LogSink> {
Box::new(DisabledSink)
/// Drops all pending data currently sitting in the sink's send buffers if it is unable to
/// flush it for any reason (e.g. a broken TCP connection for a [`TcpSink`]).
#[inline]
fn drop_if_disconnected(&self) {}
}

// ----------------------------------------------------------------------------
Expand All @@ -57,26 +44,30 @@ pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);

impl BufferedSink {
/// An empty buffer.
#[inline]
pub fn new() -> Self {
Self::default()
}
}

impl LogSink for BufferedSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}

#[inline]
fn drain_backlog(&self) -> Vec<LogMsg> {
std::mem::take(&mut self.0.lock())
}
}

/// Store log messages directly in memory
/// Store log messages directly in memory.
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
Expand All @@ -88,37 +79,52 @@ pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
#[inline]
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
#[inline]
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
self.0.write().push(msg);
}

#[inline]
fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
self.0.write().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);
pub struct MemorySinkStorage(Arc<RwLock<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
/// Write access to the inner array of [`LogMsg`].
#[inline]
fn write(&self) -> parking_lot::RwLockWriteGuard<'_, Vec<LogMsg>> {
self.0.write()
}

/// Read access to the inner array of [`LogMsg`].
#[inline]
pub fn read(&self) -> parking_lot::RwLockReadGuard<'_, Vec<LogMsg>> {
self.0.read()
}

/// Consumes and returns the inner array of [`LogMsg`].
#[inline]
pub fn take(&self) -> Vec<LogMsg> {
std::mem::take(&mut *self.0.write())
}

/// Convert the stored messages into an in-memory Rerun log file
/// Convert the stored messages into an in-memory Rerun log file.
#[inline]
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
re_log_encoding::encoder::encode(self.read().iter(), &mut buffer)?;
Ok(buffer.into_inner())
}
}
Expand All @@ -133,6 +139,7 @@ pub struct TcpSink {
impl TcpSink {
/// Connect to the given address in a background thread.
/// Retries until successful.
#[inline]
pub fn new(addr: std::net::SocketAddr) -> Self {
Self {
client: re_sdk_comms::Client::new(addr),
Expand All @@ -141,15 +148,18 @@ impl TcpSink {
}

impl LogSink for TcpSink {
#[inline]
fn send(&self, msg: LogMsg) {
self.client.send(msg);
}

fn flush(&self) {
#[inline]
fn flush_blocking(&self) {
self.client.flush();
}

fn drop_msgs_if_disconnected(&self) {
#[inline]
fn drop_if_disconnected(&self) {
self.client.drop_if_disconnected();
}
}
Loading

0 comments on commit a3ed552

Please sign in to comment.