Skip to content

Commit

Permalink
Rust SDK: Document that you need to call flush_blocking when using …
Browse files Browse the repository at this point in the history
…memory sink (#2630)

See:
* #2627

I added some log warnings too, but they are not sufficient. If there is
no call to `flush_blocking`, data is still silently dropped somehow.
Maybe @teh-cmc has some idea where?

### Checklist
* [x] I have read and agree to [Contributor
Guide](https://github.com/rerun-io/rerun/blob/main/CONTRIBUTING.md) and
the [Code of
Conduct](https://github.com/rerun-io/rerun/blob/main/CODE_OF_CONDUCT.md)
* [x] I've included a screenshot or gif (if applicable)
* [x] I have tested [demo.rerun.io](https://demo.rerun.io/pr/2630) (if
applicable)

- [PR Build Summary](https://build.rerun.io/pr/2630)
- [Docs
preview](https://rerun.io/preview/pr%3Aemilk%2Fdocument-flush/docs)
- [Examples
preview](https://rerun.io/preview/pr%3Aemilk%2Fdocument-flush/examples)
  • Loading branch information
emilk authored Jul 10, 2023
1 parent 9ccaabf commit d555c66
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
6 changes: 5 additions & 1 deletion crates/re_log_types/src/data_table_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@ impl Drop for DataTableBatcherInner {
fn drop(&mut self) {
// Drop the receiving end of the table stream first and foremost, so that we don't block
// even if the output channel is bounded and currently full.
drop(self.rx_tables.take());
if let Some(rx_tables) = self.rx_tables.take() {
if !rx_tables.is_empty() {
re_log::warn!("Dropping data");
}
}

// NOTE: The command channel is private, if we're here, nothing is currently capable of
// sending data down the pipeline.
Expand Down
10 changes: 8 additions & 2 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ pub struct MemorySinkStorage {
pub(crate) rec_stream: Option<crate::RecordingStream>,
}

impl Drop for MemorySinkStorage {
fn drop(&mut self) {
if !self.msgs.read().is_empty() {
re_log::warn!("Dropping data in MemorySink");
}
}
}

impl MemorySinkStorage {
/// Write access to the inner array of [`LogMsg`].
#[inline]
Expand All @@ -134,9 +142,7 @@ impl MemorySinkStorage {
}
std::mem::take(&mut *self.msgs.write())
}
}

impl MemorySinkStorage {
/// Convert the stored messages into an in-memory Rerun log file.
#[inline]
pub fn concat_memory_sinks_as_bytes(
Expand Down
11 changes: 9 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl RecordingStreamBuilder {
///
/// ## Example
///
/// ```no_run
/// ```
/// let rec_stream = re_sdk::RecordingStreamBuilder::new("my_app").buffered()?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
Expand All @@ -176,8 +176,15 @@ impl RecordingStreamBuilder {
///
/// ## Example
///
/// ```no_run
/// ```
/// # fn log_data(_: &re_sdk::RecordingStream) { }
///
/// let (rec_stream, storage) = re_sdk::RecordingStreamBuilder::new("my_app").memory()?;
///
/// log_data(&rec_stream);
///
/// let data = storage.take();
///
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// ```
pub fn memory(
Expand Down

0 comments on commit d555c66

Please sign in to comment.