diff --git a/src/adapter/utils.ts b/src/adapter/utils.ts index 007f0578..69c62cf1 100644 --- a/src/adapter/utils.ts +++ b/src/adapter/utils.ts @@ -210,21 +210,34 @@ export const createStreamHandler = } } - const isBrowser = request?.headers.has('Origin') + // Get an explicit async iterator so pull() can advance one step at a time. + // Generators already implement the iterator protocol directly (.next()), + // while ReadableStream (which generator may be reassigned to above) needs + // [Symbol.asyncIterator]() to produce one. + const iterator: AsyncIterator = + typeof (generator as any).next === 'function' + ? (generator as AsyncIterator) + : (generator as any)[Symbol.asyncIterator]() + + let end = false return new Response( new ReadableStream({ - async start(controller) { - let end = false - + start(controller) { + // Register abort handler once — terminates the iterator and + // closes the stream so pull() won't be called again. request?.signal?.addEventListener('abort', () => { end = true + iterator.return?.() try { controller.close() } catch {} }) + // Enqueue the already-extracted init value (first generator + // result, used above for SSE detection). Subsequent values + // are produced on-demand by pull(). if (!init || init.value instanceof ReadableStream) { } else if ( init.value !== undefined && @@ -246,51 +259,61 @@ export const createStreamHandler = } else controller.enqueue(format(init.value.toString())) } + }, + + async pull(controller) { + // Respect abort/cancel that happened between pull() calls. + if (end) { + try { + controller.close() + } catch {} + return + } try { - for await (const chunk of generator) { - if (end) break - if (chunk === undefined || chunk === null) continue + const { value: chunk, done } = await iterator.next() + + if (done || end) { + try { + controller.close() + } catch {} + return + } + // null/undefined chunks are skipped; the runtime will + // call pull() again since nothing was enqueued. + if (chunk === undefined || chunk === null) return + + // @ts-ignore + if (chunk.toSSE) { // @ts-ignore - if (chunk.toSSE) { - // @ts-ignore - controller.enqueue(chunk.toSSE()) - } else { - if (typeof chunk === 'object') - try { - controller.enqueue( - format(JSON.stringify(chunk)) - ) - } catch { - controller.enqueue( - format(chunk.toString()) - ) - } - else - controller.enqueue(format(chunk.toString())) - - if (!allowRapidStream && isBrowser && !isSSE) - /** - * Wait for the next event loop - * otherwise the data will be mixed up - * - * @see https://github.com/elysiajs/elysia/issues/741 - */ - await new Promise((resolve) => - setTimeout(() => resolve(), 0) + controller.enqueue(chunk.toSSE()) + } else { + if (typeof chunk === 'object') + try { + controller.enqueue( + format(JSON.stringify(chunk)) ) - } + } catch { + controller.enqueue( + format(chunk.toString()) + ) + } + else + controller.enqueue(format(chunk.toString())) } } catch (error) { console.warn(error) - } - try { - controller.close() - } catch { - // nothing + try { + controller.close() + } catch {} } + }, + + cancel() { + end = true + iterator.return?.() } }), set as any diff --git a/test/response/stream.test.ts b/test/response/stream.test.ts index 3254a627..5c25923e 100644 --- a/test/response/stream.test.ts +++ b/test/response/stream.test.ts @@ -640,4 +640,71 @@ describe('Stream', () => { 'event: message\ndata: {"meow":"3"}\n\n' ]) }) + + // Regression: proxying a large upstream SSE stream caused OOM (#1801). + // The upstream generator must not be drained ahead of the slow consumer. + it('does not buffer unboundedly when proxying a slow-consuming SSE stream', async () => { + const TOTAL = 50 + let produced = 0 + + const upstream = new Elysia().get('/', async function* () { + for (let i = 0; i < TOTAL; i++) { + produced++ + yield sse(`message ${i}`) + } + }) + + // Simulate the OOM scenario: one Elysia instance re-streams another's + // SSE response while the consumer reads slowly. + const proxy = new Elysia().get('/', () => + upstream.handle(new Request('http://e.ly')) + ) + + const response = await proxy.handle(req('/')) + const reader = response.body!.getReader() + + // Slow consumer: read 3 chunks with pauses between each + await reader.read() + await Bun.sleep(20) + await reader.read() + await Bun.sleep(20) + await reader.read() + + // The upstream generator must not have raced ahead and produced all + // TOTAL chunks. A small prefetch buffer is fine, but nowhere near 50. + expect(produced).toBeLessThan(TOTAL) + + reader.cancel() + }) + + // Regression test for https://github.com/elysiajs/elysia/issues/1801 + // The generator must not be drained ahead of the consumer (backpressure). + it('does not eagerly drain generator ahead of consumer', async () => { + let nextCallCount = 0 + + async function* lazyGenerator() { + for (let i = 0; i < 10; i++) { + nextCallCount++ + yield String(i) + } + } + + const app = new Elysia().get('/', lazyGenerator) + + const response = await app.handle(req('/')) + const reader = response.body!.getReader() + + // Read only the first 3 chunks + await reader.read() + await reader.read() + await reader.read() + + // With pull()-based backpressure the generator should not have + // been advanced far beyond what was consumed. Allow a small buffer + // (ReadableStream may prefetch one extra chunk via pull()) but it + // must not have drained all 10. + expect(nextCallCount).toBeLessThan(10) + + reader.cancel() + }) })