Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support opening a pipeline via frame callback #17

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
mod active;
mod inactive;
mod profile;
mod streaming;

pub use active::{ActivePipeline, FrameWaitError};
pub use inactive::{InactivePipeline, PipelineActivationError, PipelineConstructionError};
Expand Down
35 changes: 34 additions & 1 deletion src/pipeline/inactive.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
//! Type for representing an "inactive" pipeline which is unconfigured and cannot acquire frames.

use super::{active::ActivePipeline, profile::PipelineProfile};
use super::{active::ActivePipeline, profile::PipelineProfile, streaming::{StreamingPipeline, trampoline}};
use crate::{check_rs2_error, config::Config, context::Context, kind::Rs2Exception};
use crate::frame::FrameCategory;
use anyhow::Result;
use realsense_sys as sys;
use std::{convert::TryFrom, ptr::NonNull};
Expand Down Expand Up @@ -103,6 +104,27 @@ impl InactivePipeline {
}
}

pub fn start_streaming<F>(self, f: F) -> Result<StreamingPipeline>
where
F: FnMut(&impl IntoFrame) + Send + 'static
{
unsafe {
let mut err = std::ptr::null_mut::<sys::rs2_error>();

let f = Box::into_raw(Box::new(f));
let profile_ptr = sys::rs2_pipeline_start_with_callback(self.pipeline_ptr.as_ptr(), Some(trampoline::<F>), f, &mut err);

check_rs2_error!(err, PipelineActivationError::CouldNotStartPipelineError)?;

let profile = PipelineProfile::try_from(NonNull::new(profile_ptr).unwrap())?;

let streaming = StreamingPipeline::new(self.pipeline_ptr, profile, f);

std::mem::forget(self);
Ok(streaming)
}
}

/// Resolve a configuration and get the corresponding pipeline profile.
///
/// This function checks the pipeline to see if this config can be used to start the pipeline,
Expand Down Expand Up @@ -157,3 +179,14 @@ impl InactivePipeline {
}
}
}

impl IntoFrame for NonNull<sys::rs2_frame> { }

pub(crate) trait IntoFrame {
fn of_type<F>(self) -> Option<F>
where
F: TryFrom<Self> + FrameCategory
{
F::try_from(self).ok()
}
}
95 changes: 95 additions & 0 deletions src/pipeline/streaming.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use super::{inactive::InactivePipeline, profile::PipelineProfile};
use crate::{check_rs2_error, frame::CompositeFrame, kind::Rs2Exception};
use anyhow::Result;
use realsense_sys as sys;
use std::{ptr::NonNull, task::Poll, time::Duration};
use thiserror::Error;
use std::os::raw::c_void;
use crate::frame::FrameCategory;
use super::inactive::IntoFrame;

pub(crate) unsafe extern "C" fn trampoline<F>(frame: *mut sys::rs2_frame, data: *mut c_void)
where
F: FnMut(&impl IntoFrame) + Send + 'static,
{
let panic = std::panic::catch_unwind(|| {
if frame.is_null() {
panic!("null frame");
}

let frame = core::mem::ManuallyDrop::new(NonNull::new_unchecked(frame));

if data.is_null() {
panic!("empty data");
}

let f = &mut *(data as *mut F);

f(&frame);
});

if panic.is_err() {
eprintln!("Callback function panicked");
std::process::abort();
}
}

pub struct StreamingPipeline {
/// A pointer to the callback function for the pipeline.
callback: *mut dyn FnMut(&impl IntoFrame),
/// A (non-null) pointer to the pipeline.
pipeline_ptr: NonNull<sys::rs2_pipeline>,
/// The pipeline's profile, which contains the device the pipeline is configured for alongside
/// the stream profiles for streams in the pipeline.
profile: PipelineProfile,
}

impl StreamingPipeline {
/// Constructs a new streaming pipeline from the constituent components
///
/// This is only to be used / called from the [`InactivePipeline`] type.
pub(crate) fn new<F>(pipeline_ptr: NonNull<sys::rs2_pipeline>, profile: PipelineProfile, callback: F) -> Self
where
F: FnMut(&impl IntoFrame) + Send + 'static
{
Self {
pipeline_ptr,
callback,
profile,
}
}

/// Gets the active profile of pipeline.
pub fn profile(&self) -> &PipelineProfile {
&self.profile
}

/// Stop the pipeline.
///
/// This method consumes the pipeline instance and returns pipeline markered inactive.
pub fn stop(self) -> InactivePipeline {
unsafe {
let mut err = std::ptr::null_mut::<sys::rs2_error>();

// The only "error" that can occur here is if the pipeline pointer is null.
//
// We know it is not (state is managed so that this isn't a possibility, and we use
// `NonNull` to try and guarantee that even beyond our state management), so there
// dealing with the error (and thus returning a result type) is superfluous here.
sys::rs2_pipeline_stop(self.pipeline_ptr.as_ptr(), &mut err);

let inactive = InactivePipeline::new(self.pipeline_ptr);

std::mem::forget(self);
inactive
}
}

}

impl Drop for StreamingPipeline {
fn drop(&mut self) {
let boxed = unsafe { Box::from_raw(self.pipeline_ptr.as_ptr()) };
unsafe { sys::rs2_delete_pipeline(self.pipeline_ptr.as_ptr()) };
}
}