Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConcurrentModificationException bug fixed #216

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,25 +276,69 @@ abstract class ManagedStrategy(
if (!isTestThread(iThread)) return // can switch only test threads
if (inIgnoredSection(iThread)) return // cannot suspend in ignored sections
check(iThread == currentThread)
var isLoop = false
if (loopDetector.visitCodeLocation(iThread, codeLocation)) {

if (loopDetector.replayModeEnabled) {
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
newSwitchPointInReplayMode(iThread, codeLocation, tracePoint)
} else {
newSwitchPointRegular(iThread, codeLocation, tracePoint)
}
traceCollector?.passCodeLocation(tracePoint)
// continue the operation
}

private fun newSwitchPointRegular(
iThread: Int,
codeLocation: Int,
tracePoint: TracePoint?
) {
val spinLockDetected = loopDetector.visitCodeLocation(iThread, codeLocation)

if (spinLockDetected) {
failIfObstructionFreedomIsRequired {
// Log the last event that caused obstruction freedom violation
traceCollector?.passCodeLocation(tracePoint)
OBSTRUCTION_FREEDOM_SPINLOCK_VIOLATION_MESSAGE
}
isLoop = true
}
val shouldSwitch = shouldSwitch(iThread) or isLoop
if (shouldSwitch) {
if (isLoop) {
val shouldSwitchDueToStrategy = shouldSwitch(iThread)
if (shouldSwitchDueToStrategy or spinLockDetected) {
if (spinLockDetected) {
switchCurrentThreadDueToActiveLock(iThread, loopDetector.replayModeCurrentCyclePeriod)
} else {
switchCurrentThread(iThread, SwitchReason.STRATEGY_SWITCH)
}
} else {
loopDetector.onNextExecutionPoint(codeLocation)
}
}

/**
* When replaying executions it's important to repeat exactly the same executions and switches,
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
* that was recorded to [loopDetector] history during last execution.
* For example, let's consider that interleaving say us to switch from thread 1 to thread 2
* at the execution position 200. But after execution 10 spin cycle with period 2 occurred,
* so we will switch fom spin cycle, so when we leave this cycle due to switch for the first time
* interleaving executions counter may be near to 200 and strategy switch will happen soon.
* But on replay run we will switch from thread 1 early, after 12 operations, but no strategy switch will be
* performed for the next 200-12 operations. This leads to another executions results, comparing to the original
* failure results.
* To avoid this bug when we're replaying some executions, we have to fallow only [loopDetector] history
* during last execution. In considered example, we will retain that we will switch soon after spin cycle in thread
* 1, so no bug will appear.
*/
private fun newSwitchPointInReplayMode(iThread: Int, codeLocation: Int, tracePoint: TracePoint?) {
if (loopDetector.visitCodeLocation(iThread, codeLocation)) {
if (loopDetector.isSpinLockSwitch) {
failIfObstructionFreedomIsRequired {
// Log the last event that caused obstruction freedom violation
traceCollector?.passCodeLocation(tracePoint)
OBSTRUCTION_FREEDOM_SPINLOCK_VIOLATION_MESSAGE
}
switchCurrentThreadDueToActiveLock(iThread, loopDetector.replayModeCurrentCyclePeriod)
} else {
switchCurrentThread(iThread, SwitchReason.STRATEGY_SWITCH)
}
}
traceCollector?.passCodeLocation(tracePoint)
// continue the operation
}

/**
Expand Down Expand Up @@ -811,6 +855,14 @@ abstract class ManagedStrategy(
*/
private val currentInterleavingHistory = ArrayList<InterleavingHistoryNode>()

/**
* When we're back to some thread, newSwitchPoint won't be called before the fist
* in current thread part as it was called before switch.
* So when we return to thread that already was running, we have to start from 1 its executions counter.
* This set helps us to determine if some thread is running for the first time in an execution or not.
*/
private val threadsRan: MutableSet<Int> = hashSetOf()

/**
* Set of interleaving event sequences lead to loops. (A set of previously detected hangs)
*/
Expand All @@ -833,6 +885,11 @@ abstract class ManagedStrategy(

val replayModeCurrentCyclePeriod: Int get() = replayModeLoopDetectorHelper?.currentCyclePeriod ?: 0

val replayModeEnabled: Boolean get() = replayModeLoopDetectorHelper != null

val isSpinLockSwitch: Boolean
get() = replayModeLoopDetectorHelper?.isActiveLockNode ?: error("Loop detector is not in replay mode")

fun enableReplayMode(failDueToDeadlockInTheEnd: Boolean) {
val contextSwitchesBeforeHalt =
findMaxPrefixLengthWithNoCycleOnSuffix(currentInterleavingHistory)?.let { it.executionsBeforeCycle + it.cyclePeriod }
Expand Down Expand Up @@ -864,7 +921,6 @@ abstract class ManagedStrategy(
val count = currentThreadCodeLocationVisitCountMap.getOrDefault(codeLocation, 0) + 1
currentThreadCodeLocationVisitCountMap[codeLocation] = count
currentThreadCodeLocationsHistory += codeLocation
onNextExecutionPoint(executionIdentity = codeLocation)
val detectedEarly = loopTrackingCursor.isInCycle
// Check whether the count exceeds the maximum number of repetitions for loop/hang detection.
val detectedFirstTime = count > hangingDetectionThreshold
Expand Down Expand Up @@ -895,15 +951,29 @@ abstract class ManagedStrategy(
}

private fun onNextThreadSwitchPoint(nextThread: Int) {
/*
When we're back to some thread, newSwitchPoint won't be called before the fist
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
in current thread part as it was called before switch.
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
So, we're tracking that to maintain the number of performed operations correctly.
*/
val threadRunningFirstTime = threadsRan.add(nextThread)
if (currentInterleavingHistory.isNotEmpty() && currentInterleavingHistory.last().threadId == nextThread) {
return
}
currentInterleavingHistory.add(InterleavingHistoryNode(nextThread))
currentInterleavingHistory.add(
InterleavingHistoryNode(
threadId = nextThread,
executions = if (threadRunningFirstTime) 0 else 1,
)
)
loopTrackingCursor.onNextSwitchPoint(nextThread)
replayModeLoopDetectorHelper?.onNextSwitch(nextThread)
if (!threadRunningFirstTime) {
loopTrackingCursor.onNextExecutionPoint()
}
replayModeLoopDetectorHelper?.onNextSwitch(threadRunningFirstTime)
}

private fun onNextExecutionPoint(executionIdentity: Int) {
fun onNextExecutionPoint(executionIdentity: Int) {
val lastInterleavingHistoryNode = currentInterleavingHistory.last()
if (lastInterleavingHistoryNode.cycleOccurred) {
return /* If we already ran into cycle and haven't switched than no need to track executions */
Expand Down Expand Up @@ -957,6 +1027,7 @@ abstract class ManagedStrategy(
* Is called before each interleaving part processing
*/
fun beforePart(nextThread: Int) {
threadsRan.clear()
if (!firstThreadSet) {
setFirstThread(nextThread)
} else if (lastExecutedThread != nextThread) {
Expand Down Expand Up @@ -995,6 +1066,8 @@ abstract class ManagedStrategy(
*/
private val failDueToDeadlockInTheEnd: Boolean,
) {
val isActiveLockNode: Boolean get() = interleavingHistory[currentInterleavingNodeIndex].spinCyclePeriod != 0

/**
* Cycle period if is occurred in during current thread switch or 0 if no spin-cycle happened
*/
Expand Down Expand Up @@ -1037,26 +1110,22 @@ abstract class ManagedStrategy(
* @return should we switch from the current thread?
*/
fun onNextExecution(): Boolean {
require(currentInterleavingNodeIndex <= interleavingHistory.lastIndex)
require(currentInterleavingNodeIndex <= interleavingHistory.lastIndex) { "Internal error" }
val historyNode = interleavingHistory[currentInterleavingNodeIndex]
// switch current thread after we executed operations before spin cycle and cycle iteration to show it
val shouldSwitchThread =
executionsPerformedInCurrentThread++ >= historyNode.spinCyclePeriod + historyNode.executions
checkFailDueToDeadlock(shouldSwitchThread)
return historyNode.cycleOccurred && shouldSwitchThread
return shouldSwitchThread
}

/**
* Called before next thread switch
*/
fun onNextSwitch(nextThread: Int) {
fun onNextSwitch(threadRunningFirstTime: Boolean) {
currentInterleavingNodeIndex++
// See threadsRan field description to understand the following initialization logic
executionsPerformedInCurrentThread = if (executionPart != ExecutionPart.PARALLEL) {
0 // newSwitchPoint method is not called before methods in INIT and POST parts
} else {
if (executionPart != ExecutionPart.PARALLEL || threadsRan.add(nextThread)) 0 else 1
}
executionsPerformedInCurrentThread = if (threadRunningFirstTime) 0 else 1
}

private fun checkFailDueToDeadlock(shouldSwitchThread: Boolean) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Lincheck
*
* Copyright (C) 2019 - 2023 JetBrains s.r.o.
*
* This Source Code Form is subject to the terms of the
* Mozilla Public License, v. 2.0. If a copy of the MPL was not distributed
* with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

@file:Suppress("unused")

package org.jetbrains.kotlinx.lincheck_test.representation

import kotlinx.atomicfu.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.annotations.Param
import org.jetbrains.kotlinx.lincheck.checkImpl
import org.jetbrains.kotlinx.lincheck.paramgen.IntGen
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.ModelCheckingOptions
import org.jetbrains.kotlinx.lincheck_test.util.checkLincheckOutput
import org.junit.Test

/**
* This test is created to verify this [bug](https://github.com/JetBrains/lincheck/issues/209) is resolved.
* The problem was that we ran in infinite spin-lock in trace collection mode due to incorrect Loop detector work.
*/
@Param(name = "element", gen = IntGen::class, conf = "0:3")
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
class BoundedQueueTest {
private val queue = QueueAdaptor<Int>()

@Operation
fun enqueue(@Param(name = "element") element: Int) = queue.enqueue(element)

@Operation
fun dequeue() = queue.dequeue() != null

@Test
fun modelCheckingTest() =
ModelCheckingOptions()
ndkoval marked this conversation as resolved.
Show resolved Hide resolved
.iterations(100)
.invocationsPerIteration(5_000)
.actorsBefore(2)
.threads(3)
.actorsPerThread(2)
.actorsAfter(2)
.checkObstructionFreedom(false)
.minimizeFailedScenario(false)
.sequentialSpecification(BoundedIntQueueSequential::class.java)
.checkImpl(this::class.java)
.checkLincheckOutput("bounded_queue_incorrect_results.txt")

}

@Suppress("unused")
class BoundedIntQueueSequential {
avpotapov00 marked this conversation as resolved.
Show resolved Hide resolved
private val q = ArrayList<Int>()

fun enqueue(element: Int) {
q.add(element)
}

fun dequeue() = q.removeFirstOrNull() != null
}

class QueueAdaptor<T> {
private class Marked<T>(val element: T) {
val marked = atomic(false)
}

private val queue = BoundedQueue<Marked<T>>()
private val concurrentEnqueues = atomic(0)

fun enqueue(element: T) {
try {
concurrentEnqueues.getAndIncrement()
queue.enqueue(Marked(element))
} finally {
concurrentEnqueues.getAndDecrement()
}
}

fun dequeue(): T? {
while (true) {
val element = queue.dequeue()
if (element != null) {
val wasMarked = element.marked.getAndSet(true)
require(!wasMarked)
return element.element
}
if (concurrentEnqueues.value == 0)
return null
}
}
}


private const val CAPACITY = 10

// Based on https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
class BoundedQueue<E> {
private val buffer = Array(CAPACITY) {
Cell(it.toLong(), null)
}
private val enqueuePos = atomic(0L)
private val dequeuePos = atomic(0L)

fun enqueue(element: E) {
var pos = enqueuePos.value
while (true) {
val cell = buffer[(pos % buffer.size).toInt()]
val seq = cell.sequence.value
val dif = seq - pos
if (dif == 0L) {
if (enqueuePos.compareAndSet(pos, pos + 1)) {
cell.data.value = element
cell.sequence.value = pos + 1
return
}
} else if (dif < 0) {
error("Can't get here in the test")
} else {
pos = enqueuePos.value
}
}
}

fun dequeue(): E? {
var pos = dequeuePos.value
while (true) {
val cell = buffer[(pos % buffer.size).toInt()]
val seq = cell.sequence.value
val dif = seq - (pos + 1)
if (dif == 0L) {
if (dequeuePos.compareAndSet(pos, pos + 1)) {
val result = cell.data.value!!
cell.sequence.value = pos + buffer.size.toLong()
return result
}
} else if (dif < 0) {
return null
} else {
pos = dequeuePos.value
}
}
}

private inner class Cell(sequence: Long, data: E?) {
val sequence = atomic(sequence)
val data = atomic(data)
}
}
Loading