Skip to content

Commit

Permalink
SDK DataLoaders 1: barebones Rust support (#5327)
Browse files Browse the repository at this point in the history
Exposes raw `DataLoader`s to the Rust SDK through 2 new methods:
`RecordingStream::log_file_from_path` &
`RecordingStream::log_file_from_contents`.

Everything streams asynchronously, as expected.

There's no way to configure the behavior of the `DataLoader` at all,
yet. That comes next.

```rust
rec.log_file_from_path(filepath)?;
```

![image](https://github.com/rerun-io/rerun/assets/2910679/fac1514a-a00b-40c3-8950-df076080e1fc)

This highlighted some brittleness on the shutdown path, see:
- #5335 

---

Part of series of PR to expose configurable `DataLoader`s to our SDKs:
- #5327 
- #5328 
- #5330
- #5337
  • Loading branch information
teh-cmc authored Feb 29, 2024
1 parent 07c5286 commit ffe0a30
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 16 deletions.
37 changes: 25 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_data_source/src/load_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub fn load_from_path(
if !path.exists() {
return Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"path does not exist: {path:?}",
format!("path does not exist: {path:?}"),
)
.into());
}
Expand Down
2 changes: 2 additions & 0 deletions crates/re_log_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ pub enum FileSource {
Cli,
DragAndDrop,
FileDialog,
Sdk,
}

/// The source of a recording or blueprint.
Expand Down Expand Up @@ -358,6 +359,7 @@ impl std::fmt::Display for StoreSource {
FileSource::Cli => write!(f, "File via CLI"),
FileSource::DragAndDrop => write!(f, "File via drag-and-drop"),
FileSource::FileDialog => write!(f, "File via file dialog"),
FileSource::Sdk => write!(f, "File via SDK"),
},
Self::Viewer => write!(f, "Viewer-generated"),
Self::Other(string) => format!("{string:?}").fmt(f), // put it in quotes
Expand Down
7 changes: 7 additions & 0 deletions crates/re_sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ all-features = true
[features]
default = []

## Support for using Rerun's data-loaders directly from the SDK.
##
## See our `log_file` example and <https://www.rerun.io/docs/howto/open-any-file>
## for more information.
data_loaders = ["dep:re_data_source", "dep:re_smart_channel"]

## Support serving a web viewer over HTTP.
##
## Enabling this inflates the binary size quite a bit, since it embeds the viewer wasm.
Expand Down Expand Up @@ -58,6 +64,7 @@ thiserror.workspace = true

# Optional dependencies

re_data_source = { workspace = true, optional = true }
re_smart_channel = { workspace = true, optional = true }
re_ws_comms = { workspace = true, optional = true }
re_web_viewer_server = { workspace = true, optional = true }
Expand Down
145 changes: 143 additions & 2 deletions crates/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicI64, Arc};
use ahash::HashMap;
use crossbeam::channel::{Receiver, Sender};

