Skip to content

Commit f777277

Browse files
committed
Android: Fix online mode issues, more coroutines.
- The read thread is now a coroutine. - Convert closeConnection to coroutine. - Set connection timeout explicitly to 30s. - Replace runBlocking with custom scope. - Reject promises when ID is incorrect, instead of resolving false. - Fix lock contention between the read coroutine and encryptConnection.
1 parent f5ca01a commit f777277

File tree

1 file changed

+56
-55
lines changed

1 file changed

+56
-55
lines changed

android/app/src/main/java/com/enderchat/modules/connection/ConnectionModule.kt

+56-55
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.enderchat.modules.connection
22

3-
import android.os.Process
43
import android.util.Base64
54
import com.enderchat.modules.connection.datatypes.Packet
65
import com.enderchat.modules.connection.datatypes.VarInt
@@ -10,6 +9,7 @@ import com.facebook.react.modules.core.DeviceEventManagerModule
109
import kotlinx.coroutines.*
1110
import java.io.ByteArrayOutputStream
1211
import java.net.Socket
12+
import java.net.InetSocketAddress
1313
import java.nio.ByteBuffer
1414
import java.nio.ByteOrder
1515
import java.util.UUID
@@ -23,6 +23,10 @@ import kotlin.concurrent.write
2323

