Skip to content

Commit

Permalink
Resolve "Default max_queue blocks websocket cancellation with high …
Browse files Browse the repository at this point in the history
…traffic"
  • Loading branch information
btschwertfeger committed Nov 27, 2024
1 parent 59d4dcf commit 0215b10
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 1 deletion.
3 changes: 3 additions & 0 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,9 @@ async def send_context(
if wait_for_close:
try:
async with asyncio_timeout_at(self.close_deadline):
self.recv_messages.cancelling = True
if self.recv_messages.paused:
self.recv_messages.resume()
await asyncio.shield(self.connection_lost_waiter)
except TimeoutError:
# There's no risk to overwrite another error because
Expand Down
12 changes: 11 additions & 1 deletion src/websockets/asyncio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,13 @@ def __init__( # pragma: no cover
# This flag prevents concurrent calls to get() by user code.
self.get_in_progress = False

# This flag marks a soon cancellation
self.cancelling = False

# This flag marks the end of the connection.
self.closed = False


async def get(self, decode: bool | None = None) -> Data:
"""
Read the next message.
Expand All @@ -138,6 +142,8 @@ async def get(self, decode: bool | None = None) -> Data:
:meth:`get_iter` concurrently.
"""
if self.cancelling:
return
if self.get_in_progress:
raise ConcurrencyError("get() or get_iter() is already running")
self.get_in_progress = True
Expand Down Expand Up @@ -201,6 +207,8 @@ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
:meth:`get_iter` concurrently.
"""
if self.cancelling:
return
if self.get_in_progress:
raise ConcurrencyError("get() or get_iter() is already running")
self.get_in_progress = True
Expand Down Expand Up @@ -251,6 +259,8 @@ def put(self, frame: Frame) -> None:
EOFError: If the stream of frames has ended.
"""
if self.cancelling:
return
if self.closed:
raise EOFError("stream of frames ended")

Expand Down Expand Up @@ -283,7 +293,7 @@ def close(self) -> None:
"""
End the stream of frames.
Callling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
Calling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
or :meth:`put` is safe. They will raise :exc:`EOFError`.
"""
Expand Down

0 comments on commit 0215b10

Please sign in to comment.