From 268bdd5f93eaca1d2e664102c1cfc84a26005a9f Mon Sep 17 00:00:00 2001 From: Mike Hommey Date: Fri, 15 Oct 2021 13:24:36 +0900 Subject: [PATCH] Use an AtomicPtr for PulseStream's drain_timer There is a race condition between `drained_cb` and `PulseStream::stop` that happens reliably on Firefox CI with rust 1.56 beta (LLVM 13) and PGO instrumentation. Here's how it goes: - in the Firefox AudioIPC Server RPC thread, `PulseStream::stop` is called - `PulseStream::stop enters the loop waiting for drain, and blocks on `mainloop.wait` - Later, some other thread calls `drained_cb`, which resets `drain_timer`, and signals the mainloop. - Back the other AudioIPC Server RPC thread, `mainloop.wait` returns, looping back to the test for `drain_timer`... which this thread doesn't know had been updated yet, so it blocks on `mainloop.wait` again. --- src/backend/stream.rs | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/src/backend/stream.rs b/src/backend/stream.rs index 8168c81..df64b0f 100644 --- a/src/backend/stream.rs +++ b/src/backend/stream.rs @@ -15,7 +15,7 @@ use ringbuf::RingBuffer; use std::ffi::{CStr, CString}; use std::os::raw::{c_long, c_void}; use std::slice; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; use std::{mem, ptr}; use self::LinearInputBuffer::*; @@ -272,7 +272,7 @@ pub struct PulseStream<'ctx> { input_stream: Option, data_callback: ffi::cubeb_data_callback, state_callback: ffi::cubeb_state_callback, - drain_timer: *mut pa_time_event, + drain_timer: AtomicPtr, output_sample_spec: pulse::SampleSpec, input_sample_spec: pulse::SampleSpec, // output frames count excluding pre-buffering @@ -411,7 +411,7 @@ impl<'ctx> PulseStream<'ctx> { data_callback, state_callback, user_ptr, - drain_timer: ptr::null_mut(), + drain_timer: AtomicPtr::new(ptr::null_mut()), output_sample_spec: pulse::SampleSpec::default(), input_sample_spec: pulse::SampleSpec::default(), output_frame_count: AtomicUsize::new(0), @@ -574,9 +574,10 @@ impl<'ctx> PulseStream<'ctx> { self.context.mainloop.lock(); { if let Some(stm) = self.output_stream.take() { - if !self.drain_timer.is_null() { + let drain_timer = self.drain_timer.load(Ordering::Acquire); + if !drain_timer.is_null() { /* there's no pa_rttime_free, so use this instead. */ - self.context.mainloop.get_api().time_free(self.drain_timer); + self.context.mainloop.get_api().time_free(drain_timer); } stm.clear_state_callback(); stm.clear_write_callback(); @@ -637,7 +638,7 @@ impl<'ctx> StreamOps for PulseStream<'ctx> { self.shutdown = true; // If draining is taking place wait to finish cubeb_log!("Stream stop: waiting for drain."); - while !self.drain_timer.is_null() { + while !self.drain_timer.load(Ordering::Acquire).is_null() { self.context.mainloop.wait(); } cubeb_log!("Stream stop: waited for drain."); @@ -988,11 +989,12 @@ impl<'ctx> PulseStream<'ctx> { ) { cubeb_logv!("Drain finished callback."); let stm = unsafe { &mut *(u as *mut PulseStream) }; - debug_assert_eq!(stm.drain_timer, e); + let drain_timer = stm.drain_timer.load(Ordering::Acquire); + debug_assert_eq!(drain_timer, e); stm.state_change_callback(ffi::CUBEB_STATE_DRAINED); /* there's no pa_rttime_free, so use this instead. */ - a.time_free(stm.drain_timer); - stm.drain_timer = ptr::null_mut(); + a.time_free(drain_timer); + stm.drain_timer.store(ptr::null_mut(), Ordering::Release); stm.context.mainloop.signal(); } @@ -1109,13 +1111,16 @@ impl<'ctx> PulseStream<'ctx> { /* pa_stream_drain is useless, see PA bug# 866. this is a workaround. */ /* arbitrary safety margin: double the current latency. */ - debug_assert!(self.drain_timer.is_null()); + debug_assert!(self.drain_timer.load(Ordering::Acquire).is_null()); let stream_ptr = self as *const _ as *mut _; if let Some(ref context) = self.context.context { - self.drain_timer = context.rttime_new( - pulse::rtclock_now() + 2 * latency, - drained_cb, - stream_ptr, + self.drain_timer.store( + context.rttime_new( + pulse::rtclock_now() + 2 * latency, + drained_cb, + stream_ptr, + ), + Ordering::Release, ); } self.shutdown = true;