Replace eager start() loop with pull()-based backpressure#1803
SaltyAom merged 2 commits into elysiajs:main
Conversation
…ateStreamHandler: prevents OOM when the generator produces faster than the consumer reads. Removes the isBrowser/setTimeout(0) hack, no longer needed with demand-driven pull().
@coderabbitai review

✅ Actions performed: Review triggered.
Walkthrough

Refactors the streaming adapter to drive data via a pull()-driven AsyncIterator with an explicit iterator, end guard, abort/cancel handling, and unified chunk formatting (toSSE/JSON/string). Adds regression tests to verify backpressure prevents eager generator draining.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Producer as Producer\n(async generator)
    participant Adapter as Stream Adapter\n(pull + iterator)
    participant Consumer as Consumer\n(readable stream)
    Consumer->>Adapter: read / request
    Adapter->>Producer: iterator.next()
    Producer-->>Adapter: chunk
    Adapter->>Consumer: enqueue(chunk formatted)
    Note over Adapter,Producer: On abort/cancel -> Adapter sets end, calls iterator.return()
    Consumer->>Adapter: cancel/abort
    Adapter->>Producer: iterator.return()
    Producer-->>Adapter: done
    Adapter->>Consumer: close
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
@coderabbitai review

✅ Actions performed: Review triggered.
🧹 Nitpick comments (1)
test/response/stream.test.ts (1)
644-678: Well-designed regression test for the proxying OOM scenario. The test effectively validates that the upstream generator respects backpressure when a slow consumer reads from a proxy. The structure clearly demonstrates the issue from #1801.

Minor improvement: `reader.cancel()` returns a Promise. Awaiting it ensures `iterator.return()` completes before the test ends, preventing potential resource leaks or flakiness.

🔧 Suggested improvement
```diff
- reader.cancel()
+ await reader.cancel()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/response/stream.test.ts` around lines 644 - 678, The test currently calls reader.cancel() without awaiting it, which can leave the upstream async iterator's cleanup (iterator.return()) unfinished and cause resource leaks or flakiness; update the test to await the cancellation Promise returned by reader.cancel() (i.e., change the final call to await reader.cancel()) so the stream/iterator cleanup completes before the test exits.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: cd671303-7713-4238-9c65-f7f0fb7b9896
📒 Files selected for processing (1)
test/response/stream.test.ts
this is fire
… streaming

The handleStream method used `for await...of` inside ReadableStream.start(), which eagerly drains the async generator regardless of consumer speed: no backpressure, potential OOM when proxying large SSE streams.

The fix:
- start() is now synchronous: sets up the abort handler and ping interval, enqueues the init value
- pull() calls iterator.next() one value at a time, only when the consumer is ready
- cancel() properly cleans up via a shared idempotent cleanup() closure
- cleanup() clears the ping interval, removes the abort listener, terminates the iterator
- the error enqueue is guarded so cleanup + close always run even if the stream is closed

Added a regression test asserting the generator is not drained ahead of the consumer.

Ref: elysiajs/elysia#1803
this resolves #1801
Problem
`createStreamHandler` uses `new ReadableStream({ async start() })`, which eagerly drains the entire generator into the stream's internal queue without waiting for the consumer. When a generator produces data faster than the consumer can read, such as when SSE proxies a large upstream, memory usage can grow unbounded. This issue was reported as an out-of-memory (OOM) error, escalating from 78 MB to 14.4 GB in seconds.

Root Cause
The Streams specification calls `start()` only once, immediately, before any reads occur. Placing a `for await...of` loop there does not create backpressure; instead, the loop races ahead of the consumer and fills the internal queue without limit.

The solution is to use `pull()`, which the runtime calls only when the consumer is ready for the next chunk, resulting in one invocation per read.

Solution
Extract an explicit `AsyncIterator` before constructing the `ReadableStream`, then divide the single `async start()` into three callbacks:

- `start()` (sync): registers the abort handler and enqueues the already-extracted initial value
- `async pull()`: advances the iterator by one step on demand
- `cancel()`: calls `iterator.return()` to clean up the generator

This approach also removes the `isBrowser` + `setTimeout(0)` workaround introduced in #741. That hack was necessary because back-to-back `enqueue()` calls in the same microtask caused chunks to merge in the consumer's buffer. With `pull()`, each enqueue occurs in a separate microtask driven by a consumer read, preventing the original issue from recurring.

Summary by CodeRabbit
Bug Fixes
Tests