Skip to content

Commit

Permalink
Auto merge of #45524 - alexcrichton:improve-park-unpark, r=dtolnay
Browse files Browse the repository at this point in the history
std: Optimize thread park/unpark implementation

This is an adaptation of rust-lang/futures-rs#597 for the standard library.
The goal here is to avoid locking a mutex on the "fast path" for thread
park/unpark where you're waking up a thread that isn't sleeping or otherwise
trying to park a thread that's already been notified. Mutex performance varies
quite a bit across platforms so this should provide a nice consistent speed
boost for the fast path of these functions.
  • Loading branch information
bors committed Oct 27, 2017
2 parents bb37bc1 + 6511e46 commit 1855aff
Showing 1 changed file with 72 additions and 15 deletions.
87 changes: 72 additions & 15 deletions src/libstd/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ use panic;
use panicking;
use str;
use sync::{Mutex, Condvar, Arc};
use sync::atomic::AtomicUsize;
use sync::atomic::Ordering::SeqCst;
use sys::thread as imp;
use sys_common::mutex;
use sys_common::thread_info;
Expand Down Expand Up @@ -694,6 +696,11 @@ pub fn sleep(dur: Duration) {
imp::Thread::sleep(dur)
}

// constants for park/unpark
const EMPTY: usize = 0;
const PARKED: usize = 1;
const NOTIFIED: usize = 2;

/// Blocks unless or until the current thread's token is made available.
///
/// A call to `park` does not guarantee that the thread will remain parked
Expand Down Expand Up @@ -771,11 +778,27 @@ pub fn sleep(dur: Duration) {
#[stable(feature = "rust1", since = "1.0.0")]
pub fn park() {
let thread = current();
let mut guard = thread.inner.lock.lock().unwrap();
while !*guard {
guard = thread.inner.cvar.wait(guard).unwrap();

// If we were previously notified then we consume this notification and
// return quickly.
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
return
}

// Otherwise we need to coordinate going to sleep
let mut m = thread.inner.lock.lock().unwrap();
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => return, // notified after we locked
Err(_) => panic!("inconsistent park state"),
}
loop {
m = thread.inner.cvar.wait(m).unwrap();
match thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
Ok(_) => return, // got a notification
Err(_) => {} // spurious wakeup, go back to sleep
}
}
*guard = false;
}

/// Use [`park_timeout`].
Expand Down Expand Up @@ -842,12 +865,30 @@ pub fn park_timeout_ms(ms: u32) {
#[stable(feature = "park_timeout", since = "1.4.0")]
pub fn park_timeout(dur: Duration) {
let thread = current();
let mut guard = thread.inner.lock.lock().unwrap();
if !*guard {
let (g, _) = thread.inner.cvar.wait_timeout(guard, dur).unwrap();
guard = g;

// Like `park` above we have a fast path for an already-notified thread, and
// afterwards we start coordinating for a sleep.
// return quickly.
if thread.inner.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst).is_ok() {
return
}
let m = thread.inner.lock.lock().unwrap();
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => return, // notified after we locked
Err(_) => panic!("inconsistent park_timeout state"),
}

// Wait with a timeout, and if we spuriously wake up or otherwise wake up
// from a notification we just want to unconditionally set the state back to
// empty, either consuming a notification or un-flagging ourselves as
// parked.
let (_m, _result) = thread.inner.cvar.wait_timeout(m, dur).unwrap();
match thread.inner.state.swap(EMPTY, SeqCst) {
NOTIFIED => {} // got a notification, hurray!
PARKED => {} // no notification, alas
n => panic!("inconsistent park_timeout state: {}", n),
}
*guard = false;
}

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -914,7 +955,10 @@ impl ThreadId {
struct Inner {
name: Option<CString>, // Guaranteed to be UTF-8
id: ThreadId,
lock: Mutex<bool>, // true when there is a buffered unpark

// state for thread park/unpark
state: AtomicUsize,
lock: Mutex<()>,
cvar: Condvar,
}

Expand Down Expand Up @@ -958,7 +1002,8 @@ impl Thread {
inner: Arc::new(Inner {
name: cname,
id: ThreadId::new(),
lock: Mutex::new(false),
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
cvar: Condvar::new(),
})
}
Expand Down Expand Up @@ -998,10 +1043,22 @@ impl Thread {
/// [park]: fn.park.html
#[stable(feature = "rust1", since = "1.0.0")]
pub fn unpark(&self) {
let mut guard = self.inner.lock.lock().unwrap();
if !*guard {
*guard = true;
self.inner.cvar.notify_one();
loop {
match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return, // no one was waiting
Err(NOTIFIED) => return, // already unparked
Err(PARKED) => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

// Coordinate wakeup through the mutex and a condvar notification
let _lock = self.inner.lock.lock().unwrap();
match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return self.inner.cvar.notify_one(),
Err(NOTIFIED) => return, // a different thread unparked
Err(EMPTY) => {} // parked thread went away, try again
_ => panic!("inconsistent state in unpark"),
}
}
}

Expand Down

0 comments on commit 1855aff

Please sign in to comment.