2424
class ConnectionModule(reactContext: ReactApplicationContext)
2525
: ReactContextBaseJavaModule(reactContext) {
26+
private val scope = CoroutineScope(SupervisorJob() +
27+
Dispatchers.IO +
28+
Dispatchers.Main +
29+
Dispatchers.Default)
2630
private val lock = ReentrantReadWriteLock()
2731
private var socket: Socket? = null
2832
private var connectionId: UUID? = null
@@ -65,50 +69,48 @@ class ConnectionModule(reactContext: ReactApplicationContext)
6569

6670
@ReactMethod fun writePacket(
6771
connId: String, packetId: Int, data: String, promise: Promise
68-
) = runBlocking {
69-
launch(Dispatchers.IO) {
70-
lock.read {
71-
if (connId == connectionId.toString()) {
72-
try {
73-
val dataBytes = Base64.decode(data, Base64.DEFAULT)
74-
promise.resolve(directlyWritePacket(packetId, dataBytes))
75-
} catch (e: Exception) {
76-
promise.reject(e)
77-
}
78-
} else promise.resolve(false)
79-
}
72+
) = scope.launch(Dispatchers.IO) {
73+
lock.read {
74+
if (connId == connectionId.toString()) {
75+
try {
76+
val dataBytes = Base64.decode(data, Base64.DEFAULT)
77+
promise.resolve(directlyWritePacket(packetId, dataBytes))
78+
} catch (e: Exception) {
79+
promise.reject(e)
80+
}
81+
} else promise.reject(Exception("This connection is closed!"))
8082
}
8183
}
8284

8385
@ReactMethod fun enableEncryption(
8486
connId: String, secret: String, packet: String, promise: Promise
85-
) = runBlocking {
86-
launch(Dispatchers.IO) {
87-
lock.write {
88-
if (connId == connectionId.toString()) {
89-
try {
90-
val packetBytes = Base64.decode(packet, Base64.DEFAULT)
91-
val secretBytes = Base64.decode(secret, Base64.DEFAULT)
92-
val secretKey = SecretKeySpec(secretBytes, "AES")
93-
val iv = IvParameterSpec(secretBytes)
94-
aesDecipher = Cipher.getInstance("AES/CFB8/PKCS5Padding").apply {
95-
init(Cipher.DECRYPT_MODE, secretKey, iv)
96-
}
97-
val result = directlyWritePacket(0x01, packetBytes)
98-
aesCipher = Cipher.getInstance("AES/CFB8/PKCS5Padding").apply {
99-
init(Cipher.ENCRYPT_MODE, secretKey, iv)
100-
}
101-
promise.resolve(result)
102-
} catch (e: Exception) {
103-
promise.reject(e)
87+
) = scope.launch(Dispatchers.IO) {
88+
lock.write {
89+
if (connId == connectionId.toString()) {
90+
try {
91+
val packetBytes = Base64.decode(packet, Base64.DEFAULT)
92+
val secretBytes = Base64.decode(secret, Base64.DEFAULT)
93+
val secretKey = SecretKeySpec(secretBytes, "AES")
94+
val iv = IvParameterSpec(secretBytes)
95+
aesDecipher = Cipher.getInstance("AES/CFB8/PKCS5Padding").apply {
96+
init(Cipher.DECRYPT_MODE, secretKey, iv)
10497
}
105-
} else promise.resolve(false)
106-
}
98+
val result = directlyWritePacket(0x01, packetBytes)
99+
aesCipher = Cipher.getInstance("AES/CFB8/PKCS5Padding").apply {
100+
init(Cipher.ENCRYPT_MODE, secretKey, iv)
101+
}
102+
promise.resolve(result)
103+
} catch (e: Exception) {
104+
promise.reject(e)
105+
}
106+
} else promise.reject(Exception("This connection is closed!"))
107107
}
108108
}
109109

110-
@ReactMethod fun closeConnection(id: String) = lock.write {
111-
if (id == connectionId.toString()) directlyCloseConnection()
110+
@ReactMethod fun closeConnection(id: String) = scope.launch(Dispatchers.IO) {
111+
lock.write {
112+
if (id == connectionId.toString()) directlyCloseConnection()
113+
}
112114
}
113115

114116
@ReactMethod fun openConnection(opts: ReadableMap, promise: Promise) {
@@ -123,9 +125,7 @@ class ConnectionModule(reactContext: ReactApplicationContext)
123125

124126
// Start thread which handles creating the connection and then reads packets from it.
125127
// This avoids blocking the main thread on writeLock and keeps the UI thread responsive.
126-
thread(start = true, name = "EnderChat-conn-read") {
127-
Process.setThreadPriority(Process.THREAD_PRIORITY_MORE_FAVORABLE)
128-
128+
scope.launch(Dispatchers.IO) {
129129
lock.writeLock().lock()
130130
val socket: Socket
131131
val connectionId = UUID.randomUUID()
@@ -134,9 +134,10 @@ class ConnectionModule(reactContext: ReactApplicationContext)
134134
directlyCloseConnection()
135135

136136
// Create socket and connection ID.
137-
socket = Socket(host, port)
137+
socket = Socket()
138+
socket.connect(InetSocketAddress(host, port), 30 * 1000)
138139
socket.soTimeout = 20 * 1000
139-
this.connectionId = connectionId
140+
this@ConnectionModule.connectionId = connectionId
140141

141142
// Create data to send in Handshake.
142143
val portBuf = ByteBuffer.allocate(2)
@@ -156,14 +157,14 @@ class ConnectionModule(reactContext: ReactApplicationContext)
156157
socket.getOutputStream().write(Packet(0x00, loginPacketData).writePacket())
157158

158159
// Update the current socket and resolve/reject.
159-
this.socket = socket
160+
this@ConnectionModule.socket = socket
160161
lock.writeLock().unlock()
161162
promise.resolve(connectionId.toString())
162163
} catch (e: Exception) {
163164
directlyCloseConnection()
164165
lock.writeLock().unlock()
165166
promise.reject(e)
166-
return@thread
167+
return@launch
167168
}
168169

169170
// Calculate the necessary packet IDs.
@@ -190,14 +191,15 @@ class ConnectionModule(reactContext: ReactApplicationContext)
190191
val buffer = ByteArrayOutputStream()
191192
val buf = ByteArray(4096)
192193
while (true) {
193-
lock.readLock().lock()
194-
if (this.socket != socket) {
195-
lock.readLock().unlock()
196-
break
197-
}
194+
var lockAcquired = false
198195
try {
199196
val n = socket.getInputStream().read(buf)
200-
if (n == -1) {
197+
if (n == -1) break
198+
199+
// Make sure this is the same socket we read from.
200+
lock.readLock().lock()
201+
lockAcquired = true
202+
if (this@ConnectionModule.socket != socket) {
201203
lock.readLock().unlock()
202204
break
203205
}
@@ -209,7 +211,6 @@ class ConnectionModule(reactContext: ReactApplicationContext)
209211
buffer.write(buf, 0, n)
210212
}
211213

212-
// TODO: This could be coroutined. Maybe some actor-style threading?
213214
while (true) {
214215
// Read packets from the buffer.
215216
val bytes = buffer.toByteArray()
@@ -221,15 +222,14 @@ class ConnectionModule(reactContext: ReactApplicationContext)
221222
buffer.write(bytes, packet.totalLength!!, bytes.size - packet.totalLength)
222223

223224
// We can handle Keep Alive, Login Success and Set Compression.
225+
// No write lock since writePacket isn't called during login sequence (usually).
224226
if (packet.id.value == keepAliveClientBoundId) {
225227
directlyWritePacket(keepAliveServerBoundId, packet.data)
226228
} else if (packet.id.value == setCompressionId && !loggedIn) {
227229
val threshold = VarInt.read(packet.data)?.value ?: 0
228230
compressionThreshold = threshold
229231
compressionEnabled = threshold >= 0
230-
} else if (packet.id.value == loginSuccessId && !loggedIn) {
231-
loggedIn = true // Login Success
232-
}
232+
} else if (packet.id.value == loginSuccessId && !loggedIn) loggedIn = true
233233

234234
// Forward the packet to JavaScript.
235235
val packetLengthLength =
@@ -247,9 +247,10 @@ class ConnectionModule(reactContext: ReactApplicationContext)
247247
sendEvent(reactContext = reactApplicationContext, "ecm:packet", params)
248248
}
249249
lock.readLock().unlock()
250+
lockAcquired = false
250251
} catch (e: Exception) {
251-
lock.readLock().unlock()
252-
lock.write { if (this.socket == socket) directlyCloseConnection() }
252+
if (lockAcquired) lock.readLock().unlock()
253+
lock.write { if (this@ConnectionModule.socket == socket) directlyCloseConnection() }
253254
val params = Arguments.createMap().apply {
254255
putString("connectionId", connectionId.toString())
255256
putString("stackTrace", e.stackTraceToString())

0 commit comments

Comments
 (0)