fix: close streaming connections to prevent connection pool exhaustion #21213
ryan-crabbe merged 12 commits into main from
Conversation
- Add aclose() to CustomStreamWrapper to delegate to underlying stream
- Add finally block in async_data_generator to release HTTP connections
- Thread shared_session through async_streaming to reuse connection pool
- Set finite default timeout (600s) in _get_openai_client
Greptile Summary

This PR fixes a critical connection pool exhaustion bug in streaming responses. The fix addresses three root causes.

Key Changes:
The fix ensures HTTP connections are properly released in all scenarios: client disconnects, normal completion, mid-stream errors, and task cancellation. Comprehensive test coverage validates all cleanup paths.

Confidence Score: 5/5
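Why a single `finally` block covers all three exit paths can be sketched with a small, stdlib-only asyncio example (hypothetical `FakeStream` and `gen` names, not the PR's real classes):

```python
import asyncio


class FakeStream:
    """Stand-in for the provider HTTP stream held by CustomStreamWrapper."""

    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


async def gen(stream, fail=False):
    try:
        yield "chunk-1"
        if fail:
            raise RuntimeError("provider error")
        yield "chunk-2"
    finally:
        # Release the upstream connection on every exit path.
        await stream.aclose()


async def run_case(fail, disconnect):
    stream = FakeStream()
    g = gen(stream, fail)
    try:
        await g.__anext__()        # client receives the first chunk
        if disconnect:
            await g.aclose()       # client disconnect: generator is closed
        else:
            async for _ in g:      # drain to completion (or mid-stream error)
                pass
    except RuntimeError:
        pass                       # mid-stream provider error
    return stream.closed


async def main():
    return [
        await run_case(fail=False, disconnect=False),  # normal completion
        await run_case(fail=False, disconnect=True),   # client disconnect
        await run_case(fail=True, disconnect=False),   # mid-stream error
    ]


print(asyncio.run(main()))  # expect [True, True, True]
```

In each case the generator's `finally` runs and the fake stream is closed, which is the invariant the PR's tests assert against the real wrappers.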
| Filename | Overview |
|---|---|
| litellm/llms/custom_httpx/aiohttp_transport.py | Fixed critical bug by changing content= to stream= so aclose() properly releases HTTP connections |
| litellm/litellm_core_utils/streaming_handler.py | Added idempotent aclose() method with cancellation shielding to properly cleanup streams |
| litellm/proxy/proxy_server.py | Added finally block to async_data_generator to close response streams with cancellation shielding |
| litellm/router.py | Added finally block to stream_with_fallbacks to cleanup both primary and fallback response streams |
| pyproject.toml | Updated uvicorn constraint to >=0.32.1,<1.0.0 to ensure proper ASGI disconnect signaling |
| tests/test_litellm/test_streaming_connection_cleanup.py | Comprehensive test coverage for transport layer, CustomStreamWrapper, and router cleanup behavior |
Sequence Diagram
sequenceDiagram
participant Client
participant Proxy as proxy_server.py<br/>async_data_generator
participant Router as router.py<br/>stream_with_fallbacks
participant Wrapper as CustomStreamWrapper
participant Transport as AiohttpTransport
participant Provider as LLM Provider
Client->>Proxy: Stream request
Proxy->>Router: Forward to router
Router->>Wrapper: Wrap provider stream
Wrapper->>Transport: Get httpx.Response
Transport->>Provider: HTTP request
Provider-->>Transport: Streaming response
Transport-->>Wrapper: stream= (not content=)
alt Client disconnects mid-stream
Client-xProxy: Disconnect
Note over Proxy: finally block executes
Proxy->>Router: Close generator
Note over Router: finally block executes
Router->>Wrapper: await aclose()
Note over Wrapper: Shield from cancellation
Wrapper->>Transport: Close stream
Transport->>Provider: Release connection to pool
else Normal completion
Provider-->>Wrapper: All chunks
Wrapper-->>Router: Stream chunks
Router-->>Proxy: Stream chunks
Proxy-->>Client: SSE events
Note over Proxy,Router: finally blocks still execute
Proxy->>Router: Cleanup
Router->>Wrapper: await aclose()
Wrapper->>Transport: Close stream
Transport->>Provider: Release connection to pool
end
Last reviewed commit: 28c7cc6
litellm/llms/openai/openai.py
Outdated
```diff
     api_base: Optional[str] = None,
     api_version: Optional[str] = None,
-    timeout: Union[float, httpx.Timeout] = httpx.Timeout(None),
+    timeout: Union[float, httpx.Timeout] = httpx.Timeout(timeout=600.0, connect=5.0),
```
Default timeout change is a behavioral risk
Changing the default from httpx.Timeout(None) (no timeout) to httpx.Timeout(timeout=600.0, connect=5.0) affects any caller of _get_openai_client that doesn't explicitly pass a timeout parameter. Specifically, the moderations API call in litellm/main.py:6150 does not pass a timeout and will now inherit this 600s default instead of no timeout.
While 600s is generous and matches common defaults, this is a silent behavioral change that could cause timeout errors for long-running moderation calls that previously had no timeout. Consider whether this change should be scoped more narrowly (e.g., only in the streaming path) or if it's intentionally applied globally.
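The behavioral difference can be pictured with plain asyncio, using `asyncio.wait_for` as a rough stand-in for httpx's timeout machinery (all names here are illustrative, not the PR's code):

```python
import asyncio


async def call_provider(delay):
    # Stand-in for an outbound moderation/completion call.
    await asyncio.sleep(delay)
    return "ok"


async def main():
    # Old default, httpx.Timeout(None): no ceiling at all, so a hung
    # provider call would wait forever.
    unbounded = await call_provider(0.01)

    # New default: a finite ceiling (600s in the PR; 0.05s here for the demo).
    # A call that outlives the ceiling now raises instead of hanging.
    try:
        await asyncio.wait_for(call_provider(1.0), timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return unbounded, timed_out


print(asyncio.run(main()))  # expect ('ok', True)
```

This is exactly the silent shift the comment warns about: callers that previously never raised can now surface a timeout error.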
Address three additional root causes that prevented connection cleanup on client disconnect: Starlette/Uvicorn disconnect detection gap, content= vs stream= no-op wrapper in aiohttp transport, and anyio CancelledError interrupting cleanup awaits.
@greptileai review this please
Greptile Overview

Greptile Summary

This PR fixes a connection pool exhaustion bug in the proxy's streaming path. When clients disconnected mid-stream, the upstream HTTP connections (proxy → LLM provider) were never closed, eventually filling the connection pool and blocking new requests.

Key changes:

Confidence Score: 3/5
| Filename | Overview |
|---|---|
| litellm/litellm_core_utils/streaming_handler.py | Adds aclose() to CustomStreamWrapper for proper stream cleanup with anyio cancellation shielding. Correct approach, minor concern about non-idempotent close and inline import. |
| litellm/llms/custom_httpx/aiohttp_transport.py | Critical fix: Changes content= to stream= in httpx.Response constructor so that aclose() properly propagates to AiohttpResponseStream, enabling connection pool release. |
| litellm/llms/openai/openai.py | Passes shared_session to the async streaming path and changes default timeout from None to 600s. The timeout change is a global behavioral shift already flagged in prior review threads. |
| litellm/proxy/common_request_processing.py | Monkey-patches StreamingResponse.__call__ globally at import time to restore disconnect detection. Uses private Starlette API (starlette._utils.collapse_excgroups) which may break on upgrades. |
| litellm/proxy/proxy_server.py | Adds a finally block to async_data_generator that calls response.aclose() to release HTTP connections on generator exit. Clean and correct. |
| litellm/router.py | Adds finally block in stream_with_fallbacks to close model_response on generator exit, with anyio cancellation shielding. Minor import ordering issue with anyio. |
| tests/test_litellm/test_streaming_connection_cleanup.py | Comprehensive regression test suite covering: transport stream parameter, aclose propagation, fallback-to-close, cancellation shielding, router stream cleanup, and monkey-patch verification. All mock-based. |
Sequence Diagram
sequenceDiagram
participant Client
participant Proxy as Proxy (StreamingResponse)
participant Generator as async_data_generator
participant Router as Router.stream_with_fallbacks
participant CSW as CustomStreamWrapper
participant Transport as AiohttpTransport
participant Provider as LLM Provider
Client->>Proxy: POST /chat/completions (stream=true)
Proxy->>Generator: Start streaming
Generator->>Router: iterate response
Router->>CSW: iterate chunks
CSW->>Transport: iterate stream (stream= param)
Transport->>Provider: HTTP connection (shared_session)
Provider-->>Transport: SSE chunks
Transport-->>CSW: bytes
CSW-->>Router: ModelResponseStream
Router-->>Generator: chunk
Generator-->>Proxy: data: {...}
Proxy-->>Client: SSE chunk
Note over Client: Client disconnects mid-stream
Proxy->>Proxy: _disconnect_aware_call detects disconnect
Proxy->>Generator: cancel task group
Generator->>Generator: finally: response.aclose()
Generator->>CSW: aclose() [shielded]
CSW->>Transport: aclose()
Transport->>Provider: Release HTTP connection
Note over Transport,Provider: Connection returned to pool
Last reviewed commit: 9f09b02
```diff
 import orjson
 from fastapi import HTTPException, Request, status
 from fastapi.responses import JSONResponse, Response, StreamingResponse
+from starlette._utils import collapse_excgroups
```
Reliance on private Starlette API
starlette._utils.collapse_excgroups is a private (underscore-prefixed) module. Private APIs can change or be removed in any Starlette release without notice, which could break this monkey-patch silently on upgrade. Consider either:
- Vendoring the `collapse_excgroups` implementation (it's typically a small context manager for flattening ExceptionGroups)
- Adding a version guard or try/except fallback so the proxy degrades gracefully if this import fails
```python
async def aclose(self):
    if self.completion_stream is not None:
        # Shield from anyio cancellation so cleanup awaits can complete.
        # Without this, CancelledError is thrown into every await during
        # task group cancellation, preventing HTTP connection release.
        import anyio

        with anyio.CancelScope(shield=True):
            try:
                if hasattr(self.completion_stream, "aclose"):
                    await self.completion_stream.aclose()
                elif hasattr(self.completion_stream, "close"):
                    result = self.completion_stream.close()
                    if result is not None:
                        await result
            except BaseException:
                pass
```
aclose() is not idempotent — may double-close the stream
aclose() does not set self.completion_stream = None after closing, so calling aclose() twice will attempt to close the underlying stream twice. This can happen because aclose() is called from multiple locations (e.g., async_data_generator's finally block AND stream_with_fallbacks' finally block). While the broad except BaseException: pass prevents errors from propagating, it would be cleaner to guard against double-close:
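The double-close hazard and the guard are easy to show in isolation with a stdlib-only sketch (hypothetical `OneShotStream` and `Wrapper` names, mirroring the suggestion's detach-before-close pattern):

```python
import asyncio


class OneShotStream:
    """Stand-in for an underlying HTTP stream; close should run at most once."""

    def __init__(self):
        self.close_calls = 0

    async def aclose(self):
        self.close_calls += 1


class Wrapper:
    def __init__(self, stream):
        self.completion_stream = stream

    async def aclose(self):
        # Idempotency guard: detach the stream before closing so a second
        # aclose() (e.g. from another finally block) becomes a no-op.
        stream, self.completion_stream = self.completion_stream, None
        if stream is not None:
            await stream.aclose()


async def main():
    s = OneShotStream()
    w = Wrapper(s)
    await w.aclose()
    await w.aclose()  # second call from a different cleanup path
    return s.close_calls


print(asyncio.run(main()))  # expect 1
```

Without the detach, `close_calls` would be 2, which is the double-close on the aiohttp response that the review flags.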
Suggested change (detach the stream before closing so a second call is a no-op):

```python
async def aclose(self):
    if self.completion_stream is not None:
        # Shield from anyio cancellation so cleanup awaits can complete.
        # Without this, CancelledError is thrown into every await during
        # task group cancellation, preventing HTTP connection release.
        import anyio

        with anyio.CancelScope(shield=True):
            stream = self.completion_stream
            self.completion_stream = None
            try:
                if hasattr(stream, "aclose"):
                    await stream.aclose()
                elif hasattr(stream, "close"):
                    result = stream.close()
                    if result is not None:
                        await result
            except BaseException:
                pass
```
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
```python
import json

import anyio
import logging
```
Import ordering: anyio mixed with stdlib imports
anyio is a third-party package but is inserted between stdlib imports (json and logging). Per convention, third-party imports should be separated from stdlib imports with a blank line.
```diff
 import json
-import anyio
 import logging
```
And add import anyio in the third-party imports section (after the blank line following stdlib imports).
Context Used: Context from dashboard - CLAUDE.md (source)
```python
# Shield from anyio cancellation so cleanup awaits can complete.
# Without this, CancelledError is thrown into every await during
# task group cancellation, preventing HTTP connection release.
import anyio
```
Inline import of anyio inside method
Per the CLAUDE.md style guide: "Avoid imports within methods — place all imports at the top of the file (module-level)." If anyio is intentionally deferred here because streaming_handler.py is in the core SDK and anyio is only a transitive dependency, consider adding a comment explaining why, or move it to the top of the file since httpx (which already depends on anyio) is already a top-level import.
Uvicorn 0.31.x falsely advertised ASGI spec_version "2.4" without implementing send() raising OSError on disconnect. Starlette trusted this and skipped its disconnect listener, causing generators to run forever. Uvicorn 0.32.1 corrected this to "2.3", restoring native disconnect detection. The monkey-patch is no longer needed. Also adds fallback_response cleanup in stream_with_fallbacks and moves inline import anyio to module level in streaming_handler.
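The spec_version negotiation the commit message describes happens through the ASGI scope: the server advertises which HTTP sub-spec it implements, and Starlette decides whether it needs its own disconnect-listener fallback. A small illustrative helper (not Starlette's actual code) shows where that value lives:

```python
def get_spec_version(scope):
    # ASGI servers advertise the HTTP spec version under scope["asgi"].
    # Spec 2.4 promises that send() raises OSError after a client
    # disconnect; a framework that sees 2.4 may skip its own fallback
    # disconnect detection, which is exactly the gap this PR hit.
    return scope.get("asgi", {}).get("spec_version", "2.0")


# What a fixed uvicorn (0.32.1+) hands to the framework:
scope = {"type": "http", "asgi": {"version": "3.0", "spec_version": "2.3"}}
print(get_spec_version(scope))  # expect 2.3
```

Under 0.31.x the same field read "2.4" without the matching send() behavior, so Starlette trusted a guarantee that was never delivered.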
@greptileai please review
Greptile Overview

Greptile Summary

This PR fixes streaming connection pool exhaustion by ensuring HTTP connections are properly released when clients disconnect mid-stream. The fix touches three layers: (1) adds
Confidence Score: 4/5
| Filename | Overview |
|---|---|
| litellm/litellm_core_utils/streaming_handler.py | Adds aclose() method to CustomStreamWrapper with anyio cancellation shielding and fallback from aclose() to close(). Missing idempotency guard (should set completion_stream = None after closing). |
| litellm/llms/custom_httpx/aiohttp_transport.py | Changed content= to stream= in httpx.Response constructor, correctly preserving lazy streaming semantics and enabling aclose() propagation to AiohttpResponseStream. |
| litellm/llms/openai/openai.py | Passes shared_session through to astreaming() so the streaming path uses the shared connection pool instead of creating a fallback one. |
| litellm/proxy/proxy_server.py | Adds finally block in async_data_generator to call response.aclose(), ensuring HTTP connections are released back to the pool on client disconnect or normal completion. |
| litellm/router.py | Adds finally block in stream_with_fallbacks to close both model_response and fallback_response with anyio cancellation shielding. Import ordering issue with anyio mixed into stdlib imports. |
| pyproject.toml | Bumps uvicorn from ^0.31.1 to >=0.32.1. The >= constraint has no upper bound, which could allow future breaking changes in uvicorn 1.0+. |
| tests/test_litellm/litellm_core_utils/test_streaming_handler.py | Adds three well-structured unit tests for CustomStreamWrapper.aclose() covering normal, no-aclose, and None-stream cases. |
| tests/test_litellm/proxy/test_proxy_server.py | Adds three proxy-level tests verifying async_data_generator calls response.aclose() on early exit, normal completion, and mid-stream error. All mock-based, no real network calls. |
| tests/test_litellm/test_streaming_connection_cleanup.py | New comprehensive regression test file covering aiohttp transport, CustomStreamWrapper.aclose(), cancellation shielding, and router stream_with_fallbacks cleanup. All mock-based. |
Sequence Diagram
sequenceDiagram
participant Client
participant Proxy as Proxy (async_data_generator)
participant Router as Router (stream_with_fallbacks)
participant CSW as CustomStreamWrapper
participant Provider as Provider Stream (httpx/aiohttp)
Client->>Proxy: POST /chat/completions (stream=true)
Proxy->>Router: iterate model_response
Router->>CSW: async for chunk in model_response
CSW->>Provider: read chunks from HTTP stream
Provider-->>CSW: chunk data
CSW-->>Router: yield chunk
Router-->>Proxy: yield chunk
Proxy-->>Client: SSE data chunk
Note over Client: Client disconnects mid-stream
Client--xProxy: connection closed
Proxy->>Proxy: finally: response.aclose()
Proxy->>CSW: aclose() [FallbackStreamWrapper]
CSW->>Router: generator.aclose() triggers finally
Router->>Router: finally: anyio.CancelScope(shield=True)
Router->>CSW: model_response.aclose()
CSW->>CSW: anyio.CancelScope(shield=True)
CSW->>Provider: completion_stream.aclose()
Provider->>Provider: Release HTTP connection back to pool
Note over Provider: Connection available for reuse
Last reviewed commit: f5e3606
```python
async def aclose(self):
    if self.completion_stream is not None:
        # Shield from anyio cancellation so cleanup awaits can complete.
        # Without this, CancelledError is thrown into every await during
        # task group cancellation, preventing HTTP connection release.
        with anyio.CancelScope(shield=True):
            try:
                if hasattr(self.completion_stream, "aclose"):
                    await self.completion_stream.aclose()
                elif hasattr(self.completion_stream, "close"):
                    result = self.completion_stream.close()
                    if result is not None:
                        await result
            except BaseException as e:
                verbose_logger.debug(
                    "CustomStreamWrapper.aclose: error closing completion_stream: %s",
                    e,
                )
```
aclose() is not idempotent — may double-close the underlying stream
aclose() does not set self.completion_stream = None after closing, so calling it twice will attempt to close the underlying stream twice. This can happen because aclose() is called from multiple code paths:
- `async_data_generator`'s `finally` block calls `response.aclose()` (where response is a `FallbackStreamWrapper` inheriting this method)
- `stream_with_fallbacks`'s `finally` block calls `model_response.aclose()` (triggered when the `FallbackStreamWrapper`'s inner generator is closed)
While the except BaseException prevents crashes, double-closing AiohttpResponseStream calls __aexit__ twice on the aiohttp response, which can corrupt connection pool state. Adding a guard would make this robust:
Suggested change:

```python
async def aclose(self):
    if self.completion_stream is not None:
        stream_to_close = self.completion_stream
        self.completion_stream = None
        # Shield from anyio cancellation so cleanup awaits can complete.
        # Without this, CancelledError is thrown into every await during
        # task group cancellation, preventing HTTP connection release.
        with anyio.CancelScope(shield=True):
            try:
                if hasattr(stream_to_close, "aclose"):
                    await stream_to_close.aclose()
                elif hasattr(stream_to_close, "close"):
                    result = stream_to_close.close()
                    if result is not None:
                        await result
            except BaseException as e:
                verbose_logger.debug(
                    "CustomStreamWrapper.aclose: error closing completion_stream: %s",
                    e,
                )
```
pyproject.toml
Outdated
```diff
 numpydoc = {version = "*", optional = true} # used in utils.py
-uvicorn = {version = "^0.31.1", optional = true}
+uvicorn = {version = ">=0.32.1", optional = true}
```
Unbounded uvicorn version constraint
The constraint >=0.32.1 has no upper bound, meaning it allows any future version including a hypothetical uvicorn 1.0+ with breaking changes. Other dependencies in this file use more conservative constraints (e.g., fastapi = ">=0.120.1" is similarly unbounded, but gunicorn = "^23.0.0" and pydantic = "^2.5.0" are capped). Consider adding an upper bound for consistency and safety:
```diff
-uvicorn = {version = ">=0.32.1", optional = true}
+uvicorn = {version = ">=0.32.1,<1.0.0", optional = true}
```
fixed @greptileai re review
litellm/proxy/proxy_server.py
Outdated
```python
finally:
    # Close the response stream to release the underlying HTTP connection
    # back to the connection pool. This prevents pool exhaustion when
    # clients disconnect mid-stream.
    if hasattr(response, "aclose"):
        try:
            await response.aclose()
        except Exception as e:
            verbose_proxy_logger.debug(
                "async_data_generator: error closing response stream: %s", e
            )
```
Missing cancellation shielding in cleanup
The other two cleanup sites (CustomStreamWrapper.aclose() at streaming_handler.py:166 and stream_with_fallbacks at router.py:1603) both use anyio.CancelScope(shield=True) to protect the aclose() from being interrupted by task cancellation. This finally block does not shield its await response.aclose() call.
When a client disconnects, ASGI servers may cancel the task, injecting CancelledError into any pending await. Without shielding, response.aclose() could itself be cancelled before completing, defeating the purpose of the cleanup.
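The difference shielding makes can be reproduced with a stdlib-only sketch, using `asyncio.shield` as a rough analogue of `anyio.CancelScope(shield=True)` (illustrative names; the sleeps are only to sequence the demo):

```python
import asyncio


async def main():
    results = {}
    for shielded in (False, True):
        log = []

        async def cleanup():
            # Stands in for response.aclose(): takes a moment to finish.
            await asyncio.sleep(0.02)
            log.append("released")

        async def worker():
            try:
                await asyncio.sleep(10)
            finally:
                if shielded:
                    # Protect the cleanup await from an in-flight cancellation.
                    await asyncio.shield(cleanup())
                else:
                    await cleanup()

        t = asyncio.create_task(worker())
        await asyncio.sleep(0.01)   # worker is parked in sleep(10)
        t.cancel()                  # first cancellation: worker enters finally
        await asyncio.sleep(0.01)   # worker is now awaiting cleanup
        t.cancel()                  # second cancellation hits the cleanup await
        try:
            await t
        except asyncio.CancelledError:
            pass
        await asyncio.sleep(0.05)   # give a shielded cleanup time to finish
        results[shielded] = log
    return results


print(asyncio.run(main()))  # expect {False: [], True: ['released']}
```

Unshielded, the second cancellation aborts the cleanup before the connection is released; shielded, the cleanup still completes, which is the behavior the review asks this `finally` block to match.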
Suggested change:

```python
finally:
    # Close the response stream to release the underlying HTTP connection
    # back to the connection pool. This prevents pool exhaustion when
    # clients disconnect mid-stream.
    # Shield from cancellation so the close awaits can complete.
    import anyio

    with anyio.CancelScope(shield=True):
        if hasattr(response, "aclose"):
            try:
                await response.aclose()
            except Exception as e:
                verbose_proxy_logger.debug(
                    "async_data_generator: error closing response stream: %s", e
                )
```
fixed @greptileai re review
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@greptileai please review this
litellm/proxy/proxy_server.py
Outdated
```python
# back to the connection pool. This prevents pool exhaustion when
# clients disconnect mid-stream.
# Shield from cancellation so the close awaits can complete.
import anyio
```
Inline import violates style guide
Per CLAUDE.md: "Avoid imports within methods — place all imports at the top of the file (module-level)." The import anyio should be moved to the top-level imports section of proxy_server.py alongside the other third-party imports (around line 33). The same import is already correctly placed at module-level in both streaming_handler.py and router.py.
```diff
-import anyio
+import anyio  # TODO: move to top-level imports per CLAUDE.md style guide
```
Resolve conflict in test_proxy_server.py: keep both async_data_generator cleanup tests and store_model_in_db DB config override tests.
Rewrite blog for accuracy against PR #21213, add interactive diagrams, fix technical inaccuracies, and improve readability.
Relevant issues
Fixes #9178 — Connection to backing provider leaking when source request cancels
Fixes #19549 — TCP connections starvation issue on 1.80.10
Helps:
Pre-Submission checklist
Please complete all items before asking a LiteLLM maintainer to review your PR
- Added a relevant test in the tests/litellm/ directory (adding at least 1 test is a hard requirement - see details)
- make test-unit passes
- Reviewed by @greptileai and received a Confidence Score of at least 4/5 before requesting a maintainer review

CI (LiteLLM team)
Branch creation CI run
Link: https://app.circleci.com/pipelines/github/BerriAI/litellm/59224/workflows/443e0fe7-7bad-4ecb-b40d-cdea7fd1b609
CI run for the last commit
Link: https://app.circleci.com/pipelines/github/BerriAI/litellm/60212/workflows/dd8ac434-6e06-4013-abd3-84348fc5d813
Merge / cherry-pick CI run
Links:
Type
🐛 Bug Fix
✅ Test
Changes
The proxy's CustomStreamWrapper is the universal streaming adapter litellm uses to wrap provider-specific streams and normalize them into a single response format.
When we create a CustomStreamWrapper, we pass in the raw provider stream; that raw stream object holds a reference to the HTTP connection, and we process its chunks as they arrive.
When a client disconnected mid-stream, the client → proxy connection closed, but the proxy → provider connection was never released back to the pool. Over time this filled the connection pool and caused new requests to hang.
The problem is threefold (after a lot of digging):
1. Closing the stream didn't actually close the connection. The aiohttp transport wrapped the response using `content=` instead of `stream=` when building the httpx.Response. This silently replaced the real stream with a different wrapper, so calling `aclose()` on the response did nothing and the underlying HTTP connection was never returned to the pool. This leaked on every streaming request, even ones that completed successfully.
2. Nobody was calling close in the first place. Even if `aclose()` had worked, neither the proxy's `async_data_generator` nor the router's `stream_with_fallbacks` had finally blocks to call it. When a client disconnected, the response was simply abandoned without cleanup. (Clients that stayed connected to completion were cleaned up correctly.) Even with the finally blocks added, cancellation would interrupt the cleanup awaits, so anyio cancellation shielding was needed for them to actually complete.
3. Disconnect detection didn't work on Uvicorn 0.28–0.32.0, which claimed to support ASGI spec 2.4 but didn't actually implement disconnect signaling. Starlette saw the advertised spec version and skipped its own fallback disconnect detection. This has since been fixed: Uvicorn 0.32.1+ reports ASGI spec 2.3.