Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ pub enum BlockReason {
InitOnce(InitOnceId),
/// Blocked on epoll.
Epoll,
/// Blocked on eventfd.
Eventfd,
}

/// The state of a thread.
Expand Down
201 changes: 153 additions & 48 deletions src/shims/unix/linux/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;
use std::io::ErrorKind;

use crate::concurrency::VClock;
use crate::shims::unix::fd::FileDescriptionRef;
use crate::shims::unix::fd::{FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::linux::epoll::{EpollReadyEvents, EvalContextExt as _};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -26,6 +26,10 @@ struct Event {
counter: Cell<u64>,
is_nonblock: bool,
clock: RefCell<VClock>,
/// A list of thread ids blocked on eventfd::read.
blocked_read_tid: RefCell<Vec<ThreadId>>,
/// A list of thread ids blocked on eventfd::write.
blocked_write_tid: RefCell<Vec<ThreadId>>,
}

impl FileDescription for Event {
Expand Down Expand Up @@ -72,31 +76,8 @@ impl FileDescription for Event {
// eventfd read at the size of u64.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);

// Block when counter == 0.
let counter = self.counter.get();
if counter == 0 {
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

throw_unsup_format!("eventfd: blocking is unsupported");
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&self.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;
self.counter.set(0);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Tell userspace how many bytes we wrote.
ecx.write_int(buf_place.layout.size.bytes(), dest)?;
}

interp_ok(())
let weak_eventfd = self_ref.downgrade();
eventfd_read(buf_place, dest, weak_eventfd, ecx)
}

/// A write call adds the 8-byte integer value supplied in
Expand Down Expand Up @@ -127,7 +108,7 @@ impl FileDescription for Event {
return ecx.set_last_error_and_return(ErrorKind::InvalidInput, dest);
}

// Read the user supplied value from the pointer.
// Read the user-supplied value from the pointer.
let buf_place = ecx.ptr_to_mplace_unaligned(ptr, ty);
let num = ecx.read_scalar(&buf_place)?.to_u64()?;

Expand All @@ -137,27 +118,8 @@ impl FileDescription for Event {
}
// If the addition does not let the counter to exceed the maximum value, update the counter.
// Else, block.
match self.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
self.clock.borrow_mut().join(clock);
});
self.counter.set(new_count);
}
None | Some(u64::MAX) =>
if self.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
} else {
throw_unsup_format!("eventfd: blocking is unsupported");
},
};
// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(self_ref)?;

// Return how many bytes we read.
ecx.write_int(buf_place.layout.size.bytes(), dest)
let weak_eventfd = self_ref.downgrade();
eventfd_write(num, buf_place, dest, weak_eventfd, ecx)
}
}

Expand Down Expand Up @@ -217,8 +179,151 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
counter: Cell::new(val.into()),
is_nonblock,
clock: RefCell::new(VClock::default()),
blocked_read_tid: RefCell::new(Vec::new()),
blocked_write_tid: RefCell::new(Vec::new()),
});

interp_ok(Scalar::from_i32(fd_value))
}
}

/// Block thread if the value addition will exceed u64::MAX -1,
/// else just add the user-supplied value to current counter.
fn eventfd_write<'tcx>(
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

match eventfd.counter.get().checked_add(num) {
Some(new_count @ 0..=MAX_COUNTER) => {
// Future `read` calls will synchronize with this write, so update the FD clock.
ecx.release_clock(|clock| {
eventfd.clock.borrow_mut().join(clock);
});

// When this function is called, the addition is guaranteed to not exceed u64::MAX - 1.
eventfd.counter.set(new_count);

// When any of the event happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;
Comment on lines +217 to +219
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This used to be done in both match arms. The comment even says that. Now it was moved to only run in one match arm. The comment is outdated now, and also -- why was it moved?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is because in the other arm, nothing actually changes, we just block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is because in the other arm, nothing actually changes, we just block?

Yes exactly. There are four scenarios that could happen here:

  1. It succeed without blocking, we do check_and_update_readiness.
  2. It blocked and eventually unblock because some event happened, it will hit the succeed path that contains check_and_update_readiness.
  3. It errors out with ErrorKind::WouldBlock.
  4. It blocked and never get unblocked, in that case, that's a deadlock.


// Unblock *all* threads previously blocked on `read`.
// We need to take out the blocked thread ids and unblock them together,
// because `unblock_threads` may block them again and end up re-adding the
// thread to the blocked list.
let waiting_threads = std::mem::take(&mut *eventfd.blocked_read_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}

// Return how many bytes we wrote.
return ecx.write_int(buf_place.layout.size.bytes(), dest);
}
None | Some(u64::MAX) => {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}

let dest = dest.clone();

eventfd.blocked_write_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
num: u64,
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
eventfd_write(num, buf_place, &dest, weak_eventfd, this)
}
),
);
}
};
interp_ok(())
}

/// Block thread if the current counter is 0,
/// else just return the current counter value to the caller and set the counter to 0.
fn eventfd_read<'tcx>(
buf_place: MPlaceTy<'tcx>,
dest: &MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
let Some(eventfd_ref) = weak_eventfd.upgrade() else {
throw_unsup_format!("eventfd FD got closed while blocking.")
};

