From 45aa6c8d1bc2f7863c92da6643de4642bb2d83bf Mon Sep 17 00:00:00 2001 From: Simonas Kazlauskas Date: Sat, 4 Apr 2015 00:46:54 +0300 Subject: [PATCH] Implement reentrant mutexes and make stdio use them write_fmt calls write for each formatted field. The default implementation of write_fmt is used, which will call write on not-yet-locked stdout (and write locking after), therefore making print! in multithreaded environment still interleave contents of two separate prints. This patch implements reentrant mutexes, changes stdio handles to use these mutexes and overrides write_fmt to lock the stdio handle for the whole duration of the call. --- src/libstd/io/stdio.rs | 42 ++-- src/libstd/sync/condvar.rs | 6 +- src/libstd/sync/mod.rs | 13 +- src/libstd/sync/mutex.rs | 6 +- src/libstd/sync/rwlock.rs | 4 +- src/libstd/sys/common/mod.rs | 2 + src/libstd/{sync => sys/common}/poison.rs | 0 src/libstd/sys/common/remutex.rs | 233 ++++++++++++++++++++++ src/libstd/sys/unix/mutex.rs | 48 +++++ src/libstd/sys/unix/sync.rs | 29 ++- src/libstd/sys/windows/mutex.rs | 31 +++ src/libstd/sys/windows/sync.rs | 18 +- src/test/run-pass/atomic-print.rs | 48 +++++ 13 files changed, 446 insertions(+), 34 deletions(-) rename src/libstd/{sync => sys/common}/poison.rs (100%) create mode 100644 src/libstd/sys/common/remutex.rs create mode 100644 src/test/run-pass/atomic-print.rs diff --git a/src/libstd/io/stdio.rs b/src/libstd/io/stdio.rs index d361f17cbe41b..2850d92e34d40 100644 --- a/src/libstd/io/stdio.rs +++ b/src/libstd/io/stdio.rs @@ -18,6 +18,7 @@ use io::lazy::Lazy; use io::{self, BufReader, LineWriter}; use sync::{Arc, Mutex, MutexGuard}; use sys::stdio; +use sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard}; /// Stdout used by print! and println! macros thread_local! { @@ -210,7 +211,7 @@ pub struct Stdout { // FIXME: this should be LineWriter or BufWriter depending on the state of // stdout (tty or not). Note that if this is not line buffered it // should also flush-on-panic or some form of flush-on-abort. - inner: Arc>>, + inner: Arc>>>, } /// A locked reference to the a `Stdout` handle. @@ -219,7 +220,7 @@ pub struct Stdout { /// method on `Stdout`. #[stable(feature = "rust1", since = "1.0.0")] pub struct StdoutLock<'a> { - inner: MutexGuard<'a, LineWriter>, + inner: ReentrantMutexGuard<'a, RefCell>>, } /// Constructs a new reference to the standard output of the current process. @@ -231,13 +232,13 @@ pub struct StdoutLock<'a> { /// The returned handle implements the `Write` trait. #[stable(feature = "rust1", since = "1.0.0")] pub fn stdout() -> Stdout { - static INSTANCE: Lazy>> = lazy_init!(stdout_init); + static INSTANCE: Lazy>>> = lazy_init!(stdout_init); return Stdout { inner: INSTANCE.get().expect("cannot access stdout during shutdown"), }; - fn stdout_init() -> Arc>> { - Arc::new(Mutex::new(LineWriter::new(stdout_raw()))) + fn stdout_init() -> Arc>>> { + Arc::new(ReentrantMutex::new(RefCell::new(LineWriter::new(stdout_raw())))) } } @@ -264,15 +265,18 @@ impl Write for Stdout { fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { self.lock().write_all(buf) } - // Don't override write_fmt as it's possible to run arbitrary code during a - // write_fmt, allowing the possibility of a recursive lock (aka deadlock) + fn write_fmt(&mut self, args: fmt::Arguments) -> io::Result<()> { + self.lock().write_fmt(args) + } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for StdoutLock<'a> { fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(&buf[..cmp::min(buf.len(), OUT_MAX)]) + self.inner.borrow_mut().write(&buf[..cmp::min(buf.len(), OUT_MAX)]) + } + fn flush(&mut self) -> io::Result<()> { + self.inner.borrow_mut().flush() } - fn flush(&mut self) -> io::Result<()> { self.inner.flush() } } /// A handle to the standard error stream of a process. @@ -280,7 +284,7 @@ impl<'a> Write for StdoutLock<'a> { /// For more information, see `stderr` #[stable(feature = "rust1", since = "1.0.0")] pub struct Stderr { - inner: Arc>, + inner: Arc>>, } /// A locked reference to the a `Stderr` handle. @@ -289,7 +293,7 @@ pub struct Stderr { /// method on `Stderr`. #[stable(feature = "rust1", since = "1.0.0")] pub struct StderrLock<'a> { - inner: MutexGuard<'a, StderrRaw>, + inner: ReentrantMutexGuard<'a, RefCell>, } /// Constructs a new reference to the standard error stream of a process. @@ -300,13 +304,13 @@ pub struct StderrLock<'a> { /// The returned handle implements the `Write` trait. #[stable(feature = "rust1", since = "1.0.0")] pub fn stderr() -> Stderr { - static INSTANCE: Lazy> = lazy_init!(stderr_init); + static INSTANCE: Lazy>> = lazy_init!(stderr_init); return Stderr { inner: INSTANCE.get().expect("cannot access stderr during shutdown"), }; - fn stderr_init() -> Arc> { - Arc::new(Mutex::new(stderr_raw())) + fn stderr_init() -> Arc>> { + Arc::new(ReentrantMutex::new(RefCell::new(stderr_raw()))) } } @@ -333,14 +337,18 @@ impl Write for Stderr { fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { self.lock().write_all(buf) } - // Don't override write_fmt for the same reasons as Stdout + fn write_fmt(&mut self, args: fmt::Arguments) -> io::Result<()> { + self.lock().write_fmt(args) + } } #[stable(feature = "rust1", since = "1.0.0")] impl<'a> Write for StderrLock<'a> { fn write(&mut self, buf: &[u8]) -> io::Result { - self.inner.write(&buf[..cmp::min(buf.len(), OUT_MAX)]) + self.inner.borrow_mut().write(&buf[..cmp::min(buf.len(), OUT_MAX)]) + } + fn flush(&mut self) -> io::Result<()> { + self.inner.borrow_mut().flush() } - fn flush(&mut self) -> io::Result<()> { self.inner.flush() } } /// Resets the task-local stderr handle to the specified writer diff --git a/src/libstd/sync/condvar.rs b/src/libstd/sync/condvar.rs index a7d8b287a64c0..654b33f1a579d 100644 --- a/src/libstd/sync/condvar.rs +++ b/src/libstd/sync/condvar.rs @@ -11,12 +11,12 @@ use prelude::v1::*; use sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; -use sync::poison::{self, LockResult}; -use sys::time::SteadyTime; +use sync::{mutex, MutexGuard, PoisonError}; use sys_common::condvar as sys; use sys_common::mutex as sys_mutex; +use sys_common::poison::{self, LockResult}; +use sys::time::SteadyTime; use time::Duration; -use sync::{mutex, MutexGuard, PoisonError}; /// A Condition Variable /// diff --git a/src/libstd/sync/mod.rs b/src/libstd/sync/mod.rs index a5259a00390f5..91e9714fbef48 100644 --- a/src/libstd/sync/mod.rs +++ b/src/libstd/sync/mod.rs @@ -20,15 +20,15 @@ pub use alloc::arc::{Arc, Weak}; pub use core::atomic; -pub use self::mutex::{Mutex, MutexGuard, StaticMutex}; -pub use self::mutex::MUTEX_INIT; -pub use self::rwlock::{RwLock, StaticRwLock, RW_LOCK_INIT}; -pub use self::rwlock::{RwLockReadGuard, RwLockWriteGuard}; +pub use self::barrier::{Barrier, BarrierWaitResult}; pub use self::condvar::{Condvar, StaticCondvar, CONDVAR_INIT}; +pub use self::mutex::MUTEX_INIT; +pub use self::mutex::{Mutex, MutexGuard, StaticMutex}; pub use self::once::{Once, ONCE_INIT}; +pub use sys_common::poison::{PoisonError, TryLockError, TryLockResult, LockResult}; +pub use self::rwlock::{RwLockReadGuard, RwLockWriteGuard}; +pub use self::rwlock::{RwLock, StaticRwLock, RW_LOCK_INIT}; pub use self::semaphore::{Semaphore, SemaphoreGuard}; -pub use self::barrier::{Barrier, BarrierWaitResult}; -pub use self::poison::{PoisonError, TryLockError, TryLockResult, LockResult}; pub use self::future::Future; @@ -39,6 +39,5 @@ mod condvar; mod future; mod mutex; mod once; -mod poison; mod rwlock; mod semaphore; diff --git a/src/libstd/sync/mutex.rs b/src/libstd/sync/mutex.rs index 16e7f265412f3..46fb20cd6a2d6 100644 --- a/src/libstd/sync/mutex.rs +++ b/src/libstd/sync/mutex.rs @@ -11,11 +11,11 @@ use prelude::v1::*; use cell::UnsafeCell; +use fmt; use marker; use ops::{Deref, DerefMut}; -use sync::poison::{self, TryLockError, TryLockResult, LockResult}; use sys_common::mutex as sys; -use fmt; +use sys_common::poison::{self, TryLockError, TryLockResult, LockResult}; /// A mutual exclusion primitive useful for protecting shared data /// @@ -212,7 +212,7 @@ impl Mutex { /// Attempts to acquire this lock. /// - /// If the lock could not be acquired at this time, then `None` is returned. + /// If the lock could not be acquired at this time, then `Err` is returned. /// Otherwise, an RAII guard is returned. The lock will be unlocked when the /// guard is dropped. /// diff --git a/src/libstd/sync/rwlock.rs b/src/libstd/sync/rwlock.rs index d70350bc7d651..eb6d46a5dda7a 100644 --- a/src/libstd/sync/rwlock.rs +++ b/src/libstd/sync/rwlock.rs @@ -11,11 +11,11 @@ use prelude::v1::*; use cell::UnsafeCell; +use fmt; use marker; use ops::{Deref, DerefMut}; -use sync::poison::{self, LockResult, TryLockError, TryLockResult}; +use sys_common::poison::{self, LockResult, TryLockError, TryLockResult}; use sys_common::rwlock as sys; -use fmt; /// A reader-writer lock /// diff --git a/src/libstd/sys/common/mod.rs b/src/libstd/sys/common/mod.rs index d2e2f1044d612..8a01eace889c3 100644 --- a/src/libstd/sys/common/mod.rs +++ b/src/libstd/sys/common/mod.rs @@ -29,6 +29,8 @@ pub mod condvar; pub mod mutex; pub mod net; pub mod net2; +pub mod poison; +pub mod remutex; pub mod rwlock; pub mod stack; pub mod thread; diff --git a/src/libstd/sync/poison.rs b/src/libstd/sys/common/poison.rs similarity index 100% rename from src/libstd/sync/poison.rs rename to src/libstd/sys/common/poison.rs diff --git a/src/libstd/sys/common/remutex.rs b/src/libstd/sys/common/remutex.rs new file mode 100644 index 0000000000000..b35063c0e2341 --- /dev/null +++ b/src/libstd/sys/common/remutex.rs @@ -0,0 +1,233 @@ +// Copyright 2015 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. +#![unstable(feature = "reentrant_mutex", reason = "new API")] + +use prelude::v1::*; + +use fmt; +use marker; +use ops::Deref; +use sys_common::poison::{self, TryLockError, TryLockResult, LockResult}; +use sys::mutex as sys; + +/// A re-entrant mutual exclusion +/// +/// This mutex will block *other* threads waiting for the lock to become available. The thread +/// which has already locked the mutex can lock it multiple times without blocking, preventing a +/// common source of deadlocks. +pub struct ReentrantMutex { + inner: Box, + poison: poison::Flag, + data: T, +} + +unsafe impl Send for ReentrantMutex {} +unsafe impl Sync for ReentrantMutex {} + + +/// An RAII implementation of a "scoped lock" of a mutex. When this structure is +/// dropped (falls out of scope), the lock will be unlocked. +/// +/// The data protected by the mutex can be accessed through this guard via its +/// Deref and DerefMut implementations +#[must_use] +pub struct ReentrantMutexGuard<'a, T: 'a> { + // funny underscores due to how Deref/DerefMut currently work (they + // disregard field privacy). + __lock: &'a ReentrantMutex, + __poison: poison::Guard, +} + +impl<'a, T> !marker::Send for ReentrantMutexGuard<'a, T> {} + + +impl ReentrantMutex { + /// Creates a new reentrant mutex in an unlocked state. + pub fn new(t: T) -> ReentrantMutex { + ReentrantMutex { + inner: box unsafe { sys::ReentrantMutex::new() }, + poison: poison::FLAG_INIT, + data: t, + } + } + + /// Acquires a mutex, blocking the current thread until it is able to do so. + /// + /// This function will block the caller until it is available to acquire the mutex. + /// Upon returning, the thread is the only thread with the mutex held. When the thread + /// calling this method already holds the lock, the call shall succeed without + /// blocking. + /// + /// # Failure + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return failure if the mutex would otherwise be + /// acquired. + pub fn lock(&self) -> LockResult> { + unsafe { self.inner.lock() } + ReentrantMutexGuard::new(&self) + } + + /// Attempts to acquire this lock. + /// + /// If the lock could not be acquired at this time, then `Err` is returned. + /// Otherwise, an RAII guard is returned. + /// + /// This function does not block. + /// + /// # Failure + /// + /// If another user of this mutex panicked while holding the mutex, then + /// this call will return failure if the mutex would otherwise be + /// acquired. + pub fn try_lock(&self) -> TryLockResult> { + if unsafe { self.inner.try_lock() } { + Ok(try!(ReentrantMutexGuard::new(&self))) + } else { + Err(TryLockError::WouldBlock) + } + } +} + +#[unsafe_destructor] +impl Drop for ReentrantMutex { + fn drop(&mut self) { + // This is actually safe b/c we know that there is no further usage of + // this mutex (it's up to the user to arrange for a mutex to get + // dropped, that's not our job) + unsafe { self.inner.destroy() } + } +} + +impl fmt::Debug for ReentrantMutex { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.try_lock() { + Ok(guard) => write!(f, "ReentrantMutex {{ data: {:?} }}", &*guard), + Err(TryLockError::Poisoned(err)) => { + write!(f, "ReentrantMutex {{ data: Poisoned({:?}) }}", &**err.get_ref()) + }, + Err(TryLockError::WouldBlock) => write!(f, "ReentrantMutex {{ }}") + } + } +} + +impl<'mutex, T> ReentrantMutexGuard<'mutex, T> { + fn new(lock: &'mutex ReentrantMutex) + -> LockResult> { + poison::map_result(lock.poison.borrow(), |guard| { + ReentrantMutexGuard { + __lock: lock, + __poison: guard, + } + }) + } +} + +impl<'mutex, T> Deref for ReentrantMutexGuard<'mutex, T> { + type Target = T; + + fn deref<'a>(&'a self) -> &'a T { + &self.__lock.data + } +} + +#[unsafe_destructor] +impl<'a, T> Drop for ReentrantMutexGuard<'a, T> { + #[inline] + fn drop(&mut self) { + unsafe { + self.__lock.poison.done(&self.__poison); + self.__lock.inner.unlock(); + } + } +} + + +#[cfg(test)] +mod test { + use prelude::v1::*; + use sys_common::remutex::{ReentrantMutex, ReentrantMutexGuard}; + use cell::RefCell; + use sync::Arc; + use boxed; + use thread; + + #[test] + fn smoke() { + let m = ReentrantMutex::new(()); + { + let a = m.lock().unwrap(); + { + let b = m.lock().unwrap(); + { + let c = m.lock().unwrap(); + assert_eq!(*c, ()); + } + assert_eq!(*b, ()); + } + assert_eq!(*a, ()); + } + } + + #[test] + fn is_mutex() { + let m = ReentrantMutex::new(RefCell::new(0)); + let lock = m.lock().unwrap(); + let handle = thread::scoped(|| { + let lock = m.lock().unwrap(); + assert_eq!(*lock.borrow(), 4950); + }); + for i in 0..100 { + let mut lock = m.lock().unwrap(); + *lock.borrow_mut() += i; + } + drop(lock); + drop(handle); + } + + #[test] + fn trylock_works() { + let m = ReentrantMutex::new(()); + let lock = m.try_lock().unwrap(); + let lock2 = m.try_lock().unwrap(); + { + thread::scoped(|| { + let lock = m.try_lock(); + assert!(lock.is_err()); + }); + } + let lock3 = m.try_lock().unwrap(); + } + + pub struct Answer<'a>(pub ReentrantMutexGuard<'a, RefCell>); + impl<'a> Drop for Answer<'a> { + fn drop(&mut self) { + *self.0.borrow_mut() = 42; + } + } + + #[test] + fn poison_works() { + let m = Arc::new(ReentrantMutex::new(RefCell::new(0))); + let mc = m.clone(); + let result = thread::spawn(move ||{ + let lock = mc.lock().unwrap(); + *lock.borrow_mut() = 1; + let lock2 = mc.lock().unwrap(); + *lock.borrow_mut() = 2; + let answer = Answer(lock2); + panic!("What the answer to my lifetimes dilemma is?"); + drop(answer); + }).join(); + assert!(result.is_err()); + let r = m.lock().err().unwrap().into_inner(); + assert_eq!(*r.borrow(), 42); + } +} diff --git a/src/libstd/sys/unix/mutex.rs b/src/libstd/sys/unix/mutex.rs index 1c0ce2938040d..af814653c1466 100644 --- a/src/libstd/sys/unix/mutex.rs +++ b/src/libstd/sys/unix/mutex.rs @@ -12,6 +12,7 @@ use prelude::v1::*; use cell::UnsafeCell; use sys::sync as ffi; +use mem; pub struct Mutex { inner: UnsafeCell } @@ -67,3 +68,50 @@ impl Mutex { debug_assert!(r == 0 || r == libc::EINVAL); } } + +// FIXME: remove the box, because box happens twice now, once at the common layer and once here. +// Box is necessary here, because mutex may not change address after it is intialised on some +// platforms. Regular Mutex above handles this by offloading intialisation to the OS on first lock. +// Sadly, as far as reentrant mutexes go, this scheme is not quite portable and we must initialise +// when we create the mutex, in the `new`. +pub struct ReentrantMutex { inner: Box> } + +unsafe impl Send for ReentrantMutex {} +unsafe impl Sync for ReentrantMutex {} + +impl ReentrantMutex { + pub unsafe fn new() -> ReentrantMutex { + let mutex = ReentrantMutex { inner: box mem::uninitialized() }; + let mut attr: ffi::pthread_mutexattr_t = mem::uninitialized(); + let result = ffi::pthread_mutexattr_init(&mut attr as *mut _); + debug_assert_eq!(result, 0); + let result = ffi::pthread_mutexattr_settype(&mut attr as *mut _, + ffi::PTHREAD_MUTEX_RECURSIVE); + debug_assert_eq!(result, 0); + let result = ffi::pthread_mutex_init(mutex.inner.get(), &attr as *const _); + debug_assert_eq!(result, 0); + let result = ffi::pthread_mutexattr_destroy(&mut attr as *mut _); + debug_assert_eq!(result, 0); + mutex + } + + pub unsafe fn lock(&self) { + let result = ffi::pthread_mutex_lock(self.inner.get()); + debug_assert_eq!(result, 0); + } + + #[inline] + pub unsafe fn try_lock(&self) -> bool { + ffi::pthread_mutex_trylock(self.inner.get()) == 0 + } + + pub unsafe fn unlock(&self) { + let result = ffi::pthread_mutex_unlock(self.inner.get()); + debug_assert_eq!(result, 0); + } + + pub unsafe fn destroy(&self) { + let result = ffi::pthread_mutex_destroy(self.inner.get()); + debug_assert_eq!(result, 0); + } +} diff --git a/src/libstd/sys/unix/sync.rs b/src/libstd/sys/unix/sync.rs index 3c05fd602be85..41e1e206a423a 100644 --- a/src/libstd/sys/unix/sync.rs +++ b/src/libstd/sys/unix/sync.rs @@ -12,17 +12,25 @@ use libc; -pub use self::os::{PTHREAD_MUTEX_INITIALIZER, pthread_mutex_t}; +pub use self::os::{PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_RECURSIVE, pthread_mutex_t, + pthread_mutexattr_t}; pub use self::os::{PTHREAD_COND_INITIALIZER, pthread_cond_t}; pub use self::os::{PTHREAD_RWLOCK_INITIALIZER, pthread_rwlock_t}; extern { // mutexes + pub fn pthread_mutex_init(lock: *mut pthread_mutex_t, attr: *const pthread_mutexattr_t) + -> libc::c_int; pub fn pthread_mutex_destroy(lock: *mut pthread_mutex_t) -> libc::c_int; pub fn pthread_mutex_lock(lock: *mut pthread_mutex_t) -> libc::c_int; pub fn pthread_mutex_trylock(lock: *mut pthread_mutex_t) -> libc::c_int; pub fn pthread_mutex_unlock(lock: *mut pthread_mutex_t) -> libc::c_int; + pub fn pthread_mutexattr_init(attr: *mut pthread_mutexattr_t) -> libc::c_int; + pub fn pthread_mutexattr_destroy(attr: *mut pthread_mutexattr_t) -> libc::c_int; + pub fn pthread_mutexattr_settype(attr: *mut pthread_mutexattr_t, _type: libc::c_int) + -> libc::c_int; + // cvars pub fn pthread_cond_wait(cond: *mut pthread_cond_t, lock: *mut pthread_mutex_t) -> libc::c_int; @@ -52,12 +60,14 @@ mod os { use libc; pub type pthread_mutex_t = *mut libc::c_void; + pub type pthread_mutexattr_t = *mut libc::c_void; pub type pthread_cond_t = *mut libc::c_void; pub type pthread_rwlock_t = *mut libc::c_void; pub const PTHREAD_MUTEX_INITIALIZER: pthread_mutex_t = 0 as *mut _; pub const PTHREAD_COND_INITIALIZER: pthread_cond_t = 0 as *mut _; pub const PTHREAD_RWLOCK_INITIALIZER: pthread_rwlock_t = 0 as *mut _; + pub const PTHREAD_MUTEX_RECURSIVE: libc::c_int = 2; } #[cfg(any(target_os = "macos", target_os = "ios"))] @@ -95,6 +105,12 @@ mod os { __opaque: [u8; __PTHREAD_MUTEX_SIZE__], } #[repr(C)] + pub struct pthread_mutexattr_t { + __sig: libc::c_long, + // note, that this is 16 bytes just to be safe, the actual struct might be smaller. + __opaque: [u8; 16], + } + #[repr(C)] pub struct pthread_cond_t { __sig: libc::c_long, __opaque: [u8; __PTHREAD_COND_SIZE__], @@ -117,6 +133,8 @@ mod os { __sig: _PTHREAD_RWLOCK_SIG_INIT, __opaque: [0; __PTHREAD_RWLOCK_SIZE__], }; + + pub const PTHREAD_MUTEX_RECURSIVE: libc::c_int = 2; } #[cfg(target_os = "linux")] @@ -161,6 +179,12 @@ mod os { size: [u8; __SIZEOF_PTHREAD_MUTEX_T], } #[repr(C)] + pub struct pthread_mutexattr_t { + __align: libc::c_longlong, + // note, that this is 16 bytes just to be safe, the actual struct might be smaller. + size: [u8; 16], + } + #[repr(C)] pub struct pthread_cond_t { __align: libc::c_longlong, size: [u8; __SIZEOF_PTHREAD_COND_T], @@ -183,6 +207,7 @@ mod os { __align: 0, size: [0; __SIZEOF_PTHREAD_RWLOCK_T], }; + pub const PTHREAD_MUTEX_RECURSIVE: libc::c_int = 1; } #[cfg(target_os = "android")] mod os { @@ -190,6 +215,7 @@ mod os { #[repr(C)] pub struct pthread_mutex_t { value: libc::c_int } + pub type pthread_mutexattr_t = libc::c_long; #[repr(C)] pub struct pthread_cond_t { value: libc::c_int } #[repr(C)] @@ -218,4 +244,5 @@ mod os { pendingWriters: 0, reserved: [0 as *mut _; 4], }; + pub const PTHREAD_MUTEX_RECURSIVE: libc::c_int = 1; } diff --git a/src/libstd/sys/windows/mutex.rs b/src/libstd/sys/windows/mutex.rs index 0847f3b52bfab..ca20858bb5bc5 100644 --- a/src/libstd/sys/windows/mutex.rs +++ b/src/libstd/sys/windows/mutex.rs @@ -12,6 +12,7 @@ use prelude::v1::*; use cell::UnsafeCell; use sys::sync as ffi; +use mem; pub struct Mutex { inner: UnsafeCell } @@ -57,3 +58,33 @@ impl Mutex { // ... } } + +pub struct ReentrantMutex { inner: Box> } + +unsafe impl Send for ReentrantMutex {} +unsafe impl Sync for ReentrantMutex {} + +impl ReentrantMutex { + pub unsafe fn new() -> ReentrantMutex { + let mutex = ReentrantMutex { inner: box mem::uninitialized() }; + ffi::InitializeCriticalSection(mutex.inner.get()); + mutex + } + + pub unsafe fn lock(&self) { + ffi::EnterCriticalSection(self.inner.get()); + } + + #[inline] + pub unsafe fn try_lock(&self) -> bool { + ffi::TryEnterCriticalSection(self.inner.get()) != 0 + } + + pub unsafe fn unlock(&self) { + ffi::LeaveCriticalSection(self.inner.get()); + } + + pub unsafe fn destroy(&self) { + ffi::DeleteCriticalSection(self.inner.get()); + } +} diff --git a/src/libstd/sys/windows/sync.rs b/src/libstd/sys/windows/sync.rs index 7614104c98bf3..5410259540eac 100644 --- a/src/libstd/sys/windows/sync.rs +++ b/src/libstd/sys/windows/sync.rs @@ -8,17 +8,27 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use libc::{BOOL, DWORD, LPVOID, c_ulong}; +use libc::{BOOL, DWORD, LPVOID, LONG, HANDLE, c_ulong}; use libc::types::os::arch::extra::BOOLEAN; pub type PCONDITION_VARIABLE = *mut CONDITION_VARIABLE; pub type PSRWLOCK = *mut SRWLOCK; pub type ULONG = c_ulong; +pub type ULONG_PTR = c_ulong; #[repr(C)] pub struct CONDITION_VARIABLE { pub ptr: LPVOID } #[repr(C)] pub struct SRWLOCK { pub ptr: LPVOID } +#[repr(C)] +pub struct CRITICAL_SECTION { + CriticalSectionDebug: LPVOID, + LockCount: LONG, + RecursionCount: LONG, + OwningThread: HANDLE, + LockSemaphore: HANDLE, + SpinCount: ULONG_PTR +} pub const CONDITION_VARIABLE_INIT: CONDITION_VARIABLE = CONDITION_VARIABLE { ptr: 0 as *mut _, @@ -41,4 +51,10 @@ extern "system" { pub fn ReleaseSRWLockShared(SRWLock: PSRWLOCK); pub fn TryAcquireSRWLockExclusive(SRWLock: PSRWLOCK) -> BOOLEAN; pub fn TryAcquireSRWLockShared(SRWLock: PSRWLOCK) -> BOOLEAN; + + pub fn InitializeCriticalSection(CriticalSection: *mut CRITICAL_SECTION); + pub fn EnterCriticalSection(CriticalSection: *mut CRITICAL_SECTION); + pub fn TryEnterCriticalSection(CriticalSection: *mut CRITICAL_SECTION) -> BOOLEAN; + pub fn LeaveCriticalSection(CriticalSection: *mut CRITICAL_SECTION); + pub fn DeleteCriticalSection(CriticalSection: *mut CRITICAL_SECTION); } diff --git a/src/test/run-pass/atomic-print.rs b/src/test/run-pass/atomic-print.rs new file mode 100644 index 0000000000000..df3b572bce49b --- /dev/null +++ b/src/test/run-pass/atomic-print.rs @@ -0,0 +1,48 @@ +// Copyright 2015 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{env, fmt, process, sync, thread}; + +struct SlowFmt(u32); +impl fmt::Debug for SlowFmt { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + thread::sleep_ms(3); + self.0.fmt(f) + } +} + +fn do_print(x: u32) { + let x = SlowFmt(x); + println!("{:?}{:?}{:?}{:?}{:?}", x, x, x, x, x); +} + +fn main(){ + if env::args().count() == 2 { + let barrier = sync::Arc::new(sync::Barrier::new(2)); + let tbarrier = barrier.clone(); + let t = thread::scoped(||{ + tbarrier.wait(); + do_print(1); + }); + barrier.wait(); + do_print(2); + t.join(); + } else { + let this = env::args().next().unwrap(); + let output = process::Command::new(this).arg("-").output().unwrap(); + for line in String::from_utf8(output.stdout).unwrap().lines() { + match line.chars().next().unwrap() { + '1' => assert_eq!(line, "11111"), + '2' => assert_eq!(line, "22222"), + _ => panic!("Unexpected character") + } + } + } +}