Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vector clock to epoll ready lists #3932

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 27 additions & 14 deletions src/shims/unix/linux/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::io;
use std::rc::{Rc, Weak};
use std::time::Duration;

use crate::concurrency::VClock;
use crate::shims::unix::fd::{FdId, FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::*;
use crate::*;
Expand All @@ -19,7 +20,7 @@ struct Epoll {
/// and file descriptor value.
// This is an Rc because EpollInterest need to hold a reference to update
// it.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
ready_list: Rc<ReadyList>,
/// A list of thread ids blocked on this epoll instance.
thread_id: RefCell<Vec<ThreadId>>,
}
Expand Down Expand Up @@ -63,7 +64,7 @@ pub struct EpollEventInterest {
/// <https://man7.org/linux/man-pages/man3/epoll_event.3type.html>
data: u64,
/// Ready list of the epoll instance under which this EpollEventInterest is registered.
ready_list: Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>>,
ready_list: Rc<ReadyList>,
/// The epoll file description that this EpollEventInterest is registered under.
weak_epfd: WeakFileDescriptionRef,
}
Expand All @@ -88,6 +89,12 @@ pub struct EpollReadyEvents {
pub epollerr: bool,
}

#[derive(Debug, Default)]
struct ReadyList {
mapping: RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>,
clock: RefCell<VClock>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't we say in the issue that we wanted a clock per event?

That would avoid accidentally synchronizing with events we didn't even receive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RalfJung Do you feel this synch is too broad?

/// Callback function after epoll_wait unblocks
fn blocking_epoll_callback<'tcx>(
    epfd_value: i32,
    weak_epfd: WeakFileDescriptionRef,
    dest: &MPlaceTy<'tcx>,
    events: &MPlaceTy<'tcx>,
    ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx> {
    let Some(epfd) = weak_epfd.upgrade() else {
        throw_unsup_format!("epoll FD {epfd_value} got closed while blocking.")
    };

    let epoll_file_description = epfd
        .downcast::<Epoll>()
        .ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;

    let ready_list = epoll_file_description.get_ready_list();

    // Synchronize waking thread from the epoll ready list.
    ecx.acquire_clock(&ready_list.clock.borrow());

    let mut ready_list = ready_list.mapping.borrow_mut();
    let mut num_of_events: i32 = 0;
    let mut array_iter = ecx.project_array_fields(events)?;

    while let Some(des) = array_iter.next(ecx)? {
        if let Some(epoll_event_instance) = ready_list_next(ecx, &mut ready_list) {
            ecx.write_int_fields_named(
                &[
                    ("events", epoll_event_instance.events.into()),
                    ("u64", epoll_event_instance.data.into()),
                ],
                &des.1,
            )?;
            num_of_events = num_of_events.strict_add(1);
        } else {
            break;
        }
    }
    ecx.write_int(num_of_events, dest)?;
    interp_ok(())
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what you are currently doing, right? Yes I think this syncs too much: #3944

}

impl EpollReadyEvents {
pub fn new() -> Self {
EpollReadyEvents {
Expand Down Expand Up @@ -127,7 +134,7 @@ impl EpollReadyEvents {
}

impl Epoll {
fn get_ready_list(&self) -> Rc<RefCell<BTreeMap<(FdId, i32), EpollEventInstance>>> {
fn get_ready_list(&self) -> Rc<ReadyList> {
Rc::clone(&self.ready_list)
}
}
Expand Down Expand Up @@ -207,9 +214,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
);
}

let mut epoll_instance = Epoll::default();
epoll_instance.ready_list = Rc::new(RefCell::new(BTreeMap::new()));

let fd = this.machine.fds.insert_new(Epoll::default());
Ok(Scalar::from_i32(fd))
}
Expand Down Expand Up @@ -377,7 +381,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
drop(epoll_interest);

// Remove related epoll_interest from ready list.
ready_list.borrow_mut().remove(&epoll_key);
ready_list.mapping.borrow_mut().remove(&epoll_key);

// Remove dangling EpollEventInterest from its global table.
// .unwrap() below should succeed because the file description id must have registered
Expand Down Expand Up @@ -471,8 +475,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let epoll_file_description = epfd
.downcast::<Epoll>()
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;
let binding = epoll_file_description.get_ready_list();
ready_list_empty = binding.borrow_mut().is_empty();
ready_list_empty = epoll_file_description.ready_list.mapping.borrow().is_empty();
thread_ids = epoll_file_description.thread_id.borrow_mut();
}
if timeout == 0 || !ready_list_empty {
Expand Down Expand Up @@ -561,9 +564,15 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// holds a strong ref to epoll_interest.
let epfd = epoll_interest.borrow().weak_epfd.upgrade().unwrap();
// FIXME: We can randomly pick a thread to unblock.
if let Some(thread_id) =
epfd.downcast::<Epoll>().unwrap().thread_id.borrow_mut().pop()
{

let epoll = epfd.downcast::<Epoll>().unwrap();

// Synchronize running thread to the epoll ready list.
if let Some(clock) = &this.release_clock() {
epoll.ready_list.clock.borrow_mut().join(clock);
}

if let Some(thread_id) = epoll.thread_id.borrow_mut().pop() {
waiter.push(thread_id);
};
}
Expand Down Expand Up @@ -617,7 +626,7 @@ fn check_and_update_one_event_interest<'tcx>(
// insert an epoll_return to the ready list.
if flags != 0 {
let epoll_key = (id, epoll_event_interest.fd_num);
let ready_list = &mut epoll_event_interest.ready_list.borrow_mut();
let ready_list = &mut epoll_event_interest.ready_list.mapping.borrow_mut();
let event_instance = EpollEventInstance::new(flags, epoll_event_interest.data);
// Triggers the notification by inserting it to the ready list.
ready_list.insert(epoll_key, event_instance);
Expand All @@ -644,7 +653,11 @@ fn blocking_epoll_callback<'tcx>(
.ok_or_else(|| err_unsup_format!("non-epoll FD passed to `epoll_wait`"))?;

let ready_list = epoll_file_description.get_ready_list();
let mut ready_list = ready_list.borrow_mut();

// Synchronize waking thread from the epoll ready list.
ecx.acquire_clock(&ready_list.clock.borrow());

let mut ready_list = ready_list.mapping.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = ecx.project_array_fields(events)?;

Expand Down
41 changes: 40 additions & 1 deletion tests/pass-dep/libc/libc-epoll-blocking.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//@only-target: linux
// test_epoll_block_then_unblock depends on a deterministic schedule.
// test_epoll_block_then_unblock and test_epoll_race depend on a deterministic schedule.
//@compile-flags: -Zmiri-preemption-rate=0

use std::convert::TryInto;
Expand All @@ -12,6 +12,7 @@ fn main() {
test_epoll_block_without_notification();
test_epoll_block_then_unblock();
test_notification_after_timeout();
test_epoll_race();
}

// Using `as` cast since `EPOLLET` wraps around
Expand Down Expand Up @@ -137,3 +138,41 @@ fn test_notification_after_timeout() {
let expected_value = fds[0] as u64;
check_epoll_wait::<1>(epfd, &[(expected_event, expected_value)], 10);
}

// This test shows a data_race before epoll had vector clocks added.
fn test_epoll_race() {
// Create an epoll instance.
let epfd = unsafe { libc::epoll_create1(0) };
assert_ne!(epfd, -1);

// Create an eventfd instance.
let flags = libc::EFD_NONBLOCK | libc::EFD_CLOEXEC;
let fd = unsafe { libc::eventfd(0, flags) };

// Register eventfd with the epoll instance.
let mut ev = libc::epoll_event { events: EPOLL_IN_OUT_ET, u64: fd as u64 };
let res = unsafe { libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fd, &mut ev) };
assert_eq!(res, 0);

static mut VAL: u8 = 0;
let thread1 = thread::spawn(move || {
// Write to the static mut variable.
unsafe { VAL = 1 };
// Write to the eventfd instance.
let sized_8_data: [u8; 8] = 1_u64.to_ne_bytes();
let res = unsafe { libc::write(fd, sized_8_data.as_ptr() as *const libc::c_void, 8) };
// read returns number of bytes that have been read, which is always 8.
assert_eq!(res, 8);
});
thread::yield_now();
// epoll_wait for the event to happen.
let expected_event = u32::try_from(libc::EPOLLIN | libc::EPOLLOUT).unwrap();
let expected_value = u64::try_from(fd).unwrap();
check_epoll_wait::<8>(epfd, &[(expected_event, expected_value)], -1);
// Read from the static mut variable.
#[allow(static_mut_refs)]
unsafe {
assert_eq!(VAL, 1)
};
thread1.join().unwrap();
}