Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #122 #123

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion crates/kira/src/backend/cpal/desktop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use stream_manager::{StreamManager, StreamManagerController};
use crate::backend::{Backend, Renderer};
use cpal::{
traits::{DeviceTrait, HostTrait},
BufferSize, Device, StreamConfig,
BufferSize, Device, StreamConfig, StreamError,
};

use super::{CpalBackendSettings, Error};
Expand Down Expand Up @@ -56,6 +56,44 @@ impl CpalBackend {
.pop()
.ok()
}

/// Returns the oldest available stream error in the queue.
pub fn pop_error(&mut self) -> Option<StreamError> {
if let State::Initialized {
stream_manager_controller,
} = &mut self.state
{
stream_manager_controller.pop_handled_error()
} else {
None
}
}

/// Returns the number of unhandled stream errors discarded because of overcrowding.
/// This increases when cpal produces errors faster than they are polled by kira.
pub fn num_unhandled_stream_errors_discarded(&self) -> Option<u64> {
if let State::Initialized {
stream_manager_controller,
} = &self.state
{
Some(stream_manager_controller.unhandled_stream_errors_discarded())
} else {
None
}
}

/// Returns the number of handled stream errors discarded because of overcrowding.
/// This increases when kira produces errors faster than they are polled by [`Self::pop_error`].
pub fn num_handled_stream_errors_discarded(&self) -> Option<u64> {
if let State::Initialized {
stream_manager_controller,
} = &self.state
{
Some(stream_manager_controller.handled_stream_errors_discarded())
} else {
None
}
}
}

impl Backend for CpalBackend {
Expand Down
128 changes: 101 additions & 27 deletions crates/kira/src/backend/cpal/desktop/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ mod send_on_drop;

use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex,
},
time::Duration,
};
Expand All @@ -13,7 +13,7 @@ use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
BufferSize, Device, Stream, StreamConfig, StreamError,
};
use rtrb::{Consumer, RingBuffer};
use rtrb::{Consumer, Producer, PushError, RingBuffer};
use send_on_drop::SendOnDrop;

use super::super::Error;
Expand All @@ -28,19 +28,38 @@ enum State {
},
Running {
stream: Stream,
stream_error_consumer: Consumer<StreamError>,
renderer_consumer: Consumer<RendererWithCpuUsage>,
},
}

pub(super) struct StreamManagerController {
should_drop: Arc<AtomicBool>,
unhandled_stream_errors_discarded: Arc<AtomicU64>,
handled_stream_errors_discarded: Arc<AtomicU64>,
handled_stream_error_consumer: Mutex<Consumer<StreamError>>,
}

impl StreamManagerController {
pub fn stop(&self) {
self.should_drop.store(true, Ordering::SeqCst);
}

pub fn unhandled_stream_errors_discarded(&self) -> u64 {
self.unhandled_stream_errors_discarded
.load(Ordering::Acquire)
}

pub fn handled_stream_errors_discarded(&self) -> u64 {
self.handled_stream_errors_discarded.load(Ordering::Acquire)
}

pub fn pop_handled_error(&mut self) -> Option<StreamError> {
self.handled_stream_error_consumer
.get_mut()
.unwrap()
.pop()
.ok()
}
}

