Skip to content

Commit

Permalink
KTOR-8008 Fix uncaught exceptions in websocket pinger
Browse files Browse the repository at this point in the history
  • Loading branch information
bjhham committed Jan 20, 2025
1 parent 5bed42f commit 3b44501
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 44 deletions.
18 changes: 18 additions & 0 deletions ktor-io/api/ktor-io.api
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,24 @@ public final class io/ktor/utils/io/CloseHookByteWriteChannelKt {
public static final fun onClose (Lio/ktor/utils/io/ByteWriteChannel;Lkotlin/jvm/functions/Function1;)Lio/ktor/utils/io/ByteWriteChannel;
}

public class io/ktor/utils/io/ClosedByteChannelException : java/io/IOException {
public fun <init> ()V
public fun <init> (Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/utils/io/ClosedReadChannelException : io/ktor/utils/io/ClosedByteChannelException {
public fun <init> ()V
public fun <init> (Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/utils/io/ClosedWriteChannelException : io/ktor/utils/io/ClosedByteChannelException {
public fun <init> ()V
public fun <init> (Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
}

public final class io/ktor/utils/io/ConcurrentIOException : java/lang/IllegalStateException {
public fun <init> (Ljava/lang/String;Ljava/lang/Throwable;)V
public synthetic fun <init> (Ljava/lang/String;Ljava/lang/Throwable;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down
12 changes: 12 additions & 0 deletions ktor-io/api/ktor-io.klib.api
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,14 @@ final class io.ktor.utils.io/ByteChannel : io.ktor.utils.io/BufferedByteWriteCha
final suspend fun flushAndClose() // io.ktor.utils.io/ByteChannel.flushAndClose|flushAndClose(){}[0]
}

final class io.ktor.utils.io/ClosedReadChannelException : io.ktor.utils.io/ClosedByteChannelException { // io.ktor.utils.io/ClosedReadChannelException|null[0]
constructor <init>(kotlin/Throwable? = ...) // io.ktor.utils.io/ClosedReadChannelException.<init>|<init>(kotlin.Throwable?){}[0]
}

final class io.ktor.utils.io/ClosedWriteChannelException : io.ktor.utils.io/ClosedByteChannelException { // io.ktor.utils.io/ClosedWriteChannelException|null[0]
constructor <init>(kotlin/Throwable? = ...) // io.ktor.utils.io/ClosedWriteChannelException.<init>|<init>(kotlin.Throwable?){}[0]
}

final class io.ktor.utils.io/ConcurrentIOException : kotlin/IllegalStateException { // io.ktor.utils.io/ConcurrentIOException|null[0]
constructor <init>(kotlin/String, kotlin/Throwable? = ...) // io.ktor.utils.io/ConcurrentIOException.<init>|<init>(kotlin.String;kotlin.Throwable?){}[0]
}
Expand Down Expand Up @@ -293,6 +301,10 @@ open class io.ktor.utils.io.charsets/MalformedInputException : kotlinx.io/IOExce
constructor <init>(kotlin/String) // io.ktor.utils.io.charsets/MalformedInputException.<init>|<init>(kotlin.String){}[0]
}

open class io.ktor.utils.io/ClosedByteChannelException : kotlinx.io/IOException { // io.ktor.utils.io/ClosedByteChannelException|null[0]
constructor <init>(kotlin/Throwable? = ...) // io.ktor.utils.io/ClosedByteChannelException.<init>|<init>(kotlin.Throwable?){}[0]
}

final object io.ktor.utils.io.charsets/Charsets { // io.ktor.utils.io.charsets/Charsets|null[0]
final val ISO_8859_1 // io.ktor.utils.io.charsets/Charsets.ISO_8859_1|{}ISO_8859_1[0]
final fun <get-ISO_8859_1>(): io.ktor.utils.io.charsets/Charset // io.ktor.utils.io.charsets/Charsets.ISO_8859_1.<get-ISO_8859_1>|<get-ISO_8859_1>(){}[0]
Expand Down
12 changes: 6 additions & 6 deletions ktor-io/common/src/io/ktor/utils/io/ByteChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChanne
@InternalAPI
override val readBuffer: Source
get() {
closedCause?.let { throw it }
_closedCause.value?.throwOrNull(::ClosedReadChannelException)
if (_readBuffer.exhausted()) moveFlushToReadBuffer()
return _readBuffer
}

@InternalAPI
override val writeBuffer: Sink
get() {
closedCause?.let { throw it }
if (isClosedForWrite) {
throw IOException("Channel is closed for write")
_closedCause.value?.throwOrNull(::ClosedWriteChannelException)
?: throw ClosedWriteChannelException()
}
return _writeBuffer
}

override val closedCause: Throwable?
get() = _closedCause.value?.cause
get() = _closedCause.value?.wrapCause()

override val isClosedForWrite: Boolean
get() = _closedCause.value != null
Expand Down Expand Up @@ -133,9 +133,9 @@ public class ByteChannel(public val autoFlush: Boolean = false) : ByteReadChanne

val closedToken = CloseToken(cause)
_closedCause.compareAndSet(null, closedToken)
val actualCause = closedToken.cause
val wrappedCause = closedToken.wrapCause()

closeSlot(actualCause)
closeSlot(wrappedCause)
}

override fun toString(): String = "ByteChannel[${hashCode()}]"
Expand Down
41 changes: 11 additions & 30 deletions ktor-io/common/src/io/ktor/utils/io/CloseToken.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,41 +6,22 @@ package io.ktor.utils.io

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CopyableThrowable
import kotlinx.io.IOException
import kotlinx.coroutines.ExperimentalCoroutinesApi

internal val CLOSED = CloseToken(null)

@Suppress("OPT_IN_USAGE")
internal class CloseToken(origin: Throwable?) {
@OptIn(ExperimentalCoroutinesApi::class)
internal class CloseToken(private val origin: Throwable?) {

private val closedException: Throwable? = when {
origin == null -> null
origin is CancellationException -> {
if (origin is CopyableThrowable<*>) {
origin.createCopy()
} else {
CancellationException(origin.message ?: "Channel was cancelled", origin)
}
fun wrapCause(wrap: (Throwable?) -> Throwable = ::ClosedByteChannelException): Throwable? {
return when (origin) {
null -> null
is CopyableThrowable<*> -> origin.createCopy()
is CancellationException -> CancellationException(origin.message, origin)
else -> wrap(origin)
}

origin is IOException && origin is CopyableThrowable<*> -> origin.createCopy()
else -> IOException(origin.message ?: "Channel was closed", origin)
}

val cause: Throwable?
get() = when {
closedException == null -> null
(closedException is IOException) -> {
if (closedException is CopyableThrowable<*>) {
closedException.createCopy()
} else {
IOException(closedException.message, closedException)
}
}

closedException is CopyableThrowable<*> ->
closedException.createCopy() ?: CancellationException(closedException.message, closedException)

else -> CancellationException(closedException.message, closedException)
}
fun throwOrNull(wrap: (Throwable?) -> Throwable): Unit? =
wrapCause(wrap)?.let { throw it }
}
17 changes: 15 additions & 2 deletions ktor-io/common/src/io/ktor/utils/io/Exceptions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,21 @@

package io.ktor.utils.io

import kotlinx.io.*
import kotlinx.io.IOException

public typealias CancellationException = kotlinx.coroutines.CancellationException

public typealias ClosedWriteChannelException = IOException
/**
* Exception wrapper for causes of byte channel closures.
*/
public open class ClosedByteChannelException(cause: Throwable? = null) : IOException(cause?.message, cause)

/**
* Exception thrown when attempting to write to a closed byte channel.
*/
public class ClosedWriteChannelException(cause: Throwable? = null) : ClosedByteChannelException(cause)

/**
* Exception thrown when attempting to read from a closed byte channel.
*/
public class ClosedReadChannelException(cause: Throwable? = null) : ClosedByteChannelException(cause)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ internal class SinkByteWriteChannel(origin: RawSink) : ByteWriteChannel {
get() = closed.value != null

override val closedCause: Throwable?
get() = closed.value?.cause
get() = closed.value?.wrapCause()

@InternalAPI
override val writeBuffer: Sink
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ internal class SourceByteReadChannel(private val source: Source) : ByteReadChann
private var closed: CloseToken? = null

override val closedCause: Throwable?
get() = closed?.cause
get() = closed?.wrapCause()

override val isClosedForRead: Boolean
get() = source.exhausted()
Expand Down
2 changes: 1 addition & 1 deletion ktor-io/jvm/src/io/ktor/utils/io/jvm/javaio/Reading.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal class RawSourceChannel(
private val buffer = Buffer()

override val closedCause: Throwable?
get() = closedToken?.cause
get() = closedToken?.wrapCause()

override val isClosedForRead: Boolean
get() = closedToken != null && buffer.exhausted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public abstract class NettyApplicationResponse(
internal fun close() {
val existingChannel = responseChannel
if (existingChannel is ByteWriteChannel) {
existingChannel.close(ClosedWriteChannelException("Application response has been closed"))
existingChannel.close(cause = null)
responseChannel = ByteReadChannel.Empty
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.ktor.websocket

import io.ktor.util.*
import io.ktor.util.date.*
import io.ktor.utils.io.ClosedByteChannelException
import io.ktor.utils.io.charsets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
Expand Down Expand Up @@ -100,6 +101,7 @@ internal fun CoroutineScope.pinger(
} catch (ignore: CancellationException) {
} catch (ignore: ClosedReceiveChannelException) {
} catch (ignore: ClosedSendChannelException) {
} catch (ignore: ClosedByteChannelException) {
}
}

Expand Down
3 changes: 1 addition & 2 deletions ktor-utils/common/src/io/ktor/util/cio/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package io.ktor.util.cio

import io.ktor.utils.io.errors.*
import kotlinx.io.IOException

/**
Expand All @@ -17,7 +16,7 @@ public open class ChannelIOException(message: String, exception: Throwable) : IO
* An exception that is thrown when an IO error occurred during writing to the destination channel.
* Usually it happens when a remote client closed the connection.
*/
public class ChannelWriteException(message: String = "Cannot write to a channel", exception: Throwable) :
public class ChannelWriteException(message: String = "Cannot write to channel", exception: Throwable) :
ChannelIOException(message, exception)

/**
Expand Down

0 comments on commit 3b44501

Please sign in to comment.