-
Notifications
You must be signed in to change notification settings - Fork 0
/
ChannelsLincheckTest.kt
141 lines (121 loc) · 4.26 KB
/
ChannelsLincheckTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@file:Suppress("unused", "MemberVisibilityCanBePrivate")
package com.deshyt
import com.deshyt.buffered.BufferedChannel
import com.deshyt.rendezvous.RendezvousChannel
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.suspendCancellableCoroutine
import org.jetbrains.kotlinx.lincheck.scenario
import kotlin.collections.ArrayList
/**
 * Test for [RendezvousChannel], checked against the [SequentialRendezvousChannel]
 * sequential specification.
 *
 * NOTE(review): [ChannelTestBase] is declared outside this file; presumably it drives the
 * Lincheck checks using the arguments passed here — confirm against its definition.
 */
class RendezvousChannelTest : ChannelTestBase(
// Channel instance under test.
c = RendezvousChannel(),
// Reference model whose behaviour the channel is compared against.
sequentialSpecification = SequentialRendezvousChannel::class.java,
)
/**
 * Test for a [BufferedChannel] constructed with the argument 2, checked against the
 * [SequentialBuffered2Channel] sequential specification.
 *
 * NOTE(review): [ChannelTestBase] is declared outside this file; presumably it drives the
 * Lincheck checks using the arguments passed here — confirm against its definition.
 */
class Buffered2ChannelTest : ChannelTestBase(
// Channel instance under test.
c = BufferedChannel(2),
// Reference model whose behaviour the channel is compared against.
sequentialSpecification = SequentialBuffered2Channel::class.java,
)
/**
 * Test for a [BufferedChannel] constructed with the argument 1, checked against the
 * [SequentialBuffered1Channel] sequential specification.
 *
 * In addition to the base-class arguments, this test supplies one hand-written scenario
 * with three parallel threads (see the inline comments below).
 *
 * NOTE(review): [ChannelTestBase], `blockingActor`, and the `send`/`receive` members
 * referenced through `Buffered1ChannelTest::` are declared outside this file — confirm
 * how `customScenarios` is consumed against their definitions.
 */
class Buffered1ChannelTest : ChannelTestBase(
// Channel instance under test.
c = BufferedChannel(1),
// Reference model whose behaviour the channel is compared against.
sequentialSpecification = SequentialBuffered1Channel::class.java,
// Explicit scenario passed alongside whatever the base class runs by default.
customScenarios = listOf(
scenario {
parallel {
// Thread 1: a single send(2).
thread {
blockingActor(Buffered1ChannelTest::send, 2)
}
// Thread 2: send(2), then receive(), then send(2) again.
thread {
blockingActor(Buffered1ChannelTest::send, 2)
blockingActor(Buffered1ChannelTest::receive)
blockingActor(Buffered1ChannelTest::send, 2)
}
// Thread 3: a single receive().
thread {
blockingActor(Buffered1ChannelTest::receive)
}
}
}
)
)
/**
 * Sequential specification for a rendezvous channel.
 *
 * The model keeps two FIFO lists: suspended senders (each paired with the element it
 * wants to hand over) and suspended receivers. An operation first tries to pair up with
 * a waiting counterpart and only suspends when none could be resumed.
 */
class SequentialRendezvousChannel {
    private val waitingSenders = ArrayList<Pair<CancellableContinuation<Unit>, Int>>()
    private val waitingReceivers = ArrayList<CancellableContinuation<Int>>()

    /**
     * Hands [x] to the first waiting receiver that can be resumed; if there is none,
     * suspends and registers the caller together with [x] in the senders list.
     */
    suspend fun send(x: Int) {
        if (!resumeFirstReceiver(x)) {
            suspendCancellableCoroutine<Unit> { cont -> waitingSenders.add(cont to x) }
        }
    }

    /**
     * Removes receivers from the head of the list until one is successfully resumed
     * with [elem].
     *
     * @return `true` if a receiver took [elem], `false` if the list ran empty first.
     */
    private fun resumeFirstReceiver(elem: Int): Boolean {
        while (true) {
            val receiver = waitingReceivers.removeFirstOrNull() ?: return false
            if (receiver.resume(elem)) return true
        }
    }

    /**
     * Returns the element of the first waiting sender that can be resumed; if there is
     * none, suspends and registers the caller in the receivers list.
     */
    suspend fun receive(): Int {
        val fromSender = resumeFirstSender()
        if (fromSender != null) return fromSender
        return suspendCancellableCoroutine { cont -> waitingReceivers.add(cont) }
    }

