Skip to content

Commit

Permalink
Fix a race in FixedActiveThreadsExecutor (#202)
Browse files Browse the repository at this point in the history

---------

Co-authored-by: Aleksandr.Potapov <[email protected]>
  • Loading branch information
avpotapov00 and Aleksandr.Potapov authored Jun 30, 2023
1 parent 7d5522c commit 8578302
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ import java.util.concurrent.locks.*
* possible, so that they should not be parked and unparked between invocations.
*/
internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash: Int) : Closeable {
/**
* Threads used in this runner.
*/
val threads: List<TestThread>

/**
* null, waiting TestThread, Runnable task, or SHUTDOWN
*/
Expand All @@ -47,10 +42,11 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
*/
private var hangDetected = false

init {
threads = (0 until nThreads).map { iThread ->
TestThread(iThread, runnerHash, testThreadRunnable(iThread)).also { it.start() }
}
/**
* Threads used in this runner.
*/
val threads = Array(nThreads) { iThread ->
TestThread(iThread, runnerHash, testThreadRunnable(iThread)).also { it.start() }
}

/**
Expand All @@ -75,24 +71,32 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}

private fun submitTask(iThread: Int, task: Any) {
if (tasks[iThread].compareAndSet(null, task)) return
// CAS failed => a test thread is parked.
// Submit the task and unpark the waiting thread.
val thread = tasks[iThread].value as TestThread
tasks[iThread].value = task
LockSupport.unpark(thread)
val old = tasks[iThread].getAndSet(task)
if (old is TestThread) {
LockSupport.unpark(old)
}
}

private fun await(timeoutMs: Long) {
val deadline = System.currentTimeMillis() + timeoutMs
for (iThread in 0 until nThreads)
awaitTask(iThread, deadline)
var exception: Throwable? = null
for (iThread in 0 until nThreads) {
val e = awaitTask(iThread, deadline)
if (e != null) {
if (exception == null) {
exception = e
} else {
exception.addSuppressed(e)
}
}
}
exception?.let { throw ExecutionException(it) }
}

private fun awaitTask(iThread: Int, deadline: Long) {
private fun awaitTask(iThread: Int, deadline: Long): Throwable? {
val result = getResult(iThread, deadline)
// Check whether there was an exception during the execution.
if (result != DONE) throw ExecutionException(result as Throwable)
return result as? Throwable
}

private fun getResult(iThread: Int, deadline: Long): Any {
Expand All @@ -116,14 +120,14 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}

private fun testThreadRunnable(iThread: Int) = Runnable {
loop@while (true) {
loop@ while (true) {
val task = getTask(iThread)
if (task == SHUTDOWN) return@Runnable
tasks[iThread].value = null // reset task
val runnable = task as Runnable
try {
runnable.run()
} catch(e: Throwable) {
} catch (e: Throwable) {
setResult(iThread, wrapInvalidAccessFromUnnamedModuleExceptionWithDescription(e))
continue@loop
}
Expand All @@ -138,7 +142,7 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}
// Park until a task is stored into `tasks[iThread]`.
val currentThread = Thread.currentThread()
if (tasks[iThread].compareAndSet(null, Thread.currentThread())) {
if (tasks[iThread].compareAndSet(null, currentThread)) {
while (tasks[iThread].value === currentThread) {
LockSupport.park()
}
Expand Down Expand Up @@ -172,12 +176,13 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
}
}

class TestThread(val iThread: Int, val runnerHash: Int, r: Runnable) : Thread(r, "FixedActiveThreadsExecutor@$runnerHash-$iThread") {
class TestThread(val iThread: Int, val runnerHash: Int, r: Runnable) :
Thread(r, "FixedActiveThreadsExecutor@$runnerHash-$iThread") {
var cont: CancellableContinuation<*>? = null
}
}

private const val SPINNING_LOOP_ITERATIONS_BEFORE_PARK = 1000_000

private val SHUTDOWN = Any()
private val DONE = Any()
private val SHUTDOWN = "SHUTDOWN"
private val DONE = "DONE"
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Please add the following lines to your test running configuration:
at org.jetbrains.kotlinx.lincheck.strategy.managed.ManagedStrategyRunner.onFailure(ManagedStrategy.kt:736)
at org.jetbrains.kotlinx.lincheck.runner.TestThreadExecution.failOnExceptionIsUnexpected(TestThreadExecution.java:52)
at org.jetbrains.kotlinx.lincheck.runner.TestThreadExecution229.run(Unknown Source)
at org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.testThreadRunnable$lambda-4(FixedActiveThreadsExecutor.kt:125)
at org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.testThreadRunnable$lambda-4(FixedActiveThreadsExecutor.kt:129)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalAccessException: module java.base does not "opens java.io" to unnamed module
at org.jetbrains.kotlinx.lincheck_test.representation.IllegalModuleAccessOutputMessageTest$operation$1.invoke(IllegalModuleAccessOutputMessageTest.kt:37)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ java.lang.IllegalStateException: Internal bug
at org.jetbrains.kotlinx.lincheck.util.InternalLincheckExceptionEmulator.throwException(InternalLincheckExceptionEmulator.kt:23)
at org.jetbrains.kotlinx.lincheck_test.representation.InternalLincheckBugTest.operation2(InternalLincheckBugTest.kt:36)
at org.jetbrains.kotlinx.lincheck.runner.TestThreadExecution304.run(Unknown Source)
at org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.testThreadRunnable$lambda-4(FixedActiveThreadsExecutor.kt:125)
at org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.testThreadRunnable$lambda-4(FixedActiveThreadsExecutor.kt:129)
at java.base/java.lang.Thread.run(Thread.java:829)

0 comments on commit 8578302

Please sign in to comment.