-
-
Notifications
You must be signed in to change notification settings - Fork 133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(MockHttpSocket): handle response stream errors #548
Changes from 3 commits
a192a3a
3510477
69679a7
60f0f27
a36b790
8f84796
41a661f
07834f1
84fca0e
22dedf5
194c9c8
e93cef2
9f04c0e
48f3338
700212c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -239,34 +239,47 @@ export class MockHttpSocket extends MockSocket { | |
} | ||
|
||
if (response.body) { | ||
const reader = response.body.getReader() | ||
|
||
while (true) { | ||
const { done, value } = await reader.read() | ||
|
||
if (done) { | ||
break | ||
try { | ||
const reader = response.body.getReader() | ||
|
||
while (true) { | ||
const { done, value } = await reader.read() | ||
|
||
if (done) { | ||
break | ||
} | ||
|
||
// Flush the headers upon the first chunk in the stream. | ||
// This ensures the consumer will start receiving the response | ||
// as it streams in (subsequent chunks are pushed). | ||
if (httpHeaders.length > 0) { | ||
flushHeaders(value) | ||
continue | ||
} | ||
|
||
// Subsequent body chukns are push to the stream. | ||
this.push(value) | ||
} | ||
|
||
// Flush the headers upon the first chunk in the stream. | ||
// This ensures the consumer will start receiving the response | ||
// as it streams in (subsequent chunks are pushed). | ||
if (httpHeaders.length > 0) { | ||
flushHeaders(value) | ||
continue | ||
} catch (error) { | ||
if (error instanceof Error) { | ||
// Flush the headers on response stream errors. | ||
// This way, the client still receives the "response" event, | ||
// and the actual stream error is forwarded as the "error" | ||
// event on the http.IncomingMessage instance. | ||
flushHeaders() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will emit the |
||
|
||
// Forward response streaming errors as response errors. | ||
/** @todo This doesn't quite do it. */ | ||
// this.destroy(error) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs helpI'm struggling to find a way to trigger this: The |
||
} | ||
|
||
// Subsequent body chukns are push to the stream. | ||
this.push(value) | ||
return | ||
} | ||
} | ||
|
||
// If the headers were not flushed up to this point, | ||
// this means the response either had no body or had | ||
// an empty body stream. Flush the headers. | ||
if (httpHeaders.length > 0) { | ||
flushHeaders() | ||
} | ||
flushHeaders() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's safe to call |
||
|
||
// Close the socket if the connection wasn't marked as keep-alive. | ||
if (!this.shouldKeepAlive) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/** | ||
* @vitest-environment node | ||
*/ | ||
import { vi, it, expect, beforeAll, afterAll } from 'vitest' | ||
import http from 'node:http' | ||
import { Readable } from 'node:stream' | ||
import { HttpServer } from '@open-draft/test-server/http' | ||
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest' | ||
import { waitForClientRequest } from '../../../helpers' | ||
|
||
function createErrorStream() { | ||
return new ReadableStream({ | ||
start(controller) { | ||
controller.enqueue(new TextEncoder().encode('hello')) | ||
controller.error(new Error('stream error')) | ||
}, | ||
}) | ||
} | ||
|
||
const httpServer = new HttpServer((app) => { | ||
app.get('/resource', (req, res) => { | ||
res.pipe(Readable.fromWeb(createErrorStream())) | ||
}) | ||
}) | ||
|
||
const interceptor = new ClientRequestInterceptor() | ||
|
||
beforeAll(async () => { | ||
interceptor.apply() | ||
await httpServer.listen() | ||
}) | ||
|
||
afterAll(async () => { | ||
interceptor.dispose() | ||
await httpServer.close() | ||
}) | ||
|
||
it('supports ReadableStream as a mocked response', async () => { | ||
const encoder = new TextEncoder() | ||
interceptor.once('request', ({ request }) => { | ||
const stream = new ReadableStream({ | ||
start(controller) { | ||
controller.enqueue(encoder.encode('hello')) | ||
controller.enqueue(encoder.encode(' ')) | ||
controller.enqueue(encoder.encode('world')) | ||
controller.close() | ||
}, | ||
}) | ||
request.respondWith(new Response(stream)) | ||
}) | ||
|
||
const request = http.get('http://example.com/resource') | ||
const { text } = await waitForClientRequest(request) | ||
expect(await text()).toBe('hello world') | ||
}) | ||
|
||
it('forwards ReadableStream errors to the request', async () => { | ||
const requestErrorListener = vi.fn() | ||
const responseErrorListener = vi.fn() | ||
|
||
interceptor.once('request', ({ request }) => { | ||
request.respondWith(new Response(createErrorStream())) | ||
}) | ||
|
||
const request = http.get(httpServer.http.url('/resource')) | ||
request.on('error', requestErrorListener) | ||
request.on('response', (response) => { | ||
response.on('error', responseErrorListener) | ||
}) | ||
|
||
await vi.waitFor(() => { | ||
return new Promise<http.IncomingMessage>((resolve) => { | ||
request.on('response', resolve) | ||
}) | ||
}) | ||
|
||
// Response stream errors are not request errors. | ||
expect(requestErrorListener).not.toHaveBeenCalled() | ||
expect(request.destroyed).toBe(false) | ||
|
||
// Response stream errors are "error" events on the response. | ||
await vi.waitFor(() => { | ||
expect(responseErrorListener).toHaveBeenCalledWith( | ||
new Error('stream error') | ||
) | ||
}) | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not a blocker, but I think we need to return 500 for any error.
The current implementation swallows the error from the user in case it does not extend
Error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I completely agree! I revisited by previous point and found it wrong. I'm alining the uncaught exceptions handling in #555, and then will update this branch to migrate the implementation from
NodeClientRequest.ts
.