diff --git a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt index a806365d21c6..7adfbd2b91a6 100644 --- a/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt +++ b/okhttp/src/jvmMain/kotlin/okhttp3/internal/http2/Http2Stream.kt @@ -386,7 +386,8 @@ class Http2Stream internal constructor( val unacknowledgedBytesRead = readBytes.unacknowledged if (errorExceptionToDeliver == null && - unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) { + unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2 + ) { // Flow control: notify the peer that we're ready for more data! Only send a // WINDOW_UPDATE if the stream isn't in error. connection.writeWindowUpdateLater(id, unacknowledgedBytesRead) @@ -412,8 +413,6 @@ class Http2Stream internal constructor( } if (readBytesDelivered != -1L) { - // Update connection.unacknowledgedBytesRead outside the synchronized block. - updateConnectionFlowControl(readBytesDelivered) return readBytesDelivered } @@ -443,41 +442,39 @@ class Http2Stream internal constructor( internal fun receive(source: BufferedSource, byteCount: Long) { this@Http2Stream.assertThreadDoesntHoldLock() - var byteCount = byteCount + var remainingByteCount = byteCount - while (byteCount > 0L) { + while (remainingByteCount > 0L) { val finished: Boolean val flowControlError: Boolean synchronized(this@Http2Stream) { finished = this.finished - flowControlError = byteCount + readBuffer.size > maxByteCount + flowControlError = remainingByteCount + readBuffer.size > maxByteCount } // If the peer sends more data than we can handle, discard it and close the connection. if (flowControlError) { - source.skip(byteCount) + source.skip(remainingByteCount) closeLater(ErrorCode.FLOW_CONTROL_ERROR) return } // Discard data received after the stream is finished. It's probably a benign race. if (finished) { - source.skip(byteCount) + source.skip(remainingByteCount) return } // Fill the receive buffer without holding any locks. - val read = source.read(receiveBuffer, byteCount) + val read = source.read(receiveBuffer, remainingByteCount) if (read == -1L) throw EOFException() - byteCount -= read + remainingByteCount -= read // Move the received data to the read buffer to the reader can read it. If this source has // been closed since this read began we must discard the incoming data and tell the // connection we've done so. - var bytesDiscarded = 0L synchronized(this@Http2Stream) { if (closed) { - bytesDiscarded = receiveBuffer.size receiveBuffer.clear() } else { val wasEmpty = readBuffer.size == 0L @@ -487,10 +484,15 @@ class Http2Stream internal constructor( } } } - if (bytesDiscarded > 0L) { - updateConnectionFlowControl(bytesDiscarded) - } } + + // Update the connection flow control, as this is a shared resource. + // Even if our stream doesn't need more data, others might. + // But delay updating the stream flow control until that stream has been + // consumed + updateConnectionFlowControl(byteCount) + + // Notify that buffer size changed connection.flowControlListener.receivingStreamWindowChanged(id, readBytes, readBuffer.size) }