Skip to content

Commit

Permalink
support async closure in the stream camera (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarriba authored Sep 12, 2024
1 parent c7fafef commit 15249fd
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 42 deletions.
2 changes: 1 addition & 1 deletion crates/kornia-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ version.workspace = true
[dependencies]

# external
arrow-buffer = "52.2.0"
arrow-buffer = "53.0.0"
num-traits = "0.2"
serde = { version = "1", features = ["derive"] }
thiserror = "1"
Expand Down
22 changes: 14 additions & 8 deletions crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,15 @@ impl StreamCapture {
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
pub async fn run<F>(&self, f: F) -> Result<(), StreamCaptureError>
pub async fn run<F>(&self, mut f: F) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Result<(), Box<dyn std::error::Error>>,
{
self.run_internal(f, None::<futures::future::Ready<()>>)
.await
self.run_internal(
|img| futures::future::ready(f(img)),
None::<futures::future::Ready<()>>,
)
.await
}

/// Runs the stream capture pipeline with a termination signal and processes each frame.
Expand All @@ -57,13 +60,15 @@ impl StreamCapture {
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
pub async fn run_with_termination<F, S: Future<Output = ()>>(
pub async fn run_with_termination<F, Fut, S>(
&self,
f: F,
signal: S,
) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Result<(), Box<dyn std::error::Error>>,
F: FnMut(Image<u8, 3>) -> Fut,
Fut: Future<Output = Result<(), Box<dyn std::error::Error>>>,
S: Future<Output = ()>,
{
self.run_internal(f, Some(signal)).await
}
Expand All @@ -78,13 +83,14 @@ impl StreamCapture {
/// # Returns
///
/// A Result indicating success or a StreamCaptureError.
async fn run_internal<F, S>(
async fn run_internal<F, Fut, S>(
&self,
mut f: F,
signal: Option<S>,
) -> Result<(), StreamCaptureError>
where
F: FnMut(Image<u8, 3>) -> Result<(), Box<dyn std::error::Error>>,
F: FnMut(Image<u8, 3>) -> Fut,
Fut: Future<Output = Result<(), Box<dyn std::error::Error>>>,
S: Future<Output = ()>,
{
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
Expand All @@ -109,7 +115,7 @@ impl StreamCapture {
tokio::select! {
img = rx.recv() => {
if let Some(img) = img {
f(img)?;
f(img).await?;
} else {
break;
}
Expand Down
77 changes: 44 additions & 33 deletions examples/rtspcam/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,44 +47,55 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let fps_counter = Arc::new(Mutex::new(FpsCounter::new()));

// preallocate images
let mut img_f32 = Image::<f32, 3>::from_size_val([640, 360].into(), 0.0)?;
let mut gray = Image::<f32, 1>::from_size_val(img_f32.size(), 0.0)?;
let img_f32 = Image::<f32, 3>::from_size_val([640, 360].into(), 0.0)?;
let gray = Image::<f32, 1>::from_size_val(img_f32.size(), 0.0)?;

let img_f32 = Arc::new(Mutex::new(img_f32));
let gray = Arc::new(Mutex::new(gray));

// start grabbing frames from the camera
capture
.run_with_termination(
|img| {
// update the fps counter
fps_counter
.lock()
.expect("Failed to lock fps counter")
.new_frame();

// cast the image to floating point and convert to grayscale
ops::cast_and_scale(&img, &mut img_f32, 1.0 / 255.0)?;
imgproc::color::gray_from_rgb(&img_f32, &mut gray)?;

// log the image
rec.log_static(
"image",
&rerun::Image::from_elements(
img.as_slice(),
img.size().into(),
rerun::ColorModel::RGB,
),
)?;

// log the grayscale image
rec.log_static(
"gray",
&rerun::Image::from_elements(
gray.as_slice(),
gray.size().into(),
rerun::ColorModel::L,
),
)?;

Ok(())
let rec = rec.clone();
let fps_counter = fps_counter.clone();

let img_f32 = img_f32.clone();
let gray = gray.clone();

async move {
// update the fps counter
fps_counter.lock().unwrap().new_frame();

// cast the image to floating point and convert to grayscale
let mut img_f32 = img_f32.lock().expect("Failed to lock img_f32");
ops::cast_and_scale(&img, &mut img_f32, 1.0 / 255.0)?;

let mut gray = gray.lock().expect("Failed to lock gray");
imgproc::color::gray_from_rgb(&img_f32, &mut gray)?;

// log the image
rec.log_static(
"image",
&rerun::Image::from_elements(
img.as_slice(),
img.size().into(),
rerun::ColorModel::RGB,
),
)?;

// log the grayscale image
rec.log_static(
"gray",
&rerun::Image::from_elements(
gray.as_slice(),
gray.size().into(),
rerun::ColorModel::L,
),
)?;

Ok(())
}
},
async {
signal::ctrl_c().await.expect("Failed to listen for Ctrl+C");
Expand Down

0 comments on commit 15249fd

Please sign in to comment.