Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 62 additions & 39 deletions src/adapter/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,34 @@ export const createStreamHandler =
}
}

const isBrowser = request?.headers.has('Origin')
// Get an explicit async iterator so pull() can advance one step at a time.
// Generators already implement the iterator protocol directly (.next()),
// while ReadableStream (which generator may be reassigned to above) needs
// [Symbol.asyncIterator]() to produce one.
const iterator: AsyncIterator<unknown> =
typeof (generator as any).next === 'function'
? (generator as AsyncIterator<unknown>)
: (generator as any)[Symbol.asyncIterator]()

let end = false

return new Response(
new ReadableStream({
async start(controller) {
let end = false

start(controller) {
// Register abort handler once — terminates the iterator and
// closes the stream so pull() won't be called again.
request?.signal?.addEventListener('abort', () => {
end = true
iterator.return?.()

try {
controller.close()
} catch {}
})

// Enqueue the already-extracted init value (first generator
// result, used above for SSE detection). Subsequent values
// are produced on-demand by pull().
if (!init || init.value instanceof ReadableStream) {
} else if (
init.value !== undefined &&
Expand All @@ -246,51 +259,61 @@ export const createStreamHandler =
}
else controller.enqueue(format(init.value.toString()))
}
},

async pull(controller) {
// Respect abort/cancel that happened between pull() calls.
if (end) {
try {
controller.close()
} catch {}
return
}

try {
for await (const chunk of generator) {
if (end) break
if (chunk === undefined || chunk === null) continue
const { value: chunk, done } = await iterator.next()

if (done || end) {
try {
controller.close()
} catch {}
return
}

// null/undefined chunks are skipped; the runtime will
// call pull() again since nothing was enqueued.
if (chunk === undefined || chunk === null) return

// @ts-ignore
if (chunk.toSSE) {
// @ts-ignore
if (chunk.toSSE) {
// @ts-ignore
controller.enqueue(chunk.toSSE())
} else {
if (typeof chunk === 'object')
try {
controller.enqueue(
format(JSON.stringify(chunk))
)
} catch {
controller.enqueue(
format(chunk.toString())
)
}
else
controller.enqueue(format(chunk.toString()))

if (!allowRapidStream && isBrowser && !isSSE)
/**
 * Wait for the next event loop tick,
 * otherwise the data will be mixed up.
 *
 * @see https://github.com/elysiajs/elysia/issues/741
 */
await new Promise<void>((resolve) =>
setTimeout(() => resolve(), 0)
controller.enqueue(chunk.toSSE())
} else {
if (typeof chunk === 'object')
try {
controller.enqueue(
format(JSON.stringify(chunk))
)
}
} catch {
controller.enqueue(
format(chunk.toString())
)
}
else
controller.enqueue(format(chunk.toString()))
}
} catch (error) {
console.warn(error)
}

try {
controller.close()
} catch {
// nothing
try {
controller.close()
} catch {}
}
},

cancel() {
end = true
iterator.return?.()
}
}),
set as any
Expand Down
67 changes: 67 additions & 0 deletions test/response/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -640,4 +640,71 @@ describe('Stream', () => {
'event: message\ndata: {"meow":"3"}\n\n'
])
})

// Regression: proxying a large upstream SSE stream caused OOM (#1801).
// The upstream generator must not be drained ahead of the slow consumer.
it('does not buffer unboundedly when proxying a slow-consuming SSE stream', async () => {
	const TOTAL = 50
	let produced = 0

	// Upstream emits TOTAL SSE messages, counting each advance of the
	// generator so the assertion below can observe how far it was drained.
	const upstream = new Elysia().get('/', async function* () {
		let i = 0
		while (i < TOTAL) {
			produced++
			yield sse(`message ${i}`)
			i++
		}
	})

	// The OOM scenario: one Elysia instance re-streams another's SSE
	// response while the consumer below reads slowly.
	const proxy = new Elysia().get('/', () =>
		upstream.handle(new Request('http://e.ly'))
	)

	const response = await proxy.handle(req('/'))
	const reader = response.body!.getReader()

	// Slow consumer: pull three chunks, pausing between consecutive reads.
	for (let reads = 0; reads < 3; reads++) {
		if (reads > 0) await Bun.sleep(20)
		await reader.read()
	}

	// A small prefetch buffer is acceptable, but the generator must not
	// have raced ahead and produced anywhere near all TOTAL chunks.
	expect(produced).toBeLessThan(TOTAL)

	reader.cancel()
})

// Regression test for https://github.com/elysiajs/elysia/issues/1801
// The generator must not be drained ahead of the consumer (backpressure).
it('does not eagerly drain generator ahead of consumer', async () => {
	let nextCallCount = 0

	// Counts every step the generator is driven through, so the test
	// can tell how far ahead of the consumer the runtime pulled.
	async function* lazyGenerator() {
		let i = 0
		while (i < 10) {
			nextCallCount++
			yield i.toString()
			i++
		}
	}

	const app = new Elysia().get('/', lazyGenerator)

	const response = await app.handle(req('/'))
	const reader = response.body!.getReader()

	// Consume only the first three chunks.
	await reader.read()
	await reader.read()
	await reader.read()

	// pull()-based backpressure may prefetch one extra chunk via the
	// ReadableStream internals — a small buffer is tolerated — but the
	// generator must not have been drained of all 10 values.
	expect(nextCallCount).toBeLessThan(10)

	reader.cancel()
})
})
Loading