Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel response stream when connection closes #9071

Merged
merged 4 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-ways-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@astrojs/node': patch
---

Fixes a bug where the response stream would not cancel when the connection closed
12 changes: 8 additions & 4 deletions packages/integrations/node/src/nodeMiddleware.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import type { NodeApp } from 'astro/app/node';
import type { ServerResponse } from 'node:http';
import type { Readable } from 'stream';
import { createOutgoingHttpHeaders } from './createOutgoingHttpHeaders.js';
import { responseIterator } from './response-iterator.js';
import type { ErrorHandlerParams, Options, RequestHandlerParams } from './types.js';

// Disable no-unused-vars to avoid breaking signature change
Expand Down Expand Up @@ -79,8 +77,14 @@ async function writeWebResponse(app: NodeApp, res: ServerResponse, webResponse:
res.writeHead(status, nodeHeaders);
if (webResponse.body) {
try {
for await (const chunk of responseIterator(webResponse) as unknown as Readable) {
res.write(chunk);
const reader = webResponse.body.getReader();
res.on("close", () => {
reader.cancel();
})
let result = await reader.read();
while (!result.done) {
res.write(result.value);
result = await reader.read();
}
} catch (err: any) {
console.error(err?.stack || err?.message || String(err));
Expand Down
228 changes: 0 additions & 228 deletions packages/integrations/node/src/response-iterator.ts

This file was deleted.

19 changes: 19 additions & 0 deletions packages/integrations/node/test/api-route.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,23 @@ describe('API routes', () => {
let [out] = await done;
expect(new Uint8Array(out.buffer)).to.deep.equal(expectedDigest);
});

it('Can bail on streaming', async () => {
const { handler } = await import('./fixtures/api-route/dist/server/entry.mjs');
let { req, res, done } = createRequestAndResponse({
url: '/streaming',
});

let locals = { cancelledByTheServer: false };

handler(req, res, () => {}, locals);
req.send();

await new Promise((resolve) => setTimeout(resolve, 500));
res.emit("close");
pilcrowonpaper marked this conversation as resolved.
Show resolved Hide resolved

await done;

expect(locals).to.deep.include({ cancelledByTheServer: true });
});
});
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import crypto from 'node:crypto';

export async function post({ request }: { request: Request }) {
export async function POST({ request }: { request: Request }) {
const hash = crypto.createHash('sha256');

const iterable = request.body as unknown as AsyncIterable<Uint8Array>;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
export const GET = ({ locals }) => {
let sentChunks = 0;

const readableStream = new ReadableStream({
async pull(controller) {
if (sentChunks === 3) return controller.close();
else sentChunks++;

await new Promise(resolve => setTimeout(resolve, 1000));
controller.enqueue(new TextEncoder().encode('hello\n'));
},
cancel() {
locals.cancelledByTheServer = true;
}
});

return new Response(readableStream, {
headers: {
"Content-Type": "text/event-stream"
}
});
}