Skip to content

Commit

Permalink
Implement H264 VideoWritter with gstreamer (#135)
Browse files Browse the repository at this point in the history
* video is recording

* improvements

* fix clippy

* improve errors

* add fake test

* improve example output

* added readme for the video_write_tasks example
  • Loading branch information
edgarriba authored Sep 18, 2024
1 parent 15249fd commit 08d368a
Show file tree
Hide file tree
Showing 11 changed files with 538 additions and 12 deletions.
30 changes: 21 additions & 9 deletions crates/kornia-imgproc/benches/bench_flip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,35 +86,47 @@ fn bench_flip(c: &mut Criterion) {
BenchmarkId::new("par_par_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_par_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_par_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_loop_loop", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_loop_loop(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_loop_loop(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_loop_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_loop_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_loop_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

group.bench_with_input(
BenchmarkId::new("par_seq_slicecopy", &parameter_string),
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0.clone(), i.1.clone());
b.iter(|| black_box(par_seq_slicecopy(&src, &mut dst)))
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| {
par_seq_slicecopy(black_box(src), black_box(&mut dst));
black_box(())
})
},
);

Expand All @@ -123,7 +135,7 @@ fn bench_flip(c: &mut Criterion) {
&(&image_f32, &output),
|b, i| {
let (src, mut dst) = (i.0, i.1.clone());
b.iter(|| black_box(flip::horizontal_flip(src, &mut dst)))
b.iter(|| flip::horizontal_flip(black_box(src), black_box(&mut dst)))
},
);
}
Expand Down
2 changes: 1 addition & 1 deletion crates/kornia-io/src/stream/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl StreamCapture {
fn get_appsink(&self) -> Result<gst_app::AppSink, StreamCaptureError> {
self.pipeline
.by_name("sink")
.ok_or_else(|| StreamCaptureError::DowncastAppSinkError)?
.ok_or_else(|| StreamCaptureError::GetElementByNameError)?
.dynamic_cast::<gst_app::AppSink>()
.map_err(StreamCaptureError::DowncastPipelineError)
}
Expand Down
8 changes: 6 additions & 2 deletions crates/kornia-io/src/stream/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub enum StreamCaptureError {
DowncastPipelineError(gst::Element),

/// An error occurred during GStreamer downcast of appsink.
#[error("Failed to downcast appsink")]
DowncastAppSinkError,
#[error("Failed to get an element by name")]
GetElementByNameError,

/// An error occurred during GStreamer to get the bus.
#[error("Failed to get the bus")]
Expand Down Expand Up @@ -67,4 +67,8 @@ pub enum StreamCaptureError {
/// An error for an invalid configuration.
#[error("Invalid configuration: {0}")]
InvalidConfig(String),

/// An error occurred during GStreamer to send end of stream event.
#[error("Error ocurred in the gstreamer flow")]
GstreamerFlowError(#[from] gst::FlowError),
}
4 changes: 4 additions & 0 deletions crates/kornia-io/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ pub mod rtsp;
/// A module for capturing video streams from v4l2 cameras.
pub mod v4l2;

/// A module for capturing video streams from video files.
pub mod video;

pub use crate::stream::camera::{CameraCapture, CameraCaptureConfig};
pub use crate::stream::capture::StreamCapture;
pub use crate::stream::error::StreamCaptureError;
pub use crate::stream::rtsp::RTSPCameraConfig;
pub use crate::stream::v4l2::V4L2CameraConfig;
pub use crate::stream::video::VideoWriter;
219 changes: 219 additions & 0 deletions crates/kornia-io/src/stream/video.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
use std::path::Path;

use futures::prelude::*;
use gst::prelude::*;

use kornia_image::{Image, ImageSize};

use super::StreamCaptureError;

/// The codec to use for the video writer.
pub enum VideoWriterCodec {
/// H.264 codec.
H264,
}

/// A struct for writing video files.
pub struct VideoWriter {
pipeline: gst::Pipeline,
appsrc: gst_app::AppSrc,
fps: i32,
counter: u64,
handle: Option<tokio::task::JoinHandle<()>>,
}

impl VideoWriter {
/// Create a new VideoWriter.
///
/// # Arguments
///
/// * `path` - The path to save the video file.
/// * `codec` - The codec to use for the video writer.
/// * `fps` - The frames per second of the video.
/// * `size` - The size of the video.
pub fn new(
path: impl AsRef<Path>,
codec: VideoWriterCodec,
fps: i32,
size: ImageSize,
) -> Result<Self, StreamCaptureError> {
gst::init()?;

// TODO: Add support for other codecs
#[allow(unreachable_patterns)]
let _codec = match codec {
VideoWriterCodec::H264 => "x264enc",
_ => {
return Err(StreamCaptureError::InvalidConfig(
"Unsupported codec".to_string(),
))
}
};

let path = path.as_ref().to_owned();

let pipeline_str = format!(
"appsrc name=src ! \
videoconvert ! video/x-raw,format=I420 ! \
x264enc ! \
video/x-h264,profile=main ! \
h264parse ! \
mp4mux ! \
filesink location={}",
path.to_string_lossy()
);

let pipeline = gst::parse::launch(&pipeline_str)?
.dynamic_cast::<gst::Pipeline>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

let appsrc = pipeline
.by_name("src")
.ok_or_else(|| StreamCaptureError::GetElementByNameError)?
.dynamic_cast::<gst_app::AppSrc>()
.map_err(StreamCaptureError::DowncastPipelineError)?;

appsrc.set_format(gst::Format::Time);

let caps = gst::Caps::builder("video/x-raw")
.field("format", "RGB")
.field("width", size.width as i32)
.field("height", size.height as i32)
.field("framerate", gst::Fraction::new(fps, 1))
.build();

appsrc.set_caps(Some(&caps));

appsrc.set_is_live(true);
appsrc.set_property("block", false);

Ok(Self {
pipeline,
appsrc,
fps,
counter: 0,
handle: None,
})
}

/// Start the video writer
pub fn start(&mut self) -> Result<(), StreamCaptureError> {
self.pipeline.set_state(gst::State::Playing)?;

let bus = self.pipeline.bus().ok_or(StreamCaptureError::BusError)?;
let mut messages = bus.stream();

let handle = tokio::spawn(async move {
while let Some(msg) = messages.next().await {
match msg.view() {
gst::MessageView::Eos(..) => {
println!("EOS");
break;
}
gst::MessageView::Error(err) => {
eprintln!(
"Error from {:?}: {} ({:?})",
msg.src().map(|s| s.path_string()),
err.error(),
err.debug()
);
}
_ => {}
}
}
});

self.handle = Some(handle);

Ok(())
}

/// Stop the video writer
pub fn stop(&mut self) -> Result<(), StreamCaptureError> {
// Send end of stream to the appsrc
self.appsrc
.end_of_stream()
.map_err(StreamCaptureError::GstreamerFlowError)?;

// Take the handle and await it
// TODO: This is a blocking call, we need to make it non-blocking
if let Some(handle) = self.handle.take() {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
if let Err(e) = handle.await {
eprintln!("Error waiting for handle: {:?}", e);
}
});
});
}

// Set the pipeline to null
self.pipeline.set_state(gst::State::Null)?;

Ok(())
}

/// Write an image to the video file.
///
/// # Arguments
///
/// * `img` - The image to write to the video file.
// TODO: support write_async
pub fn write(&mut self, img: &Image<u8, 3>) -> Result<(), StreamCaptureError> {
// TODO: verify is there is a cheaper way to copy the buffer
let mut buffer = gst::Buffer::from_mut_slice(img.as_slice().to_vec());

let pts = gst::ClockTime::from_nseconds(self.counter * 1_000_000_000 / self.fps as u64);
let duration = gst::ClockTime::from_nseconds(1_000_000_000 / self.fps as u64);

let buffer_ref = buffer.get_mut().expect("Failed to get buffer");
buffer_ref.set_pts(Some(pts));
buffer_ref.set_duration(Some(duration));

self.counter += 1;

if let Err(err) = self.appsrc.push_buffer(buffer) {
return Err(StreamCaptureError::InvalidConfig(err.to_string()));
}

Ok(())
}
}

