Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix number of iterations in spin-loops in case if #cpus < #threads #284

Merged
merged 21 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.CancellableContinuation
import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.execution.*
import org.jetbrains.kotlinx.lincheck.util.*
import java.io.*
import java.lang.*
import java.util.concurrent.*
Expand All @@ -30,11 +31,21 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
*/
private val tasks = atomicArrayOfNulls<Any>(nThreads)

/**
* Spinners for each thread used for spin-wait on tasks.
eupp marked this conversation as resolved.
Show resolved Hide resolved
*/
private val taskSpinners = SpinnerList(nThreads)

/**
* null, waiting in [submitAndAwait] thread, DONE, or exception
*/
private val results = atomicArrayOfNulls<Any>(nThreads)

/**
* Spinner for spin-wait on results.
eupp marked this conversation as resolved.
Show resolved Hide resolved
*/
private val resultSpinner = Spinner(nThreads)

/**
* This flag is set to `true` when [await] detects a hang.
* In this case, when this executor is closed, [Thread.stop]
Expand All @@ -49,8 +60,6 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
TestThread(iThread, runnerHash, testThreadRunnable(iThread)).also { it.start() }
}

val numberOfThreadsExceedAvailableProcessors = Runtime.getRuntime().availableProcessors() < threads.size

/**
* Submits the specified set of [tasks] to this executor
* and waits until all of them are completed.
Expand Down Expand Up @@ -122,9 +131,9 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:

private fun getResult(iThread: Int, deadline: Long): Any {
// Active wait for a result during the limited number of loop cycles.
spinWait { results[iThread].value }?.let {
return it
}
val result = resultSpinner.spinWaitBoundedFor { results[iThread].value }
if (result != null)
eupp marked this conversation as resolved.
Show resolved Hide resolved
return result
// Park with timeout until the result is set or the timeout is passed.
val currentThread = Thread.currentThread()
if (results[iThread].compareAndSet(null, currentThread)) {
Expand Down Expand Up @@ -160,9 +169,9 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:

private fun getTask(iThread: Int): Any {
// Active wait for a task for the limited number of loop cycles.
spinWait { tasks[iThread].value }?.let {
return it
}
val task = taskSpinners[iThread].spinWaitBoundedFor { tasks[iThread].value }
if (task != null)
return task
eupp marked this conversation as resolved.
Show resolved Hide resolved
// Park until a task is stored into `tasks[iThread]`.
val currentThread = Thread.currentThread()
if (tasks[iThread].compareAndSet(null, currentThread)) {
Expand All @@ -182,21 +191,6 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:
LockSupport.unpark(thread)
}

private inline fun spinWait(getter: () -> Any?): Any? {
// Park immediately when the number of threads exceed the number of cores to avoid starvation.
val spinningLoopIterations = if (numberOfThreadsExceedAvailableProcessors) {
1
} else {
SPINNING_LOOP_ITERATIONS_BEFORE_PARK
}
repeat(spinningLoopIterations) {
getter()?.let {
return it
}
}
return null
}

override fun close() {
shutdown()
// Thread.stop() throws UnsupportedOperationException
Expand All @@ -217,8 +211,6 @@ internal class FixedActiveThreadsExecutor(private val nThreads: Int, runnerHash:

private val majorJavaVersion = Runtime.version().version()[0]

private const val SPINNING_LOOP_ITERATIONS_BEFORE_PARK = 1000_000

// These constants are objects for easier debugging.
private object Shutdown
private object Done
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.jetbrains.kotlinx.lincheck.runner.FixedActiveThreadsExecutor.*
import org.jetbrains.kotlinx.lincheck.runner.ParallelThreadsRunner.Completion.*
import org.jetbrains.kotlinx.lincheck.runner.UseClocks.*
import org.jetbrains.kotlinx.lincheck.strategy.*
import org.jetbrains.kotlinx.lincheck.util.*
import org.objectweb.asm.*
import java.lang.reflect.*
import java.util.concurrent.*
Expand Down Expand Up @@ -45,6 +46,8 @@ internal open class ParallelThreadsRunner(
private val runnerHash = this.hashCode() // helps to distinguish this runner threads from others
private val executor = FixedActiveThreadsExecutor(scenario.nThreads, runnerHash) // should be closed in `close()`

private val spinners = SpinnerList(executor.threads.size)

private lateinit var testInstance: Any

private var suspensionPointResults = List(scenario.nThreads) { t ->
Expand All @@ -65,7 +68,6 @@ internal open class ParallelThreadsRunner(
private fun trySetCancelledStatus(iThread: Int, actorId: Int) = completionStatuses[iThread].compareAndSet(actorId, null, CompletionStatus.CANCELLED)

private val uninitializedThreads = AtomicInteger(scenario.nThreads) // for threads synchronization
private var yieldInvokedInOnStart = false

private var initialPartExecution: TestThreadExecution? = null
private var parallelPartExecutions: Array<TestThreadExecution> = arrayOf()
Expand Down Expand Up @@ -195,22 +197,21 @@ internal open class ParallelThreadsRunner(
private fun waitAndInvokeFollowUp(iThread: Int, actorId: Int): Result {
// Coroutine is suspended. Call method so that strategy can learn it.
afterCoroutineSuspended(iThread)
// Tf the suspended method call has a follow-up part after this suspension point,
// If the suspended method call has a follow-up part after this suspension point,
// then wait for the resuming thread to write a result of this suspension point
// as well as the continuation to be executed by this thread;
// wait for the final result of the method call otherwise.
val completion = completions[iThread][actorId]
var i = 1
while (!isCoroutineResumed(iThread, actorId)) {
// Check whether the scenario is completed and the current suspended operation cannot be resumed.
if (currentExecutionPart == POST || completedOrSuspendedThreads.get() == scenario.nThreads) {
suspensionPointResults[iThread][actorId] = NoResult
return Suspended
}
// Do not call `yield()` until necessary. However, if the number of threads
// exceed the number of cores, it is better to call `yield` immediately to avoid starvation.
if (i++ % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0 || executor.numberOfThreadsExceedAvailableProcessors){
Thread.yield()
// Check if the coroutine is already resumed and if not, enter the spin loop.
if (!isCoroutineResumed(iThread, actorId)) {
spinners[iThread].spinWaitUntil {
// Check whether the scenario is completed and the current suspended operation cannot be resumed.
if (currentExecutionPart == POST || isParallelExecutionCompleted) {
suspensionPointResults[iThread][actorId] = NoResult
return Suspended
}
// Wait until coroutine is resumed.
isCoroutineResumed(iThread, actorId)
}
}
// Coroutine will be resumed. Call method so that strategy can learn it.
Expand Down Expand Up @@ -376,14 +377,7 @@ internal open class ParallelThreadsRunner(
super.onStart(iThread)
uninitializedThreads.decrementAndGet() // this thread has finished initialization
// wait for other threads to start
var i = 1
while (uninitializedThreads.get() != 0) {
if (i % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0) {
yieldInvokedInOnStart = true
Thread.yield()
}
i++
}
spinners[iThread].spinWaitUntil { uninitializedThreads.get() == 0 }
}

override fun needsTransformation() = true
Expand All @@ -401,5 +395,3 @@ internal open class ParallelThreadsRunner(
internal enum class UseClocks { ALWAYS, RANDOM }

internal enum class CompletionStatus { CANCELLED, RESUMED }

private const val SPINNING_LOOP_ITERATIONS_BEFORE_YIELD = 100_000
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.jetbrains.kotlinx.lincheck.runner.*
import org.jetbrains.kotlinx.lincheck.runner.ExecutionPart.*
import org.jetbrains.kotlinx.lincheck.strategy.*
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.jetbrains.kotlinx.lincheck.util.*
import org.objectweb.asm.*
import java.lang.reflect.*
import java.util.*
Expand Down Expand Up @@ -44,6 +45,9 @@ abstract class ManagedStrategy(
// can be replaced with a new one for trace construction.
private var runner: ManagedStrategyRunner

// Spin-waiters for each thread
private val spinners = SpinnerList(nThreads)

companion object {
// Shares location ids between class transformers in order
// to keep them different in different code locations.
Expand Down Expand Up @@ -417,12 +421,11 @@ abstract class ManagedStrategy(
* the execution according to the strategy decision.
*/
private fun awaitTurn(iThread: Int) {
// Wait actively until the thread is allowed to continue
var i = 0
while (currentThread != iThread) {
spinners[iThread].spinWaitUntil {
// Finish forcibly if an error occurred and we already have an `InvocationResult`.
if (suddenInvocationResult != null) throw ForcibleExecutionFinishError
if (++i % SPINNING_LOOP_ITERATIONS_BEFORE_YIELD == 0) Thread.yield()
if (suddenInvocationResult != null)
throw ForcibleExecutionFinishError
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please put throw on the same line

currentThread == iThread
}
}

Expand Down Expand Up @@ -1439,8 +1442,6 @@ internal object ForcibleExecutionFinishError : Error() {

private const val COROUTINE_SUSPENSION_CODE_LOCATION = -1 // currently the exact place of coroutine suspension is not known

private const val SPINNING_LOOP_ITERATIONS_BEFORE_YIELD = 100_000

private const val OBSTRUCTION_FREEDOM_SPINLOCK_VIOLATION_MESSAGE =
"The algorithm should be non-blocking, but an active lock is detected"

Expand Down
Loading