Skip to content

Commit

Permalink
Auto merge of rust-lang#2641 - DrMeepster:init_once_acquire, r=RalfJung
Browse files Browse the repository at this point in the history
InitOnce: synchronize with completion when already complete

The completion of an InitOnce happens-before the threads waiting on it wake up. However, this is not the case for threads that call `InitOnceBeginInitialize` after the completion, leading to data races and outdated weak memory loads as observed in the CI for  rust-lang#2638. This PR fixes this.
  • Loading branch information
bors committed Nov 4, 2022
2 parents c20217f + bc05e6b commit 9568d7e
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 33 deletions.
86 changes: 55 additions & 31 deletions src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::num::NonZeroU32;

use rustc_index::vec::Idx;

use super::sync::EvalContextExtPriv;
use super::sync::EvalContextExtPriv as _;
use super::thread::MachineCallback;
use super::vector_clock::VClock;
use crate::*;
Expand Down Expand Up @@ -52,6 +52,43 @@ impl<'mir, 'tcx> VisitTags for InitOnce<'mir, 'tcx> {
}
}

impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// Synchronize with the previous initialization attempt of an InitOnce.
#[inline]
fn init_once_observe_attempt(&mut self, id: InitOnceId) {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_acquire(
&this.machine.threads.sync.init_onces[id].data_race,
current_thread,
);
}
}

#[inline]
fn init_once_wake_waiter(
&mut self,
id: InitOnceId,
waiter: InitOnceWaiter<'mir, 'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

this.unblock_thread(waiter.thread);

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
this.init_once_observe_attempt(id);
waiter.callback.call(this)?;
this.set_active_thread(current_thread);

Ok(())
}
}

impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_get_or_create_id(
Expand Down Expand Up @@ -141,20 +178,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// Wake up everyone.
// need to take the queue to avoid having `this` be borrowed multiple times
for waiter in std::mem::take(&mut init_once.waiters) {
// End of the wait happens-before woken-up thread.
if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_acquire(
&this.machine.threads.sync.init_onces[id].data_race,
waiter.thread,
);
}

this.unblock_thread(waiter.thread);

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
waiter.callback.call(this)?;
this.set_active_thread(current_thread);
this.init_once_wake_waiter(id, waiter)?;
}

Ok(())
Expand All @@ -172,33 +196,33 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
);

// Each complete happens-before the end of the wait
// FIXME: should this really induce synchronization? If we think of it as a lock, then yes,
// but the docs don't talk about such details.
if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_release(&mut init_once.data_race, current_thread);
}

// Wake up one waiting thread, so they can go ahead and try to init this.
if let Some(waiter) = init_once.waiters.pop_front() {
// End of the wait happens-before woken-up thread.
if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_acquire(
&this.machine.threads.sync.init_onces[id].data_race,
waiter.thread,
);
}

this.unblock_thread(waiter.thread);

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
waiter.callback.call(this)?;
this.set_active_thread(current_thread);
this.init_once_wake_waiter(id, waiter)?;
} else {
// Nobody there to take this, so go back to 'uninit'
init_once.status = InitOnceStatus::Uninitialized;
}

Ok(())
}

/// Synchronize with the previous completion of an InitOnce.
/// Must only be called after checking that it is complete.
#[inline]
fn init_once_observe_completed(&mut self, id: InitOnceId) {
let this = self.eval_context_mut();

assert_eq!(
this.init_once_status(id),
InitOnceStatus::Complete,
"observing the completion of incomplete init once"
);

this.init_once_observe_attempt(id);
}
}
6 changes: 4 additions & 2 deletions src/shims/windows/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
Box::new(Callback { init_once_id: id, pending_place }),
)
}
InitOnceStatus::Complete =>
this.write_scalar(this.eval_windows("c", "FALSE")?, &pending_place)?,
InitOnceStatus::Complete => {
this.init_once_observe_completed(id);
this.write_scalar(this.eval_windows("c", "FALSE")?, &pending_place)?;
}
}

// This always succeeds (even if the thread is blocked, we will succeed if we ever unblock).
Expand Down
38 changes: 38 additions & 0 deletions tests/pass/concurrency/windows_init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,46 @@ fn retry_on_fail() {
waiter2.join().unwrap();
}

fn no_data_race_after_complete() {
let mut init_once = null_mut();
let mut pending = 0;

unsafe {
assert_eq!(InitOnceBeginInitialize(&mut init_once, 0, &mut pending, null_mut()), TRUE);
assert_eq!(pending, TRUE);
}

let init_once_ptr = SendPtr(&mut init_once);

let mut place = 0;
let place_ptr = SendPtr(&mut place);

let reader = thread::spawn(move || unsafe {
let mut pending = 0;

// this doesn't block because reader only executes after `InitOnceComplete` is called
assert_eq!(InitOnceBeginInitialize(init_once_ptr.0, 0, &mut pending, null_mut()), TRUE);
assert_eq!(pending, FALSE);
// this should not data race
place_ptr.0.read()
});

unsafe {
// this should not data race
place_ptr.0.write(1);
}

unsafe {
assert_eq!(InitOnceComplete(init_once_ptr.0, 0, null_mut()), TRUE);
}

// run reader (without preemption, it has not taken a step yet)
assert_eq!(reader.join().unwrap(), 1);
}

fn main() {
single_thread();
block_until_complete();
retry_on_fail();
no_data_race_after_complete();
}

0 comments on commit 9568d7e

Please sign in to comment.