Skip to content

Commit

Permalink
fix: Client.stream writableNeedDrain
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 13, 2020
1 parent 37ecb75 commit 1be85c2
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 2 deletions.
6 changes: 6 additions & 0 deletions lib/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class StreamHandler extends AsyncResource {
})

this.res = res

const needDrain = res.writableNeedDrain !== undefined
? res.writableNeedDrain
: res._writableState && res._writableState.needDrain

return needDrain !== true
}

onData (chunk) {
Expand Down
45 changes: 43 additions & 2 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -374,19 +374,37 @@ class Parser extends HTTPParser {
this.headers = null
this.shouldKeepAlive = false
this.request = null
this.paused = false
this.queue = []

this._resume = () => {
// TODO: Resume parser.
this.paused = false

while (this.queue.length) {
const [fn, ...args] = this.queue.shift()

fn.apply(this, args)

if (this.paused) {
return
}
}

socketResume(socket)
}

this._pause = () => {
// TODO: Pause parser.
this.paused = true
socketPause(socket)
}
}

[HTTPParser.kOnHeaders] (rawHeaders) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeaders], rawHeaders])
return
}

if (this.headers) {
Array.prototype.push.apply(this.headers, rawHeaders)
} else {
Expand All @@ -395,6 +413,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnExecute] (ret) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnExecute], ret])
return
}

const { upgrade, socket } = this

if (!Number.isFinite(ret)) {
Expand Down Expand Up @@ -465,6 +488,12 @@ class Parser extends HTTPParser {

[HTTPParser.kOnHeadersComplete] (versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnHeadersComplete], versionMajor, versionMinor, rawHeaders, method,
url, statusCode, statusMessage, upgrade, shouldKeepAlive])
return
}

const { client, socket } = this

const request = client[kQueue][client[kRunningIdx]]
Expand Down Expand Up @@ -560,6 +589,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnBody] (chunk, offset, length) {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnBody], chunk, offset, length])
return
}

const { socket, statusCode, request } = this

if (socket.destroyed) {
Expand All @@ -578,6 +612,11 @@ class Parser extends HTTPParser {
}

[HTTPParser.kOnMessageComplete] () {
if (this.paused) {
this.queue.push([this[HTTPParser.kOnMessageComplete]])
return
}

const { client, socket, statusCode, headers, upgrade, request, trailers } = this

if (socket.destroyed) {
Expand Down Expand Up @@ -785,6 +824,7 @@ function connect (client) {
}

function socketPause (socket) {
// TODO: Pause parser.
if (socket._handle && socket._handle.reading) {
socket._handle.reading = false
const err = socket._handle.readStop()
Expand All @@ -795,6 +835,7 @@ function socketPause (socket) {
}

function socketResume (socket) {
// TODO: Resume parser.
if (socket._handle && !socket._handle.reading) {
socket._handle.reading = true
const err = socket._handle.readStart()
Expand Down
1 change: 1 addition & 0 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ class Request {

onBody (chunk, offset, length) {
assert(!this.aborted)
assert(!this[kPaused])

if (this[kTimeout] && this[kTimeout].refresh) {
this[kTimeout].refresh()
Expand Down
44 changes: 44 additions & 0 deletions test/client-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -657,3 +657,47 @@ test('stream body destroyed on invalid callback', (t) => {
}
})
})

test('stream needDrain', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.end(Buffer.alloc(4096))
})
t.tearDown(server.close.bind(server))

server.listen(0, async () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(() => {
console.error(3)
client.destroy()
})

const dst = new PassThrough()
dst.pause()

while (dst.write(Buffer.alloc(4096))) {

}

const orgWrite = dst.write
dst.write = () => t.fail()
const p = client.stream({
path: '/',
method: 'GET'
}, () => {
return dst
})

setTimeout(() => {
dst.write = (...args) => {
orgWrite.call(dst, ...args)
}
dst.resume()
}, 1e3)

await p

t.pass()
})
})

0 comments on commit 1be85c2

Please sign in to comment.