Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python SDK: introduce deferred garbage collection queue #4583

Merged
merged 10 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 7 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 69 additions & 1 deletion crates/re_log_types/src/arrow_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,68 @@
//! We have custom implementations of [`serde::Serialize`] and [`serde::Deserialize`] that wraps
//! the inner Arrow serialization of [`Schema`] and [`Chunk`].

use std::sync::Arc;

use crate::{TableId, TimePoint};
use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};

/// An arbitrary callback to be run when an [`ArrowMsg`], and more specifically the
/// Arrow [`Chunk`] within it, goes out of scope.
///
/// If the [`ArrowMsg`] has been cloned in a bunch of places, the callback will run for each and
/// every instance.
/// It is up to the callback implementer to handle this, if needed.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct ArrowChunkReleaseCallback(Arc<dyn Fn(Chunk<Box<dyn Array>>) + Send + Sync>);

impl std::ops::Deref for ArrowChunkReleaseCallback {
type Target = dyn Fn(Chunk<Box<dyn Array>>) + Send + Sync;

#[inline]
fn deref(&self) -> &Self::Target {
&*self.0
}
}

impl<F> From<F> for ArrowChunkReleaseCallback
where
F: Fn(Chunk<Box<dyn Array>>) + Send + Sync + 'static,
{
#[inline]
fn from(f: F) -> Self {
Self(Arc::new(f))
}
}

impl ArrowChunkReleaseCallback {
#[inline]
pub fn as_ptr(&self) -> *const () {
Arc::as_ptr(&self.0).cast::<()>()
}
}

impl PartialEq for ArrowChunkReleaseCallback {
#[inline]
fn eq(&self, other: &Self) -> bool {
std::ptr::eq(self.as_ptr(), other.as_ptr())
}
}

impl Eq for ArrowChunkReleaseCallback {}

impl std::fmt::Debug for ArrowChunkReleaseCallback {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ArrowChunkReleaseCallback")
.field(&format!("{:p}", self.as_ptr()))
.finish()
}
}

/// Message containing an Arrow payload
#[must_use]
#[derive(Clone, Debug, PartialEq)]
#[must_use]
pub struct ArrowMsg {
/// Unique identifier for the [`crate::DataTable`] in this message.
pub table_id: TableId,
Expand All @@ -24,6 +80,17 @@ pub struct ArrowMsg {

/// Data for all control & data columns.
pub chunk: Chunk<Box<dyn Array>>,

// pub on_release: Option<Arc<dyn FnOnce() + Send + Sync>>,
pub on_release: Option<ArrowChunkReleaseCallback>,
}

impl Drop for ArrowMsg {
fn drop(&mut self) {
if let Some(on_release) = self.on_release.take() {
(*on_release)(self.chunk.clone() /* shallow */);
}
}
}

#[cfg(feature = "serde")]
Expand Down Expand Up @@ -127,6 +194,7 @@ impl<'de> serde::Deserialize<'de> for ArrowMsg {
timepoint_max,
schema,
chunk,
on_release: None,
})
} else {
Err(serde::de::Error::custom(
Expand Down
2 changes: 2 additions & 0 deletions crates/re_log_types/src/data_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,7 @@ impl DataTable {
timepoint_max: _,
schema,
chunk,
on_release: _,
} = msg;

Self::deserialize(*table_id, schema, chunk)
Expand All @@ -1066,6 +1067,7 @@ impl DataTable {
timepoint_max,
schema,
chunk,
on_release: None,
})
}
}
Expand Down
10 changes: 9 additions & 1 deletion crates/re_log_types/src/data_table_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub type DataTableBatcherResult<T> = Result<T, DataTableBatcherError>;
/// Defines the different thresholds of the associated [`DataTableBatcher`].
///
/// See [`Self::default`] and [`Self::from_env`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DataTableBatcherConfig {
/// Duration of the periodic tick.
//
Expand All @@ -64,6 +64,11 @@ pub struct DataTableBatcherConfig {
///
/// Unbounded if left unspecified.
pub max_tables_in_flight: Option<u64>,

/// Callback to be run when an Arrow Chunk` goes out of scope.
///
/// See [`crate::ArrowChunkReleaseCallback`] for more information.
pub on_release: Option<crate::ArrowChunkReleaseCallback>,
}

