Skip to content

Commit

Permalink
Auto merge of #3932 - rust-lang:epoll-with-vector-clock, r=oli-obk
Browse files Browse the repository at this point in the history
Add vector clock to epoll ready lists

Replaces #3928

Fixes #3911
  • Loading branch information
bors committed Oct 2, 2024
2 parents 0b9a318 + d7695a0 commit 483eb7d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 15 deletions.
41 changes: 27 additions & 14 deletions src/shims/unix/linux/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io;
use std::rc::{Rc, Weak};
use std::time::Duration;

use crate::concurrency::VClock;
use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -19,7 +20,7 @@ struct Epoll {
/// and file descriptor value.
// This is an Rc because EpollInterest need to hold a reference to update
// it.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
ready_list: Rc<ReadyList>,
/// A list of thread ids blocked on this epoll instance.
thread_id: RefCell<Vec<ThreadId>>,
}
Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct EpollEventInterest {
/// <https://man7.org/linux/man-pages/man3/epoll_event.3type.html>
data: u64,
/// Ready list of the epoll instance under which this EpollEventInterest is registered.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
ready_list: Rc<ReadyList>,
/// The epoll file description that this EpollEventInterest is registered under.
weak_epfd: WeakFileDescriptionRef,
}
Expand All @@ -88,6 +89,12 @@ pub struct EpollReadyEvents {
pub epollerr: bool,
}

#[derive(Debug, Default)]
struct ReadyList {
mapping: RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>,
clock: RefCell<VClock>,
}

impl EpollReadyEvents {
pub fn new() -> Self {
EpollReadyEvents {
Expand Down Expand Up @@ -127,7 +134,7 @@ impl EpollReadyEvents {
}

impl Epoll {
fn get_ready_list(&self) -> Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>> {
fn get_ready_list(&self) -> Rc<ReadyList> {
Rc::clone(&self.ready_list)
}
}
Expand Down Expand Up @@ -207,9 +214,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
);
}

let mut epoll_instance = Epoll::default();
epoll_instance.ready_list = Rc::new(RefCell::new(BTreeMap::new()));

let fd = this.machine.fds.insert_new(Epoll::default());
Ok(Scalar::from_i32(fd))
}
Expand Down Expand Up @@ -377,7 +381,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
drop(epoll_interest);

// Remove related epoll_interest from ready list.
ready_list.borrow_mut().remove(&epoll_key);
ready_list.mapping.borrow_mut().remove(&epoll_key);

// Remove dangling EpollEventInterest from its global table.
// .unwrap() below should succeed because the file description id must have registered
Expand Down Expand Up @@ -471,8 +475,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
let binding = epoll_file_description.get_ready_list();
ready_list_empty = binding.borrow_mut().is_empty();
ready_list_empty = epoll_file_description.ready_list.mapping.borrow().is_empty();
thread_ids = epoll_file_description.thread_id.borrow_mut();
}
if timeout == 0 || !ready_list_empty {
Expand Down Expand Up @@ -561,9 +564,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// holds a strong ref to epoll_interest.
let epfd = epoll_interest.borrow().weak_epfd.upgrade().unwrap();
// FIXME: We can randomly pick a thread to unblock.
if let Some(thread_id) =
epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
{

let epoll = epfd.downcast::<Epoll>().unwrap();

// Synchronize running thread to the epoll ready list.
if let Some(clock) = &this.release_clock() {
epoll.ready_list.clock.borrow_mut().join(clock);
}

if let Some(thread_id) = epoll.thread_id.borrow_mut().pop() {
waiter.push(thread_id);
};
}
Expand Down Expand Up @@ -617,7 +626,7 @@ fn check_and_update_one_event_interest<'tcx>(
// insert an epoll_return to the ready list.
if flags != 0 {
let epoll_key = (id, epoll_event_interest.fd_num);
let ready_list = &mut epoll_event_interest.ready_list.borrow_mut();
let ready_list = &mut epoll_event_interest.ready_list.mapping.borrow_mut();
let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
// Triggers the notification by inserting it to the ready list.
ready_list.insert(epoll_key, event_instance);
Expand All @@ -644,7 +653,11 @@ fn blocking_epoll_callback<'tcx>(
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;

let ready_list = epoll_file_description.get_ready_list();
let mut ready_list = ready_list.borrow_mut();

// Synchronize waking thread from the epoll ready list.
ecx.acquire_clock(&ready_list.clock.borrow());

let mut ready_list = ready_list.mapping.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = ecx.project_array_fields(events)?;

Expand Down
41 changes: 40 additions & 1 deletion tests/pass-dep/libc/libc-epoll-blocking.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//@only-target: linux
// test_epoll_block_then_unblock depends on a deterministic schedule.
// test_epoll_block_then_unblock and test_epoll_race depend on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0

use std::convert::TryInto;
Expand All @@ -12,6 +12,7 @@ fn main() {
test_epoll_block_without_notification();
test_epoll_block_then_unblock();
test_notification_after_timeout();
test_epoll_race();
}

// Using `as` cast since `EPOLLET` wraps around
Expand Down Expand Up @@ -137,3 +138,41 @@ fn test_notification_after_timeout() {
let expected_value = fds[0] as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10);
}

// This test shows a data_race before epoll had vector clocks added.
fn test_epoll_race() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);

// Create an eventfd instance.
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };

// Register eventfd with the epoll instance.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) };
assert_eq!(res, 0);

static mut VAL: u8 = 0;
let thread1 = thread::spawn(move || {
// Write to the static mut variable.
unsafe { VAL = 1 };
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
// read returns number of bytes that have been read, which is always 8.
assert_eq!(res, 8);
});
thread::yield_now();
// epoll_wait for the event to happen.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = u64::try_from(fd).unwrap();
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)], -1);
// Read from the static mut variable.
#[allow(static_mut_refs)]
unsafe {
assert_eq!(VAL, 1)
};
thread1.join().unwrap();
}

0 comments on commit 483eb7d

Please sign in to comment.