Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for FUTEX_{WAIT,WAKE}_BITSET #2054

Merged
merged 7 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f262ca12aac76152c4b46cefcf8300f0249a5eb2
306ba8357fb36212b7d30efb9eb9e41659ac1445
80 changes: 68 additions & 12 deletions src/shims/posix/linux/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ pub fn futex<'tcx>(

let futex_private = this.eval_libc_i32("FUTEX_PRIVATE_FLAG")?;
let futex_wait = this.eval_libc_i32("FUTEX_WAIT")?;
let futex_wait_bitset = this.eval_libc_i32("FUTEX_WAIT_BITSET")?;
let futex_wake = this.eval_libc_i32("FUTEX_WAKE")?;
let futex_wake_bitset = this.eval_libc_i32("FUTEX_WAKE_BITSET")?;
let futex_realtime = this.eval_libc_i32("FUTEX_CLOCK_REALTIME")?;

// FUTEX_PRIVATE enables an optimization that stops it from working across processes.
Expand All @@ -45,12 +47,37 @@ pub fn futex<'tcx>(
// FUTEX_WAIT: (int *addr, int op = FUTEX_WAIT, int val, const timespec *timeout)
// Blocks the thread if *addr still equals val. Wakes up when FUTEX_WAKE is called on the same address,
// or *timeout expires. `timeout == null` for an infinite timeout.
op if op & !futex_realtime == futex_wait => {
if args.len() < 5 {
throw_ub_format!(
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT`: got {}, expected at least 5",
args.len()
);
//
// FUTEX_WAIT_BITSET: (int *addr, int op = FUTEX_WAIT_BITSET, int val, const timespec *timeout, int *_ignored, unsigned int bitset)
// This is identical to FUTEX_WAIT, except:
// - The timeout is absolute rather than relative.
// - You can specify the bitset to selecting what WAKE operations to respond to.
op if op & !futex_realtime == futex_wait || op & !futex_realtime == futex_wait_bitset => {
let wait_bitset = op & !futex_realtime == futex_wait_bitset;

let bitset = if wait_bitset {
if args.len() != 7 {
throw_ub_format!(
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT_BITSET`: got {}, expected 7",
args.len()
);
}
this.read_scalar(&args[6])?.to_u32()?
} else {
if args.len() < 5 {
throw_ub_format!(
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAIT`: got {}, expected at least 5",
args.len()
);
}
u32::MAX
};

if bitset == 0 {
let einval = this.eval_libc("EINVAL")?;
this.set_last_error(einval)?;
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
return Ok(());
}

// `deref_operand` but not actually dereferencing the ptr yet (it might be NULL!).
Expand All @@ -70,10 +97,20 @@ pub fn futex<'tcx>(
return Ok(());
}
};
Some(if op & futex_realtime != 0 {
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
Some(if wait_bitset {
// FUTEX_WAIT_BITSET uses an absolute timestamp.
if op & futex_realtime != 0 {
Time::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap())
} else {
Time::Monotonic(this.machine.time_anchor.checked_add(duration).unwrap())
}
} else {
Time::Monotonic(Instant::now().checked_add(duration).unwrap())
// FUTEX_WAIT uses a relative timestamp.
if op & futex_realtime != 0 {
Time::RealTime(SystemTime::now().checked_add(duration).unwrap())
} else {
Time::Monotonic(Instant::now().checked_add(duration).unwrap())
}
})
};
// Check the pointer for alignment and validity.
Expand Down Expand Up @@ -108,7 +145,7 @@ pub fn futex<'tcx>(
if val == futex_val {
// The value still matches, so we block the trait make it wait for FUTEX_WAKE.
this.block_thread(thread);
this.futex_wait(addr_scalar.to_machine_usize(this)?, thread);
this.futex_wait(addr_scalar.to_machine_usize(this)?, thread, bitset);
// Succesfully waking up from FUTEX_WAIT always returns zero.
this.write_scalar(Scalar::from_machine_isize(0, this), dest)?;
// Register a timeout callback if a timeout was specified.
Expand Down Expand Up @@ -140,10 +177,29 @@ pub fn futex<'tcx>(
// Wakes at most `val` threads waiting on the futex at `addr`.
// Returns the amount of threads woken up.
// Does not access the futex value at *addr.
op if op == futex_wake => {
// FUTEX_WAKE_BITSET: (int *addr, int op = FUTEX_WAKE, int val, const timespect *_unused, int *_unused, unsigned int bitset)
// Same as FUTEX_WAKE, but allows you to specify a bitset to select which threads to wake up.
op if op == futex_wake || op == futex_wake_bitset => {
let bitset = if op == futex_wake_bitset {
if args.len() != 7 {
throw_ub_format!(
"incorrect number of arguments for `futex` syscall with `op=FUTEX_WAKE_BITSET`: got {}, expected 7",
args.len()
);
}
this.read_scalar(&args[6])?.to_u32()?
} else {
u32::MAX
};
if bitset == 0 {
let einval = this.eval_libc("EINVAL")?;
this.set_last_error(einval)?;
this.write_scalar(Scalar::from_machine_isize(-1, this), dest)?;
return Ok(());
}
let mut n = 0;
for _ in 0..val {
if let Some(thread) = this.futex_wake(addr_scalar.to_machine_usize(this)?) {
if let Some(thread) = this.futex_wake(addr_scalar.to_machine_usize(this)?, bitset) {
this.unblock_thread(thread);
this.unregister_timeout_callback_if_exists(thread);
n += 1;
Expand Down
16 changes: 10 additions & 6 deletions src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ struct Futex {
struct FutexWaiter {
/// The thread that is waiting on this futex.
thread: ThreadId,
/// The bitset used by FUTEX_*_BITSET, or u32::MAX for other operations.
bitset: u32,
}

/// The state of all synchronization variables.
Expand Down Expand Up @@ -486,15 +488,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
this.machine.threads.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread);
}

fn futex_wait(&mut self, addr: u64, thread: ThreadId) {
fn futex_wait(&mut self, addr: u64, thread: ThreadId, bitset: u32) {
let this = self.eval_context_mut();
let futex = &mut this.machine.threads.sync.futexes.entry(addr).or_default();
let waiters = &mut futex.waiters;
assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting");
waiters.push_back(FutexWaiter { thread });
waiters.push_back(FutexWaiter { thread, bitset });
}

fn futex_wake(&mut self, addr: u64) -> Option<ThreadId> {
fn futex_wake(&mut self, addr: u64, bitset: u32) -> Option<ThreadId> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();
let futex = &mut this.machine.threads.sync.futexes.get_mut(&addr)?;
Expand All @@ -504,13 +506,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriEvalContextExt<'mir, 'tcx
if let Some(data_race) = data_race {
data_race.validate_lock_release(&mut futex.data_race, current_thread);
}
let res = futex.waiters.pop_front().map(|waiter| {

// Wake up the first thread in the queue that matches any of the bits in the bitset.
futex.waiters.iter().position(|w| w.bitset & bitset != 0).map(|i| {
let waiter = futex.waiters.remove(i).unwrap();
if let Some(data_race) = data_race {
data_race.validate_lock_acquire(&futex.data_race, waiter.thread);
}
waiter.thread
});
res
})
}

fn futex_remove_waiter(&mut self, addr: u64, thread: ThreadId) {
Expand Down
86 changes: 86 additions & 0 deletions tests/run-pass/concurrency/linux-futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#![feature(rustc_private)]
extern crate libc;

use std::mem::MaybeUninit;
use std::ptr;
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -93,6 +94,42 @@ fn wait_timeout() {
assert!((200..1000).contains(&start.elapsed().as_millis()));
}

fn wait_absolute_timeout() {
let start = Instant::now();

// Get the current monotonic timestamp as timespec.
let mut timeout = unsafe {
let mut now: MaybeUninit<libc::timespec> = MaybeUninit::uninit();
assert_eq!(libc::clock_gettime(libc::CLOCK_MONOTONIC, now.as_mut_ptr()), 0);
now.assume_init()
};

// Add 200ms.
timeout.tv_nsec += 200_000_000;
if timeout.tv_nsec > 1_000_000_000 {
timeout.tv_nsec -= 1_000_000_000;
timeout.tv_sec += 1;
}

let futex: i32 = 123;

// Wait for 200ms from now, with nobody waking us up early.
unsafe {
assert_eq!(libc::syscall(
libc::SYS_futex,
&futex as *const i32,
libc::FUTEX_WAIT_BITSET,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test also passes if I put FUTEX_WAIT here, as it seems that the monotonic clock starts at zero in the tests. It'd be a better test if it was run with a monotonic clock that does not start at zero.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifically, our monotone clock is relative to the time_anchor which is initialized via Instant::now() when the interpreter starts.

I guess we could add an arbitrary offset to that? It also should make a difference if the interpreter has already been running for a bit (e.g. because of a previous timeout test).

123,
&timeout,
0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This turns out to be a subtle bug, since 0 becomes 0i32 but this argument should be pointer-sized. #2058 makes Miri detect that problem.

(It'd probably still work on x86 due to details of the calling convention, but e.g. if arguments are passed on the stack then va_arg would probably do the wrong thing here.)

u32::MAX,
), -1);
assert_eq!(*libc::__errno_location(), libc::ETIMEDOUT);
}

assert!((200..1000).contains(&start.elapsed().as_millis()));
}

fn wait_wake() {
let start = Instant::now();

Expand Down Expand Up @@ -123,10 +160,59 @@ fn wait_wake() {
assert!((200..1000).contains(&start.elapsed().as_millis()));
}

fn wait_wake_bitset() {
let start = Instant::now();

static FUTEX: i32 = 0;

thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
unsafe {
assert_eq!(libc::syscall(
libc::SYS_futex,
&FUTEX as *const i32,
libc::FUTEX_WAKE_BITSET,
10, // Wake up at most 10 threads.
0,
0,
0b1001, // bitset
), 0); // Didn't match any thread.
}
thread::sleep(Duration::from_millis(200));
unsafe {
assert_eq!(libc::syscall(
libc::SYS_futex,
&FUTEX as *const i32,
libc::FUTEX_WAKE_BITSET,
10, // Wake up at most 10 threads.
0,
0,
0b0110, // bitset
), 1); // Woken up one thread.
}
});

unsafe {
assert_eq!(libc::syscall(
libc::SYS_futex,
&FUTEX as *const i32,
libc::FUTEX_WAIT_BITSET,
0,
ptr::null::<libc::timespec>(),
0,
0b0100, // bitset
), 0);
}

assert!((400..1000).contains(&start.elapsed().as_millis()));
}

fn main() {
wake_nobody();
wake_dangling();
wait_wrong_val();
wait_timeout();
wait_absolute_timeout();
wait_wake();
wait_wake_bitset();
}