Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(MockHttpSocket): handle response stream errors #548

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions src/interceptors/ClientRequest/MockHttpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import { isPropertyAccessible } from '../../utils/isPropertyAccessible'
import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
import { parseRawHeaders } from '../Socket/utils/parseRawHeaders'
import { getRawFetchHeaders } from '../../utils/getRawFetchHeaders'
import { RESPONSE_STATUS_CODES_WITHOUT_BODY } from '../../utils/responseUtils'
import {
createServerErrorResponse,
RESPONSE_STATUS_CODES_WITHOUT_BODY,
} from '../../utils/responseUtils'
import { createRequestId } from '../../createRequestId'

type HttpConnectionOptions = any
Expand Down Expand Up @@ -248,34 +251,41 @@ export class MockHttpSocket extends MockSocket {
}

if (response.body) {
const reader = response.body.getReader()

while (true) {
const { done, value } = await reader.read()

if (done) {
break
}

// Flush the headers upon the first chunk in the stream.
// This ensures the consumer will start receiving the response
// as it streams in (subsequent chunks are pushed).
if (httpHeaders.length > 0) {
flushHeaders(value)
continue
try {
const reader = response.body.getReader()

while (true) {
const { done, value } = await reader.read()

if (done) {
break
}

// Flush the headers upon the first chunk in the stream.
// This ensures the consumer will start receiving the response
// as it streams in (subsequent chunks are pushed).
if (httpHeaders.length > 0) {
flushHeaders(value)
continue
}

// Subsequent body chukns are push to the stream.
this.push(value)
}
} catch (error) {
// Coerce response stream errors to 500 responses.
// Don't flush the original response headers because
// unhandled errors translate to 500 error responses forcefully.
this.respondWith(createServerErrorResponse(error))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Treating all the response stream errors as 500 error responses! Consistency.


// Subsequent body chukns are push to the stream.
this.push(value)
return
}
}

// If the headers were not flushed up to this point,
// this means the response either had no body or had
// an empty body stream. Flush the headers.
if (httpHeaders.length > 0) {
flushHeaders()
}
flushHeaders()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safe to call flushHeaders() because it will do nothing if the headers have already been flushed.


// Close the socket if the connection wasn't marked as keep-alive.
if (!this.shouldKeepAlive) {
Expand Down
23 changes: 22 additions & 1 deletion src/interceptors/ClientRequest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { MockAgent, MockHttpsAgent } from './agents'
import { emitAsync } from '../../utils/emitAsync'
import { toInteractiveRequest } from '../../utils/toInteractiveRequest'
import { normalizeClientRequestArgs } from './utils/normalizeClientRequestArgs'
import { isNodeLikeError } from '../../utils/isNodeLikeError'
import { createServerErrorResponse } from '../../utils/responseUtils'

export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {
static symbol = Symbol('client-request-interceptor')
Expand Down Expand Up @@ -144,13 +146,32 @@ export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {
})

if (listenerResult.error) {
socket.errorWith(listenerResult.error)
// Treat thrown Responses as mocked responses.
if (listenerResult.error instanceof Response) {
socket.respondWith(listenerResult.error)
return
}

// Allow mocking Node-like errors.
if (isNodeLikeError(listenerResult.error)) {
socket.errorWith(listenerResult.error)
return
}

// Unhandled exceptions in the request listeners are
// synonymous to unhandled exceptions on the server.
// Those are represented as 500 error responses.
socket.respondWith(createServerErrorResponse(listenerResult.error))
return
}

const mockedResponse = listenerResult.data

if (mockedResponse) {
/**
* @note The `.respondWith()` method will handle "Response.error()".
* Maybe we should make all interceptors do that?
*/
socket.respondWith(mockedResponse)
return
}
Expand Down
1 change: 0 additions & 1 deletion test/modules/http/compliance/http-req-write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ it('calls all write callbacks before the mocked response', async () => {
method: 'POST',
})
request.write('one', () => {
console.log('write callback!')
request.end()
})

Expand Down
145 changes: 145 additions & 0 deletions test/modules/http/response/http-response-readable-stream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/**
* @vitest-environment node
*/
import { vi, it, expect, beforeAll, afterEach, afterAll } from 'vitest'
import { performance } from 'node:perf_hooks'
import http from 'node:http'
import https from 'node:https'
import { DeferredPromise } from '@open-draft/deferred-promise'
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest'
import { sleep, waitForClientRequest } from '../../../helpers'

type ResponseChunks = Array<{ buffer: Buffer; timestamp: number }>

const encoder = new TextEncoder()

const interceptor = new ClientRequestInterceptor()

beforeAll(async () => {
interceptor.apply()
})

afterEach(() => {
interceptor.removeAllListeners()
})

afterAll(async () => {
interceptor.dispose()
})

it('supports ReadableStream as a mocked response', async () => {
const encoder = new TextEncoder()
interceptor.once('request', ({ request }) => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(encoder.encode('hello'))
controller.enqueue(encoder.encode(' '))
controller.enqueue(encoder.encode('world'))
controller.close()
},
})
request.respondWith(new Response(stream))
})

const request = http.get('http://example.com/resource')
const { text } = await waitForClientRequest(request)
expect(await text()).toBe('hello world')
})

