Skip to content

Commit

Permalink
Make FileSink actually flush its data when asked to (#3525)
Browse files Browse the repository at this point in the history
`FileSink` currently ignores flushing requests, which leads to [very
hard to track down issues where data is silently dropped even though
you're really really trying very hard to flush that dang
RecordingStream](#3522 (comment)).

- Unblocks #3522
  • Loading branch information
teh-cmc authored Sep 28, 2023
1 parent 421b6a7 commit 74526f8
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 8 deletions.
4 changes: 4 additions & 0 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ impl<W: std::io::Write> Encoder<W> {

Ok(())
}

pub fn flush_blocking(&mut self) -> std::io::Result<()> {
self.write.flush()
}
}

pub fn encode<'a>(
Expand Down
49 changes: 42 additions & 7 deletions crates/re_log_encoding/src/file_sink.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::fmt;
use std::{path::PathBuf, sync::mpsc::Sender};
use std::{
path::PathBuf,
sync::mpsc::{Receiver, Sender, SyncSender},
};

use parking_lot::Mutex;

Expand All @@ -21,10 +24,22 @@ pub enum FileSinkError {
LogMsgEncode(#[from] crate::encoder::EncodeError),
}

enum Command {
Send(LogMsg),
Flush(SyncSender<()>),
}

impl Command {
fn flush() -> (Self, Receiver<()>) {
let (tx, rx) = std::sync::mpsc::sync_channel(0); // oneshot
(Self::Flush(tx), rx)
}
}

/// Stream log messages to an `.rrd` file.
pub struct FileSink {
// None = quit
tx: Mutex<Sender<Option<LogMsg>>>,
tx: Mutex<Sender<Option<Command>>>,
join_handle: Option<std::thread::JoinHandle<()>>,

/// Only used for diagnostics, not for access after `new()`.
Expand Down Expand Up @@ -65,10 +80,22 @@ impl FileSink {
.spawn({
let path = path.clone();
move || {
while let Ok(Some(log_msg)) = rx.recv() {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
while let Ok(Some(cmd)) = rx.recv() {
match cmd {
Command::Send(log_msg) => {
if let Err(err) = encoder.append(&log_msg) {
re_log::error!("Failed to save log stream to {path:?}: {err}");
return;
}
}
Command::Flush(oneshot) => {
re_log::trace!("Flushing…");
if let Err(err) = encoder.flush_blocking() {
re_log::error!("Failed to flush log stream to {path:?}: {err}");
return;
}
drop(oneshot); // signals the oneshot
}
}
}
re_log::debug!("Log stream saved to {path:?}");
Expand All @@ -83,8 +110,16 @@ impl FileSink {
})
}

#[inline]
pub fn flush_blocking(&self) {
let (cmd, oneshot) = Command::flush();
self.tx.lock().send(Some(cmd)).ok();
oneshot.recv().ok();
}

#[inline]
pub fn send(&self, log_msg: LogMsg) {
self.tx.lock().send(Some(log_msg)).ok();
self.tx.lock().send(Some(Command::Send(log_msg))).ok();
}
}

Expand Down
4 changes: 3 additions & 1 deletion crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink {
}

#[inline]
fn flush_blocking(&self) {}
fn flush_blocking(&self) {
re_log_encoding::FileSink::flush_blocking(self);
}
}

// ---------------
Expand Down

0 comments on commit 74526f8

Please sign in to comment.