Skip to content

Commit

Permalink
don't expose shutdown to make errors impossible
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc committed Apr 26, 2023
1 parent 0230255 commit 8b91b65
Showing 1 changed file with 30 additions and 61 deletions.
91 changes: 30 additions & 61 deletions crates/re_log_types/src/data_table_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
time::{Duration, Instant},
};

use crossbeam::channel::{Receiver, SendError, Sender};
use crossbeam::channel::{Receiver, Sender};

use crate::{DataRow, DataTable, SizeBytes, TableId};

Expand Down Expand Up @@ -212,14 +212,19 @@ pub struct DataTableBatcher {
// NOTE: The receiving end of the command stream as well as the sending end of the table stream are
// owned solely by the batching thread.
struct DataTableBatcherInner {
/// The one and only entrypoint into the pipeline: this is _never_ cloned nor publicly exposed,
/// therefore the `Drop` implementation is guaranteed that no more data can come in while it's
/// running.
tx_cmds: Sender<Command>,
rx_tables: Receiver<DataTable>,
cmds_to_tables_handle: Option<std::thread::JoinHandle<()>>,
}

impl Drop for DataTableBatcherInner {
fn drop(&mut self) {
self.shutdown().ok(); // can only fail if already down
// NOTE: The command channel is private, if we're here, nothing is currently capable of
// sending data down the pipeline.
self.tx_cmds.send(Command::Shutdown).ok();
if let Some(handle) = self.cmds_to_tables_handle.take() {
handle.join().ok();
}
Expand Down Expand Up @@ -292,51 +297,26 @@ impl DataTableBatcher {
/// This will call [`DataRow::compute_all_size_bytes`] from the batching thread!
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
///
/// ## Failure
///
/// This can only fail if called on a `DataTableBatcher` that has already shutdown.
/// The data can be retrieved from the returned error value in such cases.
#[inline]
pub fn push_row(&self, row: DataRow) -> Result<(), SendError<DataRow>> {
self.inner.push_row(row)
pub fn push_row(&self, row: DataRow) {
self.inner.push_row(row);
}

/// Initiates a flush of the pipeline and returns immediately.
///
/// This does **not** wait for the flush to propagate (see [`Self::flush_blocking`]).
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
///
/// ## Failure
///
/// This can only fail if called on a `DataTableBatcher` that has already shutdown.
#[inline]
pub fn flush_async(&self) -> Result<(), SendError<()>> {
self.inner.flush_and_forget()
pub fn flush_async(&self) {
self.inner.flush_async();
}

/// Initiates a flush the batching pipeline and waits for it to propagate.
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
///
/// ## Failure
///
/// This can only fail if called on a `DataTableBatcher` that has already shutdown.
#[inline]
pub fn flush_blocking(&self) -> Result<(), SendError<()>> {
self.inner.flush_blocking()
}

/// Initiates an asynchronous shutdown of the batcher.
///
/// See `DataTableBatcher` docs for ordering semantics and multithreading guarantees.
///
/// ## Failure
///
/// This can only fail if called on a `DataTableBatcher` that has already shutdown.
#[inline]
pub fn shutdown_async(&self) -> Result<(), SendError<()>> {
self.inner.shutdown()
pub fn flush_blocking(&self) {
self.inner.flush_blocking();
}

// --- Subscribe to tables ---
Expand All @@ -352,36 +332,25 @@ impl DataTableBatcher {
}

impl DataTableBatcherInner {
fn push_row(&self, row: DataRow) -> Result<(), SendError<DataRow>> {
self.send_cmd(Command::AppendRow(row))
.map_err(|err| match err.0 {
Command::AppendRow(row) => SendError(row),
_ => unreachable!(),
})
fn push_row(&self, row: DataRow) {
self.send_cmd(Command::AppendRow(row));
}

fn flush_and_forget(&self) -> Result<(), SendError<()>> {
fn flush_async(&self) {
let (flush_cmd, _) = Command::flush();
self.send_cmd(flush_cmd).map_err(|_err| SendError(()))?;
Ok(())
self.send_cmd(flush_cmd);
}

fn flush_blocking(&self) -> Result<(), SendError<()>> {
fn flush_blocking(&self) {
let (flush_cmd, oneshot) = Command::flush();
self.send_cmd(flush_cmd).map_err(|_err| SendError(()))?;
self.send_cmd(flush_cmd);
oneshot.recv().ok();
Ok(())
}

fn shutdown(&self) -> Result<(), SendError<()>> {
self.flush_and_forget()?;
self.send_cmd(Command::Shutdown)
.map_err(|_err| SendError(()))?;
Ok(())
}

fn send_cmd(&self, cmd: Command) -> Result<(), SendError<Command>> {
self.tx_cmds.send(cmd)
fn send_cmd(&self, cmd: Command) {
// NOTE: Internal channels can never be closed outside of the `Drop` impl, this cannot
// fail.
self.tx_cmds.send(cmd).ok();
}
}

Expand Down Expand Up @@ -514,12 +483,12 @@ mod tests {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

for row in expected.to_rows() {
batcher.push_row(row).unwrap();
batcher.push_row(row);
}

assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

batcher.flush_blocking().unwrap();
batcher.flush_blocking();

{
let mut table = tables.recv().unwrap();
Expand Down Expand Up @@ -548,7 +517,7 @@ mod tests {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

for row in rows.clone() {
batcher.push_row(row).unwrap();
batcher.push_row(row);
}

assert_eq!(Err(TryRecvError::Empty), tables.try_recv());
Expand Down Expand Up @@ -601,7 +570,7 @@ mod tests {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

for row in table.to_rows() {
batcher.push_row(row).unwrap();
batcher.push_row(row);
}

// Expect all rows except for the last one (num_bytes trigger).
Expand Down Expand Up @@ -660,7 +629,7 @@ mod tests {
assert_eq!(Err(TryRecvError::Empty), tables.try_recv());

for row in table.to_rows() {
batcher.push_row(row).unwrap();
batcher.push_row(row);
}

// Expect all rows except for the last one.
Expand Down Expand Up @@ -721,13 +690,13 @@ mod tests {
let batcher = batcher.clone();
move || {
for row in rows.drain(..rows.len() - 1) {
batcher.push_row(row).unwrap();
batcher.push_row(row);
}

std::thread::sleep(flush_duration * 2);

let row = rows.last().cloned().unwrap();
batcher.push_row(row).unwrap();
batcher.push_row(row);
}
});

Expand Down

0 comments on commit 8b91b65

Please sign in to comment.