-
Notifications
You must be signed in to change notification settings - Fork 9.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix a concurrency problem with web socket cleanup #8121
Conversation
We were seeing crashes in Deflater.deflate with a root cause of closing the deflater while it was in use. The underlying problem is that web socket streams were closed when the web socket was canceled, but not by the thread that owned those streams. This moves stream cleanup to always run on the owning thread: - writer cleanup happens on the task queue - reader cleanup happens after loopReader This also introduces a new function, Streams.cancel() so a failed writer can break an active reader, and vise-versa. Previously we were using close() to do this job. There's a lot of test changes but it's mostly in adding the TaskFaker facet to RealWebSocketTest. The test now runs with a fake clock, with the deterministic benefits that brings. See also: #6719
if (closed) { | ||
throw new AssertionError("Already closed"); | ||
} | ||
try { | ||
getSource().close(); | ||
} catch (IOException ignored) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: convert to Kotlin and get closeQuietly()
!
|
||
// Even if messages are no longer being read we need to wait for the connection close signal. | ||
connectionClose.await() | ||
} catch (e: IOException) { | ||
webSocket.failWebSocket(e, null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved into loopReader()
listener.onOpen(this@RealWebSocket, response) | ||
loopReader() | ||
} catch (e: Exception) { | ||
failWebSocket(e, null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved into loopReader()
} | ||
|
||
if (!failed && streamsToClose != null && receivedCloseCode != -1) { | ||
listener.onClosed(this, code, reason!!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now published either when the reader exits, or when the writer writes Close, whichever happens last
} | ||
|
||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
all this stuff moved to finishReader()
@@ -542,9 +554,8 @@ class RealWebSocket( | |||
|
|||
return true | |||
} finally { | |||
streamsToClose?.closeQuietly() | |||
readerToClose?.closeQuietly() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bugfix 1: the writer is closing the reader here. Racy and bad!
taskQueue.shutdown() | ||
} | ||
|
||
try { | ||
listener.onFailure(this, e, response) | ||
} finally { | ||
streamsToClose?.closeQuietly() | ||
readerToClose?.closeQuietly() | ||
writerToClose?.closeQuietly() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bugfix 2: the reader is closing the writer here. Racy and bad!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - It's a tough review without really getting into the headspace of RealWebSocket readers and writers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
We were seeing crashes in Deflater.deflate with a root cause of closing the deflater while it was in use.
The underlying problem is that web socket streams were closed when the web socket was canceled, but not by the thread that owned those streams.
This moves stream cleanup to always run on the owning thread:
This also introduces a new function, Streams.cancel() so a failed writer can break an active reader, and vise-versa. Previously we were using close() to do this job.
There's a lot of test changes but it's mostly in adding the TaskFaker facet to RealWebSocketTest. The test now runs with a fake clock, with the deterministic benefits that brings.
See also: #6719