Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 59 additions & 35 deletions tokio-executor/src/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;

/// Block the current thread.
Expand Down Expand Up @@ -237,37 +238,56 @@ impl Inner {
fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return Ok(()),
IDLE => {},
_ => unreachable!(),
if self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst).is_ok() {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Ok(());
}

// If the duration is zero, then there is no need to actually block
if let Some(ref dur) = timeout {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is new in tokio-executor. @carllerche, any idea why the previous implementation didn't have it? Maybe just an oversight?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oversight... it would be nice to unify the two impls instead of duplicating code.

if *dur == Duration::from_millis(0) {
return Ok(());
}
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
match self.state.compare_exchange(IDLE, SLEEP, SeqCst, SeqCst) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok(_) => {}
Err(NOTIFY) => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
self.state.swap(IDLE, SeqCst);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using swap as a store seems like overkill since the value is discarded. On x86 this will generate an unnecessary locked instruction. (ie, the std version should use store.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that the version in std is incorrect. The swap is required to establish ordering with this line in unpark. Without doing a "read" operation here, visibility cannot be acquired.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow. I didn't check std, but I assume the swap here came from there.

Also, my understanding is that SeqCst should establish total ordering with other SeqCst loads and stores, so this can be a plain store and still be ordered with respect to compare_exchange. (The std::atomic::Ordering docs are irritatingly imprecise here though, so I don't know if I'm actually right - it seems to imply that SeqCst isn't meaningful for store.)

Copy link
Author

@ghost ghost Aug 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsgf

A store operation with Release ordering (or stronger) writes to memory and allows other future load operations to synchronize with it.

However, here we also want to perform a load operation at the same time (hence swap) in order to synchronize with whoever last wrote to this location (i.e. acquire their writes to memory).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't SeqCst already guarantee that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsgf, from an x86 codegen perspective, I don’t think a SeqCst store or swap is going to matter - it’s going to be some instruction that entails LOCK like behavior for both. In fact, I see xchg reg, mem on the playground for both.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllerche

Without doing a "read" operation here, visibility cannot be acquired.

Isn't the read at line 257 sufficient?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't the read at line 257 sufficient?

I was about to ask the same. The read of NOTIFY with SeqCst is sufficient to establish synchronization with the thread that notified.

Of course release/acquire would be sufficient as well but I am not going to try playing that game again ;)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because a third thread can come along and unpark while the park fn is between 257 and 262. The bug is specifically that the park thread must acquire the memory from a third thread that calls unpark before the park thread hits line 262.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Continuing in rust-lang/rust#53366)

return Ok(());
}
IDLE => {},
_ => unreachable!(),
}

m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
match timeout {
Some(timeout) => {
m = self.condvar.wait_timeout(m, timeout).unwrap().0;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
match self.state.swap(IDLE, SeqCst) {
NOTIFY => {}, // Got a notification
SLEEP => {}, // No notification, timed out
_ => unreachable!(),
}
}
None => {
loop {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

m = self.condvar.wait(m).unwrap();
match self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst) {
Ok(_) => break, // Got a notification
Err(_) => {} // Spurious wakeup, go back to sleep
}
}
}
};

// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification
self.state.store(IDLE, Ordering::SeqCst);

// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
Expand All @@ -277,26 +297,30 @@ impl Inner {
}

fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
loop {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_exchange(IDLE, NOTIFY, SeqCst, SeqCst) {
Ok(_) => return, // No one was waiting
Err(NOTIFY) => return, // Already unparked
Err(SLEEP) => {} // Gotta wake up
_ => unreachable!(),
}

// Transition to NOTIFY
match self.state.swap(NOTIFY, Ordering::SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();

// Transition to NOTIFY
match self.state.compare_exchange(SLEEP, NOTIFY, SeqCst, SeqCst) {
Ok(_) => {
// Wakeup the sleeper
self.condvar.notify_one();
return;
}
Err(NOTIFY) => return, // A different thread unparked
Err(IDLE) => {} // Parked thread went away, try again
_ => unreachable!(),
}
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}
84 changes: 50 additions & 34 deletions tokio-threadpool/src/park/default_park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,8 @@ impl Inner {
fn park(&self, timeout: Option<Duration>) {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
if self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst).is_ok() {
return;
}

// If the duration is zero, then there is no need to actually block
Expand All @@ -117,54 +115,72 @@ impl Inner {
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, SeqCst) {
NOTIFY => {
match self.state.compare_exchange(IDLE, SLEEP, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFY) => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, SeqCst);
self.state.swap(IDLE, SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}

m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
match timeout {
Some(timeout) => {
m = self.condvar.wait_timeout(m, timeout).unwrap().0;

// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
match self.state.swap(IDLE, SeqCst) {
NOTIFY => {}, // Got a notification
SLEEP => {}, // No notification, timed out
_ => unreachable!(),
}
}
None => {
loop {
m = self.condvar.wait(m).unwrap();
match self.state.compare_exchange(NOTIFY, IDLE, SeqCst, SeqCst) {
Ok(_) => break, // Got a notification
Err(_) => {} // Spurious wakeup, go back to sleep
}
}
}
};

// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
self.state.store(IDLE, SeqCst);

// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);
}

fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
loop {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_exchange(IDLE, NOTIFY, SeqCst, SeqCst) {
Ok(_) => return, // No one was waiting
Err(NOTIFY) => return, // Already unparked
Err(SLEEP) => {} // Gotta wake up
_ => unreachable!(),
}

// Transition to NOTIFY
match self.state.swap(NOTIFY, SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();

// Transition to NOTIFY
match self.state.compare_exchange(SLEEP, NOTIFY, SeqCst, SeqCst) {
Ok(_) => {
// Wakeup the sleeper
self.condvar.notify_one();
return;
}
Err(NOTIFY) => return, // A different thread unparked
Err(IDLE) => {} // Parked thread went away, try again
_ => unreachable!(),
}
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}

Expand Down