Skip to content

Commit

Permalink
bugfix for wait-notify with lock re-entrance
Browse files Browse the repository at this point in the history
Signed-off-by: Evgenii Moiseenko <[email protected]>
  • Loading branch information
eupp committed Mar 9, 2023
1 parent 258cc49 commit 6a860c5
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -453,12 +453,12 @@ abstract class ManagedStrategy(
if (inIgnoredSection(iThread)) return false
newSwitchPoint(iThread, codeLocation, tracePoint)
// Try to acquire the monitor
if (!monitorTracker.acquireMonitor(iThread, monitor)) {
failIfObstructionFreedomIsRequired { "Obstruction-freedom is required but a lock has been found" }
while (!monitorTracker.acquireMonitor(iThread, monitor)) {
failIfObstructionFreedomIsRequired {
"Obstruction-freedom is required but a lock has been found"
}
// Switch to another thread and wait for a moment when the monitor can be acquired
switchCurrentThread(iThread, SwitchReason.LOCK_WAIT, true)
// Now it is possible to acquire the monitor, do it then.
require(monitorTracker.acquireMonitor(iThread, monitor))
}
// The monitor is acquired, finish.
return false
Expand Down Expand Up @@ -509,12 +509,14 @@ abstract class ManagedStrategy(
if (!isTestThread(iThread)) return true
if (inIgnoredSection(iThread)) return false
newSwitchPoint(iThread, codeLocation, tracePoint)
failIfObstructionFreedomIsRequired { "Obstruction-freedom is required but a waiting on a monitor block has been found" }
failIfObstructionFreedomIsRequired {
"Obstruction-freedom is required but a waiting on a monitor block has been found"
}
if (withTimeout) return false // timeouts occur instantly
monitorTracker.waitOnMonitor(iThread, monitor)
// switch to another thread and wait till a notify event happens
switchCurrentThread(iThread, SwitchReason.MONITOR_WAIT, true)
require(monitorTracker.acquireMonitor(iThread, monitor)) // acquire the lock again
while (monitorTracker.waitOnMonitor(iThread, monitor)) {
val mustSwitch = monitorTracker.isWaiting(iThread)
switchCurrentThread(iThread, SwitchReason.MONITOR_WAIT, mustSwitch)
}
return false
}

Expand Down Expand Up @@ -837,10 +839,9 @@ private class MonitorTracker(nThreads: Int) {
// performed the acquisition and the the reentrancy depth.
private val acquiredMonitors = IdentityHashMap<Any, MonitorAcquiringInfo>()
// Maintains a set of monitors on which each thread is waiting.
// Note, that a thread can wait on a free monitor if it is waiting for
// a `notify` call.
// Note, that a thread can wait on a free monitor if it is waiting for a `notify` call.
// Stores `null` if thread is not waiting on any monitor.
private val acquiringMonitors = Array<Any?>(nThreads) { null }
private val waitingMonitors = Array<MonitorAcquiringInfo?>(nThreads) { null }
// Stores `true` for the threads which are waiting for a
// `notify` call on the monitor stored in `acquiringMonitor`.
private val waitForNotify = BooleanArray(nThreads) { false }
Expand All @@ -851,13 +852,15 @@ private class MonitorTracker(nThreads: Int) {
fun acquireMonitor(iThread: Int, monitor: Any): Boolean {
// Increment the reentrant depth and store the
// acquisition info if needed.
val ai = acquiredMonitors.computeIfAbsent(monitor) { MonitorAcquiringInfo(iThread, 0) }
if (ai.iThread != iThread) {
acquiringMonitors[iThread] = monitor
val info = acquiredMonitors.computeIfAbsent(monitor) {
MonitorAcquiringInfo(monitor, iThread, 0)
}
if (info.iThread != iThread) {
waitingMonitors[iThread] = MonitorAcquiringInfo(monitor, iThread, 0)
return false
}
ai.timesAcquired++
acquiringMonitors[iThread] = null // re-set
info.timesAcquired++
waitingMonitors[iThread] = null
return true
}

Expand All @@ -867,16 +870,17 @@ private class MonitorTracker(nThreads: Int) {
fun releaseMonitor(monitor: Any) {
// Decrement the reentrancy depth and remove the acquisition info
// if the monitor becomes free to acquire by another thread.
val ai = acquiredMonitors[monitor]!!
ai.timesAcquired--
if (ai.timesAcquired == 0) acquiredMonitors.remove(monitor)
val info = acquiredMonitors[monitor]!!
info.timesAcquired--
if (info.timesAcquired == 0)
acquiredMonitors.remove(monitor)
}

/**
* Returns `true` if the corresponding threads is waiting on some monitor.
*/
fun isWaiting(iThread: Int): Boolean {
val monitor = acquiringMonitors[iThread] ?: return false
val monitor = waitingMonitors[iThread]?.monitor ?: return false
return waitForNotify[iThread] || !canAcquireMonitor(iThread, monitor)
}

Expand All @@ -889,15 +893,35 @@ private class MonitorTracker(nThreads: Int) {

/**
* Performs a logical wait, [isWaiting] for the specified thread
* returns `true` until the corresponding [notify] or [notifyAll]
* is invoked.
* returns `true` until the corresponding [notify] or [notifyAll] is invoked.
*/
fun waitOnMonitor(iThread: Int, monitor: Any) {
fun waitOnMonitor(iThread: Int, monitor: Any): Boolean {
// TODO: we can add spurious wakeups here
check(monitor in acquiredMonitors) { "Monitor should have been acquired by this thread" }
releaseMonitor(monitor)
waitForNotify[iThread] = true
acquiringMonitors[iThread] = monitor
var info = acquiredMonitors[monitor]
if (info != null) {
// in case when lock is currently acquired by another thread continue waiting
if (info.iThread != iThread)
return true
// in case when current thread owns the lock we release it
// in order to give other thread a chance to acquire it
// and put the current thread into waiting state
waitForNotify[iThread] = true
waitingMonitors[iThread] = info
acquiredMonitors.remove(monitor)
return true
}
// otherwise the lock is held by no-one and can be acquired
info = waitingMonitors[iThread]
check(info != null && info.monitor === monitor && info.iThread == iThread) {
"Monitor should have been acquired by this thread"
}
// if there has been no `notify` yet continue waiting
if (waitForNotify[iThread])
return true
// otherwise acquire monitor restoring its re-entrance depth
acquiredMonitors[monitor] = info
waitingMonitors[iThread] = null
return false
}

/**
Expand All @@ -908,15 +932,16 @@ private class MonitorTracker(nThreads: Int) {
/**
* Performs the logical `notifyAll`.
*/
fun notifyAll(monitor: Any): Unit = acquiringMonitors.forEachIndexed { iThread, m ->
if (monitor === m) waitForNotify[iThread] = false
fun notifyAll(monitor: Any): Unit = waitingMonitors.forEachIndexed { iThread, info ->
if (monitor === info?.monitor)
waitForNotify[iThread] = false
}

/**
* Stores the number of reentrant acquisitions ([timesAcquired])
* and the number of thread ([iThread]) that holds the monitor.
* Stores the [monitor], id of the thread acquired the monitor [iThread],
* and the number of reentrant acquisitions [timesAcquired].
*/
private class MonitorAcquiringInfo(val iThread: Int, var timesAcquired: Int)
private class MonitorAcquiringInfo(val monitor: Any, val iThread: Int, var timesAcquired: Int)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,31 @@ class WaitNotifyLockTest : VerifierState() {
}
}

private class WaitNotifyLock {
@ModelCheckingCTest(iterations = 30)
class NestedWaitNotifyLockTest : VerifierState() {
private var counter = 0
private val lock = NestedWaitNotifyLock()

override fun extractState() = counter

@Operation
fun getAndIncrement() = lock.withLock { counter++ }

@Test
fun test() {
LinChecker.check(this::class.java)
}
}

private interface SimpleLock {
fun lock()
fun unlock()
}

private class WaitNotifyLock : SimpleLock {
private var owner: Thread? = null

fun lock() {
override fun lock() {
val thread = Thread.currentThread()
synchronized(this) {
while (owner != null) {
Expand All @@ -55,15 +76,40 @@ private class WaitNotifyLock {
}
}

fun unlock() {
override fun unlock() {
synchronized(this) {
owner = null
(this as Object).notify()
}
}
}

private inline fun <T> WaitNotifyLock.withLock(body: () -> T): T {
private class NestedWaitNotifyLock : SimpleLock {
private var owner: Thread? = null

override fun lock() {
val thread = Thread.currentThread()
synchronized(this) {
synchronized(this) {
while (owner != null) {
(this as Object).wait()
}
owner = thread
}
}
}

override fun unlock() {
synchronized(this) {
synchronized(this) {
owner = null
(this as Object).notify()
}
}
}
}

private inline fun <T> SimpleLock.withLock(body: () -> T): T {
try {
lock()
return body()
Expand Down

0 comments on commit 6a860c5

Please sign in to comment.