From 83476cb1c1d75ff8b64af4bebdd4ea61d8869b7b Mon Sep 17 00:00:00 2001 From: will <104373134+w-utter@users.noreply.github.com> Date: Fri, 19 Apr 2024 19:39:23 +1000 Subject: [PATCH] Support opening a pipeline via frame callback fixes --- src/pipeline.rs | 1 + src/pipeline/inactive.rs | 35 ++++++++++++++- src/pipeline/streaming.rs | 95 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 src/pipeline/streaming.rs diff --git a/src/pipeline.rs b/src/pipeline.rs index 955807c..34abe6b 100644 --- a/src/pipeline.rs +++ b/src/pipeline.rs @@ -19,6 +19,7 @@ mod active; mod inactive; mod profile; +mod streaming; pub use active::{ActivePipeline, FrameWaitError}; pub use inactive::{InactivePipeline, PipelineActivationError, PipelineConstructionError}; diff --git a/src/pipeline/inactive.rs b/src/pipeline/inactive.rs index 44cb7cd..b718d30 100644 --- a/src/pipeline/inactive.rs +++ b/src/pipeline/inactive.rs @@ -1,7 +1,8 @@ //! Type for representing an "inactive" pipeline which is unconfigured and cannot acquire frames. -use super::{active::ActivePipeline, profile::PipelineProfile}; +use super::{active::ActivePipeline, profile::PipelineProfile, streaming::{StreamingPipeline, trampoline}}; use crate::{check_rs2_error, config::Config, context::Context, kind::Rs2Exception}; +use crate::frame::FrameCategory; use anyhow::Result; use realsense_sys as sys; use std::{convert::TryFrom, ptr::NonNull}; @@ -103,6 +104,27 @@ impl InactivePipeline { } } + pub fn start_streaming(self, f: F) -> Result + where + F: FnMut(&impl IntoFrame) + Send + 'static + { + unsafe { + let mut err = std::ptr::null_mut::(); + + let f = Box::into_raw(Box::new(f)); + let profile_ptr = sys::rs2_pipeline_start_with_callback(self.pipeline_ptr.as_ptr(), Some(trampoline::), f, &mut err); + + check_rs2_error!(err, PipelineActivationError::CouldNotStartPipelineError)?; + + let profile = PipelineProfile::try_from(NonNull::new(profile_ptr).unwrap())?; + + let streaming = StreamingPipeline::new(self.pipeline_ptr, profile, f); + + std::mem::forget(self); + Ok(streaming) + } + } + /// Resolve a configuration and get the corresponding pipeline profile. /// /// This function checks the pipeline to see if this config can be used to start the pipeline, @@ -157,3 +179,14 @@ impl InactivePipeline { } } } + +impl IntoFrame for NonNull { } + +pub(crate) trait IntoFrame { + fn of_type(self) -> Option + where + F: TryFrom + FrameCategory + { + F::try_from(self).ok() + } +} diff --git a/src/pipeline/streaming.rs b/src/pipeline/streaming.rs new file mode 100644 index 0000000..36a188c --- /dev/null +++ b/src/pipeline/streaming.rs @@ -0,0 +1,95 @@ +use super::{inactive::InactivePipeline, profile::PipelineProfile}; +use crate::{check_rs2_error, frame::CompositeFrame, kind::Rs2Exception}; +use anyhow::Result; +use realsense_sys as sys; +use std::{ptr::NonNull, task::Poll, time::Duration}; +use thiserror::Error; +use std::os::raw::c_void; +use crate::frame::FrameCategory; +use super::inactive::IntoFrame; + +pub(crate) unsafe extern "C" fn trampoline(frame: *mut sys::rs2_frame, data: *mut c_void) +where + F: FnMut(&impl IntoFrame) + Send + 'static, +{ + let panic = std::panic::catch_unwind(|| { + if frame.is_null() { + panic!("null frame"); + } + + let frame = core::mem::ManuallyDrop::new(NonNull::new_unchecked(frame)); + + if data.is_null() { + panic!("empty data"); + } + + let f = &mut *(data as *mut F); + + f(&frame); + }); + + if panic.is_err() { + eprintln!("Callback function panicked"); + std::process::abort(); + } +} + +pub struct StreamingPipeline { + /// A pointer to the callback function for the pipeline. + callback: *mut dyn FnMut(&impl IntoFrame), + /// A (non-null) pointer to the pipeline. + pipeline_ptr: NonNull, + /// The pipeline's profile, which contains the device the pipeline is configured for alongside + /// the stream profiles for streams in the pipeline. + profile: PipelineProfile, +} + +impl StreamingPipeline { + /// Constructs a new streaming pipeline from the constituent components + /// + /// This is only to be used / called from the [`InactivePipeline`] type. + pub(crate) fn new(pipeline_ptr: NonNull, profile: PipelineProfile, callback: F) -> Self + where + F: FnMut(&impl IntoFrame) + Send + 'static + { + Self { + pipeline_ptr, + callback, + profile, + } + } + + /// Gets the active profile of pipeline. + pub fn profile(&self) -> &PipelineProfile { + &self.profile + } + + /// Stop the pipeline. + /// + /// This method consumes the pipeline instance and returns pipeline markered inactive. + pub fn stop(self) -> InactivePipeline { + unsafe { + let mut err = std::ptr::null_mut::(); + + // The only "error" that can occur here is if the pipeline pointer is null. + // + // We know it is not (state is managed so that this isn't a possibility, and we use + // `NonNull` to try and guarantee that even beyond our state management), so there + // dealing with the error (and thus returning a result type) is superfluous here. + sys::rs2_pipeline_stop(self.pipeline_ptr.as_ptr(), &mut err); + + let inactive = InactivePipeline::new(self.pipeline_ptr); + + std::mem::forget(self); + inactive + } + } + +} + +impl Drop for StreamingPipeline { + fn drop(&mut self) { + let boxed = unsafe { Box::from_raw(self.pipeline_ptr.as_ptr()) }; + unsafe { sys::rs2_delete_pipeline(self.pipeline_ptr.as_ptr()) }; + } +}