From dfa4f5ac880c6af7c0fff865583e41476e057cb2 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Fri, 5 May 2023 18:17:31 +0100 Subject: [PATCH 1/4] Attempt a fix for stalled streams --- .../jvmTest/kotlin/okhttp3/TwoRequestTest.kt | 58 +++++++++++++++++++ .../okhttp3/internal/http2/Http2Connection.kt | 2 +- .../okhttp3/internal/http2/Http2Stream.kt | 33 ++++++----- 3 files changed, 77 insertions(+), 16 deletions(-) create mode 100644 okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt diff --git a/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt b/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt new file mode 100644 index 000000000000..edded512d2b3 --- /dev/null +++ b/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt @@ -0,0 +1,58 @@ +@file:OptIn(ExperimentalCoroutinesApi::class) + +package okhttp3 + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest +import okhttp3.HttpUrl.Companion.toHttpUrl +import org.junit.jupiter.api.Test + +class TwoRequestTest { + val file = "https://storage.googleapis.com/downloads.webmproject.org/av1/exoplayer/bbb-av1-480p.mp4".toHttpUrl() + + val client = OkHttpClient.Builder() + .protocols(listOf(Protocol.HTTP_2, Protocol.HTTP_1_1)) + .build() + + val extraRequests = false + val stalledRequest = true + + @Test + fun testTwoQueries() = runTest { + if (extraRequests) { + val callFull = client.newCall( + Request(file, Headers.headersOf("Icy-MetaData", "1", "Accept-Encoding", "identity")) + ) + callFull.executeAsync() + + callFull.cancel() + + val callEnd = client.newCall( + Request(file, Headers.headersOf("Range", "bytes=37070547-", "Icy-MetaData", "1", "Accept-Encoding", "identity")) + ) + callEnd.executeAsync() + callEnd.cancel() + } + + if (stalledRequest) { + val callStart = client.newCall( + Request(file, Headers.headersOf("Range", "bytes=44-", "Icy-MetaData", "1", "Accept-Encoding", "identity")) + ) + val responseStart = callStart.executeAsync() + val bodyStart = responseStart.body + bodyStart.byteStream().readNBytes(100_000) + } + + val callDownload = client.newCall(Request(file)) + val responseDownload = callDownload.executeAsync() + val bodyDownload = responseDownload.body + + while (true) { + val bytes = bodyDownload.byteStream().readNBytes(100_000) + + if (bytes.isEmpty()) { + break + } + } + } +} diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt index 61815da06386..36cda473df43 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -188,7 +188,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { @Synchronized internal fun updateConnectionFlowControl(read: Long) { readBytesTotal += read val readBytesToAcknowledge = readBytesTotal - readBytesAcknowledged - if (readBytesToAcknowledge >= okHttpSettings.initialWindowSize / 2) { + if (readBytesToAcknowledge >= Http2Stream.windowThreshold(okHttpSettings)) { writeWindowUpdateLater(0, readBytesToAcknowledge) readBytesAcknowledged += readBytesToAcknowledge } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt index d91ed4bf6e9d..d5bab7da1f21 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -389,8 +389,10 @@ class Http2Stream internal constructor( readBytesTotal += readBytesDelivered val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged + val okHttpSettings = connection.okHttpSettings if (errorExceptionToDeliver == null && - unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) { + unacknowledgedBytesRead >= windowThreshold(okHttpSettings) + ) { // Flow control: notify the peer that we're ready for more data! Only send a // WINDOW_UPDATE if the stream isn't in error. connection.writeWindowUpdateLater(id, unacknowledgedBytesRead) @@ -415,8 +417,6 @@ class Http2Stream internal constructor( } if (readBytesDelivered != -1L) { - // Update connection.unacknowledgedBytesRead outside the synchronized block. - updateConnectionFlowControl(readBytesDelivered) return readBytesDelivered } @@ -446,41 +446,39 @@ class Http2Stream internal constructor( internal fun receive(source: BufferedSource, byteCount: Long) { this@Http2Stream.assertThreadDoesntHoldLock() - var byteCount = byteCount + var remainingByteCount = byteCount - while (byteCount > 0L) { + while (remainingByteCount > 0L) { val finished: Boolean val flowControlError: Boolean synchronized(this@Http2Stream) { finished = this.finished - flowControlError = byteCount + readBuffer.size > maxByteCount + flowControlError = remainingByteCount + readBuffer.size > maxByteCount } // If the peer sends more data than we can handle, discard it and close the connection. if (flowControlError) { - source.skip(byteCount) + source.skip(remainingByteCount) closeLater(ErrorCode.FLOW_CONTROL_ERROR) return } // Discard data received after the stream is finished. It's probably a benign race. if (finished) { - source.skip(byteCount) + source.skip(remainingByteCount) return } // Fill the receive buffer without holding any locks. - val read = source.read(receiveBuffer, byteCount) + val read = source.read(receiveBuffer, remainingByteCount) if (read == -1L) throw EOFException() - byteCount -= read + remainingByteCount -= read // Move the received data to the read buffer to the reader can read it. If this source has // been closed since this read began we must discard the incoming data and tell the // connection we've done so. - var bytesDiscarded = 0L synchronized(this@Http2Stream) { if (closed) { - bytesDiscarded = receiveBuffer.size receiveBuffer.clear() } else { val wasEmpty = readBuffer.size == 0L @@ -490,10 +488,13 @@ class Http2Stream internal constructor( } } } - if (bytesDiscarded > 0L) { - updateConnectionFlowControl(bytesDiscarded) - } } + + // Update the connection flow control, as this is a shared resource. + // Even if our stream doesn't need more data, others might. + // But delay updating the stream flow control until that stream has been + // consumed + updateConnectionFlowControl(byteCount) } override fun timeout(): Timeout = readTimeout @@ -656,6 +657,8 @@ class Http2Stream internal constructor( } companion object { + internal fun windowThreshold(okHttpSettings: Settings) = okHttpSettings.initialWindowSize / 2 + internal const val EMIT_BUFFER_SIZE = 16384L } From 31961959d8159cd8b5f2c2964b1e6e11ce06a5be Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Fri, 5 May 2023 18:19:59 +0100 Subject: [PATCH 2/4] Fix --- .../src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt index d5bab7da1f21..50b93da96d63 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -389,9 +389,8 @@ class Http2Stream internal constructor( readBytesTotal += readBytesDelivered val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged - val okHttpSettings = connection.okHttpSettings if (errorExceptionToDeliver == null && - unacknowledgedBytesRead >= windowThreshold(okHttpSettings) + unacknowledgedBytesRead >= windowThreshold(connection.okHttpSettings) ) { // Flow control: notify the peer that we're ready for more data! Only send a // WINDOW_UPDATE if the stream isn't in error. From 38cd4f56140589630b346528bc0051a6fcef55cb Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Wed, 10 May 2023 08:51:02 +0100 Subject: [PATCH 3/4] Cleanup --- .../jvmTest/kotlin/okhttp3/TwoRequestTest.kt | 58 ------------------- .../okhttp3/internal/http2/Http2Connection.kt | 2 +- .../okhttp3/internal/http2/Http2Stream.kt | 3 +- 3 files changed, 2 insertions(+), 61 deletions(-) delete mode 100644 okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt diff --git a/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt b/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt deleted file mode 100644 index edded512d2b3..000000000000 --- a/okhttp-coroutines/src/jvmTest/kotlin/okhttp3/TwoRequestTest.kt +++ /dev/null @@ -1,58 +0,0 @@ -@file:OptIn(ExperimentalCoroutinesApi::class) - -package okhttp3 - -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.test.runTest -import okhttp3.HttpUrl.Companion.toHttpUrl -import org.junit.jupiter.api.Test - -class TwoRequestTest { - val file = "https://storage.googleapis.com/downloads.webmproject.org/av1/exoplayer/bbb-av1-480p.mp4".toHttpUrl() - - val client = OkHttpClient.Builder() - .protocols(listOf(Protocol.HTTP_2, Protocol.HTTP_1_1)) - .build() - - val extraRequests = false - val stalledRequest = true - - @Test - fun testTwoQueries() = runTest { - if (extraRequests) { - val callFull = client.newCall( - Request(file, Headers.headersOf("Icy-MetaData", "1", "Accept-Encoding", "identity")) - ) - callFull.executeAsync() - - callFull.cancel() - - val callEnd = client.newCall( - Request(file, Headers.headersOf("Range", "bytes=37070547-", "Icy-MetaData", "1", "Accept-Encoding", "identity")) - ) - callEnd.executeAsync() - callEnd.cancel() - } - - if (stalledRequest) { - val callStart = client.newCall( - Request(file, Headers.headersOf("Range", "bytes=44-", "Icy-MetaData", "1", "Accept-Encoding", "identity")) - ) - val responseStart = callStart.executeAsync() - val bodyStart = responseStart.body - bodyStart.byteStream().readNBytes(100_000) - } - - val callDownload = client.newCall(Request(file)) - val responseDownload = callDownload.executeAsync() - val bodyDownload = responseDownload.body - - while (true) { - val bytes = bodyDownload.byteStream().readNBytes(100_000) - - if (bytes.isEmpty()) { - break - } - } - } -} diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt index 36cda473df43..61815da06386 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Connection.kt @@ -188,7 +188,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable { @Synchronized internal fun updateConnectionFlowControl(read: Long) { readBytesTotal += read val readBytesToAcknowledge = readBytesTotal - readBytesAcknowledged - if (readBytesToAcknowledge >= Http2Stream.windowThreshold(okHttpSettings)) { + if (readBytesToAcknowledge >= okHttpSettings.initialWindowSize / 2) { writeWindowUpdateLater(0, readBytesToAcknowledge) readBytesAcknowledged += readBytesToAcknowledge } diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt index 50b93da96d63..182ffe12c67e 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -390,7 +390,7 @@ class Http2Stream internal constructor( val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged if (errorExceptionToDeliver == null && - unacknowledgedBytesRead >= windowThreshold(connection.okHttpSettings) + unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2 ) { // Flow control: notify the peer that we're ready for more data! Only send a // WINDOW_UPDATE if the stream isn't in error. @@ -656,7 +656,6 @@ class Http2Stream internal constructor( } companion object { - internal fun windowThreshold(okHttpSettings: Settings) = okHttpSettings.initialWindowSize / 2 internal const val EMIT_BUFFER_SIZE = 16384L } From f187a3311796a9bb331953fb8114cd6ead043669 Mon Sep 17 00:00:00 2001 From: Yuri Schimke Date: Wed, 10 May 2023 08:53:10 +0100 Subject: [PATCH 4/4] Fix --- okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt index 182ffe12c67e..d1c0817bfc7a 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -656,7 +656,6 @@ class Http2Stream internal constructor( } companion object { - internal const val EMIT_BUFFER_SIZE = 16384L }