/// Starts a cpal stream and restarts it if needed
Expand All @@ -63,6 +82,16 @@ impl StreamManager {
) -> StreamManagerController {
let should_drop = Arc::new(AtomicBool::new(false));
let should_drop_clone = should_drop.clone();

let unhandled_stream_errors_discarded = Arc::new(AtomicU64::new(0));
let unhandled_stream_errors_discarded_clone = unhandled_stream_errors_discarded.clone();

let handled_stream_errors_discarded = Arc::new(AtomicU64::new(0));
let handled_stream_errors_discarded_clone = handled_stream_errors_discarded.clone();

let (mut handled_stream_error_producer, handled_stream_error_consumer) =
RingBuffer::new(64);

std::thread::spawn(move || {
let mut stream_manager = StreamManager {
state: State::Idle { renderer },
Expand All @@ -71,33 +100,66 @@ impl StreamManager {
custom_device,
buffer_size,
};
stream_manager.start_stream(&device, &mut config).unwrap();
let mut unhandled_stream_error_consumer = stream_manager
.start_stream(
&device,
&mut config,
unhandled_stream_errors_discarded.clone(),
)
.unwrap();
loop {
std::thread::sleep(CHECK_STREAM_INTERVAL);
if should_drop.load(Ordering::SeqCst) {
break;
}
stream_manager.check_stream();
stream_manager.check_stream(
&mut unhandled_stream_error_consumer,
&unhandled_stream_errors_discarded,
&mut handled_stream_error_producer,
&handled_stream_errors_discarded,
);
}
});
StreamManagerController {
should_drop: should_drop_clone,
unhandled_stream_errors_discarded: unhandled_stream_errors_discarded_clone,
handled_stream_errors_discarded: handled_stream_errors_discarded_clone,
handled_stream_error_consumer: Mutex::new(handled_stream_error_consumer),
}
}

/// Restarts the stream if the audio device gets disconnected.
fn check_stream(&mut self) {
if let State::Running {
stream_error_consumer,
..
} = &mut self.state
{
// check for device disconnection
if let Ok(StreamError::DeviceNotAvailable) = stream_error_consumer.pop() {
self.stop_stream();
if let Ok((device, mut config)) = default_device_and_config() {
// TODO: gracefully handle errors that occur in this function
self.start_stream(&device, &mut config).unwrap();
fn check_stream(
&mut self,
unhandled_stream_error_consumer: &mut Consumer<StreamError>,
unhandled_stream_errors_discarded: &Arc<AtomicU64>,
handled_stream_error_producer: &mut Producer<StreamError>,
handled_stream_errors_discarded: &Arc<AtomicU64>,
) {
if let State::Running { .. } = &self.state {
while let Ok(error) = unhandled_stream_error_consumer.pop() {
match error {
// check for device disconnection
StreamError::DeviceNotAvailable => {
self.stop_stream();
if let Ok((device, mut config)) = default_device_and_config() {
// TODO: gracefully handle errors that occur in this function
*unhandled_stream_error_consumer = self
.start_stream(
&device,
&mut config,
unhandled_stream_errors_discarded.clone(),
)
.unwrap();
}
}
StreamError::BackendSpecific { err: _ } => {}
}
match handled_stream_error_producer.push(error) {
Ok(()) => {}
Err(PushError::Full(_stream_error)) => {
handled_stream_errors_discarded.fetch_add(1, Ordering::AcqRel);
}
}
}
// check for device changes if a custom device hasn't been specified
Expand All @@ -111,14 +173,25 @@ impl StreamManager {
let sample_rate = config.sample_rate.0;
if device_name != self.device_name || sample_rate != self.sample_rate {
self.stop_stream();
self.start_stream(&device, &mut config).unwrap();
*unhandled_stream_error_consumer = self
.start_stream(
&device,
&mut config,
unhandled_stream_errors_discarded.clone(),
)
.unwrap();
}
}
}
}
}

fn start_stream(&mut self, device: &Device, config: &mut StreamConfig) -> Result<(), Error> {
fn start_stream(
&mut self,
device: &Device,
config: &mut StreamConfig,
unhandled_stream_errors_discarded: Arc<AtomicU64>,
) -> Result<Consumer<StreamError>, Error> {
let mut renderer =
if let State::Idle { renderer } = std::mem::replace(&mut self.state, State::Empty) {
renderer
Expand All @@ -134,7 +207,8 @@ impl StreamManager {
self.device_name = device_name;
self.sample_rate = sample_rate;
let (mut renderer_wrapper, renderer_consumer) = SendOnDrop::new(renderer);
let (mut stream_error_producer, stream_error_consumer) = RingBuffer::new(1);
let (mut unhandled_stream_error_producer, unhandled_stream_error_consumer) =
RingBuffer::new(64);
let channels = config.channels;
let stream = device.build_output_stream(
config,
Expand All @@ -146,20 +220,20 @@ impl StreamManager {
#[cfg(not(feature = "assert_no_alloc"))]
process_renderer(&mut renderer_wrapper, data, channels, sample_rate);
},
move |error| {
stream_error_producer
.push(error)
.expect("Stream error producer is full");
move |error| match unhandled_stream_error_producer.push(error) {
Ok(()) => {}
Err(PushError::Full(_stream_error)) => {
unhandled_stream_errors_discarded.fetch_add(1, Ordering::AcqRel);
}
},
None,
)?;
stream.play()?;
self.state = State::Running {
stream,
stream_error_consumer,
renderer_consumer,
};
Ok(())
Ok(unhandled_stream_error_consumer)
}

fn stop_stream(&mut self) {
Expand Down