Skip to content

Commit

Permalink
Use an AtomicPtr for PulseStream's drain_timer
Browse files Browse the repository at this point in the history
There is a race condition between `drained_cb` and `PulseStream::stop` that
happens reliably on Firefox CI with rust 1.56 beta (LLVM 13) and PGO
instrumentation. Here's how it goes:
- in the Firefox AudioIPC Server RPC thread, `PulseStream::stop` is
  called
- `PulseStream::stop enters the loop waiting for drain, and blocks on
  `mainloop.wait`
- Later, some other thread calls `drained_cb`, which resets `drain_timer`,
  and signals the mainloop.
- Back the other AudioIPC Server RPC thread, `mainloop.wait` returns,
  looping back to the test for `drain_timer`... which this thread
  doesn't know had been updated yet, so it blocks on `mainloop.wait`
  again.
  • Loading branch information
glandium authored and kinetiknz committed Oct 15, 2021
1 parent e9e55a4 commit 9695281
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions src/backend/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use ringbuf::RingBuffer;
use std::ffi::{CStr, CString};
use std::os::raw::{c_long, c_void};
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::{mem, ptr};

use self::LinearInputBuffer::*;
Expand Down Expand Up @@ -272,7 +272,7 @@ pub struct PulseStream<'ctx> {
input_stream: Option<pulse::Stream>,
data_callback: ffi::cubeb_data_callback,
state_callback: ffi::cubeb_state_callback,
drain_timer: *mut pa_time_event,
drain_timer: AtomicPtr<pa_time_event>,
output_sample_spec: pulse::SampleSpec,
input_sample_spec: pulse::SampleSpec,
// output frames count excluding pre-buffering
Expand Down Expand Up @@ -411,7 +411,7 @@ impl<'ctx> PulseStream<'ctx> {
data_callback,
state_callback,
user_ptr,
drain_timer: ptr::null_mut(),
drain_timer: AtomicPtr::new(ptr::null_mut()),
output_sample_spec: pulse::SampleSpec::default(),
input_sample_spec: pulse::SampleSpec::default(),
output_frame_count: AtomicUsize::new(0),
Expand Down Expand Up @@ -574,9 +574,10 @@ impl<'ctx> PulseStream<'ctx> {
self.context.mainloop.lock();
{
if let Some(stm) = self.output_stream.take() {
if !self.drain_timer.is_null() {
let drain_timer = self.drain_timer.load(Ordering::Acquire);
if !drain_timer.is_null() {
/* there's no pa_rttime_free, so use this instead. */
self.context.mainloop.get_api().time_free(self.drain_timer);
self.context.mainloop.get_api().time_free(drain_timer);
}
stm.clear_state_callback();
stm.clear_write_callback();
Expand Down Expand Up @@ -637,7 +638,7 @@ impl<'ctx> StreamOps for PulseStream<'ctx> {
self.shutdown = true;
// If draining is taking place wait to finish
cubeb_log!("Stream stop: waiting for drain.");
while !self.drain_timer.is_null() {
while !self.drain_timer.load(Ordering::Acquire).is_null() {
self.context.mainloop.wait();
}
cubeb_log!("Stream stop: waited for drain.");
Expand Down Expand Up @@ -988,11 +989,12 @@ impl<'ctx> PulseStream<'ctx> {
) {
cubeb_logv!("Drain finished callback.");
let stm = unsafe { &mut *(u as *mut PulseStream) };
debug_assert_eq!(stm.drain_timer, e);
let drain_timer = stm.drain_timer.load(Ordering::Acquire);
debug_assert_eq!(drain_timer, e);
stm.state_change_callback(ffi::CUBEB_STATE_DRAINED);
/* there's no pa_rttime_free, so use this instead. */
a.time_free(stm.drain_timer);
stm.drain_timer = ptr::null_mut();
a.time_free(drain_timer);
stm.drain_timer.store(ptr::null_mut(), Ordering::Release);
stm.context.mainloop.signal();
}

Expand Down Expand Up @@ -1109,13 +1111,16 @@ impl<'ctx> PulseStream<'ctx> {

/* pa_stream_drain is useless, see PA bug# 866. this is a workaround. */
/* arbitrary safety margin: double the current latency. */
debug_assert!(self.drain_timer.is_null());
debug_assert!(self.drain_timer.load(Ordering::Acquire).is_null());
let stream_ptr = self as *const _ as *mut _;
if let Some(ref context) = self.context.context {
self.drain_timer = context.rttime_new(
pulse::rtclock_now() + 2 * latency,
drained_cb,
stream_ptr,
self.drain_timer.store(
context.rttime_new(
pulse::rtclock_now() + 2 * latency,
drained_cb,
stream_ptr,
),
Ordering::Release,
);
}
self.shutdown = true;
Expand Down

0 comments on commit 9695281

Please sign in to comment.