Skip to content

Commit 06a0745

Browse files
authored
feat: make Stream implement Send on all hosts (#1021)
* feat: add compile-time Send assertions for Stream types in all hosts * feat: impl Send for Stream on CoreAudio, WASAPI, WASM * docs: note removal of Clone from Stream * refactor: move error callback handling into helper function Consolidate repeated error callback logic into invoke_error_callback for better readability and maintainability. Update usages in device.rs and disconnect manager. Add documentation for the helper.
1 parent 8c4d971 commit 06a0745

File tree

15 files changed

+298
-106
lines changed

15 files changed

+298
-106
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,18 @@
2222
- CoreAudio: Update `mach2` to 0.5.
2323
- CoreAudio: Configure device buffer to ensure predictable callback buffer sizes.
2424
- CoreAudio: Fix timestamp accuracy.
25+
- CoreAudio: Make `Stream` implement `Send`.
26+
- CoreAudio: Remove `Clone` impl from `Stream`.
2527
- Emscripten: Add `BufferSize::Fixed` validation against supported range.
2628
- iOS: Fix example by properly activating audio session.
2729
- iOS: Add complete AVAudioSession integration for device enumeration and buffer size control.
2830
- JACK: Add `BufferSize::Fixed` validation to reject requests that don't match server buffer size.
2931
- WASAPI: Expose `IMMDevice` from WASAPI host Device.
3032
- WASAPI: Add `I24` and `U24` sample format support (24-bit samples stored in 4 bytes).
3133
- WASAPI: Update `windows` to >= 0.58, <= 0.62.
34+
- WASAPI: Make `Stream` implement `Send`.
3235
- Wasm: Removed optional `wee-alloc` feature for security reasons.
36+
- Wasm: Make `Stream` implement `Send`.
3337
- WebAudio: Add `BufferSize::Fixed` validation against supported range.
3438

3539
# Version 0.16.0 (2025-06-07)

src/host/aaudio/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ pub enum Stream {
4949
// TODO: Is this still in-progress? https://github.com/rust-mobile/ndk/pull/497
5050
unsafe impl Send for Stream {}
5151

52+
// Compile-time assertion that Stream is Send
53+
crate::assert_stream_send!(Stream);
54+
5255
pub type SupportedInputConfigs = VecIntoIter<SupportedStreamConfigRange>;
5356
pub type SupportedOutputConfigs = VecIntoIter<SupportedStreamConfigRange>;
5457
pub type Devices = VecIntoIter<Device>;

src/host/alsa/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,9 @@ pub struct Stream {
624624
trigger: TriggerSender,
625625
}
626626

627+
// Compile-time assertion that Stream is Send
628+
crate::assert_stream_send!(Stream);
629+
627630
struct StreamWorkerContext {
628631
descriptors: Box<[libc::pollfd]>,
629632
transfer_buffer: Box<[u8]>,

src/host/asio/stream.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub struct Stream {
2222
callback_id: sys::CallbackId,
2323
}
2424

25+
// Compile-time assertion that Stream is Send
26+
crate::assert_stream_send!(Stream);
27+
2528
impl Stream {
2629
pub fn play(&self) -> Result<(), PlayStreamError> {
2730
self.playing.store(true, Ordering::SeqCst);

src/host/coreaudio/ios/mod.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! CoreAudio implementation for iOS using AVAudioSession and RemoteIO Audio Units.
22
3-
use std::cell::RefCell;
3+
use std::sync::Mutex;
44

55
use coreaudio::audio_unit::render_callback::data;
66
use coreaudio::audio_unit::{render_callback, AudioUnit, Element, Scope};
@@ -212,20 +212,27 @@ impl DeviceTrait for Device {
212212
}
213213

214214
pub struct Stream {
215-
inner: RefCell<StreamInner>,
215+
inner: Mutex<StreamInner>,
216216
}
217217

218218
impl Stream {
219219
fn new(inner: StreamInner) -> Self {
220220
Self {
221-
inner: RefCell::new(inner),
221+
inner: Mutex::new(inner),
222222
}
223223
}
224224
}
225225

226226
impl StreamTrait for Stream {
227227
fn play(&self) -> Result<(), PlayStreamError> {
228-
let mut stream = self.inner.borrow_mut();
228+
let mut stream = self
229+
.inner
230+
.lock()
231+
.map_err(|_| PlayStreamError::BackendSpecific {
232+
err: BackendSpecificError {
233+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
234+
},
235+
})?;
229236

230237
if !stream.playing {
231238
if let Err(e) = stream.audio_unit.start() {
@@ -239,7 +246,14 @@ impl StreamTrait for Stream {
239246
}
240247

241248
fn pause(&self) -> Result<(), PauseStreamError> {
242-
let mut stream = self.inner.borrow_mut();
249+
let mut stream = self
250+
.inner
251+
.lock()
252+
.map_err(|_| PauseStreamError::BackendSpecific {
253+
err: BackendSpecificError {
254+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
255+
},
256+
})?;
243257

244258
if stream.playing {
245259
if let Err(e) = stream.audio_unit.stop() {

src/host/coreaudio/macos/device.rs

Lines changed: 74 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ use objc2_audio_toolbox::{
1818
};
1919
use objc2_core_audio::{
2020
kAudioDevicePropertyAvailableNominalSampleRates, kAudioDevicePropertyBufferFrameSize,
21-
kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyDeviceIsAlive,
22-
kAudioDevicePropertyNominalSampleRate, kAudioDevicePropertyStreamConfiguration,
23-
kAudioDevicePropertyStreamFormat, kAudioObjectPropertyElementMaster,
24-
kAudioObjectPropertyScopeGlobal, kAudioObjectPropertyScopeInput,
25-
kAudioObjectPropertyScopeOutput, AudioDeviceID, AudioObjectGetPropertyData,
26-
AudioObjectGetPropertyDataSize, AudioObjectID, AudioObjectPropertyAddress,
27-
AudioObjectPropertyScope, AudioObjectSetPropertyData,
21+
kAudioDevicePropertyBufferFrameSizeRange, kAudioDevicePropertyNominalSampleRate,
22+
kAudioDevicePropertyStreamConfiguration, kAudioDevicePropertyStreamFormat,
23+
kAudioObjectPropertyElementMaster, kAudioObjectPropertyScopeGlobal,
24+
kAudioObjectPropertyScopeInput, kAudioObjectPropertyScopeOutput, AudioDeviceID,
25+
AudioObjectGetPropertyData, AudioObjectGetPropertyDataSize, AudioObjectID,
26+
AudioObjectPropertyAddress, AudioObjectPropertyScope, AudioObjectSetPropertyData,
2827
};
2928
use objc2_core_audio_types::{
3029
AudioBuffer, AudioBufferList, AudioStreamBasicDescription, AudioValueRange,
@@ -36,12 +35,12 @@ pub use super::enumerate::{
3635
use std::fmt;
3736
use std::mem::{self};
3837
use std::ptr::{null, NonNull};
39-
use std::rc::Rc;
4038
use std::slice;
4139
use std::sync::mpsc::{channel, RecvTimeoutError};
4240
use std::sync::{Arc, Mutex};
4341
use std::time::{Duration, Instant};
4442

43+
use super::invoke_error_callback;
4544
use super::property_listener::AudioObjectPropertyListener;
4645
use coreaudio::audio_unit::macos_helpers::get_device_name;
4746
/// Attempt to set the device sample rate to the provided rate.
@@ -253,45 +252,6 @@ fn get_io_buffer_frame_size_range(
253252
})
254253
}
255254

256-
/// Register the on-disconnect callback.
257-
/// This will both stop the stream and call the error callback with DeviceNotAvailable.
258-
/// This function should only be called once per stream.
259-
fn add_disconnect_listener<E>(
260-
stream: &Stream,
261-
error_callback: Arc<Mutex<E>>,
262-
) -> Result<(), BuildStreamError>
263-
where
264-
E: FnMut(StreamError) + Send + 'static,
265-
{
266-
let stream_inner_weak = Rc::downgrade(&stream.inner);
267-
let mut stream_inner = stream.inner.borrow_mut();
268-
stream_inner._disconnect_listener = Some(AudioObjectPropertyListener::new(
269-
stream_inner.device_id,
270-
AudioObjectPropertyAddress {
271-
mSelector: kAudioDevicePropertyDeviceIsAlive,
272-
mScope: kAudioObjectPropertyScopeGlobal,
273-
mElement: kAudioObjectPropertyElementMaster,
274-
},
275-
move || {
276-
if let Some(stream_inner_strong) = stream_inner_weak.upgrade() {
277-
match stream_inner_strong.try_borrow_mut() {
278-
Ok(mut stream_inner) => {
279-
let _ = stream_inner.pause();
280-
}
281-
Err(_) => {
282-
// Could not acquire mutable borrow. This can occur if there are
283-
// overlapping borrows, if the stream is already in use, or if a panic
284-
// occurred during a previous borrow. Still notify about device
285-
// disconnection even if we can't pause.
286-
}
287-
}
288-
(error_callback.lock().unwrap())(StreamError::DeviceNotAvailable);
289-
}
290-
},
291-
)?);
292-
Ok(())
293-
}
294-
295255
impl DeviceTrait for Device {
296256
type SupportedInputConfigs = SupportedInputConfigs;
297257
type SupportedOutputConfigs = SupportedOutputConfigs;
@@ -710,24 +670,22 @@ impl Device {
710670

711671
type Args = render_callback::Args<data::Raw>;
712672
audio_unit.set_input_callback(move |args: Args| unsafe {
713-
let ptr = (*args.data.data).mBuffers.as_ptr();
714-
let len = (*args.data.data).mNumberBuffers as usize;
715-
let buffers: &[AudioBuffer] = slice::from_raw_parts(ptr, len);
716-
717-
// TODO: Perhaps loop over all buffers instead?
673+
// SAFETY: We configure the stream format as interleaved (via asbd_from_config which
674+
// does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has
675+
// exactly one buffer containing all channels, so mBuffers[0] is always valid.
718676
let AudioBuffer {
719677
mNumberChannels: channels,
720678
mDataByteSize: data_byte_size,
721679
mData: data,
722-
} = buffers[0];
680+
} = (*args.data.data).mBuffers[0];
723681

724682
let data = data as *mut ();
725683
let len = data_byte_size as usize / bytes_per_channel;
726684
let data = Data::from_parts(data, len, sample_format);
727685

728686
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
729687
Err(err) => {
730-
(error_callback.lock().unwrap())(err.into());
688+
invoke_error_callback(&error_callback, err.into());
731689
return Err(());
732690
}
733691
Ok(cb) => cb,
@@ -750,21 +708,36 @@ impl Device {
750708
Ok(())
751709
})?;
752710

753-
let stream = Stream::new(StreamInner {
754-
playing: true,
755-
_disconnect_listener: None,
756-
audio_unit,
757-
device_id: self.audio_device_id,
758-
_loopback_device: loopback_aggregate,
759-
});
760-
761-
// If we didn't request the default device, stop the stream if the
762-
// device disconnects.
763-
if !is_default_device(self) {
764-
add_disconnect_listener(&stream, error_callback_disconnect)?;
765-
}
711+
// Create error callback for stream - either dummy or real based on device type
712+
let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) {
713+
Box::new(|_: StreamError| {})
714+
} else {
715+
let error_callback_clone = error_callback_disconnect.clone();
716+
Box::new(move |err: StreamError| {
717+
invoke_error_callback(&error_callback_clone, err);
718+
})
719+
};
720+
721+
let stream = Stream::new(
722+
StreamInner {
723+
playing: true,
724+
audio_unit,
725+
device_id: self.audio_device_id,
726+
_loopback_device: loopback_aggregate,
727+
},
728+
error_callback_for_stream,
729+
)?;
766730

767-
stream.inner.borrow_mut().audio_unit.start()?;
731+
stream
732+
.inner
733+
.lock()
734+
.map_err(|_| BuildStreamError::BackendSpecific {
735+
err: BackendSpecificError {
736+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
737+
},
738+
})?
739+
.audio_unit
740+
.start()?;
768741

769742
Ok(stream)
770743
}
@@ -800,9 +773,9 @@ impl Device {
800773

801774
type Args = render_callback::Args<data::Raw>;
802775
audio_unit.set_render_callback(move |args: Args| unsafe {
803-
// If `run()` is currently running, then a callback will be available from this list.
804-
// Otherwise, we just fill the buffer with zeroes and return.
805-
776+
// SAFETY: We configure the stream format as interleaved (via asbd_from_config which
777+
// does not set kAudioFormatFlagIsNonInterleaved). Interleaved format always has
778+
// exactly one buffer containing all channels, so mBuffers[0] is always valid.
806779
let AudioBuffer {
807780
mNumberChannels: channels,
808781
mDataByteSize: data_byte_size,
@@ -815,7 +788,7 @@ impl Device {
815788

816789
let callback = match host_time_to_stream_instant(args.time_stamp.mHostTime) {
817790
Err(err) => {
818-
(error_callback.lock().unwrap())(err.into());
791+
invoke_error_callback(&error_callback, err.into());
819792
return Err(());
820793
}
821794
Ok(cb) => cb,
@@ -838,21 +811,36 @@ impl Device {
838811
Ok(())
839812
})?;
840813

841-
let stream = Stream::new(StreamInner {
842-
playing: true,
843-
_disconnect_listener: None,
844-
audio_unit,
845-
device_id: self.audio_device_id,
846-
_loopback_device: None,
847-
});
848-
849-
// If we didn't request the default device, stop the stream if the
850-
// device disconnects.
851-
if !is_default_device(self) {
852-
add_disconnect_listener(&stream, error_callback_disconnect)?;
853-
}
814+
// Create error callback for stream - either dummy or real based on device type
815+
let error_callback_for_stream: super::ErrorCallback = if is_default_device(self) {
816+
Box::new(|_: StreamError| {})
817+
} else {
818+
let error_callback_clone = error_callback_disconnect.clone();
819+
Box::new(move |err: StreamError| {
820+
invoke_error_callback(&error_callback_clone, err);
821+
})
822+
};
823+
824+
let stream = Stream::new(
825+
StreamInner {
826+
playing: true,
827+
audio_unit,
828+
device_id: self.audio_device_id,
829+
_loopback_device: None,
830+
},
831+
error_callback_for_stream,
832+
)?;
854833

855-
stream.inner.borrow_mut().audio_unit.start()?;
834+
stream
835+
.inner
836+
.lock()
837+
.map_err(|_| BuildStreamError::BackendSpecific {
838+
err: BackendSpecificError {
839+
description: "A cpal stream operation panicked while holding the lock - this is a bug, please report it".to_string(),
840+
},
841+
})?
842+
.audio_unit
843+
.start()?;
856844

857845
Ok(stream)
858846
}

src/host/coreaudio/macos/enumerate.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ impl Devices {
7373
}
7474
}
7575

76-
unsafe impl Send for Devices {}
77-
unsafe impl Sync for Devices {}
78-
7976
impl Iterator for Devices {
8077
type Item = Device;
8178
fn next(&mut self) -> Option<Device> {

0 commit comments

Comments
 (0)