Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #122 #123

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 24 additions & 46 deletions crates/kira/src/backend/cpal/desktop/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ mod send_on_drop;

use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
},
time::Duration,
};
Expand All @@ -13,26 +13,13 @@ use cpal::{
traits::{DeviceTrait, HostTrait, StreamTrait},
BufferSize, Device, Stream, StreamConfig, StreamError,
};
use rtrb::Consumer;
use rtrb::{Consumer, PushError, RingBuffer};
use send_on_drop::SendOnDrop;

use super::super::Error;

const CHECK_STREAM_INTERVAL: Duration = Duration::from_millis(500);

#[derive(Clone)]
struct StreamErrorQueue {
queue: Arc<Mutex<Vec<StreamError>>>,
}

impl Default for StreamErrorQueue {
fn default() -> Self {
Self {
queue: Arc::new(Mutex::new(Vec::with_capacity(3))),
}
}
}

#[allow(clippy::large_enum_variant)]
enum State {
Empty,
Expand All @@ -41,7 +28,7 @@ enum State {
},
Running {
stream: Stream,
stream_error_consumer: StreamErrorQueue,
_stream_errors_discarded: Arc<AtomicU64>,
renderer_consumer: Consumer<RendererWithCpuUsage>,
},
}
Expand Down Expand Up @@ -84,14 +71,14 @@ impl StreamManager {
custom_device,
buffer_size,
};
stream_manager.start_stream(&device, &mut config).unwrap();
let mut stream_error_buffer = Vec::with_capacity(3);
let mut stream_error_consumer =
stream_manager.start_stream(&device, &mut config).unwrap();
loop {
std::thread::sleep(CHECK_STREAM_INTERVAL);
if should_drop.load(Ordering::SeqCst) {
break;
}
stream_manager.check_stream(&mut stream_error_buffer);
stream_manager.check_stream(&mut stream_error_consumer);
}
});
StreamManagerController {
Expand All @@ -100,20 +87,9 @@ impl StreamManager {
}

/// Restarts the stream if the audio device gets disconnected.
fn check_stream(&mut self, stream_error_buffer: &mut Vec<StreamError>) {
if let State::Running {
stream_error_consumer,
..
} = &mut self.state
{
stream_error_buffer.append(
&mut stream_error_consumer
.queue
.lock()
.expect("Audio thread panicked while sending error"),
);

for error in stream_error_buffer.drain(..) {
fn check_stream(&mut self, stream_error_consumer: &mut Consumer<StreamError>) {
if let State::Running { .. } = &self.state {
while let Ok(error) = stream_error_consumer.pop() {
match error {
// check for device disconnection
StreamError::DeviceNotAvailable => {
Expand All @@ -126,7 +102,6 @@ impl StreamManager {
StreamError::BackendSpecific { err: _ } => {}
}
}

// check for device changes if a custom device hasn't been specified
// Disabled on macos due to audio artifacts that seem to occur when the device is
// queried while playing.
Expand All @@ -145,7 +120,11 @@ impl StreamManager {
}
}

fn start_stream(&mut self, device: &Device, config: &mut StreamConfig) -> Result<(), Error> {
fn start_stream(
&mut self,
device: &Device,
config: &mut StreamConfig,
) -> Result<Consumer<StreamError>, Error> {
let mut renderer =
if let State::Idle { renderer } = std::mem::replace(&mut self.state, State::Empty) {
renderer
Expand All @@ -161,8 +140,9 @@ impl StreamManager {
self.device_name = device_name;
self.sample_rate = sample_rate;
let (mut renderer_wrapper, renderer_consumer) = SendOnDrop::new(renderer);
let stream_error_producer = StreamErrorQueue::default();
let stream_error_consumer = stream_error_producer.clone();
let (mut stream_error_producer, stream_error_consumer) = RingBuffer::new(64);
let stream_errors_discarded = Arc::new(AtomicU64::new(0));
let stream_errors_discarded_clone = Arc::clone(&stream_errors_discarded);
let channels = config.channels;
let stream = device.build_output_stream(
config,
Expand All @@ -174,23 +154,21 @@ impl StreamManager {
#[cfg(not(feature = "assert_no_alloc"))]
process_renderer(&mut renderer_wrapper, data, channels, sample_rate);
},
move |error| {
if let Ok(mut queue_lock) = stream_error_producer.queue.lock() {
// If the stream error queue mutex is poisoned,
// the stream manager thread panicked while holding the lock,
// which must be an allocator error.
queue_lock.push(error);
move |error| match stream_error_producer.push(error) {
Ok(()) => {}
Err(PushError::Full(_stream_error)) => {
stream_errors_discarded.fetch_add(1, Ordering::AcqRel);
}
},
None,
)?;
stream.play()?;
self.state = State::Running {
stream,
stream_error_consumer,
_stream_errors_discarded: stream_errors_discarded_clone,
renderer_consumer,
};
Ok(())
Ok(stream_error_consumer)
}

fn stop_stream(&mut self) {
Expand Down