it('supports delays when enqueuing chunks', async () => {
interceptor.once('request', ({ request }) => {
const stream = new ReadableStream({
async start(controller) {
controller.enqueue(encoder.encode('first'))
await sleep(200)

controller.enqueue(encoder.encode('second'))
await sleep(200)

controller.enqueue(encoder.encode('third'))
await sleep(200)

controller.close()
},
})

request.respondWith(
new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
},
})
)
})

const responseChunksPromise = new DeferredPromise<ResponseChunks>()

const request = https.get('https://api.example.com/stream', (response) => {
const chunks: ResponseChunks = []

response
.on('data', (data) => {
chunks.push({
buffer: Buffer.from(data),
timestamp: performance.now(),
})
})
.on('end', () => {
responseChunksPromise.resolve(chunks)
})
.on('error', responseChunksPromise.reject)
})

request.on('error', responseChunksPromise.reject)

const responseChunks = await responseChunksPromise
const textChunks = responseChunks.map((chunk) => {
return chunk.buffer.toString('utf8')
})
expect(textChunks).toEqual(['first', 'second', 'third'])

// Ensure that the chunks were sent over time,
// respecting the delay set in the mocked stream.
const chunkTimings = responseChunks.map((chunk) => chunk.timestamp)
expect(chunkTimings[1] - chunkTimings[0]).toBeGreaterThanOrEqual(150)
expect(chunkTimings[2] - chunkTimings[1]).toBeGreaterThanOrEqual(150)
})

it('forwards ReadableStream errors to the request', async () => {
const requestErrorListener = vi.fn()
const responseErrorListener = vi.fn()

interceptor.once('request', ({ request }) => {
const stream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('original'))
queueMicrotask(() => {
controller.error(new Error('stream error'))
})
},
})
request.respondWith(new Response(stream))
})

const request = http.get('http://localhost/resource')
request.on('error', requestErrorListener)
request.on('response', (response) => {
response.on('error', responseErrorListener)
})

const response = await vi.waitFor(() => {
return new Promise<http.IncomingMessage>((resolve) => {
request.on('response', resolve)
})
})

// Response stream errors are translated to unhandled exceptions,
// and then the server decides how to handle them. This is often
// done as returning a 500 response.
expect(response.statusCode).toBe(500)
expect(response.statusMessage).toBe('Unhandled Exception')

// Response stream errors are not request errors.
expect(requestErrorListener).not.toHaveBeenCalled()
expect(request.destroyed).toBe(false)
})
78 changes: 0 additions & 78 deletions test/modules/http/response/readable-stream.test.ts

This file was deleted.

Loading
Loading