// Since we pass the weak file description ref to the callback function, it is guaranteed to be
// an eventfd file description.
let eventfd = eventfd_ref.downcast::<Event>().unwrap();

// Block when counter == 0.
let counter = eventfd.counter.replace(0);

if counter == 0 {
if eventfd.is_nonblock {
return ecx.set_last_error_and_return(ErrorKind::WouldBlock, dest);
}
let dest = dest.clone();

eventfd.blocked_read_tid.borrow_mut().push(ecx.active_thread());

ecx.block_thread(
BlockReason::Eventfd,
None,
callback!(
@capture<'tcx> {
buf_place: MPlaceTy<'tcx>,
dest: MPlaceTy<'tcx>,
weak_eventfd: WeakFileDescriptionRef,
}
@unblock = |this| {
eventfd_read(buf_place, &dest, weak_eventfd, this)
}
),
);
} else {
// Synchronize with all prior `write` calls to this FD.
ecx.acquire_clock(&eventfd.clock.borrow());

// Give old counter value to userspace, and set counter value to 0.
ecx.write_int(counter, &buf_place)?;

// When any of the events happened, we check and update the status of all supported event
// types for current file description.
ecx.check_and_update_readiness(&eventfd_ref)?;

// Unblock *all* threads previously blocked on `write`.
// We need to take out the blocked thread ids and unblock them together,
// because `unblock_threads` may block them again and end up re-adding the
// thread to the blocked list.
let waiting_threads = std::mem::take(&mut *eventfd.blocked_write_tid.borrow_mut());
// FIXME: We can randomize the order of unblocking.
for thread_id in waiting_threads {
ecx.unblock_thread(thread_id, BlockReason::Eventfd)?;
}

// Tell userspace how many bytes we read.
return ecx.write_int(buf_place.layout.size.bytes(), dest);
}
interp_ok(())
}
65 changes: 65 additions & 0 deletions tests/fail-dep/libc/eventfd_block_read_twice.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//@only-target: linux
//~^ERROR: deadlocked
//~^^ERROR: deadlocked
//@compile-flags: -Zmiri-preemption-rate=0
//@error-in-other-file: deadlock

use std::thread;

// Test the behaviour of a thread being blocked on an eventfd read, get unblocked, and then
// get blocked again.

// The expected execution is
// 1. Thread 1 blocks.
// 2. Thread 2 blocks.
// 3. Thread 3 unblocks both thread 1 and thread 2.
// 4. Thread 1 reads.
// 5. Thread 2's `read` deadlocked.

fn main() {
// eventfd write will block when EFD_NONBLOCK flag is clear
// and the addition caused counter to exceed u64::MAX - 1.
let flags = libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };

let thread1 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread2 = thread::spawn(move || {
thread::park();
let mut buf: [u8; 8] = [0; 8];
// This read will block initially, then get unblocked by thread3, then get blocked again
// because the `read` in thread1 executes first and set the counter to 0 again.
let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
//~^ERROR: deadlocked
assert_eq!(res, 8);
let counter = u64::from_ne_bytes(buf);
assert_eq!(counter, 1_u64);
});

let thread3 = thread::spawn(move || {
thread::park();
let sized_8_data = 1_u64.to_ne_bytes();
// Write 1 to the counter, so both thread1 and thread2 will unblock.
let res: i64 = unsafe {
libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8).try_into().unwrap()
};
// Make sure that write is successful.
assert_eq!(res, 8);
});

thread1.thread().unpark();
thread2.thread().unpark();
thread3.thread().unpark();

thread1.join().unwrap();
thread2.join().unwrap();
thread3.join().unwrap();
}
41 changes: 41 additions & 0 deletions tests/fail-dep/libc/eventfd_block_read_twice.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
error: deadlock: the evaluated program deadlocked
--> RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
|
LL | let ret = unsafe { libc::pthread_join(id, ptr::null_mut()) };
| ^ the evaluated program deadlocked
|
= note: BACKTRACE:
= note: inside `std::sys::pal::PLATFORM::thread::Thread::join` at RUSTLIB/std/src/sys/pal/PLATFORM/thread.rs:LL:CC
= note: inside `std::thread::JoinInner::<'_, ()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
= note: inside `std::thread::JoinHandle::<()>::join` at RUSTLIB/std/src/thread/mod.rs:LL:CC
note: inside `main`
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
|
LL | thread2.join().unwrap();
| ^^^^^^^^^^^^^^

error: deadlock: the evaluated program deadlocked
|
= note: the evaluated program deadlocked
= note: (no span available)
= note: BACKTRACE on thread `unnamed-ID`:

error: deadlock: the evaluated program deadlocked
--> tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC
|
LL | let res: i64 = unsafe { libc::read(fd, buf.as_mut_ptr().cast(), 8).try_into().unwrap() };
| ^ the evaluated program deadlocked
|
= note: BACKTRACE on thread `unnamed-ID`:
= note: inside closure at tests/fail-dep/libc/eventfd_block_read_twice.rs:LL:CC

error: deadlock: the evaluated program deadlocked
|
= note: the evaluated program deadlocked
= note: (no span available)
= note: BACKTRACE on thread `unnamed-ID`:

note: some details are omitted, run with `MIRIFLAGS=-Zmiri-backtrace=full` for a verbose backtrace

error: aborting due to 4 previous errors

Loading