Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #122 #123

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 34 additions & 25 deletions crates/kira/src/backend/cpal/desktop/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod send_on_drop;

use std::{
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
Expand All @@ -13,7 +13,7 @@ use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
BufferSize, Device, Stream, StreamConfig, StreamError,
};
use rtrb::{Consumer, RingBuffer};
use rtrb::{Consumer, PushError, RingBuffer};
use send_on_drop::SendOnDrop;

use super::super::Error;
Expand All @@ -28,7 +28,7 @@ enum State {
},
Running {
stream: Stream,
stream_error_consumer: Consumer<StreamError>,
_stream_errors_discarded: Arc<AtomicU64>,
renderer_consumer: Consumer<RendererWithCpuUsage>,
},
}
Expand Down Expand Up @@ -71,13 +71,14 @@ impl StreamManager {
custom_device,
buffer_size,
};
stream_manager.start_stream(&device, &mut config).unwrap();
let mut stream_error_consumer =
stream_manager.start_stream(&device, &mut config).unwrap();
loop {
std::thread::sleep(CHECK_STREAM_INTERVAL);
if should_drop.load(Ordering::SeqCst) {
break;
}
stream_manager.check_stream();
stream_manager.check_stream(&mut stream_error_consumer);
}
});
StreamManagerController {
Expand All @@ -86,18 +87,19 @@ impl StreamManager {
}

/// Restarts the stream if the audio device gets disconnected.
fn check_stream(&mut self) {
if let State::Running {
stream_error_consumer,
..
} = &mut self.state
{
// check for device disconnection
if let Ok(StreamError::DeviceNotAvailable) = stream_error_consumer.pop() {
self.stop_stream();
if let Ok((device, mut config)) = default_device_and_config() {
// TODO: gracefully handle errors that occur in this function
self.start_stream(&device, &mut config).unwrap();
fn check_stream(&mut self, stream_error_consumer: &mut Consumer<StreamError>) {
if let State::Running { .. } = &self.state {
while let Ok(error) = stream_error_consumer.pop() {
match error {
// check for device disconnection
StreamError::DeviceNotAvailable => {
self.stop_stream();
if let Ok((device, mut config)) = default_device_and_config() {
// TODO: gracefully handle errors that occur in this function
self.start_stream(&device, &mut config).unwrap();
}
}
StreamError::BackendSpecific { err: _ } => {}
}
}
// check for device changes if a custom device hasn't been specified
Expand All @@ -118,7 +120,11 @@ impl StreamManager {
}
}

fn start_stream(&mut self, device: &Device, config: &mut StreamConfig) -> Result<(), Error> {
fn start_stream(
&mut self,
device: &Device,
config: &mut StreamConfig,
) -> Result<Consumer<StreamError>, Error> {
let mut renderer =
if let State::Idle { renderer } = std::mem::replace(&mut self.state, State::Empty) {
renderer
Expand All @@ -134,7 +140,9 @@ impl StreamManager {
self.device_name = device_name;
self.sample_rate = sample_rate;
let (mut renderer_wrapper, renderer_consumer) = SendOnDrop::new(renderer);
let (mut stream_error_producer, stream_error_consumer) = RingBuffer::new(1);
let (mut stream_error_producer, stream_error_consumer) = RingBuffer::new(64);
let stream_errors_discarded = Arc::new(AtomicU64::new(0));
let stream_errors_discarded_clone = Arc::clone(&stream_errors_discarded);
let channels = config.channels;
let stream = device.build_output_stream(
config,
Expand All @@ -146,20 +154,21 @@ impl StreamManager {
#[cfg(not(feature = "assert_no_alloc"))]
process_renderer(&mut renderer_wrapper, data, channels, sample_rate);
},
move |error| {
stream_error_producer
.push(error)
.expect("Stream error producer is full");
move |error| match stream_error_producer.push(error) {
Ok(()) => {}
Err(PushError::Full(_stream_error)) => {
stream_errors_discarded.fetch_add(1, Ordering::AcqRel);
}
},
None,
)?;
stream.play()?;
self.state = State::Running {
stream,
stream_error_consumer,
_stream_errors_discarded: stream_errors_discarded_clone,
renderer_consumer,
};
Ok(())
Ok(stream_error_consumer)
}

fn stop_stream(&mut self) {
Expand Down