PulseAudio support #957
Conversation
Force-pushed from 72c1c2f to a76575b.
@colinmarc:
PipeWire and PulseAudio are completely different protocols. PulseAudio is the established Linux audio server, and PipeWire is the new hotness. This PR implements the PulseAudio protocol, while the other PR implements the PipeWire protocol. The other useful thing to know is that PipeWire (the server) supports the Pulse protocol as a first-class thing, and this library has been tested with both audio servers. That means merging this would be enough to handle both cases. The PA server does not support the PipeWire protocol. Finally, this is just my opinion, but I think the PipeWire protocol is also significantly more complicated.
I actually misread the issue and would have removed my comment if you weren't so quick to respond! 🤣
Right, since PipeWire can just serve cpal through its PulseAudio interface. Thank you for the explanation :)
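(For context: cpal applications opt into a non-default host explicitly. Below is a minimal sketch of selecting the new host, assuming the `HostId::PulseAudio` variant this PR introduces; the exact variant name is an assumption here.)

```rust
use cpal::traits::{DeviceTrait, HostTrait};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Prefer the PulseAudio host when available, falling back to the default.
    // `HostId::PulseAudio` is assumed to be the variant added by this PR.
    let host = cpal::host_from_id(cpal::HostId::PulseAudio)
        .unwrap_or_else(|_| cpal::default_host());

    let device = host
        .default_output_device()
        .ok_or("no output device available")?;
    println!("Using output device: {}", device.name()?);
    Ok(())
}
```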
@colinmarc new maintainer here, doing backlog grooming. So sorry this did not get picked up before, because it seems very worthwhile! Would you be so kind as to resolve the conflicts so we can pick it up again?
Force-pushed from 4aab015 to cf32277.
Great :) Just rebased and tests look good. Let me know if you want the first change as a separate PR (or feel free to just drop it).
Wow, amazing turnaround time! 👍 In the coming weeks I don't have access to a machine to test it myself, which, as much as I believe you 😉, I would like to do. So for now I'm going to trigger an AI review - hope it brings more value than hallucinations - and take some time for a code review over some days. This is a big contribution.
Yes, it'd be good if you could extract it.
Pull Request Overview
This PR adds PulseAudio support to CPAL as a new audio backend, addressing issue #259. The implementation introduces a new host type for PulseAudio/PipeWire compatibility and refactors the existing platform macro to improve type safety and maintainability.
- Adds a comprehensive PulseAudio backend with input/output stream support and timing information
- Refactors the `impl_platform_host!` macro to use concrete types instead of module names for better type safety
- Updates examples to support both JACK and PulseAudio host selection

Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 6 comments.
File | Description
---|---
src/platform/mod.rs | Refactors macro to use concrete types and adds PulseAudio host integration
src/host/pulseaudio/mod.rs | Implements PulseAudio host, device enumeration, and stream configuration
src/host/pulseaudio/stream.rs | Implements PulseAudio playback and record stream handling with timing
src/host/null/mod.rs | Simplifies null host implementation using standard iterators
src/host/mod.rs | Adds conditional compilation for PulseAudio module
src/error.rs | Adds InvalidUtf8 variant to DeviceNameError
examples/feedback.rs | Updates example to support PulseAudio host selection
examples/beep.rs | Updates example to support PulseAudio host selection
Cargo.toml | Adds pulseaudio and futures dependencies
```rust
let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;
let data =
    unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) };
```
Copilot AI (Aug 3, 2025):
Casting a const pointer to mutable is undefined behavior. The input buffer should remain const since it's read-only data. Consider using a different approach that doesn't violate pointer constness.
Suggested change:
```diff
-unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) };
+Data::from_const_parts(buf.as_ptr(), n_samples, sample_format);
```
This is done in many places in the codebase, and I don't have a better way.
It'd be nice if we could document how we ensure its safety:

```rust
// SAFETY: We verify that:
// - buf.as_ptr() points to valid memory for at least n_samples * bytes_per_sample
// - n_samples is calculated from buf.len() / bytes_per_sample, ensuring validity
// - The buffer remains valid for the duration of the callback
// - sample_format matches the actual data layout in the buffer
```
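(Put together, the documented call site might read roughly as follows; this is a sketch combining the snippet above with the suggested comment, not the exact code in the PR.)

```rust
let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;

// SAFETY: buf.as_ptr() points to n_samples * bps valid bytes, the buffer
// outlives this callback, and sample_format matches the buffer's layout.
// The data is only ever read through the pointer, despite the *mut cast.
let data =
    unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, sample_format) };
```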
```rust
// Spawn a thread to drive the stream future.
let stream_clone = stream.clone();
let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all()));
```
Copilot AI (Aug 3, 2025):
The worker thread handle is dropped immediately, which means there's no way to properly join or manage the thread lifecycle. Consider storing the handle or using a different pattern for thread management.
It exits when the stream finishes.
Would be good to document that thread lifecycle.
```rust
std::thread::spawn(move || loop {
    let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
        break;
    };

    store_latency(
        &latency_clone,
        sample_spec,
        timing_info.sink_usec,
        timing_info.write_offset,
        timing_info.read_offset,
    );

    std::thread::sleep(time::Duration::from_millis(100));
});
```
Copilot AI (Aug 3, 2025):
Similar to the playback stream, this latency monitoring thread for record streams has no exit condition and will run indefinitely. Consider adding a way to signal thread termination when the stream is dropped.
Suggested change:
```diff
-std::thread::spawn(move || loop {
-    let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
-        break;
-    };
-    store_latency(
-        &latency_clone,
-        sample_spec,
-        timing_info.sink_usec,
-        timing_info.write_offset,
-        timing_info.read_offset,
-    );
-    std::thread::sleep(time::Duration::from_millis(100));
-});
+std::thread::spawn(move || {
+    loop {
+        if shutdown_flag_thread.load(atomic::Ordering::Relaxed) {
+            break;
+        }
+        let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
+            break;
+        };
+        store_latency(
+            &latency_clone,
+            sample_spec,
+            timing_info.sink_usec,
+            timing_info.write_offset,
+            timing_info.read_offset,
+        );
+        std::thread::sleep(time::Duration::from_millis(100));
+    }
+});
```
This is incorrect; it exits when the stream finishes.
Yet should the handles not be joined or fused on drop, for proper cleanup? I remember we had to do the same here in librespot: librespot-org/librespot@3ce9854 (you can forget about all the parking_lot stuff and skip right to the drop implementation at the bottom). That thing in librespot usually didn't trigger, but it panicked in certain Tokio contexts. Or am I missing that this is done in another way already?
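(For reference, the librespot-style cleanup amounts to signalling the worker and joining its handle on drop. A minimal sketch with a hypothetical `StreamInner`; the names are illustrative, not from this PR.)

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::thread::JoinHandle;

struct StreamInner {
    shutdown: Arc<AtomicBool>,
    worker: Option<JoinHandle<()>>,
}

impl Drop for StreamInner {
    fn drop(&mut self) {
        // Ask the worker loop to exit, then wait for it to finish.
        self.shutdown.store(true, Ordering::Relaxed);
        if let Some(handle) = self.worker.take() {
            // Swallow a worker panic rather than propagating it from drop.
            let _ = handle.join();
        }
    }
}
```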
We could do something like that, but only if we care about propagating panics. I think in this case that would be counterproductive.
Amazing pull request, but just one comment: in `let client = pulseaudio::Client::from_env(c"cpal-pulseaudio").map_err(|_| HostUnavailable)?;`, apps in the volume mixer will all show up as cpal-pulseaudio, when actually you'd want them to have their own name. It would be cool if I were able to set this, as well as other metadata like the stream description.
Thanks - where should I pull that from? I don't see a way to parameterize that on the generic host API.
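(One option, purely as a sketch: derive a default from the current executable's name when the application doesn't provide one. This is an assumption about how it could work, not an API the PR exposes.)

```rust
use std::ffi::CString;

/// Best-effort client name: the executable's file stem, or a fixed fallback.
fn default_client_name() -> CString {
    let name = std::env::current_exe()
        .ok()
        .and_then(|p| p.file_stem().map(|s| s.to_string_lossy().into_owned()))
        .unwrap_or_else(|| "cpal-pulseaudio".to_string());
    // A name containing NUL bytes would make CString::new fail; fall back then too.
    CString::new(name).unwrap_or_else(|_| c"cpal-pulseaudio".to_owned())
}
```

The host could then call `pulseaudio::Client::from_env(&default_client_name())`, keeping cpal-pulseaudio only as the fallback.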
Hey @colinmarc, I didn't go through the code in any detail, but I gave your branch a quick test and it does work in my application. Pretty cool!
Cool, I rebased and added some fixes.
👉 #1004 👈
My application now sometimes hangs when running it with the pulseaudio host.
That's as much useful information as I can share right now; I wasn't able to reproduce it outside of my application yet, and I also don't know what causes it.
Please run in debug and share the source, if possible. As it stands there's no way for me to know whether it's a bug in this PR, pulseaudio-rs, or your app.
Thanks for the quick reply. I managed to get it to happen with a debug binary. I can't share the code of the application in which it happens, but I'll try to reproduce it with standalone code or one of the examples; I can't get to it right now. Just wanted to let you know that there might be something.
[rust log output attached]
I can more or less reliably reproduce the issue in my application now by switching between devices. I tried adapting the record_wav example to get the same result but have failed so far.
That was maddening to isolate, but I can finally reproduce it outside of my application. The issue seems to happen when ALSA and PulseAudio devices exist at the same time, even if only one device has an active stream. Here is a crudely modified version of record_wav to reproduce the issue:

I think arguably the main issue here is that the ALSA device keeps an open handle around when it is not needed, rather than anything in the pulse integration. Even so, it would be really nice if that failed with an error or timeout when the device is locked. Not keeping the devices around has its issues too: as far as I know, there is no reliable way to refer to a device in cpal other than keeping a reference to it. The only alternative seems to be to find the device by name, but I don't think there is any guarantee that the names are unique.
Here are a couple of thoughts on the thread management. Thanks to you guys for vetting this PR, too. Let me know when you feel it's good to go.
This adds support for PulseAudio on hosts with a PA or PipeWire server (the latter via pipewire-pulse). Since the underlying client is async, some amount of bridging has to be done.
Pushed fixes for the documentation issues you brought up in review. For the hang @jwagner diagnosed (thank you for chasing that down!), we would need to add a timeout to […]. On the other hand, the issue is a bit niche, and, as @jwagner said, probably a bug on the ALSA side. Most people will use either pulse or ALSA, not both. So we could just open an issue for someone else to take a look at in the future, and merge this as-is.
@colinmarc I appreciate you sticking through to get this right: adding a new host is quite a thing. Thinking about it again, I'm wondering about the blocking behavior. Can you help me think through whether we're missing edge cases that would warrant a separate worker thread with more channel-based communication?
A few smaller points besides that more architectural question, too.
Thanks again 🙏
```diff
-// Run for 3 seconds before closing.
-println!("Playing for 3 seconds... ");
-std::thread::sleep(std::time::Duration::from_secs(3));
+// Run for 10 seconds before closing.
```
Why the change to 10 seconds?
In my testing, pulseaudio has a lot more recording delay. Probably it's configurable, but idk if it's worth muddying the example.
```rust
/// See the [`BackendSpecificError`] docs for more information about this error variant.
BackendSpecific { err: BackendSpecificError },
/// The name is not valid UTF-8.
InvalidUtf8,
```
What if we'd use `String::from_utf8_lossy` and dispense with this error variant?
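(A sketch of that alternative, assuming the raw device name arrives from the server as bytes; the function name is illustrative.)

```rust
fn device_name(raw: &[u8]) -> String {
    // Invalid UTF-8 sequences become U+FFFD (�) instead of an error,
    // so DeviceNameError would no longer need an InvalidUtf8 variant.
    String::from_utf8_lossy(raw).into_owned()
}
```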
```diff
@@ -0,0 +1,395 @@
+extern crate pulseaudio;
```
Not necessary in Rust 2021.
```diff
@@ -0,0 +1,395 @@
+extern crate pulseaudio;
+
+use futures::executor::block_on;
```
Sorry in advance for raising the big question: instead of blocking in many places, should we not spawn a dedicated thread and/or use a channel-based approach? We're trying to not block in other hosts too.
This shouldn't block any other hosts, because cpal isn't async (right?). And we do use dedicated threads in this PR for exactly that reason. Please tell me if I'm misunderstanding.
In case it helps clear up any misunderstanding: calling `stream.play` synchronously uncorks the stream (it waits until the server responds to the request, which should be instantaneous), and then returns. The stream is driven by a dedicated thread. That should be similar to how the other hosts work.
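(To illustrate the shape of that bridging, a sketch of the pattern rather than the PR's exact code: short control requests are driven to completion inline with `block_on`, while the long-running stream future gets a dedicated thread.)

```rust
use futures::executor::block_on;
use std::sync::Arc;

// Hypothetical async client standing in for pulseaudio-rs.
struct Client;
impl Client {
    async fn uncork(&self) -> Result<(), ()> { Ok(()) }
    async fn play_all(&self) -> Result<(), ()> { Ok(()) }
}

fn play(client: Arc<Client>) -> Result<(), ()> {
    // Control request: block briefly until the server acknowledges.
    block_on(client.uncork())?;

    // Long-running future: drive it on a dedicated thread. The thread
    // exits when the future resolves, e.g. when the stream is dropped.
    std::thread::spawn(move || {
        let _ = block_on(client.play_all());
    });
    Ok(())
}
```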
```rust
fn devices(&self) -> Result<Self::Devices, DevicesError> {
    let sinks = block_on(self.client.list_sinks()).map_err(|_| BackendSpecificError {
        description: "Failed to list sinks".to_owned(),
```
Could carry over the error like `format!("Failed to list sinks: {e}")`.
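(i.e. something like the following, assuming the client's error type implements `Display`:)

```rust
let sinks = block_on(self.client.list_sinks()).map_err(|e| BackendSpecificError {
    description: format!("Failed to list sinks: {e}"),
})?;
```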
```rust
let bps = sample_spec.format.bytes_per_sample();
let n_samples = buf.len() / bps;

// SAFETY: We verify that:
```
Where do we verify that? Should we add an assertion that `buf.len() % bps == 0`?
Heh, this was your suggested comment; I didn't really read it before adding it.
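(If we did want the check, a cheap guard could look like this; a sketch, and whether to assert or silently truncate is a judgment call.)

```rust
let bps = sample_spec.format.bytes_per_sample();
debug_assert_eq!(
    buf.len() % bps,
    0,
    "server sent a buffer that is not a whole number of samples"
);
let n_samples = buf.len() / bps;
```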
```rust
params: protocol::PlaybackStreamParams,
sample_format: SampleFormat,
mut data_callback: D,
_error_callback: E,
```
We should capture errors from the worker thread and send them to the error callback. This could be done by the channels approach.
I guess we don't need channels, since the callback is `Send`.
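(A sketch of forwarding worker errors, assuming the error callback is moved into the worker thread; cpal's error callbacks are `FnMut(StreamError) + Send + 'static`, so this should type-check, but the exact error type returned by `play_all` lives in pulseaudio-rs.)

```rust
let stream_clone = stream.clone();
let mut error_callback = error_callback;
let _worker_thread = std::thread::spawn(move || {
    if let Err(e) = block_on(stream_clone.play_all()) {
        error_callback(StreamError::BackendSpecific {
            err: BackendSpecificError {
                description: format!("playback stream failed: {e}"),
            },
        });
    }
});
```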
```rust
// Spawn a thread to drive the stream future. It will exit automatically
// when the stream is stopped by the user.
let stream_clone = stream.clone();
let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all()));
```
It looks like `play_all` can return errors that we may want to send to the error callback?
Also - thinking hard here about edge cases - would it be possible to try and drop the stream before it finishes playing?
Dropping the stream means that the `play_all` future gets resolved and the thread exits.
To fully explicate this: here is the future returned by `play_all`: https://github.com/colinmarc/pulseaudio-rs/blob/main/src/client/playback_stream.rs#L129-L136
Here is where we store the sender: https://github.com/colinmarc/pulseaudio-rs/blob/main/src/client/reactor.rs#L31
And here is where we drop it:
In an extreme case, if the pulse daemon crashes before responding or something, we might leak a single thread, but we'll have bigger issues playing audio in that case. And dropping the `Client` object will clear all the pending futures.
```rust
let stream_clone = stream.clone();
let latency_clone = current_latency_micros.clone();
std::thread::spawn(move || loop {
    let Ok(timing_info) = block_on(stream_clone.timing_info()) else {
```
Kind of the same question; could this thread outlive the stream?
Same as above; the future is internally a `oneshot`, and dropping the stream resolves it.
```rust
// We always consider the full buffer filled, because cpal's
// user-facing api doesn't allow for short writes.
// TODO: should we preemptively zero the output buffer before
```
Yes, we should, as we do with ALSA, for example.
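(A sketch of that pre-zeroing, assuming the playback path hands us a mutable buffer and an `info` value before invoking the user callback, as the ALSA host does; the names are illustrative.)

```rust
// Zero the buffer first, so a callback that writes fewer bytes than
// requested produces silence instead of stale memory.
buf.fill(0);

let n_samples = buf.len() / bps;
let mut data =
    unsafe { Data::from_parts(buf.as_mut_ptr() as *mut _, n_samples, sample_format) };
data_callback(&mut data, &info);
```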
Fixes #259.
I finally got around to this after promising to PR it in #259 a year ago. 😅
This is based on colinmarc/pulseaudio-rs#2, which obviously needs to land before this. However, I developed both PRs in parallel. If you're feeling generous and would like to review that as well, I would welcome any feedback.
The first commit is unrelated and a bit opinionated, but it seemed nicer. Let me know if I should move that to a separate PR or just drop it.