impl Drop for VideoWriter {
fn drop(&mut self) {
self.stop().unwrap_or_else(|e| {
eprintln!("Error stopping video writer: {:?}", e);
});
}
}

#[cfg(test)]
mod tests {
use super::{VideoWriter, VideoWriterCodec};
use kornia_image::{Image, ImageSize};

#[test]
#[ignore = "TODO: fix this test as there's a race condition in the gstreamer flow"]
fn video_writer() -> Result<(), Box<dyn std::error::Error>> {
let tmp_dir = tempfile::tempdir()?;
std::fs::create_dir_all(tmp_dir.path())?;

let file_path = tmp_dir.path().join("test.mp4");

let size = ImageSize {
width: 6,
height: 4,
};
let mut writer = VideoWriter::new(&file_path, VideoWriterCodec::H264, 30, size)?;
writer.start()?;

let img = Image::new(size, vec![0; size.width * size.height * 3])?;
writer.write(&img)?;
writer.stop()?;

assert!(file_path.exists(), "File does not exist: {:?}", file_path);

Ok(())
}
}
14 changes: 14 additions & 0 deletions examples/video_write tasks/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "video_write_tasks"
version = "0.1.0"
authors = ["Edgar Riba <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
clap = { version = "4.5.4", features = ["derive"] }
ctrlc = "3.4.4"
kornia = { workspace = true, features = ["gstreamer"] }
rerun = "0.18"
tokio = { version = "1" }
20 changes: 20 additions & 0 deletions examples/video_write tasks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
Example showing how to write a video using different background tasks.

NOTE: This example requires the gstremer backend to be enabled. To enable the gstreamer backend, use the `gstreamer` feature flag when building the `kornia` crate and its dependencies.

```bash
Usage: video_write_tasks [OPTIONS] --output <OUTPUT>

Options:
-o, --output <OUTPUT>
-c, --camera-id <CAMERA_ID> [default: 0]
-f, --fps <FPS> [default: 30]
-d, --duration <DURATION>
-h, --help Print help Print help
```

Example:

```bash
cargo run --bin video_write_tasks --release -- --output output.mp4
```
Loading

0 comments on commit 08d368a

Please sign in to comment.