Skip to content

Commit

Permalink
Add fmt::Debug implementations to various types. (#2784)
Browse files Browse the repository at this point in the history
### What

Adds `std::fmt::Debug` implementations to:

* `RecordingStream`
* `RecordingStreamBuilder`
* `FileSink`
* `MemorySink`
* `BufferedSink`
* `TcpSink`

Most of these do not add very much useful information, but having the
implementations means that users are not blocked from using
`#[derive(Debug)]` on _their_ structs. It is general [good
practice](https://rust-lang.github.io/api-guidelines/interoperability.html#types-eagerly-implement-common-traits-c-common-traits)
in Rust to implement `Debug` for all public types where this is at all
reasonable.

(I added a printable address field to `TcpSink` since that's very cheap,
but did not add one to `FileSink` because it would require cloning the
path. Perhaps that's not drawing the best line.)

### Checklist
* [X] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [X] I've included a screenshot or gif (if applicable)

---------

Co-authored-by: Clement Rey <[email protected]>
  • Loading branch information
kpreid and teh-cmc authored Jul 25, 2023
1 parent 2bf24c2 commit be043df
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 7 deletions.
28 changes: 22 additions & 6 deletions crates/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::{path::PathBuf, sync::mpsc::Sender};

use parking_lot::Mutex;
Expand Down Expand Up @@ -25,6 +26,9 @@ pub struct FileSink {
// None = quit
tx: Mutex<Sender<Option<LogMsg>>>,
join_handle: Option<std::thread::JoinHandle<()>>,

/// Only used for diagnostics, not for access after `new()`.
path: PathBuf,
}

impl Drop for FileSink {
Expand Down Expand Up @@ -54,24 +58,36 @@ impl FileSink {

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
.spawn({
let path = path.clone();
move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
re_log::debug!("Log stream saved to {path:?}");
}
re_log::debug!("Log stream saved to {path:?}");
})
.map_err(FileSinkError::SpawnThread)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
path,
})
}

pub fn send(&self, log_msg: LogMsg) {
self.tx.lock().send(Some(log_msg)).ok();
}
}

impl fmt::Debug for FileSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileSink")
.field("path", &self.path)
.finish_non_exhaustive()
}
}
18 changes: 18 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::sync::Arc;

use parking_lot::RwLock;
Expand Down Expand Up @@ -69,6 +70,12 @@ impl LogSink for BufferedSink {
fn flush_blocking(&self) {}
}

impl fmt::Debug for BufferedSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BufferedSink {{ {} messages }}", self.0.lock().len())
}
}

/// Store log messages directly in memory.
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
Expand Down Expand Up @@ -102,6 +109,16 @@ impl LogSink for MemorySink {
fn flush_blocking(&self) {}
}

impl fmt::Debug for MemorySink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"MemorySink {{ {} messages }}",
self.buffer().read().len()
)
}
}

/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
pub struct MemorySinkStorage {
Expand Down Expand Up @@ -167,6 +184,7 @@ impl MemorySinkStorage {
// ----------------------------------------------------------------------------

/// Stream log messages to a Rerun TCP server.
#[derive(Debug)]
pub struct TcpSink {
client: re_sdk_comms::Client,
}
Expand Down
25 changes: 25 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::sync::{atomic::AtomicI64, Arc};

use ahash::HashMap;
Expand Down Expand Up @@ -43,6 +44,7 @@ pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
/// let rec_stream = RecordingStreamBuilder::new("my_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug)]
pub struct RecordingStreamBuilder {
application_id: ApplicationId,
store_kind: StoreKind,
Expand Down Expand Up @@ -836,6 +838,29 @@ impl RecordingStream {
}
}

impl fmt::Debug for RecordingStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &*self.inner {
Some(RecordingStreamInner {
// This pattern match prevents _accidentally_ omitting data from the debug output
// when new fields are added.
info,
tick,
cmds_tx: _,
batcher: _,
batcher_to_sink_handle: _,
pid_at_creation,
}) => f
.debug_struct("RecordingStream")
.field("info", &info)
.field("tick", &tick)
.field("pid_at_creation", &pid_at_creation)
.finish_non_exhaustive(),
None => write!(f, "RecordingStream {{ disabled }}"),
}
}
}

// --- Stateful time ---

/// Thread-local data.
Expand Down
15 changes: 14 additions & 1 deletion crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, thread::JoinHandle};
use std::{fmt, net::SocketAddr, thread::JoinHandle};

use crossbeam::channel::{select, Receiver, Sender};

Expand Down Expand Up @@ -47,6 +47,9 @@ pub struct Client {
encode_join: Option<JoinHandle<()>>,
send_join: Option<JoinHandle<()>>,
drop_join: Option<JoinHandle<()>>,

/// Only used for diagnostics, not for communication after `new()`.
addr: SocketAddr,
}

impl Client {
Expand Down Expand Up @@ -105,6 +108,7 @@ impl Client {
encode_join: Some(encode_join),
send_join: Some(send_join),
drop_join: Some(drop_join),
addr,
}
}

Expand Down Expand Up @@ -164,6 +168,15 @@ impl Drop for Client {
}
}

impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The other fields are all channels and join handles, so they are not usefully printable.
f.debug_struct("Client")
.field("addr", &self.addr)
.finish_non_exhaustive()
}
}

// We drop messages in a separate thread because the PyO3 + Arrow memory model
// means in some cases these messages actually store pointers back to
// python-managed memory. We don't want to block our send-thread waiting for the
Expand Down

0 comments on commit be043df

Please sign in to comment.