Skip to content

Commit

Permalink
Merge pull request #5434 from square/jwilson.0909.race
Browse files Browse the repository at this point in the history
Don't leak incoming bytes when we race incoming data and close
  • Loading branch information
squarejesse authored Sep 9, 2019
2 parents 2cdbbda + 510475a commit 9b60ca8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class Http2Connection internal constructor(builder: Builder) : Closeable {

@Synchronized internal fun updateConnectionFlowControl(read: Long) {
readBytesTotal += read
val readBytesToAcknowledge = (readBytesTotal - readBytesAcknowledged)
val readBytesToAcknowledge = readBytesTotal - readBytesAcknowledged
if (readBytesToAcknowledge >= okHttpSettings.initialWindowSize / 2) {
writeWindowUpdateLater(0, readBytesToAcknowledge)
readBytesAcknowledged += readBytesToAcknowledge
Expand Down
27 changes: 21 additions & 6 deletions okhttp/src/main/java/okhttp3/internal/http2/Http2Stream.kt
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class Http2Stream internal constructor(
readBytesDelivered = readBuffer.read(sink, minOf(byteCount, readBuffer.size))
readBytesTotal += readBytesDelivered

val unacknowledgedBytesRead = (readBytesTotal - readBytesAcknowledged)
val unacknowledgedBytesRead = readBytesTotal - readBytesAcknowledged
if (errorExceptionToDeliver == null &&
unacknowledgedBytesRead >= connection.okHttpSettings.initialWindowSize / 2) {
// Flow control: notify the peer that we're ready for more data! Only send a
Expand Down Expand Up @@ -407,6 +407,10 @@ class Http2Stream internal constructor(
connection.updateConnectionFlowControl(read)
}

/**
* Accept bytes on the connection's reader thread. This function avoids holding locks while it
* performs blocking reads for the incoming bytes.
*/
@Throws(IOException::class)
internal fun receive(source: BufferedSource, byteCount: Long) {
var byteCount = byteCount
Expand Down Expand Up @@ -438,14 +442,25 @@ class Http2Stream internal constructor(
if (read == -1L) throw EOFException()
byteCount -= read

// Move the received data to the read buffer to the reader can read it.
// Move the received data to the read buffer to the reader can read it. If this source has
// been closed since this read began we must discard the incoming data and tell the
// connection we've done so.
var bytesDiscarded = 0L
synchronized(this@Http2Stream) {
val wasEmpty = readBuffer.size == 0L
readBuffer.writeAll(receiveBuffer)
if (wasEmpty) {
this@Http2Stream.notifyAll()
if (closed) {
bytesDiscarded = receiveBuffer.size
receiveBuffer.clear()
} else {
val wasEmpty = readBuffer.size == 0L
readBuffer.writeAll(receiveBuffer)
if (wasEmpty) {
this@Http2Stream.notifyAll()
}
}
}
if (bytesDiscarded > 0L) {
updateConnectionFlowControl(bytesDiscarded)
}
}
}

Expand Down

0 comments on commit 9b60ca8

Please sign in to comment.