Skip to content

Commit

Permalink
KTOR-7734 JS: Catch error on channel reader closure (#4641)
Browse files Browse the repository at this point in the history
* Cleanup code slightly before fix
  • Loading branch information
osipxd authored Jan 31, 2025
1 parent 747fef9 commit c6bdf2b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
/*
* Copyright 2014-2019 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.engine.js.browser

import io.ktor.client.engine.js.*
import io.ktor.client.fetch.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.await
import org.khronos.webgl.Uint8Array
import org.w3c.fetch.Response
import kotlin.coroutines.*
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

internal fun CoroutineScope.readBodyBrowser(response: Response): ByteReadChannel {
@Suppress("UNCHECKED_CAST_TO_EXTERNAL_INTERFACE")
@Suppress("UnsafeCastFromDynamic")
val stream: ReadableStream<Uint8Array> = response.body ?: return ByteReadChannel.Empty
return channelFromStream(stream)
}
Expand All @@ -22,24 +25,24 @@ internal fun CoroutineScope.channelFromStream(
stream: ReadableStream<Uint8Array>
): ByteReadChannel = writer {
val reader: ReadableStreamDefaultReader<Uint8Array> = stream.getReader()
while (true) {
try {
try {
while (true) {
val chunk = reader.readChunk() ?: break
channel.writeFully(chunk.asByteArray())
channel.flush()
} catch (cause: Throwable) {
reader.cancel(cause)
throw cause
}
} catch (cause: Throwable) {
reader.cancel(cause).catch { /* ignore */ }.await()
throw cause
}
}.channel

internal suspend fun ReadableStreamDefaultReader<Uint8Array>.readChunk(): Uint8Array? =
suspendCancellableCoroutine { continuation ->
suspendCoroutine { continuation ->
read().then {
val chunk = it.value
val result = if (it.done) null else chunk
continuation.resumeWith(Result.success(result))
continuation.resume(result)
}.catch { cause ->
continuation.resumeWithException(cause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.js.Promise

@OptIn(InternalCoroutinesApi::class)
Expand All @@ -25,9 +27,7 @@ internal suspend fun commonFetch(
val controller = AbortController()
init.signal = controller.signal

val abortOnCallCompletion = callJob.invokeOnCompletion(onCancelling = true) {
controller.abort()
}
callJob.invokeOnCompletion(onCancelling = true) { controller.abort() }

val promise: Promise<org.w3c.fetch.Response> = when {
PlatformUtils.IS_BROWSER -> fetch(input, init)
Expand All @@ -39,12 +39,12 @@ internal suspend fun commonFetch(

promise.then(
onFulfilled = {
continuation.resumeWith(Result.success(it))
continuation.resume(it)
},
onRejected = {
continuation.resumeWith(Result.failure(Error("Fail to fetch", it)))
continuation.resumeWithException(Error("Fail to fetch", it))
}
).finally { abortOnCallCompletion.dispose() }
)
}

private fun AbortController(): AbortController = js("new AbortController()")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2014-2025 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.client.engine.js.browser
Expand All @@ -8,10 +8,13 @@ import io.ktor.client.engine.js.*
import io.ktor.client.fetch.*
import io.ktor.client.utils.*
import io.ktor.utils.io.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.await
import org.khronos.webgl.Uint8Array
import org.w3c.fetch.Response
import kotlin.coroutines.*
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

internal fun CoroutineScope.readBodyBrowser(response: Response): ByteReadChannel {
val stream: ReadableStream<Uint8Array?> = response.body?.unsafeCast() ?: return ByteReadChannel.Empty
Expand All @@ -22,24 +25,24 @@ internal fun CoroutineScope.channelFromStream(
stream: ReadableStream<Uint8Array?>
): ByteReadChannel = writer {
val reader: ReadableStreamDefaultReader<Uint8Array?> = stream.getReader()
while (true) {
try {
try {
while (true) {
val chunk = reader.readChunk() ?: break
channel.writeFully(chunk.asByteArray())
channel.flush()
} catch (cause: Throwable) {
reader.cancel(cause.toJsReference())
throw cause
}
} catch (cause: Throwable) {
reader.cancel(cause.toJsReference()).catch { null }.await<Unit>()
throw cause
}
}.channel

internal suspend fun ReadableStreamDefaultReader<Uint8Array?>.readChunk(): Uint8Array? =
suspendCancellableCoroutine<Uint8Array?> { continuation ->
suspendCoroutine { continuation ->
read().then { stream: ReadableStreamReadResult<Uint8Array?> ->
val chunk = stream.value
val result = if (stream.done || chunk == null) null else chunk
continuation.resumeWith(Result.success(result))
continuation.resume(result)
null
}.catch { cause: JsAny ->
continuation.resumeWithException(JsError(cause))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.js.Promise

@OptIn(InternalCoroutinesApi::class)
Expand All @@ -26,9 +28,7 @@ internal suspend fun commonFetch(
val controller = AbortController()
init.signal = controller.signal

val abortOnCallCompletion = callJob.invokeOnCompletion(onCancelling = true) {
controller.abort()
}
callJob.invokeOnCompletion(onCancelling = true) { controller.abort() }

val promise: Promise<org.w3c.fetch.Response> = when {
PlatformUtils.IS_BROWSER -> fetch(input, init)
Expand All @@ -45,14 +45,14 @@ internal suspend fun commonFetch(

promise.then(
onFulfilled = { x: JsAny ->
continuation.resumeWith(Result.success(x.unsafeCast()))
continuation.resume(x.unsafeCast())
null
},
onRejected = { it: JsAny ->
continuation.resumeWith(Result.failure(Error("Fail to fetch", JsError(it))))
continuation.resumeWithException(Error("Fail to fetch", JsError(it)))
null
}
).finally { abortOnCallCompletion.dispose() }
)
}

private fun AbortController(): AbortController {
Expand Down

0 comments on commit c6bdf2b

Please sign in to comment.