Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fmt::Debug implementations to various types. #2784

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions crates/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::{path::PathBuf, sync::mpsc::Sender};

use parking_lot::Mutex;
Expand Down Expand Up @@ -25,6 +26,9 @@ pub struct FileSink {
// None = quit
tx: Mutex<Sender<Option<LogMsg>>>,
join_handle: Option<std::thread::JoinHandle<()>>,

/// Only used for diagnostics, not for access after `new()`.
path: PathBuf,
}

impl Drop for FileSink {
Expand Down Expand Up @@ -54,24 +58,36 @@ impl FileSink {

let join_handle = std::thread::Builder::new()
.name("file_writer".into())
.spawn(move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
.spawn({
let path = path.clone();
move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
re_log::debug!("Log stream saved to {path:?}");
}
re_log::debug!("Log stream saved to {path:?}");
})
.map_err(FileSinkError::SpawnThread)?;

Ok(Self {
tx: tx.into(),
join_handle: Some(join_handle),
path,
})
}

pub fn send(&self, log_msg: LogMsg) {
self.tx.lock().send(Some(log_msg)).ok();
}
}

impl fmt::Debug for FileSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileSink")
.field("path", &self.path)
.finish_non_exhaustive()
}
}
18 changes: 18 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::sync::Arc;

use parking_lot::RwLock;
Expand Down Expand Up @@ -69,6 +70,12 @@ impl LogSink for BufferedSink {
fn flush_blocking(&self) {}
}

impl fmt::Debug for BufferedSink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BufferedSink {{ {} messages }}", self.0.lock().len())
}
}

/// Store log messages directly in memory.
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
Expand Down Expand Up @@ -102,6 +109,16 @@ impl LogSink for MemorySink {
fn flush_blocking(&self) {}
}

impl fmt::Debug for MemorySink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"MemorySink {{ {} messages }}",
self.buffer().read().len()
)
}
}

/// The storage used by [`MemorySink`].
#[derive(Default, Clone)]
pub struct MemorySinkStorage {
Expand Down Expand Up @@ -167,6 +184,7 @@ impl MemorySinkStorage {
// ----------------------------------------------------------------------------

/// Stream log messages to a Rerun TCP server.
#[derive(Debug)]
pub struct TcpSink {
client: re_sdk_comms::Client,
}
Expand Down
25 changes: 25 additions & 0 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::sync::{atomic::AtomicI64, Arc};

use ahash::HashMap;
Expand Down Expand Up @@ -43,6 +44,7 @@ pub type RecordingStreamResult<T> = Result<T, RecordingStreamError>;
/// let rec_stream = RecordingStreamBuilder::new("my_app").save("my_recording.rrd")?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
#[derive(Debug)]
pub struct RecordingStreamBuilder {
application_id: ApplicationId,
store_kind: StoreKind,
Expand Down Expand Up @@ -836,6 +838,29 @@ impl RecordingStream {
}
}

impl fmt::Debug for RecordingStream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &*self.inner {
Some(RecordingStreamInner {
// This pattern match prevents _accidentally_ omitting data from the debug output
// when new fields are added.
info,
tick,
cmds_tx: _,
batcher: _,
batcher_to_sink_handle: _,
pid_at_creation,
}) => f
.debug_struct("RecordingStream")
.field("info", &info)
.field("tick", &tick)
.field("pid_at_creation", &pid_at_creation)
.finish_non_exhaustive(),
None => write!(f, "RecordingStream {{ disabled }}"),
}
}
}

// --- Stateful time ---

/// Thread-local data.
Expand Down
15 changes: 14 additions & 1 deletion crates/re_sdk_comms/src/buffered_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{net::SocketAddr, thread::JoinHandle};
use std::{fmt, net::SocketAddr, thread::JoinHandle};

use crossbeam::channel::{select, Receiver, Sender};

Expand Down Expand Up @@ -47,6 +47,9 @@ pub struct Client {
encode_join: Option<JoinHandle<()>>,
send_join: Option<JoinHandle<()>>,
drop_join: Option<JoinHandle<()>>,

/// Only used for diagnostics, not for communication after `new()`.
addr: SocketAddr,
}

impl Client {
Expand Down Expand Up @@ -105,6 +108,7 @@ impl Client {
encode_join: Some(encode_join),
send_join: Some(send_join),
drop_join: Some(drop_join),
addr,
}
}

Expand Down Expand Up @@ -164,6 +168,15 @@ impl Drop for Client {
}
}

impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// The other fields are all channels and join handles, so they are not usefully printable.
f.debug_struct("Client")
.field("addr", &self.addr)
.finish_non_exhaustive()
}
}

// We drop messages in a separate thread because the PyO3 + Arrow memory model
// means in some cases these messages actually store pointers back to
// python-managed memory. We don't want to block our send-thread waiting for the
Expand Down