use parking_lot::Mutex;
use re_log_types::{
ApplicationId, ArrowChunkReleaseCallback, DataCell, DataCellError, DataRow, DataTable,
DataTableBatcher, DataTableBatcherConfig, DataTableBatcherError, EntityPath, LogMsg, RowId,
Expand Down Expand Up @@ -61,7 +62,7 @@ pub enum RecordingStreamError {
#[error("Failed to spawn background thread '{name}': {err}")]
SpawnThread {
/// Name of the thread
name: &'static str,
name: String,

/// Inner error explaining why the thread failed to spawn.
err: std::io::Error,
Expand All @@ -79,6 +80,11 @@ pub enum RecordingStreamError {
/// An error that can occur because a row in the store has inconsistent columns.
#[error(transparent)]
DataReadError(#[from] re_log_types::DataReadError),

/// An error occurred while attempting to use a [`re_data_source::DataLoader`].
#[cfg(feature = "data_loaders")]
#[error(transparent)]
DataLoaderError(#[from] re_data_source::DataLoaderError),
}

/// Results that can occur when creating/manipulating a [`RecordingStream`].
Expand Down Expand Up @@ -623,6 +629,12 @@ struct RecordingStreamInner {
batcher: DataTableBatcher,
batcher_to_sink_handle: Option<std::thread::JoinHandle<()>>,

/// Keeps track of the top-level threads that were spawned in order to execute the `DataLoader`
/// machinery in the context of this `RecordingStream`.
///
/// See [`RecordingStream::log_file_from_path`] and [`RecordingStream::log_file_from_contents`].
dataloader_handles: Mutex<Vec<std::thread::JoinHandle<()>>>,

pid_at_creation: u32,
}

Expand All @@ -633,6 +645,16 @@ impl Drop for RecordingStreamInner {
return;
}

// Run all pending top-level `DataLoader` threads that were started from the SDK to completion.
//
// TODO(cmc): At some point we might want to make it configurable, though I cannot really
// think of a use case where you'd want to drop those threads immediately upon
// disconnection.
let dataloader_handles = std::mem::take(&mut *self.dataloader_handles.lock());
for handle in dataloader_handles {
handle.join().ok();
}

// NOTE: The command channel is private, if we're here, nothing is currently capable of
// sending data down the pipeline.
self.batcher.flush_blocking();
Expand Down Expand Up @@ -679,7 +701,10 @@ impl RecordingStreamInner {
let batcher = batcher.clone();
move || forwarding_thread(info, sink, cmds_rx, batcher.tables(), on_release)
})
.map_err(|err| RecordingStreamError::SpawnThread { name: NAME, err })?
.map_err(|err| RecordingStreamError::SpawnThread {
name: NAME.into(),
err,
})?
};

Ok(RecordingStreamInner {
Expand All @@ -688,6 +713,7 @@ impl RecordingStreamInner {
cmds_tx,
batcher,
batcher_to_sink_handle: Some(batcher_to_sink_handle),
dataloader_handles: Mutex::new(Vec::new()),
pid_at_creation: std::process::id(),
})
}
Expand Down Expand Up @@ -989,6 +1015,103 @@ impl RecordingStream {

Ok(())
}

/// Logs the file at the given `path` using all [`re_data_source::DataLoader`]s available.
///
/// A single `path` might be handled by more than one loader.
///
/// This method blocks until either at least one [`re_data_source::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_path(
&self,
filepath: impl AsRef<std::path::Path>,
) -> RecordingStreamResult<()> {
self.log_file(filepath, None)
}

/// Logs the given `contents` using all [`re_data_source::DataLoader`]s available.
///
/// A single `path` might be handled by more than one loader.
///
/// This method blocks until either at least one [`re_data_source::DataLoader`] starts
/// streaming data in or all of them fail.
///
/// See <https://www.rerun.io/docs/howto/open-any-file> for more information.
#[cfg(feature = "data_loaders")]
pub fn log_file_from_contents(
&self,
filepath: impl AsRef<std::path::Path>,
contents: std::borrow::Cow<'_, [u8]>,
) -> RecordingStreamResult<()> {
self.log_file(filepath, Some(contents))
}

#[cfg(feature = "data_loaders")]
fn log_file(
&self,
filepath: impl AsRef<std::path::Path>,
contents: Option<std::borrow::Cow<'_, [u8]>>,
) -> RecordingStreamResult<()> {
let filepath = filepath.as_ref();
let has_contents = contents.is_some();

let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::SmartMessageSource::Sdk,
re_smart_channel::SmartChannelSource::File(filepath.into()),
);

let Some(store_id) = &self.store_info().map(|info| info.store_id.clone()) else {
// There's no recording.
return Ok(());
};
if let Some(contents) = contents {
re_data_source::load_from_file_contents(
store_id,
re_log_types::FileSource::Sdk,
filepath,
contents,
&tx,
)?;
} else {
re_data_source::load_from_path(store_id, re_log_types::FileSource::Sdk, filepath, &tx)?;
}
drop(tx);

// We can safely ignore the error on `recv()` as we're in complete control of both ends of
// the channel.
let thread_name = if has_contents {
format!("log_file_from_contents({filepath:?})")
} else {
format!("log_file_from_path({filepath:?})")
};
let handle = std::thread::Builder::new()
.name(thread_name.clone())
.spawn({
let this = self.clone();
move || {
while let Some(msg) = rx.recv().ok().and_then(|msg| msg.into_data()) {
this.record_msg(msg);
}
}
})
.map_err(|err| RecordingStreamError::SpawnThread {
name: thread_name,
err,
})?;

debug_assert!(
self.inner.is_some(),
"recording should always be fully init at this stage"
);
if let Some(inner) = self.inner.as_ref() {
inner.dataloader_handles.lock().push(handle);
}

Ok(())
}
}

#[allow(clippy::needless_pass_by_value)]
Expand Down Expand Up @@ -1450,6 +1573,22 @@ impl RecordingStream {
/// terms of data durability and ordering.
/// See [`Self::set_sink`] for more information.
pub fn disconnect(&self) {
let Some(this) = &*self.inner else {
re_log::warn_once!("Recording disabled - call to disconnect() ignored");
return;
};

// When disconnecting, we need to make sure that pending top-level `DataLoader` threads that
// were started from the SDK run to completion.
//
// TODO(cmc): At some point we might want to make it configurable, though I cannot really
// think of a use case where you'd want to drop those threads immediately upon
// disconnection.
let dataloader_handles = std::mem::take(&mut *this.dataloader_handles.lock());
for handle in dataloader_handles {
handle.join().ok();
}

self.set_sink(Box::new(crate::sink::BufferedSink::new()));
}
}
Expand All @@ -1465,11 +1604,13 @@ impl fmt::Debug for RecordingStream {
cmds_tx: _,
batcher: _,
batcher_to_sink_handle: _,
dataloader_handles,
pid_at_creation,
}) => f
.debug_struct("RecordingStream")
.field("info", &info)
.field("tick", &tick)
.field("pending_dataloaders", &dataloader_handles.lock().len())
.field("pid_at_creation", &pid_at_creation)
.finish_non_exhaustive(),
None => write!(f, "RecordingStream {{ disabled }}"),
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/viewer_analytics/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn open_recording(
re_log_types::FileSource::Cli => "file_cli".to_owned(),
re_log_types::FileSource::DragAndDrop => "file_drag_and_drop".to_owned(),
re_log_types::FileSource::FileDialog => "file_dialog".to_owned(),
re_log_types::FileSource::Sdk => "file_sdk".to_owned(),
},
S::Viewer => "viewer".to_owned(),
S::Other(other) => other.clone(),
Expand Down
Loading

0 comments on commit ffe0a30

Please sign in to comment.