impl Default for DataTableBatcherConfig {
Expand All @@ -80,6 +85,7 @@ impl DataTableBatcherConfig {
flush_num_rows: u64::MAX,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Always flushes ASAP.
Expand All @@ -89,6 +95,7 @@ impl DataTableBatcherConfig {
flush_num_rows: 0,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Never flushes unless manually told to.
Expand All @@ -98,6 +105,7 @@ impl DataTableBatcherConfig {
flush_num_rows: u64::MAX,
max_commands_in_flight: None,
max_tables_in_flight: None,
on_release: None,
};

/// Environment variable to configure [`Self::flush_tick`].
Expand Down
2 changes: 1 addition & 1 deletion crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod data_table_batcher;

use std::sync::Arc;

pub use self::arrow_msg::ArrowMsg;
pub use self::arrow_msg::{ArrowChunkReleaseCallback, ArrowMsg};
pub use self::data_cell::{DataCell, DataCellError, DataCellInner, DataCellResult};
pub use self::data_row::{
DataCellRow, DataCellVec, DataReadError, DataReadResult, DataRow, DataRowError, DataRowResult,
Expand Down
14 changes: 14 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ pub trait LogSink: Send + Sync + 'static {
#[derive(Default)]
pub struct BufferedSink(parking_lot::Mutex<Vec<LogMsg>>);

impl Drop for BufferedSink {
fn drop(&mut self) {
for msg in self.0.lock().iter() {
// Sinks intentionally end up with pending SetStoreInfo messages
// these are fine to drop safely. Anything else should produce a
// warning.
if !matches!(msg, LogMsg::SetStoreInfo(_)) {
re_log::warn!("Dropping data in BufferedSink");
return;
}
}
}
}

impl BufferedSink {
/// An empty buffer.
#[inline]
Expand Down
21 changes: 13 additions & 8 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};

use re_log_types::{
ApplicationId, DataCell, DataCellError, DataRow, DataTable, DataTableBatcher,
DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId, StoreId, StoreInfo,
StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline, TimelineName,
ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable,
DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId,
StoreId, StoreInfo, StoreKind, StoreSource, Time, TimeInt, TimePoint, TimeType, Timeline,
TimelineName,
};
use re_types_core::{components::InstanceKey, AsComponents, ComponentBatch, SerializationError};

Expand Down Expand Up @@ -610,6 +611,7 @@ impl RecordingStreamInner {
batcher_config: DataTableBatcherConfig,
sink: Box<dyn LogSink>,
) -> RecordingStreamResult<Self> {
let on_release = batcher_config.on_release.clone();
let batcher = DataTableBatcher::new(batcher_config)?;

{
Expand All @@ -636,7 +638,7 @@ impl RecordingStreamInner {
.spawn({
let info = info.clone();
let batcher = batcher.clone();
move || forwarding_thread(info, sink, cmds_rx, batcher.tables())
move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release)
})
.map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })?
};
Expand Down Expand Up @@ -956,6 +958,7 @@ fn forwarding_thread(
mut sink: Box<dyn LogSink>,
cmds_rx: Receiver<Command>,
tables: Receiver<DataTable>,
on_release: Option<ArrowChunkReleaseCallback>,
) {
/// Returns `true` to indicate that processing can continue; i.e. `false` means immediate
/// shutdown.
Expand Down Expand Up @@ -1018,15 +1021,16 @@ fn forwarding_thread(
// NOTE: Always pop tables first, this is what makes `Command::PopPendingTables` possible,
// which in turns makes `RecordingStream::flush_blocking` well defined.
while let Ok(table) = tables.try_recv() {
let table = match table.to_arrow_msg() {
let mut arrow_msg = match table.to_arrow_msg() {
Ok(table) => table,
Err(err) => {
re_log::error!(%err,
"couldn't serialize table; data dropped (this is a bug in Rerun!)");
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table));
arrow_msg.on_release = on_release.clone();
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg));
}

select! {
Expand All @@ -1037,15 +1041,16 @@ fn forwarding_thread(
re_log::trace!("Shutting down forwarding_thread: batcher is gone");
break;
};
let table = match table.to_arrow_msg() {
let mut arrow_msg = match table.to_arrow_msg() {
Ok(table) => table,
Err(err) => {
re_log::error!(%err,
"couldn't serialize table; data dropped (this is a bug in Rerun!)");
continue;
}
};
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), table));
arrow_msg.on_release = on_release.clone();
sink.send(LogMsg::ArrowMsg(info.store_id.clone(), arrow_msg));
}
recv(cmds_rx) -> res => {
let Ok(cmd) = res else {
Expand Down
Loading
Loading