    /**
     * Removes senders from the head of the list until one is successfully resumed.
     *
     * @return the resumed sender's element, or `null` if the list ran empty first.
     */
    private fun resumeFirstSender(): Int? {
        while (true) {
            val (senderCont, elem) = waitingSenders.removeFirstOrNull() ?: return null
            if (senderCont.resume(Unit)) return elem
        }
    }
}
/**
 * Sequential specification for a buffered channel.
 *
 * The model consists of a FIFO buffer holding at most [capacity] elements plus two FIFO
 * lists of suspended senders (each paired with its element) and suspended receivers.
 *
 * @param capacity maximum number of elements kept in the buffer.
 */
open class SequentialBufferedChannel(
    private val capacity: Long
) {
    private val senders = ArrayList<Pair<CancellableContinuation<Unit>, Int>>()
    private val buffer = ArrayList<Int>()
    private val receivers = ArrayList<CancellableContinuation<Int>>()

    /**
     * Delivers [x] to a waiting receiver if one can be resumed, otherwise stores it in
     * the buffer if there is room, otherwise suspends and registers the caller with [x].
     */
    suspend fun send(x: Int) {
        // Short-circuit keeps the original order: receivers first, then the buffer.
        val delivered = resumeFirstReceiver(x) || tryBufferElem(x)
        if (delivered) return
        suspendCancellableCoroutine<Unit> { cont -> senders.add(cont to x) }
    }

    /**
     * Appends [elem] to the buffer when its size is below [capacity].
     *
     * @return `true` if the element was stored, `false` if the buffer is full.
     */
    private fun tryBufferElem(elem: Int): Boolean {
        if (buffer.size >= capacity) return false
        buffer.add(elem)
        return true
    }

    /**
     * Removes receivers from the head of the list until one is successfully resumed
     * with [elem].
     *
     * @return `true` if a receiver took [elem], `false` if the list ran empty first.
     */
    private fun resumeFirstReceiver(elem: Int): Boolean {
        while (true) {
            val receiver = receivers.removeFirstOrNull() ?: return false
            if (receiver.resume(elem)) return true
        }
    }

    /**
     * Returns a buffered element if there is one, otherwise the element of the first
     * resumable sender, otherwise suspends and registers the caller as a receiver.
     */
    suspend fun receive(): Int {
        val buffered = getBufferedElem()
        if (buffered != null) return buffered
        val handedOver = resumeFirstSender()
        if (handedOver != null) return handedOver
        return suspendCancellableCoroutine { cont -> receivers.add(cont) }
    }

    /**
     * Takes the oldest element out of the buffer.
     *
     * When an element was taken, one waiting sender (if any can be resumed) is resumed
     * and its element is appended to the buffer in place of the one just removed.
     *
     * @return the removed element, or `null` if the buffer was empty.
     */
    private fun getBufferedElem(): Int? {
        val head = buffer.removeFirstOrNull() ?: return null
        val refill = resumeFirstSender()
        if (refill != null) buffer.add(refill)
        return head
    }

    /**
     * Removes senders from the head of the list until one is successfully resumed.
     *
     * @return the resumed sender's element, or `null` if the list ran empty first.
     */
    private fun resumeFirstSender(): Int? {
        while (true) {
            val (senderCont, elem) = senders.removeFirstOrNull() ?: return null
            if (senderCont.resume(Unit)) return elem
        }
    }
}
/**
 * [SequentialBufferedChannel] with a capacity of 1; referenced by class from
 * [Buffered1ChannelTest] as its sequential specification.
 */
class SequentialBuffered1Channel : SequentialBufferedChannel(1)
/**
 * [SequentialBufferedChannel] with a capacity of 2; referenced by class from
 * [Buffered2ChannelTest] as its sequential specification.
 */
class SequentialBuffered2Channel : SequentialBufferedChannel(2)
/**
 * Tries to resume this continuation with [res] using the `tryResume` / `completeResume`
 * pair.
 *
 * @return `true` if `tryResume` produced a token and the resumption was completed,
 * `false` if `tryResume` returned `null` (presumably because the continuation was
 * already cancelled or resumed — confirm against the kotlinx.coroutines docs).
 */
@OptIn(InternalCoroutinesApi::class)
private fun <T> CancellableContinuation<T>.resume(res: T): Boolean {
    val token = tryResume(res)
    return if (token != null) {
        completeResume(token)
        true
    } else {
        false
    }
}