diff --git a/CHANGELOG.md b/CHANGELOG.md index dd5d47ac0..bf88d2465 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ - CoreAudio: Update `mach2` to 0.5. - CoreAudio: Configure device buffer to ensure predictable callback buffer sizes. - CoreAudio: Fix timestamp accuracy. +- CoreAudio: Make `Stream` implement `Send`. +- CoreAudio: Remove `Clone` impl from `Stream`. - Emscripten: Add `BufferSize::Fixed` validation against supported range. - iOS: Fix example by properly activating audio session. - iOS: Add complete AVAudioSession integration for device enumeration and buffer size control. @@ -29,7 +31,9 @@ - WASAPI: Expose `IMMDevice` from WASAPI host Device. - WASAPI: Add `I24` and `U24` sample format support (24-bit samples stored in 4 bytes). - WASAPI: Update `windows` to >= 0.58, <= 0.62. +- WASAPI: Make `Stream` implement `Send`. - Wasm: Removed optional `wee-alloc` feature for security reasons. +- Wasm: Make `Stream` implement `Send`. - WebAudio: Add `BufferSize::Fixed` validation against supported range. # Version 0.16.0 (2025-06-07) diff --git a/src/host/aaudio/mod.rs b/src/host/aaudio/mod.rs index 658568731..a1e48964b 100644 --- a/src/host/aaudio/mod.rs +++ b/src/host/aaudio/mod.rs @@ -49,6 +49,9 @@ pub enum Stream { // TODO: Is this still in-progress? https://github.com/rust-mobile/ndk/pull/497 unsafe impl Send for Stream {} +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + pub type SupportedInputConfigs = VecIntoIter; pub type SupportedOutputConfigs = VecIntoIter; pub type Devices = VecIntoIter; diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index f270777ef..075a26e9b 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -624,6 +624,9 @@ pub struct Stream { trigger: TriggerSender, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + struct StreamWorkerContext { descriptors: Box<[libc::pollfd]>, transfer_buffer: Box<[u8]>, diff --git a/src/host/asio/stream.rs b/src/host/asio/stream.rs index 5539316a1..b2f76b9ae 100644 --- a/src/host/asio/stream.rs +++ b/src/host/asio/stream.rs @@ -22,6 +22,9 @@ pub struct Stream { callback_id: sys::CallbackId, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + impl Stream { pub fn play(&self) -> Result<(), PlayStreamError> { self.playing.store(true, Ordering::SeqCst); diff --git a/src/host/coreaudio/ios/mod.rs b/src/host/coreaudio/ios/mod.rs index a52914410..521cfb376 100644 --- a/src/host/coreaudio/ios/mod.rs +++ b/src/host/coreaudio/ios/mod.rs @@ -1,6 +1,6 @@ //! CoreAudio implementation for iOS using AVAudioSession and RemoteIO Audio Units. -use std::cell::RefCell; +use std::sync::Mutex; use coreaudio::audio_unit::render_callback::data; use coreaudio::audio_unit::{render_callback, AudioUnit, Element, Scope}; @@ -212,20 +212,27 @@ impl DeviceTrait for Device { } pub struct Stream { - inner: RefCell, + inner: Mutex, } impl Stream { fn new(inner: StreamInner) -> Self { Self { - inner: RefCell::new(inner), + inner: Mutex::new(inner), } } } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PlayStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; if !stream.playing { if let Err(e) = stream.audio_unit.start() { @@ -239,7 +246,14 @@ impl StreamTrait for Stream { } fn pause(&self) -> Result<(), PauseStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PauseStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; if stream.playing { if let Err(e) = stream.audio_unit.stop() { diff --git a/src/host/coreaudio/macos/device.rs b/src/host/coreaudio/macos/device.rs index 0d5150fcd..154a4b1f1 100644 --- a/src/host/coreaudio/macos/device.rs +++ b/src/host/coreaudio/macos/device.rs @@ -18,13 +18,12 @@ use objc2_audio_toolbox::{ }; use objc2_core_audio::{ kAudioDevicePropertyAvailableNominalSampleRates, kAudioDevicePropertyBufferFrameSize, - kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyDeviceIsAlive, - kAudioDevicePropertyNominalSampleRate, kAudioDevicePropertyStreamConfiguration, - kAudioDevicePropertyStreamFormat, kAudioObjectPropertyElementMaster, - kAudioObjectPropertyScopeGlobal, kAudioObjectPropertyScopeInput, - kAudioObjectPropertyScopeOutput, AudioDeviceID, AudioObjectGetPropertyData, - AudioObjectGetPropertyDataSize, AudioObjectID, AudioObjectPropertyAddress, - AudioObjectPropertyScope, AudioObjectSetPropertyData, + kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyNominalSampleRate, + kAudioDevicePropertyStreamConfiguration, kAudioDevicePropertyStreamFormat, + kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal, + kAudioObjectPropertyScopeInput, kAudioObjectPropertyScopeOutput, AudioDeviceID, + AudioObjectGetPropertyData, AudioObjectGetPropertyDataSize, AudioObjectID, + AudioObjectPropertyAddress, AudioObjectPropertyScope, AudioObjectSetPropertyData, }; use objc2_core_audio_types::{ AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange, @@ -36,12 +35,12 @@ pub use super::enumerate::{ use std::fmt; use std::mem::{self}; use std::ptr::{null, NonNull}; -use std::rc::Rc; use std::slice; use std::sync::mpsc::{channel, RecvTimeoutError}; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; +use super::invoke_error_callback; use super::property_listener::AudioObjectPropertyListener; use coreaudio::audio_unit::macos_helpers::get_device_name; /// Attempt to set the device sample rate to the provided rate. @@ -253,45 +252,6 @@ fn get_io_buffer_frame_size_range( }) } -/// Register the on-disconnect callback. -/// This will both stop the stream and call the error callback with DeviceNotAvailable. -/// This function should only be called once per stream. -fn add_disconnect_listener( - stream: &Stream, - error_callback: Arc>, -) -> Result<(), BuildStreamError> -where - E: FnMut(StreamError) + Send + 'static, -{ - let stream_inner_weak = Rc::downgrade(&stream.inner); - let mut stream_inner = stream.inner.borrow_mut(); - stream_inner._disconnect_listener = Some(AudioObjectPropertyListener::new( - stream_inner.device_id, - AudioObjectPropertyAddress { - mSelector: kAudioDevicePropertyDeviceIsAlive, - mScope: kAudioObjectPropertyScopeGlobal, - mElement: kAudioObjectPropertyElementMaster, - }, - move || { - if let Some(stream_inner_strong) = stream_inner_weak.upgrade() { - match stream_inner_strong.try_borrow_mut() { - Ok(mut stream_inner) => { - let _ = stream_inner.pause(); - } - Err(_) => { - // Could not acquire mutable borrow. This can occur if there are - // overlapping borrows, if the stream is already in use, or if a panic - // occurred during a previous borrow. Still notify about device - // disconnection even if we can't pause. - } - } - (error_callback.lock().unwrap())(StreamError::DeviceNotAvailable); - } - }, - )?); - Ok(()) -} - impl DeviceTrait for Device { type SupportedInputConfigs = SupportedInputConfigs; type SupportedOutputConfigs = SupportedOutputConfigs; @@ -710,16 +670,14 @@ impl Device { type Args = render_callback::Args; audio_unit.set_input_callback(move |args: Args| unsafe { - let ptr = (*args.data.data).mBuffers.as_ptr(); - let len = (*args.data.data).mNumberBuffers as usize; - let buffers: &[AudioBuffer] = slice::from_raw_parts(ptr, len); - - // TODO: Perhaps loop over all buffers instead? + // SAFETY: We configure the stream format as interleaved (via asbd_from_config which + // does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has + // exactly one buffer containing all channels, so mBuffers[0] is always valid. let AudioBuffer { mNumberChannels: channels, mDataByteSize: data_byte_size, mData: data, - } = buffers[0]; + } = (*args.data.data).mBuffers[0]; let data = data as *mut (); let len = data_byte_size as usize / bytes_per_channel; @@ -727,7 +685,7 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - (error_callback.lock().unwrap())(err.into()); + invoke_error_callback(&error_callback, err.into()); return Err(()); } Ok(cb) => cb, @@ -750,21 +708,36 @@ impl Device { Ok(()) })?; - let stream = Stream::new(StreamInner { - playing: true, - _disconnect_listener: None, - audio_unit, - device_id: self.audio_device_id, - _loopback_device: loopback_aggregate, - }); - - // If we didn't request the default device, stop the stream if the - // device disconnects. - if !is_default_device(self) { - add_disconnect_listener(&stream, error_callback_disconnect)?; - } + // Create error callback for stream - either dummy or real based on device type + let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback_disconnect.clone(); + Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }) + }; + + let stream = Stream::new( + StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: loopback_aggregate, + }, + error_callback_for_stream, + )?; - stream.inner.borrow_mut().audio_unit.start()?; + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })? + .audio_unit + .start()?; Ok(stream) } @@ -800,9 +773,9 @@ impl Device { type Args = render_callback::Args; audio_unit.set_render_callback(move |args: Args| unsafe { - // If `run()` is currently running, then a callback will be available from this list. - // Otherwise, we just fill the buffer with zeroes and return. - + // SAFETY: We configure the stream format as interleaved (via asbd_from_config which + // does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has + // exactly one buffer containing all channels, so mBuffers[0] is always valid. let AudioBuffer { mNumberChannels: channels, mDataByteSize: data_byte_size, @@ -815,7 +788,7 @@ impl Device { let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) { Err(err) => { - (error_callback.lock().unwrap())(err.into()); + invoke_error_callback(&error_callback, err.into()); return Err(()); } Ok(cb) => cb, @@ -838,21 +811,36 @@ impl Device { Ok(()) })?; - let stream = Stream::new(StreamInner { - playing: true, - _disconnect_listener: None, - audio_unit, - device_id: self.audio_device_id, - _loopback_device: None, - }); - - // If we didn't request the default device, stop the stream if the - // device disconnects. - if !is_default_device(self) { - add_disconnect_listener(&stream, error_callback_disconnect)?; - } + // Create error callback for stream - either dummy or real based on device type + let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) { + Box::new(|_: StreamError| {}) + } else { + let error_callback_clone = error_callback_disconnect.clone(); + Box::new(move |err: StreamError| { + invoke_error_callback(&error_callback_clone, err); + }) + }; + + let stream = Stream::new( + StreamInner { + playing: true, + audio_unit, + device_id: self.audio_device_id, + _loopback_device: None, + }, + error_callback_for_stream, + )?; - stream.inner.borrow_mut().audio_unit.start()?; + stream + .inner + .lock() + .map_err(|_| BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })? + .audio_unit + .start()?; Ok(stream) } diff --git a/src/host/coreaudio/macos/enumerate.rs b/src/host/coreaudio/macos/enumerate.rs index 806044883..4f337352f 100644 --- a/src/host/coreaudio/macos/enumerate.rs +++ b/src/host/coreaudio/macos/enumerate.rs @@ -73,9 +73,6 @@ impl Devices { } } -unsafe impl Send for Devices {} -unsafe impl Sync for Devices {} - impl Iterator for Devices { type Item = Device; fn next(&mut self) -> Option { diff --git a/src/host/coreaudio/macos/mod.rs b/src/host/coreaudio/macos/mod.rs index e29d05768..a7a025166 100644 --- a/src/host/coreaudio/macos/mod.rs +++ b/src/host/coreaudio/macos/mod.rs @@ -7,11 +7,14 @@ use crate::traits::{HostTrait, StreamTrait}; use crate::{BackendSpecificError, DevicesError, PauseStreamError, PlayStreamError}; use coreaudio::audio_unit::AudioUnit; use objc2_core_audio::AudioDeviceID; -use std::cell::RefCell; -use std::rc::Rc; +use std::sync::{mpsc, Arc, Mutex, Weak}; pub use self::enumerate::{default_input_device, default_output_device, Devices}; +use objc2_core_audio::{ + kAudioDevicePropertyDeviceIsAlive, kAudioObjectPropertyElementMain, + kAudioObjectPropertyScopeGlobal, AudioObjectPropertyAddress, +}; use property_listener::AudioObjectPropertyListener; mod device; @@ -52,11 +55,123 @@ impl HostTrait for Host { } } +/// Type alias for the error callback to reduce complexity +type ErrorCallback = Box; + +/// Invoke error callback, recovering from poisoned mutex if needed. +/// Returns true if callback was invoked, false if skipped due to WouldBlock. +#[inline] +fn invoke_error_callback(error_callback: &Arc>, err: crate::StreamError) -> bool +where + E: FnMut(crate::StreamError) + Send, +{ + match error_callback.try_lock() { + Ok(mut cb) => { + cb(err); + true + } + Err(std::sync::TryLockError::Poisoned(guard)) => { + // Recover from poisoned lock to still report this error + guard.into_inner()(err); + true + } + Err(std::sync::TryLockError::WouldBlock) => { + // Skip if callback is busy + false + } + } +} + +/// Manages device disconnection listener on a dedicated thread to ensure the +/// AudioObjectPropertyListener is always created and dropped on the same thread. +/// This avoids potential threading issues with CoreAudio APIs. +/// +/// When a device disconnects, this manager: +/// 1. Attempts to pause the stream to stop audio I/O +/// 2. Calls the error callback with `StreamError::DeviceNotAvailable` +/// +/// The dedicated thread architecture ensures `Stream` can implement `Send`. +struct DisconnectManager { + _shutdown_tx: mpsc::Sender<()>, +} + +impl DisconnectManager { + /// Create a new DisconnectManager that monitors device disconnection on a dedicated thread + fn new( + device_id: AudioDeviceID, + stream_weak: Weak>, + error_callback: Arc>, + ) -> Result { + let (shutdown_tx, shutdown_rx) = mpsc::channel(); + let (disconnect_tx, disconnect_rx) = mpsc::channel(); + let (ready_tx, ready_rx) = mpsc::channel(); + + // Spawn dedicated thread to own the AudioObjectPropertyListener + let disconnect_tx_clone = disconnect_tx.clone(); + std::thread::spawn(move || { + let property_address = AudioObjectPropertyAddress { + mSelector: kAudioDevicePropertyDeviceIsAlive, + mScope: kAudioObjectPropertyScopeGlobal, + mElement: kAudioObjectPropertyElementMain, + }; + + // Create the listener on this dedicated thread + match AudioObjectPropertyListener::new(device_id, property_address, move || { + let _ = disconnect_tx_clone.send(()); + }) { + Ok(_listener) => { + let _ = ready_tx.send(Ok(())); + // Drop the listener on this thread after receiving a shutdown signal + let _ = shutdown_rx.recv(); + } + Err(e) => { + let _ = ready_tx.send(Err(e)); + } + } + }); + + // Wait for listener creation to complete or fail + ready_rx + .recv() + .map_err(|_| crate::BuildStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Disconnect listener thread terminated unexpectedly".to_string(), + }, + })??; + + // Handle disconnect events on the main thread pool + let stream_weak_clone = stream_weak.clone(); + let error_callback_clone = error_callback.clone(); + std::thread::spawn(move || { + while disconnect_rx.recv().is_ok() { + // Check if stream still exists + if let Some(stream_arc) = stream_weak_clone.upgrade() { + // First, try to pause the stream to stop playback + if let Ok(mut stream_inner) = stream_arc.try_lock() { + let _ = stream_inner.pause(); + } + + // Always try to notify about device disconnection + invoke_error_callback( + &error_callback_clone, + crate::StreamError::DeviceNotAvailable, + ); + } else { + // Stream is gone, exit the handler thread + break; + } + } + }); + + Ok(DisconnectManager { + _shutdown_tx: shutdown_tx, + }) + } +} + struct StreamInner { playing: bool, audio_unit: AudioUnit, - /// Manage the lifetime of the closure that handles device disconnection. - _disconnect_listener: Option, // Track the device with which the audio unit was spawned. // // We must do this so that we can avoid changing the device sample rate if there is already @@ -94,28 +209,55 @@ impl StreamInner { } } -#[derive(Clone)] pub struct Stream { - inner: Rc>, + inner: Arc>, + // Manages the device disconnection listener separately to allow Stream to be Send. + // The DisconnectManager contains the non-Send AudioObjectPropertyListener. + _disconnect_manager: DisconnectManager, } impl Stream { - fn new(inner: StreamInner) -> Self { - Self { - inner: Rc::new(RefCell::new(inner)), - } + fn new( + inner: StreamInner, + error_callback: ErrorCallback, + ) -> Result { + let device_id = inner.device_id; + let inner_arc = Arc::new(Mutex::new(inner)); + let weak_inner = Arc::downgrade(&inner_arc); + + let error_callback = Arc::new(Mutex::new(error_callback)); + let disconnect_manager = DisconnectManager::new(device_id, weak_inner, error_callback)?; + + Ok(Self { + inner: inner_arc, + _disconnect_manager: disconnect_manager, + }) } } impl StreamTrait for Stream { fn play(&self) -> Result<(), PlayStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PlayStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; stream.play() } fn pause(&self) -> Result<(), PauseStreamError> { - let mut stream = self.inner.borrow_mut(); + let mut stream = self + .inner + .lock() + .map_err(|_| PauseStreamError::BackendSpecific { + err: BackendSpecificError { + description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(), + }, + })?; stream.pause() } diff --git a/src/host/coreaudio/mod.rs b/src/host/coreaudio/mod.rs index ab4950c82..ed276eca0 100644 --- a/src/host/coreaudio/mod.rs +++ b/src/host/coreaudio/mod.rs @@ -126,3 +126,6 @@ impl From for DefaultStreamConfigError { } pub(crate) type OSStatus = i32; + +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); diff --git a/src/host/emscripten/mod.rs b/src/host/emscripten/mod.rs index 7a548746e..dafc041af 100644 --- a/src/host/emscripten/mod.rs +++ b/src/host/emscripten/mod.rs @@ -34,6 +34,12 @@ pub struct Stream { audio_ctxt: AudioContext, } +// WASM runs in a single-threaded environment, so Send is safe by design. +unsafe impl Send for Stream {} + +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + // Index within the `streams` array of the events loop. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct StreamId(usize); diff --git a/src/host/jack/stream.rs b/src/host/jack/stream.rs index 0d48530aa..a77c3240e 100644 --- a/src/host/jack/stream.rs +++ b/src/host/jack/stream.rs @@ -21,6 +21,9 @@ pub struct Stream { output_port_names: Vec, } +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + impl Stream { // TODO: Return error messages pub fn new_input( diff --git a/src/host/mod.rs b/src/host/mod.rs index 0c61a5910..1aecf93ab 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -28,3 +28,13 @@ pub(crate) mod null; pub(crate) mod wasapi; #[cfg(all(target_arch = "wasm32", feature = "wasm-bindgen"))] pub(crate) mod webaudio; + +/// Compile-time assertion that a type implements Send. +/// Use this macro in each host module to ensure Stream is Send. +#[macro_export] +macro_rules! assert_stream_send { + ($t:ty) => { + const fn _assert_stream_send() {} + const _: () = _assert_stream_send::<$t>(); + }; +} diff --git a/src/host/null/mod.rs b/src/host/null/mod.rs index 0a9752b70..16d83fb32 100644 --- a/src/host/null/mod.rs +++ b/src/host/null/mod.rs @@ -19,6 +19,9 @@ pub struct Host; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct Stream; +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + #[derive(Clone)] pub struct SupportedInputConfigs; #[derive(Clone)] diff --git a/src/host/wasapi/stream.rs b/src/host/wasapi/stream.rs index 6b2d1c207..81f73d53c 100644 --- a/src/host/wasapi/stream.rs +++ b/src/host/wasapi/stream.rs @@ -31,6 +31,13 @@ pub struct Stream { pending_scheduled_event: Foundation::HANDLE, } +// Windows Event HANDLEs are safe to send between threads - they are designed for synchronization. +// See: https://learn.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-createeventa +unsafe impl Send for Stream {} + +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + struct RunContext { // Streams that have been created in this event loop. stream: StreamInner, diff --git a/src/host/webaudio/mod.rs b/src/host/webaudio/mod.rs index b4d840cae..ec4ed560f 100644 --- a/src/host/webaudio/mod.rs +++ b/src/host/webaudio/mod.rs @@ -31,6 +31,12 @@ pub struct Stream { buffer_size_frames: usize, } +// WASM runs in a single-threaded environment, so Send is safe by design. +unsafe impl Send for Stream {} + +// Compile-time assertion that Stream is Send +crate::assert_stream_send!(Stream); + pub type SupportedInputConfigs = ::std::vec::IntoIter; pub type SupportedOutputConfigs = ::std::vec::IntoIter;