Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(MockHttpSocket): exhaust .write() callbacks for mocked requests #542

Merged
merged 5 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion src/interceptors/ClientRequest/MockHttpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ export class MockHttpSocket extends MockSocket {
public passthrough(): void {
const socket = this.createConnection()

// Write the buffered request body chunks.
// Flush the buffered "socket.write()" calls onto
// the original socket instance (i.e. write request body).
// Exhaust the "requestBuffer" in case this Socket
// gets reused for different requests.
let writeArgs: NormalizedWriteArgs | undefined
Expand Down Expand Up @@ -187,6 +188,7 @@ export class MockHttpSocket extends MockSocket {
.on('prefinish', () => this.emit('prefinish'))
.on('finish', () => this.emit('finish'))
.on('close', (hadError) => this.emit('close', hadError))
.on('end', () => this.emit('end'))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We never forwarded the end event of the original socket. I think this should be added.

}

/**
Expand All @@ -205,6 +207,16 @@ export class MockHttpSocket extends MockSocket {
this.mockConnect()
this.responseType = 'mock'

// Exhaust the "socket.write()" callbacks to
// transition the Writable into the right state
// (i.e. "request body written" state). There's nowhere
// to actually write the data so we disregard it.
// It has been used by the request parser to construct
// a Fetch API Request instance representing this request.
for (const [_, __, writeCallback] of this.writeBuffer) {
writeCallback?.()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual fix.

When ClientRequest writes the data to the socket, the .write() method of the MockSocket just buffers the chunks in this.writeBuffer. It never actually does any writes (we cannot do that until we know if the connection is possible) and so it never called any write callbacks (such as the onFinish callback).

Copy link
Contributor

@mikicho mikicho Mar 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we now can/should return false in the mocked socket.write?

Copy link
Contributor

@mikicho mikicho Mar 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I see we are calling the write callback after the response, which causes a deadlock because not all data is ready for the user:

const http = require('http')
const sinon = require('sinon')
const { ClientRequestInterceptor } = require('@mswjs/interceptors/ClientRequest')

const interceptor = new ClientRequestInterceptor()

interceptor.on('request', async function rootListener({ request }) {
  console.log('before');
  await request.arrayBuffer()
  console.log('never get here');
  request.respondWith(new Response('OK!'))
})
interceptor.apply()

const reqWriteCallback = sinon.spy()

const req = http.request(
  {
    host: 'example.com',
    method: 'POST',
    path: '/',
    port: 80,
  },
  res => {
    console.log(2)
    res.on('end', () => {
      console.log(3);
    })
    // Streams start in 'paused' mode and must be started.
    // See https://nodejs.org/api/stream.html#stream_class_stream_readable
    res.resume()
  },
)

req.write('mamma mia', null, () => {
  console.log(1);
  reqWriteCallback()
  req.end()
})

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you talking about the response event when you mention "response"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we call the write callback inside the responseWith function. sorry for the confusion.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that the expected order?

TL;DR: I think yes.

It's a design choice. If I understand you correctly, you would rather the whole request body to be ready for the user in the interceptor so that they can do await request.arrayBuffer without a fear of deadlock like this.
We can also say that the body isn't guaranteed to be ready, and they user needs to take care of this scenario by itself (I think they can with request.arrayBuffer().then(..))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, not quite. The request event is emitted as soon as the request's headers are sent. So the request body may still be streaming. If the user decides to read it, whichever body reading method will return a Promise, and the entire request listener would have to wait for that promise.

I wonder if that conflicts with the write callbacks. As in, in order to read the request body, the socket has to call the callbacks of .write(), and we are calling those only as a part of respondWith().

That shouldn't be the case though. The "request body" the user is reading in the request listener is our internal request body buffer where we buffer the chunks pushed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That shouldn't be the case though. The "request body" the user is reading in the request listener is our internal request body buffer where we buffer the chunks pushed.

But the internal request body never ends because we never call the write callback, which ends the request body. (we never do: this.requestStream.push(null))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we should be doing this. Need to look more into this behavior but it's the actual request that calls .end() on the writable stream. Perhaps the issue is that we are not translating that to this.requestStream.push(null).

Copy link
Member Author

@kettanaito kettanaito Apr 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've pushed a fix in 7be0242.

Basically, if something attempts to read the request buffer (e.g. await request.text() in the interceptor), the MockHttpSocket will immediately start flushing all the write callbacks.

If nothing reads the buffer, the write callbacks will be flushed before the mocked response starts streaming or before the original request is made.

@mikicho, can you please take a look if this fixes your use case? I put it in the test also with that commit.

}

const httpHeaders: Array<Buffer> = []

httpHeaders.push(
Expand Down
86 changes: 68 additions & 18 deletions src/interceptors/Socket/MockSocket.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/**
* @vitest-environment node
*/
import { Socket } from 'node:net'
import { vi, it, expect } from 'vitest'
import { MockSocket } from './MockSocket'

Expand Down Expand Up @@ -117,6 +118,20 @@ it('calls the "write" on "socket.end()" without any arguments', () => {
expect(writeCallback).toHaveBeenCalledWith(undefined, undefined, undefined)
})

it('emits "finished" on .end() without any arguments', async () => {
const finishListener = vi.fn()
const socket = new MockSocket({
write: vi.fn(),
read: vi.fn(),
})
socket.on('finish', finishListener)
socket.end()

await vi.waitFor(() => {
expect(finishListener).toHaveBeenCalledTimes(1)
})
})

it('calls the "read" on "socket.read(chunk)"', () => {
const readCallback = vi.fn()
const socket = new MockSocket({
Expand Down Expand Up @@ -150,43 +165,74 @@ it('calls the "read" on "socket.read(null)"', () => {
expect(readCallback).toHaveBeenCalledWith(null, undefined)
})

it('updates the readable/writable state on "socket.end()"', async () => {
it('updates the writable state on "socket.end()"', async () => {
const finishListener = vi.fn()
const endListener = vi.fn()
const socket = new MockSocket({
write: vi.fn(),
read: vi.fn(),
})
socket.on('finish', finishListener)
socket.on('end', endListener)

expect(socket.writable).toBe(true)
expect(socket.writableEnded).toBe(false)
expect(socket.writableFinished).toBe(false)
expect(socket.readable).toBe(true)
expect(socket.readableEnded).toBe(false)

socket.write('hello')
// Finish the writable stream.
socket.end()

expect(socket.writable).toBe(false)
expect(socket.writableEnded).toBe(true)
expect(socket.readable).toBe(true)

// The "finish" event is emitted when writable is done.
// I.e. "socket.end()" is called.
await vi.waitFor(() => {
socket.once('finish', () => {
expect(socket.writableFinished).toBe(true)
})
expect(finishListener).toHaveBeenCalledTimes(1)
})
expect(socket.writableFinished).toBe(true)
})

it('updates the readable state on "socket.push(null)"', async () => {
const endListener = vi.fn()
const socket = new MockSocket({
write: vi.fn(),
read: vi.fn(),
})
socket.on('end', endListener)

expect(socket.readable).toBe(true)
expect(socket.readableEnded).toBe(false)

socket.push('hello')
socket.push(null)

expect(socket.readable).toBe(true)
expect(socket.readableEnded).toBe(false)

// Read the data to free the buffer and
// make Socket emit "end".
socket.read()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a broken test. We have to call .read() to exhaust the readable stream so it emits the end event.


await vi.waitFor(() => {
socket.once('end', () => {
expect(socket.readableEnded).toBe(true)
})
expect(endListener).toHaveBeenCalledTimes(1)
})
expect(socket.readable).toBe(false)
expect(socket.readableEnded).toBe(true)
})

it('updates the readable/writable state on "socket.destroy()"', async () => {
const finishListener = vi.fn()
const endListener = vi.fn()
const closeListener = vi.fn()
const socket = new MockSocket({
write: vi.fn(),
read: vi.fn(),
})
socket.on('finish', finishListener)
socket.on('end', endListener)
socket.on('close', closeListener)

expect(socket.writable).toBe(true)
expect(socket.writableEnded).toBe(false)
Expand All @@ -198,17 +244,21 @@ it('updates the readable/writable state on "socket.destroy()"', async () => {
expect(socket.writable).toBe(false)
// The ".end()" wasn't called.
expect(socket.writableEnded).toBe(false)
expect(socket.writableFinished).toBe(false)
expect(socket.readable).toBe(false)

await vi.waitFor(() => {
socket.once('finish', () => {
expect(socket.writableFinished).toBe(true)
})
expect(closeListener).toHaveBeenCalledTimes(1)
})

await vi.waitFor(() => {
socket.once('end', () => {
expect(socket.readableEnded).toBe(true)
})
})
// Neither "finish" nor "end" events are emitted
// when you destroy the stream. If you want those,
// call ".end()", then destroy the stream.
expect(finishListener).not.toHaveBeenCalled()
expect(endListener).not.toHaveBeenCalled()
expect(socket.writableFinished).toBe(false)

// The "end" event was never emitted so "readableEnded"
// remains false.
expect(socket.readableEnded).toBe(false)
})
13 changes: 5 additions & 8 deletions src/interceptors/Socket/MockSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export class MockSocket extends net.Socket {
super()
this.connecting = false
this.connect()

this._final = (callback) => {
callback(null)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implement the _final method so socket writes transition the socket to the right "final" state:

  • set writableFinished to true
  • emit finish event

See: https://github.com/nodejs/node/blob/3a456c6db802b5b25594d3a9d235d4989e9f7829/lib/internal/streams/writable.js#L907

this._final() is called on prefinish.

}
}

public connect() {
Expand All @@ -46,13 +50,6 @@ export class MockSocket extends net.Socket {

public push(chunk: any, encoding?: BufferEncoding): boolean {
this.options.read(chunk, encoding)

if (chunk !== null) {
this.emit('data', chunk)
} else {
this.emit('end')
}

return true
return super.push(chunk, encoding)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call super.push() for the .push(null) to correctly transition the socket into the end state:

  • emit end event
  • set readable to false
  • set readableEnded to true

See: https://github.com/nodejs/node/blob/3a456c6db802b5b25594d3a9d235d4989e9f7829/lib/internal/streams/readable.js#L1696

}
}
38 changes: 35 additions & 3 deletions test/modules/http/compliance/http-req-write.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ it('supports Readable as the request body', async () => {

const input = ['hello', ' ', 'world', null]
const readable = new Readable({
read: async function() {
read: async function () {
await sleep(10)
this.push(input.shift())
},
Expand All @@ -119,7 +119,7 @@ it('supports Readable as the request body', async () => {
expect(await text()).toEqual(expectedBody)
})

it('calls the callback when writing an empty string', async () => {
it('calls the write callback when writing an empty string', async () => {
const request = http.request(httpServer.http.url('/resource'), {
method: 'POST',
})
Expand All @@ -132,7 +132,7 @@ it('calls the callback when writing an empty string', async () => {
expect(writeCallback).toHaveBeenCalledTimes(1)
})

it('calls the callback when writing an empty Buffer', async () => {
it('calls the write callback when writing an empty Buffer', async () => {
const request = http.request(httpServer.http.url('/resource'), {
method: 'POST',
})
Expand All @@ -145,3 +145,35 @@ it('calls the callback when writing an empty Buffer', async () => {

expect(writeCallback).toHaveBeenCalledTimes(1)
})

it('emits "finish" for a passthrough request', async () => {
const prefinishListener = vi.fn()
const finishListener = vi.fn()
const request = http.request(httpServer.http.url('/resource'))
request.on('prefinish', prefinishListener)
request.on('finish', finishListener)
request.end()

await waitForClientRequest(request)

expect(prefinishListener).toHaveBeenCalledTimes(1)
expect(finishListener).toHaveBeenCalledTimes(1)
})

it('emits "finish" for a mocked request', async () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikicho, this is the failing write finish test you sent me. It's passing now.

interceptor.once('request', ({ request }) => {
request.respondWith(new Response())
})

const prefinishListener = vi.fn()
const finishListener = vi.fn()
const request = http.request(httpServer.http.url('/resource'))
request.on('prefinish', prefinishListener)
request.on('finish', finishListener)
request.end()

await waitForClientRequest(request)

expect(prefinishListener).toHaveBeenCalledTimes(1)
expect(finishListener).toHaveBeenCalledTimes(1)
})